Shoothzj commented on code in PR #277:
URL: https://github.com/apache/pulsar-client-cpp/pull/277#discussion_r1219830159


##########
tests/brokermetadata/BrokerMetadataTest.cc:
##########
@@ -20,24 +20,74 @@
 #include <gtest/gtest.h>
 #include <pulsar/Client.h>
 
+#include "lib/Latch.h"
+
 using namespace pulsar;
 
 TEST(BrokerMetadataTest, testConsumeSuccess) {
     Client client{"pulsar://localhost:6650"};
     Producer producer;
-    Result producerResult = 
client.createProducer("persistent://public/default/testConsumeSuccess", 
producer);
+    ProducerConfiguration producerConfiguration;
+    producerConfiguration.setBatchingEnabled(false);
+    Result producerResult =
+        client.createProducer("persistent://public/default/topic-non-batch", 
producerConfiguration, producer);
     ASSERT_EQ(producerResult, ResultOk);
     Consumer consumer;
-    Result consumerResult =
-        client.subscribe("persistent://public/default/testConsumeSuccess", 
"testConsumeSuccess", consumer);
+    Result consumerResult = 
client.subscribe("persistent://public/default/topic-non-batch", "sub", 
consumer);
     ASSERT_EQ(consumerResult, ResultOk);
-    const auto msg = MessageBuilder().setContent("testConsumeSuccess").build();
-    Result sendResult = producer.send(msg);
-    ASSERT_EQ(sendResult, ResultOk);
+    for (int i = 0; i < 10; i++) {
+        std::string content = "testConsumeSuccess" + std::to_string(i);
+        const auto msg = MessageBuilder().setContent(content).build();
+        Result sendResult = producer.send(msg);
+        ASSERT_EQ(sendResult, ResultOk);
+    }
+
+    Message receivedMsg;
+    for (int i = 0; i < 10; i++) {
+        Result receiveResult =
+            consumer.receive(receivedMsg, 1000);  // Assumed that we wait 1000 
ms for each message
+        printf("receive index: %d\n", i);
+        ASSERT_EQ(receiveResult, ResultOk);
+        ASSERT_EQ(receivedMsg.getDataAsString(), "testConsumeSuccess" + 
std::to_string(i));
+        ASSERT_EQ(receivedMsg.getIndex(), i);
+        Result ackResult = consumer.acknowledge(receivedMsg);
+        ASSERT_EQ(ackResult, ResultOk);
+    }
+    client.close();
+}
+
+TEST(BrokerMetadataTest, testConsumeBatchSuccess) {
+    Client client{"pulsar://localhost:6650"};
+    Producer producer;
+    Result producerResult = 
client.createProducer("persistent://public/default/topic-batch", producer);
+    ASSERT_EQ(producerResult, ResultOk);
+    Consumer consumer;
+    Result consumerResult = 
client.subscribe("persistent://public/default/topic-batch", "sub", consumer);
+    ASSERT_EQ(consumerResult, ResultOk);
+
+    Latch latch(10);
+    auto sendCallback = [&latch](Result result, const MessageId& id) {
+        ASSERT_EQ(result, ResultOk);
+        latch.countdown();
+    };
+
+    for (int i = 0; i < 10; i++) {
+        std::string content = "testConsumeSuccess" + std::to_string(i);
+        const auto msg = MessageBuilder().setContent(content).build();
+        producer.sendAsync(msg, sendCallback);
+    }
+
+    latch.wait();
+
     Message receivedMsg;
-    Result receiveResult = consumer.receive(receivedMsg);
-    ASSERT_EQ(receiveResult, ResultOk);
-    ASSERT_EQ(receivedMsg.getDataAsString(), "testConsumeSuccess");
+    for (int i = 0; i < 10; i++) {
+        Result receiveResult =
+            consumer.receive(receivedMsg, 1000);  // Assumed that we wait 1000 
ms for each message
+        ASSERT_EQ(receiveResult, ResultOk);
+        ASSERT_EQ(receivedMsg.getDataAsString(), "testConsumeSuccess" + 
std::to_string(i));
+        ASSERT_GE(receivedMsg.getIndex(), 0);
+        ASSERT_LT(receivedMsg.getIndex(), 10);

Review Comment:
   @BewareMyPower Sorry, I missed push a commit. I will fix it after going to 
home.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to