codelipenghui commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1059252347
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -204,6 +208,10 @@
     private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
     private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
+    // Key is the ledger id and the entry id, entry is the acker that represents which single messages are acknowledged
+    private final ConcurrentNavigableMap<Pair<Long, Long>, BatchMessageAcker> batchMessageToAcker =
Review Comment:
> Why should we clean up the batchMessageToAcker after a seek operation?

A seek makes the consumer start consuming from a new message ID, which can
be an earlier position than the current one.
Suppose you have 3 batch messages:
0:(0,1,2,3),
1:(0,1,2),
2:(0,1,2,3,4,5)
Messages 0 and 1 are fully acknowledged, and message 2 is partially
acknowledged at 2:(0,2,4). Then the consumer seeks to the earliest position
and receives messages 0, 1, and 2 again.
Because `batchMessageToAcker` still holds the 2:(0,2,4) state, if only
2:(1,3,5) is acked after the seek operation, the client will ack the whole
message 2, even though 2:(0,2,4) has not been acked since the seek.
From a user perspective, this can look like data loss. We should guarantee
at-least-once semantics after the seek operation.
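
The scenario above can be sketched in a few lines. This is a simplified model, not the actual `ConsumerImpl` code: the class name, the `replay` helper, the `clearOnSeek` flag, and the use of a plain `BitSet` keyed by entry id alone (instead of `BatchMessageAcker` keyed by ledger id and entry id) are all assumptions made for illustration.

```java
import java.util.BitSet;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical model of per-batch ack tracking; not the real Pulsar client classes.
public class StaleBatchAckerSketch {
    // Entry id -> bits for the single messages that are still unacknowledged.
    private final ConcurrentNavigableMap<Long, BitSet> pending = new ConcurrentSkipListMap<>();
    private final boolean clearOnSeek;

    StaleBatchAckerSketch(boolean clearOnSeek) {
        this.clearOnSeek = clearOnSeek;
    }

    // Acks one single message of a batch. Returns true when every index of the
    // batch is marked acked, i.e. when the whole entry would be acked to the broker.
    boolean ack(long entryId, int batchIndex, int batchSize) {
        BitSet remaining = pending.computeIfAbsent(entryId, id -> {
            BitSet bits = new BitSet(batchSize);
            bits.set(0, batchSize); // all single messages start unacknowledged
            return bits;
        });
        synchronized (remaining) {
            remaining.clear(batchIndex);
            if (remaining.isEmpty()) {
                pending.remove(entryId);
                return true;
            }
            return false;
        }
    }

    // Models the seek: either drops the tracked state or leaves it stale.
    void seek() {
        if (clearOnSeek) {
            pending.clear();
        }
    }

    // Replays the example: ack 2:(0,2,4), seek, then ack only 2:(1,3,5).
    // Returns whether the whole batch message 2 ended up acked.
    public static boolean replay(boolean clearOnSeek) {
        StaleBatchAckerSketch consumer = new StaleBatchAckerSketch(clearOnSeek);
        for (int i : new int[] {0, 2, 4}) {
            consumer.ack(2L, i, 6);
        }
        consumer.seek();
        boolean wholeBatchAcked = false;
        for (int i : new int[] {1, 3, 5}) {
            wholeBatchAcked |= consumer.ack(2L, i, 6);
        }
        return wholeBatchAcked;
    }

    public static void main(String[] args) {
        System.out.println("state kept across seek, whole batch acked: " + replay(false));
        System.out.println("state cleared on seek, whole batch acked: " + replay(true));
    }
}
```

In this model, keeping the state across the seek lets the three post-seek acks complete the batch, while clearing it on seek leaves message 2 unacked until all six indexes are acked again.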