BewareMyPower commented on code in PR #21417:
URL: https://github.com/apache/pulsar/pull/21417#discussion_r1519359197
##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java:
##########
@@ -136,6 +140,126 @@ private Set<String> publishMessages(String topic, int count, boolean enableBatch
return keys;
}
+ @DataProvider(name = "partition")
+ public static Object[][] partition () {
+ return new Object[][] {
+ { 3 }, { 0 }
+ };
+ }
+
+ /**
+ * Case1:
+ * 1. Slow down the rate of reading messages.
+ * 2. Send some messages
+ * 3. Call new `refresh` API, it will wait for reading all the messages completed.
+ * Case2:
+ * 1. No new messages.
+ * 2. Call new `refresh` API, it will be completed immediately.
+ * Case3:
+ * 1. multi-partition topic, p1, p2 has new message, p3 has no new messages.
+ * 2. Call new `refresh` API, it will be completed after read new messages.
+ */
+ @Test(dataProvider = "partition")
+ public void testRefreshAPI(int partition) throws Exception {
+ // 1. Prepare resource.
+ String topic = "persistent://public/default/testRefreshAPI" + RandomUtils.nextLong();
+ if (partition == 0) {
+ admin.topics().createNonPartitionedTopic(topic);
+ } else {
+ admin.topics().createPartitionedTopic(topic, partition);
+ }
+
+ @Cleanup
+ TableView<byte[]> tv = pulsarClient.newTableView(Schema.BYTES)
+ .topic(topic)
+ .create();
+ // 2. Add a listen action to provide the test environment.
+ // The listen action will be triggered when there are incoming messages every time.
+ // This is a sync operation, so sleep in the listen action can slow down the reading rate of messages.
+ tv.listen((k, v) -> {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ // 3. Send 20 messages. After refresh, all the messages should be received.
+ int count = 20;
+ Set<String> keys = this.publishMessages(topic, count, false);
+ // After message sending completely, the table view will take at least 2 seconds to receive all the messages.
+ // If there is not the refresh operation, all messages will not be received.
+ tv.refresh();
+ // The key of each message is different.
+ assertEquals(tv.size(), count);
+ assertEquals(tv.keySet(), keys);
+ // 4. Test refresh operation can be completed when there is a partition with on new messages
+ // or no new message for no partition topic.
+ if (partition > 0) {
+ publishMessages(topic, count, partition - 1, false, false);
+ tv.refreshAsync().get(5, TimeUnit.SECONDS);
+ assertEquals(tv.size(), count + partition - 1);
+ } else {
+ tv.refreshAsync().get(5, TimeUnit.SECONDS);
Review Comment:
Can we remove this line? There is no assertion here, and no action is
performed after the previous `refresh` call.
##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java:
##########
@@ -136,6 +140,126 @@ private Set<String> publishMessages(String topic, int count, boolean enableBatch
return keys;
}
+ @DataProvider(name = "partition")
+ public static Object[][] partition () {
+ return new Object[][] {
+ { 3 }, { 0 }
+ };
+ }
+
+ /**
+ * Case1:
+ * 1. Slow down the rate of reading messages.
+ * 2. Send some messages
+ * 3. Call new `refresh` API, it will wait for reading all the messages completed.
+ * Case2:
+ * 1. No new messages.
+ * 2. Call new `refresh` API, it will be completed immediately.
+ * Case3:
+ * 1. multi-partition topic, p1, p2 has new message, p3 has no new messages.
+ * 2. Call new `refresh` API, it will be completed after read new messages.
+ */
+ @Test(dataProvider = "partition")
+ public void testRefreshAPI(int partition) throws Exception {
+ // 1. Prepare resource.
+ String topic = "persistent://public/default/testRefreshAPI" + RandomUtils.nextLong();
+ if (partition == 0) {
+ admin.topics().createNonPartitionedTopic(topic);
+ } else {
+ admin.topics().createPartitionedTopic(topic, partition);
+ }
+
+ @Cleanup
+ TableView<byte[]> tv = pulsarClient.newTableView(Schema.BYTES)
+ .topic(topic)
+ .create();
+ // 2. Add a listen action to provide the test environment.
+ // The listen action will be triggered when there are incoming messages every time.
+ // This is a sync operation, so sleep in the listen action can slow down the reading rate of messages.
+ tv.listen((k, v) -> {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ // 3. Send 20 messages. After refresh, all the messages should be received.
+ int count = 20;
+ Set<String> keys = this.publishMessages(topic, count, false);
+ // After message sending completely, the table view will take at least 2 seconds to receive all the messages.
+ // If there is not the refresh operation, all messages will not be received.
+ tv.refresh();
+ // The key of each message is different.
+ assertEquals(tv.size(), count);
+ assertEquals(tv.keySet(), keys);
+ // 4. Test refresh operation can be completed when there is a partition with on new messages
+ // or no new message for no partition topic.
+ if (partition > 0) {
+ publishMessages(topic, count, partition - 1, false, false);
Review Comment:
It seems you passed the wrong arguments.
```java
private Set<String> publishMessages(String topic, int keyStartPosition, int count,
                                    boolean enableBatch, boolean enableEncryption) throws Exception {
```
The 2nd argument should be `keyStartPosition`, while the 3rd argument is the
count, though swapping them does not make any difference to the test result.
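
To illustrate why the argument order may not affect the assertion, here is a minimal sketch. It assumes the helper generates keys of the form `"key" + index` for `count` consecutive indices starting at `keyStartPosition`, and that the earlier three-argument call starts at index 0. `keysFor` and `ArgumentOrderSketch` are hypothetical names used only for this illustration, not part of the test class.

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class ArgumentOrderSketch {

    // Hypothetical stand-in for publishMessages: it only computes the keys,
    // assuming each key is "key" + index (an assumption, not the real helper).
    static Set<String> keysFor(int keyStartPosition, int count) {
        Set<String> keys = new LinkedHashSet<>();
        for (int i = keyStartPosition; i < keyStartPosition + count; i++) {
            keys.add("key" + i);
        }
        return keys;
    }

    public static void main(String[] args) {
        int count = 20;
        int partition = 3;

        // Keys assumed to be present in the table view after the first publish.
        Set<String> initial = keysFor(0, count);

        // Call as written in the PR: keyStartPosition = count, count = partition - 1.
        Set<String> asWritten = new LinkedHashSet<>(initial);
        asWritten.addAll(keysFor(count, partition - 1));

        // Call with the two int arguments swapped.
        Set<String> swapped = new LinkedHashSet<>(initial);
        swapped.addAll(keysFor(partition - 1, count));

        // Under the assumptions above, both sets hold count + partition - 1 distinct keys.
        System.out.println(asWritten.size());
        System.out.println(swapped.size());
    }
}
```

Under these assumptions both orderings leave `count + partition - 1` distinct keys in the view, so the `assertEquals(tv.size(), count + partition - 1)` check would pass either way. The two orderings still publish different numbers of messages.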