BewareMyPower commented on code in PR #19031:
URL: https://github.com/apache/pulsar/pull/19031#discussion_r1059323030
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -204,6 +208,10 @@
private final AtomicReference<ClientCnx>
clientCnxUsedForConsumerRegistration = new AtomicReference<>();
private final List<Throwable> previousExceptions = new
CopyOnWriteArrayList<Throwable>();
+ // Key is the ledger id and the entry id, entry is the acker that
represents which single messages are acknowledged
+ private final ConcurrentNavigableMap<Pair<Long, Long>, BatchMessageAcker>
batchMessageToAcker =
Review Comment:
I think it's a very hacky to acknowledge the rest messages in a batch after
seek. Just like the example I mentioned, if you think that's message lost,
users can easily simulate the **message lost** like:
```java
private Consumer<String> subscribe(String topic) throws Exception {
return pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("test")
.subscriptionType(SubscriptionType.Shared)
.isAckReceiptEnabled(true)
.subscribe();
}
@Test
public void testAckAfterSeek() throws Exception {
final String topic = "testAckAfterSeek";
Consumer<String> consumer = subscribe(topic);
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.batchingMaxPublishDelay(Long.MAX_VALUE,
TimeUnit.MILLISECONDS)
.create();
final int messages = 10;
for (int i = 0; i < messages; i++) {
producer.sendAsync("New message - " + i);
}
producer.flush();
final List<MessageId> messageIdList = new ArrayList<>();
for (int i = 0; i < messages; i++) {
final MessageId messageId = consumer.receive().getMessageId();
messageIdList.add(messageId);
if (i != 4) {
consumer.acknowledge(messageId);
}
}
consumer.seek(MessageId.earliest);
Thread.sleep(100);
consumer.acknowledge(messageIdList.get(4));
consumer.close();
consumer = subscribe(topic);
final Message<String> message = consumer.receive(1,
TimeUnit.SECONDS);
// NOTE: message is null
```
How could you explain to users that you should not acknowledge
`messageIdList.get(4)`, otherwise message lost will happen? BTW, if users left
one message ID (`messageIdList.get(4)`) to acknowledge after `seek` and they
didn't acknowledge other message IDs, what will they expect? Did they expect
messages could be received again?
In short, I think we should not handle this corner case. It's more like a
"bug" on purpose.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]