Technoboy- opened a new pull request, #15312:
URL: https://github.com/apache/pulsar/pull/15312

   ### Motivation
   When a consumer subscribes to more than one topic and uses the negative 
acknowledgment API to request redelivery of a message, redelivery fails with the error below:
   ```
   java.lang.IllegalArgumentException: null
        at com.google.common.base.Preconditions.checkArgument(Preconditions.java:131) ~[guava-31.0.1-jre.jar:?]
        at org.apache.pulsar.client.impl.ConsumerImpl.redeliverUnacknowledgedMessages(ConsumerImpl.java:1901) ~[classes/:?]
        at org.apache.pulsar.client.impl.NegativeAcksTracker.triggerRedelivery(NegativeAcksTracker.java:82) ~[classes/:?]
   ```
   
   When a consumer subscribes to multiple topics, the internal message ID is a 
`TopicMessageIdImpl`, and it is added to `NegativeAcksTracker` without being converted to 
`MessageIdImpl` (this happens when the user has not configured `negativeAckRedeliveryBackoff`):
   
https://github.com/apache/pulsar/blob/35c4b68f586774d0e2b7a3a2a6ab1d6a20ac1452/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java#L87-L112
   
   `NegativeAcksTracker` then triggers redelivery, and an 
`IllegalArgumentException` is thrown here:
   
https://github.com/apache/pulsar/blob/35c4b68f586774d0e2b7a3a2a6ab1d6a20ac1452/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1896-L1902
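
The failure mode described above can be illustrated with a small self-contained sketch. It does not use the Pulsar classes: `Id`, `PlainId`, `TopicId`, `redeliver`, `unwrap`, and `redeliverFails` are all hypothetical stand-ins, and unwrapping before redelivery is only one possible way to satisfy the type check, not necessarily what this PR does.

```java
import java.util.Collections;
import java.util.Set;

public class NegativeAckSketch {
    /** Stand-in for a message ID type (hypothetical, not Pulsar's MessageId). */
    public interface Id {}

    /** Stand-in for a plain ID that identifies one entry. */
    public static final class PlainId implements Id {
        final long ledgerId;
        final long entryId;

        public PlainId(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }
    }

    /** Stand-in for a multi-topic ID that wraps an inner ID together with its topic. */
    public static final class TopicId implements Id {
        final String topic;
        final Id inner;

        public TopicId(String topic, Id inner) {
            this.topic = topic;
            this.inner = inner;
        }
    }

    /** Accepts only plain IDs, similar in spirit to the precondition in the stack trace. */
    public static void redeliver(Set<Id> ids) {
        if (ids.isEmpty()) {
            return;
        }
        if (!(ids.iterator().next() instanceof PlainId)) {
            throw new IllegalArgumentException();
        }
    }

    /** Returns the inner ID of a wrapper, or the ID itself if it is not wrapped. */
    public static Id unwrap(Id id) {
        return (id instanceof TopicId) ? ((TopicId) id).inner : id;
    }

    /** Reports whether redelivering the given tracked ID is rejected. */
    public static boolean redeliverFails(Id tracked) {
        try {
            redeliver(Collections.singleton(tracked));
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Id wrapped = new TopicId("topic-a", new PlainId(1L, 2L));
        // The wrapped ID fails the type check; the unwrapped one passes.
        System.out.println("wrapped rejected: " + redeliverFails(wrapped));           // true
        System.out.println("unwrapped rejected: " + redeliverFails(unwrap(wrapped))); // false
    }
}
```

In this sketch the tracker-side choice is what matters: if the wrapper is stored as-is, every later redelivery attempt for that ID is rejected, whereas storing the inner ID keeps the redelivery path unchanged.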
   
   ### Documentation
   
   - [x] `no-need-doc` 
   
   
   ### Reproduce Test
   
   ```
   @Test
   @SneakyThrows
   public void testNegativeAcks() {
       String topic = BrokerTestUtil.newUniqueName("testNegativeAcks");
       // With retry enabled, the consumer also subscribes to the retry topic,
       // so internally it behaves as a multi-topic consumer.
       @Cleanup
       Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
               .topic(topic)
               .subscriptionName("sub1")
               .subscriptionType(SubscriptionType.Shared)
               .enableRetry(true)
               .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS)
               .subscribe();

       @Cleanup
       Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
               .topic(topic)
               .create();

       producer.sendAsync("test-ack");
       producer.flush();
       // Negatively acknowledge every received message so that the tracker
       // schedules a redelivery after the 1-second delay.
       while (true) {
           Message<String> msg1 = consumer.receive();
           System.out.println("received : " + msg1);
           consumer.negativeAcknowledge(msg1);
       }
   }
   ```
     

