BewareMyPower commented on code in PR #21:
URL: https://github.com/apache/pulsar-client-cpp/pull/21#discussion_r991979143


##########
lib/ConsumerImplBase.h:
##########
@@ -20,23 +20,38 @@
 #define PULSAR_CONSUMER_IMPL_BASE_HEADER
 #include <pulsar/Message.h>
 #include <pulsar/Consumer.h>
-
+#include "HandlerBase.h"
+#include <queue>
 #include <set>
 
 namespace pulsar {
 class ConsumerImplBase;
+class HandlerBase;
 
 typedef std::weak_ptr<ConsumerImplBase> ConsumerImplBaseWeakPtr;
 
-class ConsumerImplBase {
+class OpBatchReceive {
    public:
-    virtual ~ConsumerImplBase() {}
+    OpBatchReceive();
+    explicit OpBatchReceive(const BatchReceiveCallback& batchReceiveCallback);
+    const BatchReceiveCallback batchReceiveCallback_;
+    const long createAt_;

Review Comment:
   ```suggestion
       const int64_t createAt_;
   ```
   
   To make the type definition consistent with what 
`TimeUtils::currentTimeMillis` returns.



##########
lib/ConsumerImplBase.cc:
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "ConsumerImpl.h"
+#include "MessageImpl.h"
+#include "MessagesImpl.h"
+#include "LogUtils.h"
+#include "TimeUtils.h"
+#include "pulsar/Result.h"
+#include "MessageIdUtil.h"
+#include "AckGroupingTracker.h"
+#include "ConsumerImplBase.h"
+
+#include <algorithm>
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+ConsumerImplBase::ConsumerImplBase(ClientImplPtr client, const std::string& 
topic, Backoff backoff,
+                                   const ConsumerConfiguration& conf, 
ExecutorServicePtr listenerExecutor)
+    : HandlerBase(client, topic, backoff),
+      listenerExecutor_(listenerExecutor),
+      batchReceivePolicy_(conf.getBatchReceivePolicy()) {
+    auto userBatchReceivePolicy = conf.getBatchReceivePolicy();
+    if (userBatchReceivePolicy.getMaxNumMessages() > 
conf.getReceiverQueueSize()) {
+        batchReceivePolicy_ =
+            BatchReceivePolicy(conf.getReceiverQueueSize(), 
userBatchReceivePolicy.getMaxNumBytes(),
+                               userBatchReceivePolicy.getTimeoutMs());
+        LOG_WARN("BatchReceivePolicy maxNumMessages: {" << 
userBatchReceivePolicy.getMaxNumMessages()
+                                                        << "} is greater than 
maxReceiverQueueSize: {"
+                                                        << 
conf.getReceiverQueueSize()
+                                                        << "}, reset to "
+                                                           
"maxReceiverQueueSize. ");
+    }
+    batchReceiveTimer_ = listenerExecutor_->createDeadlineTimer();
+}
+
+void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) {
+    if (timeoutMs > 0) {
+        
batchReceiveTimer_->expires_from_now(boost::posix_time::milliseconds(timeoutMs));
+        std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+        batchReceiveTimer_->async_wait([weakSelf](const 
boost::system::error_code& ec) {
+            // If two requests call runPartitionUpdateTask at the same time, 
the timer will fail, and it
+            // cannot continue at this time, and the request needs to be 
ignored.
+            auto self = weakSelf.lock();
+            if (self && !ec) {
+                self->doBatchReceiveTimeTask();
+            }
+        });
+    }
+}
+
+void ConsumerImplBase::doBatchReceiveTimeTask() {
+    if (state_ != Ready) {
+        return;
+    }
+
+    bool hasPendingReceives = false;
+    long timeToWaitMs = batchReceivePolicy_.getTimeoutMs();
+
+    Lock lock(batchPendingReceiveMutex_);
+    while (!batchPendingReceives_.empty()) {
+        OpBatchReceive& batchReceive = batchPendingReceives_.front();
+        long diff =
+            batchReceivePolicy_.getTimeoutMs() - 
(TimeUtils::currentTimeMillis() - batchReceive.createAt_);
+        if (diff <= 0) {
+            
notifyBatchPendingReceivedCallback(batchReceive.batchReceiveCallback_);
+            batchPendingReceives_.pop();
+        } else {
+            hasPendingReceives = true;
+            timeToWaitMs = diff;
+            break;
+        }
+    }
+    lock.unlock();
+
+    if (hasPendingReceives) {
+        triggerBatchReceiveTimerTask(timeToWaitMs);
+    }
+}
+
+void ConsumerImplBase::failPendingBatchReceiveCallback() {
+    Messages msgs;
+    Lock lock(batchPendingReceiveMutex_);
+    while (!batchPendingReceives_.empty()) {
+        OpBatchReceive opBatchReceive = batchPendingReceives_.front();
+        batchPendingReceives_.pop();
+        auto self = shared_from_this();
+        listenerExecutor_->postWork([opBatchReceive, self, msgs]() {
+            opBatchReceive.batchReceiveCallback_(ResultAlreadyClosed, msgs);
+        });
+    }
+    lock.unlock();
+}
+
+void ConsumerImplBase::notifyBatchPendingReceivedCallback() {
+    Lock lock(batchPendingReceiveMutex_);
+    if (!batchPendingReceives_.empty()) {
+        OpBatchReceive& batchReceive = batchPendingReceives_.front();
+        batchPendingReceives_.pop();
+        notifyBatchPendingReceivedCallback(batchReceive.batchReceiveCallback_);
+    }
+}
+
+void ConsumerImplBase::batchReceiveAsync(BatchReceiveCallback callback) {
+    // fail the callback if consumer is closing or closed
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed, Messages());
+        return;
+    }
+
+    if (hasEnoughMessagesForBatchReceive()) {
+        Lock lock(batchPendingReceiveMutex_);
+        notifyBatchPendingReceivedCallback(callback);
+        lock.unlock();

Review Comment:
   ```suggestion
           notifyBatchPendingReceivedCallback(callback);
   ```
   
   IIUC, `batchPendingReceiveMutex_` is used to make the accesses to 
`batchPendingReceives_` thread safe. We don't need the lock here.



##########
lib/ConsumerImplBase.cc:
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "ConsumerImpl.h"
+#include "MessageImpl.h"
+#include "MessagesImpl.h"
+#include "LogUtils.h"
+#include "TimeUtils.h"
+#include "pulsar/Result.h"
+#include "MessageIdUtil.h"
+#include "AckGroupingTracker.h"
+#include "ConsumerImplBase.h"
+
+#include <algorithm>
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+ConsumerImplBase::ConsumerImplBase(ClientImplPtr client, const std::string& 
topic, Backoff backoff,
+                                   const ConsumerConfiguration& conf, 
ExecutorServicePtr listenerExecutor)
+    : HandlerBase(client, topic, backoff),
+      listenerExecutor_(listenerExecutor),
+      batchReceivePolicy_(conf.getBatchReceivePolicy()) {
+    auto userBatchReceivePolicy = conf.getBatchReceivePolicy();
+    if (userBatchReceivePolicy.getMaxNumMessages() > 
conf.getReceiverQueueSize()) {
+        batchReceivePolicy_ =
+            BatchReceivePolicy(conf.getReceiverQueueSize(), 
userBatchReceivePolicy.getMaxNumBytes(),
+                               userBatchReceivePolicy.getTimeoutMs());
+        LOG_WARN("BatchReceivePolicy maxNumMessages: {" << 
userBatchReceivePolicy.getMaxNumMessages()
+                                                        << "} is greater than 
maxReceiverQueueSize: {"
+                                                        << 
conf.getReceiverQueueSize()
+                                                        << "}, reset to "
+                                                           
"maxReceiverQueueSize. ");
+    }
+    batchReceiveTimer_ = listenerExecutor_->createDeadlineTimer();
+}
+
+void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) {
+    if (timeoutMs > 0) {
+        
batchReceiveTimer_->expires_from_now(boost::posix_time::milliseconds(timeoutMs));
+        std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+        batchReceiveTimer_->async_wait([weakSelf](const 
boost::system::error_code& ec) {
+            // If two requests call runPartitionUpdateTask at the same time, 
the timer will fail, and it
+            // cannot continue at this time, and the request needs to be 
ignored.
+            auto self = weakSelf.lock();
+            if (self && !ec) {
+                self->doBatchReceiveTimeTask();
+            }
+        });
+    }
+}
+
+void ConsumerImplBase::doBatchReceiveTimeTask() {
+    if (state_ != Ready) {
+        return;
+    }
+
+    bool hasPendingReceives = false;
+    long timeToWaitMs = batchReceivePolicy_.getTimeoutMs();
+
+    Lock lock(batchPendingReceiveMutex_);
+    while (!batchPendingReceives_.empty()) {
+        OpBatchReceive& batchReceive = batchPendingReceives_.front();
+        long diff =
+            batchReceivePolicy_.getTimeoutMs() - 
(TimeUtils::currentTimeMillis() - batchReceive.createAt_);
+        if (diff <= 0) {
+            
notifyBatchPendingReceivedCallback(batchReceive.batchReceiveCallback_);
+            batchPendingReceives_.pop();
+        } else {
+            hasPendingReceives = true;
+            timeToWaitMs = diff;
+            break;
+        }
+    }
+    lock.unlock();
+
+    if (hasPendingReceives) {
+        triggerBatchReceiveTimerTask(timeToWaitMs);
+    }
+}
+
+void ConsumerImplBase::failPendingBatchReceiveCallback() {
+    Messages msgs;
+    Lock lock(batchPendingReceiveMutex_);
+    while (!batchPendingReceives_.empty()) {
+        OpBatchReceive opBatchReceive = batchPendingReceives_.front();
+        batchPendingReceives_.pop();
+        auto self = shared_from_this();
+        listenerExecutor_->postWork([opBatchReceive, self, msgs]() {
+            opBatchReceive.batchReceiveCallback_(ResultAlreadyClosed, msgs);
+        });
+    }
+    lock.unlock();
+}
+
+void ConsumerImplBase::notifyBatchPendingReceivedCallback() {
+    Lock lock(batchPendingReceiveMutex_);
+    if (!batchPendingReceives_.empty()) {
+        OpBatchReceive& batchReceive = batchPendingReceives_.front();
+        batchPendingReceives_.pop();
+        notifyBatchPendingReceivedCallback(batchReceive.batchReceiveCallback_);
+    }
+}
+
+void ConsumerImplBase::batchReceiveAsync(BatchReceiveCallback callback) {
+    // fail the callback if consumer is closing or closed
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed, Messages());
+        return;
+    }
+
+    if (hasEnoughMessagesForBatchReceive()) {
+        Lock lock(batchPendingReceiveMutex_);
+        notifyBatchPendingReceivedCallback(callback);
+        lock.unlock();
+    } else {
+        // expectmoreIncomingMessages();

Review Comment:
   ```suggestion
   ```



##########
tests/BasicEndToEndTest.cc:
##########
@@ -4098,3 +4098,191 @@ TEST(BasicEndToEndTest, 
testUnAckedMessageTrackerEnabledCumulativeAck) {
     consumer.close();
     client.close();
 }
+
+void testBatchReceive(bool multiConsumer) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+
+    std::string uniqueChunk = unique_str();
+    std::string topicName = "persistent://public/default/test-batch-receive" + 
uniqueChunk;
+
+    if (multiConsumer) {
+        // call admin api to make it partitioned
+        std::string url =
+            adminUrl + "admin/v2/persistent/public/default/test-batch-receive" 
+ uniqueChunk + "/partitions";
+        int res = makePutRequest(url, "5");
+        LOG_INFO("res = " << res);
+        ASSERT_FALSE(res != 204 && res != 409);
+    }
+
+    std::string subName = "subscription-name";
+    Producer producer;
+
+    Promise<Result, Producer> producerPromise;
+    client.createProducerAsync(topicName, 
WaitForCallbackValue<Producer>(producerPromise));
+    Future<Result, Producer> producerFuture = producerPromise.getFuture();
+    Result result = producerFuture.get(producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    // when receiver queue size > maxNumMessages, use receiver queue size.
+    consumerConfig.setBatchReceivePolicy(BatchReceivePolicy(1000, -1, -1));
+    consumerConfig.setReceiverQueueSize(10);
+    consumerConfig.setProperty("consumer-name", "test-consumer-name");
+    consumerConfig.setProperty("consumer-id", "test-consumer-id");
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicName, subName, consumerConfig,
+                          WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    // sync batch receive test
+    std::string prefix = "batch-receive-msg";
+    int numOfMessages = 10;
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        producer.send(msg);
+        LOG_DEBUG("1 sending message " << messageContent);
+    }
+
+    Messages messages;
+    Result receive = consumer.batchReceive(messages);
+    ASSERT_EQ(receive, ResultOk);
+    ASSERT_EQ(messages.size(), numOfMessages);
+
+    // async batch receive test
+    Latch latch(1);
+    BatchReceiveCallback batchReceiveCallback = [&latch, numOfMessages](Result 
result, Messages messages) {
+        ASSERT_EQ(result, ResultOk);
+        ASSERT_EQ(messages.size(), numOfMessages);
+        latch.countdown();
+    };
+    consumer.batchReceiveAsync(batchReceiveCallback);
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        producer.send(msg);
+        LOG_DEBUG("2 sending message " << messageContent);

Review Comment:
   ```suggestion
   ```



##########
lib/MessagesImpl.cc:
##########
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "MessagesImpl.h"
+#include "stdexcept"
+
+MessagesImpl::MessagesImpl(int maxNumberOfMessages, long maxSizeOfMessages)
+    : maxNumberOfMessages_(maxNumberOfMessages),
+      maxSizeOfMessages_(maxSizeOfMessages),
+      currentNumberOfMessages_(0),
+      currentSizeOfMessages_(0) {
+    messageList_ = std::vector<Message>();
+}
+
+const std::vector<Message>& MessagesImpl::getMessageList() const { return 
messageList_; }
+
+bool MessagesImpl::canAdd(const Message& message) const {
+    if (currentNumberOfMessages_ == 0) {
+        return true;
+    }
+
+    if (maxNumberOfMessages_ > 0 && currentNumberOfMessages_ + 1 > 
maxNumberOfMessages_) {
+        return false;
+    }
+
+    if (maxSizeOfMessages_ > 0 && currentSizeOfMessages_ + message.getLength() 
> maxSizeOfMessages_) {
+        return false;
+    }
+
+    return true;
+}
+
+void MessagesImpl::add(const Message& message) {
+    if (!canAdd(message)) {
+        throw std::invalid_argument("No more space to add messages.");
+    }
+    currentNumberOfMessages_++;

Review Comment:
   Though Java implementation adds this field to represent the size of 
`messagesList`. I think it's redundant. Could you remove this field and use 
`messagesList_.size()` instead?



##########
tests/MessagesImplTest.cc:
##########
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <MessagesImpl.h>
+#include "pulsar/MessageBuilder.h"
+
+using namespace pulsar;
+
+TEST(MessagesImplTest, testMessage) {
+    // 0. test not limits
+    {
+        MessagesImpl messages(-1, -1);
+        ASSERT_TRUE(messages.canAdd(Message()));
+    }
+
+    // 1. test max number of messages.
+    {
+        Message msg = MessageBuilder().setContent("c").build();
+        MessagesImpl messages(10, -1);
+        for (int i = 0; i < 10; i++) {
+            messages.add(msg);
+        }
+        ASSERT_FALSE(messages.canAdd(msg));
+        ASSERT_EQ(messages.size(), 10);
+        try {
+            messages.add(msg);
+            FAIL() << "Should be failed.";
+        } catch (std::invalid_argument& e) {
+        }
+
+        messages.clear();
+        ASSERT_TRUE(messages.canAdd(msg));
+        ASSERT_EQ(messages.size(), 0);
+    }
+
+    // 2. test max size of messages.
+    {
+        Message msg = MessageBuilder().setContent("c").build();
+        MessagesImpl messages(-1, 10);
+        for (int i = 0; i < 10; i++) {
+            messages.add(msg);
+        }
+        ASSERT_FALSE(messages.canAdd(msg));
+        ASSERT_EQ(messages.size(), 10);
+        try {
+            messages.add(msg);
+            FAIL() << "Should be failed.";
+        } catch (std::invalid_argument& e) {
+        }

Review Comment:
   Use `ASSERT_THROW`



##########
tests/BasicEndToEndTest.cc:
##########
@@ -4098,3 +4098,191 @@ TEST(BasicEndToEndTest, 
testUnAckedMessageTrackerEnabledCumulativeAck) {
     consumer.close();
     client.close();
 }
+
+void testBatchReceive(bool multiConsumer) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+
+    std::string uniqueChunk = unique_str();
+    std::string topicName = "persistent://public/default/test-batch-receive" + 
uniqueChunk;
+
+    if (multiConsumer) {
+        // call admin api to make it partitioned
+        std::string url =
+            adminUrl + "admin/v2/persistent/public/default/test-batch-receive" 
+ uniqueChunk + "/partitions";
+        int res = makePutRequest(url, "5");
+        LOG_INFO("res = " << res);
+        ASSERT_FALSE(res != 204 && res != 409);
+    }
+
+    std::string subName = "subscription-name";
+    Producer producer;
+
+    Promise<Result, Producer> producerPromise;
+    client.createProducerAsync(topicName, 
WaitForCallbackValue<Producer>(producerPromise));
+    Future<Result, Producer> producerFuture = producerPromise.getFuture();
+    Result result = producerFuture.get(producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    // when receiver queue size > maxNumMessages, use receiver queue size.
+    consumerConfig.setBatchReceivePolicy(BatchReceivePolicy(1000, -1, -1));
+    consumerConfig.setReceiverQueueSize(10);
+    consumerConfig.setProperty("consumer-name", "test-consumer-name");
+    consumerConfig.setProperty("consumer-id", "test-consumer-id");
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicName, subName, consumerConfig,
+                          WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    // sync batch receive test
+    std::string prefix = "batch-receive-msg";
+    int numOfMessages = 10;
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        producer.send(msg);
+        LOG_DEBUG("1 sending message " << messageContent);
+    }
+
+    Messages messages;
+    Result receive = consumer.batchReceive(messages);
+    ASSERT_EQ(receive, ResultOk);
+    ASSERT_EQ(messages.size(), numOfMessages);
+
+    // async batch receive test
+    Latch latch(1);
+    BatchReceiveCallback batchReceiveCallback = [&latch, numOfMessages](Result 
result, Messages messages) {
+        ASSERT_EQ(result, ResultOk);
+        ASSERT_EQ(messages.size(), numOfMessages);
+        latch.countdown();
+    };
+    consumer.batchReceiveAsync(batchReceiveCallback);
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        producer.send(msg);
+        LOG_DEBUG("2 sending message " << messageContent);
+    }
+    ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
+
+    producer.close();
+    consumer.close();
+    client.close();
+}
+
+TEST(BasicEndToEndTest, testBatchReceive) { testBatchReceive(false); }
+
+TEST(BasicEndToEndTest, testBatchReceiveWithMultiConsumer) { 
testBatchReceive(true); }
+
+void testBatchReceiveTimeout(bool multiConsumer) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+    std::string uniqueChunk = unique_str();
+    std::string topicName = 
"persistent://public/default/test-batch-receive-timeout" + uniqueChunk;
+
+    if (multiConsumer) {
+        // call admin api to make it partitioned
+        std::string url = adminUrl + 
"admin/v2/persistent/public/default/test-batch-receive-timeout" +
+                          uniqueChunk + "/partitions";
+        int res = makePutRequest(url, "5");
+        LOG_INFO("res = " << res);
+        ASSERT_FALSE(res != 204 && res != 409);
+    }
+
+    std::string subName = "subscription-name";
+    Producer producer;
+
+    Promise<Result, Producer> producerPromise;
+    client.createProducerAsync(topicName, 
WaitForCallbackValue<Producer>(producerPromise));
+    Future<Result, Producer> producerFuture = producerPromise.getFuture();
+    Result result = producerFuture.get(producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setBatchReceivePolicy(BatchReceivePolicy(1000, -1, 1000));
+    consumerConfig.setProperty("consumer-name", "test-consumer-name");
+    consumerConfig.setProperty("consumer-id", "test-consumer-id");
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicName, subName, consumerConfig,
+                          WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    std::string prefix = "batch-receive-msg";
+    int numOfMessages = 10;
+
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        producer.send(msg);
+        LOG_DEBUG("2 sending message " << messageContent);
+    }
+
+    Latch latch(1);
+    BatchReceiveCallback batchReceiveCallback = [&latch, numOfMessages](Result 
result, Messages messages) {
+        ASSERT_EQ(result, ResultOk);
+        ASSERT_EQ(messages.size(), numOfMessages);
+        latch.countdown();
+    };
+    consumer.batchReceiveAsync(batchReceiveCallback);
+    ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
+
+    producer.close();
+    consumer.close();
+    client.close();
+}
+
+TEST(BasicEndToEndTest, testBatchReceiveTimeout) { 
testBatchReceiveTimeout(false); }
+
+TEST(BasicEndToEndTest, testBatchReceiveTimeoutWithMultiConsumer) { 
testBatchReceiveTimeout(true); }
+
+void testBatchReceiveClose(bool multiConsumer) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+
+    std::string uniqueChunk = unique_str();
+    std::string topicName = 
"persistent://public/default/test-batch-receive-close" + uniqueChunk;
+
+    if (multiConsumer) {
+        // call admin api to make it partitioned
+        std::string url = adminUrl + 
"admin/v2/persistent/public/default/test-batch-receive-close" +
+                          uniqueChunk + "/partitions";
+        int res = makePutRequest(url, "5");
+        LOG_INFO("res = " << res);
+        ASSERT_FALSE(res != 204 && res != 409);
+    }
+
+    std::string subName = "subscription-name";
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setBatchReceivePolicy(BatchReceivePolicy(1000, -1, 1000));
+    consumerConfig.setProperty("consumer-name", "test-consumer-name");
+    consumerConfig.setProperty("consumer-id", "test-consumer-id");

Review Comment:
   Could you explain why did you set these properties?



##########
lib/ConsumerImpl.cc:
##########
@@ -527,11 +495,58 @@ void ConsumerImpl::failPendingReceiveCallback() {
         ReceiveCallback callback = pendingReceives_.front();
         pendingReceives_.pop();
         
listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback,
-                                              shared_from_this(), 
ResultAlreadyClosed, msg, callback));
+                                              get_shared_this_ptr(), 
ResultAlreadyClosed, msg, callback));
     }
     lock.unlock();
 }
 
