This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 81cc562  Added support for multiple connections to each broker (#336)
81cc562 is described below

commit 81cc562f7b366fad97e1b80c07ef9334a808390d
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Oct 31 18:43:39 2023 -0700

    Added support for multiple connections to each broker (#336)
---
 include/pulsar/Client.h              |  2 --
 include/pulsar/ClientConfiguration.h | 15 +++++++++
 lib/Client.cc                        |  8 ++---
 lib/ClientConfiguration.cc           | 10 ++++++
 lib/ClientConfigurationImpl.h        |  1 +
 lib/ClientImpl.cc                    |  5 ++-
 lib/ClientImpl.h                     |  3 +-
 lib/ConnectionPool.cc                | 59 ++++++++++++++++++------------------
 lib/ConnectionPool.h                 |  8 +++--
 perf/PerfConsumer.cc                 | 19 ++++--------
 perf/PerfProducer.cc                 | 18 +++--------
 tests/LookupServiceTest.cc           | 10 +++---
 12 files changed, 82 insertions(+), 76 deletions(-)

diff --git a/include/pulsar/Client.h b/include/pulsar/Client.h
index 3514934..47e82f4 100644
--- a/include/pulsar/Client.h
+++ b/include/pulsar/Client.h
@@ -415,8 +415,6 @@ class PULSAR_PUBLIC Client {
                             std::function<void(Result, const SchemaInfo&)> 
callback);
 
    private:
-    Client(const std::string& serviceUrl, const ClientConfiguration& 
clientConfiguration,
-           bool poolConnections);
     Client(const std::shared_ptr<ClientImpl>);
 
     friend class PulsarFriend;
diff --git a/include/pulsar/ClientConfiguration.h 
b/include/pulsar/ClientConfiguration.h
index 4074d00..3d651e9 100644
--- a/include/pulsar/ClientConfiguration.h
+++ b/include/pulsar/ClientConfiguration.h
@@ -46,6 +46,21 @@ class PULSAR_PUBLIC ClientConfiguration {
      */
     uint64_t getMemoryLimit() const;
 
+    /**
+     * Sets the max number of connections that the client library will open to 
a single broker.
+     * By default, the connection pool will use a single connection for all 
the producers and consumers.
+     * Increasing this parameter may improve throughput when using many 
producers over a high latency
+     * connection.
+     *
+     * @param connectionsPerBroker max number of connections per broker (needs 
to be greater than 0)
+     */
+    ClientConfiguration& setConnectionsPerBroker(int connectionsPerBroker);
+
+    /**
+     * @return the max number of connections that the client library will open 
to a single broker
+     */
+    int getConnectionsPerBroker() const;
+
     /**
      * Set the authentication method to be used with the broker
      *
diff --git a/lib/Client.cc b/lib/Client.cc
index 48c4a67..8e916c5 100644
--- a/lib/Client.cc
+++ b/lib/Client.cc
@@ -36,14 +36,10 @@ namespace pulsar {
 Client::Client(const std::shared_ptr<ClientImpl> impl) : impl_(impl) {}
 
 Client::Client(const std::string& serviceUrl)
-    : impl_(std::make_shared<ClientImpl>(serviceUrl, ClientConfiguration(), 
true)) {}
+    : impl_(std::make_shared<ClientImpl>(serviceUrl, ClientConfiguration())) {}
 
 Client::Client(const std::string& serviceUrl, const ClientConfiguration& 
clientConfiguration)
-    : impl_(std::make_shared<ClientImpl>(serviceUrl, clientConfiguration, 
true)) {}
-
-Client::Client(const std::string& serviceUrl, const ClientConfiguration& 
clientConfiguration,
-               bool poolConnections)
-    : impl_(std::make_shared<ClientImpl>(serviceUrl, clientConfiguration, 
poolConnections)) {}
+    : impl_(std::make_shared<ClientImpl>(serviceUrl, clientConfiguration)) {}
 
 Result Client::createProducer(const std::string& topic, Producer& producer) {
     return createProducer(topic, ProducerConfiguration(), producer);
diff --git a/lib/ClientConfiguration.cc b/lib/ClientConfiguration.cc
index 95779e6..977a880 100644
--- a/lib/ClientConfiguration.cc
+++ b/lib/ClientConfiguration.cc
@@ -40,6 +40,16 @@ ClientConfiguration& 
ClientConfiguration::setMemoryLimit(uint64_t memoryLimitByt
 
 uint64_t ClientConfiguration::getMemoryLimit() const { return 
impl_->memoryLimit; }
 
+ClientConfiguration& ClientConfiguration::setConnectionsPerBroker(int 
connectionsPerBroker) {
+    if (connectionsPerBroker <= 0) {
+        throw std::invalid_argument("connectionsPerBroker should be greater 
than 0");
+    }
+    impl_->connectionsPerBroker = connectionsPerBroker;
+    return *this;
+}
+
+int ClientConfiguration::getConnectionsPerBroker() const { return 
impl_->connectionsPerBroker; }
+
 ClientConfiguration& ClientConfiguration::setAuth(const AuthenticationPtr& 
authentication) {
     impl_->authenticationPtr = authentication;
     return *this;
diff --git a/lib/ClientConfigurationImpl.h b/lib/ClientConfigurationImpl.h
index 0f979bb..3458a05 100644
--- a/lib/ClientConfigurationImpl.h
+++ b/lib/ClientConfigurationImpl.h
@@ -27,6 +27,7 @@ struct ClientConfigurationImpl {
     AuthenticationPtr authenticationPtr{AuthFactory::Disabled()};
     uint64_t memoryLimit{0ull};
     int ioThreads{1};
+    int connectionsPerBroker{1};
     int operationTimeoutSeconds{30};
     int messageListenerThreads{1};
     int concurrentLookupRequest{50000};
diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index dd19225..cb11b8e 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -75,8 +75,7 @@ typedef std::unique_lock<std::mutex> Lock;
 
 typedef std::vector<std::string> StringList;
 
-ClientImpl::ClientImpl(const std::string& serviceUrl, const 
ClientConfiguration& clientConfiguration,
-                       bool poolConnections)
+ClientImpl::ClientImpl(const std::string& serviceUrl, const 
ClientConfiguration& clientConfiguration)
     : mutex_(),
       state_(Open),
       serviceNameResolver_(serviceUrl),
@@ -87,7 +86,7 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const 
ClientConfiguration&
           
std::make_shared<ExecutorServiceProvider>(clientConfiguration_.getMessageListenerThreads())),
       partitionListenerExecutorProvider_(
           
std::make_shared<ExecutorServiceProvider>(clientConfiguration_.getMessageListenerThreads())),
-      pool_(clientConfiguration_, ioExecutorProvider_, 
clientConfiguration_.getAuthPtr(), poolConnections,
+      pool_(clientConfiguration_, ioExecutorProvider_, 
clientConfiguration_.getAuthPtr(),
             ClientImpl::getClientVersion(clientConfiguration)),
       producerIdGenerator_(0),
       consumerIdGenerator_(0),
diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h
index c1a5167..0c6eeb4 100644
--- a/lib/ClientImpl.h
+++ b/lib/ClientImpl.h
@@ -68,8 +68,7 @@ std::string generateRandomName();
 
 class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
    public:
-    ClientImpl(const std::string& serviceUrl, const ClientConfiguration& 
clientConfiguration,
-               bool poolConnections);
+    ClientImpl(const std::string& serviceUrl, const ClientConfiguration& 
clientConfiguration);
     ~ClientImpl();
 
     /**
diff --git a/lib/ConnectionPool.cc b/lib/ConnectionPool.cc
index a1b19c6..3232512 100644
--- a/lib/ConnectionPool.cc
+++ b/lib/ConnectionPool.cc
@@ -34,13 +34,13 @@ DECLARE_LOG_OBJECT()
 namespace pulsar {
 
 ConnectionPool::ConnectionPool(const ClientConfiguration& conf, 
ExecutorServiceProviderPtr executorProvider,
-                               const AuthenticationPtr& authentication, bool 
poolConnections,
-                               const std::string& clientVersion)
+                               const AuthenticationPtr& authentication, const 
std::string& clientVersion)
     : clientConfiguration_(conf),
       executorProvider_(executorProvider),
       authentication_(authentication),
-      poolConnections_(poolConnections),
-      clientVersion_(clientVersion) {}
+      clientVersion_(clientVersion),
+      randomDistribution_(0, conf.getConnectionsPerBroker() - 1),
+      
randomEngine_(std::chrono::high_resolution_clock::now().time_since_epoch().count())
 {}
 
 bool ConnectionPool::close() {
     bool expectedState = false;
@@ -49,16 +49,15 @@ bool ConnectionPool::close() {
     }
 
     std::unique_lock<std::recursive_mutex> lock(mutex_);
-    if (poolConnections_) {
-        for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) {
-            auto& cnx = cnxIt->second;
-            if (cnx) {
-                // The 2nd argument is false because removing a value during 
the iteration will cause segfault
-                cnx->close(ResultDisconnected, false);
-            }
+
+    for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) {
+        auto& cnx = cnxIt->second;
+        if (cnx) {
+            // The 2nd argument is false because removing a value during the 
iteration will cause segfault
+            cnx->close(ResultDisconnected, false);
         }
-        pool_.clear();
     }
+    pool_.clear();
     return true;
 }
 
@@ -72,22 +71,24 @@ Future<Result, ClientConnectionWeakPtr> 
ConnectionPool::getConnectionAsync(
 
     std::unique_lock<std::recursive_mutex> lock(mutex_);
 
-    if (poolConnections_) {
-        PoolMap::iterator cnxIt = pool_.find(logicalAddress);
-        if (cnxIt != pool_.end()) {
-            auto& cnx = cnxIt->second;
-
-            if (!cnx->isClosed()) {
-                // Found a valid or pending connection in the pool
-                LOG_DEBUG("Got connection from pool for " << logicalAddress << 
" use_count: "  //
-                                                          << (cnx.use_count()) 
<< " @ " << cnx.get());
-                return cnx->getConnectFuture();
-            } else {
-                // The closed connection should have been removed from the 
pool in ClientConnection::close
-                LOG_WARN("Deleting stale connection from pool for "
-                         << logicalAddress << " use_count: " << 
(cnx.use_count()) << " @ " << cnx.get());
-                pool_.erase(logicalAddress);
-            }
+    std::stringstream ss;
+    ss << logicalAddress << '-' << randomDistribution_(randomEngine_);
+    const std::string key = ss.str();
+
+    PoolMap::iterator cnxIt = pool_.find(key);
+    if (cnxIt != pool_.end()) {
+        auto& cnx = cnxIt->second;
+
+        if (!cnx->isClosed()) {
+            // Found a valid or pending connection in the pool
+            LOG_DEBUG("Got connection from pool for " << key << " use_count: " 
 //
+                                                      << (cnx.use_count()) << 
" @ " << cnx.get());
+            return cnx->getConnectFuture();
+        } else {
+            // The closed connection should have been removed from the pool in 
ClientConnection::close
+            LOG_WARN("Deleting stale connection from pool for " << key << " 
use_count: " << (cnx.use_count())
+                                                                << " @ " << 
cnx.get());
+            pool_.erase(key);
         }
     }
 
@@ -107,7 +108,7 @@ Future<Result, ClientConnectionWeakPtr> 
ConnectionPool::getConnectionAsync(
     LOG_INFO("Created connection for " << logicalAddress);
 
     Future<Result, ClientConnectionWeakPtr> future = cnx->getConnectFuture();
-    pool_.insert(std::make_pair(logicalAddress, cnx));
+    pool_.insert(std::make_pair(key, cnx));
 
     lock.unlock();
 
diff --git a/lib/ConnectionPool.h b/lib/ConnectionPool.h
index 5e7f6ee..c582dc9 100644
--- a/lib/ConnectionPool.h
+++ b/lib/ConnectionPool.h
@@ -27,6 +27,7 @@
 #include <map>
 #include <memory>
 #include <mutex>
+#include <random>
 #include <string>
 
 #include "Future.h"
@@ -41,8 +42,7 @@ using ExecutorServiceProviderPtr = 
std::shared_ptr<ExecutorServiceProvider>;
 class PULSAR_PUBLIC ConnectionPool {
    public:
     ConnectionPool(const ClientConfiguration& conf, ExecutorServiceProviderPtr 
executorProvider,
-                   const AuthenticationPtr& authentication, bool 
poolConnections,
-                   const std::string& clientVersion);
+                   const AuthenticationPtr& authentication, const std::string& 
clientVersion);
 
     /**
      * Close the connection pool.
@@ -82,11 +82,13 @@ class PULSAR_PUBLIC ConnectionPool {
     AuthenticationPtr authentication_;
     typedef std::map<std::string, std::shared_ptr<ClientConnection>> PoolMap;
     PoolMap pool_;
-    bool poolConnections_;
     const std::string clientVersion_;
     mutable std::recursive_mutex mutex_;
     std::atomic_bool closed_{false};
 
+    std::uniform_int_distribution<> randomDistribution_;
+    std::mt19937 randomEngine_;
+
     friend class PulsarFriend;
 };
 }  // namespace pulsar
diff --git a/perf/PerfConsumer.cc b/perf/PerfConsumer.cc
index c4f1f00..424809b 100644
--- a/perf/PerfConsumer.cc
+++ b/perf/PerfConsumer.cc
@@ -67,20 +67,12 @@ struct Arguments {
     int receiverQueueSize;
     int ioThreads;
     int listenerThreads;
-    bool poolConnections;
+    int connectionsPerBroker;
+
     std::string encKeyName;
     std::string encKeyValueFile;
 };
 
-namespace pulsar {
-class PulsarFriend {
-   public:
-    static Client getClient(const std::string& url, const ClientConfiguration 
conf, bool poolConnections) {
-        return Client(url, conf, poolConnections);
-    }
-};
-}  // namespace pulsar
-
 #if __GNUC__ == 4 && __GNUC_MINOR__ == 4
 // Used for gcc-4.4.7 with boost-1.41
 #include <cstdatomic>
@@ -167,6 +159,7 @@ void startPerfConsumer(const Arguments& args) {
         std::string tlsTrustCertsFilePath(args.tlsTrustCertsFilePath);
         conf.setTlsTrustCertsFilePath(tlsTrustCertsFilePath);
     }
+    conf.setConnectionsPerBroker(args.connectionsPerBroker);
     conf.setIOThreads(args.ioThreads);
     conf.setMessageListenerThreads(args.listenerThreads);
     if (!args.authPlugin.empty()) {
@@ -174,7 +167,7 @@ void startPerfConsumer(const Arguments& args) {
         conf.setAuth(auth);
     }
 
-    Client client(pulsar::PulsarFriend::getClient(args.serviceURL, conf, 
args.poolConnections));
+    Client client(args.serviceURL, conf);
 
     ConsumerConfiguration consumerConf;
     consumerConf.setMessageListener(messageListener);
@@ -299,8 +292,8 @@ int main(int argc, char** argv) {
         ("listener-threads,l", 
po::value<int>(&args.listenerThreads)->default_value(1),
          "Number of listener threads")  //
 
-        ("pool-connections", 
po::value<bool>(&args.poolConnections)->default_value(false),
-         "whether pool connections used")  //
+        ("connections-per-broker", 
po::value<int>(&args.connectionsPerBroker)->default_value(1),
+         "Number of connections per each broker")  //
 
         ("encryption-key-name,k", 
po::value<std::string>(&args.encKeyName)->default_value(""),
          "The private key name to decrypt payload")  //
diff --git a/perf/PerfProducer.cc b/perf/PerfProducer.cc
index 04d5cbf..aeda8e8 100644
--- a/perf/PerfProducer.cc
+++ b/perf/PerfProducer.cc
@@ -64,21 +64,12 @@ struct Arguments {
     unsigned int batchingMaxMessages;
     long batchingMaxAllowedSizeInBytes;
     long batchingMaxPublishDelayMs;
-    bool poolConnections;
+    int connectionsPerBroker;
     std::string encKeyName;
     std::string encKeyValueFile;
     std::string compression;
 };
 
-namespace pulsar {
-class PulsarFriend {
-   public:
-    static Client getClient(const std::string& url, const ClientConfiguration 
conf, bool poolConnections) {
-        return Client(url, conf, poolConnections);
-    }
-};
-}  // namespace pulsar
-
 unsigned long messagesProduced;
 unsigned long bytesProduced;
 using namespace boost::accumulators;
@@ -283,8 +274,8 @@ int main(int argc, char** argv) {
          po::value<long>(&args.batchingMaxPublishDelayMs)->default_value(3000),
          "Use only is batch-size > 1, Default is 3 seconds")  //
 
-        ("pool-connections", 
po::value<bool>(&args.poolConnections)->default_value(false),
-         "whether pool connections used")  //
+        ("connections-per-broker", 
po::value<int>(&args.connectionsPerBroker)->default_value(1),
+         "Number of connections per each broker")  //
 
         ("encryption-key-name,k", 
po::value<std::string>(&args.encKeyName)->default_value(""),
          "The public key name to encrypt payload")  //
@@ -371,6 +362,7 @@ int main(int argc, char** argv) {
     
producerConf.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
 
     pulsar::ClientConfiguration conf;
+    conf.setConnectionsPerBroker(args.connectionsPerBroker);
     conf.setMemoryLimit(args.memoryLimitMb * 1024 * 1024);
     conf.setUseTls(args.isUseTls);
     conf.setTlsAllowInsecureConnection(args.isTlsAllowInsecureConnection);
@@ -385,7 +377,7 @@ int main(int argc, char** argv) {
         conf.setAuth(auth);
     }
 
-    pulsar::Client client(pulsar::PulsarFriend::getClient(args.serviceURL, 
conf, args.poolConnections));
+    pulsar::Client client(args.serviceURL, conf);
 
     std::atomic<bool> exitCondition(false);
     startPerfProducer(args, producerConf, client, exitCondition);
diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc
index bc6ea2b..3457bd4 100644
--- a/tests/LookupServiceTest.cc
+++ b/tests/LookupServiceTest.cc
@@ -76,7 +76,7 @@ TEST(LookupServiceTest, basicLookup) {
     std::string url = "pulsar://localhost:6650";
     ClientConfiguration conf;
     ExecutorServiceProviderPtr 
ioExecutorProvider_(std::make_shared<ExecutorServiceProvider>(1));
-    ConnectionPool pool_(conf, ioExecutorProvider_, authData, true, "");
+    ConnectionPool pool_(conf, ioExecutorProvider_, authData, "");
     ServiceNameResolver serviceNameResolver(url);
     BinaryProtoLookupService lookupService(serviceNameResolver, pool_, conf);
 
@@ -140,7 +140,7 @@ static void testMultiAddresses(LookupService& 
lookupService) {
 }
 
 TEST(LookupServiceTest, testMultiAddresses) {
-    ConnectionPool pool({}, std::make_shared<ExecutorServiceProvider>(1), 
AuthFactory::Disabled(), true, "");
+    ConnectionPool pool({}, std::make_shared<ExecutorServiceProvider>(1), 
AuthFactory::Disabled(), "");
     ServiceNameResolver 
serviceNameResolver("pulsar://localhost,localhost:9999");
     ClientConfiguration conf;
     BinaryProtoLookupService binaryLookupService(serviceNameResolver, pool, 
conf);
@@ -154,7 +154,7 @@ TEST(LookupServiceTest, testMultiAddresses) {
 }
 TEST(LookupServiceTest, testRetry) {
     auto executorProvider = std::make_shared<ExecutorServiceProvider>(1);
-    ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), true, 
"");
+    ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), "");
     ServiceNameResolver 
serviceNameResolver("pulsar://localhost:9999,localhost");
     ClientConfiguration conf;
 
@@ -188,7 +188,7 @@ TEST(LookupServiceTest, testRetry) {
 
 TEST(LookupServiceTest, testTimeout) {
     auto executorProvider = std::make_shared<ExecutorServiceProvider>(1);
-    ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), true, 
"");
+    ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), "");
     ServiceNameResolver 
serviceNameResolver("pulsar://localhost:9990,localhost:9902,localhost:9904");
     ClientConfiguration conf;
 
@@ -451,7 +451,7 @@ TEST(LookupServiceTest, testRedirectionLimit) {
     ClientConfiguration conf;
     conf.setMaxLookupRedirects(redirect_limit);
     ExecutorServiceProviderPtr 
ioExecutorProvider_(std::make_shared<ExecutorServiceProvider>(1));
-    ConnectionPool pool_(conf, ioExecutorProvider_, authData, true, "");
+    ConnectionPool pool_(conf, ioExecutorProvider_, authData, "");
     std::string url = "pulsar://localhost:6650";
     ServiceNameResolver serviceNameResolver(url);
     BinaryProtoLookupServiceRedirectTestHelper 
lookupService(serviceNameResolver, pool_, conf);

Reply via email to