BewareMyPower commented on code in PR #21417:
URL: https://github.com/apache/pulsar/pull/21417#discussion_r1519320193


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java:
##########
@@ -289,20 +378,29 @@ private void readAllExistingMessages(Reader<T> reader, 
CompletableFuture<Reader<
                                reader.getTopic(),
                                messagesRead,
                                durationMillis / 1000.0);
-                       future.complete(reader);
+                       future.complete(null);
                    }
                 });
     }
 
     private void readTailMessages(Reader<T> reader) {
         reader.readNextAsync()
                 .thenAccept(msg -> {
-                    handleMessage(msg);
+                    try {
+                        handleMessage(msg);
+                    } finally {
+                        msg.release();
+                    }

Review Comment:
   `handleMessage` already releases `msg`, you should not release it again.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java:
##########
@@ -226,50 +250,115 @@ private void handleMessage(Message<T> msg) {
                     }
                 }
             }
+            checkAllFreshTask(msg);
         } finally {
             msg.release();
         }
     }
 
-    private CompletableFuture<Reader<T>> readAllExistingMessages(Reader<T> 
reader) {
+    @Override
+    public CompletableFuture<Void> refreshAsync() {
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        reader.thenCompose(reader -> 
getLastMessageIds(reader).thenAccept(lastMessageIds -> {
+            // After get the response of lastMessageIds, put the future and 
result into `refreshMap`
+            // and then filter out partitions that has been read to the 
lastMessageID.
+            refreshRequests.put(completableFuture, lastMessageIds);
+            filterReceivedMessages(lastMessageIds);
+            // If there is no new messages, the refresh operation could be 
completed right now.
+            if (lastMessageIds.isEmpty()) {
+                refreshRequests.remove(completableFuture);
+                completableFuture.complete(null);
+            }
+        })).exceptionally(throwable -> {
+            completableFuture.completeExceptionally(throwable);
+            refreshRequests.remove(completableFuture);
+            return null;
+        });
+        return completableFuture;
+    }
+
+    @Override
+    public void refresh() throws PulsarClientException {
+        refreshAsync().join();
+    }
+
+    private CompletableFuture<Void> readAllExistingMessages(Reader<T> reader) {
         long startTime = System.nanoTime();
         AtomicLong messagesRead = new AtomicLong();
 
-        CompletableFuture<Reader<T>> future = new CompletableFuture<>();
-        reader.getLastMessageIdsAsync().thenAccept(lastMessageIds -> {
-            Map<String, TopicMessageId> maxMessageIds = new 
ConcurrentHashMap<>();
-            lastMessageIds.forEach(topicMessageId -> {
-                maxMessageIds.put(topicMessageId.getOwnerTopic(), 
topicMessageId);
-            });
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        getLastMessageIds(reader).thenAccept(maxMessageIds -> {
             readAllExistingMessages(reader, future, startTime, messagesRead, 
maxMessageIds);
         }).exceptionally(ex -> {
             future.completeExceptionally(ex);
             return null;
         });
-        future.thenAccept(__ -> readTailMessages(reader));
         return future;
     }
 
-    private void readAllExistingMessages(Reader<T> reader, 
CompletableFuture<Reader<T>> future, long startTime,
+    private CompletableFuture<Map<String, TopicMessageId>> 
getLastMessageIds(Reader<T> reader) {
+        return reader.getLastMessageIdsAsync().thenApply(lastMessageIds -> {
+            Map<String, TopicMessageId> maxMessageIds = new 
ConcurrentHashMap<>();
+            lastMessageIds.forEach(topicMessageId -> {
+                maxMessageIds.put(topicMessageId.getOwnerTopic(), 
topicMessageId);
+            });
+            return maxMessageIds;
+        });
+    }
+
+    private void filterReceivedMessages(Map<String, TopicMessageId> 
lastMessageIds) {
+        // The `lastMessageIds` and `readPositions` is concurrency-safe data 
types.
+        lastMessageIds.forEach((partition, lastMessageId) -> {
+            MessageId messageId = readPositions.get(partition);
+            if (messageId != null && lastMessageId.compareTo(messageId) <= 0) {
+                lastMessageIds.remove(partition);
+            }
+        });
+    }
+
+    private boolean checkFreshTask(Map<String, TopicMessageId> maxMessageIds, 
CompletableFuture<Void> future,
+                                   Message<T> msg) {
+        // The message received from multi-consumer/multi-reader is processed 
to TopicMessageImpl.
+        String topicName = msg.getTopicName();
+        TopicMessageId maxMessageId = maxMessageIds.get(topicName);
+        // We need remove the partition from the maxMessageIds map
+        // once the partition has been read completely.
+        if (maxMessageId != null && msg.getMessageId().compareTo(maxMessageId) 
>= 0) {
+            maxMessageIds.remove(topicName);
+        }
+        if (maxMessageIds.isEmpty()) {
+            future.complete(null);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    private void checkAllFreshTask(Message<T> msg) {
+        refreshRequests.forEach((future, maxMessageIds) -> {
+            if (checkFreshTask(maxMessageIds, future, msg)) {
+                refreshRequests.remove(future);
+            }
+        });
+    }
+
+    private void readAllExistingMessages(Reader<T> reader, 
CompletableFuture<Void> future, long startTime,
                                          AtomicLong messagesRead, Map<String, 
TopicMessageId> maxMessageIds) {
         reader.hasMessageAvailableAsync()
                 .thenAccept(hasMessage -> {
                    if (hasMessage) {
                        reader.readNextAsync()
                                .thenAccept(msg -> {
                                   messagesRead.incrementAndGet();
-                                  // We need remove the partition from the 
maxMessageIds map
-                                  // once the partition has been read 
completely.
-                                  TopicMessageId maxMessageId = 
maxMessageIds.get(msg.getTopicName());
-                                  if (maxMessageId != null && 
msg.getMessageId().compareTo(maxMessageId) >= 0) {
-                                      maxMessageIds.remove(msg.getTopicName());
-                                  }
-                                  handleMessage(msg);
-                                  if (maxMessageIds.isEmpty()) {
-                                      future.complete(reader);
-                                  } else {
-                                      readAllExistingMessages(reader, future, 
startTime, messagesRead, maxMessageIds);
-                                  }
+                                   try {
+                                       handleMessage(msg);
+                                       if (!checkFreshTask(maxMessageIds, 
future, msg)) {
+                                           readAllExistingMessages(reader, 
future, startTime,
+                                                   messagesRead, 
maxMessageIds);
+                                       }
+                                   } finally {
+                                       msg.release();
+                                   }

Review Comment:
   Sorry my [previous 
comment](https://github.com/apache/pulsar/pull/21417#discussion_r1519288345) 
ignores the case here. 
   
   `handleMessage` will release the reference count of `msg`, so it's unsafe to 
pass the `msg` to `checkFreshTask` after that. You might get around it like:
   
   ```java
   String topicName = msg.getTopicName();
   MessageId msgId = msg.getMessageId();
   handleMessage(msg);
   if (!checkFreshTask(maxMessageIds, future, topicName, msgId)) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to