This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 5940cb5  Fix consumer might not subscribe after a reconnection (#438)
5940cb5 is described below

commit 5940cb518bdf3a13ae7f859fab10d6940eec51b7
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Aug 13 22:30:48 2024 +0800

    Fix consumer might not subscribe after a reconnection (#438)
    
    Fixes https://github.com/apache/pulsar-client-cpp/issues/436
    
    ### Motivation
    
    When a consumer starts grabbing the connection, it registers a timer that
    fires after the operation timeout. When that timer expires, it fails the
    connection and cancels the connection timer. However, this results in a
    race condition:
      1. The consumer's connection is closed (e.g. the keep-alive timer failed).
      2. The connection timer is registered on the executor and will trigger
         the reconnection after 100 ms.
      3. The expired start timer cancels the connection timer, so the
         reconnection never starts.
    
    ### Modifications
    
    Cancel the `creationTimer_` once `HandlerBase#start` succeeds for the
    first time, so that it can no longer cancel a pending reconnection. Add
    `testReconnectWhenFirstConnectTimedOut` to cover this case.
---
 lib/ClientConnection.h |  1 +
 lib/HandlerBase.cc     | 23 +++++++++++++++++------
 lib/HandlerBase.h      |  2 ++
 tests/ConsumerTest.cc  | 32 ++++++++++++++++++++++++++++++++
 tests/ConsumerTest.h   | 23 +++++++++++++++++++++++
 5 files changed, 75 insertions(+), 6 deletions(-)

diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index 418cb2f..638edb4 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -404,6 +404,7 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
     const size_t poolIndex_;
 
     friend class PulsarFriend;
+    friend class ConsumerTest;
 
     void checkServerError(ServerError error, const std::string& message);
 
diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc
index 46b918f..65aa0db 100644
--- a/lib/HandlerBase.cc
+++ b/lib/HandlerBase.cc
@@ -18,6 +18,9 @@
  */
 #include "HandlerBase.h"
 
+#include <chrono>
+
+#include "AsioDefines.h"
 #include "Backoff.h"
 #include "ClientConnection.h"
 #include "ClientImpl.h"
@@ -63,6 +66,7 @@ void HandlerBase::start() {
     creationTimer_->async_wait([this, weakSelf](const ASIO_ERROR& error) {
         auto self = weakSelf.lock();
         if (self && !error) {
+            LOG_WARN("Cancel the pending reconnection due to the start 
timeout");
             connectionFailed(ResultTimeout);
             ASIO_ERROR ignored;
             timer_->cancel(ignored);
@@ -118,13 +122,21 @@ void HandlerBase::grabCnx(const 
boost::optional<std::string>& assignedBrokerUrl)
     }
     auto self = shared_from_this();
     auto cnxFuture = getConnection(client, assignedBrokerUrl);
-    cnxFuture.addListener([this, self](Result result, const 
ClientConnectionPtr& cnx) {
+    using namespace std::chrono;
+    auto before = high_resolution_clock::now();
+    cnxFuture.addListener([this, self, before](Result result, const 
ClientConnectionPtr& cnx) {
         if (result == ResultOk) {
-            LOG_DEBUG(getName() << "Connected to broker: " << 
cnx->cnxString());
-            connectionOpened(cnx).addListener([this, self](Result result, 
bool) {
+            connectionOpened(cnx).addListener([this, self, before](Result 
result, bool) {
                 // Do not use bool, only Result.
                 reconnectionPending_ = false;
-                if (result != ResultOk && isResultRetryable(result)) {
+                if (result == ResultOk) {
+                    connectionTimeMs_ =
+                        
duration_cast<milliseconds>(high_resolution_clock::now() - before).count();
+                    // Prevent the creationTimer_ from cancelling the timer_ 
in future
+                    ASIO_ERROR ignored;
+                    creationTimer_->cancel(ignored);
+                    LOG_INFO("Finished connecting to broker after " << 
connectionTimeMs_ << " ms")
+                } else if (isResultRetryable(result)) {
                     scheduleReconnection();
                 }
             });
@@ -194,8 +206,7 @@ void HandlerBase::scheduleReconnection(const 
boost::optional<std::string>& assig
 
 void HandlerBase::handleTimeout(const ASIO_ERROR& ec, const 
boost::optional<std::string>& assignedBrokerUrl) {
     if (ec) {
-        LOG_DEBUG(getName() << "Ignoring timer cancelled event, code[" << ec 
<< "]");
-        return;
+        LOG_INFO(getName() << "Ignoring timer cancelled event, code[" << ec << 
"]");
     } else {
         epoch_++;
         grabCnx(assignedBrokerUrl);
diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h
index b68dce3..df7dc24 100644
--- a/lib/HandlerBase.h
+++ b/lib/HandlerBase.h
@@ -157,9 +157,11 @@ class HandlerBase : public 
std::enable_shared_from_this<HandlerBase> {
     ClientConnectionWeakPtr connection_;
     std::string redirectedClusterURI_;
     std::atomic<long> firstRequestIdAfterConnect_{-1L};
+    std::atomic<long> connectionTimeMs_{0};  // only for tests
 
     friend class ClientConnection;
     friend class PulsarFriend;
+    friend class ConsumerTest;
 };
 }  // namespace pulsar
 #endif  //_PULSAR_HANDLER_BASE_HEADER_
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index f9840f9..105e5f6 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -16,6 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+#include "ConsumerTest.h"
+
 #include <gtest/gtest.h>
 #include <pulsar/Client.h>
 
@@ -26,6 +28,7 @@
 #include <map>
 #include <mutex>
 #include <set>
+#include <string>
 #include <thread>
 #include <vector>
 
@@ -1467,4 +1470,33 @@ TEST(ConsumerTest, testMultiConsumerListenerAndAck) {
     client.close();
 }
 
+// When a consumer starts grabbing the connection, it registers a timer that fires after the operation timeout. When
+// that timer expires, it fails the connection and cancels the connection timer. However, this results in a race
+// condition:
+//   1. The consumer's connection is closed (e.g. the keep alive timer failed)
+//   2. The connection timer is registered on the executor and will trigger the reconnection after 100ms
+//   3. The expired start timer cancels the connection timer, so the reconnection never starts.
+TEST(ConsumerTest, testReconnectWhenFirstConnectTimedOut) {
+    ClientConfiguration conf;
+    conf.setOperationTimeoutSeconds(1);
+    Client client{lookupUrl, conf};
+
+    auto topic = "consumer-test-reconnect-when-first-connect-timed-out" + 
std::to_string(time(nullptr));
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));
+
+    auto timer = ConsumerTest::scheduleCloseConnection(consumer, 
std::chrono::seconds(1));
+    ASSERT_TRUE(timer != nullptr);
+    timer->wait();
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+    ASSERT_EQ(ResultOk, 
producer.send(MessageBuilder().setContent("msg").build()));
+
+    Message msg;
+    ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
+    ASSERT_EQ("msg", msg.getDataAsString());
+    client.close();
+}
+
 }  // namespace pulsar
diff --git a/tests/ConsumerTest.h b/tests/ConsumerTest.h
index ca84aa7..8248287 100644
--- a/tests/ConsumerTest.h
+++ b/tests/ConsumerTest.h
@@ -16,9 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+#include <chrono>
+#include <memory>
+#include <stdexcept>
 #include <string>
 
+#include "lib/ClientConnection.h"
 #include "lib/ConsumerImpl.h"
+#include "lib/ExecutorService.h"
 
 using std::string;
 
@@ -28,5 +33,23 @@ class ConsumerTest {
     static int getNumOfMessagesInQueue(const Consumer& consumer) {
         return consumer.impl_->getNumOfPrefetchedMessages();
     }
+
+    template <typename T>
+    static DeadlineTimerPtr scheduleCloseConnection(const Consumer& consumer, 
T delaySinceStartGrabCnx) {
+        auto impl = std::dynamic_pointer_cast<ConsumerImpl>(consumer.impl_);
+        if (!impl) {
+            throw std::runtime_error("scheduleCloseConnection can only be 
called on ConsumerImpl");
+        }
+
+        auto cnx = impl->getCnx().lock();
+        if (!cnx) {
+            return nullptr;
+        }
+        auto timer = cnx->executor_->createDeadlineTimer();
+        timer->expires_from_now(delaySinceStartGrabCnx -
+                                
std::chrono::milliseconds(impl->connectionTimeMs_ + 50));
+        timer->async_wait([cnx](const ASIO_ERROR&) { cnx->close(); });
+        return timer;
+    }
 };
 }  // namespace pulsar

Reply via email to