liangyepianzhou commented on code in PR #18316:
URL: https://github.com/apache/pulsar/pull/18316#discussion_r1017755620


##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java:
##########
@@ -648,5 +647,80 @@ private void batchReceiveAndRedelivery(Consumer<String> 
consumer, int expected)
         Assert.assertEquals(expected * 2, messageReceived);
     }
 
+
+    @Test(timeOut = 30000)
+    public void testBatchReceiveTheSameTopicMessages() throws Exception {
+        final String topic = 
"persistent://my-property/my-ns/testBatchReceiveTheSameTopicMessages" + 
UUID.randomUUID();
+        final String singleTopicBatchReceiveSub = 
"singleTopicBatchReceiveSub-sub";
+        final String multiTopicBatchReceiveSub = 
"multiTopicBatchReceiveSub-sub";
+        admin.topics().createPartitionedTopic(topic, 5);
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+
+        @Cleanup
+        Consumer<String> singleTopicBatchReceiveConsumer = 
pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                
.batchReceivePolicy(BatchReceivePolicy.DEFAULT_MULTI_TOPICS_DISABLE_POLICY)
+                .subscriptionName(singleTopicBatchReceiveSub)
+                .subscribe();
+
+        @Cleanup
+        Consumer<String> multiTopicBatchReceiveConsumer = 
pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .batchReceivePolicy(BatchReceivePolicy.DEFAULT_POLICY)
+                .subscriptionName(multiTopicBatchReceiveSub)
+                .subscribe();
+
+        // prepare messages
+        int number = 1000;
+        for (int i = 0; i < number; i++) {
+            producer.sendAsync(i + "");
+        }
+
+        // test receive single topic messages
+        // if this flag become true, it means the batch receive multi-number 
messages
+        boolean multiNumberFlag = false;
+
+        // if number = 0, it means all the messages has been consumed
+        while (number != 0) {
+            Messages<String> messages = 
singleTopicBatchReceiveConsumer.batchReceive();
+            if (messages.size() > 0) {
+                if (messages.size() > 1) {
+                    multiNumberFlag = true;
+                }
+                String topicName = null;
+                for (Message<String> message : messages) {
+                    number--;
+                    if (topicName != null) {
+                        // check if the topicName is the same
+                        Assert.assertEquals(message.getTopicName(), topicName);
+                    }
+                    topicName = message.getTopicName();
+                }
+            }
+        }
+        Assert.assertTrue(multiNumberFlag);
+
+        // test default batch policy can receive the multi topics messages
+        while (true) {
+            Messages<String> messages = 
multiTopicBatchReceiveConsumer.batchReceive();
+            if (messages.size() > 0) {
+                String topicName = null;
+                for (Message<String> message : messages) {
+                    if (topicName != null) {
+                        // receive the different topic messages in one batch 
receive
+                        if (!topicName.equals(message.getTopicName())) {
+                            return;
+                        }
+                    }
+                    topicName = message.getTopicName();
+                }
+            }
+        }

Review Comment:
   Good work! Leave a little suggestion.
   Maybe we should add an `Assert.fail()` here.
   If the multiTopicBatchReceiveConsumer can not receive the multiple topics 
messages.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to