This is an automated email from the ASF dual-hosted git repository. massakam pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
commit 618729cc5a3ec00b7d2cb5c6a8e12dd142be203a Author: yfuruta <[email protected]> AuthorDate: Tue Jun 11 13:47:12 2019 +0900 impl reader --- binding.gyp | 2 + src/Client.h => examples/reader.js | 38 ++++---- src/Client.cc | 14 ++- src/Client.h | 1 + src/Reader.cc | 185 +++++++++++++++++++++++++++++++++++++ src/{Client.h => Reader.h} | 22 +++-- src/ReaderConfig.cc | 70 ++++++++++++++ src/{Client.h => ReaderConfig.h} | 27 +++--- src/addon.cc | 2 + tests/end_to_end.test.js | 61 +++++++++++- tests/reader.test.js | 72 +++++++++++++++ 11 files changed, 445 insertions(+), 49 deletions(-) diff --git a/binding.gyp b/binding.gyp index cfcf8b7..4b7b2f4 100644 --- a/binding.gyp +++ b/binding.gyp @@ -36,6 +36,8 @@ "src/ProducerConfig.cc", "src/Consumer.cc", "src/ConsumerConfig.cc", + "src/Reader.cc", + "src/ReaderConfig.cc", ], "libraries": ["-lpulsar"], } diff --git a/src/Client.h b/examples/reader.js similarity index 59% copy from src/Client.h copy to examples/reader.js index 63c66f2..5665a35 100644 --- a/src/Client.h +++ b/examples/reader.js @@ -17,25 +17,27 @@ * under the License. 
*/ -#ifndef CLIENT_H -#define CLIENT_H +const Pulsar = require('../index.js'); -#include <napi.h> -#include <pulsar/c/client.h> +(async () => { + // Create a client + const client = new Pulsar.Client({ + serviceUrl: 'pulsar://localhost:6650', + operationTimeoutSeconds: 30, + }); -class Client : public Napi::ObjectWrap<Client> { - public: - static Napi::Object Init(Napi::Env env, Napi::Object exports); - Client(const Napi::CallbackInfo &info); - ~Client(); + // Create a reader + const reader = await client.createReader({ + topic: 'persistent://public/default/my-topic', + startMessageId: Pulsar.MessageId.earliest(), + }); - private: - static Napi::FunctionReference constructor; - pulsar_client_t *cClient; + // read messages + for (let i = 0; i < 10; i += 1) { + const msg = await reader.readNext(); + console.log(msg.getData().toString()); + } - Napi::Value CreateProducer(const Napi::CallbackInfo &info); - Napi::Value Subscribe(const Napi::CallbackInfo &info); - Napi::Value Close(const Napi::CallbackInfo &info); -}; - -#endif + await reader.close(); + await client.close(); +})(); diff --git a/src/Client.cc b/src/Client.cc index 123fc0f..a0c5bad 100644 --- a/src/Client.cc +++ b/src/Client.cc @@ -20,6 +20,7 @@ #include "Client.h" #include "Consumer.h" #include "Producer.h" +#include "Reader.h" #include "Authentication.h" #include <pulsar/c/client.h> #include <pulsar/c/client_configuration.h> @@ -43,10 +44,11 @@ Napi::FunctionReference Client::constructor; Napi::Object Client::Init(Napi::Env env, Napi::Object exports) { Napi::HandleScope scope(env); - Napi::Function func = - DefineClass(env, "Client", - {InstanceMethod("createProducer", &Client::CreateProducer), - InstanceMethod("subscribe", &Client::Subscribe), InstanceMethod("close", &Client::Close)}); + Napi::Function func = DefineClass( + env, "Client", + {InstanceMethod("createProducer", &Client::CreateProducer), + InstanceMethod("subscribe", &Client::Subscribe), InstanceMethod("createReader", &Client::CreateReader), 
+ InstanceMethod("close", &Client::Close)}); constructor = Napi::Persistent(func); constructor.SuppressDestruct(); @@ -151,6 +153,10 @@ Napi::Value Client::Subscribe(const Napi::CallbackInfo &info) { return Consumer::NewInstance(info, this->cClient); } +Napi::Value Client::CreateReader(const Napi::CallbackInfo &info) { + return Reader::NewInstance(info, this->cClient); +} + class ClientCloseWorker : public Napi::AsyncWorker { public: ClientCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_client_t *cClient) diff --git a/src/Client.h b/src/Client.h index 63c66f2..0def389 100644 --- a/src/Client.h +++ b/src/Client.h @@ -35,6 +35,7 @@ class Client : public Napi::ObjectWrap<Client> { Napi::Value CreateProducer(const Napi::CallbackInfo &info); Napi::Value Subscribe(const Napi::CallbackInfo &info); + Napi::Value CreateReader(const Napi::CallbackInfo &info); Napi::Value Close(const Napi::CallbackInfo &info); }; diff --git a/src/Reader.cc b/src/Reader.cc new file mode 100644 index 0000000..8c2b859 --- /dev/null +++ b/src/Reader.cc @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+#include "Message.h"
+#include "Reader.h"
+#include "ReaderConfig.h"
+#include <pulsar/c/result.h>
+#include <pulsar/c/reader.h>
+
+Napi::FunctionReference Reader::constructor;
+
+void Reader::Init(Napi::Env env, Napi::Object exports) {
+  Napi::HandleScope scope(env);
+
+  Napi::Function func = DefineClass(env, "Reader",
+                                    {
+                                        InstanceMethod("readNext", &Reader::ReadNext),
+                                        InstanceMethod("hasNext", &Reader::HasNext),
+                                        InstanceMethod("close", &Reader::Close),
+                                    });
+
+  constructor = Napi::Persistent(func);
+  constructor.SuppressDestruct();
+}
+
+void Reader::SetCReader(pulsar_reader_t *cReader) { this->cReader = cReader; }
+
+Reader::Reader(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Reader>(info), cReader(nullptr) {}
+// Async worker: Execute() creates the native reader, OnOK() resolves the promise with a wrapped Reader.
+class ReaderNewInstanceWorker : public Napi::AsyncWorker {
+ public:
+  ReaderNewInstanceWorker(const Napi::Promise::Deferred &deferred, pulsar_client_t *cClient,
+                          ReaderConfig *readerConfig)
+      : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
+        deferred(deferred),
+        cClient(cClient),
+        readerConfig(readerConfig) {}
+  ~ReaderNewInstanceWorker() { delete this->readerConfig; }
+  void Execute() {
+    const std::string &topic = this->readerConfig->GetTopic();
+    if (topic.empty()) {
+      SetError(std::string("Topic is required and must be specified as a string when creating reader"));
+      return;
+    }
+    if (this->readerConfig->GetCStartMessageId() == nullptr) {
+      SetError(std::string(
+          "StartMessageId is required and must be specified as a MessageId object when creating reader"));
+      return;
+    }
+
+    pulsar_result result =
+        pulsar_client_create_reader(this->cClient, topic.c_str(), this->readerConfig->GetCStartMessageId(),
+                                    this->readerConfig->GetCReaderConfig(), &(this->cReader));
+    // readerConfig is released by the destructor so the early returns above no longer leak it.
+    if (result != pulsar_result_Ok) {
+      SetError(std::string("Failed to create reader: ") + pulsar_result_str(result));
+      return;
+    }
+  }
+  void OnOK() {
+    Napi::Object obj = Reader::constructor.New({});
+    Reader *reader = Reader::Unwrap(obj);
+    reader->SetCReader(this->cReader);
+    this->deferred.Resolve(obj);
+  }
+  void OnError(const Napi::Error &e) { this->deferred.Reject(Napi::Error::New(Env(), e.Message()).Value()); }
+
+ private:
+  Napi::Promise::Deferred deferred;
+  pulsar_client_t *cClient;
+  ReaderConfig *readerConfig;
+  pulsar_reader_t *cReader;
+};
+
+Napi::Value Reader::NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient) {
+  Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
+  Napi::Object config = info[0].As<Napi::Object>();
+  ReaderConfig *readerConfig = new ReaderConfig(config);
+  ReaderNewInstanceWorker *wk = new ReaderNewInstanceWorker(deferred, cClient, readerConfig);
+  wk->Queue();
+  return deferred.Promise();
+}
+// Async worker: reads one message, using the timed variant only when a positive timeout was given.
+class ReaderReadNextWorker : public Napi::AsyncWorker {
+ public:
+  ReaderReadNextWorker(const Napi::Promise::Deferred &deferred, pulsar_reader_t *cReader,
+                       int64_t timeout = -1)
+      : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
+        deferred(deferred),
+        cReader(cReader),
+        timeout(timeout) {}
+  ~ReaderReadNextWorker() {}
+  void Execute() {
+    pulsar_result result;
+    if (timeout > 0) {
+      result = pulsar_reader_read_next_with_timeout(this->cReader, &(this->cMessage), timeout);
+    } else {
+      result = pulsar_reader_read_next(this->cReader, &(this->cMessage));
+    }
+    if (result != pulsar_result_Ok) {
+      SetError(std::string("Failed to receive message: ") + pulsar_result_str(result));
+    }
+  }
+  void OnOK() {
+    Napi::Object obj = Message::NewInstance({}, this->cMessage);
+    this->deferred.Resolve(obj);
+  }
+  void OnError(const Napi::Error &e) { this->deferred.Reject(Napi::Error::New(Env(), e.Message()).Value()); }
+
+ private:
+  Napi::Promise::Deferred deferred;
+  pulsar_reader_t *cReader;
+  pulsar_message_t *cMessage;
+  int64_t timeout;
+};
+
+Napi::Value Reader::ReadNext(const Napi::CallbackInfo &info) {
+  Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
+  if (info[0].IsUndefined()) {
+    ReaderReadNextWorker *wk = new ReaderReadNextWorker(deferred, this->cReader);
+    wk->Queue();
+  } else {
+    Napi::Number timeout = info[0].As<Napi::Object>().ToNumber();
+    ReaderReadNextWorker *wk = new ReaderReadNextWorker(deferred, this->cReader, timeout.Int64Value());
+    wk->Queue();
+  }
+  return deferred.Promise();
+}
+
+Napi::Value Reader::HasNext(const Napi::CallbackInfo &info) {
+  int value = 0;
+  pulsar_result result = pulsar_reader_has_message_available(this->cReader, &value);
+  if (result != pulsar_result_Ok || value != 1) {
+    return Napi::Boolean::New(info.Env(), false);
+  } else {
+    return Napi::Boolean::New(info.Env(), true);
+  }
+}
+// Async worker: closes the native reader and resolves the promise with null.
+class ReaderCloseWorker : public Napi::AsyncWorker {
+ public:
+  ReaderCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_reader_t *cReader)
+      : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})),
+        deferred(deferred),
+        cReader(cReader) {}
+  ~ReaderCloseWorker() {}
+  void Execute() {
+    pulsar_result result = pulsar_reader_close(this->cReader);
+    if (result != pulsar_result_Ok) SetError(pulsar_result_str(result));
+  }
+  void OnOK() { this->deferred.Resolve(Env().Null()); }
+  void OnError(const Napi::Error &e) {
+    this->deferred.Reject(
+        Napi::Error::New(Env(), std::string("Failed to close reader: ") + e.Message()).Value());
+  }
+
+ private:
+  Napi::Promise::Deferred deferred;
+  pulsar_reader_t *cReader;
+};
+
+Napi::Value Reader::Close(const Napi::CallbackInfo &info) {
+  Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
+  ReaderCloseWorker *wk = new ReaderCloseWorker(deferred, this->cReader);
+  wk->Queue();
+  return deferred.Promise();
+}
+// cReader stays null until SetCReader() runs, so only free a reader that was actually attached.
+Reader::~Reader() { if (this->cReader != nullptr) pulsar_reader_free(this->cReader); }
diff --git a/src/Client.h b/src/Reader.h
similarity index 67%
copy from src/Client.h
copy to src/Reader.h
index 63c66f2..787b732 100644
--- a/src/Client.h
+++ b/src/Reader.h
@@ -17,24 +17,26 @@ * under the License. */ -#ifndef CLIENT_H -#define CLIENT_H +#ifndef READER_H +#define READER_H #include <napi.h> #include <pulsar/c/client.h> -class Client : public Napi::ObjectWrap<Client> { +class Reader : public Napi::ObjectWrap<Reader> { public: - static Napi::Object Init(Napi::Env env, Napi::Object exports); - Client(const Napi::CallbackInfo &info); - ~Client(); + static void Init(Napi::Env env, Napi::Object exports); + static Napi::Value NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient); + static Napi::FunctionReference constructor; + Reader(const Napi::CallbackInfo &info); + ~Reader(); + void SetCReader(pulsar_reader_t *cReader); private: - static Napi::FunctionReference constructor; - pulsar_client_t *cClient; + pulsar_reader_t *cReader; - Napi::Value CreateProducer(const Napi::CallbackInfo &info); - Napi::Value Subscribe(const Napi::CallbackInfo &info); + Napi::Value ReadNext(const Napi::CallbackInfo &info); + Napi::Value HasNext(const Napi::CallbackInfo &info); Napi::Value Close(const Napi::CallbackInfo &info); }; diff --git a/src/ReaderConfig.cc b/src/ReaderConfig.cc new file mode 100644 index 0000000..7e15807 --- /dev/null +++ b/src/ReaderConfig.cc @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "ReaderConfig.h"
+#include "MessageId.h"
+#include <map>
+
+static const std::string CFG_TOPIC = "topic";
+static const std::string CFG_START_MESSAGE_ID = "startMessageId";
+static const std::string CFG_RECV_QUEUE = "receiverQueueSize";
+static const std::string CFG_READER_NAME = "readerName";
+static const std::string CFG_SUBSCRIPTION_ROLE_PREFIX = "subscriptionRolePrefix";
+// Builds the native reader configuration from the JS options object; wrongly typed options are skipped.
+ReaderConfig::ReaderConfig(const Napi::Object &readerConfig) : topic(""), cStartMessageId(NULL) {
+  this->cReaderConfig = pulsar_reader_configuration_create();
+  // topic and startMessageId are kept on this object (not on cReaderConfig) and read back via getters.
+  if (readerConfig.Has(CFG_TOPIC) && readerConfig.Get(CFG_TOPIC).IsString()) {
+    this->topic = readerConfig.Get(CFG_TOPIC).ToString().Utf8Value();
+  }
+  if (readerConfig.Has(CFG_START_MESSAGE_ID) && readerConfig.Get(CFG_START_MESSAGE_ID).IsObject()) {
+    Napi::Object objMessageId = readerConfig.Get(CFG_START_MESSAGE_ID).ToObject();
+    MessageId *msgId = MessageId::Unwrap(objMessageId);
+    this->cStartMessageId = msgId->GetCMessageId();
+  }
+
+  if (readerConfig.Has(CFG_RECV_QUEUE) && readerConfig.Get(CFG_RECV_QUEUE).IsNumber()) {
+    int32_t receiverQueueSize = readerConfig.Get(CFG_RECV_QUEUE).ToNumber().Int32Value();
+    if (receiverQueueSize >= 0) {
+      pulsar_reader_configuration_set_receiver_queue_size(this->cReaderConfig, receiverQueueSize);
+    }
+  }
+
+  if (readerConfig.Has(CFG_READER_NAME) && readerConfig.Get(CFG_READER_NAME).IsString()) {
+    std::string readerName = readerConfig.Get(CFG_READER_NAME).ToString().Utf8Value();
+    if (!readerName.empty())
+      pulsar_reader_configuration_set_reader_name(this->cReaderConfig, readerName.c_str());
+  }
+  // The role prefix has its own setter; routing it through set_reader_name would clobber readerName.
+  if (readerConfig.Has(CFG_SUBSCRIPTION_ROLE_PREFIX) &&
+      readerConfig.Get(CFG_SUBSCRIPTION_ROLE_PREFIX).IsString()) {
+    std::string rolePrefix = readerConfig.Get(CFG_SUBSCRIPTION_ROLE_PREFIX).ToString().Utf8Value();
+    if (!rolePrefix.empty())
+      pulsar_reader_configuration_set_subscription_role_prefix(this->cReaderConfig,
+                                                               rolePrefix.c_str());
+  }
+}
+
+ReaderConfig::~ReaderConfig() { pulsar_reader_configuration_free(this->cReaderConfig); }
+
+pulsar_reader_configuration_t *ReaderConfig::GetCReaderConfig() { return this->cReaderConfig; }
+
+std::string ReaderConfig::GetTopic() { return this->topic; }
+
+pulsar_message_id_t *ReaderConfig::GetCStartMessageId() { return this->cStartMessageId; }
diff --git a/src/Client.h b/src/ReaderConfig.h
similarity index 64%
copy from src/Client.h
copy to src/ReaderConfig.h
index 63c66f2..1983459 100644
--- a/src/Client.h
+++ b/src/ReaderConfig.h
@@ -17,25 +17,26 @@
  * under the License.
  */
 
