merlimat closed pull request #3067: PIP-25: C++ / Python / Go implementation 
for token authentication
URL: https://github.com/apache/pulsar/pull/3067
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-cpp/include/pulsar/Authentication.h 
b/pulsar-client-cpp/include/pulsar/Authentication.h
index 2ea1238e2a..659f593803 100644
--- a/pulsar-client-cpp/include/pulsar/Authentication.h
+++ b/pulsar-client-cpp/include/pulsar/Authentication.h
@@ -25,6 +25,7 @@
 #include <boost/shared_ptr.hpp>
 #include <pulsar/Result.h>
 #include <boost/make_shared.hpp>
+#include <boost/function.hpp>
 
 #pragma GCC visibility push(default)
 
@@ -114,6 +115,43 @@ class AuthTls : public Authentication {
     AuthenticationDataPtr authDataTls_;
 };
 
+/**
+ * Callable that returns the client auth token as a string.
+ */
+typedef boost::function<std::string()> TokenSupplier;
+
+/**
+ * Token based implementation of Pulsar client authentication
+ */
+class AuthToken : public Authentication {
+   public:
+    /**
+     * Wrap the given authentication data; it is the object later handed out by getAuthData().
+     */
+    AuthToken(AuthenticationDataPtr&);
+    ~AuthToken();
+
+    /**
+     * Create an authentication provider for token based authentication.
+     *
+     * @param params
+     *            map in which the first of these keys that is present selects the token source:
+     *            "token" (the token itself), "file" (path of a file containing the token),
+     *            "env" (name of an environment variable containing the token).
+     *            An exception is thrown when none of the keys is present.
+     */
+    static AuthenticationPtr create(ParamMap& params);
+
+    /**
+     * Create an authentication provider for token based authentication.
+     *
+     * @param authParamsString
+     *            one of "token:<token>", "file://<path>", "env:<variable name>"; any other
+     *            string is used as the token itself
+     */
+    static AuthenticationPtr create(const std::string& authParamsString);
+
+    /**
+     * Create an authentication provider for token based authentication.
+     *
+     * @param token
+     *            a string containing the auth token
+     */
+    static AuthenticationPtr createWithToken(const std::string& token);
+
+    /**
+     * Create an authentication provider for token based authentication.
+     *
+     * @param tokenSupplier
+     *            a supplier of the client auth token
+     */
+    static AuthenticationPtr create(const TokenSupplier& tokenSupplier);
+
+    /** @return the authentication method name, "token" */
+    const std::string getAuthMethodName() const;
+
+    /**
+     * Store the token authentication data in `authDataToken`.
+     *
+     * @return ResultOk
+     */
+    Result getAuthData(AuthenticationDataPtr& authDataToken) const;
+
+   private:
+    AuthenticationDataPtr authDataToken_;
+};
+
 /**
  * Athenz implementation of Pulsar client authentication
  */
