This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 0721dab [C++] Fix Consumer send redeliverMessages repeatedly (#9072)
0721dab is described below
commit 0721dab1a5a9a0384ed5d71894eecea9671fdc9a
Author: sijianliang <[email protected]>
AuthorDate: Wed Jan 6 18:57:40 2021 +0800
[C++] Fix Consumer send redeliverMessages repeatedly (#9072)
Fixes #9028
Both PartitionedConsumerImpl and ConsumerImpl have member variable
unAckedMessageTrackerPtr_ (class UnAckedMessageTrackerEnabled), and
PartitionedConsumerImpl is composed of ConsumerImpl. If the acknowledgement
times out, they will send redeliverMessages repeatedly, MultiTopicsConsumerImpl
has same problem.
- add `hasParent_` field to whether there is a parent in ConsumerImpl
- add unit test verify the redelivery request won't be sent repeatedly
- add GTest header `gtest/gtest_prod.h` to access private members in unit
tests
- fix typo
(cherry picked from commit 9894b99683ba8416f66ba369308dfaab08e06af3)
---
pom.xml | 1 +
pulsar-client-cpp/.gitignore | 1 +
pulsar-client-cpp/include/gtest/gtest_prod.h | 60 +++++++
pulsar-client-cpp/lib/ConsumerImpl.cc | 15 +-
pulsar-client-cpp/lib/ConsumerImpl.h | 9 +-
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc | 4 +-
pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h | 6 +-
pulsar-client-cpp/lib/PartitionedConsumerImpl.cc | 2 +-
pulsar-client-cpp/lib/PartitionedConsumerImpl.h | 7 +-
pulsar-client-cpp/lib/ReaderImpl.cc | 2 +-
.../lib/UnAckedMessageTrackerEnabled.h | 4 +
.../lib/UnAckedMessageTrackerInterface.h | 2 +-
pulsar-client-cpp/tests/BasicEndToEndTest.cc | 5 +-
pulsar-client-cpp/tests/ConsumerTest.cc | 198 ++++++++++++++++++++-
pulsar-client-cpp/tests/PulsarFriend.h | 19 +-
15 files changed, 310 insertions(+), 25 deletions(-)
diff --git a/pom.xml b/pom.xml
index 8e71ad5..46f56f3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1163,6 +1163,7 @@ flexible messaging model and an intuitive client
API.</description>
<exclude>logs/**</exclude>
<exclude>**/circe/**</exclude>
<exclude>pulsar-broker/src/test/resources/authentication/basic/.htpasswd</exclude>
+ <exclude>pulsar-client-cpp/include/gtest/gtest_prod.h</exclude>
<exclude>pulsar-client-cpp/lib/checksum/int_types.h</exclude>
<exclude>pulsar-client-cpp/lib/checksum/gf2.hpp</exclude>
<exclude>pulsar-client-cpp/lib/checksum/crc32c_sse42.cc</exclude>
diff --git a/pulsar-client-cpp/.gitignore b/pulsar-client-cpp/.gitignore
index 76efe8e..f031a55 100644
--- a/pulsar-client-cpp/.gitignore
+++ b/pulsar-client-cpp/.gitignore
@@ -54,6 +54,7 @@ lib*.so*
.settings/
.pydevproject
.idea/
+.vs/
*.cbp
# doxygen files
diff --git a/pulsar-client-cpp/include/gtest/gtest_prod.h
b/pulsar-client-cpp/include/gtest/gtest_prod.h
new file mode 100644
index 0000000..a06bc6f
--- /dev/null
+++ b/pulsar-client-cpp/include/gtest/gtest_prod.h
@@ -0,0 +1,60 @@
+// Copyright 2006, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+//
+// Google C++ Testing and Mocking Framework definitions useful in production
code.
+// GOOGLETEST_CM0003 DO NOT DELETE
+
+#ifndef GTEST_INCLUDE_GTEST_GTEST_PROD_H_
+#define GTEST_INCLUDE_GTEST_GTEST_PROD_H_
+
+// When you need to test the private or protected members of a class,
+// use the FRIEND_TEST macro to declare your tests as friends of the
+// class. For example:
+//
+// class MyClass {
+// private:
+// void PrivateMethod();
+// FRIEND_TEST(MyClassTest, PrivateMethodWorks);
+// };
+//
+// class MyClassTest : public testing::Test {
+// // ...
+// };
+//
+// TEST_F(MyClassTest, PrivateMethodWorks) {
+// // Can call MyClass::PrivateMethod() here.
+// }
+//
+// Note: The test class must be in the same namespace as the class being
tested.
+// For example, putting MyClassTest in an anonymous namespace will not work.
+
+#define FRIEND_TEST(test_case_name, test_name) friend class
test_case_name##_##test_name##_Test
+
+#endif // GTEST_INCLUDE_GTEST_GTEST_PROD_H_
\ No newline at end of file
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc
b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 806d14e..f6163d7 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -38,6 +38,7 @@ DECLARE_LOG_OBJECT()
ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string&
topic,
const std::string& subscription, const
ConsumerConfiguration& conf,
const ExecutorServicePtr listenerExecutor /* = NULL
by default */,
+ bool hasParent /* = false by default */,
const ConsumerTopicType consumerTopicType /* =
NonPartitioned by default */,
Commands::SubscriptionMode subscriptionMode,
Optional<MessageId> startMessageId)
: HandlerBase(client, topic, Backoff(milliseconds(100), seconds(60),
milliseconds(0))),
@@ -46,6 +47,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const
std::string& topic,
subscription_(subscription),
originalSubscriptionName_(subscription),
messageListener_(config_.getMessageListener()),
+ hasParent_(hasParent),
consumerTopicType_(consumerTopicType),
subscriptionMode_(subscriptionMode),
startMessageId_(startMessageId),
@@ -563,7 +565,6 @@ void ConsumerImpl::internalListener() {
// This will only happen when the connection got reset and we cleared
the queue
return;
}
- unAckedMessageTrackerPtr_->add(msg.getMessageId());
try {
consumerStatsBasePtr_->receivedMessage(msg, ResultOk);
lastDequedMessage_ = Optional<MessageId>::of(msg.getMessageId());
@@ -638,7 +639,6 @@ void ConsumerImpl::receiveAsync(ReceiveCallback& callback) {
if (incomingMessages_.pop(msg, std::chrono::milliseconds(0))) {
lock.unlock();
messageProcessed(msg);
- unAckedMessageTrackerPtr_->add(msg.getMessageId());
callback(ResultOk, msg);
} else {
pendingReceives_.push(callback);
@@ -672,7 +672,6 @@ Result ConsumerImpl::receiveHelper(Message& msg) {
incomingMessages_.pop(msg);
messageProcessed(msg);
- unAckedMessageTrackerPtr_->add(msg.getMessageId());
return ResultOk;
}
@@ -702,7 +701,6 @@ Result ConsumerImpl::receiveHelper(Message& msg, int
timeout) {
if (incomingMessages_.pop(msg, std::chrono::milliseconds(timeout))) {
messageProcessed(msg);
- unAckedMessageTrackerPtr_->add(msg.getMessageId());
return ResultOk;
} else {
return ResultTimeout;
@@ -720,6 +718,7 @@ void ConsumerImpl::messageProcessed(Message& msg) {
}
increaseAvailablePermits(currentCnx);
+ trackMessage(msg);
}
/**
@@ -1232,4 +1231,12 @@ void
ConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabled) {
negativeAcksTracker_.setEnabledForTesting(enabled);
}
+void ConsumerImpl::trackMessage(const Message& msg) {
+ if (hasParent_) {
+ unAckedMessageTrackerPtr_->remove(msg.getMessageId());
+ } else {
+ unAckedMessageTrackerPtr_->add(msg.getMessageId());
+ }
+}
+
} /* namespace pulsar */
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h
b/pulsar-client-cpp/lib/ConsumerImpl.h
index e122a4a..1029e2f 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -66,7 +66,7 @@ class ConsumerImpl : public ConsumerImplBase,
public:
ConsumerImpl(const ClientImplPtr client, const std::string& topic, const
std::string& subscription,
const ConsumerConfiguration&,
- const ExecutorServicePtr listenerExecutor =
ExecutorServicePtr(),
+ const ExecutorServicePtr listenerExecutor =
ExecutorServicePtr(), bool hasParent = false,
const ConsumerTopicType consumerTopicType = NonPartitioned,
Commands::SubscriptionMode =
Commands::SubscriptionModeDurable,
Optional<MessageId> startMessageId =
Optional<MessageId>::empty());
@@ -166,6 +166,7 @@ class ConsumerImpl : public ConsumerImplBase,
void notifyPendingReceivedCallback(Result result, Message& message, const
ReceiveCallback& callback);
void failPendingReceiveCallback();
virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled);
+ void trackMessage(const Message& msg);
Optional<MessageId> clearReceiveQueue();
@@ -175,6 +176,7 @@ class ConsumerImpl : public ConsumerImplBase,
std::string originalSubscriptionName_;
MessageListener messageListener_;
ExecutorServicePtr listenerExecutor_;
+ bool hasParent_;
ConsumerTopicType consumerTopicType_;
Commands::SubscriptionMode subscriptionMode_;
@@ -192,7 +194,7 @@ class ConsumerImpl : public ConsumerImplBase,
bool messageListenerRunning_;
std::mutex messageListenerMutex_;
CompressionCodecProvider compressionCodecProvider_;
- UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_;
+ UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_;
BatchAcknowledgementTracker batchAcknowledgementTracker_;
BrokerConsumerStatsImpl brokerConsumerStats_;
NegativeAcksTracker negativeAcksTracker_;
@@ -218,6 +220,9 @@ class ConsumerImpl : public ConsumerImplBase,
// these two declared friend to access
setNegativeAcknowledgeEnabledForTesting
friend class MultiTopicsConsumerImpl;
friend class PartitionedConsumerImpl;
+
+ FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
+ FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
};
} /* namespace pulsar */
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 85a9868..322140b 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -181,7 +181,7 @@ void
MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result,
if (numPartitions == 0) {
// We don't have to add partition-n suffix
consumer = std::make_shared<ConsumerImpl>(client_,
topicName->toString(), subscriptionName_, config,
- internalListenerExecutor,
NonPartitioned);
+ internalListenerExecutor,
true, NonPartitioned);
consumer->getConsumerCreatedFuture().addListener(std::bind(
&MultiTopicsConsumerImpl::handleSingleConsumerCreated,
shared_from_this(), std::placeholders::_1,
std::placeholders::_2, partitionsNeedCreate,
topicSubResultPromise));
@@ -193,7 +193,7 @@ void
MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result,
for (int i = 0; i < numPartitions; i++) {
std::string topicPartitionName =
topicName->getTopicPartitionName(i);
consumer = std::make_shared<ConsumerImpl>(client_,
topicPartitionName, subscriptionName_, config,
-
internalListenerExecutor, Partitioned);
+
internalListenerExecutor, true, Partitioned);
consumer->getConsumerCreatedFuture().addListener(std::bind(
&MultiTopicsConsumerImpl::handleSingleConsumerCreated,
shared_from_this(),
std::placeholders::_1, std::placeholders::_2,
partitionsNeedCreate, topicSubResultPromise));
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
index b91cac1..717dc04 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -18,6 +18,7 @@
*/
#ifndef PULSAR_MULTI_TOPICS_CONSUMER_HEADER
#define PULSAR_MULTI_TOPICS_CONSUMER_HEADER
+#include "gtest/gtest_prod.h"
#include "ConsumerImpl.h"
#include "ClientImpl.h"
#include "BlockingQueue.h"
@@ -103,7 +104,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
ExecutorServicePtr listenerExecutor_;
MessageListener messageListener_;
Promise<Result, ConsumerImplBaseWeakPtr>
multiTopicsConsumerCreatedPromise_;
- UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_;
+ UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_;
const std::vector<std::string>& topics_;
std::queue<ReceiveCallback> pendingReceives_;
@@ -137,7 +138,10 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
private:
virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled);
+
+ FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
};
+typedef std::shared_ptr<MultiTopicsConsumerImpl> MultiTopicsConsumerImplPtr;
} // namespace pulsar
#endif // PULSAR_MULTI_TOPICS_CONSUMER_HEADER
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index 6f5dbd2..3a43a0b 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -247,7 +247,7 @@ ConsumerImplPtr
PartitionedConsumerImpl::newInternalConsumer(unsigned int partit
std::string topicPartitionName =
topicName_->getTopicPartitionName(partition);
auto consumer = std::make_shared<ConsumerImpl>(client_,
topicPartitionName, subscriptionName_, config,
- internalListenerExecutor_,
Partitioned);
+ internalListenerExecutor_,
true, Partitioned);
const auto shared_this =
const_cast<PartitionedConsumerImpl*>(this)->shared_from_this();
consumer->getConsumerCreatedFuture().addListener(std::bind(
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
index 7624d62..f90172e 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
@@ -18,6 +18,7 @@
*/
#ifndef PULSAR_PARTITIONED_CONSUMER_HEADER
#define PULSAR_PARTITIONED_CONSUMER_HEADER
+#include "gtest/gtest_prod.h"
#include "ConsumerImpl.h"
#include "ClientImpl.h"
#include <vector>
@@ -118,11 +119,15 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
void failPendingReceiveCallback();
virtual void setNegativeAcknowledgeEnabledForTesting(bool enabled);
Promise<Result, ConsumerImplBaseWeakPtr>
partitionedConsumerCreatedPromise_;
- UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_;
+ UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_;
std::queue<ReceiveCallback> pendingReceives_;
void runPartitionUpdateTask();
void getPartitionMetadata();
void handleGetPartitions(const Result result, const LookupDataResultPtr&
lookupDataResult);
+
+ friend class PulsarFriend;
+
+ FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
};
typedef std::weak_ptr<PartitionedConsumerImpl> PartitionedConsumerImplWeakPtr;
typedef std::shared_ptr<PartitionedConsumerImpl> PartitionedConsumerImplPtr;
diff --git a/pulsar-client-cpp/lib/ReaderImpl.cc
b/pulsar-client-cpp/lib/ReaderImpl.cc
index bcf707d..c4b6727 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.cc
+++ b/pulsar-client-cpp/lib/ReaderImpl.cc
@@ -76,7 +76,7 @@ void ReaderImpl::start(const MessageId& startMessageId) {
}
consumer_ = std::make_shared<ConsumerImpl>(
- client_.lock(), topic_, subscription, consumerConf,
ExecutorServicePtr(), NonPartitioned,
+ client_.lock(), topic_, subscription, consumerConf,
ExecutorServicePtr(), false, NonPartitioned,
Commands::SubscriptionModeNonDurable,
Optional<MessageId>::of(startMessageId));
consumer_->setPartitionIndex(TopicName::getPartitionIndex(topic_));
consumer_->getConsumerCreatedFuture().addListener(std::bind(&ReaderImpl::handleConsumerCreated,
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
index 16933cc..36753dc 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
@@ -18,6 +18,7 @@
*/
#ifndef LIB_UNACKEDMESSAGETRACKERENABLED_H_
#define LIB_UNACKEDMESSAGETRACKERENABLED_H_
+#include "gtest/gtest_prod.h"
#include "lib/UnAckedMessageTrackerInterface.h"
#include <mutex>
@@ -48,6 +49,9 @@ class UnAckedMessageTrackerEnabled : public
UnAckedMessageTrackerInterface {
ClientImplPtr client_;
long timeoutMs_;
long tickDurationInMs_;
+
+ FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
+ FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
};
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerInterface.h
b/pulsar-client-cpp/lib/UnAckedMessageTrackerInterface.h
index a4e83e9..50fa72c 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerInterface.h
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerInterface.h
@@ -45,6 +45,6 @@ class UnAckedMessageTrackerInterface {
virtual void removeTopicMessage(const std::string& topic) = 0;
};
-typedef std::unique_ptr<UnAckedMessageTrackerInterface>
UnAckedMessageTrackerScopedPtr;
+using UnAckedMessageTrackerPtr =
std::shared_ptr<UnAckedMessageTrackerInterface>;
} // namespace pulsar
#endif /* LIB_UNACKEDMESSAGETRACKERINTERFACE_H_ */
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index e7fb1d8..d0b9fc1 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -3825,7 +3825,7 @@ class UnAckedMessageTrackerEnabledMock : public
UnAckedMessageTrackerEnabled {
long size() { return UnAckedMessageTrackerEnabled::size(); }
}; // class UnAckedMessageTrackerEnabledMock
-TEST(BasicEndToEndTest, testtUnAckedMessageTrackerDefaultBehavior) {
+TEST(BasicEndToEndTest, testUnAckedMessageTrackerDefaultBehavior) {
ConsumerConfiguration configConsumer;
ASSERT_EQ(configConsumer.getUnAckedMessagesTimeoutMs(), 0);
ASSERT_EQ(configConsumer.getTickDurationInMs(), 1000);
@@ -4002,7 +4002,7 @@ TEST(BasicEndToEndTest,
testUnAckedMessageTrackerEnabledCumulativeAck) {
ASSERT_EQ(numMsg - (numMsg / 2 + 1), tracker->size());
ASSERT_FALSE(tracker->isEmpty());
- std::this_thread::sleep_for(std::chrono::seconds(2));
+ std::this_thread::sleep_for(std::chrono::seconds(4));
ASSERT_EQ(0, tracker->size());
ASSERT_TRUE(tracker->isEmpty());
consumer.close();
@@ -4012,6 +4012,7 @@ TEST(BasicEndToEndTest,
testUnAckedMessageTrackerEnabledCumulativeAck) {
for (auto count = numMsg / 2 + 1; count < numMsg; ++count) {
Message msg;
ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
+ ASSERT_EQ(ResultOk, consumer.acknowledge(msg.getMessageId()));
}
Message msg;
auto ret = consumer.receive(msg, 1000);
diff --git a/pulsar-client-cpp/tests/ConsumerTest.cc
b/pulsar-client-cpp/tests/ConsumerTest.cc
index f0df238..2278c05 100644
--- a/pulsar-client-cpp/tests/ConsumerTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerTest.cc
@@ -16,21 +16,29 @@
* specific language governing permissions and limitations
* under the License.
*/
-#include <pulsar/Client.h>
-#include <gtest/gtest.h>
+#include <chrono>
+#include <thread>
#include <time.h>
#include <set>
-#include "../lib/Future.h"
-#include "../lib/Utils.h"
+#include "gtest/gtest.h"
+#include "pulsar/Client.h"
+#include "PulsarFriend.h"
+#include "lib/Future.h"
+#include "lib/Utils.h"
+#include "lib/LogUtils.h"
+#include "lib/PartitionedConsumerImpl.h"
+#include "lib/MultiTopicsConsumerImpl.h"
#include "HttpHelper.h"
-using namespace pulsar;
-
static const std::string lookupUrl = "pulsar://localhost:6650";
static const std::string adminUrl = "http://localhost:8080/";
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+
TEST(ConsumerTest, consumerNotInitialized) {
Consumer consumer;
@@ -160,3 +168,181 @@ TEST(ConsumerTest, testPartitionIndex) {
client.close();
}
+
+TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery) {
+ Client client(lookupUrl);
+ const std::string partitionedTopic =
+ "testPartitionedConsumerUnAckedMessageRedelivery" +
std::to_string(time(nullptr));
+ std::string subName = "sub-partition-consumer-un-acked-msg-redelivery";
+ constexpr int numPartitions = 3;
+ constexpr int numOfMessages = 15;
+ constexpr int unAckedMessagesTimeoutMs = 10000;
+ constexpr int tickDurationInMs = 1000;
+
+ int res =
+ makePutRequest(adminUrl + "admin/v2/persistent/public/default/" +
partitionedTopic + "/partitions",
+ std::to_string(numPartitions));
+ ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+ Consumer consumer;
+ ConsumerConfiguration consumerConfig;
+ consumerConfig.setUnAckedMessagesTimeoutMs(unAckedMessagesTimeoutMs);
+ consumerConfig.setTickDurationInMs(tickDurationInMs);
+ ASSERT_EQ(ResultOk, client.subscribe(partitionedTopic, subName,
consumerConfig, consumer));
+ PartitionedConsumerImplPtr partitionedConsumerImplPtr =
+ PulsarFriend::getPartitionedConsumerImplPtr(consumer);
+ ASSERT_EQ(numPartitions, partitionedConsumerImplPtr->consumers_.size());
+
+ // send messages
+ ProducerConfiguration producerConfig;
+ producerConfig.setBatchingEnabled(false);
+ producerConfig.setBlockIfQueueFull(true);
+
producerConfig.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(partitionedTopic,
producerConfig, producer));
+ std::string prefix = "message-";
+ for (int i = 0; i < numOfMessages; i++) {
+ std::string messageContent = prefix + std::to_string(i);
+ Message msg = MessageBuilder().setContent(messageContent).build();
+ ASSERT_EQ(ResultOk, producer.send(msg));
+ }
+ producer.close();
+
+ // receive message and don't acknowledge
+ std::set<MessageId> messageIds[numPartitions];
+ for (auto i = 0; i < numOfMessages; ++i) {
+ Message msg;
+ ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
+
+ MessageId msgId = msg.getMessageId();
+ int32_t partitionIndex = msgId.partition();
+ ASSERT_TRUE(partitionIndex < numPartitions);
+ messageIds[msgId.partition()].emplace(msgId);
+ }
+
+ auto partitionedTracker = static_cast<UnAckedMessageTrackerEnabled*>(
+ partitionedConsumerImplPtr->unAckedMessageTrackerPtr_.get());
+ ASSERT_EQ(numOfMessages, partitionedTracker->size());
+ ASSERT_FALSE(partitionedTracker->isEmpty());
+ for (auto i = 0; i < numPartitions; i++) {
+ ASSERT_EQ(numOfMessages / numPartitions, messageIds[i].size());
+ auto subConsumerPtr = partitionedConsumerImplPtr->consumers_[i];
+ auto tracker =
+
static_cast<UnAckedMessageTrackerEnabled*>(subConsumerPtr->unAckedMessageTrackerPtr_.get());
+ ASSERT_EQ(0, tracker->size());
+ ASSERT_TRUE(tracker->isEmpty());
+ }
+
+ // timeout and send redeliver message
+
std::this_thread::sleep_for(std::chrono::milliseconds(unAckedMessagesTimeoutMs
+ tickDurationInMs * 2));
+ ASSERT_EQ(0, partitionedTracker->size());
+ ASSERT_TRUE(partitionedTracker->isEmpty());
+
+ for (auto i = 0; i < numOfMessages; ++i) {
+ Message msg;
+ ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
+ ASSERT_EQ(1, partitionedTracker->size());
+ ASSERT_EQ(ResultOk, consumer.acknowledge(msg.getMessageId()));
+ ASSERT_EQ(0, partitionedTracker->size());
+ }
+ ASSERT_EQ(0, partitionedTracker->size());
+ ASSERT_TRUE(partitionedTracker->isEmpty());
+ partitionedTracker = NULL;
+
+ Message msg;
+ auto ret = consumer.receive(msg, 1000);
+ ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " <<
msg.getMessageId();
+ consumer.close();
+ client.close();
+}
+
+TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery) {
+ Client client(lookupUrl);
+ const std::string nonPartitionedTopic =
+ "testMultiTopicsConsumerUnAckedMessageRedelivery-topic-" +
std::to_string(time(nullptr));
+ const std::string partitionedTopic1 =
+ "testMultiTopicsConsumerUnAckedMessageRedelivery-par-topic1-" +
std::to_string(time(nullptr));
+ const std::string partitionedTopic2 =
+ "testMultiTopicsConsumerUnAckedMessageRedelivery-par-topic2-" +
std::to_string(time(nullptr));
+ std::string subName = "sub-multi-topics-consumer-un-acked-msg-redelivery";
+ constexpr int numPartitions = 3;
+ constexpr int numOfMessages = 15;
+ constexpr int unAckedMessagesTimeoutMs = 10000;
+ constexpr int tickDurationInMs = 1000;
+
+ int res = makePutRequest(
+ adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic1 +
"/partitions", "1");
+ ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+ res = makePutRequest(adminUrl + "admin/v2/persistent/public/default/" +
partitionedTopic2 + "/partitions",
+ std::to_string(numPartitions));
+ ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+ Consumer consumer;
+ ConsumerConfiguration consumerConfig;
+ consumerConfig.setUnAckedMessagesTimeoutMs(unAckedMessagesTimeoutMs);
+ consumerConfig.setTickDurationInMs(tickDurationInMs);
+ const std::vector<std::string> topics = {nonPartitionedTopic,
partitionedTopic1, partitionedTopic2};
+ ASSERT_EQ(ResultOk, client.subscribe(topics, subName, consumerConfig,
consumer));
+ MultiTopicsConsumerImplPtr multiTopicsConsumerImplPtr =
+ PulsarFriend::getMultiTopicsConsumerImplPtr(consumer);
+ ASSERT_EQ(numPartitions + 2 /* nonPartitionedTopic + partitionedTopic1 */,
+ multiTopicsConsumerImplPtr->consumers_.size());
+
+ // send messages
+ auto sendMessageToTopic = [&client](const std::string& topic) {
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+
+ Message msg = MessageBuilder().setContent("hello").build();
+ ASSERT_EQ(ResultOk, producer.send(msg));
+ };
+ for (int i = 0; i < numOfMessages; i++) {
+ sendMessageToTopic(nonPartitionedTopic);
+ sendMessageToTopic(partitionedTopic1);
+ sendMessageToTopic(partitionedTopic2);
+ }
+
+ // receive message and don't acknowledge
+ for (auto i = 0; i < numOfMessages * 3; ++i) {
+ Message msg;
+ ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
+ MessageId msgId = msg.getMessageId();
+ }
+
+ auto multiTopicsTracker = static_cast<UnAckedMessageTrackerEnabled*>(
+ multiTopicsConsumerImplPtr->unAckedMessageTrackerPtr_.get());
+ ASSERT_EQ(numOfMessages * 3, multiTopicsTracker->size());
+ ASSERT_FALSE(multiTopicsTracker->isEmpty());
+ for (auto iter = multiTopicsConsumerImplPtr->consumers_.begin();
+ iter != multiTopicsConsumerImplPtr->consumers_.end(); ++iter) {
+ auto subConsumerPtr = iter->second;
+ auto tracker =
+
static_cast<UnAckedMessageTrackerEnabled*>(subConsumerPtr->unAckedMessageTrackerPtr_.get());
+ ASSERT_EQ(0, tracker->size());
+ ASSERT_TRUE(tracker->isEmpty());
+ }
+
+ // timeout and send redeliver message
+
std::this_thread::sleep_for(std::chrono::milliseconds(unAckedMessagesTimeoutMs
+ tickDurationInMs * 2));
+ ASSERT_EQ(0, multiTopicsTracker->size());
+ ASSERT_TRUE(multiTopicsTracker->isEmpty());
+
+ for (auto i = 0; i < numOfMessages * 3; ++i) {
+ Message msg;
+ ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
+ ASSERT_EQ(1, multiTopicsTracker->size());
+ ASSERT_EQ(ResultOk, consumer.acknowledge(msg.getMessageId()));
+ ASSERT_EQ(0, multiTopicsTracker->size());
+ }
+ ASSERT_EQ(0, multiTopicsTracker->size());
+ ASSERT_TRUE(multiTopicsTracker->isEmpty());
+ multiTopicsTracker = NULL;
+
+ Message msg;
+ auto ret = consumer.receive(msg, 1000);
+ ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " <<
msg.getMessageId();
+ consumer.close();
+ client.close();
+}
+
+} // namespace pulsar
diff --git a/pulsar-client-cpp/tests/PulsarFriend.h
b/pulsar-client-cpp/tests/PulsarFriend.h
index 87b48d2..9f6ba51 100644
--- a/pulsar-client-cpp/tests/PulsarFriend.h
+++ b/pulsar-client-cpp/tests/PulsarFriend.h
@@ -17,12 +17,15 @@
* under the License.
*/
-#include <lib/ProducerImpl.h>
-#include <lib/PartitionedProducerImpl.h>
-#include <lib/ConsumerImpl.h>
-#include <lib/ClientImpl.h>
#include <string>
+#include "lib/ClientImpl.h"
+#include "lib/ProducerImpl.h"
+#include "lib/PartitionedProducerImpl.h"
+#include "lib/ConsumerImpl.h"
+#include "lib/PartitionedConsumerImpl.h"
+#include "lib/MultiTopicsConsumerImpl.h"
+
using std::string;
namespace pulsar {
@@ -86,6 +89,14 @@ class PulsarFriend {
return std::static_pointer_cast<ConsumerImpl>(consumer.impl_);
}
+ static std::shared_ptr<PartitionedConsumerImpl>
getPartitionedConsumerImplPtr(Consumer consumer) {
+ return
std::static_pointer_cast<PartitionedConsumerImpl>(consumer.impl_);
+ }
+
+ static std::shared_ptr<MultiTopicsConsumerImpl>
getMultiTopicsConsumerImplPtr(Consumer consumer) {
+ return
std::static_pointer_cast<MultiTopicsConsumerImpl>(consumer.impl_);
+ }
+
static std::shared_ptr<ClientImpl> getClientImplPtr(Client client) {
return client.impl_; }
static void setNegativeAckEnabled(Consumer consumer, bool enabled) {