shibd commented on code in PR #21:
URL: https://github.com/apache/pulsar-client-cpp/pull/21#discussion_r992298046


##########
tests/BasicEndToEndTest.cc:
##########
@@ -4098,3 +4098,191 @@ TEST(BasicEndToEndTest, testUnAckedMessageTrackerEnabledCumulativeAck) {
     consumer.close();
     client.close();
 }
+
+void testBatchReceive(bool multiConsumer) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+
+    std::string uniqueChunk = unique_str();
+    std::string topicName = "persistent://public/default/test-batch-receive" + uniqueChunk;
+
+    if (multiConsumer) {
+        // call admin api to make it partitioned
+        std::string url =
+            adminUrl + "admin/v2/persistent/public/default/test-batch-receive" + uniqueChunk + "/partitions";
+        int res = makePutRequest(url, "5");
+        LOG_INFO("res = " << res);
+        ASSERT_FALSE(res != 204 && res != 409);
+    }
+
+    std::string subName = "subscription-name";
+    Producer producer;
+
+    Promise<Result, Producer> producerPromise;
+    client.createProducerAsync(topicName, WaitForCallbackValue<Producer>(producerPromise));
+    Future<Result, Producer> producerFuture = producerPromise.getFuture();
+    Result result = producerFuture.get(producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    // when receiver queue size > maxNumMessages, use receiver queue size.
+    consumerConfig.setBatchReceivePolicy(BatchReceivePolicy(1000, -1, -1));
+    consumerConfig.setReceiverQueueSize(10);
+    consumerConfig.setProperty("consumer-name", "test-consumer-name");
+    consumerConfig.setProperty("consumer-id", "test-consumer-id");
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicName, subName, consumerConfig,
+                          WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    // sync batch receive test
+    std::string prefix = "batch-receive-msg";
+    int numOfMessages = 10;
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        producer.send(msg);
+        LOG_DEBUG("1 sending message " << messageContent);
+    }
+
+    Messages messages;
+    Result receive = consumer.batchReceive(messages);
+    ASSERT_EQ(receive, ResultOk);
+    ASSERT_EQ(messages.size(), numOfMessages);
+
+    // async batch receive test
+    Latch latch(1);
+    BatchReceiveCallback batchReceiveCallback = [&latch, numOfMessages](Result result, Messages messages) {
+        ASSERT_EQ(result, ResultOk);
+        ASSERT_EQ(messages.size(), numOfMessages);
+        latch.countdown();
+    };
+    consumer.batchReceiveAsync(batchReceiveCallback);
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        producer.send(msg);
+        LOG_DEBUG("2 sending message " << messageContent);
+    }
+    ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
+
+    producer.close();
+    consumer.close();
+    client.close();
+}
+
+TEST(BasicEndToEndTest, testBatchReceive) { testBatchReceive(false); }
+
+TEST(BasicEndToEndTest, testBatchReceiveWithMultiConsumer) { testBatchReceive(true); }
+
+void testBatchReceiveTimeout(bool multiConsumer) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+    std::string uniqueChunk = unique_str();
+    std::string topicName = "persistent://public/default/test-batch-receive-timeout" + uniqueChunk;
+
+    if (multiConsumer) {
+        // call admin api to make it partitioned
+        std::string url = adminUrl + "admin/v2/persistent/public/default/test-batch-receive-timeout" +
+                          uniqueChunk + "/partitions";
+        int res = makePutRequest(url, "5");
+        LOG_INFO("res = " << res);
+        ASSERT_FALSE(res != 204 && res != 409);
+    }
+
+    std::string subName = "subscription-name";
+    Producer producer;
+
+    Promise<Result, Producer> producerPromise;
+    client.createProducerAsync(topicName, WaitForCallbackValue<Producer>(producerPromise));
+    Future<Result, Producer> producerFuture = producerPromise.getFuture();
+    Result result = producerFuture.get(producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setBatchReceivePolicy(BatchReceivePolicy(1000, -1, 1000));
+    consumerConfig.setProperty("consumer-name", "test-consumer-name");
+    consumerConfig.setProperty("consumer-id", "test-consumer-id");
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicName, subName, consumerConfig,
+                          WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    std::string prefix = "batch-receive-msg";
+    int numOfMessages = 10;
+
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        producer.send(msg);
+        LOG_DEBUG("2 sending message " << messageContent);
+    }
+
+    Latch latch(1);
+    BatchReceiveCallback batchReceiveCallback = [&latch, numOfMessages](Result result, Messages messages) {
+        ASSERT_EQ(result, ResultOk);
+        ASSERT_EQ(messages.size(), numOfMessages);
+        latch.countdown();
+    };
+    consumer.batchReceiveAsync(batchReceiveCallback);
+    ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
+
+    producer.close();
+    consumer.close();
+    client.close();
+}
+
+TEST(BasicEndToEndTest, testBatchReceiveTimeout) { testBatchReceiveTimeout(false); }
+
+TEST(BasicEndToEndTest, testBatchReceiveTimeoutWithMultiConsumer) { testBatchReceiveTimeout(true); }
+
+void testBatchReceiveClose(bool multiConsumer) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+
+    std::string uniqueChunk = unique_str();
+    std::string topicName = "persistent://public/default/test-batch-receive-close" + uniqueChunk;
+
+    if (multiConsumer) {
+        // call admin api to make it partitioned
+        std::string url = adminUrl + "admin/v2/persistent/public/default/test-batch-receive-close" +
+                          uniqueChunk + "/partitions";
+        int res = makePutRequest(url, "5");
+        LOG_INFO("res = " << res);
+        ASSERT_FALSE(res != 204 && res != 409);
+    }
+
+    std::string subName = "subscription-name";
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setBatchReceivePolicy(BatchReceivePolicy(1000, -1, 1000));
+    consumerConfig.setProperty("consumer-name", "test-consumer-name");
+    consumerConfig.setProperty("consumer-id", "test-consumer-id");

Review Comment:
   Sorry, it is useless.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to