This is an automated email from the ASF dual-hosted git repository. zhaijia pushed a commit to branch branch-2.6 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9d024040499805a0536d745b2f9ddce3da9ca983 Author: Yunze Xu <[email protected]> AuthorDate: Mon Aug 24 07:39:32 2020 +0800 [issue 7851][C++] Make clear() thread-safe (#7862) Fixes #7851 ### Motivation `clear()` methods of `BatchAcknowledgementTracker` and `UnAckedMessageTrackerEnabled` are not thread-safe. ### Modifications Acquire a mutex in these `clear()` methods. (cherry picked from commit 97f41120b9691474f0038b220f3204fa69e48257) --- pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc | 1 + pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc | 6 +++++- pulsar-client-cpp/tests/BasicEndToEndTest.cc | 2 ++ 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc b/pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc index df15119..3d6d920 100644 --- a/pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc +++ b/pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc @@ -33,6 +33,7 @@ BatchAcknowledgementTracker::BatchAcknowledgementTracker(const std::string topic } void BatchAcknowledgementTracker::clear() { + Lock lock(mutex_); trackerMap_.clear(); sendList_.clear(); } diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc index 9185dba..e280dba 100644 --- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc +++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc @@ -39,7 +39,7 @@ void UnAckedMessageTrackerEnabled::timeoutHandler() { } void UnAckedMessageTrackerEnabled::timeoutHandlerHelper() { - std::lock_guard<std::mutex> acquire(lock_); + std::unique_lock<std::mutex> acquire(lock_); LOG_DEBUG("UnAckedMessageTrackerEnabled::timeoutHandlerHelper invoked for consumerPtr_ " << consumerReference_.getName().c_str()); @@ -60,6 +60,9 @@ void UnAckedMessageTrackerEnabled::timeoutHandlerHelper() { timePartitions.push_back(headPartition); if (msgIdsToRedeliver.size() > 0) { + // redeliverUnacknowledgedMessages() may call clear() that acquire the lock again, so we should unlock + // here to avoid deadlock + acquire.unlock(); consumerReference_.redeliverUnacknowledgedMessages(msgIdsToRedeliver); } } @@ -148,6 +151,7 @@ void UnAckedMessageTrackerEnabled::removeTopicMessage(const std::string& topic) } void UnAckedMessageTrackerEnabled::clear() { + std::lock_guard<std::mutex> acquire(lock_); messageIdPartitionMap.clear(); for (auto it = timePartitions.begin(); it != timePartitions.end(); it++) { it->clear(); diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc index cc71a35..acad16a 100644 --- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc +++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc @@ -1748,6 +1748,8 @@ TEST(BasicEndToEndTest, testPartitionTopicUnAckedMessageTimeout) { std::this_thread::sleep_for(std::chrono::milliseconds(500)); timeWaited += 500; } + + client.close(); } TEST(BasicEndToEndTest, testUnAckedMessageTimeoutListener) {
