This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 9f96eb9  Fix blue-green migration might be stuck due to an existing 
reconnection (#406)
9f96eb9 is described below

commit 9f96eb98ff52cebc8b4360de5e6d44b42b89c6e9
Author: Yunze Xu <xyzinfern...@163.com>
AuthorDate: Thu Feb 29 10:56:20 2024 +0800

    Fix blue-green migration might be stuck due to an existing reconnection 
(#406)
    
    Fixes https://github.com/apache/pulsar-client-cpp/issues/405
    
    ### Motivation
    
    After a blue-green migration is triggered, the socket is disconnected
    and a reconnection to the blue cluster is scheduled. However, the blue
    cluster might never respond to the Producer or Subscribe commands.
    Taking the producer as an example, this means `connectionOpened` never
    completes and `reconnectionPending_` never becomes false.
    
    Then, after a `CommandProducerClose` command is received from the blue
    cluster, a new reconnection to the green cluster is scheduled, but it
    is skipped because `reconnectionPending_` is still true. As a result,
    the client stays stuck until the previous `connectionOpened` future
    fails after the 30s timeout.
    
    ```
    2024-02-26 06:09:30.251 INFO  [139737465607744] HandlerBase:101 | 
[persistent://public/unload-test/topic-1708927732, sub, 0] Ignoring 
reconnection attempt since there's already a pending reconnection
    2024-02-26 06:10:00.035 WARN  [139737859880512] ProducerImpl:291 | 
[persistent://public/unload-test/topic-1708927732, cluster-a-0-0] Failed to 
reconnect producer: TimeOut
    ```
    
    ### Modifications
    
    When the `TOPIC_MIGRATED` command is received, cancel the pending
    `Producer` and `Subscribe` requests so that `connectionOpened` fails
    with a retryable error (`ResultDisconnected`). On the next
    reconnection attempt, the client connects to the green cluster.
    
    Make `ExtensibleLoadManagerTest` enforce a stricter timeout check.
    With this change, the test passes in about 3 seconds locally, whereas
    before it took about 70 seconds in CI even when it passed.
    
    In addition, fix a possible crash on macOS when closing the client; see
    https://github.com/apache/pulsar-client-cpp/issues/405#issuecomment-1963969215
---
 lib/ClientConnection.cc                         | 12 +++++++
 lib/ClientConnection.h                          |  2 ++
 lib/ConsumerImpl.cc                             |  4 ++-
 lib/HandlerBase.h                               |  9 +++++
 lib/ProducerImpl.cc                             |  3 +-
 lib/stats/ConsumerStatsBase.h                   |  1 +
 lib/stats/ConsumerStatsImpl.h                   |  4 +++
 tests/PulsarFriend.h                            |  2 +-
 tests/extensibleLM/ExtensibleLoadManagerTest.cc | 45 ++++++++++++++-----------
 9 files changed, 60 insertions(+), 22 deletions(-)

diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 0beb739..abd38b4 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -1800,6 +1800,7 @@ void ClientConnection::handleTopicMigrated(const 
proto::CommandTopicMigrated& co
         if (it != producers_.end()) {
             auto producer = it->second.lock();
             producer->setRedirectedClusterURI(migratedBrokerServiceUrl);
+            unsafeRemovePendingRequest(producer->firstRequestIdAfterConnect());
             LOG_INFO("Producer id:" << resourceId << " is migrated to " << 
migratedBrokerServiceUrl);
         } else {
             LOG_WARN("Got invalid producer Id in topicMigrated command: " << 
resourceId);
@@ -1809,6 +1810,7 @@ void ClientConnection::handleTopicMigrated(const 
proto::CommandTopicMigrated& co
         if (it != consumers_.end()) {
             auto consumer = it->second.lock();
             consumer->setRedirectedClusterURI(migratedBrokerServiceUrl);
+            unsafeRemovePendingRequest(consumer->firstRequestIdAfterConnect());
             LOG_INFO("Consumer id:" << resourceId << " is migrated to " << 
migratedBrokerServiceUrl);
         } else {
             LOG_WARN("Got invalid consumer Id in topicMigrated command: " << 
resourceId);
@@ -2027,4 +2029,14 @@ void ClientConnection::handleAckResponse(const 
proto::CommandAckResponse& respon
     }
 }
 
+void ClientConnection::unsafeRemovePendingRequest(long requestId) {
+    auto it = pendingRequests_.find(requestId);
+    if (it != pendingRequests_.end()) {
+        it->second.promise.setFailed(ResultDisconnected);
+        ASIO_ERROR ec;
+        it->second.timer->cancel(ec);
+        pendingRequests_.erase(it);
+    }
+}
+
 }  // namespace pulsar
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index 3c83b4d..418cb2f 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -426,6 +426,8 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
     boost::optional<std::string> getAssignedBrokerServiceUrl(const 
proto::CommandCloseProducer&);
     boost::optional<std::string> getAssignedBrokerServiceUrl(const 
proto::CommandCloseConsumer&);
     std::string getMigratedBrokerServiceUrl(const 
proto::CommandTopicMigrated&);
+    // This method must be called when `mutex_` is held
+    void unsafeRemovePendingRequest(long requestId);
 };
 }  // namespace pulsar
 
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 4181f05..fa33e28 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -251,7 +251,7 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const 
ClientConnectionPtr& c
     unAckedMessageTrackerPtr_->clear();
 
     ClientImplPtr client = client_.lock();
-    uint64_t requestId = client->newRequestId();
+    long requestId = client->newRequestId();
     SharedBuffer cmd = Commands::newSubscribe(
         topic(), subscription_, consumerId_, requestId, getSubType(), 
getConsumerName(), subscriptionMode_,
         subscribeMessageId, readCompacted_, config_.getProperties(), 
config_.getSubscriptionProperties(),
@@ -260,6 +260,7 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const 
ClientConnectionPtr& c
 
     // Keep a reference to ensure object is kept alive.
     auto self = get_shared_this_ptr();
+    setFirstRequestIdAfterConnect(requestId);
     cnx->sendRequestWithId(cmd, requestId)
         .addListener([this, self, cnx, promise](Result result, const 
ResponseData& responseData) {
             Result handleResult = handleCreateConsumer(cnx, result);
@@ -1719,6 +1720,7 @@ void ConsumerImpl::cancelTimers() noexcept {
     batchReceiveTimer_->cancel(ec);
     checkExpiredChunkedTimer_->cancel(ec);
     unAckedMessageTrackerPtr_->stop();
+    consumerStatsBasePtr_->stop();
 }
 
 void ConsumerImpl::processPossibleToDLQ(const MessageId& messageId, 
ProcessDLQCallBack cb) {
diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h
index 415e234..b68dce3 100644
--- a/lib/HandlerBase.h
+++ b/lib/HandlerBase.h
@@ -100,6 +100,10 @@ class HandlerBase : public 
std::enable_shared_from_this<HandlerBase> {
     const std::string& topic() const { return *topic_; }
     const std::shared_ptr<std::string>& getTopicPtr() const { return topic_; }
 
+    long firstRequestIdAfterConnect() const {
+        return firstRequestIdAfterConnect_.load(std::memory_order_acquire);
+    }
+
    private:
     const std::shared_ptr<std::string> topic_;
 
@@ -140,6 +144,10 @@ class HandlerBase : public 
std::enable_shared_from_this<HandlerBase> {
 
     Result convertToTimeoutIfNecessary(Result result, ptime startTimestamp) 
const;
 
+    void setFirstRequestIdAfterConnect(long requestId) {
+        firstRequestIdAfterConnect_.store(requestId, 
std::memory_order_release);
+    }
+
    private:
     DeadlineTimerPtr timer_;
     DeadlineTimerPtr creationTimer_;
@@ -148,6 +156,7 @@ class HandlerBase : public 
std::enable_shared_from_this<HandlerBase> {
     std::atomic<bool> reconnectionPending_;
     ClientConnectionWeakPtr connection_;
     std::string redirectedClusterURI_;
+    std::atomic<long> firstRequestIdAfterConnect_{-1L};
 
     friend class ClientConnection;
     friend class PulsarFriend;
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index f84c255..4399ce5 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -149,7 +149,7 @@ Future<Result, bool> ProducerImpl::connectionOpened(const 
ClientConnectionPtr& c
     ClientImplPtr client = client_.lock();
     cnx->registerProducer(producerId_, shared_from_this());
 
-    int requestId = client->newRequestId();
+    long requestId = client->newRequestId();
 
     SharedBuffer cmd = Commands::newProducer(topic(), producerId_, 
producerName_, requestId,
                                              conf_.getProperties(), 
conf_.getSchema(), epoch_,
@@ -159,6 +159,7 @@ Future<Result, bool> ProducerImpl::connectionOpened(const 
ClientConnectionPtr& c
 
     // Keep a reference to ensure object is kept alive.
     auto self = shared_from_this();
+    setFirstRequestIdAfterConnect(requestId);
     cnx->sendRequestWithId(cmd, requestId)
         .addListener([this, self, cnx, promise](Result result, const 
ResponseData& responseData) {
             Result handleResult = handleCreateProducer(cnx, result, 
responseData);
diff --git a/lib/stats/ConsumerStatsBase.h b/lib/stats/ConsumerStatsBase.h
index 6e2e71b..e8e17c7 100644
--- a/lib/stats/ConsumerStatsBase.h
+++ b/lib/stats/ConsumerStatsBase.h
@@ -28,6 +28,7 @@ namespace pulsar {
 class ConsumerStatsBase {
    public:
     virtual void start() {}
+    virtual void stop() {}
     virtual void receivedMessage(Message&, Result) = 0;
     virtual void messageAcknowledged(Result, CommandAck_AckType, uint32_t 
ackNums = 1) = 0;
     virtual ~ConsumerStatsBase() {}
diff --git a/lib/stats/ConsumerStatsImpl.h b/lib/stats/ConsumerStatsImpl.h
index 03f3a47..3333ea8 100644
--- a/lib/stats/ConsumerStatsImpl.h
+++ b/lib/stats/ConsumerStatsImpl.h
@@ -59,6 +59,10 @@ class ConsumerStatsImpl : public 
std::enable_shared_from_this<ConsumerStatsImpl>
     ConsumerStatsImpl(const ConsumerStatsImpl& stats);
     void flushAndReset(const ASIO_ERROR&);
     void start() override;
+    void stop() override {
+        ASIO_ERROR error;
+        timer_->cancel(error);
+    }
     void receivedMessage(Message&, Result) override;
     void messageAcknowledged(Result, CommandAck_AckType, uint32_t ackNums) 
override;
     virtual ~ConsumerStatsImpl();
diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h
index bfa11ef..e708405 100644
--- a/tests/PulsarFriend.h
+++ b/tests/PulsarFriend.h
@@ -169,7 +169,7 @@ class PulsarFriend {
     static ClientConnectionWeakPtr getClientConnection(HandlerBase& handler) { 
return handler.connection_; }
 
     static std::string getConnectionPhysicalAddress(HandlerBase& handler) {
-        auto cnx = handler.connection_.lock();
+        auto cnx = handler.getCnx().lock();
         if (cnx) {
             return cnx->physicalAddress_;
         }
diff --git a/tests/extensibleLM/ExtensibleLoadManagerTest.cc 
b/tests/extensibleLM/ExtensibleLoadManagerTest.cc
index f4e2c81..c7e5aa0 100644
--- a/tests/extensibleLM/ExtensibleLoadManagerTest.cc
+++ b/tests/extensibleLM/ExtensibleLoadManagerTest.cc
@@ -25,6 +25,7 @@
 #include "include/pulsar/Client.h"
 #include "lib/LogUtils.h"
 #include "lib/Semaphore.h"
+#include "lib/TimeUtils.h"
 #include "tests/HttpHelper.h"
 #include "tests/PulsarFriend.h"
 
@@ -40,6 +41,9 @@ bool checkTime() {
 }
 
 TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) {
+    constexpr auto maxWaitTime = std::chrono::seconds(5);
+    constexpr long maxWaitTimeMs = maxWaitTime.count() * 1000L;
+
     const static std::string blueAdminUrl = "http://localhost:8080/";;
     const static std::string greenAdminUrl = "http://localhost:8081/";;
     const static std::string topicNameSuffix = std::to_string(time(NULL));
@@ -105,12 +109,13 @@ TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) 
{
             std::string content = std::to_string(i);
             const auto msg = MessageBuilder().setContent(content).build();
 
-            ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] {
-                Result sendResult = producer.send(msg);
-                return sendResult == ResultOk;
-            }));
+            auto start = TimeUtils::currentTimeMillis();
+            Result sendResult = producer.send(msg);
+            auto elapsed = TimeUtils::currentTimeMillis() - start;
+            LOG_INFO("produce i: " << i << " " << elapsed << " ms");
+            ASSERT_EQ(sendResult, ResultOk);
+            ASSERT_TRUE(elapsed < maxWaitTimeMs);
 
-            LOG_INFO("produced i:" << i);
             producedMsgs.emplace(i, i);
             i++;
         }
@@ -124,18 +129,20 @@ TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) 
{
             if (stopConsumer && producedMsgs.size() == msgCount && 
consumedMsgs.size() == msgCount) {
                 break;
             }
-            Result receiveResult =
-                consumer.receive(receivedMsg, 1000);  // Assumed that we wait 
1000 ms for each message
-            if (receiveResult != ResultOk) {
-                continue;
-            }
+            auto start = TimeUtils::currentTimeMillis();
+            Result receiveResult = consumer.receive(receivedMsg, 
maxWaitTimeMs);
+            auto elapsed = TimeUtils::currentTimeMillis() - start;
             int i = std::stoi(receivedMsg.getDataAsString());
-            LOG_INFO("received i:" << i);
-            ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] {
-                Result ackResult = consumer.acknowledge(receivedMsg);
-                return ackResult == ResultOk;
-            }));
-            LOG_INFO("acked i:" << i);
+            LOG_INFO("receive i: " << i << " " << elapsed << " ms");
+            ASSERT_EQ(receiveResult, ResultOk);
+            ASSERT_TRUE(elapsed < maxWaitTimeMs);
+
+            start = TimeUtils::currentTimeMillis();
+            Result ackResult = consumer.acknowledge(receivedMsg);
+            elapsed = TimeUtils::currentTimeMillis() - start;
+            LOG_INFO("acked i:" << i << " " << elapsed << " ms");
+            ASSERT_TRUE(elapsed < maxWaitTimeMs);
+            ASSERT_EQ(ackResult, ResultOk);
             consumedMsgs.emplace(i, i);
         }
         LOG_INFO("consumer finished");
@@ -153,7 +160,7 @@ TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) {
         std::string destinationBroker;
         while (checkTime()) {
             // make sure producers and consumers are ready
-            ASSERT_TRUE(waitUntil(std::chrono::seconds(30),
+            ASSERT_TRUE(waitUntil(maxWaitTime,
                                   [&] { return consumerImpl.isConnected() && 
producerImpl.isConnected(); }));
 
             std::string url =
@@ -182,7 +189,7 @@ TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) {
             }
 
             // make sure producers and consumers are ready
-            ASSERT_TRUE(waitUntil(std::chrono::seconds(30),
+            ASSERT_TRUE(waitUntil(maxWaitTime,
                                   [&] { return consumerImpl.isConnected() && 
producerImpl.isConnected(); }));
             std::string responseDataAfterUnload;
             ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] {
@@ -220,7 +227,7 @@ TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) {
         LOG_INFO("res:" << res);
         return res == 200;
     }));
-    ASSERT_TRUE(waitUntil(std::chrono::seconds(130), [&] {
+    ASSERT_TRUE(waitUntil(maxWaitTime, [&] {
         auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer);
         auto &producerImpl = PulsarFriend::getProducerImpl(producer);
         auto consumerConnAddress = 
PulsarFriend::getConnectionPhysicalAddress(consumerImpl);

Reply via email to