This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 5940cb5 Fix consumer might not subscribe after a reconnection (#438)
5940cb5 is described below
commit 5940cb518bdf3a13ae7f859fab10d6940eec51b7
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Aug 13 22:30:48 2024 +0800
Fix consumer might not subscribe after a reconnection (#438)
Fixes https://github.com/apache/pulsar-client-cpp/issues/436
### Motivation
When a consumer starts grabbing the connection, it registers a timer that
fires after the operation timeout. When that timer expires, it fails the
connection and cancels the connection timer. However, this results in the
following race condition:
1. The consumer's connection is closed (e.g. the keep alive timer failed)
2. The connection timer is registered on the executor and will trigger
the reconnection after 100ms
3. The expired creation timer then cancels the connection timer, so the
reconnection never starts.
### Modifications
Cancel the `creationTimer_` once `HandlerBase#start` has succeeded for the
first time. Add `testReconnectWhenFirstConnectTimedOut` to cover this case.
---
lib/ClientConnection.h | 1 +
lib/HandlerBase.cc | 23 +++++++++++++++++------
lib/HandlerBase.h | 2 ++
tests/ConsumerTest.cc | 32 ++++++++++++++++++++++++++++++++
tests/ConsumerTest.h | 23 +++++++++++++++++++++++
5 files changed, 75 insertions(+), 6 deletions(-)
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index 418cb2f..638edb4 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -404,6 +404,7 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
const size_t poolIndex_;
friend class PulsarFriend;
+ friend class ConsumerTest;
void checkServerError(ServerError error, const std::string& message);
diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc
index 46b918f..65aa0db 100644
--- a/lib/HandlerBase.cc
+++ b/lib/HandlerBase.cc
@@ -18,6 +18,9 @@
*/
#include "HandlerBase.h"
+#include <chrono>
+
+#include "AsioDefines.h"
#include "Backoff.h"
#include "ClientConnection.h"
#include "ClientImpl.h"
@@ -63,6 +66,7 @@ void HandlerBase::start() {
creationTimer_->async_wait([this, weakSelf](const ASIO_ERROR& error) {
auto self = weakSelf.lock();
if (self && !error) {
+ LOG_WARN("Cancel the pending reconnection due to the start
timeout");
connectionFailed(ResultTimeout);
ASIO_ERROR ignored;
timer_->cancel(ignored);
@@ -118,13 +122,21 @@ void HandlerBase::grabCnx(const
boost::optional<std::string>& assignedBrokerUrl)
}
auto self = shared_from_this();
auto cnxFuture = getConnection(client, assignedBrokerUrl);
- cnxFuture.addListener([this, self](Result result, const
ClientConnectionPtr& cnx) {
+ using namespace std::chrono;
+ auto before = high_resolution_clock::now();
+ cnxFuture.addListener([this, self, before](Result result, const
ClientConnectionPtr& cnx) {
if (result == ResultOk) {
- LOG_DEBUG(getName() << "Connected to broker: " <<
cnx->cnxString());
- connectionOpened(cnx).addListener([this, self](Result result,
bool) {
+ connectionOpened(cnx).addListener([this, self, before](Result
result, bool) {
// Do not use bool, only Result.
reconnectionPending_ = false;
- if (result != ResultOk && isResultRetryable(result)) {
+ if (result == ResultOk) {
+ connectionTimeMs_ =
+
duration_cast<milliseconds>(high_resolution_clock::now() - before).count();
+ // Prevent the creationTimer_ from cancelling the timer_
in future
+ ASIO_ERROR ignored;
+ creationTimer_->cancel(ignored);
+ LOG_INFO("Finished connecting to broker after " <<
connectionTimeMs_ << " ms")
+ } else if (isResultRetryable(result)) {
scheduleReconnection();
}
});
@@ -194,8 +206,7 @@ void HandlerBase::scheduleReconnection(const
boost::optional<std::string>& assig
void HandlerBase::handleTimeout(const ASIO_ERROR& ec, const
boost::optional<std::string>& assignedBrokerUrl) {
if (ec) {
- LOG_DEBUG(getName() << "Ignoring timer cancelled event, code[" << ec
<< "]");
- return;
+ LOG_INFO(getName() << "Ignoring timer cancelled event, code[" << ec <<
"]");
} else {
epoch_++;
grabCnx(assignedBrokerUrl);
diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h
index b68dce3..df7dc24 100644
--- a/lib/HandlerBase.h
+++ b/lib/HandlerBase.h
@@ -157,9 +157,11 @@ class HandlerBase : public
std::enable_shared_from_this<HandlerBase> {
ClientConnectionWeakPtr connection_;
std::string redirectedClusterURI_;
std::atomic<long> firstRequestIdAfterConnect_{-1L};
+ std::atomic<long> connectionTimeMs_{0}; // only for tests
friend class ClientConnection;
friend class PulsarFriend;
+ friend class ConsumerTest;
};
} // namespace pulsar
#endif //_PULSAR_HANDLER_BASE_HEADER_
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index f9840f9..105e5f6 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -16,6 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
+#include "ConsumerTest.h"
+
#include <gtest/gtest.h>
#include <pulsar/Client.h>
@@ -26,6 +28,7 @@
#include <map>
#include <mutex>
#include <set>
+#include <string>
#include <thread>
#include <vector>
@@ -1467,4 +1470,33 @@ TEST(ConsumerTest, testMultiConsumerListenerAndAck) {
client.close();
}
+// When a consumer starts grabbing the connection, it registers a timer after
the operation timeout. When that
+// timer is expired, it will fail the connection and cancel the connection
timer. However, it results a race
+// condition that:
+// 1. The consumer's connection is closed (e.g. the keep alive timer failed)
+// 2. The connection timer is registered on the executor and will trigger
the reconnection after 100ms
+// 3. The connection timer is cancelled, then the reconnection won't start.
+TEST(ConsumerTest, testReconnectWhenFirstConnectTimedOut) {
+ ClientConfiguration conf;
+ conf.setOperationTimeoutSeconds(1);
+ Client client{lookupUrl, conf};
+
+ auto topic = "consumer-test-reconnect-when-first-connect-timed-out" +
std::to_string(time(nullptr));
+ Consumer consumer;
+ ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));
+
+ auto timer = ConsumerTest::scheduleCloseConnection(consumer,
std::chrono::seconds(1));
+ ASSERT_TRUE(timer != nullptr);
+ timer->wait();
+
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+ ASSERT_EQ(ResultOk,
producer.send(MessageBuilder().setContent("msg").build()));
+
+ Message msg;
+ ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
+ ASSERT_EQ("msg", msg.getDataAsString());
+ client.close();
+}
+
} // namespace pulsar
diff --git a/tests/ConsumerTest.h b/tests/ConsumerTest.h
index ca84aa7..8248287 100644
--- a/tests/ConsumerTest.h
+++ b/tests/ConsumerTest.h
@@ -16,9 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
+#include <chrono>
+#include <memory>
+#include <stdexcept>
#include <string>
+#include "lib/ClientConnection.h"
#include "lib/ConsumerImpl.h"
+#include "lib/ExecutorService.h"
using std::string;
@@ -28,5 +33,23 @@ class ConsumerTest {
static int getNumOfMessagesInQueue(const Consumer& consumer) {
return consumer.impl_->getNumOfPrefetchedMessages();
}
+
+ template <typename T>
+ static DeadlineTimerPtr scheduleCloseConnection(const Consumer& consumer,
T delaySinceStartGrabCnx) {
+ auto impl = std::dynamic_pointer_cast<ConsumerImpl>(consumer.impl_);
+ if (!impl) {
+ throw std::runtime_error("scheduleCloseConnection can only be
called on ConsumerImpl");
+ }
+
+ auto cnx = impl->getCnx().lock();
+ if (!cnx) {
+ return nullptr;
+ }
+ auto timer = cnx->executor_->createDeadlineTimer();
+ timer->expires_from_now(delaySinceStartGrabCnx -
+
std::chrono::milliseconds(impl->connectionTimeMs_ + 50));
+ timer->async_wait([cnx](const ASIO_ERROR&) { cnx->close(); });
+ return timer;
+ }
};
} // namespace pulsar