This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 92161881578dcc93e94f405a6ca5047793acfffd Author: Yunze Xu <[email protected]> AuthorDate: Fri Aug 26 13:59:19 2022 +0800 Revert "[fix][cpp] Fix multi-topics consumer close segmentation fault (#17239)" This reverts commit 2dbf9df0d207dad1b9c516bb910e22fcd72719c8. --- pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc | 97 +++++++++++----------- pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h | 2 +- pulsar-client-cpp/lib/SynchronizedHashMap.h | 11 --- pulsar-client-cpp/tests/ClientTest.cc | 11 --- pulsar-client-cpp/tests/SynchronizedHashMapTest.cc | 9 +- 5 files changed, 51 insertions(+), 79 deletions(-) diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc index 5ed66400983..293ece4c838 100644 --- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc +++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc @@ -82,17 +82,16 @@ void MultiTopicsConsumerImpl::start() { void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer consumer, const std::string& topic, std::shared_ptr<std::atomic<int>> topicsNeedCreate) { + (*topicsNeedCreate)--; + if (result != ResultOk) { state_ = Failed; - // Use the first failed result - auto expectedResult = ResultOk; - failedResult.compare_exchange_strong(expectedResult, result); LOG_ERROR("Failed when subscribed to topic " << topic << " in TopicsConsumer. Error - " << result); - } else { - LOG_DEBUG("Subscribed to topic " << topic << " in TopicsConsumer "); } - if (--(*topicsNeedCreate) == 0) { + LOG_DEBUG("Subscribed to topic " << topic << " in TopicsConsumer "); + + if (topicsNeedCreate->load() == 0) { MultiTopicsConsumerState state = Pending; if (state_.compare_exchange_strong(state, Ready)) { LOG_INFO("Successfully Subscribed to Topics"); @@ -100,10 +99,11 @@ void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer c } else { LOG_ERROR("Unable to create Consumer - " << consumerStr_ << " Error - " << result); // unsubscribed all of the successfully subscribed partitioned consumers - // It's safe to capture only this here, because the callback can be called only when this is valid - closeAsync( - [this](Result result) { multiTopicsConsumerCreatedPromise_.setFailed(failedResult.load()); }); + closeAsync(nullptr); + multiTopicsConsumerCreatedPromise_.setFailed(result); + return; } + return; } } @@ -364,47 +364,13 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) { state_ = Closing; - std::weak_ptr<MultiTopicsConsumerImpl> weakSelf{shared_from_this()}; + auto self = shared_from_this(); int numConsumers = 0; - consumers_.clear( - [this, weakSelf, &numConsumers, callback](const std::string& name, const ConsumerImplPtr& consumer) { - auto self = weakSelf.lock(); - if (!self) { - return; - } + consumers_.forEach( + [&numConsumers, &self, callback](const std::string& name, const ConsumerImplPtr& consumer) { numConsumers++; - consumer->closeAsync([this, weakSelf, name, callback](Result result) { - auto self = weakSelf.lock(); - if (!self) { - return; - } - LOG_DEBUG("Closing the consumer for partition - " << name << " numberTopicPartitions_ - " - << numberTopicPartitions_->load()); - const int numConsumersLeft = --*numberTopicPartitions_; - if (numConsumersLeft < 0) { - LOG_ERROR("[" << name << "] Unexpected number of left consumers: " << numConsumersLeft - << " during close"); - return; - } - if (result != ResultOk) { - state_ = Failed; - LOG_ERROR("Closing the consumer failed for partition - " << name << " with error - " - << result); - } - // closed all consumers - if (numConsumersLeft == 0) { - messages_.clear(); - topicsPartitions_.clear(); - unAckedMessageTrackerPtr_->clear(); - - if (state_ != Failed) { - state_ = Closed; - } - - if (callback) { - callback(result); - } - } + consumer->closeAsync([self, name, callback](Result result) { + self->handleSingleConsumerClose(result, name, callback); }); }); if (numConsumers == 0) { @@ -421,6 +387,41 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback callback) { failPendingReceiveCallback(); } +void MultiTopicsConsumerImpl::handleSingleConsumerClose(Result result, std::string topicPartitionName, + CloseCallback callback) { + consumers_.remove(topicPartitionName); + + LOG_DEBUG("Closing the consumer for partition - " << topicPartitionName << " numberTopicPartitions_ - " + << numberTopicPartitions_->load()); + + assert(numberTopicPartitions_->load() > 0); + numberTopicPartitions_->fetch_sub(1); + + if (result != ResultOk) { + state_ = Failed; + LOG_ERROR("Closing the consumer failed for partition - " << topicPartitionName << " with error - " + << result); + } + + // closed all consumers + if (numberTopicPartitions_->load() == 0) { + messages_.clear(); + consumers_.clear(); + topicsPartitions_.clear(); + unAckedMessageTrackerPtr_->clear(); + + if (state_ != Failed) { + state_ = Closed; + } + + multiTopicsConsumerCreatedPromise_.setFailed(ResultUnknownError); + if (callback) { + callback(result); + } + return; + } +} + void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message& msg) { LOG_DEBUG("Received Message from one of the topic - " << consumer.getTopic() << " message:" << msg.getDataAsString()); diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h index 0f111110c44..d2ede4a8770 100644 --- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h +++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h @@ -105,7 +105,6 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase, MessageListener messageListener_; LookupServicePtr lookupServicePtr_; std::shared_ptr<std::atomic<int>> numberTopicPartitions_; - std::atomic<Result> failedResult{ResultOk}; Promise<Result, ConsumerImplBaseWeakPtr> multiTopicsConsumerCreatedPromise_; UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_; const std::vector<std::string>& topics_; @@ -114,6 +113,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase, /* methods */ void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr, unsigned int partitionIndex); + void handleSingleConsumerClose(Result result, std::string topicPartitionName, CloseCallback callback); void notifyResult(CloseCallback closeCallback); void messageReceived(Consumer consumer, const Message& msg); void internalListener(Consumer consumer); diff --git a/pulsar-client-cpp/lib/SynchronizedHashMap.h b/pulsar-client-cpp/lib/SynchronizedHashMap.h index 184ca6a2836..3a784675dd1 100644 --- a/pulsar-client-cpp/lib/SynchronizedHashMap.h +++ b/pulsar-client-cpp/lib/SynchronizedHashMap.h @@ -70,17 +70,6 @@ class SynchronizedHashMap { data_.clear(); } - // clear the map and apply `f` on each removed value - void clear(std::function<void(const K&, const V&)> f) { - Lock lock(mutex_); - auto it = data_.begin(); - while (it != data_.end()) { - f(it->first, it->second); - auto next = data_.erase(it); - it = next; - } - } - OptValue find(const K& key) const { Lock lock(mutex_); auto it = data_.find(key); diff --git a/pulsar-client-cpp/tests/ClientTest.cc b/pulsar-client-cpp/tests/ClientTest.cc index 58c889f074a..00696b9a5a4 100644 --- a/pulsar-client-cpp/tests/ClientTest.cc +++ b/pulsar-client-cpp/tests/ClientTest.cc @@ -262,17 +262,6 @@ TEST(ClientTest, testWrongListener) { client = Client(lookupUrl, ClientConfiguration().setListenerName("test")); - Consumer multiTopicsConsumer; - ASSERT_EQ(ResultServiceUnitNotReady, - client.subscribe({topic + "-partition-0", topic + "-partition-1", topic + "-partition-2"}, - "sub", multiTopicsConsumer)); - - ASSERT_EQ(PulsarFriend::getConsumers(client).size(), 0); - ASSERT_EQ(ResultOk, client.close()); - - // Currently Reader can only read a non-partitioned topic in C++ client - client = Client(lookupUrl, ClientConfiguration().setListenerName("test")); - // Currently Reader can only read a non-partitioned topic in C++ client Reader reader; ASSERT_EQ(ResultServiceUnitNotReady, diff --git a/pulsar-client-cpp/tests/SynchronizedHashMapTest.cc b/pulsar-client-cpp/tests/SynchronizedHashMapTest.cc index 8d74a24014a..62c55c46c8e 100644 --- a/pulsar-client-cpp/tests/SynchronizedHashMapTest.cc +++ b/pulsar-client-cpp/tests/SynchronizedHashMapTest.cc @@ -40,16 +40,9 @@ inline PairVector sort(PairVector pairs) { } TEST(SynchronizedHashMap, testClear) { - SyncMapType m({{1, 100}, {2, 200}}); + SynchronizedHashMap<int, int> m({{1, 100}, {2, 200}}); m.clear(); ASSERT_EQ(m.toPairVector(), PairVector{}); - - PairVector expectedPairs({{3, 300}, {4, 400}}); - SyncMapType m2(expectedPairs); - PairVector pairs; - m2.clear([&pairs](const int& key, const int& value) { pairs.emplace_back(key, value); }); - ASSERT_EQ(m2.toPairVector(), PairVector{}); - ASSERT_EQ(sort(pairs), expectedPairs); } TEST(SynchronizedHashMap, testRemoveAndFind) {
