This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit fbba81446b980609b920a748d69b208578bfab4e Author: Yunze Xu <[email protected]> AuthorDate: Wed Jul 13 09:16:06 2022 +0800 [improve][test] Fix flaky C++ ClientTest.testWrongListener (#16510) Fixes #16509 ### Motivation Since all producers and consumers of the same `Client` share the same connection pool, if one of them fails, a subsequent producer or consumer might reuse the broken connection and fail with `ResultConnectError`. This makes `testWrongListener` flaky, because the consumer and reader are created after the producer creation has failed with `ResultServiceUnitNotReady`. ### Modifications Recreate the `Client` for the subsequent creation of `Consumer` and `Reader` so that new connection pools will be used for them. There is also a potential bug that makes `Client::shutdown` wait for the max timeout (3 seconds), so this PR also applies the timeout to the other executors. ### Verifying this change After this change, I've run the following command in my local env and it never failed. ```bash # rerun the testWrongListener for 20 times ./tests/main --gtest_filter='ClientTest.testWrongListener' --gtest_repeat=20 ``` (cherry picked from commit 10307f90ab9b0d168a5a40f40027177fae587602) --- pulsar-client-cpp/lib/ClientImpl.cc | 4 ++-- pulsar-client-cpp/tests/ClientTest.cc | 8 +++++--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc index d2955cf223f..6a98491d1ff 100644 --- a/pulsar-client-cpp/lib/ClientImpl.cc +++ b/pulsar-client-cpp/lib/ClientImpl.cc @@ -602,12 +602,12 @@ void ClientImpl::shutdown() { LOG_DEBUG("ioExecutorProvider_ is closed"); timeoutProcessor.tik(); - listenerExecutorProvider_->close(); + listenerExecutorProvider_->close(timeoutProcessor.getLeftTimeout()); timeoutProcessor.tok(); LOG_DEBUG("listenerExecutorProvider_ is closed"); timeoutProcessor.tik(); - partitionListenerExecutorProvider_->close(); + 
partitionListenerExecutorProvider_->close(timeoutProcessor.getLeftTimeout()); timeoutProcessor.tok(); LOG_DEBUG("partitionListenerExecutorProvider_ is closed"); } diff --git a/pulsar-client-cpp/tests/ClientTest.cc b/pulsar-client-cpp/tests/ClientTest.cc index dd073fb6bfc..14c3af70aa3 100644 --- a/pulsar-client-cpp/tests/ClientTest.cc +++ b/pulsar-client-cpp/tests/ClientTest.cc @@ -234,17 +234,19 @@ TEST(ClientTest, testWrongListener) { Producer producer; ASSERT_EQ(ResultServiceUnitNotReady, client.createProducer(topic, producer)); ASSERT_EQ(ResultProducerNotInitialized, producer.close()); + ASSERT_EQ(PulsarFriend::getProducers(client).size(), 0); + ASSERT_EQ(ResultOk, client.close()); + // The connection will be closed when the consumer failed, we must recreate the Client. Otherwise, the + // creation of Consumer or Reader could fail with ResultConnectError. + client = Client(lookupUrl, ClientConfiguration().setListenerName("test")); Consumer consumer; ASSERT_EQ(ResultServiceUnitNotReady, client.subscribe(topic, "sub", consumer)); ASSERT_EQ(ResultConsumerNotInitialized, consumer.close()); - ASSERT_EQ(PulsarFriend::getProducers(client).size(), 0); ASSERT_EQ(PulsarFriend::getConsumers(client).size(), 0); ASSERT_EQ(ResultOk, client.close()); - // The connection will be closed when the consumer failed, we must recreate the Client. Otherwise, the - // creation of Reader would fail with ResultConnectError. client = Client(lookupUrl, ClientConfiguration().setListenerName("test")); // Currently Reader can only read a non-partitioned topic in C++ client