+void ConsumerImpl::executeNotifyCallback(Message& msg) {
+    Lock lock(pendingReceiveMutex_);
+    // if asyncReceive is waiting then notify callback without adding to 
incomingMessages queue
+    bool asyncReceivedWaiting = !pendingReceives_.empty();
+    ReceiveCallback callback;
+    if (asyncReceivedWaiting) {
+        callback = pendingReceives_.front();
+        pendingReceives_.pop();
+    }
+    lock.unlock();
+
+    // has pending receive, direct callback.
+    if (asyncReceivedWaiting) {
+        
listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback,
+                                              get_shared_this_ptr(), ResultOk, 
msg, callback));
+        return;
+    }
+
+    // try to add incoming messages.
+    // config_.getReceiverQueueSize() != 0 or waiting For ZeroQueueSize 
Message`
+    if (messageListener_ || config_.getReceiverQueueSize() != 0 || 
waitingForZeroQueueSizeMessage) {
+        incomingMessages_.push(msg);
+        incomingMessagesSize_.fetch_add(msg.getLength());
+    }
+
+    // try trigger pending batch messages
+    if (hasEnoughMessagesForBatchReceive()) {
+        ConsumerImplBase::notifyBatchPendingReceivedCallback();
+    }
+}
+
+void ConsumerImpl::notifyBatchPendingReceivedCallback(const 
BatchReceiveCallback& callback) {
+    auto messages = 
std::make_shared<MessagesImpl>(batchReceivePolicy_.getMaxNumMessages(),
+                                                   
batchReceivePolicy_.getMaxNumBytes());
+    Message peekMsg;
+    while (incomingMessages_.peek(peekMsg) && messages->canAdd(peekMsg)) {
+        // decreaseIncomingMessageSize
+        Message msg;
+        incomingMessages_.pop(msg);
+        messageProcessed(msg);
+        messages->add(msg);

Review Comment:
   ```suggestion
       while (incomingMessages_.pop(peekMsg, std::chrono::milliseconds(0)) && 
messages->canAdd(peekMsg)) {
           messageProcessed(peekMsg);
           messages->add(peekMsg);
       }
   ```
   
   The combination of `peek` and `pop` is not atomic, we should use `pop` with 
zero timeout like what we did in `ConsumerImpl::receiveAsync`.



##########
lib/ConsumerImplBase.cc:
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "ConsumerImpl.h"
+#include "MessageImpl.h"
+#include "MessagesImpl.h"
+#include "LogUtils.h"
+#include "TimeUtils.h"
+#include "pulsar/Result.h"
+#include "MessageIdUtil.h"
+#include "AckGroupingTracker.h"
+#include "ConsumerImplBase.h"
+
+#include <algorithm>
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+ConsumerImplBase::ConsumerImplBase(ClientImplPtr client, const std::string& 
topic, Backoff backoff,
+                                   const ConsumerConfiguration& conf, 
ExecutorServicePtr listenerExecutor)
+    : HandlerBase(client, topic, backoff),
+      listenerExecutor_(listenerExecutor),
+      batchReceivePolicy_(conf.getBatchReceivePolicy()) {
+    auto userBatchReceivePolicy = conf.getBatchReceivePolicy();
+    if (userBatchReceivePolicy.getMaxNumMessages() > 
conf.getReceiverQueueSize()) {
+        batchReceivePolicy_ =
+            BatchReceivePolicy(conf.getReceiverQueueSize(), 
userBatchReceivePolicy.getMaxNumBytes(),
+                               userBatchReceivePolicy.getTimeoutMs());
+        LOG_WARN("BatchReceivePolicy maxNumMessages: {" << 
userBatchReceivePolicy.getMaxNumMessages()
+                                                        << "} is greater than 
maxReceiverQueueSize: {"
+                                                        << 
conf.getReceiverQueueSize()
+                                                        << "}, reset to "
+                                                           
"maxReceiverQueueSize. ");
+    }
+    batchReceiveTimer_ = listenerExecutor_->createDeadlineTimer();
+}
+
+void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) {
+    if (timeoutMs > 0) {
+        
batchReceiveTimer_->expires_from_now(boost::posix_time::milliseconds(timeoutMs));
+        std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+        batchReceiveTimer_->async_wait([weakSelf](const 
boost::system::error_code& ec) {
+            // If two requests call runPartitionUpdateTask at the same time, 
the timer will fail, and it
+            // cannot continue at this time, and the request needs to be 
ignored.
+            auto self = weakSelf.lock();
+            if (self && !ec) {
+                self->doBatchReceiveTimeTask();
+            }
+        });
+    }
+}
+
+void ConsumerImplBase::doBatchReceiveTimeTask() {
+    if (state_ != Ready) {
+        return;
+    }
+
+    bool hasPendingReceives = false;
+    long timeToWaitMs = batchReceivePolicy_.getTimeoutMs();
+
+    Lock lock(batchPendingReceiveMutex_);
+    while (!batchPendingReceives_.empty()) {
+        OpBatchReceive& batchReceive = batchPendingReceives_.front();
+        long diff =
+            batchReceivePolicy_.getTimeoutMs() - 
(TimeUtils::currentTimeMillis() - batchReceive.createAt_);
+        if (diff <= 0) {
+            
notifyBatchPendingReceivedCallback(batchReceive.batchReceiveCallback_);
+            batchPendingReceives_.pop();
+        } else {
+            hasPendingReceives = true;
+            timeToWaitMs = diff;
+            break;
+        }
+    }
+    lock.unlock();
+
+    if (hasPendingReceives) {
+        triggerBatchReceiveTimerTask(timeToWaitMs);
+    }
+}
+
+void ConsumerImplBase::failPendingBatchReceiveCallback() {
+    Messages msgs;
+    Lock lock(batchPendingReceiveMutex_);
+    while (!batchPendingReceives_.empty()) {
+        OpBatchReceive opBatchReceive = batchPendingReceives_.front();
+        batchPendingReceives_.pop();
+        auto self = shared_from_this();
+        listenerExecutor_->postWork([opBatchReceive, self, msgs]() {
+            opBatchReceive.batchReceiveCallback_(ResultAlreadyClosed, msgs);
+        });

Review Comment:
   There is no need to capture `self` because the lambda's body never accesses 
any field or method of `ConsumerImplBase`. Declare a default constructed 
`Messages` object and capture it is also weird. And we don't need to unlock 
explicitly.
   
   ```c++
       Lock lock(batchPendingReceiveMutex_);
       while (!batchPendingReceives_.empty()) {
           OpBatchReceive opBatchReceive = batchPendingReceives_.front();
           batchPendingReceives_.pop();
           listenerExecutor_->postWork([opBatchReceive]() {
               opBatchReceive.batchReceiveCallback_(ResultAlreadyClosed, {});
           });
       }
   ```



##########
lib/MessagesImpl.cc:
##########
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "MessagesImpl.h"
+#include "stdexcept"
+
+MessagesImpl::MessagesImpl(int maxNumberOfMessages, long maxSizeOfMessages)
+    : maxNumberOfMessages_(maxNumberOfMessages),
+      maxSizeOfMessages_(maxSizeOfMessages),
+      currentNumberOfMessages_(0),
+      currentSizeOfMessages_(0) {
+    messageList_ = std::vector<Message>();
+}

Review Comment:
   ```suggestion
         currentSizeOfMessages_(0) {}
   ```
   
   The default constructor of `std::vector` creates an empty vector, it's 
redundant.



##########
lib/ConsumerImplBase.cc:
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "ConsumerImpl.h"
+#include "MessageImpl.h"
+#include "MessagesImpl.h"
+#include "LogUtils.h"
+#include "TimeUtils.h"
+#include "pulsar/Result.h"
+#include "MessageIdUtil.h"
+#include "AckGroupingTracker.h"
+#include "ConsumerImplBase.h"
+
+#include <algorithm>
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+ConsumerImplBase::ConsumerImplBase(ClientImplPtr client, const std::string& 
topic, Backoff backoff,
+                                   const ConsumerConfiguration& conf, 
ExecutorServicePtr listenerExecutor)
+    : HandlerBase(client, topic, backoff),
+      listenerExecutor_(listenerExecutor),
+      batchReceivePolicy_(conf.getBatchReceivePolicy()) {
+    auto userBatchReceivePolicy = conf.getBatchReceivePolicy();
+    if (userBatchReceivePolicy.getMaxNumMessages() > 
conf.getReceiverQueueSize()) {
+        batchReceivePolicy_ =
+            BatchReceivePolicy(conf.getReceiverQueueSize(), 
userBatchReceivePolicy.getMaxNumBytes(),
+                               userBatchReceivePolicy.getTimeoutMs());
+        LOG_WARN("BatchReceivePolicy maxNumMessages: {" << 
userBatchReceivePolicy.getMaxNumMessages()
+                                                        << "} is greater than 
maxReceiverQueueSize: {"
+                                                        << 
conf.getReceiverQueueSize()
+                                                        << "}, reset to "
+                                                           
"maxReceiverQueueSize. ");
+    }
+    batchReceiveTimer_ = listenerExecutor_->createDeadlineTimer();
+}
+
+void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) {
+    if (timeoutMs > 0) {
+        
batchReceiveTimer_->expires_from_now(boost::posix_time::milliseconds(timeoutMs));
+        std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+        batchReceiveTimer_->async_wait([weakSelf](const 
boost::system::error_code& ec) {
+            // If two requests call runPartitionUpdateTask at the same time, 
the timer will fail, and it
+            // cannot continue at this time, and the request needs to be 
ignored.
+            auto self = weakSelf.lock();
+            if (self && !ec) {
+                self->doBatchReceiveTimeTask();
+            }
+        });
+    }
+}
+
+void ConsumerImplBase::doBatchReceiveTimeTask() {
+    if (state_ != Ready) {
+        return;
+    }
+
+    bool hasPendingReceives = false;
+    long timeToWaitMs = batchReceivePolicy_.getTimeoutMs();
+
+    Lock lock(batchPendingReceiveMutex_);
+    while (!batchPendingReceives_.empty()) {
+        OpBatchReceive& batchReceive = batchPendingReceives_.front();
+        long diff =
+            batchReceivePolicy_.getTimeoutMs() - 
(TimeUtils::currentTimeMillis() - batchReceive.createAt_);
+        if (diff <= 0) {
+            
notifyBatchPendingReceivedCallback(batchReceive.batchReceiveCallback_);
+            batchPendingReceives_.pop();
+        } else {
+            hasPendingReceives = true;
+            timeToWaitMs = diff;
+            break;
+        }
+    }
+    lock.unlock();
+
+    if (hasPendingReceives) {
+        triggerBatchReceiveTimerTask(timeToWaitMs);
+    }
+}
+
+void ConsumerImplBase::failPendingBatchReceiveCallback() {
+    Messages msgs;
+    Lock lock(batchPendingReceiveMutex_);
+    while (!batchPendingReceives_.empty()) {
+        OpBatchReceive opBatchReceive = batchPendingReceives_.front();
+        batchPendingReceives_.pop();
+        auto self = shared_from_this();
+        listenerExecutor_->postWork([opBatchReceive, self, msgs]() {
+            opBatchReceive.batchReceiveCallback_(ResultAlreadyClosed, msgs);
+        });
+    }
+    lock.unlock();
+}
+
+void ConsumerImplBase::notifyBatchPendingReceivedCallback() {
+    Lock lock(batchPendingReceiveMutex_);
+    if (!batchPendingReceives_.empty()) {
+        OpBatchReceive& batchReceive = batchPendingReceives_.front();
+        batchPendingReceives_.pop();
+        notifyBatchPendingReceivedCallback(batchReceive.batchReceiveCallback_);

Review Comment:
   ```suggestion
           lock.unlock();
           
