This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 92161881578dcc93e94f405a6ca5047793acfffd
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Aug 26 13:59:19 2022 +0800

    Revert "[fix][cpp] Fix multi-topics consumer close segmentation fault 
(#17239)"
    
    This reverts commit 2dbf9df0d207dad1b9c516bb910e22fcd72719c8.
---
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc   | 97 +++++++++++-----------
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h    |  2 +-
 pulsar-client-cpp/lib/SynchronizedHashMap.h        | 11 ---
 pulsar-client-cpp/tests/ClientTest.cc              | 11 ---
 pulsar-client-cpp/tests/SynchronizedHashMapTest.cc |  9 +-
 5 files changed, 51 insertions(+), 79 deletions(-)

diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc 
b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 5ed66400983..293ece4c838 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -82,17 +82,16 @@ void MultiTopicsConsumerImpl::start() {
 void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer 
consumer,
                                                        const std::string& 
topic,
                                                        
std::shared_ptr<std::atomic<int>> topicsNeedCreate) {
+    (*topicsNeedCreate)--;
+
     if (result != ResultOk) {
         state_ = Failed;
-        // Use the first failed result
-        auto expectedResult = ResultOk;
-        failedResult.compare_exchange_strong(expectedResult, result);
         LOG_ERROR("Failed when subscribed to topic " << topic << " in 
TopicsConsumer. Error - " << result);
-    } else {
-        LOG_DEBUG("Subscribed to topic " << topic << " in TopicsConsumer ");
     }
 
-    if (--(*topicsNeedCreate) == 0) {
+    LOG_DEBUG("Subscribed to topic " << topic << " in TopicsConsumer ");
+
+    if (topicsNeedCreate->load() == 0) {
         MultiTopicsConsumerState state = Pending;
         if (state_.compare_exchange_strong(state, Ready)) {
             LOG_INFO("Successfully Subscribed to Topics");
@@ -100,10 +99,11 @@ void 
MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer c
         } else {
             LOG_ERROR("Unable to create Consumer - " << consumerStr_ << " 
Error - " << result);
             // unsubscribed all of the successfully subscribed partitioned 
consumers
-            // It's safe to capture only this here, because the callback can 
be called only when this is valid
-            closeAsync(
-                [this](Result result) { 
multiTopicsConsumerCreatedPromise_.setFailed(failedResult.load()); });
+            closeAsync(nullptr);
+            multiTopicsConsumerCreatedPromise_.setFailed(result);
+            return;
         }
+        return;
     }
 }
 
@@ -364,47 +364,13 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback 
callback) {
 
     state_ = Closing;
 
-    std::weak_ptr<MultiTopicsConsumerImpl> weakSelf{shared_from_this()};
+    auto self = shared_from_this();
     int numConsumers = 0;
-    consumers_.clear(
-        [this, weakSelf, &numConsumers, callback](const std::string& name, 
const ConsumerImplPtr& consumer) {
-            auto self = weakSelf.lock();
-            if (!self) {
-                return;
-            }
+    consumers_.forEach(
+        [&numConsumers, &self, callback](const std::string& name, const 
ConsumerImplPtr& consumer) {
             numConsumers++;
-            consumer->closeAsync([this, weakSelf, name, callback](Result 
result) {
-                auto self = weakSelf.lock();
-                if (!self) {
-                    return;
-                }
-                LOG_DEBUG("Closing the consumer for partition - " << name << " 
numberTopicPartitions_ - "
-                                                                  << 
numberTopicPartitions_->load());
-                const int numConsumersLeft = --*numberTopicPartitions_;
-                if (numConsumersLeft < 0) {
-                    LOG_ERROR("[" << name << "] Unexpected number of left 
consumers: " << numConsumersLeft
-                                  << " during close");
-                    return;
-                }
-                if (result != ResultOk) {
-                    state_ = Failed;
-                    LOG_ERROR("Closing the consumer failed for partition - " 
<< name << " with error - "
-                                                                             
<< result);
-                }
-                // closed all consumers
-                if (numConsumersLeft == 0) {
-                    messages_.clear();
-                    topicsPartitions_.clear();
-                    unAckedMessageTrackerPtr_->clear();
-
-                    if (state_ != Failed) {
-                        state_ = Closed;
-                    }
-
-                    if (callback) {
-                        callback(result);
-                    }
-                }
+            consumer->closeAsync([self, name, callback](Result result) {
+                self->handleSingleConsumerClose(result, name, callback);
             });
         });
     if (numConsumers == 0) {
@@ -421,6 +387,41 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback 
callback) {
     failPendingReceiveCallback();
 }
 
+void MultiTopicsConsumerImpl::handleSingleConsumerClose(Result result, 
std::string topicPartitionName,
+                                                        CloseCallback 
callback) {
+    consumers_.remove(topicPartitionName);
+
+    LOG_DEBUG("Closing the consumer for partition - " << topicPartitionName << 
" numberTopicPartitions_ - "
+                                                      << 
numberTopicPartitions_->load());
+
+    assert(numberTopicPartitions_->load() > 0);
+    numberTopicPartitions_->fetch_sub(1);
+
+    if (result != ResultOk) {
+        state_ = Failed;
+        LOG_ERROR("Closing the consumer failed for partition - " << 
topicPartitionName << " with error - "
+                                                                 << result);
+    }
+
+    // closed all consumers
+    if (numberTopicPartitions_->load() == 0) {
+        messages_.clear();
+        consumers_.clear();
+        topicsPartitions_.clear();
+        unAckedMessageTrackerPtr_->clear();
+
+        if (state_ != Failed) {
+            state_ = Closed;
+        }
+
+        multiTopicsConsumerCreatedPromise_.setFailed(ResultUnknownError);
+        if (callback) {
+            callback(result);
+        }
+        return;
+    }
+}
+
 void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const 
Message& msg) {
     LOG_DEBUG("Received Message from one of the topic - " << 
consumer.getTopic()
                                                           << " message:" << 
msg.getDataAsString());
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h 
b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
index 0f111110c44..d2ede4a8770 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -105,7 +105,6 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     MessageListener messageListener_;
     LookupServicePtr lookupServicePtr_;
     std::shared_ptr<std::atomic<int>> numberTopicPartitions_;
-    std::atomic<Result> failedResult{ResultOk};
     Promise<Result, ConsumerImplBaseWeakPtr> 
multiTopicsConsumerCreatedPromise_;
     UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_;
     const std::vector<std::string>& topics_;
@@ -114,6 +113,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     /* methods */
     void handleSinglePartitionConsumerCreated(Result result, 
ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
                                               unsigned int partitionIndex);
+    void handleSingleConsumerClose(Result result, std::string 
topicPartitionName, CloseCallback callback);
     void notifyResult(CloseCallback closeCallback);
     void messageReceived(Consumer consumer, const Message& msg);
     void internalListener(Consumer consumer);
diff --git a/pulsar-client-cpp/lib/SynchronizedHashMap.h 
b/pulsar-client-cpp/lib/SynchronizedHashMap.h
index 184ca6a2836..3a784675dd1 100644
--- a/pulsar-client-cpp/lib/SynchronizedHashMap.h
+++ b/pulsar-client-cpp/lib/SynchronizedHashMap.h
@@ -70,17 +70,6 @@ class SynchronizedHashMap {
         data_.clear();
     }
 
-    // clear the map and apply `f` on each removed value
-    void clear(std::function<void(const K&, const V&)> f) {
-        Lock lock(mutex_);
-        auto it = data_.begin();
-        while (it != data_.end()) {
-            f(it->first, it->second);
-            auto next = data_.erase(it);
-            it = next;
-        }
-    }
-
     OptValue find(const K& key) const {
         Lock lock(mutex_);
         auto it = data_.find(key);
diff --git a/pulsar-client-cpp/tests/ClientTest.cc 
b/pulsar-client-cpp/tests/ClientTest.cc
index 58c889f074a..00696b9a5a4 100644
--- a/pulsar-client-cpp/tests/ClientTest.cc
+++ b/pulsar-client-cpp/tests/ClientTest.cc
@@ -262,17 +262,6 @@ TEST(ClientTest, testWrongListener) {
 
     client = Client(lookupUrl, ClientConfiguration().setListenerName("test"));
 
-    Consumer multiTopicsConsumer;
-    ASSERT_EQ(ResultServiceUnitNotReady,
-              client.subscribe({topic + "-partition-0", topic + 
"-partition-1", topic + "-partition-2"},
-                               "sub", multiTopicsConsumer));
-
-    ASSERT_EQ(PulsarFriend::getConsumers(client).size(), 0);
-    ASSERT_EQ(ResultOk, client.close());
-
-    // Currently Reader can only read a non-partitioned topic in C++ client
-    client = Client(lookupUrl, ClientConfiguration().setListenerName("test"));
-
     // Currently Reader can only read a non-partitioned topic in C++ client
     Reader reader;
     ASSERT_EQ(ResultServiceUnitNotReady,
diff --git a/pulsar-client-cpp/tests/SynchronizedHashMapTest.cc 
b/pulsar-client-cpp/tests/SynchronizedHashMapTest.cc
index 8d74a24014a..62c55c46c8e 100644
--- a/pulsar-client-cpp/tests/SynchronizedHashMapTest.cc
+++ b/pulsar-client-cpp/tests/SynchronizedHashMapTest.cc
@@ -40,16 +40,9 @@ inline PairVector sort(PairVector pairs) {
 }
 
 TEST(SynchronizedHashMap, testClear) {
-    SyncMapType m({{1, 100}, {2, 200}});
+    SynchronizedHashMap<int, int> m({{1, 100}, {2, 200}});
     m.clear();
     ASSERT_EQ(m.toPairVector(), PairVector{});
-
-    PairVector expectedPairs({{3, 300}, {4, 400}});
-    SyncMapType m2(expectedPairs);
-    PairVector pairs;
-    m2.clear([&pairs](const int& key, const int& value) { 
pairs.emplace_back(key, value); });
-    ASSERT_EQ(m2.toPairVector(), PairVector{});
-    ASSERT_EQ(sort(pairs), expectedPairs);
 }
 
 TEST(SynchronizedHashMap, testRemoveAndFind) {

Reply via email to