This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 0721dab  [C++] Fix Consumer send redeliverMessages repeatedly (#9072)
0721dab is described below

commit 0721dab1a5a9a0384ed5d71894eecea9671fdc9a
Author: sijianliang <[email protected]>
AuthorDate: Wed Jan 6 18:57:40 2021 +0800

    [C++] Fix Consumer send redeliverMessages repeatedly (#9072)
    
    Fixes #9028
    
    Both PartitionedConsumerImpl and ConsumerImpl have member variable 
unAckedMessageTrackerPtr_ (class UnAckedMessageTrackerEnabled), and 
PartitionedConsumerImpl is composed of ConsumerImpl. If the acknowledgement 
times out, they will send redeliverMessages repeatedly, MultiTopicsConsumerImpl 
has same problem.
    
    - add `hasParent_` field to whether there is a parent in ConsumerImpl
    - add unit test verify the redelivery request won't be sent repeatedly
    - add GTest header `gtest/gtest_prod.h` to access private members in unit 
tests
    - fix typo
    
    (cherry picked from commit 9894b99683ba8416f66ba369308dfaab08e06af3)
---
 pom.xml                                            |   1 +
 pulsar-client-cpp/.gitignore                       |   1 +
 pulsar-client-cpp/include/gtest/gtest_prod.h       |  60 +++++++
 pulsar-client-cpp/lib/ConsumerImpl.cc              |  15 +-
 pulsar-client-cpp/lib/ConsumerImpl.h               |   9 +-
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc   |   4 +-
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h    |   6 +-
 pulsar-client-cpp/lib/PartitionedConsumerImpl.cc   |   2 +-
 pulsar-client-cpp/lib/PartitionedConsumerImpl.h    |   7 +-
 pulsar-client-cpp/lib/ReaderImpl.cc                |   2 +-
 .../lib/UnAckedMessageTrackerEnabled.h             |   4 +
 .../lib/UnAckedMessageTrackerInterface.h           |   2 +-
 pulsar-client-cpp/tests/BasicEndToEndTest.cc       |   5 +-
 pulsar-client-cpp/tests/ConsumerTest.cc            | 198 ++++++++++++++++++++-
 pulsar-client-cpp/tests/PulsarFriend.h             |  19 +-
 15 files changed, 310 insertions(+), 25 deletions(-)

diff --git a/pom.xml b/pom.xml
index 8e71ad5..46f56f3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1163,6 +1163,7 @@ flexible messaging model and an intuitive client 
API.</description>
             <exclude>logs/**</exclude>
             <exclude>**/circe/**</exclude>
             
<exclude>pulsar-broker/src/test/resources/authentication/basic/.htpasswd</exclude>
+            <exclude>pulsar-client-cpp/include/gtest/gtest_prod.h</exclude>
             <exclude>pulsar-client-cpp/lib/checksum/int_types.h</exclude>
             <exclude>pulsar-client-cpp/lib/checksum/gf2.hpp</exclude>
             <exclude>pulsar-client-cpp/lib/checksum/crc32c_sse42.cc</exclude>
diff --git a/pulsar-client-cpp/.gitignore b/pulsar-client-cpp/.gitignore
index 76efe8e..f031a55 100644
--- a/pulsar-client-cpp/.gitignore
+++ b/pulsar-client-cpp/.gitignore
@@ -54,6 +54,7 @@ lib*.so*
 .settings/
 .pydevproject
 .idea/
+.vs/
 *.cbp
 
 # doxygen files
diff --git a/pulsar-client-cpp/include/gtest/gtest_prod.h 
b/pulsar-client-cpp/include/gtest/gtest_prod.h
new file mode 100644
index 0000000..a06bc6f
--- /dev/null
+++ b/pulsar-client-cpp/include/gtest/gtest_prod.h
@@ -0,0 +1,60 @@
+// Copyright 2006, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+//     * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+//     * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+//     * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+//
+// Google C++ Testing and Mocking Framework definitions useful in production 
code.
+// GOOGLETEST_CM0003 DO NOT DELETE
+
+#ifndef GTEST_INCLUDE_GTEST_GTEST_PROD_H_
+#define GTEST_INCLUDE_GTEST_GTEST_PROD_H_
+
+// When you need to test the private or protected members of a class,
+// use the FRIEND_TEST macro to declare your tests as friends of the
+// class.  For example:
+//
+// class MyClass {
+//  private:
+//   void PrivateMethod();
+//   FRIEND_TEST(MyClassTest, PrivateMethodWorks);
+// };
+//
+// class MyClassTest : public testing::Test {
+//   // ...
+// };
+//
+// TEST_F(MyClassTest, PrivateMethodWorks) {
+//   // Can call MyClass::PrivateMethod() here.
+// }
+//
+// Note: The test class must be in the same namespace as the class being 
tested.
+// For example, putting MyClassTest in an anonymous namespace will not work.
+
+#define FRIEND_TEST(test_case_name, test_name) friend class 
test_case_name##_##test_name##_Test
+
+#endif  // GTEST_INCLUDE_GTEST_GTEST_PROD_H_
\ No newline at end of file
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc 
b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 806d14e..f6163d7 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -38,6 +38,7 @@ DECLARE_LOG_OBJECT()
 ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& 
topic,
                            const std::string& subscription, const 
ConsumerConfiguration& conf,
                            const ExecutorServicePtr listenerExecutor /* = NULL 
by default */,
+                           bool hasParent /* = false by default */,
                            const ConsumerTopicType consumerTopicType /* = 
NonPartitioned by default */,
                            Commands::SubscriptionMode subscriptionMode, 
Optional<MessageId> startMessageId)
     : HandlerBase(client, topic, Backoff(milliseconds(100), seconds(60), 
milliseconds(0))),
@@ -46,6 +47,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const 
std::string& topic,
       subscription_(subscription),
       originalSubscriptionName_(subscription),
       messageListener_(config_.getMessageListener()),
+      hasParent_(hasParent),
       consumerTopicType_(consumerTopicType),
       subscriptionMode_(subscriptionMode),
       startMessageId_(startMessageId),
@@ -563,7 +565,6 @@ void ConsumerImpl::internalListener() {
         // This will only happen when the connection got reset and we cleared 
the queue
         return;
     }
-    unAckedMessageTrackerPtr_->add(msg.getMessageId());
     try {
         consumerStatsBasePtr_->receivedMessage(msg, ResultOk);
         lastDequedMessage_ = Optional<MessageId>::of(msg.getMessageId());
@@ -638,7 +639,6 @@ void ConsumerImpl::receiveAsync(ReceiveCallback& callback) {
     if (incomingMessages_.pop(msg, std::chrono::milliseconds(0))) {
         lock.unlock();
         messageProcessed(msg);
-        unAckedMessageTrackerPtr_->add(msg.getMessageId());
         callback(ResultOk, msg);
     } else {
         pendingReceives_.push(callback);
@@ -672,7 +672,6 @@ Result ConsumerImpl::receiveHelper(Message& msg) {
 
     incomingMessages_.pop(msg);
     messageProcessed(msg);
-    unAckedMessageTrackerPtr_->add(msg.getMessageId());
     return ResultOk;
 }
 
@@ -702,7 +701,6 @@ Result ConsumerImpl::receiveHelper(Message& msg, int 
timeout) {
 
     if (incomingMessages_.pop(msg, std::chrono::milliseconds(timeout))) {
         messageProcessed(msg);
-        unAckedMessageTrackerPtr_->add(msg.getMessageId());
         return ResultOk;
     } else {
         return ResultTimeout;
@@ -720,6 +718,7 @@ void ConsumerImpl::messageProcessed(Message& msg) {
     }
 
     increaseAvailablePermits(currentCnx);
+    trackMessage(msg);
 }
 
 /**
@@ -1232,4 +1231,12 @@ void 
ConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabled) {
     negativeAcksTracker_.setEnabledForTesting(enabled);
 }
 
+void ConsumerImpl::trackMessage(const Message& msg) {
+    if (hasParent_) {
+        unAckedMessageTrackerPtr_->remove(msg.getMessageId());
+    } else {
+        unAckedMessageTrackerPtr_->add(msg.getMessageId());
+    }
+}
+
 } /* namespace pulsar */
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h 
b/pulsar-client-cpp/lib/ConsumerImpl.h
index e122a4a..1029e2f 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -66,7 +66,7 @@ class ConsumerImpl : public ConsumerImplBase,
    public:
     ConsumerImpl(const ClientImplPtr client, const std::string& topic, const 
std::string& subscription,
                  const ConsumerConfiguration&,
-                 const ExecutorServicePtr listenerExecutor = 
ExecutorServicePtr(),
+                 const ExecutorServicePtr listenerExecutor = 
ExecutorServicePtr(), bool hasParent = false,
                  const ConsumerTopicType consumerTopicType = NonPartitioned,
                  Commands::SubscriptionMode = 
Commands::SubscriptionModeDurable,
                  Optional<MessageId> startMessageId = 
Optional<MessageId>::empty());
@@ -166,6 +166,7 @@ class ConsumerImpl : public ConsumerImplBase,
     void notifyPendingReceivedCallback(Result result, Message& message, const 
ReceiveCallback& callback);
     void failPendingReceiveCallback();
     virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled);
+    void trackMessage(const Message& msg);
 
     Optional<MessageId> clearReceiveQueue();
 
@@ -175,6 +176,7 @@ class ConsumerImpl : public ConsumerImplBase,
     std::string originalSubscriptionName_;
     MessageListener messageListener_;
     ExecutorServicePtr listenerExecutor_;
+    bool hasParent_;
     ConsumerTopicType consumerTopicType_;
 
     Commands::SubscriptionMode subscriptionMode_;
@@ -192,7 +194,7 @@ class ConsumerImpl : public ConsumerImplBase,
     bool messageListenerRunning_;
     std::mutex messageListenerMutex_;
     CompressionCodecProvider compressionCodecProvider_;
-    UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_;
+    UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_;
     BatchAcknowledgementTracker batchAcknowledgementTracker_;
     BrokerConsumerStatsImpl brokerConsumerStats_;
     NegativeAcksTracker negativeAcksTracker_;
@@ -218,6 +220,9 @@ class ConsumerImpl : public ConsumerImplBase,
     // these two declared friend to access 
setNegativeAcknowledgeEnabledForTesting
     friend class MultiTopicsConsumerImpl;
     friend class PartitionedConsumerImpl;
+
+    FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
+    FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
 };
 
 } /* namespace pulsar */
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc 
b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 85a9868..322140b 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -181,7 +181,7 @@ void 
MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result,
     if (numPartitions == 0) {
         // We don't have to add partition-n suffix
         consumer = std::make_shared<ConsumerImpl>(client_, 
topicName->toString(), subscriptionName_, config,
-                                                  internalListenerExecutor, 
NonPartitioned);
+                                                  internalListenerExecutor, 
true, NonPartitioned);
         consumer->getConsumerCreatedFuture().addListener(std::bind(
             &MultiTopicsConsumerImpl::handleSingleConsumerCreated, 
shared_from_this(), std::placeholders::_1,
             std::placeholders::_2, partitionsNeedCreate, 
topicSubResultPromise));
@@ -193,7 +193,7 @@ void 
MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result,
         for (int i = 0; i < numPartitions; i++) {
             std::string topicPartitionName = 
topicName->getTopicPartitionName(i);
             consumer = std::make_shared<ConsumerImpl>(client_, 
topicPartitionName, subscriptionName_, config,
-                                                      
internalListenerExecutor, Partitioned);
+                                                      
internalListenerExecutor, true, Partitioned);
             consumer->getConsumerCreatedFuture().addListener(std::bind(
                 &MultiTopicsConsumerImpl::handleSingleConsumerCreated, 
shared_from_this(),
                 std::placeholders::_1, std::placeholders::_2, 
partitionsNeedCreate, topicSubResultPromise));
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h 
b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
index b91cac1..717dc04 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -18,6 +18,7 @@
  */
 #ifndef PULSAR_MULTI_TOPICS_CONSUMER_HEADER
 #define PULSAR_MULTI_TOPICS_CONSUMER_HEADER
+#include "gtest/gtest_prod.h"
 #include "ConsumerImpl.h"
 #include "ClientImpl.h"
 #include "BlockingQueue.h"
@@ -103,7 +104,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     ExecutorServicePtr listenerExecutor_;
     MessageListener messageListener_;
     Promise<Result, ConsumerImplBaseWeakPtr> 
multiTopicsConsumerCreatedPromise_;
-    UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_;
+    UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_;
     const std::vector<std::string>& topics_;
     std::queue<ReceiveCallback> pendingReceives_;
 
@@ -137,7 +138,10 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
 
    private:
     virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled);
+
+    FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
 };
 
