robshep commented on issue #8429:
URL: https://github.com/apache/pulsar/issues/8429#issuecomment-723861562


   Hi, thanks for taking a look. 
   There are 3 topics in use in this deployment. 
   All have schemas. 
   Neither this code nor the Payload objects have changed for many months.
   
   I notice that in App1 another topic with a `-RETRY` suffix has been created, presumably to handle redelivery. I don't maintain a schema for it.
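   
   As a rough sketch of where that extra topic name may come from: this assumes the client version in use derives the retry and dead-letter topics as `<tenant>/<namespace>/<subscription>-RETRY` and `-DLQ` when `enableRetry(true)` is set without an explicit dead-letter policy. That naming rule is an assumption and may differ between Pulsar versions. `RetryTopicNames` is a hypothetical helper, not part of the Pulsar API.
   
   ```java
   // Hypothetical helper (not part of the Pulsar client API).
   // Assumption: default retry/DLQ topics are named
   // "<tenant>/<namespace>/<subscription>-RETRY" and "...-DLQ".
   public final class RetryTopicNames {
       private RetryTopicNames() {}

       // Strips the "persistent://" (or "non-persistent://") prefix and the
       // trailing local topic name, leaving "tenant/namespace".
       public static String namespaceOf(String topic) {
           String withoutDomain = topic.substring(topic.indexOf("://") + 3);
           return withoutDomain.substring(0, withoutDomain.lastIndexOf('/'));
       }

       // Assumed default name of the retry letter topic.
       public static String retryTopic(String topic, String subscription) {
           return namespaceOf(topic) + "/" + subscription + "-RETRY";
       }

       // Assumed default name of the dead letter topic.
       public static String deadLetterTopic(String topic, String subscription) {
           return namespaceOf(topic) + "/" + subscription + "-DLQ";
       }

       public static void main(String[] args) {
           String topic = "persistent://myorg/myappB/update";
           System.out.println(retryTopic(topic, "app-tp-updater"));
           System.out.println(deadLetterTopic(topic, "app-tp-updater"));
       }
   }
   ```
   
   If that assumption holds, the App1 consumer would end up with a retry topic that no producer in these apps ever registers a schema on, which may be relevant to the "Null Schema" message.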
   
   The log above is from restarting `App1` (below), but I have also seen the same "Null Schema" issue when restarting `App3`.
   
   **App1:**
   _1 x consumer, 1 x producer_
   
   ```
   @Bean(destroyMethod = "close")
   @Autowired
   public Consumer<UpdateMessageV1> updateListener(PulsarUpdateListener pulsarUpdateListener) throws PulsarClientException {
       return pulsarClient().newConsumer(AvroSchema.of(UpdateMessageV1.class))
               .messageListener(pulsarUpdateListener)
               .subscriptionMode(SubscriptionMode.Durable)
               .subscriptionName("app-tp-updater")
               .negativeAckRedeliveryDelay(20, TimeUnit.SECONDS) // try again after 20 seconds
               .topic("persistent://myorg/myappB/update")
               .enableRetry(true) // this is what creates the extra -RETRY topic
               .subscriptionType(SubscriptionType.Shared)
               .subscribe();
   }
   
   @Bean(destroyMethod = "close")
   public Producer<ExpiryNotificationV1> notifyExpiryProducer() throws PulsarClientException {
       return pulsarClient().newProducer(AvroSchema.of(ExpiryNotificationV1.class))
               .topic("persistent://myorg/myappB/notify:expiry")
               .create();
   }
   ```
   
   **App2:**
   _1 x producer_
   ```
   final Producer<IngressMessageV1> producer = pulsarClient.newProducer(AvroSchema.of(IngressMessageV1.class))
           .topic("persistent://myorg/myappA/ingress")
           .create();
   ```
   
   
   **App3:**
   _1 x producer, 2 x consumer_
   
   ```
   producer = pulsarClient.newProducer(AvroSchema.of(UpdateMessageV1.class))
           .producerName("myapp-tp-update-consumer")
           .topic("persistent://myorg/myappB/update")
           .create();
   
   consumerTPNotifsV1 = pulsarClient.newConsumer(AvroSchema.of(ExpiryNotificationV1.class))
           .subscriptionName("app-tp-expiry")
           .topic("persistent://myorg/appB/notify:expiry")
           .subscribe();
   
   consumerIngressV1 = pulsarClient.newConsumer(AvroSchema.of(IngressMessageV1.class))
           .subscriptionName("myapp-ingress")
           .topic("persistent://myorg/myappA/ingress")
           .subscribe();
   
   ```
   
   
   Thanks
   
   Rob
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

