BewareMyPower commented on code in PR #21417:
URL: https://github.com/apache/pulsar/pull/21417#discussion_r1519130025


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java:
##########
@@ -136,6 +142,134 @@ private Set<String> publishMessages(String topic, int 
count, boolean enableBatch
         return keys;
     }
 
+    @DataProvider(name = "partition")
+    public static Object[][] partition () {
+        return new Object[][] {
+                { 3 }, { 0 }
+        };
+    }
+
+    /**
+     * Case1:
+     * 1. Slow down the rate of reading messages.
+     * 2. Send some messages
+     * 3. Call new `refresh` API, it will wait for reading all the messages 
completed.
+     * Case2:
+     * 1. No new messages.
+     * 2. Call new `refresh` API, it will be completed immediately.
+     * Case3:
+     * 1. multi-partition topic, p1, p2 has new message, p3 has no new 
messages.
+     * 2. Call new `refresh` API, it will be completed after read new messages.
+     */
+    @Test(dataProvider = "partition")
+    public void testRefreshAPI(int partition) throws Exception {
+        // 1. Prepare resource.
+        String topic = "persistent://public/default/testRefreshAPI" + 
RandomUtils.nextLong();
+        if (partition == 0) {
+            admin.topics().createNonPartitionedTopic(topic);
+        } else {
+            admin.topics().createPartitionedTopic(topic, partition);
+        }
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("my-sub")
+                .subscribe();

Review Comment:
   ```suggestion
   ```
   
   Should it be removed?



##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java:
##########
@@ -136,6 +142,134 @@ private Set<String> publishMessages(String topic, int 
count, boolean enableBatch
         return keys;
     }
 
+    @DataProvider(name = "partition")
+    public static Object[][] partition () {
+        return new Object[][] {
+                { 3 }, { 0 }
+        };
+    }
+
+    /**
+     * Case1:
+     * 1. Slow down the rate of reading messages.
+     * 2. Send some messages
+     * 3. Call new `refresh` API, it will wait for reading all the messages 
completed.
+     * Case2:
+     * 1. No new messages.
+     * 2. Call new `refresh` API, it will be completed immediately.
+     * Case3:
+     * 1. multi-partition topic, p1, p2 has new message, p3 has no new 
messages.
+     * 2. Call new `refresh` API, it will be completed after read new messages.
+     */
+    @Test(dataProvider = "partition")
+    public void testRefreshAPI(int partition) throws Exception {
+        // 1. Prepare resource.
+        String topic = "persistent://public/default/testRefreshAPI" + 
RandomUtils.nextLong();
+        if (partition == 0) {
+            admin.topics().createNonPartitionedTopic(topic);
+        } else {
+            admin.topics().createPartitionedTopic(topic, partition);
+        }
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("my-sub")
+                .subscribe();
+        @Cleanup
+        TableView<byte[]> tv = pulsarClient.newTableView(Schema.BYTES)
+                .topic(topic)
+                .create();
+        // 2. Add a listen action to provider the test environment.

Review Comment:
   ```suggestion
           // 2. Add a listen action to provide the test environment.
   ```
   
   typo



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java:
##########
@@ -180,96 +204,157 @@ public void close() throws PulsarClientException {
     }
 
     private void handleMessage(Message<T> msg) {
-        try {
-            if (msg.hasKey()) {
-                String key = msg.getKey();
-                T cur = msg.size() > 0 ? msg.getValue() : null;
-                if (log.isDebugEnabled()) {
-                    log.debug("Applying message from topic {}. key={} 
value={}",
+        readPositions.put(msg.getTopicName(), msg.getMessageId());
+        if (msg.hasKey()) {
+            String key = msg.getKey();
+            T cur = msg.size() > 0 ? msg.getValue() : null;
+            if (log.isDebugEnabled()) {
+                log.debug("Applying message from topic {}. key={} value={}",
+                        conf.getTopicName(),
+                        key,
+                        cur);
+            }
+
+            boolean update = true;
+            if (compactionStrategy != null) {
+                T prev = data.get(key);
+                update = !compactionStrategy.shouldKeepLeft(prev, cur);
+                if (!update) {
+                    log.info("Skipped the message from topic {}. key={} 
value={} prev={}",
                             conf.getTopicName(),
                             key,
-                            cur);
+                            cur,
+                            prev);
+                    compactionStrategy.handleSkippedMessage(key, cur);
                 }
+            }
 
-                boolean update = true;
-                if (compactionStrategy != null) {
-                    T prev = data.get(key);
-                    update = !compactionStrategy.shouldKeepLeft(prev, cur);
-                    if (!update) {
-                        log.info("Skipped the message from topic {}. key={} 
value={} prev={}",
-                                conf.getTopicName(),
-                                key,
-                                cur,
-                                prev);
-                        compactionStrategy.handleSkippedMessage(key, cur);
+            if (update) {
+                try {
+                    listenersMutex.lock();
+                    if (null == cur) {
+                        data.remove(key);
+                    } else {
+                        data.put(key, cur);
                     }
-                }
-
-                if (update) {
-                    try {
-                        listenersMutex.lock();
-                        if (null == cur) {
-                            data.remove(key);
-                        } else {
-                            data.put(key, cur);
-                        }
 
-                        for (BiConsumer<String, T> listener : listeners) {
-                            try {
-                                listener.accept(key, cur);
-                            } catch (Throwable t) {
-                                log.error("Table view listener raised an 
exception", t);
-                            }
+                    for (BiConsumer<String, T> listener : listeners) {
+                        try {
+                            listener.accept(key, cur);
+                        } catch (Throwable t) {
+                            log.error("Table view listener raised an 
exception", t);
                         }
-                    } finally {
-                        listenersMutex.unlock();
                     }
+                } finally {
+                    listenersMutex.unlock();
                 }
             }
-        } finally {
-            msg.release();
         }
     }
 
-    private CompletableFuture<Reader<T>> readAllExistingMessages(Reader<T> 
reader) {
+    @Override
+    public CompletableFuture<Void> refreshAsync() {
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        reader.thenCompose(reader -> buildFreshTask(reader, 
completableFuture).thenAccept(lastMessageIds -> {
+            // After get the response of lastMessageIds, put the future and 
result into `refreshMap`
+            // and then filter out partitions that has been read to the 
lastMessageID.
+            synchronized (this) {
+                refreshRequests.put(completableFuture, lastMessageIds);
+                filterReceivedMessages(lastMessageIds);
+                // If there is no new messages, the refresh operation could be 
completed right now.
+                if (lastMessageIds.isEmpty()) {
+                    refreshRequests.remove(completableFuture);
+                    completableFuture.complete(null);

Review Comment:
   You should move `completableFuture.complete(null)` out of the synchronized 
block because it should not be synchronized. For example, when users call
   
   ```java
           tv.refreshAsync().thenAccept(__ -> {
               try {
                   Thread.sleep(1000L); // simulate a time cost task
               } catch (InterruptedException e) {
                   throw new RuntimeException(e);
               }
           });
   ```
   
   The callback that sleeps for 1000 seconds will be called by 
`completableFuture.complete(null)` and the lock will be held until the callback 
is done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to