+typedef std::shared_ptr<MultiTopicsConsumerImpl> MultiTopicsConsumerImplPtr;
 }  // namespace pulsar
 #endif  // PULSAR_MULTI_TOPICS_CONSUMER_HEADER
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc 
b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index 6f5dbd2..3a43a0b 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -247,7 +247,7 @@ ConsumerImplPtr 
PartitionedConsumerImpl::newInternalConsumer(unsigned int partit
 
     std::string topicPartitionName = 
topicName_->getTopicPartitionName(partition);
     auto consumer = std::make_shared<ConsumerImpl>(client_, 
topicPartitionName, subscriptionName_, config,
-                                                   internalListenerExecutor_, 
Partitioned);
+                                                   internalListenerExecutor_, 
true, Partitioned);
 
     const auto shared_this = 
const_cast<PartitionedConsumerImpl*>(this)->shared_from_this();
     consumer->getConsumerCreatedFuture().addListener(std::bind(
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h 
b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
index 7624d62..f90172e 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
@@ -18,6 +18,7 @@
  */
 #ifndef PULSAR_PARTITIONED_CONSUMER_HEADER
 #define PULSAR_PARTITIONED_CONSUMER_HEADER
+#include "gtest/gtest_prod.h"
 #include "ConsumerImpl.h"
 #include "ClientImpl.h"
 #include <vector>
@@ -118,11 +119,15 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
     void failPendingReceiveCallback();
     virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled);
     Promise<Result, ConsumerImplBaseWeakPtr> 
partitionedConsumerCreatedPromise_;
-    UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_;
+    UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_;
     std::queue<ReceiveCallback> pendingReceives_;
     void runPartitionUpdateTask();
     void getPartitionMetadata();
     void handleGetPartitions(const Result result, const LookupDataResultPtr& 
lookupDataResult);
+
+    friend class PulsarFriend;
+
+    FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
 };
 typedef std::weak_ptr<PartitionedConsumerImpl> PartitionedConsumerImplWeakPtr;
 typedef std::shared_ptr<PartitionedConsumerImpl> PartitionedConsumerImplPtr;
