This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new d9182b9  [C++] Fix cpp client do AcknowledgeCumulative not clean up 
previous message (#8606)
d9182b9 is described below

commit d9182b9c66c6569d17e593e54a6184808b1d45c4
Author: sijianliang <[email protected]>
AuthorDate: Wed Dec 30 18:35:58 2020 +0800

    [C++] Fix cpp client do AcknowledgeCumulative not clean up previous message 
(#8606)
    
    ### Motivation
    When the pulsar-client-cpp Consumer performs AcknowledgeCumulative, 
`UnAckedMessageTrackerEnabled::removeMessagesTill` only cleans up `msgId` itself, not every message ID <= `msgId`.
    
    ### Modifications
    
    - When the application performs AcknowledgeCumulative, erase every message ID <= `msgId` in 
UnAckedMessageTrackerEnabled, avoiding unnecessary redelivery requests for unacknowledged 
messages to the Broker
    - Add unit tests for `UnAckedMessageTrackerEnabled`
    
    (cherry picked from commit e75de488679ccee30a6cf99edc8e7779112fb786)
---
 .../lib/UnAckedMessageTrackerEnabled.cc            |  24 +--
 .../lib/UnAckedMessageTrackerEnabled.h             |   2 +-
 pulsar-client-cpp/tests/BasicEndToEndTest.cc       | 205 +++++++++++++++++++++
 3 files changed, 218 insertions(+), 13 deletions(-)

diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc 
b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
index e280dba..96dfd6c 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
@@ -113,6 +113,7 @@ bool UnAckedMessageTrackerEnabled::remove(const MessageId& 
m) {
     std::map<MessageId, std::set<MessageId>&>::iterator exist = 
messageIdPartitionMap.find(m);
     if (exist != messageIdPartitionMap.end()) {
         removed = exist->second.erase(m);
+        messageIdPartitionMap.erase(exist);
     }
     return removed;
 }
@@ -124,13 +125,13 @@ long UnAckedMessageTrackerEnabled::size() {
 
 void UnAckedMessageTrackerEnabled::removeMessagesTill(const MessageId& msgId) {
     std::lock_guard<std::mutex> acquire(lock_);
-    for (auto it = messageIdPartitionMap.begin(); it != 
messageIdPartitionMap.end(); it++) {
+    for (auto it = messageIdPartitionMap.begin(); it != 
messageIdPartitionMap.end();) {
         MessageId msgIdInMap = it->first;
-        if (msgIdInMap < msgId) {
-            std::map<MessageId, std::set<MessageId>&>::iterator exist = 
messageIdPartitionMap.find(msgId);
-            if (exist != messageIdPartitionMap.end()) {
-                exist->second.erase(msgId);
-            }
+        if (msgIdInMap <= msgId) {
+            it->second.erase(msgIdInMap);
+            messageIdPartitionMap.erase(it++);
+        } else {
+            it++;
         }
     }
 }
@@ -138,14 +139,13 @@ void 
UnAckedMessageTrackerEnabled::removeMessagesTill(const MessageId& msgId) {
 // this is only for MultiTopicsConsumerImpl, when un-subscribe a single topic, 
should remove all it's message.
 void UnAckedMessageTrackerEnabled::removeTopicMessage(const std::string& 
topic) {
     std::lock_guard<std::mutex> acquire(lock_);
-    for (auto it = messageIdPartitionMap.begin(); it != 
messageIdPartitionMap.end(); it++) {
+    for (auto it = messageIdPartitionMap.begin(); it != 
messageIdPartitionMap.end();) {
         MessageId msgIdInMap = it->first;
         if (msgIdInMap.getTopicName().compare(topic) == 0) {
-            std::map<MessageId, std::set<MessageId>&>::iterator exist =
-                messageIdPartitionMap.find(msgIdInMap);
-            if (exist != messageIdPartitionMap.end()) {
-                exist->second.erase(msgIdInMap);
-            }
+            it->second.erase(msgIdInMap);
+            messageIdPartitionMap.erase(it++);
+        } else {
+            it++;
         }
     }
 }
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h 
b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
index 9195b30..16933cc 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
@@ -36,7 +36,7 @@ class UnAckedMessageTrackerEnabled : public 
UnAckedMessageTrackerInterface {
 
     void clear();
 
-   private:
+   protected:
     void timeoutHandlerHelper();
     bool isEmpty();
     long size();
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc 
b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index a812657..8c0a1ed 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -3828,3 +3828,208 @@ TEST(BasicEndToEndTest, 
testAckGroupingTrackerEnabledCumulativeAck) {
     ret = consumer.receive(msg, 1000);
     ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << 
msg.getMessageId();
 }
+
+class UnAckedMessageTrackerEnabledMock : public UnAckedMessageTrackerEnabled {
+   public:
+    UnAckedMessageTrackerEnabledMock(long timeoutMs, const ClientImplPtr 
client, ConsumerImplBase &consumer)
+        : UnAckedMessageTrackerEnabled(timeoutMs, timeoutMs, client, consumer) 
{}
+    const long getUnAckedMessagesTimeoutMs() { return this->timeoutMs_; }
+    const long getTickDurationInMs() { return this->tickDurationInMs_; }
+    bool isEmpty() { return UnAckedMessageTrackerEnabled::isEmpty(); }
+    long size() { return UnAckedMessageTrackerEnabled::size(); }
+};  // class UnAckedMessageTrackerEnabledMock
+
+TEST(BasicEndToEndTest, testtUnAckedMessageTrackerDefaultBehavior) {
+    ConsumerConfiguration configConsumer;
+    ASSERT_EQ(configConsumer.getUnAckedMessagesTimeoutMs(), 0);
+    ASSERT_EQ(configConsumer.getTickDurationInMs(), 1000);
+
+    UnAckedMessageTrackerDisabled tracker;
+    Message msg;
+    ASSERT_FALSE(tracker.add(msg.getMessageId()));
+    ASSERT_FALSE(tracker.remove(msg.getMessageId()));
+}
+
+TEST(BasicEndToEndTest, testUnAckedMessageTrackerDisabled) {
+    constexpr auto numMsg = 10;
+    const std::string topicName =
+        "testUnAckedMessageTrackerDisabledIndividualAck" + 
std::to_string(time(nullptr));
+    const std::string subName = "sub-un-acked-msg-disabled-ind-ack";
+
+    // Setup client, producer and consumer.
+    Client client(lookupUrl);
+
+    Producer producer;
+    ProducerConfiguration configProducer;
+    configProducer.setBatchingEnabled(false);
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, configProducer, 
producer));
+
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
+
+    // Sending and receiving messages.
+    for (auto count = 0; count < numMsg; ++count) {
+        Message msg = MessageBuilder().setContent(std::string("MSG-") + 
std::to_string(count)).build();
+        ASSERT_EQ(ResultOk, producer.send(msg));
+    }
+
+    UnAckedMessageTrackerDisabled tracker;
+    for (auto count = 0; count < numMsg; ++count) {
+        Message msg;
+
+        ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
+        ASSERT_FALSE(tracker.add(msg.getMessageId()));
+
+        consumer.acknowledge(msg.getMessageId());
+        ASSERT_FALSE(tracker.remove(msg.getMessageId()));
+    }
+    consumer.close();
+
+    std::this_thread::sleep_for(std::chrono::seconds(1));
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
+    Message msg;
+    auto ret = consumer.receive(msg, 1000);
+    ASSERT_EQ(ResultTimeout, ret) << "Received redundant message: " << 
msg.getDataAsString();
+    consumer.close();
+    client.close();
+}
+
+TEST(BasicEndToEndTest, testUnAckedMessageTrackerEnabledIndividualAck) {
+    constexpr auto numMsg = 10;
+    constexpr auto unAckedMessagesTimeoutMs = 1000;
+    const std::string topicName =
+        "testUnAckedMessageTrackerEnabledIndividualAck" + 
std::to_string(time(nullptr));
+    const std::string subName = "sub-un-acked-msg-enabled-ind-ack";
+
+    // Setup client, producer and consumer.
+    Client client(lookupUrl);
+    auto clientImplPtr = PulsarFriend::getClientImplPtr(client);
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
+
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
+    auto &consumerImpl0 = PulsarFriend::getConsumerImpl(consumer);
+
+    // Sending and receiving messages.
+    for (auto count = 0; count < numMsg; ++count) {
+        Message msg = MessageBuilder().setContent(std::string("MSG-") + 
std::to_string(count)).build();
+        ASSERT_EQ(ResultOk, producer.send(msg));
+    }
+
+    std::vector<MessageId> recvMsgId;
+    for (auto count = 0; count < numMsg; ++count) {
+        Message msg;
+        ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
+        recvMsgId.emplace_back(msg.getMessageId());
+    }
+
+    auto tracker0 = 
std::make_shared<UnAckedMessageTrackerEnabledMock>(unAckedMessagesTimeoutMs,
+                                                                       
clientImplPtr, consumerImpl0);
+    ASSERT_EQ(tracker0->getUnAckedMessagesTimeoutMs(), 
unAckedMessagesTimeoutMs);
+    ASSERT_EQ(tracker0->getTickDurationInMs(), unAckedMessagesTimeoutMs);
+
+    for (auto idx = 0; idx < numMsg; ++idx) {
+        ASSERT_TRUE(tracker0->add(recvMsgId[idx]));
+    }
+    ASSERT_EQ(numMsg, tracker0->size());
+    ASSERT_FALSE(tracker0->isEmpty());
+
+    std::this_thread::sleep_for(std::chrono::seconds(4));
+    ASSERT_EQ(0, tracker0->size());
+    ASSERT_TRUE(tracker0->isEmpty());
+    consumer.close();
+
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
+    auto &consumerImpl1 = PulsarFriend::getConsumerImpl(consumer);
+    std::set<MessageId> restMsgId(recvMsgId.begin(), recvMsgId.end());
+    for (auto count = 0; count < numMsg; ++count) {
+        Message msg;
+        ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
+        ASSERT_EQ(restMsgId.count(msg.getMessageId()), 1);
+        ASSERT_EQ(ResultOk, consumer.acknowledge(msg));
+    }
+
+    auto tracker1 = 
std::make_shared<UnAckedMessageTrackerEnabledMock>(unAckedMessagesTimeoutMs,
+                                                                       
clientImplPtr, consumerImpl1);
+    for (auto idx = 0; idx < numMsg; ++idx) {
+        ASSERT_TRUE(tracker1->add(recvMsgId[idx]));
+        ASSERT_TRUE(tracker1->remove(recvMsgId[idx]));
+    }
+    ASSERT_EQ(0, tracker1->size());
+    ASSERT_TRUE(tracker1->isEmpty());
+    consumer.close();
+
+    std::this_thread::sleep_for(std::chrono::seconds(2));
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
+    Message msg;
+    auto ret = consumer.receive(msg, 1000);
+    ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << 
msg.getMessageId();
+    consumer.close();
+    client.close();
+}
+
+TEST(BasicEndToEndTest, testUnAckedMessageTrackerEnabledCumulativeAck) {
+    constexpr auto numMsg = 10;
+    constexpr auto unAckedMessagesTimeoutMs = 1000;
+    const std::string topicName =
+        "testUnAckedMessageTrackerEnabledCumulativeAck" + 
std::to_string(time(nullptr));
+    const std::string subName = "sub-un-acked-msg-enabled-cum-ack";
+
+    // Setup client, producer and consumer.
+    Client client(lookupUrl);
+    auto clientImplPtr = PulsarFriend::getClientImplPtr(client);
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
+
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
+    auto &consumerImpl0 = PulsarFriend::getConsumerImpl(consumer);
+
+    // Sending and receiving messages.
+    for (auto count = 0; count < numMsg; ++count) {
+        Message msg = MessageBuilder().setContent(std::string("MSG-") + 
std::to_string(count)).build();
+        ASSERT_EQ(ResultOk, producer.send(msg));
+    }
+
+    std::vector<MessageId> recvMsgId;
+    for (auto count = 0; count < numMsg; ++count) {
+        Message msg;
+        ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
+        recvMsgId.emplace_back(msg.getMessageId());
+    }
+    auto tracker = 
std::make_shared<UnAckedMessageTrackerEnabledMock>(unAckedMessagesTimeoutMs, 
clientImplPtr,
+                                                                      
consumerImpl0);
+    for (auto idx = 0; idx < numMsg; ++idx) {
+        ASSERT_TRUE(tracker->add(recvMsgId[idx]));
+    }
+    ASSERT_EQ(numMsg, tracker->size());
+    ASSERT_FALSE(tracker->isEmpty());
+
+    std::sort(recvMsgId.begin(), recvMsgId.end());
+
+    auto targetMsgId = recvMsgId[numMsg / 2];
+    ASSERT_EQ(ResultOk, consumer.acknowledgeCumulative(targetMsgId));
+    tracker->removeMessagesTill(targetMsgId);
+    ASSERT_EQ(numMsg - (numMsg / 2 + 1), tracker->size());
+    ASSERT_FALSE(tracker->isEmpty());
+
+    std::this_thread::sleep_for(std::chrono::seconds(2));
+    ASSERT_EQ(0, tracker->size());
+    ASSERT_TRUE(tracker->isEmpty());
+    consumer.close();
+
+    std::this_thread::sleep_for(std::chrono::seconds(1));
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
+    for (auto count = numMsg / 2 + 1; count < numMsg; ++count) {
+        Message msg;
+        ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
+    }
+    Message msg;
+    auto ret = consumer.receive(msg, 1000);
+    ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << 
msg.getMessageId();
+    consumer.close();
+    client.close();
+}

Reply via email to