This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 97f4112 [issue 7851][C++] Make clear() thread-safe (#7862)
97f4112 is described below
commit 97f41120b9691474f0038b220f3204fa69e48257
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Aug 24 07:39:32 2020 +0800
[issue 7851][C++] Make clear() thread-safe (#7862)
Fixes #7851
### Motivation
The `clear()` methods of `BatchAcknowledgementTracker` and
`UnAckedMessageTrackerEnabled` are not thread-safe.
### Modifications
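Acquire a mutex in these `clear()` methods. In
`UnAckedMessageTrackerEnabled::timeoutHandlerHelper()`, release the lock before
calling `redeliverUnacknowledgedMessages()`, because that call may invoke
`clear()` and would otherwise deadlock on the same mutex.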
---
pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc | 1 +
pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc | 6 +++++-
pulsar-client-cpp/tests/BasicEndToEndTest.cc | 2 ++
3 files changed, 8 insertions(+), 1 deletion(-)
diff --git a/pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc
b/pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc
index df15119..3d6d920 100644
--- a/pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc
+++ b/pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc
@@ -33,6 +33,7 @@
BatchAcknowledgementTracker::BatchAcknowledgementTracker(const std::string topic
}
void BatchAcknowledgementTracker::clear() {
+ Lock lock(mutex_);
trackerMap_.clear();
sendList_.clear();
}
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
index 9185dba..e280dba 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
@@ -39,7 +39,7 @@ void UnAckedMessageTrackerEnabled::timeoutHandler() {
}
void UnAckedMessageTrackerEnabled::timeoutHandlerHelper() {
- std::lock_guard<std::mutex> acquire(lock_);
+ std::unique_lock<std::mutex> acquire(lock_);
LOG_DEBUG("UnAckedMessageTrackerEnabled::timeoutHandlerHelper invoked for
consumerPtr_ "
<< consumerReference_.getName().c_str());
@@ -60,6 +60,9 @@ void UnAckedMessageTrackerEnabled::timeoutHandlerHelper() {
timePartitions.push_back(headPartition);
if (msgIdsToRedeliver.size() > 0) {
+ // redeliverUnacknowledgedMessages() may call clear() that acquire the
lock again, so we should unlock
+ // here to avoid deadlock
+ acquire.unlock();
consumerReference_.redeliverUnacknowledgedMessages(msgIdsToRedeliver);
}
}
@@ -148,6 +151,7 @@ void UnAckedMessageTrackerEnabled::removeTopicMessage(const
std::string& topic)
}
void UnAckedMessageTrackerEnabled::clear() {
+ std::lock_guard<std::mutex> acquire(lock_);
messageIdPartitionMap.clear();
for (auto it = timePartitions.begin(); it != timePartitions.end(); it++) {
it->clear();
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index b574630..eeb1489 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -1750,6 +1750,8 @@ TEST(BasicEndToEndTest,
testPartitionTopicUnAckedMessageTimeout) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
timeWaited += 500;
}
+
+ client.close();
}
TEST(BasicEndToEndTest, testUnAckedMessageTimeoutListener) {