diff --git a/pulsar-client-cpp/lib/ReaderImpl.cc 
b/pulsar-client-cpp/lib/ReaderImpl.cc
index bcf707d..c4b6727 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.cc
+++ b/pulsar-client-cpp/lib/ReaderImpl.cc
@@ -76,7 +76,7 @@ void ReaderImpl::start(const MessageId& startMessageId) {
     }
 
     consumer_ = std::make_shared<ConsumerImpl>(
-        client_.lock(), topic_, subscription, consumerConf, 
ExecutorServicePtr(), NonPartitioned,
+        client_.lock(), topic_, subscription, consumerConf, 
ExecutorServicePtr(), false, NonPartitioned,
         Commands::SubscriptionModeNonDurable, 
Optional<MessageId>::of(startMessageId));
     consumer_->setPartitionIndex(TopicName::getPartitionIndex(topic_));
     
consumer_->getConsumerCreatedFuture().addListener(std::bind(&ReaderImpl::handleConsumerCreated,
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h 
b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
index 16933cc..36753dc 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
@@ -18,6 +18,7 @@
  */
 #ifndef LIB_UNACKEDMESSAGETRACKERENABLED_H_
 #define LIB_UNACKEDMESSAGETRACKERENABLED_H_
+#include "gtest/gtest_prod.h"
 #include "lib/UnAckedMessageTrackerInterface.h"
 
 #include <mutex>
@@ -48,6 +49,9 @@ class UnAckedMessageTrackerEnabled : public 
UnAckedMessageTrackerInterface {
     ClientImplPtr client_;
     long timeoutMs_;
     long tickDurationInMs_;
+
+    FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
+    FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
 };
 }  // namespace pulsar
 
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerInterface.h 
b/pulsar-client-cpp/lib/UnAckedMessageTrackerInterface.h
index a4e83e9..50fa72c 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerInterface.h
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerInterface.h
@@ -45,6 +45,6 @@ class UnAckedMessageTrackerInterface {
     virtual void removeTopicMessage(const std::string& topic) = 0;
 };
 
-typedef std::unique_ptr<UnAckedMessageTrackerInterface> 
UnAckedMessageTrackerScopedPtr;
+using UnAckedMessageTrackerPtr = 
std::shared_ptr<UnAckedMessageTrackerInterface>;
 }  // namespace pulsar
 #endif /* LIB_UNACKEDMESSAGETRACKERINTERFACE_H_ */
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc 
b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index e7fb1d8..d0b9fc1 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -3825,7 +3825,7 @@ class UnAckedMessageTrackerEnabledMock : public 
UnAckedMessageTrackerEnabled {
     long size() { return UnAckedMessageTrackerEnabled::size(); }
 };  // class UnAckedMessageTrackerEnabledMock
 
