This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fbba81446b980609b920a748d69b208578bfab4e
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Jul 13 09:16:06 2022 +0800

    [improve][test] Fix flaky C++ ClientTest.testWrongListener (#16510)
    
    Fixes #16509
    
    ### Motivation
    
    Since all producers and consumer of the same Client share the same
    connection pool, if one failed, the following producer or consumer might
    reuse the broken connection and failed with `ResultConnectError`. It
    causes the flaky `testWrongListener` because the consumer and reader
    would be created after the producer creation failed with
    `ResultServiceNotReady`.
    
    ### Modifications
    
    Recreate the `Client` for the subsequent creation of `Consumer` and
    `Reader` so that new connection pools will be used for them.
    
    There is also a potential bug that makes `Client::shutdown` wait for the
    max timeout (3 seconds), this PR also applies the timeout for other
    executors.
    
    ### Verifying this change
    
    After this change, I've run the following command in my local env and it
    never failed.
    
    ```bash
    # rerun the testWrongListener for 20 times
    ./tests/main --gtest_filter='ClientTest.testWrongListener' --gtest_repeat=20
    ```
    
    (cherry picked from commit 10307f90ab9b0d168a5a40f40027177fae587602)
---
 pulsar-client-cpp/lib/ClientImpl.cc   | 4 ++--
 pulsar-client-cpp/tests/ClientTest.cc | 8 +++++---
 2 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/pulsar-client-cpp/lib/ClientImpl.cc 
b/pulsar-client-cpp/lib/ClientImpl.cc
index d2955cf223f..6a98491d1ff 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -602,12 +602,12 @@ void ClientImpl::shutdown() {
     LOG_DEBUG("ioExecutorProvider_ is closed");
 
     timeoutProcessor.tik();
-    listenerExecutorProvider_->close();
+    listenerExecutorProvider_->close(timeoutProcessor.getLeftTimeout());
     timeoutProcessor.tok();
     LOG_DEBUG("listenerExecutorProvider_ is closed");
 
     timeoutProcessor.tik();
-    partitionListenerExecutorProvider_->close();
+    
partitionListenerExecutorProvider_->close(timeoutProcessor.getLeftTimeout());
     timeoutProcessor.tok();
     LOG_DEBUG("partitionListenerExecutorProvider_ is closed");
 }
diff --git a/pulsar-client-cpp/tests/ClientTest.cc 
b/pulsar-client-cpp/tests/ClientTest.cc
index dd073fb6bfc..14c3af70aa3 100644
--- a/pulsar-client-cpp/tests/ClientTest.cc
+++ b/pulsar-client-cpp/tests/ClientTest.cc
@@ -234,17 +234,19 @@ TEST(ClientTest, testWrongListener) {
     Producer producer;
     ASSERT_EQ(ResultServiceUnitNotReady, client.createProducer(topic, 
producer));
     ASSERT_EQ(ResultProducerNotInitialized, producer.close());
+    ASSERT_EQ(PulsarFriend::getProducers(client).size(), 0);
+    ASSERT_EQ(ResultOk, client.close());
 
+    // The connection will be closed when the consumer failed, we must 
recreate the Client. Otherwise, the
+    // creation of Consumer or Reader could fail with ResultConnectError.
+    client = Client(lookupUrl, ClientConfiguration().setListenerName("test"));
     Consumer consumer;
     ASSERT_EQ(ResultServiceUnitNotReady, client.subscribe(topic, "sub", 
consumer));
     ASSERT_EQ(ResultConsumerNotInitialized, consumer.close());
 
-    ASSERT_EQ(PulsarFriend::getProducers(client).size(), 0);
     ASSERT_EQ(PulsarFriend::getConsumers(client).size(), 0);
     ASSERT_EQ(ResultOk, client.close());
 
-    // The connection will be closed when the consumer failed, we must 
recreate the Client. Otherwise, the
-    // creation of Reader would fail with ResultConnectError.
     client = Client(lookupUrl, ClientConfiguration().setListenerName("test"));
 
     // Currently Reader can only read a non-partitioned topic in C++ client

Reply via email to