liangyepianzhou commented on code in PR #21417:
URL: https://github.com/apache/pulsar/pull/21417#discussion_r1519287046
##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java:
##########
@@ -136,6 +142,134 @@ private Set<String> publishMessages(String topic, int count, boolean enableBatch
return keys;
}
+ @DataProvider(name = "partition")
+ public static Object[][] partition () {
+ return new Object[][] {
+ { 3 }, { 0 }
+ };
+ }
+
+ /**
+ * Case1:
+ * 1. Slow down the rate of reading messages.
+ * 2. Send some messages
+ * 3. Call new `refresh` API, it will wait for reading all the messages completed.
+ * Case2:
+ * 1. No new messages.
+ * 2. Call new `refresh` API, it will be completed immediately.
+ * Case3:
+ * 1. multi-partition topic, p1, p2 has new message, p3 has no new messages.
+ * 2. Call new `refresh` API, it will be completed after read new messages.
+ */
+ @Test(dataProvider = "partition")
+ public void testRefreshAPI(int partition) throws Exception {
+ // 1. Prepare resource.
+ String topic = "persistent://public/default/testRefreshAPI" + RandomUtils.nextLong();
+ if (partition == 0) {
+ admin.topics().createNonPartitionedTopic(topic);
+ } else {
+ admin.topics().createPartitionedTopic(topic, partition);
+ }
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName("my-sub")
+ .subscribe();
Review Comment:
The reader starts from `startMessageId(MessageId.earliest)` and the retention
time is long, so the published messages stay readable without this
subscription. It is fine to delete this consumer.
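
To illustrate the reasoning, here is a minimal toy model in plain Java. It does not use the Pulsar client: `ToyTopic`, `trim`, and `run` are hypothetical names invented for this sketch, and the trimming rule is a simplified assumption (entries are dropped only when no subscription holds them and they have outlived retention). It only shows why a reader that starts at the earliest position still sees every message when retention is long, even with no subscription on the topic.

```java
import java.util.ArrayList;
import java.util.List;

public class RetentionSketch {

    /** Toy stand-in for a topic. Hypothetical, not a Pulsar class. */
    static final class ToyTopic {
        private final List<String> keys = new ArrayList<>();
        private final List<Long> publishedAt = new ArrayList<>();
        private final long retentionMillis;
        private final boolean hasSubscription;

        ToyTopic(long retentionMillis, boolean hasSubscription) {
            this.retentionMillis = retentionMillis;
            this.hasSubscription = hasSubscription;
        }

        void publish(String key, long now) {
            keys.add(key);
            publishedAt.add(now);
        }

        /** Simplified assumption: drop entries that no subscription holds and that outlived retention. */
        void trim(long now) {
            if (hasSubscription) {
                return; // an unacknowledged backlog keeps the entries alive
            }
            while (!keys.isEmpty() && now - publishedAt.get(0) > retentionMillis) {
                keys.remove(0);
                publishedAt.remove(0);
            }
        }

        /** Models a reader that starts at the earliest available entry. */
        List<String> readFromEarliest() {
            return new ArrayList<>(keys);
        }
    }

    /** Publishes two keys, trims at t = 1000 ms, then reads from the earliest position. */
    static List<String> run(long retentionMillis, boolean hasSubscription) {
        ToyTopic topic = new ToyTopic(retentionMillis, hasSubscription);
        topic.publish("k1", 0);
        topic.publish("k2", 10);
        topic.trim(1_000);
        return topic.readFromEarliest();
    }

    public static void main(String[] args) {
        System.out.println(run(60_000, false)); // long retention, no subscription: [k1, k2]
        System.out.println(run(0, false));      // no retention, no subscription: []
        System.out.println(run(0, true));       // no retention, but a subscription: [k1, k2]
    }
}
```

In this model the subscription only matters in the third case, where retention is zero. With long retention, as in the test under review, the first case applies and the extra consumer adds nothing.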