codelipenghui commented on code in PR #21417:
URL: https://github.com/apache/pulsar/pull/21417#discussion_r1525761529
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java:
##########
@@ -60,6 +60,26 @@ public class TableViewImpl<T> implements TableView<T> {
private final boolean isPersistentTopic;
private TopicCompactionStrategy<T> compactionStrategy;
+ /**
+ * Store the refresh tasks. When read to the position recording in the
right map,
+ * then remove the position in the right map. If the right map is empty,
complete the future in the left.
+ * There should be no timeout exception here, because the caller can only
retry for TimeoutException.
+ * It will only be completed exceptionally when no more messages can be
read.
+ */
+ private final ConcurrentHashMap<CompletableFuture<Void>, Map<String,
TopicMessageId>> refreshRequests;
+
+ /**
+ * This map stored the read position of each partition. It is used for the
following case:
+ * <p>
+ * 1. Get last message ID.
+ * 2. Receive message p1-1:1, p2-1:1, p2-1:2, p3-1:1
+ * 3. Receive response of step1 {|p1-1:1|p2-2:2|p3-3:6|}
+ * 4. No more messages are written to this topic.
+ * As a result, the refresh operation will never be completed.
+ * </p>
+ */
+ private final ConcurrentHashMap<String, MessageId> readPositions;
Review Comment:
```suggestion
private final ConcurrentHashMap<String, MessageId> lastReadPositions;
```
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java:
##########
@@ -60,6 +60,26 @@ public class TableViewImpl<T> implements TableView<T> {
private final boolean isPersistentTopic;
private TopicCompactionStrategy<T> compactionStrategy;
+ /**
+ * Store the refresh tasks. When read to the position recording in the
right map,
+ * then remove the position in the right map. If the right map is empty,
complete the future in the left.
+ * There should be no timeout exception here, because the caller can only
retry for TimeoutException.
+ * It will only be completed exceptionally when no more messages can be
read.
+ */
+ private final ConcurrentHashMap<CompletableFuture<Void>, Map<String,
TopicMessageId>> refreshRequests;
Review Comment:
```suggestion
private final ConcurrentHashMap<CompletableFuture<Void>, Map<String,
TopicMessageId>> pendingRefreshRequests;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]