This is an automated email from the ASF dual-hosted git repository. massakam pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
commit a945e49730ec4caf059dfb166020969dbcc69d6c Author: hrsakai <[email protected]> AuthorDate: Fri Jun 7 15:21:03 2019 +0900 Add acknowledgeCumulative / receiveWithTimeout methods --- src/Consumer.cc | 52 ++++++++++++++++++++++++++++++++++++++---------- src/Consumer.h | 3 +++ tests/end_to_end.test.js | 39 ++++++++++++++++++++++++++++++++++++ 3 files changed, 84 insertions(+), 10 deletions(-) diff --git a/src/Consumer.cc b/src/Consumer.cc index db6fab4..454ba3c 100644 --- a/src/Consumer.cc +++ b/src/Consumer.cc @@ -28,13 +28,17 @@ Napi::FunctionReference Consumer::constructor; void Consumer::Init(Napi::Env env, Napi::Object exports) { Napi::HandleScope scope(env); - Napi::Function func = DefineClass(env, "Consumer", - { - InstanceMethod("receive", &Consumer::Receive), - InstanceMethod("acknowledge", &Consumer::Acknowledge), - InstanceMethod("acknowledgeId", &Consumer::AcknowledgeId), - InstanceMethod("close", &Consumer::Close), - }); + Napi::Function func = + DefineClass(env, "Consumer", + { + InstanceMethod("receive", &Consumer::Receive), + InstanceMethod("receiveWithTimeout", &Consumer::ReceiveWithTimeout), + InstanceMethod("acknowledge", &Consumer::Acknowledge), + InstanceMethod("acknowledgeId", &Consumer::AcknowledgeId), + InstanceMethod("acknowledgeCumulative", &Consumer::AcknowledgeCumulative), + InstanceMethod("acknowledgeCumulativeId", &Consumer::AcknowledgeCumulativeId), + InstanceMethod("close", &Consumer::Close), + }); constructor = Napi::Persistent(func); constructor.SuppressDestruct(); @@ -108,13 +112,20 @@ Napi::Value Consumer::NewInstance(const Napi::CallbackInfo &info, pulsar_client_ class ConsumerReceiveWorker : public Napi::AsyncWorker { public: - ConsumerReceiveWorker(const Napi::Promise::Deferred &deferred, pulsar_consumer_t *cConsumer) + ConsumerReceiveWorker(const Napi::Promise::Deferred &deferred, pulsar_consumer_t *cConsumer, + int64_t timeout = -1) : AsyncWorker(Napi::Function::New(deferred.Promise().Env(), [](const Napi::CallbackInfo &info) {})), deferred(deferred), - cConsumer(cConsumer) {} + cConsumer(cConsumer), + timeout(timeout) {} ~ConsumerReceiveWorker() {} void Execute() { - pulsar_result result = pulsar_consumer_receive(this->cConsumer, &(this->cMessage)); + pulsar_result result; + if (timeout > 0) { + result = pulsar_consumer_receive_with_timeout(this->cConsumer, &(this->cMessage), timeout); + } else { + result = pulsar_consumer_receive(this->cConsumer, &(this->cMessage)); + } if (result != pulsar_result_Ok) { SetError(std::string("Failed to received message ") + pulsar_result_str(result)); @@ -130,6 +141,7 @@ class ConsumerReceiveWorker : public Napi::AsyncWorker { Napi::Promise::Deferred deferred; pulsar_consumer_t *cConsumer; pulsar_message_t *cMessage; + int64_t timeout; }; Napi::Value Consumer::Receive(const Napi::CallbackInfo &info) { @@ -139,6 +151,14 @@ Napi::Value Consumer::Receive(const Napi::CallbackInfo &info) { return deferred.Promise(); } +Napi::Value Consumer::ReceiveWithTimeout(const Napi::CallbackInfo &info) { + Napi::Number timeout = info[0].As<Napi::Object>().ToNumber(); + Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env()); + ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, this->cConsumer, timeout.Int64Value()); + wk->Queue(); + return deferred.Promise(); +} + void Consumer::Acknowledge(const Napi::CallbackInfo &info) { Napi::Object obj = info[0].As<Napi::Object>(); Message *msg = Message::Unwrap(obj); @@ -151,6 +171,18 @@ void Consumer::AcknowledgeId(const Napi::CallbackInfo &info) { pulsar_consumer_acknowledge_async_id(this->cConsumer, msgId->GetCMessageId(), NULL, NULL); } +void Consumer::AcknowledgeCumulative(const Napi::CallbackInfo &info) { + Napi::Object obj = info[0].As<Napi::Object>(); + Message *msg = Message::Unwrap(obj); + pulsar_consumer_acknowledge_cumulative_async(this->cConsumer, msg->GetCMessage(), NULL, NULL); +} + +void Consumer::AcknowledgeCumulativeId(const Napi::CallbackInfo &info) { + Napi::Object obj = info[0].As<Napi::Object>(); + MessageId *msgId = MessageId::Unwrap(obj); + pulsar_consumer_acknowledge_cumulative_async_id(this->cConsumer, msgId->GetCMessageId(), NULL, NULL); +} + class ConsumerCloseWorker : public Napi::AsyncWorker { public: ConsumerCloseWorker(const Napi::Promise::Deferred &deferred, pulsar_consumer_t *cConsumer) diff --git a/src/Consumer.h b/src/Consumer.h index fe68124..5e8bf5b 100644 --- a/src/Consumer.h +++ b/src/Consumer.h @@ -36,8 +36,11 @@ class Consumer : public Napi::ObjectWrap<Consumer> { pulsar_consumer_t *cConsumer; Napi::Value Receive(const Napi::CallbackInfo &info); + Napi::Value ReceiveWithTimeout(const Napi::CallbackInfo &info); void Acknowledge(const Napi::CallbackInfo &info); void AcknowledgeId(const Napi::CallbackInfo &info); + void AcknowledgeCumulative(const Napi::CallbackInfo &info); + void AcknowledgeCumulativeId(const Napi::CallbackInfo &info); Napi::Value Close(const Napi::CallbackInfo &info); }; diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js index 9d5ed49..ae24d86 100644 --- a/tests/end_to_end.test.js +++ b/tests/end_to_end.test.js @@ -40,6 +40,7 @@ const Pulsar = require('../index.js'); subscription: 'sub1', ackTimeoutMs: 10000, }); + expect(consumer).not.toBeNull(); const messages = []; @@ -62,6 +63,44 @@ const Pulsar = require('../index.js'); await producer.close(); await consumer.close(); + }); + + test('acknowledgeCumulative', async () => { + const producer = await client.createProducer({ + topic: 'persistent://public/default/acknowledgeCumulative', + sendTimeoutMs: 30000, + batchingEnabled: true, + }); + expect(producer).not.toBeNull(); + + const consumer = await client.subscribe({ + topic: 'persistent://public/default/acknowledgeCumulative', + subscription: 'sub1', + ackTimeoutMs: 10000, + }); + expect(consumer).not.toBeNull(); + + const messages = []; + for (let i = 0; i < 10; i += 1) { + const msg = `my-message-${i}`; + producer.send({ + data: Buffer.from(msg), + }); + messages.push(msg); + } + await producer.flush(); + + for (let i = 0; i < 10; i += 1) { + const msg = await consumer.receive(); + if (i === 9) { + consumer.acknowledgeCumulative(msg); + } + } + + await expect(consumer.receiveWithTimeout(1000)).rejects.toThrow('Failed to received message TimeOut'); + + await producer.close(); + await consumer.close(); await client.close(); }); });
