BewareMyPower commented on issue #6796:
URL: https://github.com/apache/pulsar/pull/6796#issuecomment-617923123


   @merlimat Thanks for your quick reply.
   
   IMO, a cumulative ack applies only to a single partition, not to all
partitions. See [the implementation in the Java
client](https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java):
   
   ```java
       protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType,
                                                       Map<String,Long> properties,
                                                       TransactionImpl txnImpl) {
           // ...
           if (ackType == AckType.Cumulative) {
               // find the single topic consumer by the topic partition name,
               // which contains the partition id
               Consumer individualConsumer = consumers.get(topicMessageId.getTopicPartitionName());
               if (individualConsumer != null) {
                   // call acknowledgeCumulative() of this single topic consumer
                   MessageId innerId = topicMessageId.getInnerMessageId();
                   return individualConsumer.acknowledgeCumulativeAsync(innerId);
               } else {
                   // ...
               }
           } else {
               // ...
           }
       }
   ```
   
   I tested this with the Java client:
   
   1. Create a topic with 3 partitions;
   2. Produce 10 messages in round-robin routing mode;
   3. Start a consumer subscribed to this topic, call
`acknowledgeCumulative()` only after 9 messages have been received, then stop it;
   4. Restart the consumer with the same subscription.
   
   Here is the output of the first and second runs:
   
   ```
   Message: C | topic: persistent://public/default/FooTest-partition-0
   Message: A | topic: persistent://public/default/FooTest-partition-1
   Message: B | topic: persistent://public/default/FooTest-partition-2
   Message: F | topic: persistent://public/default/FooTest-partition-0
   Message: D | topic: persistent://public/default/FooTest-partition-1
   Message: E | topic: persistent://public/default/FooTest-partition-2
   Message: I | topic: persistent://public/default/FooTest-partition-0
   Message: G | topic: persistent://public/default/FooTest-partition-1
   Message: H | topic: persistent://public/default/FooTest-partition-2
   Message: J | topic: persistent://public/default/FooTest-partition-1
   Message: 1 | topic: persistent://public/default/FooTest-partition-2
   acknowledgeCumulative
   ```
   
   ```
   Message: C | topic: persistent://public/default/FooTest-partition-0
   Message: F | topic: persistent://public/default/FooTest-partition-0
   Message: A | topic: persistent://public/default/FooTest-partition-1
   Message: D | topic: persistent://public/default/FooTest-partition-1
   Message: I | topic: persistent://public/default/FooTest-partition-0
   Message: G | topic: persistent://public/default/FooTest-partition-1
   Message: J | topic: persistent://public/default/FooTest-partition-1
   ```
   
   We can see that when we call `acknowledgeCumulative` on message `1` of
partition-2, only the messages up to it in partition-2 were acknowledged. The
messages from partition-0 and partition-1 were all redelivered in the second run.
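
To make the per-partition behaviour concrete, here is a minimal toy model in plain Java. It is not Pulsar code: `PartitionAckModel`, `receive`, `redelivered`, and `replayFirstRun` are hypothetical names invented for this sketch, and the model simply assumes one independent list of unacknowledged messages per partition. It replays the first run from the log above and then applies a cumulative ack on message `1` of partition-2.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model (not Pulsar code): each partition keeps its own ordered list of
// unacknowledged messages, mirroring one internal consumer per partition.
class PartitionAckModel {
    private final Map<String, List<String>> unacked = new LinkedHashMap<>();

    // Record a delivered message under the partition it came from.
    void receive(String partition, String message) {
        unacked.computeIfAbsent(partition, k -> new ArrayList<>()).add(message);
    }

    // Cumulative ack: drop everything up to and including `message`,
    // but only within the named partition.
    void acknowledgeCumulative(String partition, String message) {
        List<String> pending = unacked.get(partition);
        if (pending == null) {
            return;
        }
        int index = pending.indexOf(message);
        if (index >= 0) {
            pending.subList(0, index + 1).clear();
        }
    }

    // Messages that would be redelivered to a restarted consumer.
    List<String> redelivered(String partition) {
        return new ArrayList<>(unacked.getOrDefault(partition, new ArrayList<>()));
    }

    // Replays the first run from the log, then acks message "1" cumulatively.
    static PartitionAckModel replayFirstRun() {
        PartitionAckModel model = new PartitionAckModel();
        String[][] firstRun = {
            {"partition-0", "C"}, {"partition-1", "A"}, {"partition-2", "B"},
            {"partition-0", "F"}, {"partition-1", "D"}, {"partition-2", "E"},
            {"partition-0", "I"}, {"partition-1", "G"}, {"partition-2", "H"},
            {"partition-1", "J"}, {"partition-2", "1"},
        };
        for (String[] entry : firstRun) {
            model.receive(entry[0], entry[1]);
        }
        model.acknowledgeCumulative("partition-2", "1");
        return model;
    }

    public static void main(String[] args) {
        PartitionAckModel model = replayFirstRun();
        for (String p : new String[] {"partition-0", "partition-1", "partition-2"}) {
            System.out.println(p + " -> " + model.redelivered(p));
        }
    }
}
```

Under this model, partition-0 and partition-1 keep all of their messages (C, F, I and A, D, G, J) while partition-2 is left empty, which matches the seven messages redelivered in the second run.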

