liangyepianzhou commented on code in PR #21170:
URL: https://github.com/apache/pulsar/pull/21170#discussion_r1335323440
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java:
##########
@@ -235,20 +238,33 @@ private CompletableFuture<Reader<T>>
readAllExistingMessages(Reader<T> reader) {
AtomicLong messagesRead = new AtomicLong();
CompletableFuture<Reader<T>> future = new CompletableFuture<>();
- readAllExistingMessages(reader, future, startTime, messagesRead);
+ reader.getLastMessageIdsAsync().thenAccept(lastMessageIds -> {
+ Optional<TopicMessageId> optionalTopicMessageId =
lastMessageIds.stream().max(Comparator.naturalOrder());
Review Comment:
> I think that the algorithm should store every last message from each
partition (first issue). Then the stop mechanism should be based on the exact
message delivery.
I totally agree. We only ensure that messages in the same partition are
ordered, so the strategy of only judging the largest message ID is wrong.
Thanks for your reminder.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]