This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 25ea451  [PIP-60] [Proxy-Client] Support SNI routing for Pulsar CPP 
client (#373)
25ea451 is described below

commit 25ea451eb3a79d48966689ec64460ae03d5d57da
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Mon Dec 18 19:36:12 2023 -0800

    [PIP-60] [Proxy-Client] Support SNI routing for Pulsar CPP client (#373)
    
    * [PIP-60] [Proxy-Server] Support SNI routing for Pulsar CPP client
    
    * fix format
    
    * fix const def
    
    * Fix format check
    
    ---------
    
    Co-authored-by: Yunze Xu <[email protected]>
---
 include/pulsar/ClientConfiguration.h | 31 +++++++++++++++++++++++++++++++
 lib/ClientConfiguration.cc           | 16 ++++++++++++++++
 lib/ClientConfigurationImpl.h        |  2 ++
 lib/ClientConnection.cc              | 20 ++++++++++++++++----
 lib/ClientConnection.h               |  5 +++++
 tests/ConsumerTest.cc                | 12 ++++++++++++
 6 files changed, 82 insertions(+), 4 deletions(-)

diff --git a/include/pulsar/ClientConfiguration.h 
b/include/pulsar/ClientConfiguration.h
index 3d651e9..cc3c3ed 100644
--- a/include/pulsar/ClientConfiguration.h
+++ b/include/pulsar/ClientConfiguration.h
@@ -32,6 +32,10 @@ class PULSAR_PUBLIC ClientConfiguration {
     ~ClientConfiguration();
     ClientConfiguration(const ClientConfiguration&);
     ClientConfiguration& operator=(const ClientConfiguration&);
+    enum ProxyProtocol
+    {
+        SNI = 0
+    };
 
     /**
      * Configure a limit on the amount of memory that will be allocated by 
this client instance.
@@ -320,6 +324,33 @@ class PULSAR_PUBLIC ClientConfiguration {
      */
     ClientConfiguration& setConnectionTimeout(int timeoutMs);
 
+    /**
+     * Set the proxy service URL to use when the client connects to the broker through a proxy. The client
+     * must configure both proxyServiceUrl and the matching proxyProtocol.
+     *
+     * Example: pulsar+ssl://ats-proxy.example.com:4443
+     *
+     * @param proxyServiceUrl URL of the proxy through which the broker is reached
+     * @return a reference to this ClientConfiguration
+     */
+    ClientConfiguration& setProxyServiceUrl(const std::string& 
proxyServiceUrl);
+
+    const std::string& getProxyServiceUrl() const;
+
+    /**
+     * Set the proxy protocol to use together with the proxy service URL. Currently only SNI proxy routing
+     * is supported.
+     *
+     * SNI routing:
+     * https://docs.trafficserver.apache.org/en/latest/admin-guide/layer-4-routing.en.html#sni-routing.
+     *
+     * @param proxyProtocol the proxy protocol to use (currently only SNI)
+     * @return a reference to this ClientConfiguration
+     */
+    ClientConfiguration& setProxyProtocol(ProxyProtocol proxyProtocol);
+
+    ProxyProtocol getProxyProtocol() const;
+
     /**
      * The getter associated with setConnectionTimeout().
      */
diff --git a/lib/ClientConfiguration.cc b/lib/ClientConfiguration.cc
index 63c0bf8..6e7c745 100644
--- a/lib/ClientConfiguration.cc
+++ b/lib/ClientConfiguration.cc
@@ -134,6 +134,22 @@ ClientConfiguration& 
ClientConfiguration::setConcurrentLookupRequest(int concurr
     return *this;
 }
 
+ClientConfiguration& ClientConfiguration::setProxyServiceUrl(const 
std::string& proxyServiceUrl) {
+    impl_->proxyServiceUrl = proxyServiceUrl;
+    return *this;
+}
+
+const std::string& ClientConfiguration::getProxyServiceUrl() const { return 
impl_->proxyServiceUrl; }
+
+ClientConfiguration& 
ClientConfiguration::setProxyProtocol(ClientConfiguration::ProxyProtocol 
proxyProtocol) {
+    impl_->proxyProtocol = proxyProtocol;
+    return *this;
+}
+
+ClientConfiguration::ProxyProtocol ClientConfiguration::getProxyProtocol() 
const {
+    return impl_->proxyProtocol;
+}
+
 int ClientConfiguration::getConcurrentLookupRequest() const { return 
impl_->concurrentLookupRequest; }
 
 ClientConfiguration& ClientConfiguration::setMaxLookupRedirects(int 
maxLookupRedirects) {
diff --git a/lib/ClientConfigurationImpl.h b/lib/ClientConfigurationImpl.h
index 3458a05..b62b97c 100644
--- a/lib/ClientConfigurationImpl.h
+++ b/lib/ClientConfigurationImpl.h
@@ -46,6 +46,8 @@ struct ClientConfigurationImpl {
     std::string listenerName;
     int connectionTimeoutMs{10000};  // 10 seconds
     std::string description;
+    std::string proxyServiceUrl;
+    ClientConfiguration::ProxyProtocol proxyProtocol{ClientConfiguration::SNI};  // explicit default
 
     std::unique_ptr<LoggerFactory> takeLogger() { return 
std::move(loggerFactory); }
 };
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 97d8847..61aa7f7 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -209,7 +209,15 @@ ClientConnection::ClientConnection(const std::string& 
logicalAddress, const std:
         boost::asio::ssl::context ctx(executor_->getIOService(), 
boost::asio::ssl::context::tlsv1_client);
 #endif
         Url serviceUrl;
+        Url proxyUrl;
         Url::parse(physicalAddress, serviceUrl);
+        proxyServiceUrl_ = clientConfiguration.getProxyServiceUrl();
+        proxyProtocol_ = clientConfiguration.getProxyProtocol();
+        if (proxyProtocol_ == ClientConfiguration::SNI && 
!proxyServiceUrl_.empty()) {
+            Url::parse(proxyServiceUrl_, proxyUrl);
+            isSniProxy_ = true;
+            LOG_INFO("Configuring SNI Proxy-url=" << proxyServiceUrl_);
+        }
         if (clientConfiguration.isTlsAllowInsecureConnection()) {
             ctx.set_verify_mode(boost::asio::ssl::context::verify_none);
             isTlsAllowInsecureConnection_ = true;
@@ -257,7 +265,8 @@ ClientConnection::ClientConnection(const std::string& 
logicalAddress, const std:
 
         if (!clientConfiguration.isTlsAllowInsecureConnection() && 
clientConfiguration.isValidateHostName()) {
             LOG_DEBUG("Validating hostname for " << serviceUrl.host() << ":" 
<< serviceUrl.port());
-            
tlsSocket_->set_verify_callback(boost::asio::ssl::rfc2818_verification(serviceUrl.host()));
+            std::string urlHost = isSniProxy_ ? proxyUrl.host() : 
serviceUrl.host();
+            
tlsSocket_->set_verify_callback(boost::asio::ssl::rfc2818_verification(urlHost));
         }
 
         LOG_DEBUG("TLS SNI Host: " << serviceUrl.host());
@@ -403,7 +412,8 @@ void ClientConnection::handleTcpConnected(const 
boost::system::error_code& err,
         if (logicalAddress_ == physicalAddress_) {
             LOG_INFO(cnxString_ << "Connected to broker");
         } else {
-            LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical 
broker: " << logicalAddress_);
+            LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical 
broker: " << logicalAddress_
+                                << ", proxy: " << proxyServiceUrl_);
         }
 
         Lock lock(mutex_);
@@ -572,7 +582,8 @@ void ClientConnection::tcpConnectAsync() {
 
     boost::system::error_code err;
     Url service_url;
-    if (!Url::parse(physicalAddress_, service_url)) {
+    std::string hostUrl = isSniProxy_ ? proxyServiceUrl_ : physicalAddress_;
+    if (!Url::parse(hostUrl, service_url)) {
         LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " 
<< err.message());
         close();
         return;
@@ -600,7 +611,8 @@ void ClientConnection::tcpConnectAsync() {
 void ClientConnection::handleResolve(const boost::system::error_code& err,
                                      tcp::resolver::iterator endpointIterator) 
{
     if (err) {
-        LOG_ERROR(cnxString_ << "Resolve error: " << err << " : " << 
err.message());
+        std::string hostUrl = isSniProxy_ ? proxyServiceUrl_ : cnxString_;  // proxy URL only when SNI-routed
+        LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << 
err.message());
         close();
         return;
     }
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index ce2bd88..1bc1bd8 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -333,6 +333,10 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
      */
     const std::string physicalAddress_;
 
+    std::string proxyServiceUrl_;
+
+    ClientConfiguration::ProxyProtocol proxyProtocol_;
+
     // Represent both endpoint of the tcp connection. eg: [client:1234 -> 
server:6650]
     std::string cnxString_;
 
@@ -384,6 +388,7 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
 
     // Signals whether we're waiting for a response from broker
     bool havePendingPingRequest_ = false;
+    bool isSniProxy_ = false;
     DeadlineTimerPtr keepAliveTimer_;
     DeadlineTimerPtr consumerStatsRequestTimer_;
 
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index f4ae88d..c7e98bc 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -1478,4 +1478,16 @@ TEST(ConsumerTest, testConsumerName) {
     client.close();
 }
 
+TEST(ConsumerTest, testSNIProxyConnect) {
+    ClientConfiguration clientConfiguration;
+    clientConfiguration.setProxyServiceUrl(lookupUrl);
+    clientConfiguration.setProxyProtocol(ClientConfiguration::SNI);
+
+    Client client(lookupUrl, clientConfiguration);
+    const std::string topic = "testSNIProxy-" + std::to_string(time(nullptr));
+
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, "test-sub", consumer));
+    client.close();
+}
 }  // namespace pulsar

Reply via email to