This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new f337eff  Fix ProducerBusy or ConsumerBusy error when configuring multiple connections per broker (#337)
f337eff is described below
commit f337eff7caae93730ec1260810655cbb5a345e70
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Nov 7 10:42:23 2023 +0800
Fix ProducerBusy or ConsumerBusy error when configuring multiple connections per broker (#337)
### Motivation
This is a catch-up for https://github.com/apache/pulsar/pull/21144
When a producer or consumer reconnects, a random number is generated as the key
suffix in `ConnectionPool` to create or get the `ClientConnection` object from
the pool:
https://github.com/apache/pulsar-client-cpp/blob/81cc562f7b366fad97e1b80c07ef9334a808390d/lib/ConnectionPool.cc#L75
If a new connection is created to the broker with the same producer or consumer
name, the broker responds with a `ProducerBusy` or `ConsumerBusy` error, so the
reconnection never succeeds.
### Modifications
- Add an overload of `ConnectionPool::getConnectionAsync` that accepts an
integer parameter as the key suffix. If it's not specified, generate a random
number as the suffix. In this method, choose the executor by `key suffix %
size`.
- Generate a random number and save it when creating the `HandlerBase`
object. When connecting to the owner broker of its topic, pass that suffix so
that the reconnection always reuses the same `ClientConnection` object (see the
sketch after this list).
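A minimal, self-contained sketch of this idea (hypothetical `SimplePool`,
placeholder `ClientConnection`, and `main`; not the actual client code): a
handler picks one key suffix up front, so every later lookup maps to the same
`<logicalAddress>-<suffix>` pool key and reuses the same connection instead of
opening a new one that the broker would reject.

```cpp
// Sketch only: a pool keyed by "<logicalAddress>-<suffix>", mirroring the key
// format used in ConnectionPool. All types here are simplified stand-ins.
#include <iostream>
#include <map>
#include <memory>
#include <random>
#include <string>

struct ClientConnection {};  // placeholder for the real connection type

class SimplePool {
   public:
    std::shared_ptr<ClientConnection> getConnection(const std::string& logicalAddress,
                                                    size_t keySuffix) {
        const std::string key = logicalAddress + '-' + std::to_string(keySuffix);
        auto& cnx = pool_[key];
        if (!cnx) {
            cnx = std::make_shared<ClientConnection>();  // created once per key
        }
        return cnx;  // later calls with the same suffix reuse this object
    }

    size_t generateRandomIndex() { return dist_(engine_); }

   private:
    std::map<std::string, std::shared_ptr<ClientConnection>> pool_;
    std::mt19937 engine_{std::random_device{}()};
    std::uniform_int_distribution<size_t> dist_{0, 9};  // e.g. connectionsPerBroker = 10
};

int main() {
    SimplePool pool;
    // A handler picks its suffix once at construction time...
    const size_t connectionKeySuffix = pool.generateRandomIndex();
    // ...so the initial connect and every reconnect hit the same pool entry.
    auto first = pool.getConnection("pulsar://broker:6650", connectionKeySuffix);
    auto again = pool.getConnection("pulsar://broker:6650", connectionKeySuffix);
    std::cout << std::boolalpha << (first == again) << std::endl;  // true
    return 0;
}
```

In the real fix, `HandlerBase` stores this suffix in `connectionKeySuffix_` and
passes it to `ClientImpl::getConnection` on every reconnect.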
### Verifying this change
`ProducerTest.testReconnectMultiConnectionsPerBroker` is added to protect the change.
(cherry picked from commit 6f115e76da42ffbcf39a5fbe1df298e6d6635f4d)
---
lib/ClientImpl.cc | 6 +++---
lib/ClientImpl.h | 4 +++-
lib/ConnectionPool.cc | 9 +++++----
lib/ConnectionPool.h | 14 +++++++++++++-
lib/ExecutorService.cc | 4 ++--
lib/ExecutorService.h | 6 ++++--
lib/HandlerBase.cc | 4 +++-
lib/HandlerBase.h | 1 +
lib/ProducerImpl.cc | 2 +-
tests/ProducerTest.cc | 15 +++++++++++++++
tests/PulsarFriend.h | 8 ++++++++
11 files changed, 58 insertions(+), 15 deletions(-)
diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index cb11b8e..1032b79 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -516,7 +516,7 @@ void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr co
}
}
-Future<Result, ClientConnectionPtr> ClientImpl::getConnection(const std::string& topic) {
+Future<Result, ClientConnectionPtr> ClientImpl::getConnection(const std::string& topic, size_t key) {
Promise<Result, ClientConnectionPtr> promise;
const auto topicNamePtr = TopicName::get(topic);
@@ -528,12 +528,12 @@ Future<Result, ClientConnectionPtr> ClientImpl::getConnection(const std::string&
auto self = shared_from_this();
lookupServicePtr_->getBroker(*topicNamePtr)
- .addListener([this, self, promise](Result result, const LookupService::LookupResult& data) {
+ .addListener([this, self, promise, key](Result result, const LookupService::LookupResult& data) {
if (result != ResultOk) {
promise.setFailed(result);
return;
}
- pool_.getConnectionAsync(data.logicalAddress, data.physicalAddress)
+ pool_.getConnectionAsync(data.logicalAddress, data.physicalAddress, key)
.addListener([promise](Result result, const ClientConnectionWeakPtr& weakCnx) {
if (result == ResultOk) {
auto cnx = weakCnx.lock();
diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h
index 0c6eeb4..f7b5a89 100644
--- a/lib/ClientImpl.h
+++ b/lib/ClientImpl.h
@@ -95,7 +95,7 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
void getPartitionsForTopicAsync(const std::string& topic, GetPartitionsCallback callback);
- Future<Result, ClientConnectionPtr> getConnection(const std::string& topic);
+ Future<Result, ClientConnectionPtr> getConnection(const std::string& topic, size_t key);
void closeAsync(CloseCallback callback);
void shutdown();
@@ -123,6 +123,8 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
std::shared_ptr<std::atomic<uint64_t>> getRequestIdGenerator() const { return requestIdGenerator_; }
+ ConnectionPool& getConnectionPool() noexcept { return pool_; }
+
friend class PulsarFriend;
private:
diff --git a/lib/ConnectionPool.cc b/lib/ConnectionPool.cc
index 3232512..01285e9 100644
--- a/lib/ConnectionPool.cc
+++ b/lib/ConnectionPool.cc
@@ -61,8 +61,9 @@ bool ConnectionPool::close() {
return true;
}
-Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(
- const std::string& logicalAddress, const std::string& physicalAddress) {
+Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(const std::string& logicalAddress,
+ const std::string& physicalAddress,
+ size_t keySuffix) {
if (closed_) {
Promise<Result, ClientConnectionWeakPtr> promise;
promise.setFailed(ResultAlreadyClosed);
@@ -72,7 +73,7 @@ Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(
std::unique_lock<std::recursive_mutex> lock(mutex_);
std::stringstream ss;
- ss << logicalAddress << '-' << randomDistribution_(randomEngine_);
+ ss << logicalAddress << '-' << keySuffix;
const std::string key = ss.str();
PoolMap::iterator cnxIt = pool_.find(key);
@@ -95,7 +96,7 @@ Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(
// No valid or pending connection found in the pool, creating a new one
ClientConnectionPtr cnx;
try {
- cnx.reset(new ClientConnection(logicalAddress, physicalAddress, executorProvider_->get(),
+ cnx.reset(new ClientConnection(logicalAddress, physicalAddress, executorProvider_->get(keySuffix),
clientConfiguration_, authentication_, clientVersion_, *this));
} catch (const std::runtime_error& e) {
lock.unlock();
diff --git a/lib/ConnectionPool.h b/lib/ConnectionPool.h
index c582dc9..a51205b 100644
--- a/lib/ConnectionPool.h
+++ b/lib/ConnectionPool.h
@@ -65,17 +65,29 @@ class PULSAR_PUBLIC ConnectionPool {
* a proxy layer. Essentially, the pool is using the logical address as a way to
* decide whether to reuse a particular connection.
*
+ * There could be many connections to the same broker, so this pool uses an integer key as the suffix of
+ * the key that represents the connection.
+ *
* @param logicalAddress the address to use as the broker tag
* @param physicalAddress the real address where the TCP connection should be made
+ * @param keySuffix the key suffix to choose which connection on the same broker
* @return a future that will produce the ClientCnx object
*/
Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const std::string& logicalAddress,
- const std::string& physicalAddress);
+ const std::string& physicalAddress,
+ size_t keySuffix);
+
+ Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const std::string& logicalAddress,
+ const std::string& physicalAddress) {
+ return getConnectionAsync(logicalAddress, physicalAddress, generateRandomIndex());
+ }
Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const std::string& address) {
return getConnectionAsync(address, address);
}
+ size_t generateRandomIndex() { return randomDistribution_(randomEngine_); }
+
private:
ClientConfiguration clientConfiguration_;
ExecutorServiceProviderPtr executorProvider_;
diff --git a/lib/ExecutorService.cc b/lib/ExecutorService.cc
index a0dff0b..53be8ea 100644
--- a/lib/ExecutorService.cc
+++ b/lib/ExecutorService.cc
@@ -133,10 +133,10 @@ void ExecutorService::postWork(std::function<void(void)> task) { io_service_.pos
ExecutorServiceProvider::ExecutorServiceProvider(int nthreads)
: executors_(nthreads), executorIdx_(0), mutex_() {}
-ExecutorServicePtr ExecutorServiceProvider::get() {
+ExecutorServicePtr ExecutorServiceProvider::get(size_t idx) {
+ idx %= executors_.size();
Lock lock(mutex_);
- int idx = executorIdx_++ % executors_.size();
if (!executors_[idx]) {
executors_[idx] = ExecutorService::create();
}
diff --git a/lib/ExecutorService.h b/lib/ExecutorService.h
index 4717ccb..a373c0a 100644
--- a/lib/ExecutorService.h
+++ b/lib/ExecutorService.h
@@ -88,7 +88,9 @@ class PULSAR_PUBLIC ExecutorServiceProvider {
public:
explicit ExecutorServiceProvider(int nthreads);
- ExecutorServicePtr get();
+ ExecutorServicePtr get() { return get(executorIdx_++); }
+
+ ExecutorServicePtr get(size_t index);
// See TimeoutProcessor for the semantics of the parameter.
void close(long timeoutMs = 3000);
@@ -96,7 +98,7 @@ class PULSAR_PUBLIC ExecutorServiceProvider {
private:
typedef std::vector<ExecutorServicePtr> ExecutorList;
ExecutorList executors_;
- int executorIdx_;
+ std::atomic_size_t executorIdx_;
std::mutex mutex_;
typedef std::unique_lock<std::mutex> Lock;
};
diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc
index 986063e..b55f751 100644
--- a/lib/HandlerBase.cc
+++ b/lib/HandlerBase.cc
@@ -32,6 +32,7 @@ namespace pulsar {
HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic, const Backoff& backoff)
: topic_(std::make_shared<std::string>(topic)),
client_(client),
+ connectionKeySuffix_(client->getConnectionPool().generateRandomIndex()),
executor_(client->getIOExecutorProvider()->get()),
mutex_(),
creationTimestamp_(TimeUtils::now()),
@@ -88,7 +89,8 @@ void HandlerBase::grabCnx() {
return;
}
auto self = shared_from_this();
- client->getConnection(topic()).addListener([this, self](Result result, const ClientConnectionPtr& cnx) {
+ auto cnxFuture = client->getConnection(topic(), connectionKeySuffix_);
+ cnxFuture.addListener([this, self](Result result, const ClientConnectionPtr& cnx) {
if (result == ResultOk) {
LOG_DEBUG(getName() << "Connected to broker: " << cnx->cnxString());
connectionOpened(cnx).addListener([this, self](Result result, bool) {
diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h
index 937b308..f62c4df 100644
--- a/lib/HandlerBase.h
+++ b/lib/HandlerBase.h
@@ -99,6 +99,7 @@ class HandlerBase : public std::enable_shared_from_this<HandlerBase> {
protected:
ClientImplWeakPtr client_;
+ const size_t connectionKeySuffix_;
ExecutorServicePtr executor_;
mutable std::mutex mutex_;
std::mutex pendingReceiveMutex_;
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index a66fbfb..80ee735 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -966,7 +966,7 @@ bool ProducerImpl::encryptMessage(proto::MessageMetadata& metadata, SharedBuffer
}
void ProducerImpl::disconnectProducer() {
- LOG_DEBUG("Broker notification of Closed producer: " << producerId_);
+ LOG_INFO("Broker notification of Closed producer: " << producerId_);
resetCnx();
scheduleReconnection();
}
diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc
index eeda2b4..9685cc8 100644
--- a/tests/ProducerTest.cc
+++ b/tests/ProducerTest.cc
@@ -618,4 +618,19 @@ TEST(ProducerTest, testNoDeadlockWhenClosingPartitionedProducerAfterPartitionsUp
client.close();
}
+TEST(ProducerTest, testReconnectMultiConnectionsPerBroker) {
+ ClientConfiguration conf;
+ conf.setConnectionsPerBroker(10);
+
+ Client client(serviceUrl, conf);
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer("producer-test-reconnect-twice", producer));
+
+ for (int i = 0; i < 5; i++) {
+ ASSERT_TRUE(PulsarFriend::reconnect(producer)) << "i: " << i;
+ }
+
+ client.close();
+}
+
INSTANTIATE_TEST_CASE_P(Pulsar, ProducerTest, ::testing::Values(true, false));
diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h
index c25b140..de82ce4 100644
--- a/tests/PulsarFriend.h
+++ b/tests/PulsarFriend.h
@@ -21,6 +21,7 @@
#include <string>
+#include "WaitUtils.h"
#include "lib/ClientConnection.h"
#include "lib/ClientImpl.h"
#include "lib/ConsumerConfigurationImpl.h"
@@ -197,6 +198,13 @@ class PulsarFriend {
lookupData->setPartitions(newPartitions);
partitionedProducer.handleGetPartitions(ResultOk, lookupData);
}
+
+ static bool reconnect(Producer producer) {
+ auto producerImpl = std::dynamic_pointer_cast<ProducerImpl>(producer.impl_);
+ producerImpl->disconnectProducer();
+ return waitUntil(std::chrono::seconds(3),
+ [producerImpl] { return !producerImpl->getCnx().expired(); });
+ }
};
} // namespace pulsar