Technoboy- opened a new pull request #14246:
URL: https://github.com/apache/pulsar/pull/14246


   ### Motivation
   #13383 fixed a batch message ack issue, but users have recently found that 
the unacked message count can become negative, which prints the following log:
   
https://github.com/apache/pulsar/blob/ca64c67acac41c34edf15ffa6ca8b5beac6f99ea/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java#L932-L934
   
   The following test reproduces the case:
   ```java
       @Test
       public void testNegativeAcks()
               throws Exception {
           String topic = BrokerTestUtil.newUniqueName("testNegativeAcks");
   
           @Cleanup
           Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                   .topic(topic)
                   .subscriptionName("sub1")
                   .subscriptionType(SubscriptionType.Shared)
                   .enableBatchIndexAcknowledgment(true)
                   .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
                   .ackTimeout(1000, TimeUnit.MILLISECONDS)
                   .subscribe();
   
           @Cleanup
           Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                   .topic(topic)
                   .enableBatching(true)
                   .batchingMaxMessages(10)
                   .create();
   
           Set<String> sentMessages = new HashSet<>();
   
           final int N = 10000;
           for (int i = 0; i < N; i++) {
               String value = "test-" + i;
               producer.sendAsync(value);
               sentMessages.add(value);
           }
           producer.flush();
   
           // group1
           for (int i = 0; i < 50; i++) {
               Message<String> msg = consumer.receive();
               if (i % 2 == 0) {
                   consumer.acknowledgeAsync(msg);
               } else {
                   consumer.negativeAcknowledge(msg);
               }
           }
           // group2
           for (int i = 0; i < 50; i++) {
               Message<String> msg = consumer.receive();
               if (i % 2 == 0) {
                   consumer.acknowledgeAsync(msg);
               } else {
                   consumer.negativeAcknowledge(msg);
               }
           }
           // group3
           for (int i = 0; i < 10; i++) {
               Message<String> msg = consumer.receive();
               if (i % 2 == 0) {
                   consumer.acknowledgeAsync(msg);
               } else {
                   consumer.negativeAcknowledge(msg);
               }
           }
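           Message<String> receive;
           // Drain the remaining messages; receive() returns null once the timeout expires,
           // so check for null before acknowledging.
           while ((receive = consumer.receive(20, TimeUnit.SECONDS)) != null) {
               consumer.acknowledgeAsync(receive);
           }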
       }
   ```
   
   When redelivery occurs, the ack bitset is not sent back to the broker, so 
the broker cannot calculate how many messages of the batch were already acked. 
In this case the unacked message count may become negative, but this has no 
other impact. 
   To reduce user confusion, this PR suggests deleting the log for now.
   Later we may find a good way to handle the redelivery case. 
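
As a rough illustration of the accounting problem described above, here is a toy model of how the counter could drift below zero. It is only a sketch under stated assumptions: the class `UnackedCountModel` and its methods are hypothetical and are not the broker's actual code. It assumes the counter grows by the batch size on dispatch, shrinks by one per batch index ack, and shrinks by the full batch size on redelivery because the request carries no ack bitset.

```java
// Toy model only: all names and the accounting rules are hypothetical,
// chosen to illustrate the description above. This is not Pulsar broker code.
public class UnackedCountModel {
    private int unackedMessages = 0;

    // The broker dispatches a whole batch; every message in it counts as unacked.
    void dispatchBatch(int batchSize) {
        unackedMessages += batchSize;
    }

    // Batch index ack: a single message inside the batch is acknowledged.
    void ackSingleIndex() {
        unackedMessages -= 1;
    }

    // Assumption: the redelivery request carries no ack bitset, so the model
    // cannot tell how many indexes were already acked and subtracts the full batch.
    void redeliverBatch(int batchSize) {
        unackedMessages -= batchSize;
    }

    int getUnackedMessages() {
        return unackedMessages;
    }

    // Dispatch one batch, ack some of its indexes, then request redelivery.
    static int simulate(int batchSize, int ackedBeforeRedelivery) {
        UnackedCountModel broker = new UnackedCountModel();
        broker.dispatchBatch(batchSize);
        for (int i = 0; i < ackedBeforeRedelivery; i++) {
            broker.ackSingleIndex();
        }
        broker.redeliverBatch(batchSize);
        return broker.getUnackedMessages();
    }

    public static void main(String[] args) {
        // 10 dispatched, 5 acked individually, then the whole batch is redelivered.
        System.out.println(simulate(10, 5)); // prints -5
    }
}
```

In this model the already-acked indexes are subtracted twice, which is why the count goes negative even though no message is lost.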
   
   ### Documentation
   
     
   - [x] `no-need-doc` 
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


