robshep commented on issue #8429:
URL: https://github.com/apache/pulsar/issues/8429#issuecomment-723861562
Hi, thanks for taking a look.
There are 3 topics in use in this deployment.
All have schemas.
Neither this code nor the Payload objects have changed for many months.
I notice that in App1 another topic, `-RETRY`, has been created, presumably to
handle redelivery. I don't maintain a schema for it.
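For reference, a minimal sketch of how such a retry topic name could be derived. It assumes the Java client joins the source topic's namespace and the subscription name with a `-RETRY` suffix when `enableRetry(true)` is set and no explicit retry topic is configured. That convention is an assumption about the client version in use and may differ in other releases; the class and method names are hypothetical and not part of the Pulsar API.

```java
// Hypothetical helper, not part of the Pulsar client API.
class RetryTopicNameSketch {

    // Assumed suffix; it matches the `-RETRY` topic observed in this deployment.
    static final String RETRY_SUFFIX = "-RETRY";

    // Derives "<tenant>/<namespace>/<subscription>-RETRY" from a fully
    // qualified topic name such as "persistent://tenant/namespace/topic".
    static String retryTopicFor(String topic, String subscription) {
        // Drop the "persistent://" (or "non-persistent://") scheme.
        String withoutScheme = topic.substring(topic.indexOf("://") + 3);
        // Everything before the last '/' is "<tenant>/<namespace>".
        String namespace = withoutScheme.substring(0, withoutScheme.lastIndexOf('/'));
        return namespace + "/" + subscription + RETRY_SUFFIX;
    }

    public static void main(String[] args) {
        // Topic and subscription taken from the App1 consumer.
        System.out.println(
                retryTopicFor("persistent://myorg/myappB/update", "app-tp-updater"));
    }
}
```

If that assumption holds, the App1 consumer would end up with a retry topic scoped to the `app-tp-updater` subscription in the `myorg/myappB` namespace, which is a topic none of the three applications declares a schema for explicitly.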
The log above is from restarting `App1` (below), but I have also seen the same
"Null Schema" issue when restarting `App3`.
**App1:**
_1 x consumer, 1 x producer_
```
@Bean(destroyMethod = "close")
@Autowired
public Consumer<UpdateMessageV1> updateListener(PulsarUpdateListener pulsarUpdateListener)
        throws PulsarClientException {
    return pulsarClient().newConsumer(AvroSchema.of(UpdateMessageV1.class))
            .messageListener(pulsarUpdateListener)
            .subscriptionMode(SubscriptionMode.Durable)
            .subscriptionName("app-tp-updater")
            .negativeAckRedeliveryDelay(20, TimeUnit.SECONDS) // try again after 20 seconds
            .topic("persistent://myorg/myappB/update")
            .enableRetry(true)
            .subscriptionType(SubscriptionType.Shared)
            .subscribe();
}

@Bean(destroyMethod = "close")
public Producer notifyExpiryProducer() throws PulsarClientException {
    return pulsarClient().newProducer(AvroSchema.of(ExpiryNotificationV1.class))
            .topic("persistent://myorg/myappB/notify:expiry")
            .create();
}
```
**App 2:**
_1 x producer_
```
final Producer<IngressMessageV1> producer =
        pulsarClient.newProducer(AvroSchema.of(IngressMessageV1.class))
                .topic("persistent://myorg/myappA/ingress")
                .create();
```
**App3:**
_1 x producer, 2 x consumer_
```
producer = pulsarClient.newProducer(AvroSchema.of(UpdateMessageV1.class))
        .producerName("myapp-tp-update-consumer")
        .topic("persistent://myorg/myappB/update")
        .create();

consumerTPNotifsV1 = pulsarClient.newConsumer(AvroSchema.of(ExpiryNotificationV1.class))
        .subscriptionName("app-tp-expiry")
        .topic("persistent://myorg/appB/notify:expiry")
        .subscribe();

consumerIngressV1 = pulsarClient.newConsumer(AvroSchema.of(IngressMessageV1.class))
        .subscriptionName("myapp-ingress")
        .topic("persistent://myorg/myappA/ingress")
        .subscribe();
```
Thanks
Rob