BewareMyPower commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1059231502


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -204,6 +208,10 @@
 
     private final AtomicReference<ClientCnx> 
clientCnxUsedForConsumerRegistration = new AtomicReference<>();
     private final List<Throwable> previousExceptions = new 
CopyOnWriteArrayList<Throwable>();
+    // Key is the ledger id and the entry id, entry is the acker that 
represents which single messages are acknowledged
+    private final ConcurrentNavigableMap<Pair<Long, Long>, BatchMessageAcker> 
batchMessageToAcker =

Review Comment:
   I didn't get it. A new map entry could only be only added when a message was 
acknowledged.
   
   ```java
       private MessageIdImpl getMessageIdToAcknowledge(BatchMessageIdImpl 
messageId, AckType ackType) {
           final BatchMessageAcker acker;
           if (messageId.getAcker() instanceof BatchMessageAckerDisabled) {
               acker = batchMessageToAcker.computeIfAbsent(
   ```
   
   Why should we clean up the `batchMessageToAcker` after a seek operation?
   
   Regarding the message redelivering or reconnection, if the batch message id 
was not created by deserialization, the acker inside the `BatchMessageIdImpl` 
would not be modified. If we only clean up the `batchMessageToAcker` for 
deserialized batch message id, the behavior would be different.
   
   For example, assuming there are 2 batch message ids (id0 and id1) of the 
same entry and for the following steps:
   1. Acknowledge `id0`
   2. Reconnection
   3. Acknowledge `id1`
   
   If they are retrieved from the deserialization, the entry will not be 
acknowledged because the 2nd step cleared the cache, and the 
`BatchMessageAcker` will be "XO" (`X` represents not acknowledged).
   
   However, if they are just saved from the `message.getMessageId()`, the entry 
will be acknowledged because the shared `BatchMessageAcker` is "OO" and not 
affected by the reconnection.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to