This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push: new 9f96eb9 Fix blue-green migration might be stuck due to an existing reconnection (#406) 9f96eb9 is described below commit 9f96eb98ff52cebc8b4360de5e6d44b42b89c6e9 Author: Yunze Xu <xyzinfern...@163.com> AuthorDate: Thu Feb 29 10:56:20 2024 +0800 Fix blue-green migration might be stuck due to an existing reconnection (#406) Fixes https://github.com/apache/pulsar-client-cpp/issues/405 ### Motivation After a blue-green migration is triggered, the socket is disconnected and a reconnection to the blue cluster is scheduled. However, the blue cluster might never respond to the Producer or Subscribe commands. Taking the producer as an example, this means `connectionOpened` will not complete and `reconnectionPending_` will not become false. Then, after a `CommandCloseProducer` command is received from the blue cluster, a new reconnection to the green cluster is scheduled, but it is skipped because `reconnectionPending_` is still true. As a result, the client waits until the previous `connectionOpened` future fails when the 30s timeout is reached. ``` 2024-02-26 06:09:30.251 INFO [139737465607744] HandlerBase:101 | [persistent://public/unload-test/topic-1708927732, sub, 0] Ignoring reconnection attempt since there's already a pending reconnection 2024-02-26 06:10:00.035 WARN [139737859880512] ProducerImpl:291 | [persistent://public/unload-test/topic-1708927732, cluster-a-0-0] Failed to reconnect producer: TimeOut ``` ### Modifications When the `TOPIC_MIGRATED` command is received, cancel the pending `Producer` and `Subscribe` commands so that `connectionOpened` fails with a retryable error. On the next reconnection attempt, the client connects to the green cluster. Fix the `ExtensibleLoadManagerTest` by adding a stricter timeout check. After this change, the test passes in about 3 seconds locally; before this change, it took about 70 seconds in CI even when it passed. 
Besides, fix the possible crash on macOS when closing the client, see https://github.com/apache/pulsar-client-cpp/issues/405#issuecomment-1963969215 --- lib/ClientConnection.cc | 12 +++++++ lib/ClientConnection.h | 2 ++ lib/ConsumerImpl.cc | 4 ++- lib/HandlerBase.h | 9 +++++ lib/ProducerImpl.cc | 3 +- lib/stats/ConsumerStatsBase.h | 1 + lib/stats/ConsumerStatsImpl.h | 4 +++ tests/PulsarFriend.h | 2 +- tests/extensibleLM/ExtensibleLoadManagerTest.cc | 45 ++++++++++++++----------- 9 files changed, 60 insertions(+), 22 deletions(-) diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 0beb739..abd38b4 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -1800,6 +1800,7 @@ void ClientConnection::handleTopicMigrated(const proto::CommandTopicMigrated& co if (it != producers_.end()) { auto producer = it->second.lock(); producer->setRedirectedClusterURI(migratedBrokerServiceUrl); + unsafeRemovePendingRequest(producer->firstRequestIdAfterConnect()); LOG_INFO("Producer id:" << resourceId << " is migrated to " << migratedBrokerServiceUrl); } else { LOG_WARN("Got invalid producer Id in topicMigrated command: " << resourceId); @@ -1809,6 +1810,7 @@ void ClientConnection::handleTopicMigrated(const proto::CommandTopicMigrated& co if (it != consumers_.end()) { auto consumer = it->second.lock(); consumer->setRedirectedClusterURI(migratedBrokerServiceUrl); + unsafeRemovePendingRequest(consumer->firstRequestIdAfterConnect()); LOG_INFO("Consumer id:" << resourceId << " is migrated to " << migratedBrokerServiceUrl); } else { LOG_WARN("Got invalid consumer Id in topicMigrated command: " << resourceId); @@ -2027,4 +2029,14 @@ void ClientConnection::handleAckResponse(const proto::CommandAckResponse& respon } } +void ClientConnection::unsafeRemovePendingRequest(long requestId) { + auto it = pendingRequests_.find(requestId); + if (it != pendingRequests_.end()) { + it->second.promise.setFailed(ResultDisconnected); + ASIO_ERROR ec; + 
it->second.timer->cancel(ec); + pendingRequests_.erase(it); + } +} + } // namespace pulsar diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index 3c83b4d..418cb2f 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -426,6 +426,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien boost::optional<std::string> getAssignedBrokerServiceUrl(const proto::CommandCloseProducer&); boost::optional<std::string> getAssignedBrokerServiceUrl(const proto::CommandCloseConsumer&); std::string getMigratedBrokerServiceUrl(const proto::CommandTopicMigrated&); + // This method must be called when `mutex_` is held + void unsafeRemovePendingRequest(long requestId); }; } // namespace pulsar diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 4181f05..fa33e28 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -251,7 +251,7 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const ClientConnectionPtr& c unAckedMessageTrackerPtr_->clear(); ClientImplPtr client = client_.lock(); - uint64_t requestId = client->newRequestId(); + long requestId = client->newRequestId(); SharedBuffer cmd = Commands::newSubscribe( topic(), subscription_, consumerId_, requestId, getSubType(), getConsumerName(), subscriptionMode_, subscribeMessageId, readCompacted_, config_.getProperties(), config_.getSubscriptionProperties(), @@ -260,6 +260,7 @@ Future<Result, bool> ConsumerImpl::connectionOpened(const ClientConnectionPtr& c // Keep a reference to ensure object is kept alive. 
auto self = get_shared_this_ptr(); + setFirstRequestIdAfterConnect(requestId); cnx->sendRequestWithId(cmd, requestId) .addListener([this, self, cnx, promise](Result result, const ResponseData& responseData) { Result handleResult = handleCreateConsumer(cnx, result); @@ -1719,6 +1720,7 @@ void ConsumerImpl::cancelTimers() noexcept { batchReceiveTimer_->cancel(ec); checkExpiredChunkedTimer_->cancel(ec); unAckedMessageTrackerPtr_->stop(); + consumerStatsBasePtr_->stop(); } void ConsumerImpl::processPossibleToDLQ(const MessageId& messageId, ProcessDLQCallBack cb) { diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h index 415e234..b68dce3 100644 --- a/lib/HandlerBase.h +++ b/lib/HandlerBase.h @@ -100,6 +100,10 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> { const std::string& topic() const { return *topic_; } const std::shared_ptr<std::string>& getTopicPtr() const { return topic_; } + long firstRequestIdAfterConnect() const { + return firstRequestIdAfterConnect_.load(std::memory_order_acquire); + } + private: const std::shared_ptr<std::string> topic_; @@ -140,6 +144,10 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> { Result convertToTimeoutIfNecessary(Result result, ptime startTimestamp) const; + void setFirstRequestIdAfterConnect(long requestId) { + firstRequestIdAfterConnect_.store(requestId, std::memory_order_release); + } + private: DeadlineTimerPtr timer_; DeadlineTimerPtr creationTimer_; @@ -148,6 +156,7 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> { std::atomic<bool> reconnectionPending_; ClientConnectionWeakPtr connection_; std::string redirectedClusterURI_; + std::atomic<long> firstRequestIdAfterConnect_{-1L}; friend class ClientConnection; friend class PulsarFriend; diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index f84c255..4399ce5 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -149,7 +149,7 @@ Future<Result, bool> ProducerImpl::connectionOpened(const 
ClientConnectionPtr& c ClientImplPtr client = client_.lock(); cnx->registerProducer(producerId_, shared_from_this()); - int requestId = client->newRequestId(); + long requestId = client->newRequestId(); SharedBuffer cmd = Commands::newProducer(topic(), producerId_, producerName_, requestId, conf_.getProperties(), conf_.getSchema(), epoch_, @@ -159,6 +159,7 @@ Future<Result, bool> ProducerImpl::connectionOpened(const ClientConnectionPtr& c // Keep a reference to ensure object is kept alive. auto self = shared_from_this(); + setFirstRequestIdAfterConnect(requestId); cnx->sendRequestWithId(cmd, requestId) .addListener([this, self, cnx, promise](Result result, const ResponseData& responseData) { Result handleResult = handleCreateProducer(cnx, result, responseData); diff --git a/lib/stats/ConsumerStatsBase.h b/lib/stats/ConsumerStatsBase.h index 6e2e71b..e8e17c7 100644 --- a/lib/stats/ConsumerStatsBase.h +++ b/lib/stats/ConsumerStatsBase.h @@ -28,6 +28,7 @@ namespace pulsar { class ConsumerStatsBase { public: virtual void start() {} + virtual void stop() {} virtual void receivedMessage(Message&, Result) = 0; virtual void messageAcknowledged(Result, CommandAck_AckType, uint32_t ackNums = 1) = 0; virtual ~ConsumerStatsBase() {} diff --git a/lib/stats/ConsumerStatsImpl.h b/lib/stats/ConsumerStatsImpl.h index 03f3a47..3333ea8 100644 --- a/lib/stats/ConsumerStatsImpl.h +++ b/lib/stats/ConsumerStatsImpl.h @@ -59,6 +59,10 @@ class ConsumerStatsImpl : public std::enable_shared_from_this<ConsumerStatsImpl> ConsumerStatsImpl(const ConsumerStatsImpl& stats); void flushAndReset(const ASIO_ERROR&); void start() override; + void stop() override { + ASIO_ERROR error; + timer_->cancel(error); + } void receivedMessage(Message&, Result) override; void messageAcknowledged(Result, CommandAck_AckType, uint32_t ackNums) override; virtual ~ConsumerStatsImpl(); diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h index bfa11ef..e708405 100644 --- a/tests/PulsarFriend.h +++ 
b/tests/PulsarFriend.h @@ -169,7 +169,7 @@ class PulsarFriend { static ClientConnectionWeakPtr getClientConnection(HandlerBase& handler) { return handler.connection_; } static std::string getConnectionPhysicalAddress(HandlerBase& handler) { - auto cnx = handler.connection_.lock(); + auto cnx = handler.getCnx().lock(); if (cnx) { return cnx->physicalAddress_; } diff --git a/tests/extensibleLM/ExtensibleLoadManagerTest.cc b/tests/extensibleLM/ExtensibleLoadManagerTest.cc index f4e2c81..c7e5aa0 100644 --- a/tests/extensibleLM/ExtensibleLoadManagerTest.cc +++ b/tests/extensibleLM/ExtensibleLoadManagerTest.cc @@ -25,6 +25,7 @@ #include "include/pulsar/Client.h" #include "lib/LogUtils.h" #include "lib/Semaphore.h" +#include "lib/TimeUtils.h" #include "tests/HttpHelper.h" #include "tests/PulsarFriend.h" @@ -40,6 +41,9 @@ bool checkTime() { } TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) { + constexpr auto maxWaitTime = std::chrono::seconds(5); + constexpr long maxWaitTimeMs = maxWaitTime.count() * 1000L; + const static std::string blueAdminUrl = "http://localhost:8080/"; const static std::string greenAdminUrl = "http://localhost:8081/"; const static std::string topicNameSuffix = std::to_string(time(NULL)); @@ -105,12 +109,13 @@ TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) { std::string content = std::to_string(i); const auto msg = MessageBuilder().setContent(content).build(); - ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] { - Result sendResult = producer.send(msg); - return sendResult == ResultOk; - })); + auto start = TimeUtils::currentTimeMillis(); + Result sendResult = producer.send(msg); + auto elapsed = TimeUtils::currentTimeMillis() - start; + LOG_INFO("produce i: " << i << " " << elapsed << " ms"); + ASSERT_EQ(sendResult, ResultOk); + ASSERT_TRUE(elapsed < maxWaitTimeMs); - LOG_INFO("produced i:" << i); producedMsgs.emplace(i, i); i++; } @@ -124,18 +129,20 @@ TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) { if 
(stopConsumer && producedMsgs.size() == msgCount && consumedMsgs.size() == msgCount) { break; } - Result receiveResult = - consumer.receive(receivedMsg, 1000); // Assumed that we wait 1000 ms for each message - if (receiveResult != ResultOk) { - continue; - } + auto start = TimeUtils::currentTimeMillis(); + Result receiveResult = consumer.receive(receivedMsg, maxWaitTimeMs); + auto elapsed = TimeUtils::currentTimeMillis() - start; int i = std::stoi(receivedMsg.getDataAsString()); - LOG_INFO("received i:" << i); - ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] { - Result ackResult = consumer.acknowledge(receivedMsg); - return ackResult == ResultOk; - })); - LOG_INFO("acked i:" << i); + LOG_INFO("receive i: " << i << " " << elapsed << " ms"); + ASSERT_EQ(receiveResult, ResultOk); + ASSERT_TRUE(elapsed < maxWaitTimeMs); + + start = TimeUtils::currentTimeMillis(); + Result ackResult = consumer.acknowledge(receivedMsg); + elapsed = TimeUtils::currentTimeMillis() - start; + LOG_INFO("acked i:" << i << " " << elapsed << " ms"); + ASSERT_TRUE(elapsed < maxWaitTimeMs); + ASSERT_EQ(ackResult, ResultOk); consumedMsgs.emplace(i, i); } LOG_INFO("consumer finished"); @@ -153,7 +160,7 @@ TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) { std::string destinationBroker; while (checkTime()) { // make sure producers and consumers are ready - ASSERT_TRUE(waitUntil(std::chrono::seconds(30), + ASSERT_TRUE(waitUntil(maxWaitTime, [&] { return consumerImpl.isConnected() && producerImpl.isConnected(); })); std::string url = @@ -182,7 +189,7 @@ TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) { } // make sure producers and consumers are ready - ASSERT_TRUE(waitUntil(std::chrono::seconds(30), + ASSERT_TRUE(waitUntil(maxWaitTime, [&] { return consumerImpl.isConnected() && producerImpl.isConnected(); })); std::string responseDataAfterUnload; ASSERT_TRUE(waitUntil(std::chrono::seconds(60), [&] { @@ -220,7 +227,7 @@ TEST(ExtensibleLoadManagerTest, testPubSubWhileUnloading) 
{ LOG_INFO("res:" << res); return res == 200; })); - ASSERT_TRUE(waitUntil(std::chrono::seconds(130), [&] { + ASSERT_TRUE(waitUntil(maxWaitTime, [&] { auto &consumerImpl = PulsarFriend::getConsumerImpl(consumer); auto &producerImpl = PulsarFriend::getProducerImpl(producer); auto consumerConnAddress = PulsarFriend::getConnectionPhysicalAddress(consumerImpl);