This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 019ae66  PIP-25: C++ / Python / Go implementation for token 
authentication (#3067)
019ae66 is described below

commit 019ae66658201faca134b611ac8cfc3dcdf63d43
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Nov 28 20:42:43 2018 -0800

    PIP-25: C++ / Python / Go implementation for token authentication (#3067)
    
    * PIP-25: C++ implementation for token authentication
    
    * PIP-25: C and Go implementation for token authentication
    
    * PIP-25: Python implementation
    
    * Addressed comments
    
    * Added missing <sstream> include
    
    * Fixed argument name that was changed earlier
    
    * Fixed secret key path
---
 pulsar-client-cpp/include/pulsar/Authentication.h  |  38 ++++
 .../include/pulsar/c/authentication.h              |   6 +
 pulsar-client-cpp/lib/Authentication.cc            |   7 +
 pulsar-client-cpp/lib/auth/AuthToken.cc            | 117 ++++++++++++
 .../c/authentication.h => lib/auth/AuthToken.h}    |  36 ++--
 pulsar-client-cpp/lib/c/c_Authentication.cc        |  22 +++
 pulsar-client-cpp/pulsar-test-service-start.sh     |  13 ++
 pulsar-client-cpp/python/pulsar/__init__.py        |  18 ++
 pulsar-client-cpp/python/pulsar_test.py            |  41 +++-
 pulsar-client-cpp/python/src/authentication.cc     |  57 ++++++
 pulsar-client-cpp/test-conf/standalone-ssl.conf    |   4 +-
 pulsar-client-cpp/tests/AuthTokenTest.cc           | 206 +++++++++++++++++++++
 pulsar-client-go/pulsar/c_client.go                |  31 ++++
 pulsar-client-go/pulsar/c_go_pulsar.h              |   6 +
 pulsar-client-go/pulsar/client.go                  |  10 +
 pulsar-client-go/pulsar/client_test.go             |  71 +++++++
 16 files changed, 664 insertions(+), 19 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/Authentication.h 
b/pulsar-client-cpp/include/pulsar/Authentication.h
index 2ea1238..659f593 100644
--- a/pulsar-client-cpp/include/pulsar/Authentication.h
+++ b/pulsar-client-cpp/include/pulsar/Authentication.h
@@ -25,6 +25,7 @@
 #include <boost/shared_ptr.hpp>
 #include <pulsar/Result.h>
 #include <boost/make_shared.hpp>
+#include <boost/function.hpp>
 
 #pragma GCC visibility push(default)
 
@@ -114,6 +115,43 @@ class AuthTls : public Authentication {
     AuthenticationDataPtr authDataTls_;
 };
 
+/**
+ * Callable that returns the token to present to the broker. It is invoked each time the
+ * authentication data is queried, so it may return a different token on every call.
+ */
+typedef boost::function<std::string()> TokenSupplier;
+
+/**
+ * Token based implementation of Pulsar client authentication
+ */
+class AuthToken : public Authentication {
+   public:
+    /**
+     * Wrap an already built authentication data provider.
+     */
+    AuthToken(AuthenticationDataPtr&);
+    ~AuthToken();
+
+    /**
+     * Create an authentication provider from a parameter map.
+     *
+     * Recognized keys, checked in this order: "token" (the token itself), "file" (path of a
+     * file containing the token) and "env" (name of an environment variable containing the
+     * token). Throws when none of them is present.
+     */
+    static AuthenticationPtr create(ParamMap& params);
+
+    /**
+     * Create an authentication provider from a parameter string of the form
+     * "token:<value>", "file://<path>" or "env:<name>". A string without any of these
+     * prefixes is used as the token itself.
+     */
+    static AuthenticationPtr create(const std::string& authParamsString);
+
+    /**
+     * Create an authentication provider for token based authentication.
+     *
+     * @param token
+     *            a string containing the auth token
+     */
+    static AuthenticationPtr createWithToken(const std::string& token);
+
+    /**
+     * Create an authentication provider for token based authentication.
+     *
+     * @param tokenSupplier
+     *            a supplier of the client auth token
+     */
+    static AuthenticationPtr create(const TokenSupplier& tokenSupplier);
+
+    /** @return the auth method name, "token" */
+    const std::string getAuthMethodName() const;
+    /** Hands out the shared authentication data object; always returns ResultOk. */
+    Result getAuthData(AuthenticationDataPtr& authDataToken) const;
+
+   private:
+    AuthenticationDataPtr authDataToken_;
+};
+
 /**
  * Athenz implementation of Pulsar client authentication
  */
diff --git a/pulsar-client-cpp/include/pulsar/c/authentication.h 
b/pulsar-client-cpp/include/pulsar/c/authentication.h
index c3a4d32..b060bbc 100644
--- a/pulsar-client-cpp/include/pulsar/c/authentication.h
+++ b/pulsar-client-cpp/include/pulsar/c/authentication.h
@@ -27,12 +27,18 @@ extern "C" {
 
 typedef struct _pulsar_authentication pulsar_authentication_t;
 
+/**
+ * Callback that supplies the auth token. It receives the opaque `ctx` pointer that was
+ * passed to pulsar_authentication_token_create_with_supplier() and returns a
+ * NUL-terminated string. The library releases the returned buffer with free(), so it
+ * must have been allocated in a way that makes free() valid (e.g. malloc()/strdup()).
+ */
+typedef char *(*token_supplier)(void *);
+
 pulsar_authentication_t *pulsar_authentication_create(const char 
*dynamicLibPath,
                                                       const char 
*authParamsString);
 
 pulsar_authentication_t *pulsar_authentication_tls_create(const char 
*certificatePath,
                                                           const char 
*privateKeyPath);
 
+/* Create a token authentication provider from a fixed, NUL-terminated token string. */
+pulsar_authentication_t *pulsar_authentication_token_create(const char *token);
+/*
+ * Create a token authentication provider that obtains the token by calling
+ * `tokenSupplier(ctx)` whenever credentials are needed.
+ */
+pulsar_authentication_t 
*pulsar_authentication_token_create_with_supplier(token_supplier tokenSupplier,
+                                                                          void 
*ctx);
+
 pulsar_authentication_t *pulsar_authentication_athenz_create(const char 
*authParamsString);
 
 void pulsar_authentication_free(pulsar_authentication_t *authentication);
diff --git a/pulsar-client-cpp/lib/Authentication.cc 
b/pulsar-client-cpp/lib/Authentication.cc
index b3ebf1c..4d4c5d8 100644
--- a/pulsar-client-cpp/lib/Authentication.cc
+++ b/pulsar-client-cpp/lib/Authentication.cc
@@ -21,6 +21,7 @@
 #include <pulsar/Authentication.h>
 #include "auth/AuthTls.h"
 #include "auth/AuthAthenz.h"
+#include "auth/AuthToken.h"
 #include <lib/LogUtils.h>
 
 #include <string>
@@ -119,6 +120,9 @@ void AuthFactory::release_handles() {
 AuthenticationPtr tryCreateBuiltinAuth(const std::string& pluginName, 
ParamMap& paramMap) {
     if (boost::iequals(pluginName, TLS_PLUGIN_NAME) || 
boost::iequals(pluginName, TLS_JAVA_PLUGIN_NAME)) {
         return AuthTls::create(paramMap);
+    } else if (boost::iequals(pluginName, TOKEN_PLUGIN_NAME) ||
+               boost::iequals(pluginName, TOKEN_JAVA_PLUGIN_NAME)) {
+        return AuthToken::create(paramMap);
     } else if (boost::iequals(pluginName, ATHENZ_PLUGIN_NAME) ||
                boost::iequals(pluginName, ATHENZ_JAVA_PLUGIN_NAME)) {
         return AuthAthenz::create(paramMap);
@@ -130,6 +134,9 @@ AuthenticationPtr tryCreateBuiltinAuth(const std::string& 
pluginName, ParamMap&
 AuthenticationPtr tryCreateBuiltinAuth(const std::string& pluginName, const 
std::string& authParamsString) {
     if (boost::iequals(pluginName, TLS_PLUGIN_NAME) || 
boost::iequals(pluginName, TLS_JAVA_PLUGIN_NAME)) {
         return AuthTls::create(authParamsString);
+    } else if (boost::iequals(pluginName, TOKEN_PLUGIN_NAME) ||
+               boost::iequals(pluginName, TOKEN_JAVA_PLUGIN_NAME)) {
+        return AuthToken::create(authParamsString);
     } else if (boost::iequals(pluginName, ATHENZ_PLUGIN_NAME) ||
                boost::iequals(pluginName, ATHENZ_JAVA_PLUGIN_NAME)) {
         return AuthAthenz::create(authParamsString);
diff --git a/pulsar-client-cpp/lib/auth/AuthToken.cc 
b/pulsar-client-cpp/lib/auth/AuthToken.cc
new file mode 100644
index 0000000..bebfe4e
--- /dev/null
+++ b/pulsar-client-cpp/lib/auth/AuthToken.cc
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "AuthToken.h"
+
+#include <boost/algorithm/string/predicate.hpp>
+#include <boost/bind.hpp>
+
+#include <cstdlib>
+#include <fstream>
+#include <sstream>
+
+namespace pulsar {
+
+// AuthDataToken
+
+// Keeps a copy of the supplier; it is called every time credentials are requested.
+AuthDataToken::AuthDataToken(const TokenSupplier &tokenSupplier) : tokenSupplier_(tokenSupplier) {}
+
+AuthDataToken::~AuthDataToken() {}
+
+// Token credentials can always be attached to HTTP requests.
+bool AuthDataToken::hasDataForHttp() { return true; }
+
+// Builds the HTTP header line that carries the current token as a bearer credential.
+std::string AuthDataToken::getHttpHeaders() {
+    const std::string currentToken = tokenSupplier_();
+    return "Authorization: Bearer " + currentToken;
+}
+
+// Token credentials can always be sent in the binary protocol command.
+bool AuthDataToken::hasDataFromCommand() { return true; }
+
+// The command payload is the bare token, freshly obtained from the supplier.
+std::string AuthDataToken::getCommandData() {
+    std::string currentToken = tokenSupplier_();
+    return currentToken;
+}
+
+// Supplier body used when the token is given literally: returns the bound string unchanged.
+static std::string readDirect(const std::string &token) {
+    std::string copy(token);
+    return copy;
+}
+
+/**
+ * Read the token stored in the file `tokenFilePath`.
+ *
+ * The whole file is read and trailing whitespace (space, tab, CR, LF) is removed: token
+ * files usually end with a newline, and the value is pasted verbatim into the
+ * "Authorization: Bearer ..." HTTP header, where a line break would corrupt the request.
+ *
+ * @return the token, or an empty string when the file cannot be read
+ */
+static std::string readFromFile(const std::string &tokenFilePath) {
+    std::ifstream input(tokenFilePath.c_str());
+    std::stringstream buffer;
+    buffer << input.rdbuf();
+
+    std::string token = buffer.str();
+    const std::string::size_type lastTokenChar = token.find_last_not_of(" \t\r\n");
+    // npos + 1 wraps around to 0, so an empty or all-whitespace file collapses to "".
+    token.erase(lastTokenChar + 1);
+    return token;
+}
+
+/**
+ * Read the token from the environment variable named `envVarName`.
+ *
+ * @throws std::string with a description of the problem when the variable is not set
+ */
+static std::string readFromEnv(const std::string &envVarName) {
+    // std::getenv comes from <cstdlib>; the returned pointer is not owned by the caller.
+    const char *value = std::getenv(envVarName.c_str());
+    if (!value) {
+        throw "Failed to read environment variable " + envVarName;
+    }
+    return std::string(value);
+}
+
+// AuthToken
+
+// Stores the shared authentication data that getAuthData() later hands out.
+AuthToken::AuthToken(AuthenticationDataPtr &authDataToken) : authDataToken_(authDataToken) {}
+
+AuthToken::~AuthToken() {}
+
+/**
+ * Build a token authentication provider from a parameter map.
+ *
+ * The first key found among "token", "file" and "env" (in that order) selects where the
+ * token comes from; the matching value is bound into the supplier.
+ *
+ * @throws const char* message when none of the keys is present
+ */
+AuthenticationPtr AuthToken::create(ParamMap &params) {
+    const bool hasToken = params.find("token") != params.end();
+    if (hasToken) {
+        return create(boost::bind(&readDirect, params["token"]));
+    }
+
+    const bool hasFile = params.find("file") != params.end();
+    if (hasFile) {
+        // Read token from a file
+        return create(boost::bind(&readFromFile, params["file"]));
+    }
+
+    const bool hasEnv = params.find("env") != params.end();
+    if (hasEnv) {
+        // Read token from environment variable
+        return create(boost::bind(&readFromEnv, params["env"]));
+    }
+
+    throw "Invalid configuration for token provider";
+}
+
+/**
+ * Build a token authentication provider from a parameter string.
+ *
+ * Accepted forms:
+ *   "token:<value>"   the token itself
+ *   "file://<path>"   or "file:<path>": path of a file containing the token
+ *   "env:<name>"      name of an environment variable containing the token
+ * Any other string is taken to be the token itself.
+ */
+AuthenticationPtr AuthToken::create(const std::string &authParamsString) {
+    const std::string tokenPrefix = "token:";
+    const std::string fileUrlPrefix = "file://";
+    const std::string filePrefix = "file:";
+    const std::string envPrefix = "env:";
+
+    ParamMap params;
+    if (boost::starts_with(authParamsString, tokenPrefix)) {
+        params["token"] = authParamsString.substr(tokenPrefix.size());
+    } else if (boost::starts_with(authParamsString, fileUrlPrefix)) {
+        // Read token from a file, e.g. "file:///abs/path" -> "/abs/path"
+        params["file"] = authParamsString.substr(fileUrlPrefix.size());
+    } else if (boost::starts_with(authParamsString, filePrefix)) {
+        // Short form "file:/abs/path": strip exactly the prefix that was matched, so the
+        // path is neither truncated nor indexed past the end of the string.
+        params["file"] = authParamsString.substr(filePrefix.size());
+    } else if (boost::starts_with(authParamsString, envPrefix)) {
+        params["env"] = authParamsString.substr(envPrefix.size());
+    } else {
+        params["token"] = authParamsString;
+    }
+
+    return create(params);
+}
+
+// Creates a provider whose supplier always returns a copy of the given token.
+AuthenticationPtr AuthToken::createWithToken(const std::string &token) {
+    TokenSupplier fixedTokenSupplier = boost::bind(&readDirect, token);
+    return create(fixedTokenSupplier);
+}
+
+// Wraps the supplier in an AuthDataToken and that, in turn, in an AuthToken provider.
+AuthenticationPtr AuthToken::create(const TokenSupplier &tokenSupplier) {
+    AuthenticationDataPtr authData(new AuthDataToken(tokenSupplier));
+    AuthenticationPtr provider(new AuthToken(authData));
+    return provider;
+}
+
+// Name under which this authentication method is announced.
+const std::string AuthToken::getAuthMethodName() const {
+    return std::string("token");
+}
+
+// Hands out the shared authentication data object; this cannot fail.
+Result AuthToken::getAuthData(AuthenticationDataPtr &authDataContent) const {
+    const Result outcome = ResultOk;
+    authDataContent = authDataToken_;
+    return outcome;
+}
+
+}  // namespace pulsar
diff --git a/pulsar-client-cpp/include/pulsar/c/authentication.h 
b/pulsar-client-cpp/lib/auth/AuthToken.h
similarity index 54%
copy from pulsar-client-cpp/include/pulsar/c/authentication.h
copy to pulsar-client-cpp/lib/auth/AuthToken.h
index c3a4d32..8473fe3 100644
--- a/pulsar-client-cpp/include/pulsar/c/authentication.h
+++ b/pulsar-client-cpp/lib/auth/AuthToken.h
@@ -19,26 +19,28 @@
 
 #pragma once
 
-#ifdef __cplusplus
-extern "C" {
-#endif
+#include <pulsar/Authentication.h>
+#include <string>
+#include <boost/function.hpp>
 
-#pragma GCC visibility push(default)
+namespace pulsar {
 
-typedef struct _pulsar_authentication pulsar_authentication_t;
+const std::string TOKEN_PLUGIN_NAME = "token";
+const std::string TOKEN_JAVA_PLUGIN_NAME = 
"org.apache.pulsar.client.impl.auth.AuthenticationToken";
 
-pulsar_authentication_t *pulsar_authentication_create(const char 
*dynamicLibPath,
-                                                      const char 
*authParamsString);
+class AuthDataToken : public AuthenticationDataProvider {
+   public:
+    AuthDataToken(const std::string& token);
+    AuthDataToken(const TokenSupplier& tokenSupplier);
+    ~AuthDataToken();
 
-pulsar_authentication_t *pulsar_authentication_tls_create(const char 
*certificatePath,
-                                                          const char 
*privateKeyPath);
+    bool hasDataForHttp();
+    std::string getHttpHeaders();
+    bool hasDataFromCommand();
+    std::string getCommandData();
 
-pulsar_authentication_t *pulsar_authentication_athenz_create(const char 
*authParamsString);
+   private:
+    TokenSupplier tokenSupplier_;
+};
 
-void pulsar_authentication_free(pulsar_authentication_t *authentication);
-
-#pragma GCC visibility pop
-
-#ifdef __cplusplus
-}
-#endif
\ No newline at end of file
+}  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/c/c_Authentication.cc 
b/pulsar-client-cpp/lib/c/c_Authentication.cc
index c03f239..83d344b 100644
--- a/pulsar-client-cpp/lib/c/c_Authentication.cc
+++ b/pulsar-client-cpp/lib/c/c_Authentication.cc
@@ -23,6 +23,8 @@
 
 #include "c_structs.h"
 
+#include <cstdlib>
+
 pulsar_authentication_t *pulsar_authentication_create(const char 
*dynamicLibPath,
                                                       const char 
*authParamsString) {
     pulsar_authentication_t *authentication = new pulsar_authentication_t;
@@ -43,4 +45,24 @@ pulsar_authentication_t 
*pulsar_authentication_athenz_create(const char *authPar
     pulsar_authentication_t *authentication = new pulsar_authentication_t;
     authentication->auth = pulsar::AuthAthenz::create(authParamsString);
     return authentication;
+}
+
+// Creates a token authentication handle from a fixed token. `token` must be a non-NULL,
+// NUL-terminated string; it is copied, so the caller keeps ownership of the buffer.
+pulsar_authentication_t *pulsar_authentication_token_create(const char *token) {
+    pulsar_authentication_t *handle = new pulsar_authentication_t;
+    handle->auth = pulsar::AuthToken::createWithToken(token);
+    return handle;
+}
+
+/**
+ * Adapts the C token callback to the std::string based supplier used by AuthToken.
+ *
+ * Calls `supplier(ctx)`, copies the returned C string and releases the C buffer with
+ * free(). A NULL result is reported as an empty token instead of being dereferenced.
+ */
+static std::string tokenSupplierWrapper(token_supplier supplier, void *ctx) {
+    char *token = supplier(ctx);
+    if (token == NULL) {
+        return std::string();
+    }
+
+    std::string tokenStr(token);
+    free(token);
+    return tokenStr;
+}
+
+// Creates a token authentication handle that asks `tokenSupplier(ctx)` for the token
+// whenever credentials are needed.
+pulsar_authentication_t *pulsar_authentication_token_create_with_supplier(token_supplier tokenSupplier,
+                                                                          void *ctx) {
+    pulsar_authentication_t *handle = new pulsar_authentication_t;
+    // Bind the C callback together with its context into a nullary supplier.
+    handle->auth = pulsar::AuthToken::create(boost::bind(&tokenSupplierWrapper, tokenSupplier, ctx));
+    return handle;
 }
\ No newline at end of file
diff --git a/pulsar-client-cpp/pulsar-test-service-start.sh 
b/pulsar-client-cpp/pulsar-test-service-start.sh
index 914354a..665758e 100755
--- a/pulsar-client-cpp/pulsar-test-service-start.sh
+++ b/pulsar-client-cpp/pulsar-test-service-start.sh
@@ -44,6 +44,15 @@ mkdir -p $DATA_DIR
 mkdir -p $DATA_DIR/certs
 cp $SRC_DIR/pulsar-broker/src/test/resources/authentication/tls/*.pem 
$DATA_DIR/certs
 
+# Generate secret key and token
+mkdir -p $DATA_DIR/tokens
+$PULSAR_DIR/bin/pulsar tokens create-secret-key --output 
$DATA_DIR/tokens/secret.key
+
+$PULSAR_DIR/bin/pulsar tokens create \
+            --subject token-principal \
+            --secret-key file:///$DATA_DIR/tokens/secret.key \
+            > $DATA_DIR/tokens/token.txt
+
 export 
PULSAR_STANDALONE_CONF=$SRC_DIR/pulsar-client-cpp/test-conf/standalone-ssl.conf
 $PULSAR_DIR/bin/pulsar-daemon start standalone \
         --no-functions-worker --no-stream-storage \
@@ -95,4 +104,8 @@ $PULSAR_DIR/bin/pulsar-admin tenants create private -r "" -c 
"standalone"
 # Create "private/auth" with required authentication
 $PULSAR_DIR/bin/pulsar-admin namespaces create private/auth --clusters 
standalone
 
+$PULSAR_DIR/bin/pulsar-admin namespaces grant-permission private/auth \
+                        --actions produce,consume \
+                        --role "token-principal"
+
 echo "-- Ready to start tests"
diff --git a/pulsar-client-cpp/python/pulsar/__init__.py 
b/pulsar-client-cpp/python/pulsar/__init__.py
index 54fee68..8004fa6 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -211,6 +211,24 @@ class AuthenticationTLS(Authentication):
         _check_type(str, private_key_path, 'private_key_path')
         self.auth = _pulsar.AuthenticationTLS(certificate_path, 
private_key_path)
 
+
+class AuthenticationToken(Authentication):
+    """
+    Token based authentication implementation
+    """
+    def __init__(self, token):
+        """
+        Create the token authentication provider instance.
+
+        **Args**
+
+        * `token`: A string containing the token or a function that provides a
+                   string with the token
+        """
+        token_is_supported = isinstance(token, str) or callable(token)
+        if not token_is_supported:
+            raise ValueError("Argument token is expected to be of type 'str' or a function returning 'str'")
+        self.auth = _pulsar.AuthenticationToken(token)
+
+
 class AuthenticationAthenz(Authentication):
     """
     Athenz Authentication implementation
diff --git a/pulsar-client-cpp/python/pulsar_test.py 
b/pulsar-client-cpp/python/pulsar_test.py
index e9ecbc1..158c79c 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -24,7 +24,7 @@ import time
 import os
 from pulsar import Client, MessageId, \
             CompressionType, ConsumerType, PartitionsRoutingMode, \
-            AuthenticationTLS, Authentication
+            AuthenticationTLS, Authentication, AuthenticationToken
 
 from _pulsar import ProducerConfiguration, ConsumerConfiguration
 
@@ -793,6 +793,45 @@ class PulsarTest(TestCase):
                          [topic_non_partitioned])
         client.close()
 
+    def test_token_auth(self):
+        """Produce and consume on private/auth with the token passed as a plain string."""
+        token_file = '/tmp/pulsar-test-data/tokens/token.txt'
+        with open(token_file) as tf:
+            token = tf.read().strip()
+
+        # Use adminUrl to test both HTTP request and binary protocol
+        topic = 'persistent://private/auth/my-python-topic-token-auth'
+        client = Client(self.adminUrl,
+                        authentication=AuthenticationToken(token))
+
+        consumer = client.subscribe(topic,
+                                    'my-sub',
+                                    consumer_type=ConsumerType.Shared)
+        producer = client.create_producer(topic)
+        producer.send(b'hello')
+
+        msg = consumer.receive(1000)
+        self.assertTrue(msg)
+        self.assertEqual(msg.data(), b'hello')
+        client.close()
+
+    def test_token_auth_supplier(self):
+        """Produce and consume on private/auth with the token provided by a callable."""
+        def read_token():
+            with open('/tmp/pulsar-test-data/tokens/token.txt') as tf:
+                return tf.read().strip()
+
+        topic = 'persistent://private/auth/my-python-topic-token-auth'
+        client = Client(self.serviceUrl,
+                        authentication=AuthenticationToken(read_token))
+        consumer = client.subscribe(topic,
+                                    'my-sub',
+                                    consumer_type=ConsumerType.Shared)
+        producer = client.create_producer(topic)
+        producer.send(b'hello')
+
+        msg = consumer.receive(1000)
+        self.assertTrue(msg)
+        self.assertEqual(msg.data(), b'hello')
+        client.close()
+
+    #####
+
     def _check_value_error(self, fun):
         try:
             fun()
diff --git a/pulsar-client-cpp/python/src/authentication.cc 
b/pulsar-client-cpp/python/src/authentication.cc
index 9cfb2e7..9547c99 100644
--- a/pulsar-client-cpp/python/src/authentication.cc
+++ b/pulsar-client-cpp/python/src/authentication.cc
@@ -32,6 +32,59 @@ struct AuthenticationTlsWrapper : public 
AuthenticationWrapper {
     }
 };
 
+/**
+ * Callable adapter that lets a Python callable act as a C++ token supplier.
+ *
+ * The wrapper holds one strong reference to the Python object for as long as it lives;
+ * copies take their own reference. Calling the wrapper acquires the GIL, invokes the
+ * Python callable and converts its result to std::string.
+ */
+struct TokenSupplierWrapper {
+    PyObject* _pySupplier;
+
+    TokenSupplierWrapper(py::object pySupplier) :
+        _pySupplier(pySupplier.ptr()) {
+        Py_XINCREF(_pySupplier);
+    }
+
+    TokenSupplierWrapper(const TokenSupplierWrapper& other) :
+        _pySupplier(other._pySupplier) {
+        Py_XINCREF(_pySupplier);
+    }
+
+    TokenSupplierWrapper& operator=(const TokenSupplierWrapper& other) {
+        // Take the new reference before dropping the old one, so that self-assignment
+        // (or two wrappers sharing the same object) can never free it prematurely, and
+        // the previously held object is released instead of leaked.
+        PyObject* previous = _pySupplier;
+        _pySupplier = other._pySupplier;
+        Py_XINCREF(_pySupplier);
+        Py_XDECREF(previous);
+        return *this;
+    }
+
+    virtual ~TokenSupplierWrapper() {
+        Py_XDECREF(_pySupplier);
+    }
+
+    /**
+     * Invoke the Python callable and return its result.
+     *
+     * If the call raises, the Python error is printed and an empty token is returned.
+     */
+    std::string operator()() {
+        PyGILState_STATE state = PyGILState_Ensure();
+
+        std::string token;
+        try {
+            token = py::call<std::string>(_pySupplier);
+        } catch (const py::error_already_set&) {
+            PyErr_Print();
+        }
+
+        PyGILState_Release(state);
+        return token;
+    }
+};
+
+
+// Python-facing wrapper: accepts either a string token or a callable that returns one.
+struct AuthenticationTokenWrapper : public AuthenticationWrapper {
+    AuthenticationTokenWrapper(py::object token) :
+            AuthenticationWrapper() {
+        py::extract<std::string> asString(token);
+        if (!asString.check()) {
+            // Not convertible to a string: treat it as a function object
+            this->auth = AuthToken::create(TokenSupplierWrapper(token));
+            return;
+        }
+
+        // It's a string
+        std::string tokenStr = asString();
+        this->auth = AuthToken::createWithToken(tokenStr);
+    }
+};
+
 struct AuthenticationAthenzWrapper : public AuthenticationWrapper {
     AuthenticationAthenzWrapper(const std::string& authParamsString) :
             AuthenticationWrapper() {
@@ -49,6 +102,10 @@ void export_authentication() {
                                                                     init<const 
std::string&, const std::string&>())
             ;
 
+    class_<AuthenticationTokenWrapper, bases<AuthenticationWrapper> 
>("AuthenticationToken",
+                                                                    
init<py::object>())
+            ;
+
     class_<AuthenticationAthenzWrapper, bases<AuthenticationWrapper> 
>("AuthenticationAthenz",
                                                                        
init<const std::string&>())
             ;
diff --git a/pulsar-client-cpp/test-conf/standalone-ssl.conf 
b/pulsar-client-cpp/test-conf/standalone-ssl.conf
index da9123c..c7e6d3c 100644
--- a/pulsar-client-cpp/test-conf/standalone-ssl.conf
+++ b/pulsar-client-cpp/test-conf/standalone-ssl.conf
@@ -98,11 +98,13 @@ anonymousUserRole=anonymous
 authenticationEnabled=true
 
 # Autentication provider name list, which is comma separated list of class 
names
-authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderTls
+authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderTls,org.apache.pulsar.broker.authentication.AuthenticationProviderToken
 
 # Enforce authorization
 authorizationEnabled=true
 
+tokenSecretKey=file:///tmp/pulsar-test-data/tokens/secret.key
+
 # Role names that are treated as "super-user", meaning they will be able to do 
all admin
 # operations and publish/consume from all topics
 superUserRoles=localhost,superUser
diff --git a/pulsar-client-cpp/tests/AuthTokenTest.cc 
b/pulsar-client-cpp/tests/AuthTokenTest.cc
new file mode 100644
index 0000000..a77492c
--- /dev/null
+++ b/pulsar-client-cpp/tests/AuthTokenTest.cc
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/Authentication.h>
+
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+#include <boost/lexical_cast.hpp>
+#include <boost/asio.hpp>
+#include <boost/algorithm/string.hpp>
+#include <boost/thread.hpp>
+#include <lib/LogUtils.h>
+
+#include <string>
+#include <fstream>
+#include <streambuf>
+
+#include "lib/Future.h"
+#include "lib/Utils.h"
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+
+// Broker endpoints of the local test service: binary protocol and HTTP.
+static const std::string serviceUrl = "pulsar://localhost:6650";
+static const std::string serviceUrlHttp = "http://localhost:8080";
+
+// File holding the token the tests authenticate with.
+static const std::string tokenPath = "/tmp/pulsar-test-data/tokens/token.txt";
+
+// Returns the full content of the token file (empty if the file cannot be opened).
+static std::string getToken() {
+    std::ifstream tokenFile(tokenPath);
+    std::istreambuf_iterator<char> first(tokenFile);
+    std::istreambuf_iterator<char> last;
+    return std::string(first, last);
+}
+
+// End-to-end check of token authentication over the binary protocol URL (serviceUrl):
+// first verifies what the provider exposes, then produces and consumes numOfMessages
+// messages on a topic of the private/auth namespace.
+TEST(AuthPluginToken, testToken) {
+    ClientConfiguration config = ClientConfiguration();
+    std::string token = getToken();
+    AuthenticationPtr auth = pulsar::AuthToken::createWithToken(token);
+
+    ASSERT_TRUE(auth != NULL);
+    ASSERT_EQ(auth->getAuthMethodName(), "token");
+
+    // The provider must hand back the very same token for the binary protocol and
+    // advertise HTTP support, but no TLS data.
+    pulsar::AuthenticationDataPtr data;
+    ASSERT_EQ(auth->getAuthData(data), pulsar::ResultOk);
+    ASSERT_EQ(data->hasDataFromCommand(), true);
+    ASSERT_EQ(data->getCommandData(), token);
+    ASSERT_EQ(data->hasDataForTls(), false);
+    ASSERT_EQ(data->hasDataForHttp(), true);
+    ASSERT_EQ(auth.use_count(), 1);
+
+    config.setAuth(auth);
+    Client client(serviceUrl, config);
+
+    std::string topicName = "persistent://private/auth/test-token";
+    std::string subName = "subscription-name";
+    int numOfMessages = 10;
+
+    Producer producer;
+    Result result = client.createProducer(topicName, producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    result = client.subscribe(topicName, subName, consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    // handling dangling subscriptions
+    // NOTE(review): the result of this second subscribe is not checked.
+    consumer.unsubscribe();
+    client.subscribe(topicName, subName, consumer);
+
+    std::string temp = producer.getTopic();
+    ASSERT_EQ(temp, topicName);
+    temp = consumer.getTopic();
+    ASSERT_EQ(temp, topicName);
+    ASSERT_EQ(consumer.getSubscriptionName(), subName);
+
+    // Send Asynchronously
+    std::string prefix = "test-token-message-";
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + 
boost::lexical_cast<std::string>(i);
+        Message msg = MessageBuilder()
+                          .setContent(messageContent)
+                          .setProperty("msgIndex", 
boost::lexical_cast<std::string>(i))
+                          .build();
+        producer.sendAsync(msg, NULL);
+        LOG_INFO("sending message " << messageContent);
+    }
+
+    producer.flush();
+
+    // Messages are expected back in send order, each carrying its index as a property.
+    Message receivedMsg;
+    for (int i = 0; i < numOfMessages; i++) {
+        Result res = consumer.receive(receivedMsg);
+        ASSERT_EQ(ResultOk, res);
+
+        std::string expectedMessageContent = prefix + 
boost::lexical_cast<std::string>(i);
+        LOG_INFO("Received Message with [ content - "
+                 << receivedMsg.getDataAsString() << "] [ messageID = " << 
receivedMsg.getMessageId() << "]");
+        ASSERT_EQ(receivedMsg.getProperty("msgIndex"), 
boost::lexical_cast<std::string>(i));
+        ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
+        ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
+    }
+}
+
+// Same produce/consume round trip as testToken, but the client is created with the HTTP
+// service URL (serviceUrlHttp).
+TEST(AuthPluginToken, testTokenWithHttpUrl) {
+    ClientConfiguration config = ClientConfiguration();
+    std::string token = getToken();
+    config.setAuth(pulsar::AuthToken::createWithToken(token));
+    Client client(serviceUrlHttp, config);
+
+    std::string topicName = "persistent://private/auth/test-token-http";
+    std::string subName = "subscription-name";
+    int numOfMessages = 10;
+
+    Producer producer;
+    Result result = client.createProducer(topicName, producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    result = client.subscribe(topicName, subName, consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    // handling dangling subscriptions
+    // NOTE(review): the result of this second subscribe is not checked.
+    consumer.unsubscribe();
+    client.subscribe(topicName, subName, consumer);
+
+    std::string temp = producer.getTopic();
+    ASSERT_EQ(temp, topicName);
+    temp = consumer.getTopic();
+    ASSERT_EQ(temp, topicName);
+    ASSERT_EQ(consumer.getSubscriptionName(), subName);
+
+    // Send Asynchronously
+    std::string prefix = "test-token-message-";
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + 
boost::lexical_cast<std::string>(i);
+        Message msg = MessageBuilder()
+                          .setContent(messageContent)
+                          .setProperty("msgIndex", 
boost::lexical_cast<std::string>(i))
+                          .build();
+        producer.sendAsync(msg, NULL);
+        LOG_INFO("sending message " << messageContent);
+    }
+
+    producer.flush();
+
+    // Messages are expected back in send order, each carrying its index as a property.
+    Message receivedMsg;
+    for (int i = 0; i < numOfMessages; i++) {
+        Result res = consumer.receive(receivedMsg);
+        ASSERT_EQ(ResultOk, res);
+
+        std::string expectedMessageContent = prefix + 
boost::lexical_cast<std::string>(i);
+        LOG_INFO("Received Message with [ content - "
+                 << receivedMsg.getDataAsString() << "] [ messageID = " << 
receivedMsg.getMessageId() << "]");
+        ASSERT_EQ(receivedMsg.getProperty("msgIndex"), 
boost::lexical_cast<std::string>(i));
+        ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
+        ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
+    }
+}
+
+// Without any authentication configured, both producer creation and subscription on
+// the private/auth namespace must fail with ResultConnectError (binary protocol URL).
+TEST(AuthPluginToken, testNoAuth) {
+    ClientConfiguration config;
+    Client client(serviceUrl, config);
+
+    const std::string topicName = "persistent://private/auth/test-token";
+    const std::string subName = "subscription-name";
+
+    Producer producer;
+    const Result producerResult = client.createProducer(topicName, producer);
+    ASSERT_EQ(ResultConnectError, producerResult);
+
+    Consumer consumer;
+    const Result consumerResult = client.subscribe(topicName, subName, consumer);
+    ASSERT_EQ(ResultConnectError, consumerResult);
+}
+
+// Without any authentication configured, both producer creation and subscription on
+// the private/auth namespace must fail with ResultConnectError (HTTP service URL).
+TEST(AuthPluginToken, testNoAuthWithHttp) {
+    ClientConfiguration config;
+    Client client(serviceUrlHttp, config);
+
+    const std::string topicName = "persistent://private/auth/test-token";
+    const std::string subName = "subscription-name";
+
+    Producer producer;
+    const Result producerResult = client.createProducer(topicName, producer);
+    ASSERT_EQ(ResultConnectError, producerResult);
+
+    Consumer consumer;
+    const Result consumerResult = client.subscribe(topicName, subName, consumer);
+    ASSERT_EQ(ResultConnectError, consumerResult);
+}
diff --git a/pulsar-client-go/pulsar/c_client.go 
b/pulsar-client-go/pulsar/c_client.go
index e513b08..5787260 100644
--- a/pulsar-client-go/pulsar/c_client.go
+++ b/pulsar-client-go/pulsar/c_client.go
@@ -127,6 +127,37 @@ func newAuthenticationTLS(certificatePath string, 
privateKeyPath string) Authent
        return auth
 }
 
+// newAuthenticationToken wraps a fixed token string into an Authentication handle
+// backed by the C client library.
+func newAuthenticationToken(token string) Authentication {
+       tokenCStr := C.CString(token)
+       // The temporary C string is only needed for the duration of the create call.
+       defer C.free(unsafe.Pointer(tokenCStr))
+
+       cAuth := C.pulsar_authentication_token_create(tokenCStr)
+       auth := &authentication{ptr: cAuth}
+
+       runtime.SetFinalizer(auth, authenticationFinalizer)
+       return auth
+}
+
+// pulsarClientTokenSupplierProxy is called from C: it looks up the Go supplier that
+// belongs to ctx, runs it and returns the token as a newly allocated C string.
+//export pulsarClientTokenSupplierProxy
+func pulsarClientTokenSupplierProxy(ctx unsafe.Pointer) *C.char {
+       supplier := restorePointerNoDelete(ctx).(func() string)
+       // The C string will be freed from within the C wrapper itself
+       return C.CString(supplier())
+}
+
+// newAuthenticationTokenSupplier creates an Authentication handle whose token is
+// obtained by calling tokenSupplier through the C client library.
+func newAuthenticationTokenSupplier(tokenSupplier func() string) Authentication {
+       // Register the Go function; the proxy callback later restores it from this pointer.
+       ctx := savePointer(tokenSupplier)
+       cAuth := C._pulsar_authentication_token_create_with_supplier(ctx)
+
+       auth := &authentication{ptr: cAuth}
+       runtime.SetFinalizer(auth, authenticationFinalizer)
+       return auth
+}
+
 func newAuthenticationAthenz(authParams string) Authentication {
        cAuthParams := C.CString(authParams)
        defer C.free(unsafe.Pointer(cAuthParams))
diff --git a/pulsar-client-go/pulsar/c_go_pulsar.h 
b/pulsar-client-go/pulsar/c_go_pulsar.h
index cdbebf8..a0ca786 100644
--- a/pulsar-client-go/pulsar/c_go_pulsar.h
+++ b/pulsar-client-go/pulsar/c_go_pulsar.h
@@ -35,6 +35,12 @@ static inline void _pulsar_client_configuration_set_logger(pulsar_client_configu
     pulsar_client_configuration_set_logger(conf, pulsarClientLoggerConstProxy, 
ctx);
 }
 
+// Token supplier callback; the definition is exported from the Go side
+// (see the `//export pulsarClientTokenSupplierProxy` function in c_client.go).
+char* pulsarClientTokenSupplierProxy(void* ctx);
+
+// Creates a token authentication object that obtains its token by calling
+// pulsarClientTokenSupplierProxy with `ctx` as the callback context.
+static inline pulsar_authentication_t* _pulsar_authentication_token_create_with_supplier(void *ctx) {
+    return pulsar_authentication_token_create_with_supplier(pulsarClientTokenSupplierProxy, ctx);
+}
+
 void pulsarCreateProducerCallbackProxy(pulsar_result result, pulsar_producer_t 
*producer, void *ctx);
 
 static inline void _pulsar_client_create_producer_async(pulsar_client_t 
*client, const char *topic,
diff --git a/pulsar-client-go/pulsar/client.go b/pulsar-client-go/pulsar/client.go
index 405c8a3..ed07fbb 100644
--- a/pulsar-client-go/pulsar/client.go
+++ b/pulsar-client-go/pulsar/client.go
@@ -28,6 +28,16 @@ func NewClient(options ClientOptions) (Client, error) {
 // Opaque interface that represents the authentication credentials
 type Authentication interface {}
 
+// NewAuthenticationToken creates a new Authentication provider that uses the
+// specified auth token.
+func NewAuthenticationToken(token string) Authentication {
+       return newAuthenticationToken(token)
+}
+
+// NewAuthenticationTokenSupplier creates a new Authentication provider that
+// obtains its auth token by calling tokenSupplier instead of using a fixed
+// value.
+func NewAuthenticationTokenSupplier(tokenSupplier func() string) Authentication {
+       return newAuthenticationTokenSupplier(tokenSupplier)
+}
+
 // Create new Authentication provider with specified TLS certificate and 
private key
 func NewAuthenticationTLS(certificatePath string, privateKeyPath string) 
Authentication {
        return newAuthenticationTLS(certificatePath, privateKeyPath)
diff --git a/pulsar-client-go/pulsar/client_test.go b/pulsar-client-go/pulsar/client_test.go
index 7c200ac..61e4770 100644
--- a/pulsar-client-go/pulsar/client_test.go
+++ b/pulsar-client-go/pulsar/client_test.go
@@ -20,7 +20,9 @@
 package pulsar
 
 import (
+       "context"
        "fmt"
+       "io/ioutil"
        "testing"
 )
 
@@ -54,3 +56,72 @@ func TestGetTopicPartitions(t *testing.T) {
        assertEqual(t, len(partitions), 1)
        assertEqual(t, partitions[0], topic)
 }
+
+// TestTokenFilePath is the file the token authentication tests read their
+// token from.
+const TestTokenFilePath = "/tmp/pulsar-test-data/certs/token.txt"
+
+// readToken returns the complete contents of TestTokenFilePath as a string
+// (nothing is trimmed), passing any read error to assertNil.
+func readToken(t *testing.T) string {
+       contents, readErr := ioutil.ReadFile(TestTokenFilePath)
+       assertNil(t, readErr)
+       return string(contents)
+}
+
+// TestTokenAuth creates a client authenticated with the fixed token read from
+// TestTokenFilePath and publishes ten messages to a topic in the private/auth
+// namespace, failing the test on the first send error.
+func TestTokenAuth(t *testing.T) {
+       opts := ClientOptions{
+               URL:            "pulsar://localhost:6650",
+               Authentication: NewAuthenticationToken(readToken(t)),
+       }
+       client, err := NewClient(opts)
+       assertNil(t, err)
+       defer client.Close()
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: "persistent://private/auth/TestTokenAuth",
+       })
+       assertNil(t, err)
+       defer producer.Close()
+
+       ctx := context.Background()
+       for i := 0; i < 10; i++ {
+               msg := ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+               }
+               sendErr := producer.Send(ctx, msg)
+               if sendErr != nil {
+                       t.Fatal(sendErr)
+               }
+       }
+}
+
+// TestTokenAuthSupplier is the supplier-based counterpart of TestTokenAuth: the
+// client is given a callback that reads the token from TestTokenFilePath, and
+// ten messages are then published, failing the test on the first send error.
+func TestTokenAuthSupplier(t *testing.T) {
+       // NOTE(review): the supplier calls readToken(t), and therefore
+       // assertNil(t, ...), from whatever context the client invokes the
+       // callback in - confirm that is safe with respect to *testing.T.
+       supplier := func() string {
+               return readToken(t)
+       }
+
+       client, err := NewClient(ClientOptions{
+               URL:            "pulsar://localhost:6650",
+               Authentication: NewAuthenticationTokenSupplier(supplier),
+       })
+       assertNil(t, err)
+       defer client.Close()
+
+       // NOTE(review): same topic name as TestTokenAuth; presumably a
+       // copy-paste rather than intentional sharing - confirm.
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: "persistent://private/auth/TestTokenAuth",
+       })
+       assertNil(t, err)
+       defer producer.Close()
+
+       ctx := context.Background()
+       for i := 0; i < 10; i++ {
+               msg := ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+               }
+               sendErr := producer.Send(ctx, msg)
+               if sendErr != nil {
+                       t.Fatal(sendErr)
+               }
+       }
+}

Reply via email to