This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new b7fecc3  Fix Client::close might hang forever for a multi-topics consumer (#211)
b7fecc3 is described below

commit b7fecc3fa3de7752bf6d594b3792b0ea516b309d
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Mar 14 20:53:37 2023 +0800

    Fix Client::close might hang forever for a multi-topics consumer (#211)
    
    * Fix Client::close might hang forever for a multi-topics consumer
    
    Fixes https://github.com/apache/pulsar-client-cpp/issues/209
    
    ### Motivation
    
    There is a race condition when `Client::close` is called after a
    `MultiTopicsConsumerImpl` instance is destructed. For example:
    
    ```c++
    {
      Consumer consumer;
      // subscribe multiple topics or partitions
      client.subscribe(/* ... */, consumer);
    }  // consumer will be destructed after going out of the scope here
    client.close();
    ```
    
    There is a chance that when `client.close()` is called, the
    destructor of `consumer` has not run yet, so `client.consumers_` is
    not empty and `ClientImpl::closeAsync` waits until
    `MultiTopicsConsumerImpl::closeAsync` completes. Then, in its callback
    
    https://github.com/apache/pulsar-client-cpp/blob/242ad770f76655d92c5c1d1abfdee1319ba2ebdc/lib/MultiTopicsConsumerImpl.cc#L482
    
    if the consumer has been destructed in the meantime, `weakSelf.lock()`
    fails and the callback returns early, so the original callback
    (`ClientImpl::handleClose`) is skipped and `ClientImpl::closeAsync`
    never completes.
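    
    As a rough illustration of this failure mode (not the actual client
    code), the sketch below uses a hypothetical `FakeConsumer` type and a
    hypothetical `guardAndSkip` helper. The returned callback is guarded by
    a weak pointer and returns early once the owner is gone, so the
    completion callback it wraps is never invoked.
    
    ```cpp
    #include <functional>
    #include <memory>
    
    // Hypothetical stand-in for the consumer object; not a pulsar-client-cpp type.
    struct FakeConsumer {};
    
    using Done = std::function<void()>;
    
    // Problematic pattern: the completion callback `done` is only reached
    // while the owner is still alive.
    inline Done guardAndSkip(std::weak_ptr<FakeConsumer> weakOwner, Done done) {
        return [weakOwner, done] {
            auto owner = weakOwner.lock();
            if (!owner) {
                return;  // `done` is skipped, so whoever waits on it waits forever
            }
            done();
        };
    }
    ```
    
    If the last `std::shared_ptr<FakeConsumer>` is released before the
    returned callback runs, `done` is never called, which mirrors how
    `ClientImpl::handleClose` could be skipped.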
    
    It is also the root cause of the flaky `ConnectionFailTest`. Each time
    the test got stuck for 120 minutes, the following log was present:
    
    ```
    ClientImpl:589 | Closing Pulsar client with 0 producers and 1 consumers
    ```
    
    When the test passed, the log was:
    
    ```
    ClientImpl:589 | Closing Pulsar client with 0 producers and 0 consumers
    ```
    
    ### Modifications
    
    In `MultiTopicsConsumerImpl::closeAsync`, do not capture `weakSelf` in
    the callback that is passed to `consumer->closeAsync`. Instead, capture
    `weakSelf` in the wrapped `callback` and modify the `state_` field
    there. Since `callback` (and `originalCallback`) is now never
    skipped, `ClientImpl::close` will no longer block forever.
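    
    The shape of this change can be sketched in isolation as follows. This
    is a simplified model under stated assumptions, not the code in
    `lib/MultiTopicsConsumerImpl.cc`: `FakeMultiConsumer`, `AsyncClose` and
    `closeAll` are hypothetical names, and a `bool` stands in for `Result`.
    The per-consumer callbacks share only an atomic counter, and the weak
    pointer is locked solely to update the owner's state.
    
    ```cpp
    #include <atomic>
    #include <cstddef>
    #include <functional>
    #include <memory>
    #include <vector>
    
    // Hypothetical stand-in for the multi-topics consumer; not a real client type.
    struct FakeMultiConsumer {
        bool shutDown = false;
    };
    
    using CloseCallback = std::function<void(bool ok)>;
    // Each AsyncClose models one sub-consumer's asynchronous close operation.
    using AsyncClose = std::function<void(CloseCallback)>;
    
    inline void closeAll(const std::shared_ptr<FakeMultiConsumer>& owner,
                         const std::vector<AsyncClose>& closers, CloseCallback done) {
        std::weak_ptr<FakeMultiConsumer> weakOwner{owner};
        auto wrapped = [weakOwner, done](bool ok) {
            if (auto self = weakOwner.lock()) {
                self->shutDown = true;  // touch owner state only if it is still alive
            }
            if (done) {
                done(ok);  // always invoked, even after the owner is destructed
            }
        };
        if (closers.empty()) {
            wrapped(true);
            return;
        }
        // Shared counter: the last sub-close to finish triggers `wrapped`.
        auto remaining = std::make_shared<std::atomic<std::size_t>>(closers.size());
        for (const auto& close : closers) {
            close([remaining, wrapped](bool ok) {
                if (--*remaining == 0) {
                    wrapped(ok);
                }
            });
        }
    }
    ```
    
    Because the inner callbacks no longer depend on the owner's lifetime,
    `done` fires exactly once after the last sub-close completes, whether
    or not the owner still exists.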
    
    ### Verifications
    
    Ran `ConnectionFailTest` inside a `ubuntu:22.04` Docker container
    until the following log appeared.
    
    ```
    ClientImpl:589 | Closing Pulsar client with 0 producers and 1 consumers
    ```
    
    The test now passes even when this log appears:
    
    ```
    2023-03-14 06:51:39.201 INFO  [140309830422656] ClientImpl:553 | Closing Pulsar client with 0 producers and 1 consumers
    2023-03-14 06:51:39.201 INFO  [140309830422656] ConsumerImpl:1188 | [persistent://public/default/test-connection-fail-51678776698-partition-4, reader-e0617a7614, 9] Closing consumer for topic persistent://public/default/test-connection-fail-51678776698-partition-4
    2023-03-14 06:51:39.202 INFO  [140309830422656] ConsumerImpl:1188 | [persistent://public/default/test-connection-fail-51678776698-partition-3, reader-e0617a7614, 8] Closing consumer for topic persistent://public/default/test-connection-fail-51678776698-partition-3
    2023-03-14 06:51:39.202 INFO  [140309830422656] ConsumerImpl:1188 | [persistent://public/default/test-connection-fail-51678776698-partition-2, reader-e0617a7614, 7] Closing consumer for topic persistent://public/default/test-connection-fail-51678776698-partition-2
    2023-03-14 06:51:39.202 INFO  [140309830422656] ConsumerImpl:1188 | [persistent://public/default/test-connection-fail-51678776698-partition-1, reader-e0617a7614, 6] Closing consumer for topic persistent://public/default/test-connection-fail-51678776698-partition-1
    2023-03-14 06:51:39.203 INFO  [140309830422656] ConsumerImpl:1188 | [persistent://public/default/test-connection-fail-51678776698-partition-0, reader-e0617a7614, 5] Closing consumer for topic persistent://public/default/test-connection-fail-51678776698-partition-0
    2023-03-14 06:51:39.250 INFO  [140309602235968] ConsumerImpl:1174 | [persistent://public/default/test-connection-fail-51678776698-partition-4, reader-e0617a7614, 9] Closed consumer 9
    2023-03-14 06:51:39.250 INFO  [140309602235968] ConsumerImpl:1174 | [persistent://public/default/test-connection-fail-51678776698-partition-3, reader-e0617a7614, 8] Closed consumer 8
    2023-03-14 06:51:39.251 INFO  [140309602235968] ConsumerImpl:1174 | [persistent://public/default/test-connection-fail-51678776698-partition-2, reader-e0617a7614, 7] Closed consumer 7
    2023-03-14 06:51:39.251 INFO  [140309602235968] ConsumerImpl:1174 | [persistent://public/default/test-connection-fail-51678776698-partition-1, reader-e0617a7614, 6] Closed consumer 6
    2023-03-14 06:51:39.251 INFO  [140309602235968] ConsumerImpl:1174 | [persistent://public/default/test-connection-fail-51678776698-partition-0, reader-e0617a7614, 5] Closed consumer 5
    2023-03-14 06:51:39.252 INFO  [140309593843264] ClientConnection:1200 | [172.17.0.2:46406 -> 192.168.65.2:6650] Connection disconnected
    2023-03-14 06:51:39.252 INFO  [140309593843264] ClientConnection:262 | [172.17.0.2:46406 -> 192.168.65.2:6650] Destroyed connection
    2023-03-14 06:51:39.253 INFO  [140309593843264] ClientConnection:1200 | [172.17.0.2:46408 -> 192.168.65.2:6650] Connection disconnected
    2023-03-14 06:51:39.253 INFO  [140309593843264] ClientConnection:262 | [172.17.0.2:46408 -> 192.168.65.2:6650] Destroyed connection
    [       OK ] Unix/ConnectionFailTest.test/1 (312 ms)
    [----------] 2 tests from Unix/ConnectionFailTest (450 ms total)
    ```
    
    * Fix flaky testReferenceCount
---
 lib/MultiTopicsConsumerImpl.cc | 71 ++++++++++++++++++++----------------------
 tests/ClientTest.cc            |  4 +++
 2 files changed, 37 insertions(+), 38 deletions(-)

diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index 32891e7..c43bf2f 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -448,10 +448,17 @@ void MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync(
 }
 
 void MultiTopicsConsumerImpl::closeAsync(ResultCallback originalCallback) {
-    auto callback = [this, originalCallback](Result result) {
-        shutdown();
-        if (result != ResultOk) {
-            LOG_WARN(getName() << "Failed to close consumer: " << result);
+    std::weak_ptr<MultiTopicsConsumerImpl> weakSelf{get_shared_this_ptr()};
+    auto callback = [weakSelf, originalCallback](Result result) {
+        auto self = weakSelf.lock();
+        if (self) {
+            self->shutdown();
+            if (result != ResultOk) {
+                LOG_WARN(self->getName() << "Failed to close consumer: " << result);
+                if (result != ResultAlreadyClosed) {
+                    self->state_ = Failed;
+                }
+            }
         }
         if (originalCallback) {
             originalCallback(result);
@@ -467,46 +474,34 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback originalCallback) {
 
     cancelTimers();
 
-    auto weakSelf = weak_from_this();
-    int numConsumers = 0;
-    consumers_.clear(
-        [this, weakSelf, &numConsumers, callback](const std::string& name, const ConsumerImplPtr& consumer) {
-            auto self = weakSelf.lock();
-            if (!self) {
-                return;
-            }
-            numConsumers++;
-            consumer->closeAsync([this, weakSelf, name, callback](Result result) {
-                auto self = weakSelf.lock();
-                if (!self) {
-                    return;
-                }
-                LOG_DEBUG("Closing the consumer for partition - " << name << " numberTopicPartitions_ - "
-                                                                  << numberTopicPartitions_->load());
-                const int numConsumersLeft = --*numberTopicPartitions_;
-                if (numConsumersLeft < 0) {
-                    LOG_ERROR("[" << name << "] Unexpected number of left consumers: " << numConsumersLeft
-                                  << " during close");
-                    return;
-                }
-                if (result != ResultOk) {
-                    state_ = Failed;
-                    LOG_ERROR("Closing the consumer failed for partition - " << name << " with error - "
-                                                                             << result);
-                }
-                // closed all consumers
-                if (numConsumersLeft == 0) {
-                    callback(result);
-                }
-            });
-        });
-    if (numConsumers == 0) {
+    auto consumers = consumers_.move();
+    *numberTopicPartitions_ = 0;
+    if (consumers.empty()) {
         LOG_DEBUG("TopicsConsumer have no consumers to close "
                   << " topic" << topic_ << " subscription - " << subscriptionName_);
         callback(ResultAlreadyClosed);
         return;
     }
 
+    auto numConsumers = std::make_shared<std::atomic<size_t>>(consumers.size());
+    for (auto&& kv : consumers) {
+        auto& name = kv.first;
+        auto& consumer = kv.second;
+        consumer->closeAsync([name, numConsumers, callback](Result result) {
+            const auto numConsumersLeft = --*numConsumers;
+            LOG_DEBUG("Closing the consumer for partition - " << name << " numConsumersLeft - "
+                                                              << numConsumersLeft);
+
+            if (result != ResultOk) {
+                LOG_ERROR("Closing the consumer failed for partition - " << name << " with error - "
+                                                                         << result);
+            }
+            if (numConsumersLeft == 0) {
+                callback(result);
+            }
+        });
+    }
+
     // fail pending receive
     failPendingReceiveCallback();
     failPendingBatchReceiveCallback();
diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc
index d59633f..787c1ba 100644
--- a/tests/ClientTest.cc
+++ b/tests/ClientTest.cc
@@ -25,6 +25,7 @@
 
 #include "HttpHelper.h"
 #include "PulsarFriend.h"
+#include "WaitUtils.h"
 #include "lib/ClientConnection.h"
 #include "lib/LogUtils.h"
 #include "lib/checksum/ChecksumProvider.h"
@@ -228,6 +229,9 @@ TEST(ClientTest, testReferenceCount) {
         LOG_INFO("Reference count of the reader: " << readerWeakPtr.use_count());
     }
 
+    waitUntil(std::chrono::seconds(3), [&] {
+        return producers.size() == 0 && consumers.size() == 0 && readerWeakPtr.use_count() == 0;
+    });
     EXPECT_EQ(producers.size(), 0);
     EXPECT_EQ(consumers.size(), 0);
     EXPECT_EQ(readerWeakPtr.use_count(), 0);
