This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 2e2f90b Avoid double attempt at reconnecting (#310)
2e2f90b is described below
commit 2e2f90b182aa783152c049df9fbc9ff472ff3afc
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Sep 7 01:05:40 2023 -0700
Avoid double attempt at reconnecting (#310)
### Motivation
There is a sequence of conditions that can trigger a client to schedule
multiple reconnections to the broker. This is due to the fact that we're not
checking whether such an attempt is already in progress.
example:
1. Receive `CloseConsumer` command from broker
1a. Schedule for reconnection in 100ms
2. Connection is closed (e.g., broker shutdown has been initiated)
2a. Schedule for reconnection in 200ms (since the backoff was already
incremented)
The result is that `grabCnx()` ends up being called twice.
### Modifications
Use an atomic flag to ignore the second attempt and simply wait for the first
attempt to finish.
---
lib/HandlerBase.cc | 14 +++++++++++++-
lib/HandlerBase.h | 1 +
2 files changed, 14 insertions(+), 1 deletion(-)
diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc
index 28b5317..df315d3 100644
--- a/lib/HandlerBase.cc
+++ b/lib/HandlerBase.cc
@@ -38,7 +38,8 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const
std::string& topic,
state_(NotStarted),
backoff_(backoff),
epoch_(0),
- timer_(executor_->createDeadlineTimer()) {}
+ timer_(executor_->createDeadlineTimer()),
+ reconnectionPending_(false) {}
HandlerBase::~HandlerBase() { timer_->cancel(); }
@@ -69,6 +70,13 @@ void HandlerBase::grabCnx() {
LOG_INFO(getName() << "Ignoring reconnection request since we're
already connected");
return;
}
+
+ bool expectedState = false;
+ if (!reconnectionPending_.compare_exchange_strong(expectedState, true)) {
+ LOG_DEBUG(getName() << "Ignoring reconnection attempt since there's
already a pending reconnection");
+ return;
+ }
+
LOG_INFO(getName() << "Getting connection from pool");
ClientImplPtr client = client_.lock();
Future<Result, ClientConnectionWeakPtr> future =
client->getConnection(*topic_);
@@ -83,6 +91,9 @@ void HandlerBase::handleNewConnection(Result result,
ClientConnectionWeakPtr con
LOG_DEBUG("HandlerBase Weak reference is not valid anymore");
return;
}
+
+ handler->reconnectionPending_ = false;
+
if (result == ResultOk) {
ClientConnectionPtr conn = connection.lock();
if (conn) {
@@ -140,6 +151,7 @@ void HandlerBase::handleDisconnection(Result result,
ClientConnectionWeakPtr con
void HandlerBase::scheduleReconnection(HandlerBasePtr handler) {
const auto state = handler->state_.load();
+
if (state == Pending || state == Ready) {
TimeDuration delay = handler->backoff_.next();
diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h
index e6a2fc6..cf6e115 100644
--- a/lib/HandlerBase.h
+++ b/lib/HandlerBase.h
@@ -128,6 +128,7 @@ class HandlerBase {
DeadlineTimerPtr timer_;
mutable std::mutex connectionMutex_;
+ std::atomic<bool> reconnectionPending_;
ClientConnectionWeakPtr connection_;
friend class ClientConnection;
friend class PulsarFriend;