This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b6feac8  C++ HTTP lookup works with TLS (#2314) (#2350)
b6feac8 is described below

commit b6feac8180c978e3c59dfbcd4ef44494dd73b385
Author: Ivan Kelly <[email protected]>
AuthorDate: Fri Aug 10 20:10:49 2018 +0200

    C++ HTTP lookup works with TLS (#2314) (#2350)
    
    * C++ HTTP lookup works with TLS (#2314)
    
    Previously the HTTP lookup did not take TLS into account at all. Now, if
    useTls is configured, or the client is configured with an https:// URL,
    the HTTP lookup will be configured to use TLS.
    
    This patch also makes some changes to make sure the URL the broker
    advertises for TLS starts with pulsar+ssl://.
    
    * format
---
 .../org/apache/pulsar/broker/PulsarService.java    |  2 +-
 pulsar-client-cpp/lib/ClientConnection.cc          |  2 +-
 pulsar-client-cpp/lib/ClientImpl.cc                |  5 ++-
 pulsar-client-cpp/lib/HTTPLookupService.cc         | 43 +++++++++++++++++++---
 pulsar-client-cpp/lib/HTTPLookupService.h          |  3 ++
 pulsar-client-cpp/lib/LookupDataResult.h           |  8 ++--
 pulsar-client-cpp/tests/AuthPluginTest.cc          | 22 +++++++++++
 .../pulsar/discovery/service/DiscoveryService.java |  2 +-
 8 files changed, 73 insertions(+), 14 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 93a045e..ff224bc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -799,7 +799,7 @@ public class PulsarService implements AutoCloseable {
 
     public static String brokerUrlTls(ServiceConfiguration config) {
         if (config.isTlsEnabled()) {
-            return "pulsar://" + advertisedAddress(config) + ":" + 
config.getBrokerServicePortTls();
+            return "pulsar+ssl://" + advertisedAddress(config) + ":" + 
config.getBrokerServicePortTls();
         } else {
             return "";
         }
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc 
b/pulsar-client-cpp/lib/ClientConnection.cc
index da30707..7bafdf8 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -813,7 +813,7 @@ void ClientConnection::handleIncomingCommand() {
                                 
lookupResultPtr->setBrokerUrl(lookupTopicResponse.brokerserviceurl());
                             }
 
-                            
lookupResultPtr->setBrokerUrlSsl(lookupTopicResponse.brokerserviceurltls());
+                            
lookupResultPtr->setBrokerUrlTls(lookupTopicResponse.brokerserviceurltls());
                             
lookupResultPtr->setAuthoritative(lookupTopicResponse.authoritative());
                             
lookupResultPtr->setRedirect(lookupTopicResponse.response() ==
                                                          
CommandLookupTopicResponse::Redirect);
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc 
b/pulsar-client-cpp/lib/ClientImpl.cc
index c7242c2..c31cf8e 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -391,8 +391,9 @@ Future<Result, ClientConnectionWeakPtr> 
ClientImpl::getConnection(const std::str
 void ClientImpl::handleLookup(Result result, LookupDataResultPtr data,
                               Promise<Result, ClientConnectionWeakPtr> 
promise) {
     if (data) {
-        LOG_DEBUG("Getting connection to broker: " << data->getBrokerUrl());
-        const std::string& logicalAddress = data->getBrokerUrl();
+        const std::string& logicalAddress =
+            clientConfiguration_.isUseTls() ? data->getBrokerUrlTls() : 
data->getBrokerUrl();
+        LOG_DEBUG("Getting connection to broker: " << logicalAddress);
         const std::string& physicalAddress =
             data->shouldProxyThroughServiceUrl() ? serviceUrl_ : 
logicalAddress;
         Future<Result, ClientConnectionWeakPtr> future =
diff --git a/pulsar-client-cpp/lib/HTTPLookupService.cc 
b/pulsar-client-cpp/lib/HTTPLookupService.cc
index fe27e8d..b3cb023 100644
--- a/pulsar-client-cpp/lib/HTTPLookupService.cc
+++ b/pulsar-client-cpp/lib/HTTPLookupService.cc
@@ -39,7 +39,10 @@ HTTPLookupService::HTTPLookupService(const std::string 
&lookupUrl,
                                      const AuthenticationPtr &authData)
     : 
executorProvider_(boost::make_shared<ExecutorServiceProvider>(NUMBER_OF_LOOKUP_THREADS)),
       authenticationPtr_(authData),
-      
lookupTimeoutInSeconds_(clientConfiguration.getOperationTimeoutSeconds()) {
+      
lookupTimeoutInSeconds_(clientConfiguration.getOperationTimeoutSeconds()),
+      isUseTls_(clientConfiguration.isUseTls()),
+      tlsAllowInsecure_(clientConfiguration.isTlsAllowInsecureConnection()),
+      tlsTrustCertsFilePath_(clientConfiguration.getTlsTrustCertsFilePath()) {
     if (lookupUrl[lookupUrl.length() - 1] == '/') {
         // Remove trailing '/'
         adminUrl_ = lookupUrl.substr(0, lookupUrl.length() - 1);
@@ -186,6 +189,36 @@ Result HTTPLookupService::sendHTTPRequest(const 
std::string completeUrl, std::st
     }
     curl_easy_setopt(handle, CURLOPT_HTTPHEADER, list);
 
+    // TLS
+    if (isUseTls_) {
+        if (curl_easy_setopt(handle, CURLOPT_SSLENGINE, NULL) != CURLE_OK) {
+            LOG_ERROR("Unable to load SSL engine for url " << completeUrl);
+            curl_easy_cleanup(handle);
+            return ResultConnectError;
+        }
+        if (curl_easy_setopt(handle, CURLOPT_SSLENGINE_DEFAULT, 1L) != 
CURLE_OK) {
+            LOG_ERROR("Unable to load SSL engine as default, for url " << 
completeUrl);
+            curl_easy_cleanup(handle);
+            return ResultConnectError;
+        }
+        curl_easy_setopt(handle, CURLOPT_SSLCERTTYPE, "PEM");
+
+        if (tlsAllowInsecure_) {
+            curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 0L);
+        } else {
+            curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 1L);
+        }
+
+        if (!tlsTrustCertsFilePath_.empty()) {
+            curl_easy_setopt(handle, CURLOPT_CAINFO, 
tlsTrustCertsFilePath_.c_str());
+        }
+
+        if (authDataContent->hasDataForTls()) {
+            curl_easy_setopt(handle, CURLOPT_SSLCERT, 
authDataContent->getTlsCertificates().c_str());
+            curl_easy_setopt(handle, CURLOPT_SSLKEY, 
authDataContent->getTlsPrivateKey().c_str());
+        }
+    }
+
     LOG_INFO("Curl Lookup Request sent for " << completeUrl);
 
     // Make get call to server
@@ -260,15 +293,15 @@ LookupDataResultPtr 
HTTPLookupService::parseLookupData(const std::string &json)
         return LookupDataResultPtr();
     }
 
-    const std::string brokerUrlSsl = root.get("brokerUrlSsl", 
defaultNotFoundString).asString();
-    if (brokerUrlSsl == defaultNotFoundString) {
-        LOG_ERROR("malformed json! - brokerUrlSsl not present" << json);
+    const std::string brokerUrlTls = root.get("brokerUrlTls", 
defaultNotFoundString).asString();
+    if (brokerUrlTls == defaultNotFoundString) {
+        LOG_ERROR("malformed json! - brokerUrlTls not present" << json);
         return LookupDataResultPtr();
     }
 
     LookupDataResultPtr lookupDataResultPtr = 
boost::make_shared<LookupDataResult>();
     lookupDataResultPtr->setBrokerUrl(brokerUrl);
-    lookupDataResultPtr->setBrokerUrlSsl(brokerUrlSsl);
+    lookupDataResultPtr->setBrokerUrlTls(brokerUrlTls);
 
     LOG_INFO("parseLookupData = " << *lookupDataResultPtr);
     return lookupDataResultPtr;
diff --git a/pulsar-client-cpp/lib/HTTPLookupService.h 
b/pulsar-client-cpp/lib/HTTPLookupService.h
index 66cd251..d34cf01 100644
--- a/pulsar-client-cpp/lib/HTTPLookupService.h
+++ b/pulsar-client-cpp/lib/HTTPLookupService.h
@@ -49,6 +49,9 @@ class HTTPLookupService : public LookupService, public 
boost::enable_shared_from
     std::string adminUrl_;
     AuthenticationPtr authenticationPtr_;
     int lookupTimeoutInSeconds_;
+    bool tlsAllowInsecure_;
+    bool isUseTls_;
+    std::string tlsTrustCertsFilePath_;
 
     static LookupDataResultPtr parsePartitionData(const std::string&);
     static LookupDataResultPtr parseLookupData(const std::string&);
diff --git a/pulsar-client-cpp/lib/LookupDataResult.h 
b/pulsar-client-cpp/lib/LookupDataResult.h
index 5c1387a..93cd50b 100644
--- a/pulsar-client-cpp/lib/LookupDataResult.h
+++ b/pulsar-client-cpp/lib/LookupDataResult.h
@@ -31,9 +31,9 @@ typedef boost::shared_ptr<LookupDataResultPromise> 
LookupDataResultPromisePtr;
 class LookupDataResult {
    public:
     void setBrokerUrl(const std::string& brokerUrl) { brokerUrl_ = brokerUrl; }
-    void setBrokerUrlSsl(const std::string& brokerUrlSsl) { brokerUrlSsl_ = 
brokerUrlSsl; }
+    void setBrokerUrlTls(const std::string& brokerUrlTls) { brokerUrlTls_ = 
brokerUrlTls; }
     const std::string& getBrokerUrl() const { return brokerUrl_; }
-    const std::string& getBrokerUrlSsl() const { return brokerUrlSsl_; }
+    const std::string& getBrokerUrlTls() const { return brokerUrlTls_; }
 
     bool isAuthoritative() const { return authoritative; }
 
@@ -56,7 +56,7 @@ class LookupDataResult {
    private:
     friend inline std::ostream& operator<<(std::ostream& os, const 
LookupDataResult& b);
     std::string brokerUrl_;
-    std::string brokerUrlSsl_;
+    std::string brokerUrlTls_;
     int partitions;
     bool authoritative;
     bool redirect;
@@ -65,7 +65,7 @@ class LookupDataResult {
 };
 
 std::ostream& operator<<(std::ostream& os, const LookupDataResult& b) {
-    os << "{ LookupDataResult [brokerUrl_ = " << b.brokerUrl_ << "] 
[brokerUrlSsl_ = " << b.brokerUrlSsl_
+    os << "{ LookupDataResult [brokerUrl_ = " << b.brokerUrl_ << "] 
[brokerUrlTls_ = " << b.brokerUrlTls_
        << "] [partitions = " << b.partitions << "] [authoritative = " << 
b.authoritative
        << "] [redirect = " << b.redirect << "] proxyThroughServiceUrl = " << 
b.proxyThroughServiceUrl_
        << "] }";
diff --git a/pulsar-client-cpp/tests/AuthPluginTest.cc 
b/pulsar-client-cpp/tests/AuthPluginTest.cc
index d65729f..7576d7f 100644
--- a/pulsar-client-cpp/tests/AuthPluginTest.cc
+++ b/pulsar-client-cpp/tests/AuthPluginTest.cc
@@ -141,6 +141,28 @@ TEST(AuthPluginTest, testTlsDetectPulsarSsl) {
     ASSERT_EQ(ResultOk, result);
 }
 
+TEST(AuthPluginTest, testTlsDetectHttps) {
+    ClientConfiguration config = ClientConfiguration();
+    config.setUseTls(true);  // shouldn't be needed soon
+    
config.setTlsTrustCertsFilePath("../../pulsar-broker/src/test/resources/authentication/tls/cacert.pem");
+    config.setTlsAllowInsecureConnection(false);
+    AuthenticationPtr auth =
+        
pulsar::AuthTls::create("../../pulsar-broker/src/test/resources/authentication/tls/client-cert.pem",
+                                
"../../pulsar-broker/src/test/resources/authentication/tls/client-key.pem");
+    config.setAuth(auth);
+
+    Client client("https://localhost:9766", config);
+
+    std::string topicName = 
"persistent://property/cluster/namespace/test-tls-detect-https";
+
+    Producer producer;
+    Promise<Result, Producer> producerPromise;
+    client.createProducerAsync(topicName, 
WaitForCallbackValue<Producer>(producerPromise));
+    Future<Result, Producer> producerFuture = producerPromise.getFuture();
+    Result result = producerFuture.get(producer);
+    ASSERT_EQ(ResultOk, result);
+}
+
 namespace testAthenz {
 std::string principalToken;
 void mockZTS() {
diff --git 
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
 
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
index 38ae7eb..c02baaa 100644
--- 
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
+++ 
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
@@ -158,7 +158,7 @@ public class DiscoveryService implements Closeable {
 
     public String serviceUrlTls() {
         if (config.isTlsEnabled()) {
-            return new 
StringBuilder("pulsar://").append(host()).append(":").append(config.getServicePortTls())
+            return new 
StringBuilder("pulsar+ssl://").append(host()).append(":").append(config.getServicePortTls())
                     .toString();
         } else {
             return "";

Reply via email to