-#ifndef CLIENT_H
-#define CLIENT_H
+#ifndef READER_CONFIG_H
+#define READER_CONFIG_H
 
 #include <napi.h>
-#include <pulsar/c/client.h>
+#include <pulsar/c/reader.h>
+#include <pulsar/c/reader_configuration.h>
+#include <pulsar/c/message_id.h>
 
-class Client : public Napi::ObjectWrap<Client> {
+class ReaderConfig {
  public:
-  static Napi::Object Init(Napi::Env env, Napi::Object exports);
-  Client(const Napi::CallbackInfo &info);
-  ~Client();
+  ReaderConfig(const Napi::Object &readerConfig);
+  ~ReaderConfig();
+  pulsar_reader_configuration_t *GetCReaderConfig();
+  pulsar_message_id_t *GetCStartMessageId();
+  std::string GetTopic();
 
  private:
-  static Napi::FunctionReference constructor;
-  pulsar_client_t *cClient;
-
-  Napi::Value CreateProducer(const Napi::CallbackInfo &info);
-  Napi::Value Subscribe(const Napi::CallbackInfo &info);
-  Napi::Value Close(const Napi::CallbackInfo &info);
+  pulsar_reader_configuration_t *cReaderConfig;
+  pulsar_message_id_t *cStartMessageId;
+  std::string topic;
 };
 
 #endif
diff --git a/src/addon.cc b/src/addon.cc
index 050ae12..fa26ae0 100644
--- a/src/addon.cc
+++ b/src/addon.cc
@@ -23,6 +23,7 @@
 #include "Producer.h"
 #include "Consumer.h"
 #include "Client.h"
+#include "Reader.h"
 #include <napi.h>
 
 Napi::Object InitAll(Napi::Env env, Napi::Object exports) {
@@ -31,6
+32,7 @@ Napi::Object InitAll(Napi::Env env, Napi::Object exports) { Authentication::Init(env, exports); Producer::Init(env, exports); Consumer::Init(env, exports); + Reader::Init(env, exports); return Client::Init(env, exports); } diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js index ae24d86..9f15cb7 100644 --- a/tests/end_to_end.test.js +++ b/tests/end_to_end.test.js @@ -23,11 +23,12 @@ const Pulsar = require('../index.js'); (() => { describe('End To End', () => { - const client = new Pulsar.Client({ - serviceUrl: 'pulsar://localhost:6650', - operationTimeoutSeconds: 30, - }); test('Produce/Consume', async () => { + const client = new Pulsar.Client({ + serviceUrl: 'pulsar://localhost:6650', + operationTimeoutSeconds: 30, + }); + const producer = await client.createProducer({ topic: 'persistent://public/default/test-end-to-end', sendTimeoutMs: 30000, @@ -63,9 +64,15 @@ const Pulsar = require('../index.js'); await producer.close(); await consumer.close(); + await client.close(); }); test('acknowledgeCumulative', async () => { + const client = new Pulsar.Client({ + serviceUrl: 'pulsar://localhost:6650', + operationTimeoutSeconds: 30, + }); + const producer = await client.createProducer({ topic: 'persistent://public/default/acknowledgeCumulative', sendTimeoutMs: 30000, @@ -103,5 +110,51 @@ const Pulsar = require('../index.js'); await consumer.close(); await client.close(); }); + + test('Produce/Read', async () => { + const client = new Pulsar.Client({ + serviceUrl: 'pulsar://localhost:6650', + operationTimeoutSeconds: 30, + }); + expect(client).not.toBeNull(); + + const producer = await client.createProducer({ + topic: 'persistent://public/default/test-end-to-end', + sendTimeoutMs: 30000, + batchingEnabled: true, + }); + expect(producer).not.toBeNull(); + + const reader = await client.createReader({ + topic: 'persistent://public/default/test-end-to-end', + startMessageId: Pulsar.MessageId.latest(), + }); + expect(reader).not.toBeNull(); + + const 
messages = []; + for (let i = 0; i < 10; i += 1) { + const msg = `my-message-${i}`; + producer.send({ + data: Buffer.from(msg), + }); + messages.push(msg); + } + await producer.flush(); + + expect(reader.hasNext()).toBe(true); + + const results = []; + for (let i = 0; i < 10; i += 1) { + const msg = await reader.readNext(); + results.push(msg.getData().toString()); + } + expect(lodash.difference(messages, results)).toEqual([]); + + expect(reader.hasNext()).toBe(false); + + await producer.close(); + await reader.close(); + await client.close(); + }); }); })(); diff --git a/tests/reader.test.js b/tests/reader.test.js new file mode 100644 index 0000000..9c87212 --- /dev/null +++ b/tests/reader.test.js @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+const Pulsar = require('../index.js');
+
+(() => {
+  describe('Reader', () => {
+    describe('Create', () => {
+      test('No Topic', async () => {
+        const client = new Pulsar.Client({
+          serviceUrl: 'pulsar://localhost:6650',
+          operationTimeoutSeconds: 30,
+        });
+        await expect(client.createReader({
+          startMessageId: Pulsar.MessageId.earliest(),
+        })).rejects.toThrow('Topic is required and must be specified as a string when creating reader');
+        await client.close();
+      });
+
+      test('Not Topic as String', async () => {
+        const client = new Pulsar.Client({
+          serviceUrl: 'pulsar://localhost:6650',
+          operationTimeoutSeconds: 30,
+        });
+        await expect(client.createReader({
+          topic: 0, // wrong type: expected to be rejected with the same message as a missing topic
+          startMessageId: Pulsar.MessageId.earliest(),
+        })).rejects.toThrow('Topic is required and must be specified as a string when creating reader');
+        await client.close();
+      });
+
+      test('No StartMessageId', async () => {
+        const client = new Pulsar.Client({
+          serviceUrl: 'pulsar://localhost:6650',
+          operationTimeoutSeconds: 30,
+        });
+        await expect(client.createReader({
+          topic: 'persistent://public/default/topic',
+        })).rejects.toThrow('StartMessageId is required and must be specified as a MessageId object when creating reader');
+        await client.close();
+      });
+
+      test('Not StartMessageId as MessageId', async () => {
+        const client = new Pulsar.Client({
+          serviceUrl: 'pulsar://localhost:6650',
+          operationTimeoutSeconds: 30,
+        });
+        await expect(client.createReader({
+          topic: 'persistent://public/default/topic',
+          startMessageId: 'not MessageId',
+        })).rejects.toThrow('StartMessageId is required and must be specified as a MessageId object when creating reader');
+        await client.close();
+      });
+    });
+  });
+})();
