This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push: new 3f0b33b fix: Incorrect acknowledgment behavior in the listener of the multi-topic consumer. (#423) 3f0b33b is described below commit 3f0b33bfad746fd2da63fe062ea745d9a9caed55 Author: Baodi Shi <ba...@apache.org> AuthorDate: Sat Apr 6 18:23:09 2024 +0800 fix: Incorrect acknowledgment behavior in the listener of the multi-topic consumer. (#423) ### Motivation https://github.com/apache/pulsar-client-node/issues/371 ### Modifications - Add the message to the unacknowledged tracker before calling the listener. ### Verifying this change - Add `testMultiConsumerListenerAndAck` to cover it. --- lib/MultiTopicsConsumerImpl.cc | 2 +- lib/MultiTopicsConsumerImpl.h | 1 + lib/UnAckedMessageTrackerEnabled.h | 1 + tests/ConsumerTest.cc | 42 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 45 insertions(+), 1 deletion(-) diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index 1484785..80566c8 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -568,8 +568,8 @@ void MultiTopicsConsumerImpl::internalListener(Consumer consumer) { incomingMessages_.pop(m); try { Consumer self{get_shared_this_ptr()}; - messageListener_(self, m); messageProcessed(m); + messageListener_(self, m); } catch (const std::exception& e) { LOG_ERROR("Exception thrown from listener of Partitioned Consumer" << e.what()); } diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h index 9d71a04..b5c51ec 100644 --- a/lib/MultiTopicsConsumerImpl.h +++ b/lib/MultiTopicsConsumerImpl.h @@ -183,6 +183,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase { FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery); FRIEND_TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition); FRIEND_TEST(ConsumerTest, testPatternSubscribeTopic); + FRIEND_TEST(ConsumerTest, testMultiConsumerListenerAndAck); }; typedef std::shared_ptr<MultiTopicsConsumerImpl> 
MultiTopicsConsumerImplPtr; diff --git a/lib/UnAckedMessageTrackerEnabled.h b/lib/UnAckedMessageTrackerEnabled.h index 83edc4c..c5479a7 100644 --- a/lib/UnAckedMessageTrackerEnabled.h +++ b/lib/UnAckedMessageTrackerEnabled.h @@ -67,6 +67,7 @@ class UnAckedMessageTrackerEnabled : public std::enable_shared_from_this<UnAcked FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery); FRIEND_TEST(ConsumerTest, testBatchUnAckedMessageTracker); FRIEND_TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition); + FRIEND_TEST(ConsumerTest, testMultiConsumerListenerAndAck); }; } // namespace pulsar diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc index f97457f..2aab722 100644 --- a/tests/ConsumerTest.cc +++ b/tests/ConsumerTest.cc @@ -1490,4 +1490,46 @@ TEST(ConsumerTest, testSNIProxyConnect) { ASSERT_EQ(ResultOk, client.subscribe(topic, "test-sub", consumer)); client.close(); } + +TEST(ConsumerTest, testMultiConsumerListenerAndAck) { + Client client{lookupUrl}; + + const std::string topicName = "testConsumerEventWithPartition-topic-" + std::to_string(time(nullptr)); + int res = makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions", + std::to_string(5)); + ASSERT_TRUE(res == 204 || res == 409) << "res: " << res; + + // Create a producer + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); + + int num = 10; + // Use listener to consume + Latch latch{num}; + Consumer consumer; + ConsumerConfiguration consumerConfiguration; + PulsarFriend::setConsumerUnAckMessagesTimeoutMs(consumerConfiguration, 2000); + consumerConfiguration.setMessageListener([&latch](Consumer& consumer, const Message& msg) { + LOG_INFO("Received message '" << msg.getDataAsString() << "' and ack it"); + consumer.acknowledge(msg); + latch.countdown(); + }); + ASSERT_EQ(ResultOk, client.subscribe(topicName, "consumer-1", consumerConfiguration, consumer)); + + // Send synchronously + for (int i = 0; i < 10; ++i) { 
+ Message msg = MessageBuilder().setContent("content" + std::to_string(i)).build(); + Result result = producer.send(msg); + LOG_INFO("Message sent: " << result); + } + + ASSERT_TRUE(latch.wait(std::chrono::seconds(5))); + auto multiConsumerImplPtr = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer); + auto tracker = + static_cast<UnAckedMessageTrackerEnabled*>(multiConsumerImplPtr->unAckedMessageTrackerPtr_.get()); + ASSERT_EQ(0, tracker->size()); + + client.close(); +} + } // namespace pulsar