diff --git a/pulsar-client-cpp/include/pulsar/c/authentication.h 
b/pulsar-client-cpp/include/pulsar/c/authentication.h
index c3a4d32669..b060bbc1d9 100644
--- a/pulsar-client-cpp/include/pulsar/c/authentication.h
+++ b/pulsar-client-cpp/include/pulsar/c/authentication.h
@@ -27,12 +27,18 @@ extern "C" {
 
 typedef struct _pulsar_authentication pulsar_authentication_t;
 
+/**
+ * Callback that supplies the auth token. It is invoked with the `ctx` pointer that was given to
+ * pulsar_authentication_token_create_with_supplier().
+ *
+ * The library releases the returned string with free(), so it must come from malloc() or a
+ * function that allocates with it (e.g. strdup()).
+ */
+typedef char *(*token_supplier)(void *);
+
 pulsar_authentication_t *pulsar_authentication_create(const char 
*dynamicLibPath,
                                                       const char 
*authParamsString);
 
 pulsar_authentication_t *pulsar_authentication_tls_create(const char 
*certificatePath,
                                                           const char 
*privateKeyPath);
 
+/**
+ * Create a token authentication provider for the given token string.
+ */
+pulsar_authentication_t *pulsar_authentication_token_create(const char *token);
+/**
+ * Create a token authentication provider that obtains the token by calling
+ * `tokenSupplier(ctx)`. The string returned by the supplier is released with free().
+ */
+pulsar_authentication_t 
*pulsar_authentication_token_create_with_supplier(token_supplier tokenSupplier,
+                                                                          void 
*ctx);
+
 pulsar_authentication_t *pulsar_authentication_athenz_create(const char 
*authParamsString);
 
 void pulsar_authentication_free(pulsar_authentication_t *authentication);
diff --git a/pulsar-client-cpp/lib/Authentication.cc 
b/pulsar-client-cpp/lib/Authentication.cc
index b3ebf1c633..4d4c5d8588 100644
--- a/pulsar-client-cpp/lib/Authentication.cc
+++ b/pulsar-client-cpp/lib/Authentication.cc
@@ -21,6 +21,7 @@
 #include <pulsar/Authentication.h>
 #include "auth/AuthTls.h"
 #include "auth/AuthAthenz.h"
+#include "auth/AuthToken.h"
 #include <lib/LogUtils.h>
 
 #include <string>
@@ -119,6 +120,9 @@ void AuthFactory::release_handles() {
 AuthenticationPtr tryCreateBuiltinAuth(const std::string& pluginName, 
ParamMap& paramMap) {
     if (boost::iequals(pluginName, TLS_PLUGIN_NAME) || 
boost::iequals(pluginName, TLS_JAVA_PLUGIN_NAME)) {
         return AuthTls::create(paramMap);
+    } else if (boost::iequals(pluginName, TOKEN_PLUGIN_NAME) ||
+               boost::iequals(pluginName, TOKEN_JAVA_PLUGIN_NAME)) {
+        return AuthToken::create(paramMap);
     } else if (boost::iequals(pluginName, ATHENZ_PLUGIN_NAME) ||
                boost::iequals(pluginName, ATHENZ_JAVA_PLUGIN_NAME)) {
         return AuthAthenz::create(paramMap);
@@ -130,6 +134,9 @@ AuthenticationPtr tryCreateBuiltinAuth(const std::string& 
pluginName, ParamMap&
 AuthenticationPtr tryCreateBuiltinAuth(const std::string& pluginName, const 
std::string& authParamsString) {
     if (boost::iequals(pluginName, TLS_PLUGIN_NAME) || 
boost::iequals(pluginName, TLS_JAVA_PLUGIN_NAME)) {
         return AuthTls::create(authParamsString);
+    } else if (boost::iequals(pluginName, TOKEN_PLUGIN_NAME) ||
+               boost::iequals(pluginName, TOKEN_JAVA_PLUGIN_NAME)) {
+        return AuthToken::create(authParamsString);
     } else if (boost::iequals(pluginName, ATHENZ_PLUGIN_NAME) ||
                boost::iequals(pluginName, ATHENZ_JAVA_PLUGIN_NAME)) {
         return AuthAthenz::create(authParamsString);
diff --git a/pulsar-client-cpp/lib/auth/AuthToken.cc 
b/pulsar-client-cpp/lib/auth/AuthToken.cc
new file mode 100644
index 0000000000..bebfe4ee13
--- /dev/null
+++ b/pulsar-client-cpp/lib/auth/AuthToken.cc
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "AuthToken.h"
+
+#include <boost/algorithm/string/predicate.hpp>
+#include <boost/bind.hpp>
+
+#include <sstream>
+#include <fstream>
+
+namespace pulsar {
+
+// Token suppliers
+
+/**
+ * Supplier for a token that is already known: hands it back unchanged.
+ */
+static std::string readDirect(const std::string &token) { return token; }
+
+/**
+ * Supplier that reads the token from a file each time it is invoked.
+ *
+ * Trailing whitespace is dropped: token files normally end with a newline, which must not
+ * become part of the credential (it would otherwise be embedded in the "Authorization" header).
+ * If the file cannot be read the result is an empty string.
+ */
+static std::string readFromFile(const std::string &tokenFilePath) {
+    std::ifstream input(tokenFilePath);
+    std::stringstream buffer;
+    buffer << input.rdbuf();
+
+    std::string token = buffer.str();
+    const std::string::size_type lastTokenChar = token.find_last_not_of(" \t\r\n");
+    token.erase(lastTokenChar == std::string::npos ? 0 : lastTokenChar + 1);
+    return token;
+}
+
+/**
+ * Supplier that reads the token from an environment variable each time it is invoked.
+ *
+ * Throws a std::string describing the problem when the variable is not set.
+ */
+static std::string readFromEnv(const std::string &envVarName) {
+    const char *value = getenv(envVarName.c_str());
+    if (!value) {
+        throw "Failed to read environment variable " + envVarName;
+    }
+    return std::string(value);
+}
+
+// AuthDataToken
+
+// This overload is declared in AuthToken.h; without a definition any use of it fails to link.
+AuthDataToken::AuthDataToken(const std::string &token) : tokenSupplier_(boost::bind(&readDirect, token)) {}
+
+AuthDataToken::AuthDataToken(const TokenSupplier &tokenSupplier) : tokenSupplier_(tokenSupplier) {}
+
+AuthDataToken::~AuthDataToken() {}
+
+bool AuthDataToken::hasDataForHttp() { return true; }
+
+// The supplier is consulted on every call, so a refreshed token is picked up automatically.
+std::string AuthDataToken::getHttpHeaders() { return "Authorization: Bearer " + tokenSupplier_(); }
+
+bool AuthDataToken::hasDataFromCommand() { return true; }
+
+std::string AuthDataToken::getCommandData() { return tokenSupplier_(); }
+
+// AuthToken
+
+AuthToken::AuthToken(AuthenticationDataPtr &authDataToken) : authDataToken_(authDataToken) {}
+
+AuthToken::~AuthToken() {}
+
+/**
+ * Build the provider from a parameter map. The first key found among "token", "file" and "env"
+ * selects where the token comes from; a `const char*` is thrown when none of them is present.
+ */
+AuthenticationPtr AuthToken::create(ParamMap &params) {
+    if (params.find("token") != params.end()) {
+        return create(boost::bind(&readDirect, params["token"]));
+    } else if (params.find("file") != params.end()) {
+        // Read token from a file
+        return create(boost::bind(&readFromFile, params["file"]));
+    } else if (params.find("env") != params.end()) {
+        // Read token from environment variable
+        std::string envVarName = params["env"];
+        return create(boost::bind(&readFromEnv, envVarName));
+    } else {
+        throw "Invalid configuration for token provider";
+    }
+}
+
+/**
+ * Build the provider from a parameter string of the form "token:<token>", "file://<path>",
+ * "file:<path>" or "env:<variable>". A string without a known prefix is taken as the token.
+ */
+AuthenticationPtr AuthToken::create(const std::string &authParamsString) {
+    ParamMap params;
+    if (boost::starts_with(authParamsString, "token:")) {
+        params["token"] = authParamsString.substr(strlen("token:"));
+    } else if (boost::starts_with(authParamsString, "file:")) {
+        // Read token from a file. Only strip "//" when it is really there: removing
+        // strlen("file://") characters unconditionally truncates "file:<path>" values and
+        // throws std::out_of_range for strings shorter than the URI prefix.
+        const std::string::size_type prefixLength =
+            boost::starts_with(authParamsString, "file://") ? strlen("file://") : strlen("file:");
+        params["file"] = authParamsString.substr(prefixLength);
+    } else if (boost::starts_with(authParamsString, "env:")) {
+        params["env"] = authParamsString.substr(strlen("env:"));
+    } else {
+        params["token"] = authParamsString;
+    }
+
+    return create(params);
+}
+
+AuthenticationPtr AuthToken::createWithToken(const std::string &token) {
+    return create(boost::bind(&readDirect, token));
+}
+
+AuthenticationPtr AuthToken::create(const TokenSupplier &tokenSupplier) {
+    AuthenticationDataPtr authDataToken = AuthenticationDataPtr(new AuthDataToken(tokenSupplier));
+    return AuthenticationPtr(new AuthToken(authDataToken));
+}
+
+const std::string AuthToken::getAuthMethodName() const { return "token"; }
+
+Result AuthToken::getAuthData(AuthenticationDataPtr &authDataContent) const {
+    authDataContent = authDataToken_;
+    return ResultOk;
+}
+
+}  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/auth/AuthToken.h 
b/pulsar-client-cpp/lib/auth/AuthToken.h
new file mode 100644
index 0000000000..8473fe3135
--- /dev/null
+++ b/pulsar-client-cpp/lib/auth/AuthToken.h
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <pulsar/Authentication.h>
+#include <string>
+#include <boost/function.hpp>
+
+namespace pulsar {
+
+// Plugin names under which token authentication is selected (compared case-insensitively)
+const std::string TOKEN_PLUGIN_NAME = "token";
+const std::string TOKEN_JAVA_PLUGIN_NAME = 
"org.apache.pulsar.client.impl.auth.AuthenticationToken";
+
+/**
+ * Authentication data for token based authentication. The token is obtained from a
+ * TokenSupplier every time the data is requested.
+ */
+class AuthDataToken : public AuthenticationDataProvider {
+   public:
+    /** Construct from a fixed token string. */
+    AuthDataToken(const std::string& token);
+    /** Construct from a supplier that is called whenever the token is needed. */
+    AuthDataToken(const TokenSupplier& tokenSupplier);
+    ~AuthDataToken();
+
+    /** @return true */
+    bool hasDataForHttp();
+    /** @return "Authorization: Bearer " followed by the supplied token */
+    std::string getHttpHeaders();
+    /** @return true */
+    bool hasDataFromCommand();
+    /** @return the supplied token */
+    std::string getCommandData();
+
+   private:
+    TokenSupplier tokenSupplier_;
+};
+
+}  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/c/c_Authentication.cc 
b/pulsar-client-cpp/lib/c/c_Authentication.cc
index c03f239feb..83d344b007 100644
--- a/pulsar-client-cpp/lib/c/c_Authentication.cc
+++ b/pulsar-client-cpp/lib/c/c_Authentication.cc
@@ -23,6 +23,8 @@
 
 #include "c_structs.h"
 
+#include <cstdlib>
+
 pulsar_authentication_t *pulsar_authentication_create(const char 
*dynamicLibPath,
                                                       const char 
*authParamsString) {
     pulsar_authentication_t *authentication = new pulsar_authentication_t;
@@ -43,4 +45,24 @@ pulsar_authentication_t 
*pulsar_authentication_athenz_create(const char *authPar
     pulsar_authentication_t *authentication = new pulsar_authentication_t;
     authentication->auth = pulsar::AuthAthenz::create(authParamsString);
     return authentication;
+}
+
+// C binding: token authentication provider for a fixed token string.
+// The returned handle is heap allocated.
+pulsar_authentication_t *pulsar_authentication_token_create(const char *token) {
+    pulsar_authentication_t *handle = new pulsar_authentication_t;
+    handle->auth = pulsar::AuthToken::createWithToken(token);
+    return handle;
+}
+
+// Adapts a C `token_supplier` callback to a function returning std::string.
+// The callback's result is copied and then released with free(), so the callback must return
+// malloc()-allocated memory. A NULL result is mapped to an empty token: building a std::string
+// from a null pointer is undefined behaviour.
+static std::string tokenSupplierWrapper(token_supplier supplier, void *ctx) {
+    char *token = supplier(ctx);
+    if (!token) {
+        return std::string();
+    }
+
+    std::string tokenStr(token);
+    free(token);
+    return tokenStr;
+}
+
+// C binding: token authentication provider whose token comes from `tokenSupplier(ctx)`.
+// The callback and its context are bound into the provider, so both must stay valid for as
+// long as the provider can be used.
+pulsar_authentication_t *pulsar_authentication_token_create_with_supplier(token_supplier tokenSupplier,
+                                                                          void *ctx) {
+    pulsar_authentication_t *handle = new pulsar_authentication_t;
+    handle->auth = pulsar::AuthToken::create(boost::bind(&tokenSupplierWrapper, tokenSupplier, ctx));
+    return handle;
 }
\ No newline at end of file
diff --git a/pulsar-client-cpp/pulsar-test-service-start.sh 
b/pulsar-client-cpp/pulsar-test-service-start.sh
index 914354a3b6..665758ea97 100755
--- a/pulsar-client-cpp/pulsar-test-service-start.sh
+++ b/pulsar-client-cpp/pulsar-test-service-start.sh
@@ -44,6 +44,15 @@ mkdir -p $DATA_DIR
 mkdir -p $DATA_DIR/certs
 cp $SRC_DIR/pulsar-broker/src/test/resources/authentication/tls/*.pem 
$DATA_DIR/certs
 
+# Generate secret key and token
+# The key is written to $DATA_DIR/tokens/secret.key; the token is issued for the
+# "token-principal" subject, signed with that key and saved to $DATA_DIR/tokens/token.txt
+mkdir -p $DATA_DIR/tokens
+$PULSAR_DIR/bin/pulsar tokens create-secret-key --output 
$DATA_DIR/tokens/secret.key
+
+$PULSAR_DIR/bin/pulsar tokens create \
+            --subject token-principal \
+            --secret-key file:///$DATA_DIR/tokens/secret.key \
+            > $DATA_DIR/tokens/token.txt
+
 export 
PULSAR_STANDALONE_CONF=$SRC_DIR/pulsar-client-cpp/test-conf/standalone-ssl.conf
 $PULSAR_DIR/bin/pulsar-daemon start standalone \
         --no-functions-worker --no-stream-storage \
@@ -95,4 +104,8 @@ $PULSAR_DIR/bin/pulsar-admin tenants create private -r "" -c 
"standalone"
 # Create "private/auth" with required authentication
 $PULSAR_DIR/bin/pulsar-admin namespaces create private/auth --clusters 
standalone
 
+# Let the "token-principal" role (the subject of the generated test token)
+# produce to and consume from topics in "private/auth"
+$PULSAR_DIR/bin/pulsar-admin namespaces grant-permission private/auth \
+                        --actions produce,consume \
+                        --role "token-principal"
+
 echo "-- Ready to start tests"
diff --git a/pulsar-client-cpp/python/pulsar/__init__.py 
b/pulsar-client-cpp/python/pulsar/__init__.py
index 54fee68f98..8004fa6466 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -211,6 +211,24 @@ def __init__(self, certificate_path, private_key_path):
         _check_type(str, private_key_path, 'private_key_path')
         self.auth = _pulsar.AuthenticationTLS(certificate_path, 
private_key_path)
 
+
+class AuthenticationToken(Authentication):
+    """
+    Authentication provider based on tokens
+    """
+    def __init__(self, token):
+        """
+        Create the token authentication provider instance.
+
+        **Args**
+
+        * `token`: Either a string containing the token, or a function that
+                   returns a string with the token
+        """
+        is_token_string = isinstance(token, str)
+        if not is_token_string and not callable(token):
+            raise ValueError("Argument token is expected to be of type 'str' or a function returning 'str'")
+
+        self.auth = _pulsar.AuthenticationToken(token)
+
 class AuthenticationAthenz(Authentication):
     """
     Athenz Authentication implementation
diff --git a/pulsar-client-cpp/python/pulsar_test.py 
b/pulsar-client-cpp/python/pulsar_test.py
index e9ecbc1b00..158c79ca08 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -24,7 +24,7 @@
 import os
 from pulsar import Client, MessageId, \
             CompressionType, ConsumerType, PartitionsRoutingMode, \
-            AuthenticationTLS, Authentication
+            AuthenticationTLS, Authentication, AuthenticationToken
 
 from _pulsar import ProducerConfiguration, ConsumerConfiguration
 
@@ -793,6 +793,45 @@ def test_get_topics_partitions(self):
                          [topic_non_partitioned])
         client.close()
 
+    def test_token_auth(self):
+        # Token generated for the test broker; drop the trailing newline
+        with open('/tmp/pulsar-test-data/tokens/token.txt') as token_file:
+            token = token_file.read().strip()
+
+        # Use adminUrl to test both HTTP request and binary protocol
+        auth = AuthenticationToken(token)
+        client = Client(self.adminUrl, authentication=auth)
+
+        topic = 'persistent://private/auth/my-python-topic-token-auth'
+        consumer = client.subscribe(topic,
+                                    'my-sub',
+                                    consumer_type=ConsumerType.Shared)
+        producer = client.create_producer(topic)
+        producer.send(b'hello')
+
+        received = consumer.receive(1000)
+        self.assertTrue(received)
+        self.assertEqual(received.data(), b'hello')
+        client.close()
+
+    def test_token_auth_supplier(self):
+        # The token is handed to the client as a callable instead of a string
+        def read_token():
+            with open('/tmp/pulsar-test-data/tokens/token.txt') as token_file:
+                return token_file.read().strip()
+
+        auth = AuthenticationToken(read_token)
+        client = Client(self.serviceUrl, authentication=auth)
+
+        topic = 'persistent://private/auth/my-python-topic-token-auth'
+        consumer = client.subscribe(topic,
+                                    'my-sub',
+                                    consumer_type=ConsumerType.Shared)
+        producer = client.create_producer(topic)
+        producer.send(b'hello')
+
+        received = consumer.receive(1000)
+        self.assertTrue(received)
+        self.assertEqual(received.data(), b'hello')
+        client.close()
+
+    #####
+
     def _check_value_error(self, fun):
         try:
             fun()
diff --git a/pulsar-client-cpp/python/src/authentication.cc 
b/pulsar-client-cpp/python/src/authentication.cc
index 9cfb2e717b..9547c99971 100644
--- a/pulsar-client-cpp/python/src/authentication.cc
+++ b/pulsar-client-cpp/python/src/authentication.cc
@@ -32,6 +32,59 @@ struct AuthenticationTlsWrapper : public 
AuthenticationWrapper {
     }
 };
 
+// Functor that lets a Python callable act as a token supplier.
+// It owns one reference to the callable for as long as it lives; every copy owns its own.
+// NOTE(review): the reference count is changed in the copy operations and the destructor
+// without acquiring the GIL -- confirm these only run on threads that hold it.
+struct TokenSupplierWrapper {
+    PyObject* _pySupplier;
+
+    TokenSupplierWrapper(py::object pySupplier) :
+        _pySupplier(pySupplier.ptr()) {
+        Py_XINCREF(_pySupplier);
+    }
+
+    TokenSupplierWrapper(const TokenSupplierWrapper& other) :
+        _pySupplier(other._pySupplier) {
+        Py_XINCREF(_pySupplier);
+    }
+
+    TokenSupplierWrapper& operator=(const TokenSupplierWrapper& other) {
+        // Acquire the new reference first, then release the one held so far. Releasing the old
+        // reference was missing (leak); this order also keeps self-assignment safe.
+        Py_XINCREF(other._pySupplier);
+        Py_XDECREF(_pySupplier);
+        _pySupplier = other._pySupplier;
+        return *this;
+    }
+
+    virtual ~TokenSupplierWrapper() {
+        Py_XDECREF(_pySupplier);
+    }
+
+    // Calls the Python supplier while holding the GIL and returns its result.
+    // If the call raises, the Python error is printed and an empty token is returned.
+    std::string operator()() {
+        PyGILState_STATE state = PyGILState_Ensure();
+
+        std::string token;
+        try {
+            token = py::call<std::string>(_pySupplier);
+        } catch (const py::error_already_set&) {
+            PyErr_Print();
+        }
+
+        PyGILState_Release(state);
+        return token;
+    }
+};
+
+
+// Python-facing wrapper for token authentication. The argument is either a string holding the
+// token or a callable that returns it.
+struct AuthenticationTokenWrapper : public AuthenticationWrapper {
+    AuthenticationTokenWrapper(py::object token) :
+            AuthenticationWrapper() {
+        py::extract<std::string> asString(token);
+        if (!asString.check()) {
+            // Not a string: treat it as a function object
+            this->auth = AuthToken::create(TokenSupplierWrapper(token));
+            return;
+        }
+
+        std::string tokenStr = asString;
+        this->auth = AuthToken::createWithToken(tokenStr);
+    }
+};
+
 struct AuthenticationAthenzWrapper : public AuthenticationWrapper {
     AuthenticationAthenzWrapper(const std::string& authParamsString) :
             AuthenticationWrapper() {
@@ -49,6 +102,10 @@ void export_authentication() {
                                                                     init<const 
std::string&, const std::string&>())
             ;
 
+    class_<AuthenticationTokenWrapper, bases<AuthenticationWrapper> 
>("AuthenticationToken",
+                                                                    
init<py::object>())
+            ;
+
     class_<AuthenticationAthenzWrapper, bases<AuthenticationWrapper> 
>("AuthenticationAthenz",
                                                                        
init<const std::string&>())
             ;
diff --git a/pulsar-client-cpp/test-conf/standalone-ssl.conf 
b/pulsar-client-cpp/test-conf/standalone-ssl.conf
index da9123c4cc..c7e6d3c80c 100644
--- a/pulsar-client-cpp/test-conf/standalone-ssl.conf
+++ b/pulsar-client-cpp/test-conf/standalone-ssl.conf
@@ -98,11 +98,13 @@ anonymousUserRole=anonymous
 authenticationEnabled=true
 
 # Authentication provider name list, which is a comma-separated list of class names
-authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderTls
+authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderTls,org.apache.pulsar.broker.authentication.AuthenticationProviderToken
 
 # Enforce authorization
 authorizationEnabled=true
 
+tokenSecretKey=file:///tmp/pulsar-test-data/tokens/secret.key
+
 # Role names that are treated as "super-user", meaning they will be able to do 
all admin
 # operations and publish/consume from all topics
 superUserRoles=localhost,superUser
diff --git a/pulsar-client-cpp/tests/AuthTokenTest.cc 
b/pulsar-client-cpp/tests/AuthTokenTest.cc
new file mode 100644
index 0000000000..a77492c92f
--- /dev/null
+++ b/pulsar-client-cpp/tests/AuthTokenTest.cc
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/Authentication.h>
+
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+#include <boost/lexical_cast.hpp>
+#include <boost/asio.hpp>
+#include <boost/algorithm/string.hpp>
+#include <boost/thread.hpp>
+#include <lib/LogUtils.h>
+
+#include <string>
+#include <fstream>
+#include <streambuf>
+
+#include "lib/Future.h"
+#include "lib/Utils.h"
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+
+// Endpoints of the test broker: binary protocol and HTTP
+static const std::string serviceUrl = "pulsar://localhost:6650";
+static const std::string serviceUrlHttp = "http://localhost:8080";
+
+// Token file read by these tests
+static const std::string tokenPath = "/tmp/pulsar-test-data/tokens/token.txt";
+
+/**
+ * Returns the whole content of the token file, or an empty string if it cannot be read.
+ */
+static std::string getToken() {
+    std::ifstream file(tokenPath);
+    std::string str((std::istreambuf_iterator<char>(file)), std::istreambuf_iterator<char>());
+    return str;
+}
+
+// Token authentication over the binary protocol: checks the auth data produced for a fixed
+// token, then publishes and consumes messages on a namespace that requires authentication.
+TEST(AuthPluginToken, testToken) {
+    ClientConfiguration config = ClientConfiguration();
+    std::string token = getToken();
+    AuthenticationPtr auth = pulsar::AuthToken::createWithToken(token);
+
+    ASSERT_TRUE(auth != NULL);
+    ASSERT_EQ(auth->getAuthMethodName(), "token");
+
+    pulsar::AuthenticationDataPtr data;
+    ASSERT_EQ(auth->getAuthData(data), pulsar::ResultOk);
+    ASSERT_EQ(data->hasDataFromCommand(), true);
+    ASSERT_EQ(data->getCommandData(), token);
+    ASSERT_EQ(data->hasDataForTls(), false);
+    ASSERT_EQ(data->hasDataForHttp(), true);
+    ASSERT_EQ(auth.use_count(), 1);
+
+    config.setAuth(auth);
+    Client client(serviceUrl, config);
+
+    std::string topicName = "persistent://private/auth/test-token";
+    std::string subName = "subscription-name";
+    int numOfMessages = 10;
+
+    Producer producer;
+    Result result = client.createProducer(topicName, producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    result = client.subscribe(topicName, subName, consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    // handling dangling subscriptions
+    consumer.unsubscribe();
+    // The consumer used below comes from this second subscribe, so its outcome must be checked
+    result = client.subscribe(topicName, subName, consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    std::string temp = producer.getTopic();
+    ASSERT_EQ(temp, topicName);
+    temp = consumer.getTopic();
+    ASSERT_EQ(temp, topicName);
+    ASSERT_EQ(consumer.getSubscriptionName(), subName);
+
+    // Send Asynchronously
+    std::string prefix = "test-token-message-";
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + boost::lexical_cast<std::string>(i);
+        Message msg = MessageBuilder()
+                          .setContent(messageContent)
+                          .setProperty("msgIndex", boost::lexical_cast<std::string>(i))
+                          .build();
+        producer.sendAsync(msg, NULL);
+        LOG_INFO("sending message " << messageContent);
+    }
+
+    producer.flush();
+
+    Message receivedMsg;
+    for (int i = 0; i < numOfMessages; i++) {
+        Result res = consumer.receive(receivedMsg);
+        ASSERT_EQ(ResultOk, res);
+
+        std::string expectedMessageContent = prefix + boost::lexical_cast<std::string>(i);
+        LOG_INFO("Received Message with [ content - "
+                 << receivedMsg.getDataAsString() << "] [ messageID = " << receivedMsg.getMessageId() << "]");
+        ASSERT_EQ(receivedMsg.getProperty("msgIndex"), boost::lexical_cast<std::string>(i));
+        ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
+        ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
+    }
+}
+
+// Same publish/consume round trip as the binary-protocol test, but the client is created with
+// the HTTP service URL.
+TEST(AuthPluginToken, testTokenWithHttpUrl) {
+    ClientConfiguration config = ClientConfiguration();
+    std::string token = getToken();
+    config.setAuth(pulsar::AuthToken::createWithToken(token));
+    Client client(serviceUrlHttp, config);
+
+    std::string topicName = "persistent://private/auth/test-token-http";
+    std::string subName = "subscription-name";
+    int numOfMessages = 10;
+
+    Producer producer;
+    Result result = client.createProducer(topicName, producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    result = client.subscribe(topicName, subName, consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    // handling dangling subscriptions
+    consumer.unsubscribe();
+    // The consumer used below comes from this second subscribe, so its outcome must be checked
+    result = client.subscribe(topicName, subName, consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    std::string temp = producer.getTopic();
+    ASSERT_EQ(temp, topicName);
+    temp = consumer.getTopic();
+    ASSERT_EQ(temp, topicName);
+    ASSERT_EQ(consumer.getSubscriptionName(), subName);
+
+    // Send Asynchronously
+    std::string prefix = "test-token-message-";
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = prefix + boost::lexical_cast<std::string>(i);
+        Message msg = MessageBuilder()
+                          .setContent(messageContent)
+                          .setProperty("msgIndex", boost::lexical_cast<std::string>(i))
+                          .build();
+        producer.sendAsync(msg, NULL);
+        LOG_INFO("sending message " << messageContent);
+    }
+
+    producer.flush();
+
+    Message receivedMsg;
+    for (int i = 0; i < numOfMessages; i++) {
+        Result res = consumer.receive(receivedMsg);
+        ASSERT_EQ(ResultOk, res);
+
+        std::string expectedMessageContent = prefix + boost::lexical_cast<std::string>(i);
+        LOG_INFO("Received Message with [ content - "
+                 << receivedMsg.getDataAsString() << "] [ messageID = " << receivedMsg.getMessageId() << "]");
+        ASSERT_EQ(receivedMsg.getProperty("msgIndex"), boost::lexical_cast<std::string>(i));
+        ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
+        ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
+    }
+}
+
+// A client without any authentication must be rejected on the protected namespace
+// (binary protocol URL).
+TEST(AuthPluginToken, testNoAuth) {
+    const std::string topicName = "persistent://private/auth/test-token";
+    const std::string subName = "subscription-name";
+
+    ClientConfiguration config;
+    Client client(serviceUrl, config);
+
+    Producer producer;
+    Result producerResult = client.createProducer(topicName, producer);
+    ASSERT_EQ(ResultConnectError, producerResult);
+
+    Consumer consumer;
+    Result consumerResult = client.subscribe(topicName, subName, consumer);
+    ASSERT_EQ(ResultConnectError, consumerResult);
+}
+
+// A client without any authentication must be rejected on the protected namespace
+// (HTTP service URL).
+TEST(AuthPluginToken, testNoAuthWithHttp) {
+    const std::string topicName = "persistent://private/auth/test-token";
+    const std::string subName = "subscription-name";
+
+    ClientConfiguration config;
+    Client client(serviceUrlHttp, config);
+
+    Producer producer;
+    Result producerResult = client.createProducer(topicName, producer);
+    ASSERT_EQ(ResultConnectError, producerResult);
+
+    Consumer consumer;
+    Result consumerResult = client.subscribe(topicName, subName, consumer);
+    ASSERT_EQ(ResultConnectError, consumerResult);
+}
diff --git a/pulsar-client-go/pulsar/c_client.go 
b/pulsar-client-go/pulsar/c_client.go
index e513b081f3..578726045a 100644
--- a/pulsar-client-go/pulsar/c_client.go
+++ b/pulsar-client-go/pulsar/c_client.go
@@ -127,6 +127,37 @@ func newAuthenticationTLS(certificatePath string, 
privateKeyPath string) Authent
        return auth
 }
 
+// newAuthenticationToken creates the native token authentication provider for a fixed token.
+// The C copy of the token is freed when this function returns; the native provider is
+// released by the finalizer attached to the returned value.
+func newAuthenticationToken(token string) Authentication {
+       cToken := C.CString(token)
+       defer C.free(unsafe.Pointer(cToken))
+
+       auth := new(authentication)
+       auth.ptr = C.pulsar_authentication_token_create(cToken)
+       runtime.SetFinalizer(auth, authenticationFinalizer)
+
+       return auth
+}
+
+// pulsarClientTokenSupplierProxy is called from C to obtain a token. ctx identifies the Go
+// supplier function that was registered with savePointer.
+//export pulsarClientTokenSupplierProxy
+func pulsarClientTokenSupplierProxy(ctx unsafe.Pointer) *C.char {
+       supplier := restorePointerNoDelete(ctx).(func() string)
+
+       // The C string will be freed from within the C wrapper itself
+       return C.CString(supplier())
+}
+
+// newAuthenticationTokenSupplier creates the native token authentication provider that asks
+// tokenSupplier for the token. The supplier is registered with savePointer so that the C side
+// can refer to it through an opaque pointer.
+func newAuthenticationTokenSupplier(tokenSupplier func() string) Authentication {
+       ctx := savePointer(tokenSupplier)
+
+       auth := new(authentication)
+       auth.ptr = C._pulsar_authentication_token_create_with_supplier(ctx)
+       runtime.SetFinalizer(auth, authenticationFinalizer)
+
+       return auth
+}
+
 func newAuthenticationAthenz(authParams string) Authentication {
        cAuthParams := C.CString(authParams)
        defer C.free(unsafe.Pointer(cAuthParams))
diff --git a/pulsar-client-go/pulsar/c_go_pulsar.h 
b/pulsar-client-go/pulsar/c_go_pulsar.h
index cdbebf8771..a0ca786548 100644
--- a/pulsar-client-go/pulsar/c_go_pulsar.h
+++ b/pulsar-client-go/pulsar/c_go_pulsar.h
@@ -35,6 +35,12 @@ static inline void 
_pulsar_client_configuration_set_logger(pulsar_client_configu
     pulsar_client_configuration_set_logger(conf, pulsarClientLoggerConstProxy, 
ctx);
 }
 
+// Token supplier callback implemented on the Go side (exported through cgo)
+char* pulsarClientTokenSupplierProxy(void* ctx);
+
+// Creates a token authentication provider that obtains its token through
+// pulsarClientTokenSupplierProxy, passing `ctx` along to it
+static inline pulsar_authentication_t* 
_pulsar_authentication_token_create_with_supplier(void *ctx) {
+    return 
pulsar_authentication_token_create_with_supplier(pulsarClientTokenSupplierProxy,
 ctx);
+}
+
 void pulsarCreateProducerCallbackProxy(pulsar_result result, pulsar_producer_t 
*producer, void *ctx);
 
 static inline void _pulsar_client_create_producer_async(pulsar_client_t 
*client, const char *topic,
diff --git a/pulsar-client-go/pulsar/client.go 
b/pulsar-client-go/pulsar/client.go
index 405c8a396f..ed07fbba1c 100644
--- a/pulsar-client-go/pulsar/client.go
+++ b/pulsar-client-go/pulsar/client.go
@@ -28,6 +28,16 @@ func NewClient(options ClientOptions) (Client, error) {
 // Opaque interface that represents the authentication credentials
 type Authentication interface {}
 
+// NewAuthenticationToken creates a new Authentication provider that uses the
+// specified auth token.
+func NewAuthenticationToken(token string) Authentication {
+       return newAuthenticationToken(token)
+}
+
+// NewAuthenticationTokenSupplier creates a new Authentication provider that obtains
+// the auth token by calling the specified supplier function.
+func NewAuthenticationTokenSupplier(tokenSupplier func() string) 
Authentication {
+       return newAuthenticationTokenSupplier(tokenSupplier)
+}
+
 // Create new Authentication provider with specified TLS certificate and 
private key
 func NewAuthenticationTLS(certificatePath string, privateKeyPath string) 
Authentication {
        return newAuthenticationTLS(certificatePath, privateKeyPath)
diff --git a/pulsar-client-go/pulsar/client_test.go 
b/pulsar-client-go/pulsar/client_test.go
index 7c200ac01a..61e47708b0 100644
--- a/pulsar-client-go/pulsar/client_test.go
+++ b/pulsar-client-go/pulsar/client_test.go
@@ -20,7 +20,9 @@
 package pulsar
 
 import (
+       "context"
        "fmt"
+       "io/ioutil"
        "testing"
 )
 
@@ -54,3 +56,72 @@ func TestGetTopicPartitions(t *testing.T) {
        assertEqual(t, len(partitions), 1)
        assertEqual(t, partitions[0], topic)
 }
+
+// TestTokenFilePath is the token file written by pulsar-test-service-start.sh. The script
+// stores it under "tokens/", not "certs/"; the C++ and Python tests read the same location.
+const TestTokenFilePath = "/tmp/pulsar-test-data/tokens/token.txt"
+
+// readToken returns the content of the test token file and fails the test if it
+// cannot be read.
+func readToken(t *testing.T) string {
+       data, err := ioutil.ReadFile(TestTokenFilePath)
+       assertNil(t, err)
+
+       return string(data)
+}
+
+// TestTokenAuth publishes to a topic in the protected namespace using a client that
+// authenticates with a fixed token.
+func TestTokenAuth(t *testing.T) {
+       auth := NewAuthenticationToken(readToken(t))
+       client, err := NewClient(ClientOptions{
+               URL:            "pulsar://localhost:6650",
+               Authentication: auth,
+       })
+       assertNil(t, err)
+       defer client.Close()
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: "persistent://private/auth/TestTokenAuth",
+       })
+       assertNil(t, err)
+       defer producer.Close()
+
+       ctx := context.Background()
+       for i := 0; i < 10; i++ {
+               msg := ProducerMessage{Payload: []byte(fmt.Sprintf("hello-%d", i))}
+               if err := producer.Send(ctx, msg); err != nil {
+                       t.Fatal(err)
+               }
+       }
+}
+
+// TestTokenAuthSupplier publishes to a topic in the protected namespace using a client
+// that obtains its token from a supplier function.
+func TestTokenAuthSupplier(t *testing.T) {
+       supplier := func() string {
+               return readToken(t)
+       }
+       client, err := NewClient(ClientOptions{
+               URL:            "pulsar://localhost:6650",
+               Authentication: NewAuthenticationTokenSupplier(supplier),
+       })
+       assertNil(t, err)
+       defer client.Close()
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: "persistent://private/auth/TestTokenAuth",
+       })
+       assertNil(t, err)
+       defer producer.Close()
+
+       ctx := context.Background()
+       for i := 0; i < 10; i++ {
+               msg := ProducerMessage{Payload: []byte(fmt.Sprintf("hello-%d", i))}
+               if err := producer.Send(ctx, msg); err != nil {
+                       t.Fatal(err)
+               }
+       }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to