This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new b7fecc3 Fix Client::close might hang forever for a multi-topics consumer (#211)
b7fecc3 is described below
commit b7fecc3fa3de7752bf6d594b3792b0ea516b309d
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Mar 14 20:53:37 2023 +0800
Fix Client::close might hang forever for a multi-topics consumer (#211)
* Fix Client::close might hang forever for a multi-topics consumer
Fixes https://github.com/apache/pulsar-client-cpp/issues/209
### Motivation
There is a race condition when `Client::close` is called after a
`MultiTopicsConsumerImpl` instance is destroyed, for example:
```c++
{
    Consumer consumer;
    // subscribe to multiple topics or partitions
    client.subscribe(/* ... */, consumer);
}  // consumer is destroyed here when it goes out of scope
client.close();
```
There is a chance that when `client.close()` is called, the destructor
of `consumer` has not been called yet, so `client.consumers_` is not
empty and `ClientImpl::closeAsync` waits until
`MultiTopicsConsumerImpl::closeAsync` completes. If the
`MultiTopicsConsumerImpl` is destroyed in the meantime, then in the
callback passed to each internal consumer's `closeAsync`
(https://github.com/apache/pulsar-client-cpp/blob/242ad770f76655d92c5c1d1abfdee1319ba2ebdc/lib/MultiTopicsConsumerImpl.cc#L482)
`weakSelf.lock()` returns null and the callback returns early. As a
result, the original callback (`ClientImpl::handleClose`) is skipped and
`ClientImpl::closeAsync` never completes.
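To make the failure mode concrete, here is a minimal, self-contained model of the old pattern (C++17). `FakeSubConsumer`, `OldMultiConsumer`, and `oldPatternCompletes` are hypothetical stand-ins, not Pulsar types, and a `bool` replaces `Result`. The sub-consumer's close callback captures only a `weak_ptr` to its owner and returns early once the owner is gone, so the client-level callback never fires.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical simplified callback type; `true` stands in for ResultOk.
using ResultCallback = std::function<void(bool)>;

// Hypothetical sub-consumer whose close completes later, when the caller
// invokes the stored callback.
struct FakeSubConsumer {
    ResultCallback pending;
    void closeAsync(ResultCallback cb) { pending = std::move(cb); }
};

// Hypothetical model of the OLD MultiTopicsConsumerImpl::closeAsync pattern.
struct OldMultiConsumer : std::enable_shared_from_this<OldMultiConsumer> {
    void closeAsync(FakeSubConsumer& sub, ResultCallback originalCallback) {
        assert(originalCallback);  // the model requires a completion callback
        std::weak_ptr<OldMultiConsumer> weakSelf = weak_from_this();
        sub.closeAsync([weakSelf, originalCallback](bool ok) {
            auto self = weakSelf.lock();
            if (!self) {
                return;  // owner already destroyed: originalCallback is skipped
            }
            originalCallback(ok);
        });
    }
};

// Returns whether the client-level callback fired when the multi-topics
// consumer is destroyed before its sub-consumer finishes closing.
bool oldPatternCompletes() {
    FakeSubConsumer sub;
    bool clientClosed = false;
    {
        auto consumer = std::make_shared<OldMultiConsumer>();
        consumer->closeAsync(sub, [&clientClosed](bool) { clientClosed = true; });
    }  // consumer destroyed here, like the out-of-scope Consumer above
    sub.pending(true);  // the sub-consumer now finishes closing
    return clientClosed;
}
```

Calling `oldPatternCompletes()` returns `false`: the sub-consumer does finish closing, but the early `return` swallows the completion, which is the same shape as `ClientImpl::closeAsync` waiting forever.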
It is also the root cause of the flaky `ConnectionFailTest`: every time
the test got stuck for 120 minutes, the log contained the following line:
```
ClientImpl:589 | Closing Pulsar client with 0 producers and 1 consumers
```
When the test passed, the log was:
```
ClientImpl:589 | Closing Pulsar client with 0 producers and 0 consumers
```
### Modifications
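In `MultiTopicsConsumerImpl::closeAsync`, do not capture `weakSelf` in
the callback that is passed to `consumer->closeAsync`. Instead, capture
`weakSelf` in the wrapped `callback` and update the `state_` field there,
only if the instance is still alive. The internal consumers are moved out
of `consumers_` and counted down with a shared atomic counter, so the
per-consumer callbacks no longer reference `this`. Since `callback` (and
therefore `originalCallback`) is never skipped now, `ClientImpl::close`
can no longer block forever.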
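A sketch of the fixed shape, using the same kind of hypothetical types (C++17): the per-sub-consumer callback captures only a shared atomic countdown and the wrapped `callback`, and `weakSelf` is locked solely to update state, so `originalCallback` always runs. This is a simplified model of the diff below, not the actual implementation; for example, the real code calls `shutdown()`, passes `ResultAlreadyClosed` when there are no consumers, and sets `Failed` only for results other than `ResultOk` and `ResultAlreadyClosed`.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical simplified callback type; `true` stands in for ResultOk.
using ResultCallback = std::function<void(bool)>;

// Hypothetical sub-consumer whose close completes when the stored callback runs.
struct FakeSubConsumer {
    ResultCallback pending;
    void closeAsync(ResultCallback cb) { pending = std::move(cb); }
};

enum class State { Closing, Closed, Failed };

// Hypothetical model of the FIXED MultiTopicsConsumerImpl::closeAsync pattern.
struct NewMultiConsumer : std::enable_shared_from_this<NewMultiConsumer> {
    State state = State::Closing;

    void closeAsync(std::vector<FakeSubConsumer>& subs, ResultCallback originalCallback) {
        std::weak_ptr<NewMultiConsumer> weakSelf = weak_from_this();
        // Only the state update needs `self`; originalCallback runs regardless.
        auto callback = [weakSelf, originalCallback](bool ok) {
            if (auto self = weakSelf.lock()) {
                self->state = ok ? State::Closed : State::Failed;
            }
            if (originalCallback) {
                originalCallback(ok);
            }
        };
        if (subs.empty()) {
            callback(true);
            return;
        }
        // Shared countdown: whichever sub-consumer finishes last fires `callback`.
        auto left = std::make_shared<std::atomic<std::size_t>>(subs.size());
        for (auto& sub : subs) {
            // Captures no pointer to the owner, so it can never be skipped.
            sub.closeAsync([left, callback](bool ok) {
                const std::size_t before = left->fetch_sub(1);
                assert(before > 0);  // zero would mean more completions than sub-consumers
                if (before == 1) {
                    callback(ok);
                }
            });
        }
    }
};

// Returns whether the client-level callback fired when the multi-topics
// consumer is destroyed before its sub-consumers finish closing.
bool newPatternCompletes() {
    std::vector<FakeSubConsumer> subs(3);
    bool clientClosed = false;
    {
        auto consumer = std::make_shared<NewMultiConsumer>();
        consumer->closeAsync(subs, [&clientClosed](bool) { clientClosed = true; });
    }  // consumer destroyed before any sub-consumer completes
    for (auto& sub : subs) {
        sub.pending(true);
    }
    return clientClosed;
}
```

Here `newPatternCompletes()` returns `true` even though the consumer is destroyed before any sub-consumer finishes. As in the diff, only the result of the last sub-consumer to finish is forwarded to `callback`; in the diff, earlier failures are only logged.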
### Verifications
Ran `ConnectionFailTest` repeatedly inside an `ubuntu:22.04` Docker
container until the following log appeared:
```
ClientImpl:589 | Closing Pulsar client with 0 producers and 1 consumers
```
The test now passes even in this case:
```
2023-03-14 06:51:39.201 INFO [140309830422656] ClientImpl:553 | Closing Pulsar client with 0 producers and 1 consumers
2023-03-14 06:51:39.201 INFO [140309830422656] ConsumerImpl:1188 | [persistent://public/default/test-connection-fail-51678776698-partition-4, reader-e0617a7614, 9] Closing consumer for topic persistent://public/default/test-connection-fail-51678776698-partition-4
2023-03-14 06:51:39.202 INFO [140309830422656] ConsumerImpl:1188 | [persistent://public/default/test-connection-fail-51678776698-partition-3, reader-e0617a7614, 8] Closing consumer for topic persistent://public/default/test-connection-fail-51678776698-partition-3
2023-03-14 06:51:39.202 INFO [140309830422656] ConsumerImpl:1188 | [persistent://public/default/test-connection-fail-51678776698-partition-2, reader-e0617a7614, 7] Closing consumer for topic persistent://public/default/test-connection-fail-51678776698-partition-2
2023-03-14 06:51:39.202 INFO [140309830422656] ConsumerImpl:1188 | [persistent://public/default/test-connection-fail-51678776698-partition-1, reader-e0617a7614, 6] Closing consumer for topic persistent://public/default/test-connection-fail-51678776698-partition-1
2023-03-14 06:51:39.203 INFO [140309830422656] ConsumerImpl:1188 | [persistent://public/default/test-connection-fail-51678776698-partition-0, reader-e0617a7614, 5] Closing consumer for topic persistent://public/default/test-connection-fail-51678776698-partition-0
2023-03-14 06:51:39.250 INFO [140309602235968] ConsumerImpl:1174 | [persistent://public/default/test-connection-fail-51678776698-partition-4, reader-e0617a7614, 9] Closed consumer 9
2023-03-14 06:51:39.250 INFO [140309602235968] ConsumerImpl:1174 | [persistent://public/default/test-connection-fail-51678776698-partition-3, reader-e0617a7614, 8] Closed consumer 8
2023-03-14 06:51:39.251 INFO [140309602235968] ConsumerImpl:1174 | [persistent://public/default/test-connection-fail-51678776698-partition-2, reader-e0617a7614, 7] Closed consumer 7
2023-03-14 06:51:39.251 INFO [140309602235968] ConsumerImpl:1174 | [persistent://public/default/test-connection-fail-51678776698-partition-1, reader-e0617a7614, 6] Closed consumer 6
2023-03-14 06:51:39.251 INFO [140309602235968] ConsumerImpl:1174 | [persistent://public/default/test-connection-fail-51678776698-partition-0, reader-e0617a7614, 5] Closed consumer 5
2023-03-14 06:51:39.252 INFO [140309593843264] ClientConnection:1200 | [172.17.0.2:46406 -> 192.168.65.2:6650] Connection disconnected
2023-03-14 06:51:39.252 INFO [140309593843264] ClientConnection:262 | [172.17.0.2:46406 -> 192.168.65.2:6650] Destroyed connection
2023-03-14 06:51:39.253 INFO [140309593843264] ClientConnection:1200 | [172.17.0.2:46408 -> 192.168.65.2:6650] Connection disconnected
2023-03-14 06:51:39.253 INFO [140309593843264] ClientConnection:262 | [172.17.0.2:46408 -> 192.168.65.2:6650] Destroyed connection
[ OK ] Unix/ConnectionFailTest.test/1 (312 ms)
[----------] 2 tests from Unix/ConnectionFailTest (450 ms total)
```
* Fix flaky testReferenceCount
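The `testReferenceCount` change in `tests/ClientTest.cc` waits up to 3 seconds for the producer, consumer, and reader reference counts to drop to zero before the `EXPECT_EQ` checks, instead of checking immediately. The real `waitUntil` lives in `tests/WaitUtils.h`, whose implementation is not part of this commit; the following is a hypothetical polling helper with an assumed signature, sketching the idea.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical polling helper; the real tests/WaitUtils.h may differ.
// Re-evaluates `condition` every `interval` until it holds (returns true) or
// `timeout` elapses (returns false).
template <typename Rep, typename Period>
bool waitUntil(std::chrono::duration<Rep, Period> timeout, const std::function<bool()>& condition,
               std::chrono::milliseconds interval = std::chrono::milliseconds(10)) {
    assert(interval > std::chrono::milliseconds::zero());  // avoid a busy spin
    // steady_clock is monotonic, so wall-clock adjustments do not shift the deadline.
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    while (true) {
        if (condition()) {
            return true;
        }
        if (std::chrono::steady_clock::now() >= deadline) {
            return false;
        }
        std::this_thread::sleep_for(interval);
    }
}
```

It can be called like the test does, `waitUntil(std::chrono::seconds(3), [&] { return ...; })`; the test ignores the return value and lets the following `EXPECT_EQ` checks report any remaining references.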
---
lib/MultiTopicsConsumerImpl.cc | 71 ++++++++++++++++++++----------------------
tests/ClientTest.cc | 4 +++
2 files changed, 37 insertions(+), 38 deletions(-)
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index 32891e7..c43bf2f 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -448,10 +448,17 @@ void MultiTopicsConsumerImpl::handleOneTopicUnsubscribedAsync(
}
void MultiTopicsConsumerImpl::closeAsync(ResultCallback originalCallback) {
- auto callback = [this, originalCallback](Result result) {
- shutdown();
- if (result != ResultOk) {
- LOG_WARN(getName() << "Failed to close consumer: " << result);
+ std::weak_ptr<MultiTopicsConsumerImpl> weakSelf{get_shared_this_ptr()};
+ auto callback = [weakSelf, originalCallback](Result result) {
+ auto self = weakSelf.lock();
+ if (self) {
+ self->shutdown();
+ if (result != ResultOk) {
+ LOG_WARN(self->getName() << "Failed to close consumer: " << result);
+ if (result != ResultAlreadyClosed) {
+ self->state_ = Failed;
+ }
+ }
}
if (originalCallback) {
originalCallback(result);
@@ -467,46 +474,34 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback originalCallback) {
cancelTimers();
- auto weakSelf = weak_from_this();
- int numConsumers = 0;
- consumers_.clear(
- [this, weakSelf, &numConsumers, callback](const std::string& name, const ConsumerImplPtr& consumer) {
- auto self = weakSelf.lock();
- if (!self) {
- return;
- }
- numConsumers++;
- consumer->closeAsync([this, weakSelf, name, callback](Result result) {
- auto self = weakSelf.lock();
- if (!self) {
- return;
- }
- LOG_DEBUG("Closing the consumer for partition - " << name << " numberTopicPartitions_ - "
- << numberTopicPartitions_->load());
- const int numConsumersLeft = --*numberTopicPartitions_;
- if (numConsumersLeft < 0) {
- LOG_ERROR("[" << name << "] Unexpected number of left consumers: " << numConsumersLeft
- << " during close");
- return;
- }
- if (result != ResultOk) {
- state_ = Failed;
- LOG_ERROR("Closing the consumer failed for partition - " << name << " with error - "
- << result);
- }
- // closed all consumers
- if (numConsumersLeft == 0) {
- callback(result);
- }
- });
- });
- if (numConsumers == 0) {
+ auto consumers = consumers_.move();
+ *numberTopicPartitions_ = 0;
+ if (consumers.empty()) {
LOG_DEBUG("TopicsConsumer have no consumers to close "
<< " topic" << topic_ << " subscription - " << subscriptionName_);
callback(ResultAlreadyClosed);
return;
}
+ auto numConsumers = std::make_shared<std::atomic<size_t>>(consumers.size());
+ for (auto&& kv : consumers) {
+ auto& name = kv.first;
+ auto& consumer = kv.second;
+ consumer->closeAsync([name, numConsumers, callback](Result result) {
+ const auto numConsumersLeft = --*numConsumers;
+ LOG_DEBUG("Closing the consumer for partition - " << name << " numConsumersLeft - "
+ << numConsumersLeft);
+
+ if (result != ResultOk) {
+ LOG_ERROR("Closing the consumer failed for partition - " << name << " with error - "
+ << result);
+ }
+ if (numConsumersLeft == 0) {
+ callback(result);
+ }
+ });
+ }
+
// fail pending receive
failPendingReceiveCallback();
failPendingBatchReceiveCallback();
diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc
index d59633f..787c1ba 100644
--- a/tests/ClientTest.cc
+++ b/tests/ClientTest.cc
@@ -25,6 +25,7 @@
#include "HttpHelper.h"
#include "PulsarFriend.h"
+#include "WaitUtils.h"
#include "lib/ClientConnection.h"
#include "lib/LogUtils.h"
#include "lib/checksum/ChecksumProvider.h"
@@ -228,6 +229,9 @@ TEST(ClientTest, testReferenceCount) {
LOG_INFO("Reference count of the reader: " << readerWeakPtr.use_count());
}
+ waitUntil(std::chrono::seconds(3), [&] {
+ return producers.size() == 0 && consumers.size() == 0 && readerWeakPtr.use_count() == 0;
+ });
EXPECT_EQ(producers.size(), 0);
EXPECT_EQ(consumers.size(), 0);
EXPECT_EQ(readerWeakPtr.use_count(), 0);