This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a5c339b  Allow construction of c++ builtin auth plugins via factory 
(#2177)
a5c339b is described below

commit a5c339b4e84d037e6739e70f8467b6d2d2486178
Author: Ivan Kelly <[email protected]>
AuthorDate: Wed Jul 18 17:58:12 2018 +0100

    Allow construction of c++ builtin auth plugins via factory (#2177)
    
    * Allow construction of c++ builtin auth plugins via factory
    
    Previously, to create the TLS or Athenz plugin you had to create the
    auth plugin explicitly. Some applications need to configure auth based
    on user-provided parameters, so they need a factory-like method to
    select the plugin.
    
    There was already a factory, but it only took a library path as its
    first parameter. Thus it was impossible to use it with TLS or Athenz.
    
    This patch adds handling to the factory so that if "tls", "athenz" or
    the names of their respective Java plugins are passed in, the correct
    builtin plugin is created.
    
    * Fixed typo
---
 pulsar-client-cpp/include/pulsar/Authentication.h |  8 ++-
 pulsar-client-cpp/lib/Authentication.cc           | 74 ++++++++++++++++-------
 pulsar-client-cpp/lib/auth/AuthAthenz.h           |  3 +
 pulsar-client-cpp/lib/auth/AuthTls.cc             |  4 ++
 pulsar-client-cpp/lib/auth/AuthTls.h              |  3 +
 pulsar-client-cpp/python/pulsar_test.py           | 60 ++++++++++++++++++
 6 files changed, 128 insertions(+), 24 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/Authentication.h 
b/pulsar-client-cpp/include/pulsar/Authentication.h
index 7656477..bde4134 100644
--- a/pulsar-client-cpp/include/pulsar/Authentication.h
+++ b/pulsar-client-cpp/include/pulsar/Authentication.h
@@ -85,9 +85,10 @@ class AuthFactory {
      * @param dynamicLibPath
      * @return
      */
-    static AuthenticationPtr create(const std::string& dynamicLibPath);
-    static AuthenticationPtr create(const std::string& dynamicLibPath, const 
std::string& authParamsString);
-    static AuthenticationPtr create(const std::string& dynamicLibPath, 
ParamMap& params);
+    static AuthenticationPtr create(const std::string& 
pluginNameOrDynamicLibPath);
+    static AuthenticationPtr create(const std::string& 
pluginNameOrDynamicLibPath,
+                                    const std::string& authParamsString);
+    static AuthenticationPtr create(const std::string& 
pluginNameOrDynamicLibPath, ParamMap& params);
 
    protected:
     static bool isShutdownHookRegistered_;
@@ -102,6 +103,7 @@ class AuthTls : public Authentication {
    public:
     AuthTls(AuthenticationDataPtr&);
     ~AuthTls();
+    static AuthenticationPtr create(ParamMap& params);
     static AuthenticationPtr create(const std::string& certificatePath, const 
std::string& privateKeyPath);
     const std::string getAuthMethodName() const;
     Result getAuthData(AuthenticationDataPtr& authDataTls) const;
diff --git a/pulsar-client-cpp/lib/Authentication.cc 
b/pulsar-client-cpp/lib/Authentication.cc
index 84bbf34..c1025e1 100644
--- a/pulsar-client-cpp/lib/Authentication.cc
+++ b/pulsar-client-cpp/lib/Authentication.cc
@@ -19,6 +19,8 @@
 #include <stdio.h>
 
 #include <pulsar/Authentication.h>
+#include "auth/AuthTls.h"
+#include "auth/AuthAthenz.h"
 #include <lib/LogUtils.h>
 
 #include <string>
@@ -32,7 +34,7 @@
 
 DECLARE_LOG_OBJECT()
 
-namespace pulsar {
+using namespace pulsar;
 
 AuthenticationDataProvider::AuthenticationDataProvider() {}
 
@@ -80,9 +82,9 @@ AuthenticationPtr AuthFactory::Disabled() {
     return AuthDisabled::create(params);
 }
 
-AuthenticationPtr AuthFactory::create(const std::string& dynamicLibPath) {
+AuthenticationPtr AuthFactory::create(const std::string& 
pluginNameOrDynamicLibPath) {
     ParamMap params;
-    return AuthFactory::create(dynamicLibPath, params);
+    return AuthFactory::create(pluginNameOrDynamicLibPath, params);
 }
 
 boost::mutex mutex;
@@ -98,7 +100,18 @@ void AuthFactory::release_handles() {
     loadedLibrariesHandles_.clear();
 }
 
-AuthenticationPtr AuthFactory::create(const std::string& dynamicLibPath,
+AuthenticationPtr tryCreateBuiltinAuth(const std::string& pluginName, 
ParamMap& paramMap) {
+    if (boost::iequals(pluginName, TLS_PLUGIN_NAME) || 
boost::iequals(pluginName, TLS_JAVA_PLUGIN_NAME)) {
+        return AuthTls::create(paramMap);
+    } else if (boost::iequals(pluginName, ATHENZ_PLUGIN_NAME) ||
+               boost::iequals(pluginName, ATHENZ_JAVA_PLUGIN_NAME)) {
+        return AuthAthenz::create(paramMap);
+    } else {
+        return AuthenticationPtr();
+    }
+}
+
+AuthenticationPtr AuthFactory::create(const std::string& 
pluginNameOrDynamicLibPath,
                                       const std::string& authParamsString) {
     {
         boost::lock_guard<boost::mutex> lock(mutex);
@@ -107,8 +120,27 @@ AuthenticationPtr AuthFactory::create(const std::string& 
dynamicLibPath,
             AuthFactory::isShutdownHookRegistered_ = true;
         }
     }
+
+    ParamMap paramMap;
+    if (!authParamsString.empty()) {
+        std::vector<std::string> params;
+        boost::algorithm::split(params, authParamsString, 
boost::is_any_of(","));
+        for (int i = 0; i < params.size(); i++) {
+            std::vector<std::string> kv;
+            boost::algorithm::split(kv, params[i], boost::is_any_of(":"));
+            if (kv.size() == 2) {
+                paramMap[kv[0]] = kv[1];
+            }
+        }
+    }
+
+    AuthenticationPtr authPtr = 
tryCreateBuiltinAuth(pluginNameOrDynamicLibPath, paramMap);
+    if (authPtr) {
+        return authPtr;
+    }
+
     Authentication* auth = NULL;
-    void* handle = dlopen(dynamicLibPath.c_str(), RTLD_LAZY);
+    void* handle = dlopen(pluginNameOrDynamicLibPath.c_str(), RTLD_LAZY);
     if (handle != NULL) {
         {
             boost::lock_guard<boost::mutex> lock(mutex);
@@ -119,25 +151,16 @@ AuthenticationPtr AuthFactory::create(const std::string& 
dynamicLibPath,
         if (createAuthentication != NULL) {
             auth = createAuthentication(authParamsString);
         } else {
-            ParamMap paramMap;
-            if (!authParamsString.empty()) {
-                std::vector<std::string> params;
-                boost::algorithm::split(params, authParamsString, 
boost::is_any_of(","));
-                for (int i = 0; i < params.size(); i++) {
-                    std::vector<std::string> kv;
-                    boost::algorithm::split(kv, params[i], 
boost::is_any_of(":"));
-                    if (kv.size() == 2) {
-                        paramMap[kv[0]] = kv[1];
-                    }
-                }
-            }
-            return AuthFactory::create(dynamicLibPath, paramMap);
+            return AuthFactory::create(pluginNameOrDynamicLibPath, paramMap);
         }
     }
+    if (!auth) {
+        LOG_WARN("Couldn't load auth plugin " << pluginNameOrDynamicLibPath);
+    }
     return AuthenticationPtr(auth);
 }
 
-AuthenticationPtr AuthFactory::create(const std::string& dynamicLibPath, 
ParamMap& params) {
+AuthenticationPtr AuthFactory::create(const std::string& 
pluginNameOrDynamicLibPath, ParamMap& params) {
     {
         boost::lock_guard<boost::mutex> lock(mutex);
         if (!AuthFactory::isShutdownHookRegistered_) {
@@ -145,8 +168,14 @@ AuthenticationPtr AuthFactory::create(const std::string& 
dynamicLibPath, ParamMa
             AuthFactory::isShutdownHookRegistered_ = true;
         }
     }
+
+    AuthenticationPtr authPtr = 
tryCreateBuiltinAuth(pluginNameOrDynamicLibPath, params);
+    if (authPtr) {
+        return authPtr;
+    }
+
     Authentication* auth = NULL;
-    void* handle = dlopen(dynamicLibPath.c_str(), RTLD_LAZY);
+    void* handle = dlopen(pluginNameOrDynamicLibPath.c_str(), RTLD_LAZY);
     if (handle != NULL) {
         boost::lock_guard<boost::mutex> lock(mutex);
         loadedLibrariesHandles_.push_back(handle);
@@ -156,6 +185,9 @@ AuthenticationPtr AuthFactory::create(const std::string& 
dynamicLibPath, ParamMa
             auth = createAuthentication(params);
         }
     }
+    if (!auth) {
+        LOG_WARN("Couldn't load auth plugin " << pluginNameOrDynamicLibPath);
+    }
+
     return AuthenticationPtr(auth);
 }
-}  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/auth/AuthAthenz.h 
b/pulsar-client-cpp/lib/auth/AuthAthenz.h
index 7add5fc..70bca7c 100644
--- a/pulsar-client-cpp/lib/auth/AuthAthenz.h
+++ b/pulsar-client-cpp/lib/auth/AuthAthenz.h
@@ -25,6 +25,9 @@
 
 namespace pulsar {
 
+const std::string ATHENZ_PLUGIN_NAME = "athenz";
+const std::string ATHENZ_JAVA_PLUGIN_NAME = 
"org.apache.pulsar.client.impl.auth.AuthenticationAthenz";
+
 class AuthDataAthenz : public AuthenticationDataProvider {
    public:
     AuthDataAthenz(ParamMap& params);
diff --git a/pulsar-client-cpp/lib/auth/AuthTls.cc 
b/pulsar-client-cpp/lib/auth/AuthTls.cc
index d449a39..f076aaf 100644
--- a/pulsar-client-cpp/lib/auth/AuthTls.cc
+++ b/pulsar-client-cpp/lib/auth/AuthTls.cc
@@ -36,6 +36,10 @@ AuthTls::AuthTls(AuthenticationDataPtr& authDataTls) { 
authDataTls_ = authDataTl
 
 AuthTls::~AuthTls() {}
 
+AuthenticationPtr AuthTls::create(ParamMap& params) {
+    return create(params["tlsCertFile"], params["tlsKeyFile"]);
+}
+
 AuthenticationPtr AuthTls::create(const std::string& certificatePath, const 
std::string& privateKeyPath) {
     AuthenticationDataPtr authDataTls =
         AuthenticationDataPtr(new AuthDataTls(certificatePath, 
privateKeyPath));
diff --git a/pulsar-client-cpp/lib/auth/AuthTls.h 
b/pulsar-client-cpp/lib/auth/AuthTls.h
index e9b711d..510aea0 100644
--- a/pulsar-client-cpp/lib/auth/AuthTls.h
+++ b/pulsar-client-cpp/lib/auth/AuthTls.h
@@ -24,6 +24,9 @@
 
 namespace pulsar {
 
+const std::string TLS_PLUGIN_NAME = "tls";
+const std::string TLS_JAVA_PLUGIN_NAME = 
"org.apache.pulsar.client.impl.auth.AuthenticationTls";
+
 class AuthDataTls : public AuthenticationDataProvider {
    public:
     AuthDataTls(const std::string& certificatePath, const std::string& 
privateKeyPath);
diff --git a/pulsar-client-cpp/python/pulsar_test.py 
b/pulsar-client-cpp/python/pulsar_test.py
index 200a107..25564b7 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -152,6 +152,66 @@ class PulsarTest(TestCase):
 
         client.close()
 
+    def test_tls_auth2(self):
+        certs_dir = 
'/pulsar/pulsar-broker/src/test/resources/authentication/tls/'
+        if not os.path.exists(certs_dir):
+            certs_dir = 
"../../pulsar-broker/src/test/resources/authentication/tls/"
+        authPlugin = "org.apache.pulsar.client.impl.auth.AuthenticationTls"
+        authParams = 
"tlsCertFile:%s/client-cert.pem,tlsKeyFile:%s/client-key.pem" % (certs_dir, 
certs_dir)
+
+        client = Client(self.serviceUrlTls,
+                        tls_trust_certs_file_path=certs_dir + 'cacert.pem',
+                        tls_allow_insecure_connection=False,
+                        authentication=Authentication(authPlugin, authParams))
+
+        consumer = 
client.subscribe('persistent://property/cluster/namespace/my-python-topic-producer-consumer',
+                                    'my-sub',
+                                    consumer_type=ConsumerType.Shared)
+        producer = 
client.create_producer('persistent://property/cluster/namespace/my-python-topic-producer-consumer')
+        producer.send('hello')
+
+        msg = consumer.receive(1000)
+        self.assertTrue(msg)
+        self.assertEqual(msg.data(), b'hello')
+
+        try:
+            msg = consumer.receive(100)
+            self.assertTrue(False)  # Should not reach this point
+        except:
+            pass  # Exception is expected
+
+        client.close()
+
+    def test_tls_auth3(self):
+        certs_dir = 
'/pulsar/pulsar-broker/src/test/resources/authentication/tls/'
+        if not os.path.exists(certs_dir):
+            certs_dir = 
"../../pulsar-broker/src/test/resources/authentication/tls/"
+        authPlugin = "tls"
+        authParams = 
"tlsCertFile:%s/client-cert.pem,tlsKeyFile:%s/client-key.pem" % (certs_dir, 
certs_dir)
+
+        client = Client(self.serviceUrlTls,
+                        tls_trust_certs_file_path=certs_dir + 'cacert.pem',
+                        tls_allow_insecure_connection=False,
+                        authentication=Authentication(authPlugin, authParams))
+
+        consumer = 
client.subscribe('persistent://property/cluster/namespace/my-python-topic-producer-consumer',
+                                    'my-sub',
+                                    consumer_type=ConsumerType.Shared)
+        producer = 
client.create_producer('persistent://property/cluster/namespace/my-python-topic-producer-consumer')
+        producer.send('hello')
+
+        msg = consumer.receive(1000)
+        self.assertTrue(msg)
+        self.assertEqual(msg.data(), b'hello')
+
+        try:
+            msg = consumer.receive(100)
+            self.assertTrue(False)  # Should not reach this point
+        except:
+            pass  # Exception is expected
+
+        client.close()
+
     def test_auth_junk_params(self):
         certs_dir = 
'/pulsar/pulsar-broker/src/test/resources/authentication/tls/'
         if not os.path.exists(certs_dir):

Reply via email to