-TEST(BasicEndToEndTest, testtUnAckedMessageTrackerDefaultBehavior) {
+TEST(BasicEndToEndTest, testUnAckedMessageTrackerDefaultBehavior) {
     ConsumerConfiguration configConsumer;
     ASSERT_EQ(configConsumer.getUnAckedMessagesTimeoutMs(), 0);
     ASSERT_EQ(configConsumer.getTickDurationInMs(), 1000);
@@ -4002,7 +4002,7 @@ TEST(BasicEndToEndTest, 
testUnAckedMessageTrackerEnabledCumulativeAck) {
     ASSERT_EQ(numMsg - (numMsg / 2 + 1), tracker->size());
     ASSERT_FALSE(tracker->isEmpty());
 
-    std::this_thread::sleep_for(std::chrono::seconds(2));
+    std::this_thread::sleep_for(std::chrono::seconds(4));
     ASSERT_EQ(0, tracker->size());
     ASSERT_TRUE(tracker->isEmpty());
     consumer.close();
@@ -4012,6 +4012,7 @@ TEST(BasicEndToEndTest, 
testUnAckedMessageTrackerEnabledCumulativeAck) {
     for (auto count = numMsg / 2 + 1; count < numMsg; ++count) {
         Message msg;
         ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
+        ASSERT_EQ(ResultOk, consumer.acknowledge(msg.getMessageId()));
     }
     Message msg;
     auto ret = consumer.receive(msg, 1000);
diff --git a/pulsar-client-cpp/tests/ConsumerTest.cc 
b/pulsar-client-cpp/tests/ConsumerTest.cc
index f0df238..2278c05 100644
--- a/pulsar-client-cpp/tests/ConsumerTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerTest.cc
@@ -16,21 +16,29 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <pulsar/Client.h>
-#include <gtest/gtest.h>
+#include <chrono>
+#include <thread>
 #include <time.h>
 #include <set>
 
-#include "../lib/Future.h"
-#include "../lib/Utils.h"
+#include "gtest/gtest.h"
 
+#include "pulsar/Client.h"
+#include "PulsarFriend.h"
+#include "lib/Future.h"
+#include "lib/Utils.h"
+#include "lib/LogUtils.h"
+#include "lib/PartitionedConsumerImpl.h"
+#include "lib/MultiTopicsConsumerImpl.h"
 #include "HttpHelper.h"
 
-using namespace pulsar;
-
 static const std::string lookupUrl = "pulsar://localhost:6650";
 static const std::string adminUrl = "http://localhost:8080/";;
 
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
 TEST(ConsumerTest, consumerNotInitialized) {
     Consumer consumer;
 
@@ -160,3 +168,181 @@ TEST(ConsumerTest, testPartitionIndex) {
 
     client.close();
 }
+
+TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery) {
+    Client client(lookupUrl);
+    const std::string partitionedTopic =
+        "testPartitionedConsumerUnAckedMessageRedelivery" + 
std::to_string(time(nullptr));
+    std::string subName = "sub-partition-consumer-un-acked-msg-redelivery";
+    constexpr int numPartitions = 3;
+    constexpr int numOfMessages = 15;
+    constexpr int unAckedMessagesTimeoutMs = 10000;
+    constexpr int tickDurationInMs = 1000;
+
+    int res =
+        makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + 
partitionedTopic + "/partitions",
+                       std::to_string(numPartitions));
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setUnAckedMessagesTimeoutMs(unAckedMessagesTimeoutMs);
+    consumerConfig.setTickDurationInMs(tickDurationInMs);
+    ASSERT_EQ(ResultOk, client.subscribe(partitionedTopic, subName, 
consumerConfig, consumer));
+    PartitionedConsumerImplPtr partitionedConsumerImplPtr =
+        PulsarFriend::getPartitionedConsumerImplPtr(consumer);
+    ASSERT_EQ(numPartitions, partitionedConsumerImplPtr->consumers_.size());
+
+    // send messages
+    ProducerConfiguration producerConfig;
+    producerConfig.setBatchingEnabled(false);
+    producerConfig.setBlockIfQueueFull(true);
+    
producerConfig.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(partitionedTopic, 
producerConfig, producer));
+    std::string prefix = "message-";
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + std::to_string(i);
+        Message msg = MessageBuilder().setContent(messageContent).build();
+        ASSERT_EQ(ResultOk, producer.send(msg));
+    }
+    producer.close();
+
+    // receive message and don't acknowledge
+    std::set<MessageId> messageIds[numPartitions];
+    for (auto i = 0; i < numOfMessages; ++i) {
+        Message msg;
+        ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
+
+        MessageId msgId = msg.getMessageId();
+        int32_t partitionIndex = msgId.partition();
+        ASSERT_TRUE(partitionIndex < numPartitions);
+        messageIds[msgId.partition()].emplace(msgId);
+    }
+
+    auto partitionedTracker = static_cast<UnAckedMessageTrackerEnabled*>(
+        partitionedConsumerImplPtr->unAckedMessageTrackerPtr_.get());
+    ASSERT_EQ(numOfMessages, partitionedTracker->size());
+    ASSERT_FALSE(partitionedTracker->isEmpty());
+    for (auto i = 0; i < numPartitions; i++) {
+        ASSERT_EQ(numOfMessages / numPartitions, messageIds[i].size());
+        auto subConsumerPtr = partitionedConsumerImplPtr->consumers_[i];
+        auto tracker =
+            
static_cast<UnAckedMessageTrackerEnabled*>(subConsumerPtr->unAckedMessageTrackerPtr_.get());
+        ASSERT_EQ(0, tracker->size());
+        ASSERT_TRUE(tracker->isEmpty());
+    }
+
+    // timeout and send redeliver message
+    
std::this_thread::sleep_for(std::chrono::milliseconds(unAckedMessagesTimeoutMs 
+ tickDurationInMs * 2));
+    ASSERT_EQ(0, partitionedTracker->size());
+    ASSERT_TRUE(partitionedTracker->isEmpty());
+
+    for (auto i = 0; i < numOfMessages; ++i) {
+        Message msg;
+        ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
+        ASSERT_EQ(1, partitionedTracker->size());
+        ASSERT_EQ(ResultOk, consumer.acknowledge(msg.getMessageId()));
+        ASSERT_EQ(0, partitionedTracker->size());
+    }
+    ASSERT_EQ(0, partitionedTracker->size());
+    ASSERT_TRUE(partitionedTracker->isEmpty());
+    partitionedTracker = NULL;
+
+    Message msg;
+    auto ret = consumer.receive(msg, 1000);
+    ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << 
msg.getMessageId();
+    consumer.close();
+    client.close();
+}
+
+TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery) {
+    Client client(lookupUrl);
+    const std::string nonPartitionedTopic =
+        "testMultiTopicsConsumerUnAckedMessageRedelivery-topic-" + 
std::to_string(time(nullptr));
+    const std::string partitionedTopic1 =
+        "testMultiTopicsConsumerUnAckedMessageRedelivery-par-topic1-" + 
std::to_string(time(nullptr));
+    const std::string partitionedTopic2 =
+        "testMultiTopicsConsumerUnAckedMessageRedelivery-par-topic2-" + 
std::to_string(time(nullptr));
+    std::string subName = "sub-multi-topics-consumer-un-acked-msg-redelivery";
+    constexpr int numPartitions = 3;
+    constexpr int numOfMessages = 15;
+    constexpr int unAckedMessagesTimeoutMs = 10000;
+    constexpr int tickDurationInMs = 1000;
+
+    int res = makePutRequest(
+        adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic1 + 
"/partitions", "1");
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+    res = makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + 
partitionedTopic2 + "/partitions",
+                         std::to_string(numPartitions));
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setUnAckedMessagesTimeoutMs(unAckedMessagesTimeoutMs);
+    consumerConfig.setTickDurationInMs(tickDurationInMs);
+    const std::vector<std::string> topics = {nonPartitionedTopic, 
partitionedTopic1, partitionedTopic2};
+    ASSERT_EQ(ResultOk, client.subscribe(topics, subName, consumerConfig, 
consumer));
+    MultiTopicsConsumerImplPtr multiTopicsConsumerImplPtr =
+        PulsarFriend::getMultiTopicsConsumerImplPtr(consumer);
+    ASSERT_EQ(numPartitions + 2 /* nonPartitionedTopic + partitionedTopic1 */,
+              multiTopicsConsumerImplPtr->consumers_.size());
+
+    // send messages
+    auto sendMessageToTopic = [&client](const std::string& topic) {
+        Producer producer;
+        ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+
+        Message msg = MessageBuilder().setContent("hello").build();
+        ASSERT_EQ(ResultOk, producer.send(msg));
+    };
+    for (int i = 0; i < numOfMessages; i++) {
+        sendMessageToTopic(nonPartitionedTopic);
+        sendMessageToTopic(partitionedTopic1);
+        sendMessageToTopic(partitionedTopic2);
+    }
+
+    // receive message and don't acknowledge
+    for (auto i = 0; i < numOfMessages * 3; ++i) {
+        Message msg;
+        ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
+        MessageId msgId = msg.getMessageId();
+    }
+
+    auto multiTopicsTracker = static_cast<UnAckedMessageTrackerEnabled*>(
+        multiTopicsConsumerImplPtr->unAckedMessageTrackerPtr_.get());
+    ASSERT_EQ(numOfMessages * 3, multiTopicsTracker->size());
+    ASSERT_FALSE(multiTopicsTracker->isEmpty());
+    for (auto iter = multiTopicsConsumerImplPtr->consumers_.begin();
+         iter != multiTopicsConsumerImplPtr->consumers_.end(); ++iter) {
+        auto subConsumerPtr = iter->second;
+        auto tracker =
+            
static_cast<UnAckedMessageTrackerEnabled*>(subConsumerPtr->unAckedMessageTrackerPtr_.get());
+        ASSERT_EQ(0, tracker->size());
+        ASSERT_TRUE(tracker->isEmpty());
+    }
+
+    // timeout and send redeliver message
+    
std::this_thread::sleep_for(std::chrono::milliseconds(unAckedMessagesTimeoutMs 
+ tickDurationInMs * 2));
+    ASSERT_EQ(0, multiTopicsTracker->size());
+    ASSERT_TRUE(multiTopicsTracker->isEmpty());
+
+    for (auto i = 0; i < numOfMessages * 3; ++i) {
+        Message msg;
+        ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
+        ASSERT_EQ(1, multiTopicsTracker->size());
+        ASSERT_EQ(ResultOk, consumer.acknowledge(msg.getMessageId()));
+        ASSERT_EQ(0, multiTopicsTracker->size());
+    }
+    ASSERT_EQ(0, multiTopicsTracker->size());
+    ASSERT_TRUE(multiTopicsTracker->isEmpty());
+    multiTopicsTracker = NULL;
+
+    Message msg;
+    auto ret = consumer.receive(msg, 1000);
+    ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << 
msg.getMessageId();
+    consumer.close();
+    client.close();
+}
+
+}  // namespace pulsar
diff --git a/pulsar-client-cpp/tests/PulsarFriend.h 
b/pulsar-client-cpp/tests/PulsarFriend.h
index 87b48d2..9f6ba51 100644
--- a/pulsar-client-cpp/tests/PulsarFriend.h
+++ b/pulsar-client-cpp/tests/PulsarFriend.h
@@ -17,12 +17,15 @@
  * under the License.
  */
 
