This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new f337eff  Fix ProducerBusy or ConsumerBusy error when configuring 
multiple brokers per connection (#337)
f337eff is described below

commit f337eff7caae93730ec1260810655cbb5a345e70
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Nov 7 10:42:23 2023 +0800

    Fix ProducerBusy or ConsumerBusy error when configuring multiple brokers 
per connection (#337)
    
    ### Motivation
    
    This is a catch up for https://github.com/apache/pulsar/pull/21144
    
    When a producer or consumer reconnects, a random number will be generated 
as the key suffix in `ConnectionPool` to create or get the `ClientConnection` 
object from the pool.
    
    
https://github.com/apache/pulsar-client-cpp/blob/81cc562f7b366fad97e1b80c07ef9334a808390d/lib/ConnectionPool.cc#L75
    
    If a new connection is created with the same producer or consumer name to 
the broker, the broker will respond with a `ProducerBusy` or `ConsumerBusy` 
error so that the reconnection will never succeed.
    
    ### Modifications
    
    - Add an overload of `ConnectionPool::getConnectionAsync` that accepts an 
integer parameter as the key suffix. If it's not specified, generate the random 
number as the suffix. In this method, choose the executor by `key suffix % 
size`.
    - Generate the random number and save it when creating the `HandlerBase` 
object. When connecting the owner broker of its topic, pass that index so that 
the reconnection will always reuse the same `ClientConnection` object.
    
    ### Verifying this change
    
    `ProducerTest.testReconnectMultiConnectionsPerBroker` is added to protect 
the change.
    
    (cherry picked from commit 6f115e76da42ffbcf39a5fbe1df298e6d6635f4d)
---
 lib/ClientImpl.cc      |  6 +++---
 lib/ClientImpl.h       |  4 +++-
 lib/ConnectionPool.cc  |  9 +++++----
 lib/ConnectionPool.h   | 14 +++++++++++++-
 lib/ExecutorService.cc |  4 ++--
 lib/ExecutorService.h  |  6 ++++--
 lib/HandlerBase.cc     |  4 +++-
 lib/HandlerBase.h      |  1 +
 lib/ProducerImpl.cc    |  2 +-
 tests/ProducerTest.cc  | 15 +++++++++++++++
 tests/PulsarFriend.h   |  8 ++++++++
 11 files changed, 58 insertions(+), 15 deletions(-)

diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index cb11b8e..1032b79 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -516,7 +516,7 @@ void ClientImpl::handleConsumerCreated(Result result, 
ConsumerImplBaseWeakPtr co
     }
 }
 
-Future<Result, ClientConnectionPtr> ClientImpl::getConnection(const 
std::string& topic) {
+Future<Result, ClientConnectionPtr> ClientImpl::getConnection(const 
std::string& topic, size_t key) {
     Promise<Result, ClientConnectionPtr> promise;
 
     const auto topicNamePtr = TopicName::get(topic);
@@ -528,12 +528,12 @@ Future<Result, ClientConnectionPtr> 
ClientImpl::getConnection(const std::string&
 
     auto self = shared_from_this();
     lookupServicePtr_->getBroker(*topicNamePtr)
-        .addListener([this, self, promise](Result result, const 
LookupService::LookupResult& data) {
+        .addListener([this, self, promise, key](Result result, const 
LookupService::LookupResult& data) {
             if (result != ResultOk) {
                 promise.setFailed(result);
                 return;
             }
-            pool_.getConnectionAsync(data.logicalAddress, data.physicalAddress)
+            pool_.getConnectionAsync(data.logicalAddress, 
data.physicalAddress, key)
                 .addListener([promise](Result result, const 
ClientConnectionWeakPtr& weakCnx) {
                     if (result == ResultOk) {
                         auto cnx = weakCnx.lock();
diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h
index 0c6eeb4..f7b5a89 100644
--- a/lib/ClientImpl.h
+++ b/lib/ClientImpl.h
@@ -95,7 +95,7 @@ class ClientImpl : public 
std::enable_shared_from_this<ClientImpl> {
 
     void getPartitionsForTopicAsync(const std::string& topic, 
GetPartitionsCallback callback);
 
-    Future<Result, ClientConnectionPtr> getConnection(const std::string& 
topic);
+    Future<Result, ClientConnectionPtr> getConnection(const std::string& 
topic, size_t key);
 
     void closeAsync(CloseCallback callback);
     void shutdown();
@@ -123,6 +123,8 @@ class ClientImpl : public 
std::enable_shared_from_this<ClientImpl> {
 
     std::shared_ptr<std::atomic<uint64_t>> getRequestIdGenerator() const { 
return requestIdGenerator_; }
 
+    ConnectionPool& getConnectionPool() noexcept { return pool_; }
+
     friend class PulsarFriend;
 
    private:
diff --git a/lib/ConnectionPool.cc b/lib/ConnectionPool.cc
index 3232512..01285e9 100644
--- a/lib/ConnectionPool.cc
+++ b/lib/ConnectionPool.cc
@@ -61,8 +61,9 @@ bool ConnectionPool::close() {
     return true;
 }
 
-Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(
-    const std::string& logicalAddress, const std::string& physicalAddress) {
+Future<Result, ClientConnectionWeakPtr> 
ConnectionPool::getConnectionAsync(const std::string& logicalAddress,
+                                                                           
const std::string& physicalAddress,
+                                                                           
size_t keySuffix) {
     if (closed_) {
         Promise<Result, ClientConnectionWeakPtr> promise;
         promise.setFailed(ResultAlreadyClosed);
@@ -72,7 +73,7 @@ Future<Result, ClientConnectionWeakPtr> 
ConnectionPool::getConnectionAsync(
     std::unique_lock<std::recursive_mutex> lock(mutex_);
 
     std::stringstream ss;
-    ss << logicalAddress << '-' << randomDistribution_(randomEngine_);
+    ss << logicalAddress << '-' << keySuffix;
     const std::string key = ss.str();
 
     PoolMap::iterator cnxIt = pool_.find(key);
@@ -95,7 +96,7 @@ Future<Result, ClientConnectionWeakPtr> 
ConnectionPool::getConnectionAsync(
     // No valid or pending connection found in the pool, creating a new one
     ClientConnectionPtr cnx;
     try {
-        cnx.reset(new ClientConnection(logicalAddress, physicalAddress, 
executorProvider_->get(),
+        cnx.reset(new ClientConnection(logicalAddress, physicalAddress, 
executorProvider_->get(keySuffix),
                                        clientConfiguration_, authentication_, 
clientVersion_, *this));
     } catch (const std::runtime_error& e) {
         lock.unlock();
diff --git a/lib/ConnectionPool.h b/lib/ConnectionPool.h
index c582dc9..a51205b 100644
--- a/lib/ConnectionPool.h
+++ b/lib/ConnectionPool.h
@@ -65,17 +65,29 @@ class PULSAR_PUBLIC ConnectionPool {
      * a proxy layer. Essentially, the pool is using the logical address as a 
way to
      * decide whether to reuse a particular connection.
      *
+     * There could be many connections to the same broker, so this pool uses 
an integer key as the suffix of
+     * the key that represents the connection.
+     *
      * @param logicalAddress the address to use as the broker tag
      * @param physicalAddress the real address where the TCP connection should 
be made
+     * @param keySuffix the key suffix to choose which connection on the same 
broker
      * @return a future that will produce the ClientCnx object
      */
     Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const 
std::string& logicalAddress,
-                                                               const 
std::string& physicalAddress);
+                                                               const 
std::string& physicalAddress,
+                                                               size_t 
keySuffix);
+
+    Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const 
std::string& logicalAddress,
+                                                               const 
std::string& physicalAddress) {
+        return getConnectionAsync(logicalAddress, physicalAddress, 
generateRandomIndex());
+    }
 
     Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const 
std::string& address) {
         return getConnectionAsync(address, address);
     }
 
+    size_t generateRandomIndex() { return randomDistribution_(randomEngine_); }
+
    private:
     ClientConfiguration clientConfiguration_;
     ExecutorServiceProviderPtr executorProvider_;
diff --git a/lib/ExecutorService.cc b/lib/ExecutorService.cc
index a0dff0b..53be8ea 100644
--- a/lib/ExecutorService.cc
+++ b/lib/ExecutorService.cc
@@ -133,10 +133,10 @@ void ExecutorService::postWork(std::function<void(void)> 
task) { io_service_.pos
 ExecutorServiceProvider::ExecutorServiceProvider(int nthreads)
     : executors_(nthreads), executorIdx_(0), mutex_() {}
 
-ExecutorServicePtr ExecutorServiceProvider::get() {
+ExecutorServicePtr ExecutorServiceProvider::get(size_t idx) {
+    idx %= executors_.size();
     Lock lock(mutex_);
 
-    int idx = executorIdx_++ % executors_.size();
     if (!executors_[idx]) {
         executors_[idx] = ExecutorService::create();
     }
diff --git a/lib/ExecutorService.h b/lib/ExecutorService.h
index 4717ccb..a373c0a 100644
--- a/lib/ExecutorService.h
+++ b/lib/ExecutorService.h
@@ -88,7 +88,9 @@ class PULSAR_PUBLIC ExecutorServiceProvider {
    public:
     explicit ExecutorServiceProvider(int nthreads);
 
-    ExecutorServicePtr get();
+    ExecutorServicePtr get() { return get(executorIdx_++); }
+
+    ExecutorServicePtr get(size_t index);
 
     // See TimeoutProcessor for the semantics of the parameter.
     void close(long timeoutMs = 3000);
@@ -96,7 +98,7 @@ class PULSAR_PUBLIC ExecutorServiceProvider {
    private:
     typedef std::vector<ExecutorServicePtr> ExecutorList;
     ExecutorList executors_;
-    int executorIdx_;
+    std::atomic_size_t executorIdx_;
     std::mutex mutex_;
     typedef std::unique_lock<std::mutex> Lock;
 };
diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc
index 986063e..b55f751 100644
--- a/lib/HandlerBase.cc
+++ b/lib/HandlerBase.cc
@@ -32,6 +32,7 @@ namespace pulsar {
 HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& 
topic, const Backoff& backoff)
     : topic_(std::make_shared<std::string>(topic)),
       client_(client),
+      connectionKeySuffix_(client->getConnectionPool().generateRandomIndex()),
       executor_(client->getIOExecutorProvider()->get()),
       mutex_(),
       creationTimestamp_(TimeUtils::now()),
@@ -88,7 +89,8 @@ void HandlerBase::grabCnx() {
         return;
     }
     auto self = shared_from_this();
-    client->getConnection(topic()).addListener([this, self](Result result, 
const ClientConnectionPtr& cnx) {
+    auto cnxFuture = client->getConnection(topic(), connectionKeySuffix_);
+    cnxFuture.addListener([this, self](Result result, const 
ClientConnectionPtr& cnx) {
         if (result == ResultOk) {
             LOG_DEBUG(getName() << "Connected to broker: " << 
cnx->cnxString());
             connectionOpened(cnx).addListener([this, self](Result result, 
bool) {
diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h
index 937b308..f62c4df 100644
--- a/lib/HandlerBase.h
+++ b/lib/HandlerBase.h
@@ -99,6 +99,7 @@ class HandlerBase : public 
std::enable_shared_from_this<HandlerBase> {
 
    protected:
     ClientImplWeakPtr client_;
+    const size_t connectionKeySuffix_;
     ExecutorServicePtr executor_;
     mutable std::mutex mutex_;
     std::mutex pendingReceiveMutex_;
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index a66fbfb..80ee735 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -966,7 +966,7 @@ bool ProducerImpl::encryptMessage(proto::MessageMetadata& 
metadata, SharedBuffer
 }
 
 void ProducerImpl::disconnectProducer() {
-    LOG_DEBUG("Broker notification of Closed producer: " << producerId_);
+    LOG_INFO("Broker notification of Closed producer: " << producerId_);
     resetCnx();
     scheduleReconnection();
 }
diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc
index eeda2b4..9685cc8 100644
--- a/tests/ProducerTest.cc
+++ b/tests/ProducerTest.cc
@@ -618,4 +618,19 @@ TEST(ProducerTest, 
testNoDeadlockWhenClosingPartitionedProducerAfterPartitionsUp
     client.close();
 }
 
+TEST(ProducerTest, testReconnectMultiConnectionsPerBroker) {
+    ClientConfiguration conf;
+    conf.setConnectionsPerBroker(10);
+
+    Client client(serviceUrl, conf);
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer("producer-test-reconnect-twice", 
producer));
+
+    for (int i = 0; i < 5; i++) {
+        ASSERT_TRUE(PulsarFriend::reconnect(producer)) << "i: " << i;
+    }
+
+    client.close();
+}
+
 INSTANTIATE_TEST_CASE_P(Pulsar, ProducerTest, ::testing::Values(true, false));
diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h
index c25b140..de82ce4 100644
--- a/tests/PulsarFriend.h
+++ b/tests/PulsarFriend.h
@@ -21,6 +21,7 @@
 
 #include <string>
 
+#include "WaitUtils.h"
 #include "lib/ClientConnection.h"
 #include "lib/ClientImpl.h"
 #include "lib/ConsumerConfigurationImpl.h"
@@ -197,6 +198,13 @@ class PulsarFriend {
         lookupData->setPartitions(newPartitions);
         partitionedProducer.handleGetPartitions(ResultOk, lookupData);
     }
+
+    static bool reconnect(Producer producer) {
+        auto producerImpl = 
std::dynamic_pointer_cast<ProducerImpl>(producer.impl_);
+        producerImpl->disconnectProducer();
+        return waitUntil(std::chrono::seconds(3),
+                         [producerImpl] { return 
!producerImpl->getCnx().expired(); });
+    }
 };
 }  // namespace pulsar
 

Reply via email to