liangyepianzhou commented on code in PR #21417:
URL: https://github.com/apache/pulsar/pull/21417#discussion_r1519367456


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java:
##########
@@ -136,6 +140,126 @@ private Set<String> publishMessages(String topic, int 
count, boolean enableBatch
         return keys;
     }
 
+    @DataProvider(name = "partition")
+    public static Object[][] partition () {
+        return new Object[][] {
+                { 3 }, { 0 }
+        };
+    }
+
+    /**
+     * Case1:
+     * 1. Slow down the rate of reading messages.
+     * 2. Send some messages
+     * 3. Call new `refresh` API, it will wait for reading all the messages 
completed.
+     * Case2:
+     * 1. No new messages.
+     * 2. Call new `refresh` API, it will be completed immediately.
+     * Case3:
+     * 1. multi-partition topic, p1, p2 has new message, p3 has no new 
messages.
+     * 2. Call new `refresh` API, it will be completed after read new messages.
+     */
+    @Test(dataProvider = "partition")
+    public void testRefreshAPI(int partition) throws Exception {
+        // 1. Prepare resource.
+        String topic = "persistent://public/default/testRefreshAPI" + 
RandomUtils.nextLong();
+        if (partition == 0) {
+            admin.topics().createNonPartitionedTopic(topic);
+        } else {
+            admin.topics().createPartitionedTopic(topic, partition);
+        }
+
+        @Cleanup
+        TableView<byte[]> tv = pulsarClient.newTableView(Schema.BYTES)
+                .topic(topic)
+                .create();
+        // 2. Add a listen action to provide the test environment.
+        // The listen action will be triggered when there are incoming 
messages every time.
+        // This is a sync operation, so sleep in the listen action can slow 
down the reading rate of messages.
+        tv.listen((k, v) -> {
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        // 3. Send 20 messages. After refresh, all the messages should be 
received.
+        int count = 20;
+        Set<String> keys = this.publishMessages(topic, count, false);
+        // After message sending completely, the table view will take at least 
2 seconds to receive all the messages.
+        // If there is not the refresh operation, all messages will not be 
received.
+        tv.refresh();
+        // The key of each message is different.
+        assertEquals(tv.size(), count);
+        assertEquals(tv.keySet(), keys);
+        // 4. Test refresh operation can be completed when there is a 
partition with on new messages
+        // or no new message for no partition topic.
+        if (partition > 0) {
+            publishMessages(topic, count, partition - 1, false, false);
+            tv.refreshAsync().get(5, TimeUnit.SECONDS);
+            assertEquals(tv.size(), count + partition - 1);
+        } else {
+            tv.refreshAsync().get(5, TimeUnit.SECONDS);

Review Comment:
   This tests the refresh operation can be completed in 5 seconds.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to