This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new a8402da  Fix close() returns ResultAlreadyClosed after unsubscribe or 
close (#338)
a8402da is described below

commit a8402da957143b322364c048a8b19dc790891724
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Nov 10 10:54:58 2023 +0800

    Fix close() returns ResultAlreadyClosed after unsubscribe or close (#338)
    
    Fixes https://github.com/apache/pulsar-client-cpp/issues/88
    
    ### Motivation
    
    When `close` is called if the consumer has already called `unsubscribe`
    or `close`, it should not fail. See
    
    
https://github.com/apache/pulsar/blob/428c18c8d0c3d135189920740192982e11ffb2bf/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1034
    
    ### Modifications
    
    Use the same close logic with Java client.
    
    Add `testCloseAgainBeforeCloseDone` and `testCloseAfterUnsubscribe` to
    verify the new behaviors of `Consumer::close`.
---
 lib/ConsumerImpl.cc                | 11 +++++++----
 tests/BasicEndToEndTest.cc         | 14 +++++++-------
 tests/ConsumerConfigurationTest.cc |  2 +-
 tests/ConsumerTest.cc              | 25 +++++++++++++++++++++++++
 tests/c/c_BasicEndToEndTest.cc     |  2 +-
 5 files changed, 41 insertions(+), 13 deletions(-)

diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 8dd334e..b466683 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -1238,10 +1238,12 @@ void ConsumerImpl::disconnectConsumer() {
 }
 
 void ConsumerImpl::closeAsync(ResultCallback originalCallback) {
-    auto callback = [this, originalCallback](Result result) {
+    auto callback = [this, originalCallback](Result result, bool alreadyClosed 
= false) {
         shutdown();
         if (result == ResultOk) {
-            LOG_INFO(getName() << "Closed consumer " << consumerId_);
+            if (!alreadyClosed) {
+                LOG_INFO(getName() << "Closed consumer " << consumerId_);
+            }
         } else {
             LOG_WARN(getName() << "Failed to close consumer: " << result);
         }
@@ -1250,8 +1252,9 @@ void ConsumerImpl::closeAsync(ResultCallback 
originalCallback) {
         }
     };
 
-    if (state_ != Ready) {
-        callback(ResultAlreadyClosed);
+    auto state = state_.load();
+    if (state == Closing || state == Closed) {
+        callback(ResultOk, true);
         return;
     }
 
diff --git a/tests/BasicEndToEndTest.cc b/tests/BasicEndToEndTest.cc
index 0951dd4..e2c6697 100644
--- a/tests/BasicEndToEndTest.cc
+++ b/tests/BasicEndToEndTest.cc
@@ -244,7 +244,7 @@ TEST(BasicEndToEndTest, testProduceConsume) {
     consumer.receive(receivedMsg);
     ASSERT_EQ(content, receivedMsg.getDataAsString());
     ASSERT_EQ(ResultOk, consumer.unsubscribe());
-    ASSERT_EQ(ResultAlreadyClosed, consumer.close());
+    ASSERT_EQ(ResultOk, consumer.close());
     ASSERT_EQ(ResultOk, producer.close());
     ASSERT_EQ(ResultOk, client.close());
 }
@@ -405,7 +405,7 @@ TEST(BasicEndToEndTest, 
testMultipleClientsMultipleSubscriptions) {
 
     ASSERT_EQ(ResultOk, producer1.close());
     ASSERT_EQ(ResultOk, consumer1.close());
-    ASSERT_EQ(ResultAlreadyClosed, consumer1.close());
+    ASSERT_EQ(ResultOk, consumer1.close());
     ASSERT_EQ(ResultConsumerNotInitialized, consumer2.close());
     ASSERT_EQ(ResultOk, client1.close());
 
@@ -637,7 +637,7 @@ TEST(BasicEndToEndTest, testCompressionLZ4) {
     ASSERT_EQ(content2, receivedMsg.getDataAsString());
 
     ASSERT_EQ(ResultOk, consumer.unsubscribe());
-    ASSERT_EQ(ResultAlreadyClosed, consumer.close());
+    ASSERT_EQ(ResultOk, consumer.close());
     ASSERT_EQ(ResultOk, producer.close());
     ASSERT_EQ(ResultOk, client.close());
 }
@@ -675,7 +675,7 @@ TEST(BasicEndToEndTest, testCompressionZLib) {
     ASSERT_EQ(content2, receivedMsg.getDataAsString());
 
     ASSERT_EQ(ResultOk, consumer.unsubscribe());
-    ASSERT_EQ(ResultAlreadyClosed, consumer.close());
+    ASSERT_EQ(ResultOk, consumer.close());
     ASSERT_EQ(ResultOk, producer.close());
     ASSERT_EQ(ResultOk, client.close());
 }
@@ -750,7 +750,7 @@ TEST(BasicEndToEndTest, testConsumerClose) {
     Consumer consumer;
     ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
     ASSERT_EQ(consumer.close(), ResultOk);
-    ASSERT_EQ(consumer.close(), ResultAlreadyClosed);
+    ASSERT_EQ(consumer.close(), ResultOk);
 }
 
 TEST(BasicEndToEndTest, testDuplicateConsumerCreationOnPartitionedTopic) {
@@ -1398,7 +1398,7 @@ TEST(BasicEndToEndTest, testRSAEncryption) {
         }
 
         ASSERT_EQ(ResultOk, consumer.unsubscribe());
-        ASSERT_EQ(ResultAlreadyClosed, consumer.close());
+        ASSERT_EQ(ResultOk, consumer.close());
         ASSERT_EQ(ResultOk, producer.close());
     }
     ASSERT_EQ(ResultOk, client.close());
@@ -1617,7 +1617,7 @@ TEST(BasicEndToEndTest, testSeek) {
     ASSERT_EQ(expected.str(), msgReceived.getDataAsString());
     ASSERT_EQ(ResultOk, consumer.acknowledge(msgReceived));
     ASSERT_EQ(ResultOk, consumer.unsubscribe());
-    ASSERT_EQ(ResultAlreadyClosed, consumer.close());
+    ASSERT_EQ(ResultOk, consumer.close());
     ASSERT_EQ(ResultOk, producer.close());
     ASSERT_EQ(ResultOk, client.close());
 }
diff --git a/tests/ConsumerConfigurationTest.cc 
b/tests/ConsumerConfigurationTest.cc
index f378501..482f0fc 100644
--- a/tests/ConsumerConfigurationTest.cc
+++ b/tests/ConsumerConfigurationTest.cc
@@ -314,7 +314,7 @@ TEST(ConsumerConfigurationTest, 
testSubscriptionInitialPosition) {
     ASSERT_EQ(content1, receivedMsg.getDataAsString());
 
     ASSERT_EQ(ResultOk, consumer.unsubscribe());
-    ASSERT_EQ(ResultAlreadyClosed, consumer.close());
+    ASSERT_EQ(ResultOk, consumer.close());
     ASSERT_EQ(ResultOk, producer.close());
     ASSERT_EQ(ResultOk, client.close());
 }
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index 80c0a71..0836fbf 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -1405,4 +1405,29 @@ TEST(ConsumerTest, testNoListenerThreadBlocking) {
     client.close();
 }
 
+TEST(ConsumerTest, testCloseAfterUnsubscribe) {
+    Client client{lookupUrl};
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe("test-close-after-unsubscribe", 
"sub", consumer));
+    ASSERT_EQ(ResultOk, consumer.unsubscribe());
+    ASSERT_EQ(ResultOk, consumer.close());
+}
+
+TEST(ConsumerTest, testCloseAgainBeforeCloseDone) {
+    Client client{lookupUrl};
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe("test-close-again-before-close-done", 
"sub", consumer));
+    auto done = std::make_shared<std::atomic_bool>(false);
+    auto result = std::make_shared<std::atomic<Result>>(ResultOk);
+    consumer.closeAsync([done, result](Result innerResult) {
+        result->store(innerResult);
+        done->store(true);
+    });
+    ASSERT_EQ(ResultOk, consumer.close());
+    ASSERT_FALSE(*done);
+    waitUntil(std::chrono::seconds(3), [done] { return done->load(); });
+    ASSERT_EQ(ResultOk, *result);
+    ASSERT_TRUE(*done);
+}
+
 }  // namespace pulsar
diff --git a/tests/c/c_BasicEndToEndTest.cc b/tests/c/c_BasicEndToEndTest.cc
index 04aa1dc..e3ff19a 100644
--- a/tests/c/c_BasicEndToEndTest.cc
+++ b/tests/c/c_BasicEndToEndTest.cc
@@ -109,7 +109,7 @@ TEST(c_BasicEndToEndTest, testAsyncProduceConsume) {
     delete receive_ctx.data;
 
     ASSERT_EQ(pulsar_result_Ok, pulsar_consumer_unsubscribe(consumer));
-    ASSERT_EQ(pulsar_result_AlreadyClosed, pulsar_consumer_close(consumer));
+    ASSERT_EQ(pulsar_result_Ok, pulsar_consumer_close(consumer));
     ASSERT_EQ(pulsar_result_Ok, pulsar_producer_close(producer));
     ASSERT_EQ(pulsar_result_Ok, pulsar_client_close(client));
 

Reply via email to