This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 25ea451 [PIP-60] [Proxy-Client] Support SNI routing for Pulsar CPP
client (#373)
25ea451 is described below
commit 25ea451eb3a79d48966689ec64460ae03d5d57da
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Mon Dec 18 19:36:12 2023 -0800
[PIP-60] [Proxy-Client] Support SNI routing for Pulsar CPP client (#373)
* [PIP-60] [Proxy-Server] Support SNI routing for Pulsar CPP client
* fix format
* fix const def
* Fix format check
---------
Co-authored-by: Yunze Xu <[email protected]>
---
include/pulsar/ClientConfiguration.h | 31 +++++++++++++++++++++++++++++++
lib/ClientConfiguration.cc | 16 ++++++++++++++++
lib/ClientConfigurationImpl.h | 2 ++
lib/ClientConnection.cc | 20 ++++++++++++++++----
lib/ClientConnection.h | 5 +++++
tests/ConsumerTest.cc | 12 ++++++++++++
6 files changed, 82 insertions(+), 4 deletions(-)
diff --git a/include/pulsar/ClientConfiguration.h
b/include/pulsar/ClientConfiguration.h
index 3d651e9..cc3c3ed 100644
--- a/include/pulsar/ClientConfiguration.h
+++ b/include/pulsar/ClientConfiguration.h
@@ -32,6 +32,10 @@ class PULSAR_PUBLIC ClientConfiguration {
~ClientConfiguration();
ClientConfiguration(const ClientConfiguration&);
ClientConfiguration& operator=(const ClientConfiguration&);
+ enum ProxyProtocol
+ {
+ SNI = 0
+ };
/**
* Configure a limit on the amount of memory that will be allocated by
this client instance.
@@ -320,6 +324,33 @@ class PULSAR_PUBLIC ClientConfiguration {
*/
ClientConfiguration& setConnectionTimeout(int timeoutMs);
+ /**
+ * Set proxy-service url when client would like to connect to broker via
proxy. Client must configure both
+ * proxyServiceUrl and appropriate proxyProtocol.
+ *
+ * Example: pulsar+ssl://ats-proxy.example.com:4443
+ *
+ * @param proxyServiceUrl proxy url to connect with broker
+ * @return reference to this ClientConfiguration, so calls can be chained
+ */
+ ClientConfiguration& setProxyServiceUrl(const std::string&
proxyServiceUrl);
+
+ const std::string& getProxyServiceUrl() const;
+
+ /**
+ * Set appropriate proxy-protocol along with proxy-service url. Currently
Pulsar supports SNI proxy
+ * routing.
+ *
+ * SNI routing:
+ *
https://docs.trafficserver.apache.org/en/latest/admin-guide/layer-4-routing.en.html#sni-routing.
+ *
+ * @param proxyProtocol possible options (SNI)
+ * @return reference to this ClientConfiguration, so calls can be chained
+ */
+ ClientConfiguration& setProxyProtocol(ProxyProtocol proxyProtocol);
+
+ ProxyProtocol getProxyProtocol() const;
+
/**
* The getter associated with setConnectionTimeout().
*/
diff --git a/lib/ClientConfiguration.cc b/lib/ClientConfiguration.cc
index 63c0bf8..6e7c745 100644
--- a/lib/ClientConfiguration.cc
+++ b/lib/ClientConfiguration.cc
@@ -134,6 +134,22 @@ ClientConfiguration&
ClientConfiguration::setConcurrentLookupRequest(int concurr
return *this;
}
+ClientConfiguration& ClientConfiguration::setProxyServiceUrl(const
std::string& proxyServiceUrl) {
+ impl_->proxyServiceUrl = proxyServiceUrl;
+ return *this;
+}
+
+const std::string& ClientConfiguration::getProxyServiceUrl() const { return
impl_->proxyServiceUrl; }
+
+ClientConfiguration&
ClientConfiguration::setProxyProtocol(ClientConfiguration::ProxyProtocol
proxyProtocol) {
+ impl_->proxyProtocol = proxyProtocol;
+ return *this;
+}
+
+ClientConfiguration::ProxyProtocol ClientConfiguration::getProxyProtocol()
const {
+ return impl_->proxyProtocol;
+}
+
int ClientConfiguration::getConcurrentLookupRequest() const { return
impl_->concurrentLookupRequest; }
ClientConfiguration& ClientConfiguration::setMaxLookupRedirects(int
maxLookupRedirects) {
diff --git a/lib/ClientConfigurationImpl.h b/lib/ClientConfigurationImpl.h
index 3458a05..b62b97c 100644
--- a/lib/ClientConfigurationImpl.h
+++ b/lib/ClientConfigurationImpl.h
@@ -46,6 +46,8 @@ struct ClientConfigurationImpl {
std::string listenerName;
int connectionTimeoutMs{10000}; // 10 seconds
std::string description;
+ std::string proxyServiceUrl;
+ ClientConfiguration::ProxyProtocol proxyProtocol{ClientConfiguration::SNI};  // explicit default: ClientConnection reads this even when no proxy is configured
std::unique_ptr<LoggerFactory> takeLogger() { return
std::move(loggerFactory); }
};
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 97d8847..61aa7f7 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -209,7 +209,15 @@ ClientConnection::ClientConnection(const std::string&
logicalAddress, const std:
boost::asio::ssl::context ctx(executor_->getIOService(),
boost::asio::ssl::context::tlsv1_client);
#endif
Url serviceUrl;
+ Url proxyUrl;
Url::parse(physicalAddress, serviceUrl);
+ proxyServiceUrl_ = clientConfiguration.getProxyServiceUrl();
+ proxyProtocol_ = clientConfiguration.getProxyProtocol();
+ if (proxyProtocol_ == ClientConfiguration::SNI &&
!proxyServiceUrl_.empty()) {
+ Url::parse(proxyServiceUrl_, proxyUrl);
+ isSniProxy_ = true;
+ LOG_INFO("Configuring SNI Proxy-url=" << proxyServiceUrl_);
+ }
if (clientConfiguration.isTlsAllowInsecureConnection()) {
ctx.set_verify_mode(boost::asio::ssl::context::verify_none);
isTlsAllowInsecureConnection_ = true;
@@ -257,7 +265,8 @@ ClientConnection::ClientConnection(const std::string&
logicalAddress, const std:
if (!clientConfiguration.isTlsAllowInsecureConnection() &&
clientConfiguration.isValidateHostName()) {
LOG_DEBUG("Validating hostname for " << serviceUrl.host() << ":"
<< serviceUrl.port());
-
tlsSocket_->set_verify_callback(boost::asio::ssl::rfc2818_verification(serviceUrl.host()));
+ std::string urlHost = isSniProxy_ ? proxyUrl.host() :
serviceUrl.host();
+
tlsSocket_->set_verify_callback(boost::asio::ssl::rfc2818_verification(urlHost));
}
LOG_DEBUG("TLS SNI Host: " << serviceUrl.host());
@@ -403,7 +412,8 @@ void ClientConnection::handleTcpConnected(const
boost::system::error_code& err,
if (logicalAddress_ == physicalAddress_) {
LOG_INFO(cnxString_ << "Connected to broker");
} else {
- LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical
broker: " << logicalAddress_);
+ LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical
broker: " << logicalAddress_
+ << ", proxy: " << proxyServiceUrl_);
}
Lock lock(mutex_);
@@ -572,7 +582,8 @@ void ClientConnection::tcpConnectAsync() {
boost::system::error_code err;
Url service_url;
- if (!Url::parse(physicalAddress_, service_url)) {
+ std::string hostUrl = isSniProxy_ ? proxyServiceUrl_ : physicalAddress_;
+ if (!Url::parse(hostUrl, service_url)) {
LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " "
<< err.message());
close();
return;
@@ -600,7 +611,8 @@ void ClientConnection::tcpConnectAsync() {
void ClientConnection::handleResolve(const boost::system::error_code& err,
tcp::resolver::iterator endpointIterator)
{
if (err) {
- LOG_ERROR(cnxString_ << "Resolve error: " << err << " : " <<
err.message());
+ std::string hostUrl = isSniProxy_ ? proxyServiceUrl_ : cnxString_;  // name the proxy URL only in SNI mode; otherwise keep the connection-string prefix
+ LOG_ERROR(hostUrl << "Resolve error: " << err << " : " <<
err.message());
close();
return;
}
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index ce2bd88..1bc1bd8 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -333,6 +333,10 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
*/
const std::string physicalAddress_;
+ std::string proxyServiceUrl_;
+
+ ClientConfiguration::ProxyProtocol proxyProtocol_;
+
// Represent both endpoint of the tcp connection. eg: [client:1234 ->
server:6650]
std::string cnxString_;
@@ -384,6 +388,7 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
// Signals whether we're waiting for a response from broker
bool havePendingPingRequest_ = false;
+ bool isSniProxy_ = false;
DeadlineTimerPtr keepAliveTimer_;
DeadlineTimerPtr consumerStatsRequestTimer_;
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index f4ae88d..c7e98bc 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -1478,4 +1478,16 @@ TEST(ConsumerTest, testConsumerName) {
client.close();
}
+TEST(ConsumerTest, testSNIProxyConnect) {
+ ClientConfiguration clientConfiguration;
+ clientConfiguration.setProxyServiceUrl(lookupUrl);
+ clientConfiguration.setProxyProtocol(ClientConfiguration::SNI);
+
+ Client client(lookupUrl, clientConfiguration);
+ const std::string topic = "testSNIProxy-" + std::to_string(time(nullptr));
+
+ Consumer consumer;
+ ASSERT_EQ(ResultOk, client.subscribe(topic, "test-sub", consumer));
+ client.close();
+}
} // namespace pulsar