This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 13065496510ff774ca8d9e7f665e0d6f1467543e Author: Yunze Xu <[email protected]> AuthorDate: Thu Aug 25 10:20:51 2022 +0800 [fix][cpp] Fix multi-topics consumer close segmentation fault (#17239) (cherry picked from commit 40d2ae3ac7ea11c8200ea104efcc3c587425b800) --- pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc | 97 +++++++++++----------- pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h | 2 +- pulsar-client-cpp/lib/SynchronizedHashMap.h | 11 +++ pulsar-client-cpp/tests/ClientTest.cc | 11 +++ pulsar-client-cpp/tests/SynchronizedHashMapTest.cc | 9 +- 5 files changed, 79 insertions(+), 51 deletions(-) diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc index 293ece4c838..5ed66400983 100644 --- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc +++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc @@ -82,16 +82,17 @@ void MultiTopicsConsumerImpl::start() { void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer consumer, const std::string& topic, std::shared_ptr<std::atomic<int>> topicsNeedCreate) { - (*topicsNeedCreate)--; - if (result != ResultOk) { state_ = Failed; + // Use the first failed result + auto expectedResult = ResultOk; + failedResult.compare_exchange_strong(expectedResult, result); LOG_ERROR("Failed when subscribed to topic " << topic << " in TopicsConsumer. Error - " << result); + } else { + LOG_DEBUG("Subscribed to topic " << topic << " in TopicsConsumer "); } - LOG_DEBUG("Subscribed to topic " << topic << " in TopicsConsumer "); - - if (topicsNeedCreate->load() == 0) { + if (--(*topicsNeedCreate) == 0) { MultiTopicsConsumerState state = Pending; if (state_.compare_exchange_strong(state, Ready)) { LOG_INFO("Successfully Subscribed to Topics"); @@ -99,11 +100,10 @@ void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer c } else { LOG_ERROR("Unable to create Consumer - " << consumerStr_ << " Error - " << result); // unsubscribed all of the successfully subscribed partitioned consumers - closeAsync(nullptr); - multiTopicsConsumerCreatedPromise_.setFailed(result); - return; + // It's safe to capture only this here, because the callback can be called only when this is valid + closeAsync( + [this](Result result) { multiTopicsConsumerCreatedPromise_.setFailed(failedResult.load()); }); } - return; } } @@ -364,13 +364,47 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) { state_ = Closing; - auto self = shared_from_this(); + std::weak_ptr<MultiTopicsConsumerImpl> weakSelf{shared_from_this()}; int numConsumers = 0; - consumers_.forEach( - [&numConsumers, &self, callback](const std::string& name, const ConsumerImplPtr& consumer) { + consumers_.clear( + [this, weakSelf, &numConsumers, callback](const std::string& name, const ConsumerImplPtr& consumer) { + auto self = weakSelf.lock(); + if (!self) { + return; + } numConsumers++; - consumer->closeAsync([self, name, callback](Result result) { - self->handleSingleConsumerClose(result, name, callback); + consumer->closeAsync([this, weakSelf, name, callback](Result result) { + auto self = weakSelf.lock(); + if (!self) { + return; + } + LOG_DEBUG("Closing the consumer for partition - " << name << " numberTopicPartitions_ - " + << numberTopicPartitions_->load()); + const int numConsumersLeft = --*numberTopicPartitions_; + if (numConsumersLeft < 0) { + LOG_ERROR("[" << name << "] Unexpected number of left consumers: " << numConsumersLeft + << " during close"); + return; + } + if (result != ResultOk) { + state_ = Failed; + LOG_ERROR("Closing the consumer failed for partition - " << name << " with error - " + << result); + } + // closed all consumers + if (numConsumersLeft == 0) { + messages_.clear(); + topicsPartitions_.clear(); + unAckedMessageTrackerPtr_->clear(); + + if (state_ != Failed) { + state_ = Closed; + } + + if (callback) { + callback(result); + } + } }); }); if (numConsumers == 0) { @@ -387,41 +421,6 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) { failPendingReceiveCallback(); } -void MultiTopicsConsumerImpl::handleSingleConsumerClose(Result result, std::string topicPartitionName, - CloseCallback callback) { - consumers_.remove(topicPartitionName); - - LOG_DEBUG("Closing the consumer for partition - " << topicPartitionName << " numberTopicPartitions_ - " - << numberTopicPartitions_->load()); - - assert(numberTopicPartitions_->load() > 0); - numberTopicPartitions_->fetch_sub(1); - - if (result != ResultOk) { - state_ = Failed; - LOG_ERROR("Closing the consumer failed for partition - " << topicPartitionName << " with error - " - << result); - } - - // closed all consumers - if (numberTopicPartitions_->load() == 0) { - messages_.clear(); - consumers_.clear(); - topicsPartitions_.clear(); - unAckedMessageTrackerPtr_->clear(); - - if (state_ != Failed) { - state_ = Closed; - } - - multiTopicsConsumerCreatedPromise_.setFailed(ResultUnknownError); - if (callback) { - callback(result); - } - return; - } -} - void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message& msg) { LOG_DEBUG("Received Message from one of the topic - " << consumer.getTopic() << " message:" << msg.getDataAsString()); diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h index d2ede4a8770..0f111110c44 100644 --- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h +++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h @@ -105,6 +105,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase, MessageListener messageListener_; LookupServicePtr lookupServicePtr_; std::shared_ptr<std::atomic<int>> numberTopicPartitions_; + std::atomic<Result> failedResult{ResultOk}; Promise<Result, ConsumerImplBaseWeakPtr> multiTopicsConsumerCreatedPromise_; UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_; const std::vector<std::string>& topics_; @@ -113,7 +114,6 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase, /* methods */ void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr, unsigned int partitionIndex); - void handleSingleConsumerClose(Result result, std::string topicPartitionName, CloseCallback callback); void notifyResult(CloseCallback closeCallback); void messageReceived(Consumer consumer, const Message& msg); void internalListener(Consumer consumer); diff --git a/pulsar-client-cpp/lib/SynchronizedHashMap.h b/pulsar-client-cpp/lib/SynchronizedHashMap.h index 3a784675dd1..184ca6a2836 100644 --- a/pulsar-client-cpp/lib/SynchronizedHashMap.h +++ b/pulsar-client-cpp/lib/SynchronizedHashMap.h @@ -70,6 +70,17 @@ class SynchronizedHashMap { data_.clear(); } + // clear the map and apply `f` on each removed value + void clear(std::function<void(const K&, const V&)> f) { + Lock lock(mutex_); + auto it = data_.begin(); + while (it != data_.end()) { + f(it->first, it->second); + auto next = data_.erase(it); + it = next; + } + } + OptValue find(const K& key) const { Lock lock(mutex_); auto it = data_.find(key); diff --git a/pulsar-client-cpp/tests/ClientTest.cc b/pulsar-client-cpp/tests/ClientTest.cc index 00696b9a5a4..58c889f074a 100644 --- a/pulsar-client-cpp/tests/ClientTest.cc +++ b/pulsar-client-cpp/tests/ClientTest.cc @@ -262,6 +262,17 @@ TEST(ClientTest, testWrongListener) { client = Client(lookupUrl, ClientConfiguration().setListenerName("test")); + Consumer multiTopicsConsumer; + ASSERT_EQ(ResultServiceUnitNotReady, + client.subscribe({topic + "-partition-0", topic + "-partition-1", topic + "-partition-2"}, + "sub", multiTopicsConsumer)); + + ASSERT_EQ(PulsarFriend::getConsumers(client).size(), 0); + ASSERT_EQ(ResultOk, client.close()); + + // Currently Reader can only read a non-partitioned topic in C++ client + client = Client(lookupUrl, ClientConfiguration().setListenerName("test")); + // Currently Reader can only read a non-partitioned topic in C++ client Reader reader; ASSERT_EQ(ResultServiceUnitNotReady, diff --git a/pulsar-client-cpp/tests/SynchronizedHashMapTest.cc b/pulsar-client-cpp/tests/SynchronizedHashMapTest.cc index 62c55c46c8e..8d74a24014a 100644 --- a/pulsar-client-cpp/tests/SynchronizedHashMapTest.cc +++ b/pulsar-client-cpp/tests/SynchronizedHashMapTest.cc @@ -40,9 +40,16 @@ inline PairVector sort(PairVector pairs) { } TEST(SynchronizedHashMap, testClear) { - SynchronizedHashMap<int, int> m({{1, 100}, {2, 200}}); + SyncMapType m({{1, 100}, {2, 200}}); m.clear(); ASSERT_EQ(m.toPairVector(), PairVector{}); + + PairVector expectedPairs({{3, 300}, {4, 400}}); + SyncMapType m2(expectedPairs); + PairVector pairs; + m2.clear([&pairs](const int& key, const int& value) { pairs.emplace_back(key, value); }); + ASSERT_EQ(m2.toPairVector(), PairVector{}); + ASSERT_EQ(sort(pairs), expectedPairs); } TEST(SynchronizedHashMap, testRemoveAndFind) {
