This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f3899d53959971b56ddce8e4809b6c759bc1d061 Author: Yunze Xu <[email protected]> AuthorDate: Wed Jul 13 09:16:06 2022 +0800 [improve][test] Fix flaky C++ ClientTest.testWrongListener (#16510) Fixes #16509 ### Motivation Since all producers and consumer of the same Client share the same connection pool, if one failed, the following producer or consumer might reuse the broken connection and failed with `ResultConnectError`. It causes the flaky `testWrongListener` because the consumer and reader would be created after the producer creation failed with `ResultServiceNotReady`. ### Modifications Recreate the `Client` for the subsequent creation of `Consumer` and `Reader` so that new connection pools will be used for them. There is also a potential bug that makes `Client::shutdown` wait for the max timeout (3 seconds), this PR also applies the timeout for other executors. ### Verifying this change After this change, I've run the following command in my local env and it never failed. ```bash # rerun the testWrongListener for 20 times ./tests/main --gtest_filter='ClientTest.testWrongListener' --gtest_repeat=20 ``` (cherry picked from commit 10307f90ab9b0d168a5a40f40027177fae587602) --- pulsar-client-cpp/lib/ClientImpl.cc | 4 ++-- pulsar-client-cpp/tests/ClientTest.cc | 8 +++++--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc index 7c9a3a18f4a..d419ffe7dc5 100644 --- a/pulsar-client-cpp/lib/ClientImpl.cc +++ b/pulsar-client-cpp/lib/ClientImpl.cc @@ -609,12 +609,12 @@ void ClientImpl::shutdown() { LOG_DEBUG("ioExecutorProvider_ is closed"); timeoutProcessor.tik(); - listenerExecutorProvider_->close(); + listenerExecutorProvider_->close(timeoutProcessor.getLeftTimeout()); timeoutProcessor.tok(); LOG_DEBUG("listenerExecutorProvider_ is closed"); timeoutProcessor.tik(); - partitionListenerExecutorProvider_->close(); + partitionListenerExecutorProvider_->close(timeoutProcessor.getLeftTimeout()); timeoutProcessor.tok(); LOG_DEBUG("partitionListenerExecutorProvider_ is closed"); } diff --git a/pulsar-client-cpp/tests/ClientTest.cc b/pulsar-client-cpp/tests/ClientTest.cc index dd073fb6bfc..14c3af70aa3 100644 --- a/pulsar-client-cpp/tests/ClientTest.cc +++ b/pulsar-client-cpp/tests/ClientTest.cc @@ -234,17 +234,19 @@ TEST(ClientTest, testWrongListener) { Producer producer; ASSERT_EQ(ResultServiceUnitNotReady, client.createProducer(topic, producer)); ASSERT_EQ(ResultProducerNotInitialized, producer.close()); + ASSERT_EQ(PulsarFriend::getProducers(client).size(), 0); + ASSERT_EQ(ResultOk, client.close()); + // The connection will be closed when the consumer failed, we must recreate the Client. Otherwise, the + // creation of Consumer or Reader could fail with ResultConnectError. + client = Client(lookupUrl, ClientConfiguration().setListenerName("test")); Consumer consumer; ASSERT_EQ(ResultServiceUnitNotReady, client.subscribe(topic, "sub", consumer)); ASSERT_EQ(ResultConsumerNotInitialized, consumer.close()); - ASSERT_EQ(PulsarFriend::getProducers(client).size(), 0); ASSERT_EQ(PulsarFriend::getConsumers(client).size(), 0); ASSERT_EQ(ResultOk, client.close()); - // The connection will be closed when the consumer failed, we must recreate the Client. Otherwise, the - // creation of Reader would fail with ResultConnectError. client = Client(lookupUrl, ClientConfiguration().setListenerName("test")); // Currently Reader can only read a non-partitioned topic in C++ client
