This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b6feac8 C++ HTTP lookup works with TLS (#2314) (#2350)
b6feac8 is described below
commit b6feac8180c978e3c59dfbcd4ef44494dd73b385
Author: Ivan Kelly <[email protected]>
AuthorDate: Fri Aug 10 20:10:49 2018 +0200
C++ HTTP lookup works with TLS (#2314) (#2350)
* C++ HTTP lookup works with TLS (#2314)
Previously it did not take TLS into account at all. Now, if useTls is
configured, or the client it configured with a https:// url, the http
lookup will be configured to use TLS.
This patch also makes some changes to make sure the URL the broker
advertises for TLS starts with pulsar+ssl://.
* format
---
.../org/apache/pulsar/broker/PulsarService.java | 2 +-
pulsar-client-cpp/lib/ClientConnection.cc | 2 +-
pulsar-client-cpp/lib/ClientImpl.cc | 5 ++-
pulsar-client-cpp/lib/HTTPLookupService.cc | 43 +++++++++++++++++++---
pulsar-client-cpp/lib/HTTPLookupService.h | 3 ++
pulsar-client-cpp/lib/LookupDataResult.h | 8 ++--
pulsar-client-cpp/tests/AuthPluginTest.cc | 22 +++++++++++
.../pulsar/discovery/service/DiscoveryService.java | 2 +-
8 files changed, 73 insertions(+), 14 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 93a045e..ff224bc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -799,7 +799,7 @@ public class PulsarService implements AutoCloseable {
public static String brokerUrlTls(ServiceConfiguration config) {
if (config.isTlsEnabled()) {
- return "pulsar://" + advertisedAddress(config) + ":" +
config.getBrokerServicePortTls();
+ return "pulsar+ssl://" + advertisedAddress(config) + ":" +
config.getBrokerServicePortTls();
} else {
return "";
}
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc
b/pulsar-client-cpp/lib/ClientConnection.cc
index da30707..7bafdf8 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -813,7 +813,7 @@ void ClientConnection::handleIncomingCommand() {
lookupResultPtr->setBrokerUrl(lookupTopicResponse.brokerserviceurl());
}
-
lookupResultPtr->setBrokerUrlSsl(lookupTopicResponse.brokerserviceurltls());
+
lookupResultPtr->setBrokerUrlTls(lookupTopicResponse.brokerserviceurltls());
lookupResultPtr->setAuthoritative(lookupTopicResponse.authoritative());
lookupResultPtr->setRedirect(lookupTopicResponse.response() ==
CommandLookupTopicResponse::Redirect);
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc
b/pulsar-client-cpp/lib/ClientImpl.cc
index c7242c2..c31cf8e 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -391,8 +391,9 @@ Future<Result, ClientConnectionWeakPtr>
ClientImpl::getConnection(const std::str
void ClientImpl::handleLookup(Result result, LookupDataResultPtr data,
Promise<Result, ClientConnectionWeakPtr>
promise) {
if (data) {
- LOG_DEBUG("Getting connection to broker: " << data->getBrokerUrl());
- const std::string& logicalAddress = data->getBrokerUrl();
+ const std::string& logicalAddress =
+ clientConfiguration_.isUseTls() ? data->getBrokerUrlTls() :
data->getBrokerUrl();
+ LOG_DEBUG("Getting connection to broker: " << logicalAddress);
const std::string& physicalAddress =
data->shouldProxyThroughServiceUrl() ? serviceUrl_ :
logicalAddress;
Future<Result, ClientConnectionWeakPtr> future =
diff --git a/pulsar-client-cpp/lib/HTTPLookupService.cc
b/pulsar-client-cpp/lib/HTTPLookupService.cc
index fe27e8d..b3cb023 100644
--- a/pulsar-client-cpp/lib/HTTPLookupService.cc
+++ b/pulsar-client-cpp/lib/HTTPLookupService.cc
@@ -39,7 +39,10 @@ HTTPLookupService::HTTPLookupService(const std::string
&lookupUrl,
const AuthenticationPtr &authData)
:
executorProvider_(boost::make_shared<ExecutorServiceProvider>(NUMBER_OF_LOOKUP_THREADS)),
authenticationPtr_(authData),
-
lookupTimeoutInSeconds_(clientConfiguration.getOperationTimeoutSeconds()) {
+
lookupTimeoutInSeconds_(clientConfiguration.getOperationTimeoutSeconds()),
+ isUseTls_(clientConfiguration.isUseTls()),
+ tlsAllowInsecure_(clientConfiguration.isTlsAllowInsecureConnection()),
+ tlsTrustCertsFilePath_(clientConfiguration.getTlsTrustCertsFilePath()) {
if (lookupUrl[lookupUrl.length() - 1] == '/') {
// Remove trailing '/'
adminUrl_ = lookupUrl.substr(0, lookupUrl.length() - 1);
@@ -186,6 +189,36 @@ Result HTTPLookupService::sendHTTPRequest(const
std::string completeUrl, std::st
}
curl_easy_setopt(handle, CURLOPT_HTTPHEADER, list);
+ // TLS
+ if (isUseTls_) {
+ if (curl_easy_setopt(handle, CURLOPT_SSLENGINE, NULL) != CURLE_OK) {
+ LOG_ERROR("Unable to load SSL engine for url " << completeUrl);
+ curl_easy_cleanup(handle);
+ return ResultConnectError;
+ }
+ if (curl_easy_setopt(handle, CURLOPT_SSLENGINE_DEFAULT, 1L) !=
CURLE_OK) {
+ LOG_ERROR("Unable to load SSL engine as default, for url " <<
completeUrl);
+ curl_easy_cleanup(handle);
+ return ResultConnectError;
+ }
+ curl_easy_setopt(handle, CURLOPT_SSLCERTTYPE, "PEM");
+
+ if (tlsAllowInsecure_) {
+ curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 0L);
+ } else {
+ curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 1L);
+ }
+
+ if (!tlsTrustCertsFilePath_.empty()) {
+ curl_easy_setopt(handle, CURLOPT_CAINFO,
tlsTrustCertsFilePath_.c_str());
+ }
+
+ if (authDataContent->hasDataForTls()) {
+ curl_easy_setopt(handle, CURLOPT_SSLCERT,
authDataContent->getTlsCertificates().c_str());
+ curl_easy_setopt(handle, CURLOPT_SSLKEY,
authDataContent->getTlsPrivateKey().c_str());
+ }
+ }
+
LOG_INFO("Curl Lookup Request sent for " << completeUrl);
// Make get call to server
@@ -260,15 +293,15 @@ LookupDataResultPtr
HTTPLookupService::parseLookupData(const std::string &json)
return LookupDataResultPtr();
}
- const std::string brokerUrlSsl = root.get("brokerUrlSsl",
defaultNotFoundString).asString();
- if (brokerUrlSsl == defaultNotFoundString) {
- LOG_ERROR("malformed json! - brokerUrlSsl not present" << json);
+ const std::string brokerUrlTls = root.get("brokerUrlTls",
defaultNotFoundString).asString();
+ if (brokerUrlTls == defaultNotFoundString) {
+ LOG_ERROR("malformed json! - brokerUrlTls not present" << json);
return LookupDataResultPtr();
}
LookupDataResultPtr lookupDataResultPtr =
boost::make_shared<LookupDataResult>();
lookupDataResultPtr->setBrokerUrl(brokerUrl);
- lookupDataResultPtr->setBrokerUrlSsl(brokerUrlSsl);
+ lookupDataResultPtr->setBrokerUrlTls(brokerUrlTls);
LOG_INFO("parseLookupData = " << *lookupDataResultPtr);
return lookupDataResultPtr;
diff --git a/pulsar-client-cpp/lib/HTTPLookupService.h
b/pulsar-client-cpp/lib/HTTPLookupService.h
index 66cd251..d34cf01 100644
--- a/pulsar-client-cpp/lib/HTTPLookupService.h
+++ b/pulsar-client-cpp/lib/HTTPLookupService.h
@@ -49,6 +49,9 @@ class HTTPLookupService : public LookupService, public
boost::enable_shared_from
std::string adminUrl_;
AuthenticationPtr authenticationPtr_;
int lookupTimeoutInSeconds_;
+ bool tlsAllowInsecure_;
+ bool isUseTls_;
+ std::string tlsTrustCertsFilePath_;
static LookupDataResultPtr parsePartitionData(const std::string&);
static LookupDataResultPtr parseLookupData(const std::string&);
diff --git a/pulsar-client-cpp/lib/LookupDataResult.h
b/pulsar-client-cpp/lib/LookupDataResult.h
index 5c1387a..93cd50b 100644
--- a/pulsar-client-cpp/lib/LookupDataResult.h
+++ b/pulsar-client-cpp/lib/LookupDataResult.h
@@ -31,9 +31,9 @@ typedef boost::shared_ptr<LookupDataResultPromise>
LookupDataResultPromisePtr;
class LookupDataResult {
public:
void setBrokerUrl(const std::string& brokerUrl) { brokerUrl_ = brokerUrl; }
- void setBrokerUrlSsl(const std::string& brokerUrlSsl) { brokerUrlSsl_ =
brokerUrlSsl; }
+ void setBrokerUrlTls(const std::string& brokerUrlTls) { brokerUrlTls_ =
brokerUrlTls; }
const std::string& getBrokerUrl() const { return brokerUrl_; }
- const std::string& getBrokerUrlSsl() const { return brokerUrlSsl_; }
+ const std::string& getBrokerUrlTls() const { return brokerUrlTls_; }
bool isAuthoritative() const { return authoritative; }
@@ -56,7 +56,7 @@ class LookupDataResult {
private:
friend inline std::ostream& operator<<(std::ostream& os, const
LookupDataResult& b);
std::string brokerUrl_;
- std::string brokerUrlSsl_;
+ std::string brokerUrlTls_;
int partitions;
bool authoritative;
bool redirect;
@@ -65,7 +65,7 @@ class LookupDataResult {
};
std::ostream& operator<<(std::ostream& os, const LookupDataResult& b) {
- os << "{ LookupDataResult [brokerUrl_ = " << b.brokerUrl_ << "]
[brokerUrlSsl_ = " << b.brokerUrlSsl_
+ os << "{ LookupDataResult [brokerUrl_ = " << b.brokerUrl_ << "]
[brokerUrlTls_ = " << b.brokerUrlTls_
<< "] [partitions = " << b.partitions << "] [authoritative = " <<
b.authoritative
<< "] [redirect = " << b.redirect << "] proxyThroughServiceUrl = " <<
b.proxyThroughServiceUrl_
<< "] }";
diff --git a/pulsar-client-cpp/tests/AuthPluginTest.cc
b/pulsar-client-cpp/tests/AuthPluginTest.cc
index d65729f..7576d7f 100644
--- a/pulsar-client-cpp/tests/AuthPluginTest.cc
+++ b/pulsar-client-cpp/tests/AuthPluginTest.cc
@@ -141,6 +141,28 @@ TEST(AuthPluginTest, testTlsDetectPulsarSsl) {
ASSERT_EQ(ResultOk, result);
}
+TEST(AuthPluginTest, testTlsDetectHttps) {
+ ClientConfiguration config = ClientConfiguration();
+ config.setUseTls(true); // shouldn't be needed soon
+
config.setTlsTrustCertsFilePath("../../pulsar-broker/src/test/resources/authentication/tls/cacert.pem");
+ config.setTlsAllowInsecureConnection(false);
+ AuthenticationPtr auth =
+
pulsar::AuthTls::create("../../pulsar-broker/src/test/resources/authentication/tls/client-cert.pem",
+
"../../pulsar-broker/src/test/resources/authentication/tls/client-key.pem");
+ config.setAuth(auth);
+
+ Client client("https://localhost:9766", config);
+
+ std::string topicName =
"persistent://property/cluster/namespace/test-tls-detect-https";
+
+ Producer producer;
+ Promise<Result, Producer> producerPromise;
+ client.createProducerAsync(topicName,
WaitForCallbackValue<Producer>(producerPromise));
+ Future<Result, Producer> producerFuture = producerPromise.getFuture();
+ Result result = producerFuture.get(producer);
+ ASSERT_EQ(ResultOk, result);
+}
+
namespace testAthenz {
std::string principalToken;
void mockZTS() {
diff --git
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
index 38ae7eb..c02baaa 100644
---
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
+++
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/DiscoveryService.java
@@ -158,7 +158,7 @@ public class DiscoveryService implements Closeable {
public String serviceUrlTls() {
if (config.isTlsEnabled()) {
- return new
StringBuilder("pulsar://").append(host()).append(":").append(config.getServicePortTls())
+ return new
StringBuilder("pulsar+ssl://").append(host()).append(":").append(config.getServicePortTls())
.toString();
} else {
return "";