This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new a8402da Fix close() returns ResultAlreadyClosed after unsubscribe or
close (#338)
a8402da is described below
commit a8402da957143b322364c048a8b19dc790891724
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Nov 10 10:54:58 2023 +0800
Fix close() returns ResultAlreadyClosed after unsubscribe or close (#338)
Fixes https://github.com/apache/pulsar-client-cpp/issues/88
### Motivation
When `close` is called on a consumer that has already been unsubscribed
or closed, it should not fail. See
https://github.com/apache/pulsar/blob/428c18c8d0c3d135189920740192982e11ffb2bf/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1034
### Modifications
Use the same close logic as the Java client.
Add `testCloseAgainBeforeCloseDone` and `testCloseAfterUnsubscribe` to
verify the new behaviors of `Consumer::close`.
---
lib/ConsumerImpl.cc | 11 +++++++----
tests/BasicEndToEndTest.cc | 14 +++++++-------
tests/ConsumerConfigurationTest.cc | 2 +-
tests/ConsumerTest.cc | 25 +++++++++++++++++++++++++
tests/c/c_BasicEndToEndTest.cc | 2 +-
5 files changed, 41 insertions(+), 13 deletions(-)
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 8dd334e..b466683 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -1238,10 +1238,12 @@ void ConsumerImpl::disconnectConsumer() {
}
void ConsumerImpl::closeAsync(ResultCallback originalCallback) {
- auto callback = [this, originalCallback](Result result) {
+ auto callback = [this, originalCallback](Result result, bool alreadyClosed
= false) {
shutdown();
if (result == ResultOk) {
- LOG_INFO(getName() << "Closed consumer " << consumerId_);
+ if (!alreadyClosed) {
+ LOG_INFO(getName() << "Closed consumer " << consumerId_);
+ }
} else {
LOG_WARN(getName() << "Failed to close consumer: " << result);
}
@@ -1250,8 +1252,9 @@ void ConsumerImpl::closeAsync(ResultCallback
originalCallback) {
}
};
- if (state_ != Ready) {
- callback(ResultAlreadyClosed);
+ auto state = state_.load();
+ if (state == Closing || state == Closed) {
+ callback(ResultOk, true);
return;
}
diff --git a/tests/BasicEndToEndTest.cc b/tests/BasicEndToEndTest.cc
index 0951dd4..e2c6697 100644
--- a/tests/BasicEndToEndTest.cc
+++ b/tests/BasicEndToEndTest.cc
@@ -244,7 +244,7 @@ TEST(BasicEndToEndTest, testProduceConsume) {
consumer.receive(receivedMsg);
ASSERT_EQ(content, receivedMsg.getDataAsString());
ASSERT_EQ(ResultOk, consumer.unsubscribe());
- ASSERT_EQ(ResultAlreadyClosed, consumer.close());
+ ASSERT_EQ(ResultOk, consumer.close());
ASSERT_EQ(ResultOk, producer.close());
ASSERT_EQ(ResultOk, client.close());
}
@@ -405,7 +405,7 @@ TEST(BasicEndToEndTest,
testMultipleClientsMultipleSubscriptions) {
ASSERT_EQ(ResultOk, producer1.close());
ASSERT_EQ(ResultOk, consumer1.close());
- ASSERT_EQ(ResultAlreadyClosed, consumer1.close());
+ ASSERT_EQ(ResultOk, consumer1.close());
ASSERT_EQ(ResultConsumerNotInitialized, consumer2.close());
ASSERT_EQ(ResultOk, client1.close());
@@ -637,7 +637,7 @@ TEST(BasicEndToEndTest, testCompressionLZ4) {
ASSERT_EQ(content2, receivedMsg.getDataAsString());
ASSERT_EQ(ResultOk, consumer.unsubscribe());
- ASSERT_EQ(ResultAlreadyClosed, consumer.close());
+ ASSERT_EQ(ResultOk, consumer.close());
ASSERT_EQ(ResultOk, producer.close());
ASSERT_EQ(ResultOk, client.close());
}
@@ -675,7 +675,7 @@ TEST(BasicEndToEndTest, testCompressionZLib) {
ASSERT_EQ(content2, receivedMsg.getDataAsString());
ASSERT_EQ(ResultOk, consumer.unsubscribe());
- ASSERT_EQ(ResultAlreadyClosed, consumer.close());
+ ASSERT_EQ(ResultOk, consumer.close());
ASSERT_EQ(ResultOk, producer.close());
ASSERT_EQ(ResultOk, client.close());
}
@@ -750,7 +750,7 @@ TEST(BasicEndToEndTest, testConsumerClose) {
Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumer));
ASSERT_EQ(consumer.close(), ResultOk);
- ASSERT_EQ(consumer.close(), ResultAlreadyClosed);
+ ASSERT_EQ(consumer.close(), ResultOk);
}
TEST(BasicEndToEndTest, testDuplicateConsumerCreationOnPartitionedTopic) {
@@ -1398,7 +1398,7 @@ TEST(BasicEndToEndTest, testRSAEncryption) {
}
ASSERT_EQ(ResultOk, consumer.unsubscribe());
- ASSERT_EQ(ResultAlreadyClosed, consumer.close());
+ ASSERT_EQ(ResultOk, consumer.close());
ASSERT_EQ(ResultOk, producer.close());
}
ASSERT_EQ(ResultOk, client.close());
@@ -1617,7 +1617,7 @@ TEST(BasicEndToEndTest, testSeek) {
ASSERT_EQ(expected.str(), msgReceived.getDataAsString());
ASSERT_EQ(ResultOk, consumer.acknowledge(msgReceived));
ASSERT_EQ(ResultOk, consumer.unsubscribe());
- ASSERT_EQ(ResultAlreadyClosed, consumer.close());
+ ASSERT_EQ(ResultOk, consumer.close());
ASSERT_EQ(ResultOk, producer.close());
ASSERT_EQ(ResultOk, client.close());
}
diff --git a/tests/ConsumerConfigurationTest.cc
b/tests/ConsumerConfigurationTest.cc
index f378501..482f0fc 100644
--- a/tests/ConsumerConfigurationTest.cc
+++ b/tests/ConsumerConfigurationTest.cc
@@ -314,7 +314,7 @@ TEST(ConsumerConfigurationTest,
testSubscriptionInitialPosition) {
ASSERT_EQ(content1, receivedMsg.getDataAsString());
ASSERT_EQ(ResultOk, consumer.unsubscribe());
- ASSERT_EQ(ResultAlreadyClosed, consumer.close());
+ ASSERT_EQ(ResultOk, consumer.close());
ASSERT_EQ(ResultOk, producer.close());
ASSERT_EQ(ResultOk, client.close());
}
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index 80c0a71..0836fbf 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -1405,4 +1405,29 @@ TEST(ConsumerTest, testNoListenerThreadBlocking) {
client.close();
}
+TEST(ConsumerTest, testCloseAfterUnsubscribe) {
+ Client client{lookupUrl};
+ Consumer consumer;
+ ASSERT_EQ(ResultOk, client.subscribe("test-close-after-unsubscribe",
"sub", consumer));
+ ASSERT_EQ(ResultOk, consumer.unsubscribe());
+ ASSERT_EQ(ResultOk, consumer.close());
+}
+
+TEST(ConsumerTest, testCloseAgainBeforeCloseDone) {
+ Client client{lookupUrl};
+ Consumer consumer;
+ ASSERT_EQ(ResultOk, client.subscribe("test-close-again-before-close-done",
"sub", consumer));
+ auto done = std::make_shared<std::atomic_bool>(false);
+ auto result = std::make_shared<std::atomic<Result>>(ResultOk);
+ consumer.closeAsync([done, result](Result innerResult) {
+ result->store(innerResult);
+ done->store(true);
+ });
+ ASSERT_EQ(ResultOk, consumer.close());
+ ASSERT_FALSE(*done);
+ waitUntil(std::chrono::seconds(3), [done] { return done->load(); });
+ ASSERT_EQ(ResultOk, *result);
+ ASSERT_TRUE(*done);
+}
+
} // namespace pulsar
diff --git a/tests/c/c_BasicEndToEndTest.cc b/tests/c/c_BasicEndToEndTest.cc
index 04aa1dc..e3ff19a 100644
--- a/tests/c/c_BasicEndToEndTest.cc
+++ b/tests/c/c_BasicEndToEndTest.cc
@@ -109,7 +109,7 @@ TEST(c_BasicEndToEndTest, testAsyncProduceConsume) {
delete receive_ctx.data;
ASSERT_EQ(pulsar_result_Ok, pulsar_consumer_unsubscribe(consumer));
- ASSERT_EQ(pulsar_result_AlreadyClosed, pulsar_consumer_close(consumer));
+ ASSERT_EQ(pulsar_result_Ok, pulsar_consumer_close(consumer));
ASSERT_EQ(pulsar_result_Ok, pulsar_producer_close(producer));
ASSERT_EQ(pulsar_result_Ok, pulsar_client_close(client));