This is an automated email from the ASF dual-hosted git repository.
shibd pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 7d1002a Fix consumer reconnect state after subscribe failure (#577)
7d1002a is described below
commit 7d1002ae4f203ac142a7ad8661c179cee2ffe6c8
Author: Baodi Shi <[email protected]>
AuthorDate: Mon May 18 11:55:35 2026 +0800
Fix consumer reconnect state after subscribe failure (#577)
---
lib/ConsumerImpl.cc | 3 ++-
tests/ConsumerTest.cc | 36 ++++++++++++++++++++++++++++++++++++
tests/PulsarFriend.h | 5 +++++
3 files changed, 43 insertions(+), 1 deletion(-)
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index d268484..85f4994 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -371,8 +371,9 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result
}
if (consumerCreatedPromise_.isComplete()) {
- // Consumer had already been initially created, we need to retry connecting in any case
+ // Clear the connection set before SUBSCRIBE so the next reconnect is not skipped.
LOG_WARN(getName() << "Failed to reconnect consumer: " << strResult(result));
+ resetCnx();
handleResult = ResultRetryable;
} else {
// Consumer was not yet created, retry to connect to broker if it's possible
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index 795613e..5de5c75 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -1596,4 +1596,40 @@ TEST(ConsumerTest, testCloseAfterSeek) {
anotherClient.close();
}
+TEST(ConsumerTest, testIsConnectedFalsePositiveAfterSubscribeRejectedOnReconnect) {
+ // A reconnect SUBSCRIBE failure happens after the initial subscribe has already completed.
+ const std::string topic =
+ "persistent://public/default/test-false-positive-" + std::to_string(time(nullptr));
+ Client client(lookupUrl);
+ Consumer consumer;
+ ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));
+ ASSERT_TRUE(consumer.isConnected()) << "Precondition: consumer should be connected";
+
+ auto& consumerImpl = PulsarFriend::getConsumerImpl(consumer);
+
+ // Capture the current live connection.
+ auto cnx = consumerImpl.getCnx().lock();
+ ASSERT_TRUE(cnx != nullptr) << "Precondition: cnx should be non-null";
+ LOG_INFO("Step 1 passed: consumer subscribed, cnx=" << cnx);
+
+ // Simulate the broker rejecting the SUBSCRIBE command during reconnect.
+ Result handleResult = PulsarFriend::consumerHandleCreateConsumer(consumer, cnx, ResultAuthorizationError);
+ LOG_INFO("Step 2: handleCreateConsumer returned " << handleResult);
+ EXPECT_EQ(ResultRetryable, handleResult)
+ << "handleCreateConsumer should return ResultRetryable for an already-created consumer";
+
+ // The failed SUBSCRIBE must clear the connection set before SUBSCRIBE.
+ auto cnxAfter = consumerImpl.getCnx().lock();
+ LOG_INFO("Step 3: cnx after handleCreateConsumer failure = " << cnxAfter);
+ LOG_INFO("Step 3: isConnected() = " << consumer.isConnected());
+
+ EXPECT_EQ(nullptr, cnxAfter)
+ << "After fix: connection_ must be cleared by resetCnx() so grabCnx() can retry";
+ EXPECT_FALSE(consumer.isConnected())
+ << "After fix: isConnected() must return false after SUBSCRIBE rejection";
+
+ consumer.close();
+ client.close();
+}
+
} // namespace pulsar
diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h
index 8017e0b..0244743 100644
--- a/tests/PulsarFriend.h
+++ b/tests/PulsarFriend.h
@@ -102,6 +102,11 @@ class PulsarFriend {
return *consumerImpl;
}
+ static Result consumerHandleCreateConsumer(Consumer consumer, const ClientConnectionPtr& cnx,
+ Result result) {
+ return getConsumerImpl(consumer).handleCreateConsumer(cnx, result);
+ }
+
static std::shared_ptr<ConsumerImpl> getConsumerImplPtr(Consumer consumer) {
return std::static_pointer_cast<ConsumerImpl>(consumer.impl_);
}