notifyBatchPendingReceivedCallback(batchReceive.batchReceiveCallback_);
   ```



##########
lib/ConsumerImplBase.cc:
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "ConsumerImpl.h"
+#include "MessageImpl.h"
+#include "MessagesImpl.h"
+#include "LogUtils.h"
+#include "TimeUtils.h"
+#include "pulsar/Result.h"
+#include "MessageIdUtil.h"
+#include "AckGroupingTracker.h"
+#include "ConsumerImplBase.h"
+
+#include <algorithm>
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
+ConsumerImplBase::ConsumerImplBase(ClientImplPtr client, const std::string& 
topic, Backoff backoff,
+                                   const ConsumerConfiguration& conf, 
ExecutorServicePtr listenerExecutor)
+    : HandlerBase(client, topic, backoff),
+      listenerExecutor_(listenerExecutor),
+      batchReceivePolicy_(conf.getBatchReceivePolicy()) {
+    auto userBatchReceivePolicy = conf.getBatchReceivePolicy();
+    if (userBatchReceivePolicy.getMaxNumMessages() > 
conf.getReceiverQueueSize()) {
+        batchReceivePolicy_ =
+            BatchReceivePolicy(conf.getReceiverQueueSize(), 
userBatchReceivePolicy.getMaxNumBytes(),
+                               userBatchReceivePolicy.getTimeoutMs());
+        LOG_WARN("BatchReceivePolicy maxNumMessages: {" << 
userBatchReceivePolicy.getMaxNumMessages()
+                                                        << "} is greater than 
maxReceiverQueueSize: {"
+                                                        << 
conf.getReceiverQueueSize()
+                                                        << "}, reset to "
+                                                           
"maxReceiverQueueSize. ");
+    }
+    batchReceiveTimer_ = listenerExecutor_->createDeadlineTimer();
+}
+
+void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) {
+    if (timeoutMs > 0) {
+        
batchReceiveTimer_->expires_from_now(boost::posix_time::milliseconds(timeoutMs));
+        std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+        batchReceiveTimer_->async_wait([weakSelf](const 
boost::system::error_code& ec) {
+            // If two requests call runPartitionUpdateTask at the same time, 
the timer will fail, and it
+            // cannot continue at this time, and the request needs to be 
ignored.

Review Comment:
   ```suggestion
   ```
   
   These comments seem to be copied from somewhere else.



##########
tests/BasicEndToEndTest.cc:
##########
@@ -4098,3 +4098,191 @@ TEST(BasicEndToEndTest, 
testUnAckedMessageTrackerEnabledCumulativeAck) {
     consumer.close();
     client.close();
 }
+
+void testBatchReceive(bool multiConsumer) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+
+    std::string uniqueChunk = unique_str();
+    std::string topicName = "persistent://public/default/test-batch-receive" + 
uniqueChunk;
+
+    if (multiConsumer) {
+        // call admin api to make it partitioned
+        std::string url =
+            adminUrl + "admin/v2/persistent/public/default/test-batch-receive" 
+ uniqueChunk + "/partitions";
+        int res = makePutRequest(url, "5");
+        LOG_INFO("res = " << res);
+        ASSERT_FALSE(res != 204 && res != 409);
+    }
+
+    std::string subName = "subscription-name";
+    Producer producer;
+
+    Promise<Result, Producer> producerPromise;
+    client.createProducerAsync(topicName, 
WaitForCallbackValue<Producer>(producerPromise));
+    Future<Result, Producer> producerFuture = producerPromise.getFuture();
+    Result result = producerFuture.get(producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    // when receiver queue size > maxNumMessages, use receiver queue size.
+    consumerConfig.setBatchReceivePolicy(BatchReceivePolicy(1000, -1, -1));
+    consumerConfig.setReceiverQueueSize(10);
+    consumerConfig.setProperty("consumer-name", "test-consumer-name");
+    consumerConfig.setProperty("consumer-id", "test-consumer-id");
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicName, subName, consumerConfig,
+                          WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    // sync batch receive test
+    std::string prefix = "batch-receive-msg";
+    int numOfMessages = 10;
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        producer.send(msg);
+        LOG_DEBUG("1 sending message " << messageContent);

Review Comment:
   ```suggestion
   ```
   
   debug logs in tests are meaningless



##########
tests/BasicEndToEndTest.cc:
##########
@@ -4098,3 +4098,191 @@ TEST(BasicEndToEndTest, 
testUnAckedMessageTrackerEnabledCumulativeAck) {
     consumer.close();
     client.close();
 }
+
+void testBatchReceive(bool multiConsumer) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+
+    std::string uniqueChunk = unique_str();
+    std::string topicName = "persistent://public/default/test-batch-receive" + 
uniqueChunk;
+
+    if (multiConsumer) {
+        // call admin api to make it partitioned
+        std::string url =
+            adminUrl + "admin/v2/persistent/public/default/test-batch-receive" 
+ uniqueChunk + "/partitions";
+        int res = makePutRequest(url, "5");
+        LOG_INFO("res = " << res);
+        ASSERT_FALSE(res != 204 && res != 409);
+    }
+
+    std::string subName = "subscription-name";
+    Producer producer;
+
+    Promise<Result, Producer> producerPromise;
+    client.createProducerAsync(topicName, 
WaitForCallbackValue<Producer>(producerPromise));
+    Future<Result, Producer> producerFuture = producerPromise.getFuture();
+    Result result = producerFuture.get(producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    // when receiver queue size > maxNumMessages, use receiver queue size.
+    consumerConfig.setBatchReceivePolicy(BatchReceivePolicy(1000, -1, -1));
+    consumerConfig.setReceiverQueueSize(10);
+    consumerConfig.setProperty("consumer-name", "test-consumer-name");
+    consumerConfig.setProperty("consumer-id", "test-consumer-id");
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicName, subName, consumerConfig,
+                          WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    // sync batch receive test
+    std::string prefix = "batch-receive-msg";
+    int numOfMessages = 10;
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        producer.send(msg);
+        LOG_DEBUG("1 sending message " << messageContent);
+    }
+
+    Messages messages;
+    Result receive = consumer.batchReceive(messages);
+    ASSERT_EQ(receive, ResultOk);
+    ASSERT_EQ(messages.size(), numOfMessages);
+
+    // async batch receive test
+    Latch latch(1);
+    BatchReceiveCallback batchReceiveCallback = [&latch, numOfMessages](Result 
result, Messages messages) {
+        ASSERT_EQ(result, ResultOk);
+        ASSERT_EQ(messages.size(), numOfMessages);
+        latch.countdown();
+    };
+    consumer.batchReceiveAsync(batchReceiveCallback);
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        producer.send(msg);
+        LOG_DEBUG("2 sending message " << messageContent);
+    }
+    ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
+
+    producer.close();
+    consumer.close();
+    client.close();
+}
+
+TEST(BasicEndToEndTest, testBatchReceive) { testBatchReceive(false); }
+
+TEST(BasicEndToEndTest, testBatchReceiveWithMultiConsumer) { 
testBatchReceive(true); }
+
+void testBatchReceiveTimeout(bool multiConsumer) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+    std::string uniqueChunk = unique_str();
+    std::string topicName = 
"persistent://public/default/test-batch-receive-timeout" + uniqueChunk;
+
+    if (multiConsumer) {
+        // call admin api to make it partitioned
+        std::string url = adminUrl + 
"admin/v2/persistent/public/default/test-batch-receive-timeout" +
+                          uniqueChunk + "/partitions";
+        int res = makePutRequest(url, "5");
+        LOG_INFO("res = " << res);
+        ASSERT_FALSE(res != 204 && res != 409);
+    }
+
+    std::string subName = "subscription-name";
+    Producer producer;
+
+    Promise<Result, Producer> producerPromise;
+    client.createProducerAsync(topicName, 
WaitForCallbackValue<Producer>(producerPromise));
+    Future<Result, Producer> producerFuture = producerPromise.getFuture();
+    Result result = producerFuture.get(producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setBatchReceivePolicy(BatchReceivePolicy(1000, -1, 1000));
+    consumerConfig.setProperty("consumer-name", "test-consumer-name");
+    consumerConfig.setProperty("consumer-id", "test-consumer-id");
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicName, subName, consumerConfig,
+                          WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    std::string prefix = "batch-receive-msg";
+    int numOfMessages = 10;
+
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        producer.send(msg);
+        LOG_DEBUG("2 sending message " << messageContent);

Review Comment:
   ```suggestion
   ```



##########
tests/BasicEndToEndTest.cc:
##########
@@ -4098,3 +4098,191 @@ TEST(BasicEndToEndTest, 
testUnAckedMessageTrackerEnabledCumulativeAck) {
     consumer.close();
     client.close();
 }
+
+void testBatchReceive(bool multiConsumer) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+
+    std::string uniqueChunk = unique_str();
+    std::string topicName = "persistent://public/default/test-batch-receive" + 
uniqueChunk;
+
+    if (multiConsumer) {
+        // call admin api to make it partitioned
+        std::string url =
+            adminUrl + "admin/v2/persistent/public/default/test-batch-receive" 
+ uniqueChunk + "/partitions";
+        int res = makePutRequest(url, "5");
+        LOG_INFO("res = " << res);
+        ASSERT_FALSE(res != 204 && res != 409);
+    }
+
+    std::string subName = "subscription-name";
+    Producer producer;
+
+    Promise<Result, Producer> producerPromise;
+    client.createProducerAsync(topicName, 
WaitForCallbackValue<Producer>(producerPromise));
+    Future<Result, Producer> producerFuture = producerPromise.getFuture();
+    Result result = producerFuture.get(producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    // when receiver queue size > maxNumMessages, use receiver queue size.
+    consumerConfig.setBatchReceivePolicy(BatchReceivePolicy(1000, -1, -1));
+    consumerConfig.setReceiverQueueSize(10);
+    consumerConfig.setProperty("consumer-name", "test-consumer-name");
+    consumerConfig.setProperty("consumer-id", "test-consumer-id");
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicName, subName, consumerConfig,
+                          WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    // sync batch receive test
+    std::string prefix = "batch-receive-msg";
+    int numOfMessages = 10;
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        producer.send(msg);
+        LOG_DEBUG("1 sending message " << messageContent);
+    }
+
+    Messages messages;
+    Result receive = consumer.batchReceive(messages);
+    ASSERT_EQ(receive, ResultOk);
+    ASSERT_EQ(messages.size(), numOfMessages);
+
+    // async batch receive test
+    Latch latch(1);
+    BatchReceiveCallback batchReceiveCallback = [&latch, numOfMessages](Result 
result, Messages messages) {
+        ASSERT_EQ(result, ResultOk);
+        ASSERT_EQ(messages.size(), numOfMessages);
+        latch.countdown();
+    };
+    consumer.batchReceiveAsync(batchReceiveCallback);
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        producer.send(msg);
+        LOG_DEBUG("2 sending message " << messageContent);
+    }
+    ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
+
+    producer.close();
+    consumer.close();
+    client.close();
+}
+
+TEST(BasicEndToEndTest, testBatchReceive) { testBatchReceive(false); }
+
+TEST(BasicEndToEndTest, testBatchReceiveWithMultiConsumer) { 
testBatchReceive(true); }
+
+void testBatchReceiveTimeout(bool multiConsumer) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+    std::string uniqueChunk = unique_str();
+    std::string topicName = 
"persistent://public/default/test-batch-receive-timeout" + uniqueChunk;
+
+    if (multiConsumer) {
+        // call admin api to make it partitioned
+        std::string url = adminUrl + 
"admin/v2/persistent/public/default/test-batch-receive-timeout" +
+                          uniqueChunk + "/partitions";
+        int res = makePutRequest(url, "5");
+        LOG_INFO("res = " << res);
+        ASSERT_FALSE(res != 204 && res != 409);
+    }
+
+    std::string subName = "subscription-name";
+    Producer producer;
+
+    Promise<Result, Producer> producerPromise;
+    client.createProducerAsync(topicName, 
WaitForCallbackValue<Producer>(producerPromise));
+    Future<Result, Producer> producerFuture = producerPromise.getFuture();
+    Result result = producerFuture.get(producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setBatchReceivePolicy(BatchReceivePolicy(1000, -1, 1000));
+    consumerConfig.setProperty("consumer-name", "test-consumer-name");
+    consumerConfig.setProperty("consumer-id", "test-consumer-id");

Review Comment:
   The same question as I asked before.



##########
tests/MessagesImplTest.cc:
##########
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <MessagesImpl.h>
+#include "pulsar/MessageBuilder.h"
+
+using namespace pulsar;
+
+TEST(MessagesImplTest, testMessage) {
+    // 0. test not limits
+    {
+        MessagesImpl messages(-1, -1);
+        ASSERT_TRUE(messages.canAdd(Message()));
+    }
+
+    // 1. test max number of messages.
+    {
+        Message msg = MessageBuilder().setContent("c").build();
+        MessagesImpl messages(10, -1);
+        for (int i = 0; i < 10; i++) {
+            messages.add(msg);
+        }
+        ASSERT_FALSE(messages.canAdd(msg));
+        ASSERT_EQ(messages.size(), 10);
+        try {
+            messages.add(msg);
+            FAIL() << "Should be failed.";
+        } catch (std::invalid_argument& e) {
+        }

Review Comment:
   Use `ASSERT_THROW`, see `KeySharedPolicyTest` as example.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to