This is an automated email from the ASF dual-hosted git repository. massakam pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
commit 0dccd84811597258a559f137cf45593f7734f08f Author: Masahiro Sakamoto <[email protected]> AuthorDate: Mon Mar 25 16:21:16 2019 +0900 Support authentication --- binding.gyp | 3 +- index.js => examples/consumer_tls_auth.js | 36 +++++++++--- index.js => examples/producer_tls_auth.js | 38 ++++++++++--- index.js | 6 ++ src/Authentication.cc | 95 +++++++++++++++++++++++++++++++ src/{addon.cc => Authentication.h} | 29 +++++----- index.js => src/AuthenticationAthenz.js | 13 +++-- index.js => src/AuthenticationTls.js | 12 ++-- index.js => src/AuthenticationToken.js | 12 ++-- src/Client.cc | 11 ++++ src/addon.cc | 6 +- 11 files changed, 213 insertions(+), 48 deletions(-) diff --git a/binding.gyp b/binding.gyp index 88a6c70..cfcf8b7 100644 --- a/binding.gyp +++ b/binding.gyp @@ -29,12 +29,13 @@ "sources": [ "src/addon.cc", "src/Message.cc", + "src/MessageId.cc", + "src/Authentication.cc", "src/Client.cc", "src/Producer.cc", "src/ProducerConfig.cc", "src/Consumer.cc", "src/ConsumerConfig.cc", - "src/MessageId.cc", ], "libraries": ["-lpulsar"], } diff --git a/index.js b/examples/consumer_tls_auth.js similarity index 51% copy from index.js copy to examples/consumer_tls_auth.js index 03f3a08..e71b251 100644 --- a/index.js +++ b/examples/consumer_tls_auth.js @@ -17,12 +17,34 @@ * under the License. */ -const PulsarBinding = require('bindings')('Pulsar'); +const Pulsar = require('../index.js'); -const Pulsar = { - Client: PulsarBinding.Client, - Message: PulsarBinding.Message, - MessageId: PulsarBinding.MessageId, -}; +(async () => { + const auth = new Pulsar.AuthenticationTls({ + certificatePath: '/path/to/client.crt', + privateKeyPath: '/path/to/client.key', + }); -module.exports = Pulsar; + // Create a client + const client = new Pulsar.Client({ + serviceUrl: 'pulsar+ssl://localhost:6651', + authentication: auth, + tlsTrustCertsFilePath: '/path/to/server.crt', + }); + + // Create a consumer + const consumer = await client.subscribe({ + topic: 'persistent://public/default/my-topic', + subscription: 'sub1', + }); + + // Receive messages + for (let i = 0; i < 10; i += 1) { + const msg = await consumer.receive(); + console.log(msg.getData().toString()); + consumer.acknowledge(msg); + } + + await consumer.close(); + await client.close(); +})(); diff --git a/index.js b/examples/producer_tls_auth.js similarity index 50% copy from index.js copy to examples/producer_tls_auth.js index 03f3a08..df4e33e 100644 --- a/index.js +++ b/examples/producer_tls_auth.js @@ -17,12 +17,36 @@ * under the License. */ -const PulsarBinding = require('bindings')('Pulsar'); +const Pulsar = require('../index.js'); -const Pulsar = { - Client: PulsarBinding.Client, - Message: PulsarBinding.Message, - MessageId: PulsarBinding.MessageId, -}; +(async () => { + const auth = new Pulsar.AuthenticationTls({ + certificatePath: '/path/to/client.crt', + privateKeyPath: '/path/to/client.key', + }); -module.exports = Pulsar; + // Create a client + const client = new Pulsar.Client({ + serviceUrl: 'pulsar+ssl://localhost:6651', + authentication: auth, + tlsTrustCertsFilePath: '/path/to/server.crt', + }); + + // Create a producer + const producer = await client.createProducer({ + topic: 'persistent://public/default/my-topic', + }); + + // Send messages + for (let i = 0; i < 10; i += 1) { + const msg = `my-message-${i}`; + producer.send({ + data: Buffer.from(msg), + }); + console.log(`Sent message: ${msg}`); + } + await producer.flush(); + + await producer.close(); + await client.close(); +})(); diff --git a/index.js b/index.js index 03f3a08..97c83c6 100644 --- a/index.js +++ b/index.js @@ -18,11 +18,17 @@ */ const PulsarBinding = require('bindings')('Pulsar'); +const AuthenticationTls = require('./src/AuthenticationTls.js'); +const AuthenticationAthenz = require('./src/AuthenticationAthenz.js'); +const AuthenticationToken = require('./src/AuthenticationToken.js'); const Pulsar = { Client: PulsarBinding.Client, Message: PulsarBinding.Message, MessageId: PulsarBinding.MessageId, + AuthenticationTls, + AuthenticationAthenz, + AuthenticationToken, }; module.exports = Pulsar; diff --git a/src/Authentication.cc b/src/Authentication.cc new file mode 100644 index 0000000..226fd0a --- /dev/null +++ b/src/Authentication.cc @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "Authentication.h" + +static const std::string PARAM_TLS_CERT = "certificatePath"; +static const std::string PARAM_TLS_KEY = "privateKeyPath"; +static const std::string PARAM_TOKEN = "token"; + +Napi::FunctionReference Authentication::constructor; + +Napi::Object Authentication::Init(Napi::Env env, Napi::Object exports) { + Napi::HandleScope scope(env); + + Napi::Function func = DefineClass(env, "Authentication", {}); + + constructor = Napi::Persistent(func); + constructor.SuppressDestruct(); + + exports.Set("Authentication", func); + return exports; +} + +Authentication::Authentication(const Napi::CallbackInfo &info) + : Napi::ObjectWrap<Authentication>(info), cAuthentication(nullptr) { + Napi::Env env = info.Env(); + Napi::HandleScope scope(env); + + if (info.Length() < 1 || !info[0].IsString() || info[0].ToString().Utf8Value().empty()) { + Napi::Error::New(env, "Authentication method is not specified").ThrowAsJavaScriptException(); + return; + } + + std::string authMethod = info[0].ToString().Utf8Value(); + + if (authMethod == "tls" || authMethod == "token") { + if (info.Length() < 2 || !info[1].IsObject()) { + Napi::Error::New(env, "Authentication parameter must be a object").ThrowAsJavaScriptException(); + return; + } + + Napi::Object obj = info[1].ToObject(); + + if (authMethod == "tls") { + if (!obj.Has(PARAM_TLS_CERT) || !obj.Get(PARAM_TLS_CERT).IsString() || !obj.Has(PARAM_TLS_KEY) || + !obj.Get(PARAM_TLS_KEY).IsString()) { + Napi::Error::New(env, "Missing required parameter").ThrowAsJavaScriptException(); + return; + } + this->cAuthentication = + pulsar_authentication_tls_create(obj.Get(PARAM_TLS_CERT).ToString().Utf8Value().c_str(), + obj.Get(PARAM_TLS_KEY).ToString().Utf8Value().c_str()); + } else if (authMethod == "token") { + if (!obj.Has(PARAM_TOKEN) || !obj.Get(PARAM_TOKEN).IsString()) { + Napi::Error::New(env, "Missing required parameter").ThrowAsJavaScriptException(); + return; + } + this->cAuthentication = + pulsar_authentication_token_create(obj.Get(PARAM_TOKEN).ToString().Utf8Value().c_str()); + } + } else if (authMethod == "athenz") { + if (info.Length() < 2 || !info[1].IsString()) { + Napi::Error::New(env, "Authentication parameter must be a JSON string").ThrowAsJavaScriptException(); + return; + } + this->cAuthentication = pulsar_authentication_athenz_create(info[1].ToString().Utf8Value().c_str()); + } else { + Napi::Error::New(env, "Unsupported authentication method").ThrowAsJavaScriptException(); + return; + } +} + +Authentication::~Authentication() { + if (this->cAuthentication != nullptr) { + pulsar_authentication_free(this->cAuthentication); + } +} + +pulsar_authentication_t *Authentication::GetCAuthentication() { return this->cAuthentication; } diff --git a/src/addon.cc b/src/Authentication.h similarity index 65% copy from src/addon.cc copy to src/Authentication.h index 9e75d3f..3666bd8 100644 --- a/src/addon.cc +++ b/src/Authentication.h @@ -17,19 +17,22 @@ * under the License. */ -#include "Client.h" -#include "Consumer.h" -#include "Message.h" -#include "MessageId.h" -#include "Producer.h" +#ifndef AUTH_H +#define AUTH_H + #include <napi.h> +#include <pulsar/c/authentication.h> + +class Authentication : public Napi::ObjectWrap<Authentication> { + public: + static Napi::Object Init(Napi::Env env, Napi::Object exports); + Authentication(const Napi::CallbackInfo &info); + ~Authentication(); + pulsar_authentication_t *GetCAuthentication(); -Napi::Object InitAll(Napi::Env env, Napi::Object exports) { - Message::Init(env, exports); - MessageId::Init(env, exports); - Producer::Init(env, exports); - Consumer::Init(env, exports); - return Client::Init(env, exports); -} + private: + static Napi::FunctionReference constructor; + pulsar_authentication_t *cAuthentication; +}; -NODE_API_MODULE(NODE_GYP_MODULE_NAME, InitAll) +#endif diff --git a/index.js b/src/AuthenticationAthenz.js similarity index 77% copy from index.js copy to src/AuthenticationAthenz.js index 03f3a08..edbce3b 100644 --- a/index.js +++ b/src/AuthenticationAthenz.js @@ -19,10 +19,11 @@ const PulsarBinding = require('bindings')('Pulsar'); -const Pulsar = { - Client: PulsarBinding.Client, - Message: PulsarBinding.Message, - MessageId: PulsarBinding.MessageId, -}; +class AuthenticationAthenz { + constructor(params) { + const paramsStr = (typeof params === 'object') ? JSON.stringify(params) : params; + this.binding = new PulsarBinding.Authentication('athenz', paramsStr); + } +} -module.exports = Pulsar; +module.exports = AuthenticationAthenz; diff --git a/index.js b/src/AuthenticationTls.js similarity index 84% copy from index.js copy to src/AuthenticationTls.js index 03f3a08..f00b579 100644 --- a/index.js +++ b/src/AuthenticationTls.js @@ -19,10 +19,10 @@ const PulsarBinding = require('bindings')('Pulsar'); -const Pulsar = { - Client: PulsarBinding.Client, - Message: PulsarBinding.Message, - MessageId: PulsarBinding.MessageId, -}; +class AuthenticationTls { + constructor(params) { + this.binding = new PulsarBinding.Authentication('tls', params); + } +} -module.exports = Pulsar; +module.exports = AuthenticationTls; diff --git a/index.js b/src/AuthenticationToken.js similarity index 83% copy from index.js copy to src/AuthenticationToken.js index 03f3a08..e40c892 100644 --- a/index.js +++ b/src/AuthenticationToken.js @@ -19,10 +19,10 @@ const PulsarBinding = require('bindings')('Pulsar'); -const Pulsar = { - Client: PulsarBinding.Client, - Message: PulsarBinding.Message, - MessageId: PulsarBinding.MessageId, -}; +class AuthenticationToken { + constructor(params) { + this.binding = new PulsarBinding.Authentication('token', params); + } +} -module.exports = Pulsar; +module.exports = AuthenticationToken; diff --git a/src/Client.cc b/src/Client.cc index 16fb13d..123fc0f 100644 --- a/src/Client.cc +++ b/src/Client.cc @@ -20,11 +20,14 @@ #include "Client.h" #include "Consumer.h" #include "Producer.h" +#include "Authentication.h" #include <pulsar/c/client.h> #include <pulsar/c/client_configuration.h> #include <pulsar/c/result.h> static const std::string CFG_SERVICE_URL = "serviceUrl"; +static const std::string CFG_AUTH = "authentication"; +static const std::string CFG_AUTH_PROP = "binding"; static const std::string CFG_OP_TIMEOUT = "operationTimeoutSeconds"; static const std::string CFG_IO_THREADS = "ioThreads"; static const std::string CFG_LISTENER_THREADS = "messageListenerThreads"; @@ -68,6 +71,14 @@ Client::Client(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Client>(info) pulsar_client_configuration_t *cClientConfig = pulsar_client_configuration_create(); + if (clientConfig.Has(CFG_AUTH) && clientConfig.Get(CFG_AUTH).IsObject()) { + Napi::Object obj = clientConfig.Get(CFG_AUTH).ToObject(); + if (obj.Has(CFG_AUTH_PROP) && obj.Get(CFG_AUTH_PROP).IsObject()) { + Authentication *auth = Authentication::Unwrap(obj.Get(CFG_AUTH_PROP).ToObject()); + pulsar_client_configuration_set_auth(cClientConfig, auth->GetCAuthentication()); + } + } + if (clientConfig.Has(CFG_OP_TIMEOUT) && clientConfig.Get(CFG_OP_TIMEOUT).IsNumber()) { int32_t operationTimeoutSeconds = clientConfig.Get(CFG_OP_TIMEOUT).ToNumber().Int32Value(); if (operationTimeoutSeconds > 0) { diff --git a/src/addon.cc b/src/addon.cc index 9e75d3f..050ae12 100644 --- a/src/addon.cc +++ b/src/addon.cc @@ -17,16 +17,18 @@ * under the License. */ -#include "Client.h" -#include "Consumer.h" #include "Message.h" #include "MessageId.h" +#include "Authentication.h" #include "Producer.h" +#include "Consumer.h" +#include "Client.h" #include <napi.h> Napi::Object InitAll(Napi::Env env, Napi::Object exports) { Message::Init(env, exports); MessageId::Init(env, exports); + Authentication::Init(env, exports); Producer::Init(env, exports); Consumer::Init(env, exports); return Client::Init(env, exports);
