BewareMyPower commented on code in PR #21417:
URL: https://github.com/apache/pulsar/pull/21417#discussion_r1519257249
##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java:
##########
@@ -136,6 +142,134 @@ private Set<String> publishMessages(String topic, int
count, boolean enableBatch
return keys;
}
+ @DataProvider(name = "partition")
+ public static Object[][] partition () {
+ return new Object[][] {
+ { 3 }, { 0 }
+ };
+ }
+
+ /**
+ * Case1:
+ * 1. Slow down the rate of reading messages.
+ * 2. Send some messages
+ * 3. Call new `refresh` API, it will wait for reading all the messages
completed.
+ * Case2:
+ * 1. No new messages.
+ * 2. Call new `refresh` API, it will be completed immediately.
+ * Case3:
+ * 1. multi-partition topic, p1, p2 has new message, p3 has no new
messages.
+ * 2. Call new `refresh` API, it will be completed after read new messages.
+ */
+ @Test(dataProvider = "partition")
+ public void testRefreshAPI(int partition) throws Exception {
+ // 1. Prepare resource.
+ String topic = "persistent://public/default/testRefreshAPI" +
RandomUtils.nextLong();
+ if (partition == 0) {
+ admin.topics().createNonPartitionedTopic(topic);
+ } else {
+ admin.topics().createPartitionedTopic(topic, partition);
+ }
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName("my-sub")
+ .subscribe();
+ @Cleanup
+ TableView<byte[]> tv = pulsarClient.newTableView(Schema.BYTES)
+ .topic(topic)
+ .create();
+ // 2. Add a listen action to provide the test environment.
+ // The listen action will be triggered when there are incoming
messages every time.
+ // This is a sync operation, so sleep in the listen action can slow
down the reading rate of messages.
+ tv.listen((k, v) -> {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ // 3. Send 20 messages. After refresh, all the messages should be
received.
+ int count = 20;
+ Set<String> keys = this.publishMessages(topic, count, false);
+ // After message sending completely, the table view will take at least
2 seconds to receive all the messages.
+ // If there is not the refresh operation, all messages will not be
received.
+ tv.refresh();
+ // The key of each message is different.
+ assertEquals(tv.size(), count);
+ assertEquals(tv.keySet(), keys);
+ // 4. Test refresh operation can be completed when there is a
partition with on new messages
+ // or no new message for no partition topic.
+ if (partition > 0) {
+ publishMessages(topic, count, partition - 1, false, false);
+ tv.refreshAsync().get(5, TimeUnit.SECONDS);
+ assertEquals(tv.size(), count + partition - 1);
+ } else {
+ tv.refreshAsync().get(5, TimeUnit.SECONDS);
+ }
+ }
+
+ /**
+ * Case1:
+ * 1. Slow down the read of reading messages.
+ * 2. Send some messages.
+ * 3. Call new `refresh` API.
+ * 4. Close the reader of the tableview.
+ * 5. The refresh operation will be failed with a `AlreadyClosedException`.
+ * Case2:
+ * 1. Close the reader of the tableview.
+ * 2. Call new `refresh` API.
+ * 3. The refresh operation will be fail with a `AlreadyClosedException`.
+ */
+ @Test
+ public void testRefreshTaskCanBeCompletedWhenReaderClosed() throws
Exception {
+ // 1. Prepare resource.
+ String topic1 =
"persistent://public/default/testRefreshTaskCanBeCompletedWhenReaderClosed-1";
+ admin.topics().createNonPartitionedTopic(topic1);
+ String topic2 =
"persistent://public/default/testRefreshTaskCanBeCompletedWhenReaderClosed-2";
+ admin.topics().createNonPartitionedTopic(topic2);
+ @Cleanup
+ TableView<byte[]> tv1 = pulsarClient.newTableView(Schema.BYTES)
+ .topic(topic1)
+ .create();
+ @Cleanup
+ TableView<byte[]> tv2 = pulsarClient.newTableView(Schema.BYTES)
+ .topic(topic1)
+ .create();
+ // 2. Slow down the rate of reading messages.
+ tv1.listen((k, v) -> {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ publishMessages(topic1, 20, false);
+ Field field = TableViewImpl.class.getDeclaredField("reader");
+ field.setAccessible(true);
+ CompletableFuture<Reader<byte[]>> readerFuture1 =
(CompletableFuture<Reader<byte[]>>) field.get(tv1);
Review Comment:
Is it necessary to use reflection here? You can just call
`TableView#closeAsync` instead of closing the internal reader.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]