This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
commit 1dad87bb3b804d2aa8542ac48e4c35228ac2f1bf Author: Yunze Xu <xyzinfern...@163.com> AuthorDate: Sat May 6 17:25:40 2023 +0800 Fix deadlock for negative acknowledgment (#266) Fixes https://github.com/apache/pulsar-client-cpp/issues/265 ### Modifications Make `timer_` const and `enabledForTesting_` atomic in `NegativeAcksTracker` so that the `mutex_` can be used only for the `nackedMessages_` field. After that, we can unlock `mutex_` in `handleTimer` to avoid the potential deadlock from user-provided logger or interceptor. Add `ConsumerTest.testNegativeAckDeadlock` to verify the fix. (cherry picked from commit 8a9b2dc7f58b9b4a888be11d8068839abec51310) --- lib/NegativeAcksTracker.cc | 37 ++++++++++++-------------- lib/NegativeAcksTracker.h | 7 ++--- tests/ConsumerTest.cc | 65 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 86 insertions(+), 23 deletions(-) diff --git a/lib/NegativeAcksTracker.cc b/lib/NegativeAcksTracker.cc index 451a13f..5c3ef3f 100644 --- a/lib/NegativeAcksTracker.cc +++ b/lib/NegativeAcksTracker.cc @@ -35,8 +35,7 @@ NegativeAcksTracker::NegativeAcksTracker(ClientImplPtr client, ConsumerImpl &con const ConsumerConfiguration &conf) : consumer_(consumer), timerInterval_(0), - executor_(client->getIOExecutorProvider()->get()), - enabledForTesting_(true) { + timer_(client->getIOExecutorProvider()->get()->createDeadlineTimer()) { static const long MIN_NACK_DELAY_MILLIS = 100; nackDelay_ = @@ -47,7 +46,9 @@ NegativeAcksTracker::NegativeAcksTracker(ClientImplPtr client, ConsumerImpl &con } void NegativeAcksTracker::scheduleTimer() { - timer_ = executor_->createDeadlineTimer(); + if (closed_) { + return; + } timer_->expires_from_now(timerInterval_); timer_->async_wait(std::bind(&NegativeAcksTracker::handleTimer, this, std::placeholders::_1)); } @@ -58,8 +59,7 @@ void NegativeAcksTracker::handleTimer(const boost::system::error_code &ec) { return; } - std::lock_guard<std::mutex> lock(mutex_); - timer_ = nullptr; + 
std::unique_lock<std::mutex> lock(mutex_); if (nackedMessages_.empty() || !enabledForTesting_) { return; @@ -78,6 +78,7 @@ void NegativeAcksTracker::handleTimer(const boost::system::error_code &ec) { ++it; } } + lock.unlock(); if (!messagesToRedeliver.empty()) { consumer_.onNegativeAcksSend(messagesToRedeliver); @@ -87,34 +88,30 @@ void NegativeAcksTracker::handleTimer(const boost::system::error_code &ec) { } void NegativeAcksTracker::add(const MessageId &m) { - std::lock_guard<std::mutex> lock(mutex_); - + auto msgId = discardBatch(m); auto now = Clock::now(); - // Erase batch id to group all nacks from same batch - nackedMessages_[discardBatch(m)] = now + nackDelay_; - - if (!timer_) { - scheduleTimer(); + { + std::lock_guard<std::mutex> lock{mutex_}; + // Erase batch id to group all nacks from same batch + nackedMessages_[msgId] = now + nackDelay_; } + + scheduleTimer(); } void NegativeAcksTracker::close() { + closed_ = true; + boost::system::error_code ec; + timer_->cancel(ec); std::lock_guard<std::mutex> lock(mutex_); - - if (timer_) { - boost::system::error_code ec; - timer_->cancel(ec); - } - timer_ = nullptr; nackedMessages_.clear(); } void NegativeAcksTracker::setEnabledForTesting(bool enabled) { - std::lock_guard<std::mutex> lock(mutex_); enabledForTesting_ = enabled; - if (enabledForTesting_ && !timer_) { + if (enabledForTesting_) { scheduleTimer(); } } diff --git a/lib/NegativeAcksTracker.h b/lib/NegativeAcksTracker.h index f8b334b..029f7d2 100644 --- a/lib/NegativeAcksTracker.h +++ b/lib/NegativeAcksTracker.h @@ -22,6 +22,7 @@ #include <pulsar/ConsumerConfiguration.h> #include <pulsar/MessageId.h> +#include <atomic> #include <boost/asio/deadline_timer.hpp> #include <chrono> #include <map> @@ -65,9 +66,9 @@ class NegativeAcksTracker { typedef typename std::chrono::steady_clock Clock; std::map<MessageId, Clock::time_point> nackedMessages_; - ExecutorServicePtr executor_; - DeadlineTimerPtr timer_; - bool enabledForTesting_; // to be able to test 
deterministically + const DeadlineTimerPtr timer_; + std::atomic_bool closed_{false}; + std::atomic_bool enabledForTesting_{true}; // to be able to test deterministically FRIEND_TEST(ConsumerTest, testNegativeAcksTrackerClose); }; diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc index 23ea687..7867a57 100644 --- a/tests/ConsumerTest.cc +++ b/tests/ConsumerTest.cc @@ -20,9 +20,11 @@ #include <pulsar/Client.h> #include <array> +#include <atomic> #include <chrono> #include <ctime> #include <map> +#include <mutex> #include <set> #include <thread> #include <vector> @@ -1240,4 +1242,67 @@ TEST(ConsumerTest, testAckNotPersistentTopic) { INSTANTIATE_TEST_CASE_P(Pulsar, ConsumerSeekTest, ::testing::Values(true, false)); +class InterceptorForNegAckDeadlock : public ConsumerInterceptor { + public: + Message beforeConsume(const Consumer& consumer, const Message& message) override { return message; } + + void onAcknowledge(const Consumer& consumer, Result result, const MessageId& messageID) override {} + + void onAcknowledgeCumulative(const Consumer& consumer, Result result, + const MessageId& messageID) override {} + + void onNegativeAcksSend(const Consumer& consumer, const std::set<MessageId>& messageIds) override { + duringNegativeAck_ = true; + // Wait for the next time Consumer::negativeAcknowledge is called + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + std::lock_guard<std::mutex> lock{mutex_}; + LOG_INFO("onNegativeAcksSend is called for " << consumer.getTopic()); + duringNegativeAck_ = false; + } + + static std::mutex mutex_; + static std::atomic_bool duringNegativeAck_; +}; + +std::mutex InterceptorForNegAckDeadlock::mutex_; +std::atomic_bool InterceptorForNegAckDeadlock::duringNegativeAck_{false}; + +// For https://github.com/apache/pulsar-client-cpp/issues/265 +TEST(ConsumerTest, testNegativeAckDeadlock) { + const std::string topic = "test-negative-ack-deadlock"; + Client client{lookupUrl}; + ConsumerConfiguration conf; + 
conf.setNegativeAckRedeliveryDelayMs(500); + conf.intercept({std::make_shared<InterceptorForNegAckDeadlock>()}); + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", conf, consumer)); + + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topic, producer)); + producer.send(MessageBuilder().setContent("msg").build()); + + Message msg; + ASSERT_EQ(ResultOk, consumer.receive(msg)); + + auto& duringNegativeAck = InterceptorForNegAckDeadlock::duringNegativeAck_; + duringNegativeAck = false; + consumer.negativeAcknowledge(msg); // schedule the negative ack timer + // Wait until the negative ack timer is triggered and onNegativeAcksSend will be called + for (int i = 0; !duringNegativeAck && i < 100; i++) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + ASSERT_TRUE(duringNegativeAck); + + { + std::lock_guard<std::mutex> lock{InterceptorForNegAckDeadlock::mutex_}; + consumer.negativeAcknowledge(msg); + } + for (int i = 0; duringNegativeAck && i < 100; i++) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + ASSERT_FALSE(duringNegativeAck); + + client.close(); +} + } // namespace pulsar