This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new d9182b9 [C++] Fix cpp client do AcknowledgeCumulative not clean up
previous message (#8606)
d9182b9 is described below
commit d9182b9c66c6569d17e593e54a6184808b1d45c4
Author: sijianliang <[email protected]>
AuthorDate: Wed Dec 30 18:35:58 2020 +0800
[C++] Fix cpp client do AcknowledgeCumulative not clean up previous message
(#8606)
### Motivation
pulsar-client-cpp Consumer do AcknowledgeCumulative just clean up `msgId`,
not <= `msgId` in `UnAckedMessageTrackerEnabled::removeMessagesTill`
### Modifications
- When do AcknowledgeCumulative from application, earse <= `msgId` in
UnAckedMessageTrackerEnabled, avoid redeliver unnecessary unacknowledged
messages to Broker
- add unit test for `UnAckedMessageTrackerEnabled`
(cherry picked from commit e75de488679ccee30a6cf99edc8e7779112fb786)
---
.../lib/UnAckedMessageTrackerEnabled.cc | 24 +--
.../lib/UnAckedMessageTrackerEnabled.h | 2 +-
pulsar-client-cpp/tests/BasicEndToEndTest.cc | 205 +++++++++++++++++++++
3 files changed, 218 insertions(+), 13 deletions(-)
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
index e280dba..96dfd6c 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
@@ -113,6 +113,7 @@ bool UnAckedMessageTrackerEnabled::remove(const MessageId&
m) {
std::map<MessageId, std::set<MessageId>&>::iterator exist =
messageIdPartitionMap.find(m);
if (exist != messageIdPartitionMap.end()) {
removed = exist->second.erase(m);
+ messageIdPartitionMap.erase(exist);
}
return removed;
}
@@ -124,13 +125,13 @@ long UnAckedMessageTrackerEnabled::size() {
void UnAckedMessageTrackerEnabled::removeMessagesTill(const MessageId& msgId) {
std::lock_guard<std::mutex> acquire(lock_);
- for (auto it = messageIdPartitionMap.begin(); it !=
messageIdPartitionMap.end(); it++) {
+ for (auto it = messageIdPartitionMap.begin(); it !=
messageIdPartitionMap.end();) {
MessageId msgIdInMap = it->first;
- if (msgIdInMap < msgId) {
- std::map<MessageId, std::set<MessageId>&>::iterator exist =
messageIdPartitionMap.find(msgId);
- if (exist != messageIdPartitionMap.end()) {
- exist->second.erase(msgId);
- }
+ if (msgIdInMap <= msgId) {
+ it->second.erase(msgIdInMap);
+ messageIdPartitionMap.erase(it++);
+ } else {
+ it++;
}
}
}
@@ -138,14 +139,13 @@ void
UnAckedMessageTrackerEnabled::removeMessagesTill(const MessageId& msgId) {
// this is only for MultiTopicsConsumerImpl, when un-subscribe a single topic,
should remove all it's message.
void UnAckedMessageTrackerEnabled::removeTopicMessage(const std::string&
topic) {
std::lock_guard<std::mutex> acquire(lock_);
- for (auto it = messageIdPartitionMap.begin(); it !=
messageIdPartitionMap.end(); it++) {
+ for (auto it = messageIdPartitionMap.begin(); it !=
messageIdPartitionMap.end();) {
MessageId msgIdInMap = it->first;
if (msgIdInMap.getTopicName().compare(topic) == 0) {
- std::map<MessageId, std::set<MessageId>&>::iterator exist =
- messageIdPartitionMap.find(msgIdInMap);
- if (exist != messageIdPartitionMap.end()) {
- exist->second.erase(msgIdInMap);
- }
+ it->second.erase(msgIdInMap);
+ messageIdPartitionMap.erase(it++);
+ } else {
+ it++;
}
}
}
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
index 9195b30..16933cc 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
@@ -36,7 +36,7 @@ class UnAckedMessageTrackerEnabled : public
UnAckedMessageTrackerInterface {
void clear();
- private:
+ protected:
void timeoutHandlerHelper();
bool isEmpty();
long size();
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index a812657..8c0a1ed 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -3828,3 +3828,208 @@ TEST(BasicEndToEndTest,
testAckGroupingTrackerEnabledCumulativeAck) {
ret = consumer.receive(msg, 1000);
ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " <<
msg.getMessageId();
}
+
+class UnAckedMessageTrackerEnabledMock : public UnAckedMessageTrackerEnabled {
+ public:
+ UnAckedMessageTrackerEnabledMock(long timeoutMs, const ClientImplPtr
client, ConsumerImplBase &consumer)
+ : UnAckedMessageTrackerEnabled(timeoutMs, timeoutMs, client, consumer)
{}
+ const long getUnAckedMessagesTimeoutMs() { return this->timeoutMs_; }
+ const long getTickDurationInMs() { return this->tickDurationInMs_; }
+ bool isEmpty() { return UnAckedMessageTrackerEnabled::isEmpty(); }
+ long size() { return UnAckedMessageTrackerEnabled::size(); }
+}; // class UnAckedMessageTrackerEnabledMock
+
+TEST(BasicEndToEndTest, testtUnAckedMessageTrackerDefaultBehavior) {
+ ConsumerConfiguration configConsumer;
+ ASSERT_EQ(configConsumer.getUnAckedMessagesTimeoutMs(), 0);
+ ASSERT_EQ(configConsumer.getTickDurationInMs(), 1000);
+
+ UnAckedMessageTrackerDisabled tracker;
+ Message msg;
+ ASSERT_FALSE(tracker.add(msg.getMessageId()));
+ ASSERT_FALSE(tracker.remove(msg.getMessageId()));
+}
+
+TEST(BasicEndToEndTest, testUnAckedMessageTrackerDisabled) {
+ constexpr auto numMsg = 10;
+ const std::string topicName =
+ "testUnAckedMessageTrackerDisabledIndividualAck" +
std::to_string(time(nullptr));
+ const std::string subName = "sub-un-acked-msg-disabled-ind-ack";
+
+ // Setup client, producer and consumer.
+ Client client(lookupUrl);
+
+ Producer producer;
+ ProducerConfiguration configProducer;
+ configProducer.setBatchingEnabled(false);
+ ASSERT_EQ(ResultOk, client.createProducer(topicName, configProducer,
producer));
+
+ Consumer consumer;
+ ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
+
+ // Sending and receiving messages.
+ for (auto count = 0; count < numMsg; ++count) {
+ Message msg = MessageBuilder().setContent(std::string("MSG-") +
std::to_string(count)).build();
+ ASSERT_EQ(ResultOk, producer.send(msg));
+ }
+
+ UnAckedMessageTrackerDisabled tracker;
+ for (auto count = 0; count < numMsg; ++count) {
+ Message msg;
+
+ ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
+ ASSERT_FALSE(tracker.add(msg.getMessageId()));
+
+ consumer.acknowledge(msg.getMessageId());
+ ASSERT_FALSE(tracker.remove(msg.getMessageId()));
+ }
+ consumer.close();
+
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
+ Message msg;
+ auto ret = consumer.receive(msg, 1000);
+ ASSERT_EQ(ResultTimeout, ret) << "Received redundant message: " <<
msg.getDataAsString();
+ consumer.close();
+ client.close();
+}
+
+TEST(BasicEndToEndTest, testUnAckedMessageTrackerEnabledIndividualAck) {
+ constexpr auto numMsg = 10;
+ constexpr auto unAckedMessagesTimeoutMs = 1000;
+ const std::string topicName =
+ "testUnAckedMessageTrackerEnabledIndividualAck" +
std::to_string(time(nullptr));
+ const std::string subName = "sub-un-acked-msg-enabled-ind-ack";
+
+ // Setup client, producer and consumer.
+ Client client(lookupUrl);
+ auto clientImplPtr = PulsarFriend::getClientImplPtr(client);
+
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
+
+ Consumer consumer;
+ ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
+ auto &consumerImpl0 = PulsarFriend::getConsumerImpl(consumer);
+
+ // Sending and receiving messages.
+ for (auto count = 0; count < numMsg; ++count) {
+ Message msg = MessageBuilder().setContent(std::string("MSG-") +
std::to_string(count)).build();
+ ASSERT_EQ(ResultOk, producer.send(msg));
+ }
+
+ std::vector<MessageId> recvMsgId;
+ for (auto count = 0; count < numMsg; ++count) {
+ Message msg;
+ ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
+ recvMsgId.emplace_back(msg.getMessageId());
+ }
+
+ auto tracker0 =
std::make_shared<UnAckedMessageTrackerEnabledMock>(unAckedMessagesTimeoutMs,
+
clientImplPtr, consumerImpl0);
+ ASSERT_EQ(tracker0->getUnAckedMessagesTimeoutMs(),
unAckedMessagesTimeoutMs);
+ ASSERT_EQ(tracker0->getTickDurationInMs(), unAckedMessagesTimeoutMs);
+
+ for (auto idx = 0; idx < numMsg; ++idx) {
+ ASSERT_TRUE(tracker0->add(recvMsgId[idx]));
+ }
+ ASSERT_EQ(numMsg, tracker0->size());
+ ASSERT_FALSE(tracker0->isEmpty());
+
+ std::this_thread::sleep_for(std::chrono::seconds(4));
+ ASSERT_EQ(0, tracker0->size());
+ ASSERT_TRUE(tracker0->isEmpty());
+ consumer.close();
+
+ ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
+ auto &consumerImpl1 = PulsarFriend::getConsumerImpl(consumer);
+ std::set<MessageId> restMsgId(recvMsgId.begin(), recvMsgId.end());
+ for (auto count = 0; count < numMsg; ++count) {
+ Message msg;
+ ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
+ ASSERT_EQ(restMsgId.count(msg.getMessageId()), 1);
+ ASSERT_EQ(ResultOk, consumer.acknowledge(msg));
+ }
+
+ auto tracker1 =
std::make_shared<UnAckedMessageTrackerEnabledMock>(unAckedMessagesTimeoutMs,
+
clientImplPtr, consumerImpl1);
+ for (auto idx = 0; idx < numMsg; ++idx) {
+ ASSERT_TRUE(tracker1->add(recvMsgId[idx]));
+ ASSERT_TRUE(tracker1->remove(recvMsgId[idx]));
+ }
+ ASSERT_EQ(0, tracker1->size());
+ ASSERT_TRUE(tracker1->isEmpty());
+ consumer.close();
+
+ std::this_thread::sleep_for(std::chrono::seconds(2));
+ ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
+ Message msg;
+ auto ret = consumer.receive(msg, 1000);
+ ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " <<
msg.getMessageId();
+ consumer.close();
+ client.close();
+}
+
+TEST(BasicEndToEndTest, testUnAckedMessageTrackerEnabledCumulativeAck) {
+ constexpr auto numMsg = 10;
+ constexpr auto unAckedMessagesTimeoutMs = 1000;
+ const std::string topicName =
+ "testUnAckedMessageTrackerEnabledCumulativeAck" +
std::to_string(time(nullptr));
+ const std::string subName = "sub-un-acked-msg-enabled-cum-ack";
+
+ // Setup client, producer and consumer.
+ Client client(lookupUrl);
+ auto clientImplPtr = PulsarFriend::getClientImplPtr(client);
+
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
+
+ Consumer consumer;
+ ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
+ auto &consumerImpl0 = PulsarFriend::getConsumerImpl(consumer);
+
+ // Sending and receiving messages.
+ for (auto count = 0; count < numMsg; ++count) {
+ Message msg = MessageBuilder().setContent(std::string("MSG-") +
std::to_string(count)).build();
+ ASSERT_EQ(ResultOk, producer.send(msg));
+ }
+
+ std::vector<MessageId> recvMsgId;
+ for (auto count = 0; count < numMsg; ++count) {
+ Message msg;
+ ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
+ recvMsgId.emplace_back(msg.getMessageId());
+ }
+ auto tracker =
std::make_shared<UnAckedMessageTrackerEnabledMock>(unAckedMessagesTimeoutMs,
clientImplPtr,
+
consumerImpl0);
+ for (auto idx = 0; idx < numMsg; ++idx) {
+ ASSERT_TRUE(tracker->add(recvMsgId[idx]));
+ }
+ ASSERT_EQ(numMsg, tracker->size());
+ ASSERT_FALSE(tracker->isEmpty());
+
+ std::sort(recvMsgId.begin(), recvMsgId.end());
+
+ auto targetMsgId = recvMsgId[numMsg / 2];
+ ASSERT_EQ(ResultOk, consumer.acknowledgeCumulative(targetMsgId));
+ tracker->removeMessagesTill(targetMsgId);
+ ASSERT_EQ(numMsg - (numMsg / 2 + 1), tracker->size());
+ ASSERT_FALSE(tracker->isEmpty());
+
+ std::this_thread::sleep_for(std::chrono::seconds(2));
+ ASSERT_EQ(0, tracker->size());
+ ASSERT_TRUE(tracker->isEmpty());
+ consumer.close();
+
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
+ for (auto count = numMsg / 2 + 1; count < numMsg; ++count) {
+ Message msg;
+ ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
+ }
+ Message msg;
+ auto ret = consumer.receive(msg, 1000);
+ ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " <<
msg.getMessageId();
+ consumer.close();
+ client.close();
+}