-#include <lib/ProducerImpl.h>
-#include <lib/PartitionedProducerImpl.h>
-#include <lib/ConsumerImpl.h>
-#include <lib/ClientImpl.h>
 #include <string>
 
+#include "lib/ClientImpl.h"
+#include "lib/ProducerImpl.h"
+#include "lib/PartitionedProducerImpl.h"
+#include "lib/ConsumerImpl.h"
+#include "lib/PartitionedConsumerImpl.h"
+#include "lib/MultiTopicsConsumerImpl.h"
+
 using std::string;
 
 namespace pulsar {
@@ -86,6 +89,14 @@ class PulsarFriend {
         return std::static_pointer_cast<ConsumerImpl>(consumer.impl_);
     }
 
+    static std::shared_ptr<PartitionedConsumerImpl> 
getPartitionedConsumerImplPtr(Consumer consumer) {
+        return 
std::static_pointer_cast<PartitionedConsumerImpl>(consumer.impl_);
+    }
+
+    static std::shared_ptr<MultiTopicsConsumerImpl> 
getMultiTopicsConsumerImplPtr(Consumer consumer) {
+        return 
std::static_pointer_cast<MultiTopicsConsumerImpl>(consumer.impl_);
+    }
+
     static std::shared_ptr<ClientImpl> getClientImplPtr(Client client) { 
return client.impl_; }
 
     static void setNegativeAckEnabled(Consumer consumer, bool enabled) {

Reply via email to