This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 81cc562 Added support for multiple connections to each broker (#336)
81cc562 is described below
commit 81cc562f7b366fad97e1b80c07ef9334a808390d
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Oct 31 18:43:39 2023 -0700
Added support for multiple connections to each broker (#336)
---
include/pulsar/Client.h | 2 --
include/pulsar/ClientConfiguration.h | 15 +++++++++
lib/Client.cc | 8 ++---
lib/ClientConfiguration.cc | 10 ++++++
lib/ClientConfigurationImpl.h | 1 +
lib/ClientImpl.cc | 5 ++-
lib/ClientImpl.h | 3 +-
lib/ConnectionPool.cc | 59 ++++++++++++++++++------------------
lib/ConnectionPool.h | 8 +++--
perf/PerfConsumer.cc | 19 ++++--------
perf/PerfProducer.cc | 18 +++--------
tests/LookupServiceTest.cc | 10 +++---
12 files changed, 82 insertions(+), 76 deletions(-)
diff --git a/include/pulsar/Client.h b/include/pulsar/Client.h
index 3514934..47e82f4 100644
--- a/include/pulsar/Client.h
+++ b/include/pulsar/Client.h
@@ -415,8 +415,6 @@ class PULSAR_PUBLIC Client {
std::function<void(Result, const SchemaInfo&)>
callback);
private:
- Client(const std::string& serviceUrl, const ClientConfiguration&
clientConfiguration,
- bool poolConnections);
Client(const std::shared_ptr<ClientImpl>);
friend class PulsarFriend;
diff --git a/include/pulsar/ClientConfiguration.h
b/include/pulsar/ClientConfiguration.h
index 4074d00..3d651e9 100644
--- a/include/pulsar/ClientConfiguration.h
+++ b/include/pulsar/ClientConfiguration.h
@@ -46,6 +46,21 @@ class PULSAR_PUBLIC ClientConfiguration {
*/
uint64_t getMemoryLimit() const;
+ /**
+ * Sets the max number of connections that the client library will open to
a single broker.
+ * By default, the connection pool will use a single connection for all
the producers and consumers.
+ * Increasing this parameter may improve throughput when using many
producers over a high latency
+ * connection.
+ *
+ * @param connectionsPerBroker max number of connections per broker (needs
to be greater than 0)
+ */
+ ClientConfiguration& setConnectionsPerBroker(int connectionsPerBroker);
+
+ /**
+ * @return the max number of connections that the client library will open
to a single broker
+ */
+ int getConnectionsPerBroker() const;
+
/**
* Set the authentication method to be used with the broker
*
diff --git a/lib/Client.cc b/lib/Client.cc
index 48c4a67..8e916c5 100644
--- a/lib/Client.cc
+++ b/lib/Client.cc
@@ -36,14 +36,10 @@ namespace pulsar {
Client::Client(const std::shared_ptr<ClientImpl> impl) : impl_(impl) {}
Client::Client(const std::string& serviceUrl)
- : impl_(std::make_shared<ClientImpl>(serviceUrl, ClientConfiguration(),
true)) {}
+ : impl_(std::make_shared<ClientImpl>(serviceUrl, ClientConfiguration())) {}
Client::Client(const std::string& serviceUrl, const ClientConfiguration&
clientConfiguration)
- : impl_(std::make_shared<ClientImpl>(serviceUrl, clientConfiguration,
true)) {}
-
-Client::Client(const std::string& serviceUrl, const ClientConfiguration&
clientConfiguration,
- bool poolConnections)
- : impl_(std::make_shared<ClientImpl>(serviceUrl, clientConfiguration,
poolConnections)) {}
+ : impl_(std::make_shared<ClientImpl>(serviceUrl, clientConfiguration)) {}
Result Client::createProducer(const std::string& topic, Producer& producer) {
return createProducer(topic, ProducerConfiguration(), producer);
diff --git a/lib/ClientConfiguration.cc b/lib/ClientConfiguration.cc
index 95779e6..977a880 100644
--- a/lib/ClientConfiguration.cc
+++ b/lib/ClientConfiguration.cc
@@ -40,6 +40,16 @@ ClientConfiguration&
ClientConfiguration::setMemoryLimit(uint64_t memoryLimitByt
uint64_t ClientConfiguration::getMemoryLimit() const { return
impl_->memoryLimit; }
+ClientConfiguration& ClientConfiguration::setConnectionsPerBroker(int
connectionsPerBroker) {
+ if (connectionsPerBroker <= 0) {
+ throw std::invalid_argument("connectionsPerBroker should be greater
than 0");
+ }
+ impl_->connectionsPerBroker = connectionsPerBroker;
+ return *this;
+}
+
+int ClientConfiguration::getConnectionsPerBroker() const { return
impl_->connectionsPerBroker; }
+
ClientConfiguration& ClientConfiguration::setAuth(const AuthenticationPtr&
authentication) {
impl_->authenticationPtr = authentication;
return *this;
diff --git a/lib/ClientConfigurationImpl.h b/lib/ClientConfigurationImpl.h
index 0f979bb..3458a05 100644
--- a/lib/ClientConfigurationImpl.h
+++ b/lib/ClientConfigurationImpl.h
@@ -27,6 +27,7 @@ struct ClientConfigurationImpl {
AuthenticationPtr authenticationPtr{AuthFactory::Disabled()};
uint64_t memoryLimit{0ull};
int ioThreads{1};
+ int connectionsPerBroker{1};
int operationTimeoutSeconds{30};
int messageListenerThreads{1};
int concurrentLookupRequest{50000};
diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index dd19225..cb11b8e 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -75,8 +75,7 @@ typedef std::unique_lock<std::mutex> Lock;
typedef std::vector<std::string> StringList;
-ClientImpl::ClientImpl(const std::string& serviceUrl, const
ClientConfiguration& clientConfiguration,
- bool poolConnections)
+ClientImpl::ClientImpl(const std::string& serviceUrl, const
ClientConfiguration& clientConfiguration)
: mutex_(),
state_(Open),
serviceNameResolver_(serviceUrl),
@@ -87,7 +86,7 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const
ClientConfiguration&
std::make_shared<ExecutorServiceProvider>(clientConfiguration_.getMessageListenerThreads())),
partitionListenerExecutorProvider_(
std::make_shared<ExecutorServiceProvider>(clientConfiguration_.getMessageListenerThreads())),
- pool_(clientConfiguration_, ioExecutorProvider_,
clientConfiguration_.getAuthPtr(), poolConnections,
+ pool_(clientConfiguration_, ioExecutorProvider_,
clientConfiguration_.getAuthPtr(),
ClientImpl::getClientVersion(clientConfiguration)),
producerIdGenerator_(0),
consumerIdGenerator_(0),
diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h
index c1a5167..0c6eeb4 100644
--- a/lib/ClientImpl.h
+++ b/lib/ClientImpl.h
@@ -68,8 +68,7 @@ std::string generateRandomName();
class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
public:
- ClientImpl(const std::string& serviceUrl, const ClientConfiguration&
clientConfiguration,
- bool poolConnections);
+ ClientImpl(const std::string& serviceUrl, const ClientConfiguration&
clientConfiguration);
~ClientImpl();
/**
diff --git a/lib/ConnectionPool.cc b/lib/ConnectionPool.cc
index a1b19c6..3232512 100644
--- a/lib/ConnectionPool.cc
+++ b/lib/ConnectionPool.cc
@@ -34,13 +34,13 @@ DECLARE_LOG_OBJECT()
namespace pulsar {
ConnectionPool::ConnectionPool(const ClientConfiguration& conf,
ExecutorServiceProviderPtr executorProvider,
- const AuthenticationPtr& authentication, bool
poolConnections,
- const std::string& clientVersion)
+ const AuthenticationPtr& authentication, const
std::string& clientVersion)
: clientConfiguration_(conf),
executorProvider_(executorProvider),
authentication_(authentication),
- poolConnections_(poolConnections),
- clientVersion_(clientVersion) {}
+ clientVersion_(clientVersion),
+ randomDistribution_(0, conf.getConnectionsPerBroker() - 1),
+
randomEngine_(std::chrono::high_resolution_clock::now().time_since_epoch().count())
{}
bool ConnectionPool::close() {
bool expectedState = false;
@@ -49,16 +49,15 @@ bool ConnectionPool::close() {
}
std::unique_lock<std::recursive_mutex> lock(mutex_);
- if (poolConnections_) {
- for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) {
- auto& cnx = cnxIt->second;
- if (cnx) {
- // The 2nd argument is false because removing a value during
the iteration will cause segfault
- cnx->close(ResultDisconnected, false);
- }
+
+ for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) {
+ auto& cnx = cnxIt->second;
+ if (cnx) {
+ // The 2nd argument is false because removing a value during the
iteration will cause segfault
+ cnx->close(ResultDisconnected, false);
}
- pool_.clear();
}
+ pool_.clear();
return true;
}
@@ -72,22 +71,24 @@ Future<Result, ClientConnectionWeakPtr>
ConnectionPool::getConnectionAsync(
std::unique_lock<std::recursive_mutex> lock(mutex_);
- if (poolConnections_) {
- PoolMap::iterator cnxIt = pool_.find(logicalAddress);
- if (cnxIt != pool_.end()) {
- auto& cnx = cnxIt->second;
-
- if (!cnx->isClosed()) {
- // Found a valid or pending connection in the pool
- LOG_DEBUG("Got connection from pool for " << logicalAddress <<
" use_count: " //
- << (cnx.use_count())
<< " @ " << cnx.get());
- return cnx->getConnectFuture();
- } else {
- // The closed connection should have been removed from the
pool in ClientConnection::close
- LOG_WARN("Deleting stale connection from pool for "
- << logicalAddress << " use_count: " <<
(cnx.use_count()) << " @ " << cnx.get());
- pool_.erase(logicalAddress);
- }
+ std::stringstream ss;
+ ss << logicalAddress << '-' << randomDistribution_(randomEngine_);
+ const std::string key = ss.str();
+
+ PoolMap::iterator cnxIt = pool_.find(key);
+ if (cnxIt != pool_.end()) {
+ auto& cnx = cnxIt->second;
+
+ if (!cnx->isClosed()) {
+ // Found a valid or pending connection in the pool
+ LOG_DEBUG("Got connection from pool for " << key << " use_count: "
//
+ << (cnx.use_count()) <<
" @ " << cnx.get());
+ return cnx->getConnectFuture();
+ } else {
+ // The closed connection should have been removed from the pool in
ClientConnection::close
+ LOG_WARN("Deleting stale connection from pool for " << key << "
use_count: " << (cnx.use_count())
+ << " @ " <<
cnx.get());
+ pool_.erase(key);
}
}
@@ -107,7 +108,7 @@ Future<Result, ClientConnectionWeakPtr>
ConnectionPool::getConnectionAsync(
LOG_INFO("Created connection for " << logicalAddress);
Future<Result, ClientConnectionWeakPtr> future = cnx->getConnectFuture();
- pool_.insert(std::make_pair(logicalAddress, cnx));
+ pool_.insert(std::make_pair(key, cnx));
lock.unlock();
diff --git a/lib/ConnectionPool.h b/lib/ConnectionPool.h
index 5e7f6ee..c582dc9 100644
--- a/lib/ConnectionPool.h
+++ b/lib/ConnectionPool.h
@@ -27,6 +27,7 @@
#include <map>
#include <memory>
#include <mutex>
+#include <random>
#include <string>
#include "Future.h"
@@ -41,8 +42,7 @@ using ExecutorServiceProviderPtr =
std::shared_ptr<ExecutorServiceProvider>;
class PULSAR_PUBLIC ConnectionPool {
public:
ConnectionPool(const ClientConfiguration& conf, ExecutorServiceProviderPtr
executorProvider,
- const AuthenticationPtr& authentication, bool
poolConnections,
- const std::string& clientVersion);
+ const AuthenticationPtr& authentication, const std::string&
clientVersion);
/**
* Close the connection pool.
@@ -82,11 +82,13 @@ class PULSAR_PUBLIC ConnectionPool {
AuthenticationPtr authentication_;
typedef std::map<std::string, std::shared_ptr<ClientConnection>> PoolMap;
PoolMap pool_;
- bool poolConnections_;
const std::string clientVersion_;
mutable std::recursive_mutex mutex_;
std::atomic_bool closed_{false};
+ std::uniform_int_distribution<> randomDistribution_;
+ std::mt19937 randomEngine_;
+
friend class PulsarFriend;
};
} // namespace pulsar
diff --git a/perf/PerfConsumer.cc b/perf/PerfConsumer.cc
index c4f1f00..424809b 100644
--- a/perf/PerfConsumer.cc
+++ b/perf/PerfConsumer.cc
@@ -67,20 +67,12 @@ struct Arguments {
int receiverQueueSize;
int ioThreads;
int listenerThreads;
- bool poolConnections;
+ int connectionsPerBroker;
+
std::string encKeyName;
std::string encKeyValueFile;
};
-namespace pulsar {
-class PulsarFriend {
- public:
- static Client getClient(const std::string& url, const ClientConfiguration
conf, bool poolConnections) {
- return Client(url, conf, poolConnections);
- }
-};
-} // namespace pulsar
-
#if __GNUC__ == 4 && __GNUC_MINOR__ == 4
// Used for gcc-4.4.7 with boost-1.41
#include <cstdatomic>
@@ -167,6 +159,7 @@ void startPerfConsumer(const Arguments& args) {
std::string tlsTrustCertsFilePath(args.tlsTrustCertsFilePath);
conf.setTlsTrustCertsFilePath(tlsTrustCertsFilePath);
}
+ conf.setConnectionsPerBroker(args.connectionsPerBroker);
conf.setIOThreads(args.ioThreads);
conf.setMessageListenerThreads(args.listenerThreads);
if (!args.authPlugin.empty()) {
@@ -174,7 +167,7 @@ void startPerfConsumer(const Arguments& args) {
conf.setAuth(auth);
}
- Client client(pulsar::PulsarFriend::getClient(args.serviceURL, conf,
args.poolConnections));
+ Client client(args.serviceURL, conf);
ConsumerConfiguration consumerConf;
consumerConf.setMessageListener(messageListener);
@@ -299,8 +292,8 @@ int main(int argc, char** argv) {
("listener-threads,l",
po::value<int>(&args.listenerThreads)->default_value(1),
"Number of listener threads") //
- ("pool-connections",
po::value<bool>(&args.poolConnections)->default_value(false),
- "whether pool connections used") //
+ ("connections-per-broker",
po::value<int>(&args.connectionsPerBroker)->default_value(1),
+ "Number of connections per each broker") //
("encryption-key-name,k",
po::value<std::string>(&args.encKeyName)->default_value(""),
"The private key name to decrypt payload") //
diff --git a/perf/PerfProducer.cc b/perf/PerfProducer.cc
index 04d5cbf..aeda8e8 100644
--- a/perf/PerfProducer.cc
+++ b/perf/PerfProducer.cc
@@ -64,21 +64,12 @@ struct Arguments {
unsigned int batchingMaxMessages;
long batchingMaxAllowedSizeInBytes;
long batchingMaxPublishDelayMs;
- bool poolConnections;
+ int connectionsPerBroker;
std::string encKeyName;
std::string encKeyValueFile;
std::string compression;
};
-namespace pulsar {
-class PulsarFriend {
- public:
- static Client getClient(const std::string& url, const ClientConfiguration
conf, bool poolConnections) {
- return Client(url, conf, poolConnections);
- }
-};
-} // namespace pulsar
-
unsigned long messagesProduced;
unsigned long bytesProduced;
using namespace boost::accumulators;
@@ -283,8 +274,8 @@ int main(int argc, char** argv) {
po::value<long>(&args.batchingMaxPublishDelayMs)->default_value(3000),
"Use only is batch-size > 1, Default is 3 seconds") //
- ("pool-connections",
po::value<bool>(&args.poolConnections)->default_value(false),
- "whether pool connections used") //
+ ("connections-per-broker",
po::value<int>(&args.connectionsPerBroker)->default_value(1),
+ "Number of connections per each broker") //
("encryption-key-name,k",
po::value<std::string>(&args.encKeyName)->default_value(""),
"The public key name to encrypt payload") //
@@ -371,6 +362,7 @@ int main(int argc, char** argv) {
producerConf.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
pulsar::ClientConfiguration conf;
+ conf.setConnectionsPerBroker(args.connectionsPerBroker);
conf.setMemoryLimit(args.memoryLimitMb * 1024 * 1024);
conf.setUseTls(args.isUseTls);
conf.setTlsAllowInsecureConnection(args.isTlsAllowInsecureConnection);
@@ -385,7 +377,7 @@ int main(int argc, char** argv) {
conf.setAuth(auth);
}
- pulsar::Client client(pulsar::PulsarFriend::getClient(args.serviceURL,
conf, args.poolConnections));
+ pulsar::Client client(args.serviceURL, conf);
std::atomic<bool> exitCondition(false);
startPerfProducer(args, producerConf, client, exitCondition);
diff --git a/tests/LookupServiceTest.cc b/tests/LookupServiceTest.cc
index bc6ea2b..3457bd4 100644
--- a/tests/LookupServiceTest.cc
+++ b/tests/LookupServiceTest.cc
@@ -76,7 +76,7 @@ TEST(LookupServiceTest, basicLookup) {
std::string url = "pulsar://localhost:6650";
ClientConfiguration conf;
ExecutorServiceProviderPtr
ioExecutorProvider_(std::make_shared<ExecutorServiceProvider>(1));
- ConnectionPool pool_(conf, ioExecutorProvider_, authData, true, "");
+ ConnectionPool pool_(conf, ioExecutorProvider_, authData, "");
ServiceNameResolver serviceNameResolver(url);
BinaryProtoLookupService lookupService(serviceNameResolver, pool_, conf);
@@ -140,7 +140,7 @@ static void testMultiAddresses(LookupService&
lookupService) {
}
TEST(LookupServiceTest, testMultiAddresses) {
- ConnectionPool pool({}, std::make_shared<ExecutorServiceProvider>(1),
AuthFactory::Disabled(), true, "");
+ ConnectionPool pool({}, std::make_shared<ExecutorServiceProvider>(1),
AuthFactory::Disabled(), "");
ServiceNameResolver
serviceNameResolver("pulsar://localhost,localhost:9999");
ClientConfiguration conf;
BinaryProtoLookupService binaryLookupService(serviceNameResolver, pool,
conf);
@@ -154,7 +154,7 @@ TEST(LookupServiceTest, testMultiAddresses) {
}
TEST(LookupServiceTest, testRetry) {
auto executorProvider = std::make_shared<ExecutorServiceProvider>(1);
- ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), true,
"");
+ ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), "");
ServiceNameResolver
serviceNameResolver("pulsar://localhost:9999,localhost");
ClientConfiguration conf;
@@ -188,7 +188,7 @@ TEST(LookupServiceTest, testRetry) {
TEST(LookupServiceTest, testTimeout) {
auto executorProvider = std::make_shared<ExecutorServiceProvider>(1);
- ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), true,
"");
+ ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), "");
ServiceNameResolver
serviceNameResolver("pulsar://localhost:9990,localhost:9902,localhost:9904");
ClientConfiguration conf;
@@ -451,7 +451,7 @@ TEST(LookupServiceTest, testRedirectionLimit) {
ClientConfiguration conf;
conf.setMaxLookupRedirects(redirect_limit);
ExecutorServiceProviderPtr
ioExecutorProvider_(std::make_shared<ExecutorServiceProvider>(1));
- ConnectionPool pool_(conf, ioExecutorProvider_, authData, true, "");
+ ConnectionPool pool_(conf, ioExecutorProvider_, authData, "");
std::string url = "pulsar://localhost:6650";
ServiceNameResolver serviceNameResolver(url);
BinaryProtoLookupServiceRedirectTestHelper
lookupService(serviceNameResolver, pool_, conf);