This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 019ae66 PIP-25: C++ / Python / Go implementation for token
authentication (#3067)
019ae66 is described below
commit 019ae66658201faca134b611ac8cfc3dcdf63d43
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Nov 28 20:42:43 2018 -0800
PIP-25: C++ / Python / Go implementation for token authentication (#3067)
* PIP-25: C++ implementation for token authentication
* PIP-25: C and Go implementation for token authentication
* PIP-25: Python implementation
* Addressed comments
* Added missing <sstream> include
* Fixed argument name that was changed earlier
* Fixed secret key path
---
pulsar-client-cpp/include/pulsar/Authentication.h | 38 ++++
.../include/pulsar/c/authentication.h | 6 +
pulsar-client-cpp/lib/Authentication.cc | 7 +
pulsar-client-cpp/lib/auth/AuthToken.cc | 117 ++++++++++++
.../c/authentication.h => lib/auth/AuthToken.h} | 36 ++--
pulsar-client-cpp/lib/c/c_Authentication.cc | 22 +++
pulsar-client-cpp/pulsar-test-service-start.sh | 13 ++
pulsar-client-cpp/python/pulsar/__init__.py | 18 ++
pulsar-client-cpp/python/pulsar_test.py | 41 +++-
pulsar-client-cpp/python/src/authentication.cc | 57 ++++++
pulsar-client-cpp/test-conf/standalone-ssl.conf | 4 +-
pulsar-client-cpp/tests/AuthTokenTest.cc | 206 +++++++++++++++++++++
pulsar-client-go/pulsar/c_client.go | 31 ++++
pulsar-client-go/pulsar/c_go_pulsar.h | 6 +
pulsar-client-go/pulsar/client.go | 10 +
pulsar-client-go/pulsar/client_test.go | 71 +++++++
16 files changed, 664 insertions(+), 19 deletions(-)
diff --git a/pulsar-client-cpp/include/pulsar/Authentication.h
b/pulsar-client-cpp/include/pulsar/Authentication.h
index 2ea1238..659f593 100644
--- a/pulsar-client-cpp/include/pulsar/Authentication.h
+++ b/pulsar-client-cpp/include/pulsar/Authentication.h
@@ -25,6 +25,7 @@
#include <boost/shared_ptr.hpp>
#include <pulsar/Result.h>
#include <boost/make_shared.hpp>
+#include <boost/function.hpp>
#pragma GCC visibility push(default)
@@ -114,6 +115,43 @@ class AuthTls : public Authentication {
AuthenticationDataPtr authDataTls_;
};
+typedef boost::function<std::string()> TokenSupplier;
+
+/**
+ * Token based implementation of Pulsar client authentication
+ */
+class AuthToken : public Authentication {
+ public:
+ AuthToken(AuthenticationDataPtr&);
+ ~AuthToken();
+
+ static AuthenticationPtr create(ParamMap& params);
+
+ static AuthenticationPtr create(const std::string& authParamsString);
+
+ /**
+ * Create an authentication provider for token based authentication.
+ *
+ * @param token
+ * a string containing the auth token
+ */
+ static AuthenticationPtr createWithToken(const std::string& token);
+
+ /**
+ * Create an authentication provider for token based authentication.
+ *
+ * @param tokenSupplier
+ * a supplier of the client auth token
+ */
+ static AuthenticationPtr create(const TokenSupplier& tokenSupplier);
+
+ const std::string getAuthMethodName() const;
+ Result getAuthData(AuthenticationDataPtr& authDataToken) const;
+
+ private:
+ AuthenticationDataPtr authDataToken_;
+};
+
/**
* Athenz implementation of Pulsar client authentication
*/
diff --git a/pulsar-client-cpp/include/pulsar/c/authentication.h
b/pulsar-client-cpp/include/pulsar/c/authentication.h
index c3a4d32..b060bbc 100644
--- a/pulsar-client-cpp/include/pulsar/c/authentication.h
+++ b/pulsar-client-cpp/include/pulsar/c/authentication.h
@@ -27,12 +27,18 @@ extern "C" {
typedef struct _pulsar_authentication pulsar_authentication_t;
+typedef char *(*token_supplier)(void *);
+
pulsar_authentication_t *pulsar_authentication_create(const char
*dynamicLibPath,
const char
*authParamsString);
pulsar_authentication_t *pulsar_authentication_tls_create(const char
*certificatePath,
const char
*privateKeyPath);
+pulsar_authentication_t *pulsar_authentication_token_create(const char *token);
+pulsar_authentication_t
*pulsar_authentication_token_create_with_supplier(token_supplier tokenSupplier,
+ void
*ctx);
+
pulsar_authentication_t *pulsar_authentication_athenz_create(const char
*authParamsString);
void pulsar_authentication_free(pulsar_authentication_t *authentication);
diff --git a/pulsar-client-cpp/lib/Authentication.cc
b/pulsar-client-cpp/lib/Authentication.cc
index b3ebf1c..4d4c5d8 100644
--- a/pulsar-client-cpp/lib/Authentication.cc
+++ b/pulsar-client-cpp/lib/Authentication.cc
@@ -21,6 +21,7 @@
#include <pulsar/Authentication.h>
#include "auth/AuthTls.h"
#include "auth/AuthAthenz.h"
+#include "auth/AuthToken.h"
#include <lib/LogUtils.h>
#include <string>
@@ -119,6 +120,9 @@ void AuthFactory::release_handles() {
AuthenticationPtr tryCreateBuiltinAuth(const std::string& pluginName,
ParamMap& paramMap) {
if (boost::iequals(pluginName, TLS_PLUGIN_NAME) ||
boost::iequals(pluginName, TLS_JAVA_PLUGIN_NAME)) {
return AuthTls::create(paramMap);
+ } else if (boost::iequals(pluginName, TOKEN_PLUGIN_NAME) ||
+ boost::iequals(pluginName, TOKEN_JAVA_PLUGIN_NAME)) {
+ return AuthToken::create(paramMap);
} else if (boost::iequals(pluginName, ATHENZ_PLUGIN_NAME) ||
boost::iequals(pluginName, ATHENZ_JAVA_PLUGIN_NAME)) {
return AuthAthenz::create(paramMap);
@@ -130,6 +134,9 @@ AuthenticationPtr tryCreateBuiltinAuth(const std::string&
pluginName, ParamMap&
AuthenticationPtr tryCreateBuiltinAuth(const std::string& pluginName, const
std::string& authParamsString) {
if (boost::iequals(pluginName, TLS_PLUGIN_NAME) ||
boost::iequals(pluginName, TLS_JAVA_PLUGIN_NAME)) {
return AuthTls::create(authParamsString);
+ } else if (boost::iequals(pluginName, TOKEN_PLUGIN_NAME) ||
+ boost::iequals(pluginName, TOKEN_JAVA_PLUGIN_NAME)) {
+ return AuthToken::create(authParamsString);
} else if (boost::iequals(pluginName, ATHENZ_PLUGIN_NAME) ||
boost::iequals(pluginName, ATHENZ_JAVA_PLUGIN_NAME)) {
return AuthAthenz::create(authParamsString);
diff --git a/pulsar-client-cpp/lib/auth/AuthToken.cc
b/pulsar-client-cpp/lib/auth/AuthToken.cc
new file mode 100644
index 0000000..bebfe4e
--- /dev/null
+++ b/pulsar-client-cpp/lib/auth/AuthToken.cc
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "AuthToken.h"
+
+#include <boost/algorithm/string/predicate.hpp>
+#include <boost/bind.hpp>
+
+#include <sstream>
+#include <fstream>
+
+namespace pulsar {
+
+// AuthDataToken
+
+AuthDataToken::AuthDataToken(const TokenSupplier &tokenSupplier) {
tokenSupplier_ = tokenSupplier; }
+
+AuthDataToken::~AuthDataToken() {}
+
+bool AuthDataToken::hasDataForHttp() { return true; }
+
+std::string AuthDataToken::getHttpHeaders() { return "Authorization: Bearer "
+ tokenSupplier_(); }
+
+bool AuthDataToken::hasDataFromCommand() { return true; }
+
+std::string AuthDataToken::getCommandData() { return tokenSupplier_(); }
+
+static std::string readDirect(const std::string &token) { return token; }
+
+static std::string readFromFile(const std::string &tokenFilePath) {
+ std::ifstream input(tokenFilePath);
+ std::stringstream buffer;
+ buffer << input.rdbuf();
+ return buffer.str();
+}
+
+static std::string readFromEnv(const std::string &envVarName) {
+ char *value = getenv(envVarName.c_str());
+ if (!value) {
+ throw "Failed to read environment variable " + envVarName;
+ }
+ return std::string(value);
+}
+
+// AuthToken
+
+AuthToken::AuthToken(AuthenticationDataPtr &authDataToken) { authDataToken_ =
authDataToken; }
+
+AuthToken::~AuthToken() {}
+
+AuthenticationPtr AuthToken::create(ParamMap ¶ms) {
+ if (params.find("token") != params.end()) {
+ return create(boost::bind(&readDirect, params["token"]));
+ } else if (params.find("file") != params.end()) {
+ // Read token from a file
+ return create(boost::bind(&readFromFile, params["file"]));
+ } else if (params.find("env") != params.end()) {
+ // Read token from environment variable
+ std::string envVarName = params["env"];
+ return create(boost::bind(&readFromEnv, envVarName));
+ } else {
+ throw "Invalid configuration for token provider";
+ }
+}
+
+AuthenticationPtr AuthToken::create(const std::string &authParamsString) {
+ ParamMap params;
+ if (boost::starts_with(authParamsString, "token:")) {
+ std::string token = authParamsString.substr(strlen("token:"));
+ params["token"] = token;
+ } else if (boost::starts_with(authParamsString, "file:")) {
+ // Read token from a file
+ std::string filePath = authParamsString.substr(strlen("file://"));
+ params["file"] = filePath;
+ } else if (boost::starts_with(authParamsString, "env:")) {
+ std::string envVarName = authParamsString.substr(strlen("env:"));
+ params["env"] = envVarName;
+ } else {
+ std::string token = authParamsString;
+ params["token"] = token;
+ }
+
+ return create(params);
+}
+
+AuthenticationPtr AuthToken::createWithToken(const std::string &token) {
+ return create(boost::bind(&readDirect, token));
+}
+
+AuthenticationPtr AuthToken::create(const TokenSupplier &tokenSupplier) {
+ AuthenticationDataPtr authDataToken = AuthenticationDataPtr(new
AuthDataToken(tokenSupplier));
+ return AuthenticationPtr(new AuthToken(authDataToken));
+}
+
+const std::string AuthToken::getAuthMethodName() const { return "token"; }
+
+Result AuthToken::getAuthData(AuthenticationDataPtr &authDataContent) const {
+ authDataContent = authDataToken_;
+ return ResultOk;
+}
+
+} // namespace pulsar
diff --git a/pulsar-client-cpp/include/pulsar/c/authentication.h
b/pulsar-client-cpp/lib/auth/AuthToken.h
similarity index 54%
copy from pulsar-client-cpp/include/pulsar/c/authentication.h
copy to pulsar-client-cpp/lib/auth/AuthToken.h
index c3a4d32..8473fe3 100644
--- a/pulsar-client-cpp/include/pulsar/c/authentication.h
+++ b/pulsar-client-cpp/lib/auth/AuthToken.h
@@ -19,26 +19,28 @@
#pragma once
-#ifdef __cplusplus
-extern "C" {
-#endif
+#include <pulsar/Authentication.h>
+#include <string>
+#include <boost/function.hpp>
-#pragma GCC visibility push(default)
+namespace pulsar {
-typedef struct _pulsar_authentication pulsar_authentication_t;
+const std::string TOKEN_PLUGIN_NAME = "token";
+const std::string TOKEN_JAVA_PLUGIN_NAME =
"org.apache.pulsar.client.impl.auth.AuthenticationToken";
-pulsar_authentication_t *pulsar_authentication_create(const char
*dynamicLibPath,
- const char
*authParamsString);
+class AuthDataToken : public AuthenticationDataProvider {
+ public:
+ AuthDataToken(const std::string& token);
+ AuthDataToken(const TokenSupplier& tokenSupplier);
+ ~AuthDataToken();
-pulsar_authentication_t *pulsar_authentication_tls_create(const char
*certificatePath,
- const char
*privateKeyPath);
+ bool hasDataForHttp();
+ std::string getHttpHeaders();
+ bool hasDataFromCommand();
+ std::string getCommandData();
-pulsar_authentication_t *pulsar_authentication_athenz_create(const char
*authParamsString);
+ private:
+ TokenSupplier tokenSupplier_;
+};
-void pulsar_authentication_free(pulsar_authentication_t *authentication);
-
-#pragma GCC visibility pop
-
-#ifdef __cplusplus
-}
-#endif
\ No newline at end of file
+} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/c/c_Authentication.cc
b/pulsar-client-cpp/lib/c/c_Authentication.cc
index c03f239..83d344b 100644
--- a/pulsar-client-cpp/lib/c/c_Authentication.cc
+++ b/pulsar-client-cpp/lib/c/c_Authentication.cc
@@ -23,6 +23,8 @@
#include "c_structs.h"
+#include <cstdlib>
+
pulsar_authentication_t *pulsar_authentication_create(const char
*dynamicLibPath,
const char
*authParamsString) {
pulsar_authentication_t *authentication = new pulsar_authentication_t;
@@ -43,4 +45,24 @@ pulsar_authentication_t
*pulsar_authentication_athenz_create(const char *authPar
pulsar_authentication_t *authentication = new pulsar_authentication_t;
authentication->auth = pulsar::AuthAthenz::create(authParamsString);
return authentication;
+}
+
+pulsar_authentication_t *pulsar_authentication_token_create(const char *token)
{
+ pulsar_authentication_t *authentication = new pulsar_authentication_t;
+ authentication->auth = pulsar::AuthToken::createWithToken(token);
+ return authentication;
+}
+
+static std::string tokenSupplierWrapper(token_supplier supplier, void *ctx) {
+ const char *token = supplier(ctx);
+ std::string tokenStr = token;
+ free((void *)token);
+ return tokenStr;
+}
+
+pulsar_authentication_t
*pulsar_authentication_token_create_with_supplier(token_supplier tokenSupplier,
+ void
*ctx) {
+ pulsar_authentication_t *authentication = new pulsar_authentication_t;
+ authentication->auth =
pulsar::AuthToken::create(boost::bind(&tokenSupplierWrapper, tokenSupplier,
ctx));
+ return authentication;
}
\ No newline at end of file
diff --git a/pulsar-client-cpp/pulsar-test-service-start.sh
b/pulsar-client-cpp/pulsar-test-service-start.sh
index 914354a..665758e 100755
--- a/pulsar-client-cpp/pulsar-test-service-start.sh
+++ b/pulsar-client-cpp/pulsar-test-service-start.sh
@@ -44,6 +44,15 @@ mkdir -p $DATA_DIR
mkdir -p $DATA_DIR/certs
cp $SRC_DIR/pulsar-broker/src/test/resources/authentication/tls/*.pem
$DATA_DIR/certs
+# Generate secret key and token
+mkdir -p $DATA_DIR/tokens
+$PULSAR_DIR/bin/pulsar tokens create-secret-key --output
$DATA_DIR/tokens/secret.key
+
+$PULSAR_DIR/bin/pulsar tokens create \
+ --subject token-principal \
+ --secret-key file:///$DATA_DIR/tokens/secret.key \
+ > $DATA_DIR/tokens/token.txt
+
export
PULSAR_STANDALONE_CONF=$SRC_DIR/pulsar-client-cpp/test-conf/standalone-ssl.conf
$PULSAR_DIR/bin/pulsar-daemon start standalone \
--no-functions-worker --no-stream-storage \
@@ -95,4 +104,8 @@ $PULSAR_DIR/bin/pulsar-admin tenants create private -r "" -c
"standalone"
# Create "private/auth" with required authentication
$PULSAR_DIR/bin/pulsar-admin namespaces create private/auth --clusters
standalone
+$PULSAR_DIR/bin/pulsar-admin namespaces grant-permission private/auth \
+ --actions produce,consume \
+ --role "token-principal"
+
echo "-- Ready to start tests"
diff --git a/pulsar-client-cpp/python/pulsar/__init__.py
b/pulsar-client-cpp/python/pulsar/__init__.py
index 54fee68..8004fa6 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -211,6 +211,24 @@ class AuthenticationTLS(Authentication):
_check_type(str, private_key_path, 'private_key_path')
self.auth = _pulsar.AuthenticationTLS(certificate_path,
private_key_path)
+
+class AuthenticationToken(Authentication):
+ """
+ Token based authentication implementation
+ """
+ def __init__(self, token):
+ """
+ Create the token authentication provider instance.
+
+ **Args**
+
+ * `token`: A string containing the token or a functions that provides a
+ string with the token
+ """
+ if not (isinstance(token, str) or callable(token)):
+ raise ValueError("Argument token is expected to be of type 'str'
or a function returning 'str'")
+ self.auth = _pulsar.AuthenticationToken(token)
+
class AuthenticationAthenz(Authentication):
"""
Athenz Authentication implementation
diff --git a/pulsar-client-cpp/python/pulsar_test.py
b/pulsar-client-cpp/python/pulsar_test.py
index e9ecbc1..158c79c 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -24,7 +24,7 @@ import time
import os
from pulsar import Client, MessageId, \
CompressionType, ConsumerType, PartitionsRoutingMode, \
- AuthenticationTLS, Authentication
+ AuthenticationTLS, Authentication, AuthenticationToken
from _pulsar import ProducerConfiguration, ConsumerConfiguration
@@ -793,6 +793,45 @@ class PulsarTest(TestCase):
[topic_non_partitioned])
client.close()
+ def test_token_auth(self):
+ with open('/tmp/pulsar-test-data/tokens/token.txt') as tf:
+ token = tf.read().strip()
+
+ # Use adminUrl to test both HTTP request and binary protocol
+ client = Client(self.adminUrl,
+ authentication=AuthenticationToken(token))
+
+ consumer =
client.subscribe('persistent://private/auth/my-python-topic-token-auth',
+ 'my-sub',
+ consumer_type=ConsumerType.Shared)
+ producer =
client.create_producer('persistent://private/auth/my-python-topic-token-auth')
+ producer.send(b'hello')
+
+ msg = consumer.receive(1000)
+ self.assertTrue(msg)
+ self.assertEqual(msg.data(), b'hello')
+ client.close()
+
+ def test_token_auth_supplier(self):
+ def read_token():
+ with open('/tmp/pulsar-test-data/tokens/token.txt') as tf:
+ return tf.read().strip()
+
+ client = Client(self.serviceUrl,
+ authentication=AuthenticationToken(read_token))
+ consumer =
client.subscribe('persistent://private/auth/my-python-topic-token-auth',
+ 'my-sub',
+ consumer_type=ConsumerType.Shared)
+ producer =
client.create_producer('persistent://private/auth/my-python-topic-token-auth')
+ producer.send(b'hello')
+
+ msg = consumer.receive(1000)
+ self.assertTrue(msg)
+ self.assertEqual(msg.data(), b'hello')
+ client.close()
+
+ #####
+
def _check_value_error(self, fun):
try:
fun()
diff --git a/pulsar-client-cpp/python/src/authentication.cc
b/pulsar-client-cpp/python/src/authentication.cc
index 9cfb2e7..9547c99 100644
--- a/pulsar-client-cpp/python/src/authentication.cc
+++ b/pulsar-client-cpp/python/src/authentication.cc
@@ -32,6 +32,59 @@ struct AuthenticationTlsWrapper : public
AuthenticationWrapper {
}
};
+struct TokenSupplierWrapper {
+ PyObject* _pySupplier;
+
+ TokenSupplierWrapper(py::object pySupplier) :
+ _pySupplier(pySupplier.ptr()) {
+ Py_XINCREF(_pySupplier);
+ }
+
+ TokenSupplierWrapper(const TokenSupplierWrapper& other) {
+ _pySupplier= other._pySupplier;
+ Py_XINCREF(_pySupplier);
+ }
+
+ TokenSupplierWrapper& operator=(const TokenSupplierWrapper& other) {
+ _pySupplier = other._pySupplier;
+ Py_XINCREF(_pySupplier);
+ return *this;
+ }
+
+ virtual ~TokenSupplierWrapper() {
+ Py_XDECREF(_pySupplier);
+ }
+
+ std::string operator()() {
+ PyGILState_STATE state = PyGILState_Ensure();
+
+ std::string token;
+ try {
+ token = py::call<std::string>(_pySupplier);
+ } catch (py::error_already_set e) {
+ PyErr_Print();
+ }
+
+ PyGILState_Release(state);
+ return token;
+ }
+};
+
+
+struct AuthenticationTokenWrapper : public AuthenticationWrapper {
+ AuthenticationTokenWrapper(py::object token) :
+ AuthenticationWrapper() {
+ if (py::extract<std::string>(token).check()) {
+ // It's a string
+ std::string tokenStr = py::extract<std::string>(token);
+ this->auth = AuthToken::createWithToken(tokenStr);
+ } else {
+ // It's a function object
+ this->auth = AuthToken::create(TokenSupplierWrapper(token));
+ }
+ }
+};
+
struct AuthenticationAthenzWrapper : public AuthenticationWrapper {
AuthenticationAthenzWrapper(const std::string& authParamsString) :
AuthenticationWrapper() {
@@ -49,6 +102,10 @@ void export_authentication() {
init<const
std::string&, const std::string&>())
;
+ class_<AuthenticationTokenWrapper, bases<AuthenticationWrapper>
>("AuthenticationToken",
+
init<py::object>())
+ ;
+
class_<AuthenticationAthenzWrapper, bases<AuthenticationWrapper>
>("AuthenticationAthenz",
init<const std::string&>())
;
diff --git a/pulsar-client-cpp/test-conf/standalone-ssl.conf
b/pulsar-client-cpp/test-conf/standalone-ssl.conf
index da9123c..c7e6d3c 100644
--- a/pulsar-client-cpp/test-conf/standalone-ssl.conf
+++ b/pulsar-client-cpp/test-conf/standalone-ssl.conf
@@ -98,11 +98,13 @@ anonymousUserRole=anonymous
authenticationEnabled=true
# Autentication provider name list, which is comma separated list of class
names
-authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderTls
+authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderTls,org.apache.pulsar.broker.authentication.AuthenticationProviderToken
# Enforce authorization
authorizationEnabled=true
+tokenSecretKey=file:///tmp/pulsar-test-data/tokens/secret.key
+
# Role names that are treated as "super-user", meaning they will be able to do
all admin
# operations and publish/consume from all topics
superUserRoles=localhost,superUser
diff --git a/pulsar-client-cpp/tests/AuthTokenTest.cc
b/pulsar-client-cpp/tests/AuthTokenTest.cc
new file mode 100644
index 0000000..a77492c
--- /dev/null
+++ b/pulsar-client-cpp/tests/AuthTokenTest.cc
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/Authentication.h>
+
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+#include <boost/lexical_cast.hpp>
+#include <boost/asio.hpp>
+#include <boost/algorithm/string.hpp>
+#include <boost/thread.hpp>
+#include <lib/LogUtils.h>
+
+#include <string>
+#include <fstream>
+#include <streambuf>
+
+#include "lib/Future.h"
+#include "lib/Utils.h"
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+
+static const std::string serviceUrl = "pulsar://localhost:6650";
+static const std::string serviceUrlHttp = "http://localhost:8080";
+
+static const std::string tokenPath = "/tmp/pulsar-test-data/tokens/token.txt";
+
+static std::string getToken() {
+ std::ifstream file(tokenPath);
+ std::string str((std::istreambuf_iterator<char>(file)),
std::istreambuf_iterator<char>());
+ return str;
+}
+
+TEST(AuthPluginToken, testToken) {
+ ClientConfiguration config = ClientConfiguration();
+ std::string token = getToken();
+ AuthenticationPtr auth = pulsar::AuthToken::createWithToken(token);
+
+ ASSERT_TRUE(auth != NULL);
+ ASSERT_EQ(auth->getAuthMethodName(), "token");
+
+ pulsar::AuthenticationDataPtr data;
+ ASSERT_EQ(auth->getAuthData(data), pulsar::ResultOk);
+ ASSERT_EQ(data->hasDataFromCommand(), true);
+ ASSERT_EQ(data->getCommandData(), token);
+ ASSERT_EQ(data->hasDataForTls(), false);
+ ASSERT_EQ(data->hasDataForHttp(), true);
+ ASSERT_EQ(auth.use_count(), 1);
+
+ config.setAuth(auth);
+ Client client(serviceUrl, config);
+
+ std::string topicName = "persistent://private/auth/test-token";
+ std::string subName = "subscription-name";
+ int numOfMessages = 10;
+
+ Producer producer;
+ Result result = client.createProducer(topicName, producer);
+ ASSERT_EQ(ResultOk, result);
+
+ Consumer consumer;
+ result = client.subscribe(topicName, subName, consumer);
+ ASSERT_EQ(ResultOk, result);
+
+ // handling dangling subscriptions
+ consumer.unsubscribe();
+ client.subscribe(topicName, subName, consumer);
+
+ std::string temp = producer.getTopic();
+ ASSERT_EQ(temp, topicName);
+ temp = consumer.getTopic();
+ ASSERT_EQ(temp, topicName);
+ ASSERT_EQ(consumer.getSubscriptionName(), subName);
+
+ // Send Asynchronously
+ std::string prefix = "test-token-message-";
+ for (int i = 0; i < numOfMessages; i++) {
+ std::string messageContent = prefix +
boost::lexical_cast<std::string>(i);
+ Message msg = MessageBuilder()
+ .setContent(messageContent)
+ .setProperty("msgIndex",
boost::lexical_cast<std::string>(i))
+ .build();
+ producer.sendAsync(msg, NULL);
+ LOG_INFO("sending message " << messageContent);
+ }
+
+ producer.flush();
+
+ Message receivedMsg;
+ for (int i = 0; i < numOfMessages; i++) {
+ Result res = consumer.receive(receivedMsg);
+ ASSERT_EQ(ResultOk, res);
+
+ std::string expectedMessageContent = prefix +
boost::lexical_cast<std::string>(i);
+ LOG_INFO("Received Message with [ content - "
+ << receivedMsg.getDataAsString() << "] [ messageID = " <<
receivedMsg.getMessageId() << "]");
+ ASSERT_EQ(receivedMsg.getProperty("msgIndex"),
boost::lexical_cast<std::string>(i));
+ ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
+ ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
+ }
+}
+
+TEST(AuthPluginToken, testTokenWithHttpUrl) {
+ ClientConfiguration config = ClientConfiguration();
+ std::string token = getToken();
+ config.setAuth(pulsar::AuthToken::createWithToken(token));
+ Client client(serviceUrlHttp, config);
+
+ std::string topicName = "persistent://private/auth/test-token-http";
+ std::string subName = "subscription-name";
+ int numOfMessages = 10;
+
+ Producer producer;
+ Result result = client.createProducer(topicName, producer);
+ ASSERT_EQ(ResultOk, result);
+
+ Consumer consumer;
+ result = client.subscribe(topicName, subName, consumer);
+ ASSERT_EQ(ResultOk, result);
+
+ // handling dangling subscriptions
+ consumer.unsubscribe();
+ client.subscribe(topicName, subName, consumer);
+
+ std::string temp = producer.getTopic();
+ ASSERT_EQ(temp, topicName);
+ temp = consumer.getTopic();
+ ASSERT_EQ(temp, topicName);
+ ASSERT_EQ(consumer.getSubscriptionName(), subName);
+
+ // Send Asynchronously
+ std::string prefix = "test-token-message-";
+ for (int i = 0; i < numOfMessages; i++) {
+ std::string messageContent = prefix +
boost::lexical_cast<std::string>(i);
+ Message msg = MessageBuilder()
+ .setContent(messageContent)
+ .setProperty("msgIndex",
boost::lexical_cast<std::string>(i))
+ .build();
+ producer.sendAsync(msg, NULL);
+ LOG_INFO("sending message " << messageContent);
+ }
+
+ producer.flush();
+
+ Message receivedMsg;
+ for (int i = 0; i < numOfMessages; i++) {
+ Result res = consumer.receive(receivedMsg);
+ ASSERT_EQ(ResultOk, res);
+
+ std::string expectedMessageContent = prefix +
boost::lexical_cast<std::string>(i);
+ LOG_INFO("Received Message with [ content - "
+ << receivedMsg.getDataAsString() << "] [ messageID = " <<
receivedMsg.getMessageId() << "]");
+ ASSERT_EQ(receivedMsg.getProperty("msgIndex"),
boost::lexical_cast<std::string>(i));
+ ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
+ ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
+ }
+}
+
+TEST(AuthPluginToken, testNoAuth) {
+ ClientConfiguration config;
+ Client client(serviceUrl, config);
+
+ std::string topicName = "persistent://private/auth/test-token";
+ std::string subName = "subscription-name";
+
+ Producer producer;
+ Result result = client.createProducer(topicName, producer);
+ ASSERT_EQ(ResultConnectError, result);
+
+ Consumer consumer;
+ result = client.subscribe(topicName, subName, consumer);
+ ASSERT_EQ(ResultConnectError, result);
+}
+
+TEST(AuthPluginToken, testNoAuthWithHttp) {
+ ClientConfiguration config;
+ Client client(serviceUrlHttp, config);
+
+ std::string topicName = "persistent://private/auth/test-token";
+ std::string subName = "subscription-name";
+
+ Producer producer;
+ Result result = client.createProducer(topicName, producer);
+ ASSERT_EQ(ResultConnectError, result);
+
+ Consumer consumer;
+ result = client.subscribe(topicName, subName, consumer);
+ ASSERT_EQ(ResultConnectError, result);
+}
diff --git a/pulsar-client-go/pulsar/c_client.go
b/pulsar-client-go/pulsar/c_client.go
index e513b08..5787260 100644
--- a/pulsar-client-go/pulsar/c_client.go
+++ b/pulsar-client-go/pulsar/c_client.go
@@ -127,6 +127,37 @@ func newAuthenticationTLS(certificatePath string,
privateKeyPath string) Authent
return auth
}
+func newAuthenticationToken(token string) Authentication {
+ cToken := C.CString(token)
+ defer C.free(unsafe.Pointer(cToken))
+
+ auth := &authentication{
+ ptr: C.pulsar_authentication_token_create(cToken),
+ }
+
+ runtime.SetFinalizer(auth, authenticationFinalizer)
+ return auth
+}
+
+//export pulsarClientTokenSupplierProxy
+func pulsarClientTokenSupplierProxy(ctx unsafe.Pointer) *C.char {
+ tokenSupplier := restorePointerNoDelete(ctx).(func() string)
+ token := tokenSupplier()
+ // The C string will be freed from within the C wrapper itself
+ return C.CString(token);
+}
+
+func newAuthenticationTokenSupplier(tokenSupplier func() string)
Authentication {
+ supplierPtr := savePointer(tokenSupplier)
+
+ auth := &authentication{
+ ptr:
C._pulsar_authentication_token_create_with_supplier(supplierPtr),
+ }
+
+ runtime.SetFinalizer(auth, authenticationFinalizer)
+ return auth
+}
+
func newAuthenticationAthenz(authParams string) Authentication {
cAuthParams := C.CString(authParams)
defer C.free(unsafe.Pointer(cAuthParams))
diff --git a/pulsar-client-go/pulsar/c_go_pulsar.h
b/pulsar-client-go/pulsar/c_go_pulsar.h
index cdbebf8..a0ca786 100644
--- a/pulsar-client-go/pulsar/c_go_pulsar.h
+++ b/pulsar-client-go/pulsar/c_go_pulsar.h
@@ -35,6 +35,12 @@ static inline void
_pulsar_client_configuration_set_logger(pulsar_client_configu
pulsar_client_configuration_set_logger(conf, pulsarClientLoggerConstProxy,
ctx);
}
+char* pulsarClientTokenSupplierProxy(void* ctx);
+
+static inline pulsar_authentication_t*
_pulsar_authentication_token_create_with_supplier(void *ctx) {
+ return
pulsar_authentication_token_create_with_supplier(pulsarClientTokenSupplierProxy,
ctx);
+}
+
void pulsarCreateProducerCallbackProxy(pulsar_result result, pulsar_producer_t
*producer, void *ctx);
static inline void _pulsar_client_create_producer_async(pulsar_client_t
*client, const char *topic,
diff --git a/pulsar-client-go/pulsar/client.go
b/pulsar-client-go/pulsar/client.go
index 405c8a3..ed07fbb 100644
--- a/pulsar-client-go/pulsar/client.go
+++ b/pulsar-client-go/pulsar/client.go
@@ -28,6 +28,16 @@ func NewClient(options ClientOptions) (Client, error) {
// Opaque interface that represents the authentication credentials
type Authentication interface {}
+// Create new Authentication provider with specified auth token
+func NewAuthenticationToken(token string) Authentication {
+ return newAuthenticationToken(token)
+}
+
+// Create new Authentication provider with specified auth token supplier
+func NewAuthenticationTokenSupplier(tokenSupplier func() string)
Authentication {
+ return newAuthenticationTokenSupplier(tokenSupplier)
+}
+
// Create new Authentication provider with specified TLS certificate and
private key
func NewAuthenticationTLS(certificatePath string, privateKeyPath string)
Authentication {
return newAuthenticationTLS(certificatePath, privateKeyPath)
diff --git a/pulsar-client-go/pulsar/client_test.go
b/pulsar-client-go/pulsar/client_test.go
index 7c200ac..61e4770 100644
--- a/pulsar-client-go/pulsar/client_test.go
+++ b/pulsar-client-go/pulsar/client_test.go
@@ -20,7 +20,9 @@
package pulsar
import (
+ "context"
"fmt"
+ "io/ioutil"
"testing"
)
@@ -54,3 +56,72 @@ func TestGetTopicPartitions(t *testing.T) {
assertEqual(t, len(partitions), 1)
assertEqual(t, partitions[0], topic)
}
+
+const TestTokenFilePath = "/tmp/pulsar-test-data/certs/token.txt"
+
+func readToken(t *testing.T) string {
+ data, err := ioutil.ReadFile(TestTokenFilePath)
+ assertNil(t, err)
+
+ return string(data)
+}
+
+func TestTokenAuth(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: "pulsar://localhost:6650",
+ Authentication: NewAuthenticationToken(readToken(t)),
+ })
+
+ assertNil(t, err)
+ defer client.Close()
+
+ topic := "persistent://private/auth/TestTokenAuth"
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ })
+
+ assertNil(t, err)
+ defer producer.Close()
+
+ ctx := context.Background()
+
+ for i := 0; i < 10; i++ {
+ if err := producer.Send(ctx, ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ }); err != nil {
+ t.Fatal(err)
+ }
+ }
+}
+
+func TestTokenAuthSupplier(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: "pulsar://localhost:6650",
+ Authentication: NewAuthenticationTokenSupplier(func () string {
+ return readToken(t)
+ }),
+ })
+
+ assertNil(t, err)
+ defer client.Close()
+
+ topic := "persistent://private/auth/TestTokenAuth"
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ })
+
+ assertNil(t, err)
+ defer producer.Close()
+
+ ctx := context.Background()
+
+ for i := 0; i < 10; i++ {
+ if err := producer.Send(ctx, ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ }); err != nil {
+ t.Fatal(err)
+ }
+ }
+}