BewareMyPower commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1059655770


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -204,6 +208,10 @@
 
     private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
     private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
+    // Key is the ledger id and the entry id, entry is the acker that represents which single messages are acknowledged
+    private final ConcurrentNavigableMap<Pair<Long, Long>, BatchMessageAcker> batchMessageToAcker =

Review Comment:
   > If users try to cache the messages or just inflight messages, users need 
to guarantee they will not ack the inflight or cached messages before the seek 
operation.
   
   The rule you mention here has a counterpart for deserialized message IDs: 
every loaded message ID should be acknowledged (for simplicity, ignoring the 
cumulative ACK case). IMO, if a user loads a set of message IDs and 
acknowledges only some of them before seeking, that is incorrect usage.
   
   > It's not the same case as I provided which is normal usage.
   
   As I've explained above, it is not normal usage once the message IDs come 
from deserialization.
   
   Incorrect code that users should not write:
   
   ```java
   var id0 = loadMessageId();
   consumer.acknowledge(id0); // batch index: 0, batch size: 2
   var id1 = loadMessageId(); // batch index: 1, batch size: 2
   // User does not acknowledge id1 before seek
   consumer.seek(MessageId.earliest);
   // Instead, user acknowledges the outdated id1 after seek.
   consumer.acknowledge(id1);
   ```
   
   Correct code that users should write instead:
   
   ```java
   var id0 = loadMessageId();
   consumer.acknowledge(id0); // batch index: 0, batch size: 2
   consumer.seek(MessageId.earliest);
   storeMessageId(consumer.receive().getMessageId());  // batch index: 0, batch size: 2
   storeMessageId(consumer.receive().getMessageId());  // batch index: 1, batch size: 2
   var id1 = loadMessageId(); // batch index: 0, batch size: 2
   var id2 = loadMessageId(); // batch index: 1, batch size: 2
   consumer.acknowledge(id1);
   consumer.acknowledge(id2);
   ```
   
   ----
   
   Reconnection and ack failure are handled the same way. Once 
`acknowledgeAsync` has been called on every message ID of a batch, the cached 
entry for that batch is removed whether or not the acknowledgment succeeded. 
If only some message IDs of a batch were acknowledged and the messages are 
received again, users should persist the newly received message IDs, 
including those at repeated positions, and use them instead of the old 
message IDs.
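   
   To make the cache lifecycle above concrete, here is a minimal sketch. It is 
not the actual `ConsumerImpl` code: `BatchAckerCacheSketch`, `ack` and 
`cachedBatches` are hypothetical names, a `BitSet` stands in for 
`BatchMessageAcker`, and `Map.Entry<Long, Long>` stands in for 
`Pair<Long, Long>`.
   
   ```java
   import java.util.BitSet;
   import java.util.Map;
   import java.util.concurrent.ConcurrentNavigableMap;
   import java.util.concurrent.ConcurrentSkipListMap;

   /** Hypothetical sketch of a per-batch acker cache; not the real ConsumerImpl. */
   public class BatchAckerCacheSketch {
       // Key: (ledgerId, entryId). Value: bits that are still set are unacknowledged indexes.
       private final ConcurrentNavigableMap<Map.Entry<Long, Long>, BitSet> ackers =
               new ConcurrentSkipListMap<>(
                       Map.Entry.<Long, Long>comparingByKey()
                               .thenComparing(Map.Entry.<Long, Long>comparingByValue()));

       /** Marks one batch index as acknowledged; returns true once the whole batch is acknowledged. */
       public boolean ack(long ledgerId, long entryId, int batchIndex, int batchSize) {
           Map.Entry<Long, Long> key = Map.entry(ledgerId, entryId);
           BitSet pending = ackers.computeIfAbsent(key, k -> {
               BitSet bits = new BitSet(batchSize);
               bits.set(0, batchSize); // every index starts unacknowledged
               return bits;
           });
           boolean allAcked;
           synchronized (pending) {
               pending.clear(batchIndex);
               allAcked = pending.isEmpty();
           }
           if (allAcked) {
               // The entry is dropped here, before any broker response is known,
               // so it goes away whether or not the acknowledgment later succeeds.
               ackers.remove(key, pending);
           }
           return allAcked;
       }

       /** Number of batches that are only partially acknowledged. */
       public int cachedBatches() {
           return ackers.size();
       }

       public static void main(String[] args) {
           BatchAckerCacheSketch cache = new BatchAckerCacheSketch();
           System.out.println(cache.ack(1L, 5L, 0, 2)); // partial ack: the entry stays cached
           System.out.println(cache.ack(1L, 5L, 1, 2)); // last index: the entry is removed
           System.out.println(cache.cachedBatches());
       }
   }
   ```
   
   In this sketch a partially acknowledged batch stays in the map until its 
remaining indexes are acknowledged, which is why a stale message ID that is 
acknowledged later would still be applied to the old entry.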



