liangyepianzhou commented on code in PR #21270:
URL: https://github.com/apache/pulsar/pull/21270#discussion_r1372662378


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java:
##########
@@ -235,20 +237,36 @@ private CompletableFuture<Reader<T>> 
readAllExistingMessages(Reader<T> reader) {
         AtomicLong messagesRead = new AtomicLong();
 
         CompletableFuture<Reader<T>> future = new CompletableFuture<>();
-        readAllExistingMessages(reader, future, startTime, messagesRead);
+        reader.getLastMessageIdsAsync().thenAccept(lastMessageIds -> {
+            Map<String, TopicMessageId> maxMessageIds = new HashMap<>();
+            lastMessageIds.forEach(topicMessageId -> {
+                maxMessageIds.put(topicMessageId.getOwnerTopic(), 
topicMessageId);
+            });
+            readAllExistingMessages(reader, future, startTime, messagesRead, 
maxMessageIds);

Review Comment:
   We need to check whether all the partitions have been read completely. So, 
we need to do an update operation to the HashMap after a partition has been 
read completely. This operation could be removing this partition from the map 
or adding a flag to mark that the partition has been read completely.
   How about changing it to `ConcurrentHashMap`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to