BewareMyPower commented on code in PR #21270:
URL: https://github.com/apache/pulsar/pull/21270#discussion_r1372795495
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java:
##########
@@ -258,13 +258,13 @@ private void readAllExistingMessages(Reader<T> reader,
CompletableFuture<Reader<
reader.readNextAsync()
.thenAccept(msg -> {
messagesRead.incrementAndGet();
- handleMessage(msg);
- // The message is read one by one in a
single thread,
- // so it's fine that uses a hashmap to store
last message ID.
+ // We need remove the partition form the
maxMessageIds map
Review Comment:
typo: "form" -> "from"
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java:
##########
@@ -258,13 +258,13 @@ private void readAllExistingMessages(Reader<T> reader,
CompletableFuture<Reader<
reader.readNextAsync()
.thenAccept(msg -> {
messagesRead.incrementAndGet();
- handleMessage(msg);
- // The message is read one by one in a
single thread,
- // so it's fine that uses a hashmap to store
last message ID.
+ // We need remove the partition form the
maxMessageIds map
+ // once the partition has been read
completely.
TopicMessageId maxMessageId =
maxMessageIds.get(msg.getTopicName());
if (maxMessageId != null &&
msg.getMessageId().compareTo(maxMessageId) >= 0) {
maxMessageIds.remove(msg.getTopicName());
}
+ handleMessage(msg);
Review Comment:
Please fix the indent
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]