This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 2e2f90b  Avoid double attempt at reconnecting (#310)
2e2f90b is described below

commit 2e2f90b182aa783152c049df9fbc9ff472ff3afc
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Sep 7 01:05:40 2023 -0700

    Avoid double attempt at reconnecting (#310)
    
    ### Motivation
    
    There is a sequence of conditions that can trigger a client to schedule 
multiple reconnections to the broker. This is due to fact that we're not 
checking whether such an attempt is already in progress.
    
    example:
     1. Receive `CloseConsumer` command from broker
        1a.  Schedule for reconnection in 100ms
     2. Connection is closed (eg: broker shutdown has initiated)
       2a. Schedule for reconnection in 200ms (since the backoff was already 
incremented)
    
    Result is that we're going to call `grabCnx()` twice
    
    ### Modifications
    
    Use atomic flag to ignore the 2nd attempt, just waiting for the 1st attempt 
to finish.
---
 lib/HandlerBase.cc | 14 +++++++++++++-
 lib/HandlerBase.h  |  1 +
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc
index 28b5317..df315d3 100644
--- a/lib/HandlerBase.cc
+++ b/lib/HandlerBase.cc
@@ -38,7 +38,8 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const 
std::string& topic,
       state_(NotStarted),
       backoff_(backoff),
       epoch_(0),
-      timer_(executor_->createDeadlineTimer()) {}
+      timer_(executor_->createDeadlineTimer()),
+      reconnectionPending_(false) {}
 
 HandlerBase::~HandlerBase() { timer_->cancel(); }
 
@@ -69,6 +70,13 @@ void HandlerBase::grabCnx() {
         LOG_INFO(getName() << "Ignoring reconnection request since we're 
already connected");
         return;
     }
+
+    bool expectedState = false;
+    if (!reconnectionPending_.compare_exchange_strong(expectedState, true)) {
+        LOG_DEBUG(getName() << "Ignoring reconnection attempt since there's 
already a pending reconnection");
+        return;
+    }
+
     LOG_INFO(getName() << "Getting connection from pool");
     ClientImplPtr client = client_.lock();
     Future<Result, ClientConnectionWeakPtr> future = 
client->getConnection(*topic_);
@@ -83,6 +91,9 @@ void HandlerBase::handleNewConnection(Result result, 
ClientConnectionWeakPtr con
         LOG_DEBUG("HandlerBase Weak reference is not valid anymore");
         return;
     }
+
+    handler->reconnectionPending_ = false;
+
     if (result == ResultOk) {
         ClientConnectionPtr conn = connection.lock();
         if (conn) {
@@ -140,6 +151,7 @@ void HandlerBase::handleDisconnection(Result result, 
ClientConnectionWeakPtr con
 
 void HandlerBase::scheduleReconnection(HandlerBasePtr handler) {
     const auto state = handler->state_.load();
+
     if (state == Pending || state == Ready) {
         TimeDuration delay = handler->backoff_.next();
 
diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h
index e6a2fc6..cf6e115 100644
--- a/lib/HandlerBase.h
+++ b/lib/HandlerBase.h
@@ -128,6 +128,7 @@ class HandlerBase {
     DeadlineTimerPtr timer_;
 
     mutable std::mutex connectionMutex_;
+    std::atomic<bool> reconnectionPending_;
     ClientConnectionWeakPtr connection_;
     friend class ClientConnection;
     friend class PulsarFriend;

Reply via email to