codelipenghui commented on a change in pull request #8285:
URL: https://github.com/apache/pulsar/pull/8285#discussion_r509057589
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
##########
@@ -116,6 +122,91 @@ public void testSeek() throws Exception {
assertEquals(sub.getNumberOfEntriesInBacklog(false), 0);
}
+ @Test
+ public void testSeekForBatch() throws Exception {
+ final String topicName =
"persistent://prop/use/ns-abcd/testSeekForBatch";
+ String subscriptionName = "my-subscription-batch";
+
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .enableBatching(true)
+ .batchingMaxMessages(3)
+ .topic(topicName).create();
+
+
+ List<MessageId> messageIds = new ArrayList<>();
+ List<CompletableFuture<MessageId>> futureMessageIds = new
ArrayList<>();
+
+ List<String> messages = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ String message = "my-message-" + i;
+ messages.add(message);
+ CompletableFuture<MessageId> messageIdCompletableFuture =
producer.sendAsync(message);
+ futureMessageIds.add(messageIdCompletableFuture);
+ }
+
+ futureMessageIds.forEach(future -> {
+ MessageId messageId = null;
+ try {
+ messageId = future.get();
+ messageIds.add(messageId);
+ } catch (InterruptedException | ExecutionException e) {
+ e.printStackTrace();
+ }
+ });
Review comment:
```suggestion
FutureUtil.waitForAll(futureMessageIds).get()
```
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
##########
@@ -116,6 +122,91 @@ public void testSeek() throws Exception {
assertEquals(sub.getNumberOfEntriesInBacklog(false), 0);
}
+ @Test
+ public void testSeekForBatch() throws Exception {
+ final String topicName =
"persistent://prop/use/ns-abcd/testSeekForBatch";
+ String subscriptionName = "my-subscription-batch";
+
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .enableBatching(true)
+ .batchingMaxMessages(3)
+ .topic(topicName).create();
+
+
+ List<MessageId> messageIds = new ArrayList<>();
+ List<CompletableFuture<MessageId>> futureMessageIds = new
ArrayList<>();
+
+ List<String> messages = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ String message = "my-message-" + i;
+ messages.add(message);
+ CompletableFuture<MessageId> messageIdCompletableFuture =
producer.sendAsync(message);
+ futureMessageIds.add(messageIdCompletableFuture);
+ }
+
+ futureMessageIds.forEach(future -> {
+ MessageId messageId = null;
+ try {
+ messageId = future.get();
+ messageIds.add(messageId);
+ } catch (InterruptedException | ExecutionException e) {
+ e.printStackTrace();
+ }
+ });
+ producer.flush();
Review comment:
Since the messageId is returned, we don't need to flush here.
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
##########
@@ -116,6 +122,91 @@ public void testSeek() throws Exception {
assertEquals(sub.getNumberOfEntriesInBacklog(false), 0);
}
+ @Test
+ public void testSeekForBatch() throws Exception {
+ final String topicName =
"persistent://prop/use/ns-abcd/testSeekForBatch";
+ String subscriptionName = "my-subscription-batch";
+
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .enableBatching(true)
+ .batchingMaxMessages(3)
+ .topic(topicName).create();
+
+
+ List<MessageId> messageIds = new ArrayList<>();
+ List<CompletableFuture<MessageId>> futureMessageIds = new
ArrayList<>();
+
+ List<String> messages = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ String message = "my-message-" + i;
+ messages.add(message);
+ CompletableFuture<MessageId> messageIdCompletableFuture =
producer.sendAsync(message);
+ futureMessageIds.add(messageIdCompletableFuture);
+ }
+
+ futureMessageIds.forEach(future -> {
+ MessageId messageId = null;
+ try {
+ messageId = future.get();
+ messageIds.add(messageId);
+ } catch (InterruptedException | ExecutionException e) {
+ e.printStackTrace();
+ }
+ });
+ producer.flush();
+ producer.close();
+
+
+ org.apache.pulsar.client.api.Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName(subscriptionName)
+ .startMessageIdInclusive()
+ .subscribe();
+
+ PersistentTopic topicRef = (PersistentTopic)
pulsar.getBrokerService().getTopicReference(topicName).get();
+ assertNotNull(topicRef);
+
+ assertEquals(topicRef.getSubscriptions().size(), 1);
+
+ MessageId resetId = messageIds.get(4);
+ consumer.seek(resetId);
+ // Wait for consumer to reconnect
+ Thread.sleep(500);
+
+ Message<String> nextMessage = consumer.receive();
+ MessageId nextId = nextMessage.getMessageId();
+ consumer.acknowledge(nextId);
+ String expectedMessage = messages.get(4);
+ log.info("\nexpected next message: {}, next message {}",
expectedMessage, nextMessage.getValue());
+ assertEquals(nextMessage.getValue(), expectedMessage);
+
+ resetId = messageIds.get(3);
+ consumer.seek(resetId);
+ // Wait for consumer to reconnect
+ Thread.sleep(500);
+
+ nextMessage = consumer.receive();
+ nextId = nextMessage.getMessageId();
+ consumer.acknowledge(nextId);
+ expectedMessage = messages.get(3);
+ log.info("expected next message2: {}, next message2 {}",
expectedMessage, nextMessage.getValue());
+
+ assertEquals(nextMessage.getValue(), expectedMessage);
+
+ resetId = messageIds.get(2);
+ consumer.seek(resetId);
+ // Wait for consumer to reconnect
+ Thread.sleep(500);
+
+ nextMessage = consumer.receive();
+ nextId = nextMessage.getMessageId();
+ consumer.acknowledge(nextId);
+ expectedMessage = messages.get(2);
+ log.info("expected next message3: {}, next message3 {}",
expectedMessage, nextMessage.getValue());
+ assertEquals(nextMessage.getValue(), expectedMessage);
Review comment:
Could you please also add some corner case test? such as reset to the
last message to check the consumer can't receive any message?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]