This is an automated email from the ASF dual-hosted git repository. baodi pushed a commit to branch branch-1.8 in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
commit c5fccfcc97f147f01fdc1c81fadf1dec8e2ed502 Author: Zike Yang <[email protected]> AuthorDate: Tue Apr 11 21:21:53 2023 +0800 [Fix] Fix reader message listener doesn't respect receiver queue size (#316) (cherry picked from commit f38321aaf480638c2a99b4afa5f0c6d24dce4973) --- src/Reader.cc | 32 +++++++++++++++++++++----- tests/end_to_end.test.js | 58 ++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 82 insertions(+), 8 deletions(-) diff --git a/src/Reader.cc b/src/Reader.cc index 74bd4b2..5b9d8a7 100644 --- a/src/Reader.cc +++ b/src/Reader.cc @@ -26,6 +26,7 @@ #include <pulsar/c/reader.h> #include <atomic> #include <thread> +#include <future> Napi::FunctionReference Reader::constructor; @@ -49,17 +50,30 @@ void Reader::Init(Napi::Env env, Napi::Object exports) { struct ReaderListenerProxyData { std::shared_ptr<pulsar_message_t> cMessage; Reader *reader; + std::function<void(void)> callback; - ReaderListenerProxyData(std::shared_ptr<pulsar_message_t> cMessage, Reader *reader) - : cMessage(cMessage), reader(reader) {} + ReaderListenerProxyData(std::shared_ptr<pulsar_message_t> cMessage, Reader *reader, + std::function<void(void)> callback) + : cMessage(cMessage), reader(reader), callback(callback) {} }; void ReaderListenerProxy(Napi::Env env, Napi::Function jsCallback, ReaderListenerProxyData *data) { Napi::Object msg = Message::NewInstance({}, data->cMessage); Reader *reader = data->reader; - delete data; - jsCallback.Call({msg, reader->Value()}); + Napi::Value ret = jsCallback.Call({msg, reader->Value()}); + if (ret.IsPromise()) { + Napi::Promise promise = ret.As<Napi::Promise>(); + Napi::Value thenValue = promise.Get("then"); + if (thenValue.IsFunction()) { + Napi::Function then = thenValue.As<Napi::Function>(); + Napi::Function callback = + Napi::Function::New(env, [data](const Napi::CallbackInfo &info) { data->callback(); }); + then.Call(promise, {callback}); + return; + } + } + data->callback(); } void ReaderListener(pulsar_reader_t *rawReader, pulsar_message_t *rawMessage, void *ctx) { @@ -69,9 +83,15 @@ void ReaderListener(pulsar_reader_t *rawReader, pulsar_message_t *rawMessage, vo if (readerListenerCallback->callback.Acquire() != napi_ok) { return; } - ReaderListenerProxyData *dataPtr = new ReaderListenerProxyData(cMessage, reader); - readerListenerCallback->callback.BlockingCall(dataPtr, ReaderListenerProxy); + + std::promise<void> promise; + std::future<void> future = promise.get_future(); + std::unique_ptr<ReaderListenerProxyData> dataPtr( + new ReaderListenerProxyData(cMessage, reader, [&promise]() { promise.set_value(); })); + readerListenerCallback->callback.BlockingCall(dataPtr.get(), ReaderListenerProxy); readerListenerCallback->callback.Release(); + + future.wait(); } void Reader::SetCReader(std::shared_ptr<pulsar_reader_t> cReader) { this->cReader = cReader; } diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js index 373ee47..7a5621d 100644 --- a/tests/end_to_end.test.js +++ b/tests/end_to_end.test.js @@ -310,9 +310,9 @@ const Pulsar = require('../index.js'); let consumer2Recv = 0; while (true) { - await new Promise((resolve) => setTimeout(resolve, 10)); try { const msg = await consumer2.receive(3000); + await new Promise((resolve) => setTimeout(resolve, 10)); consumer2Recv += 1; await consumer2.acknowledge(msg); } catch (err) { @@ -324,7 +324,7 @@ const Pulsar = require('../index.js'); // the receiver queue size messages. // This way any of the consumers will not immediately empty all messages of a topic. expect(consumer1Recv).toBeGreaterThan(10); - expect(consumer1Recv).toBeGreaterThan(10); + expect(consumer2Recv).toBeGreaterThan(10); await consumer1.close(); await consumer2.close(); @@ -332,6 +332,60 @@ const Pulsar = require('../index.js'); await client.close(); }); + test('Share readers with message listener', async () => { + const client = new Pulsar.Client({ + serviceUrl: 'pulsar://localhost:6650', + operationTimeoutSeconds: 30, + }); + + const topic = 'test-shared-reader-listener'; + const producer = await client.createProducer({ + topic, + batchingEnabled: false, + }); + + for (let i = 0; i < 100; i += 1) { + await producer.send(i); + } + + let reader1Recv = 0; + + const reader1 = await client.createReader({ + topic, + startMessageId: Pulsar.MessageId.earliest(), + receiverQueueSize: 10, + listener: async (message, reader) => { + await new Promise((resolve) => setTimeout(resolve, 10)); + reader1Recv += 1; + }, + }); + + const reader2 = await client.createReader({ + topic, + startMessageId: Pulsar.MessageId.earliest(), + receiverQueueSize: 10, + }); + + let reader2Recv = 0; + + while (reader2.hasNext()) { + await reader2.readNext(); + await new Promise((resolve) => setTimeout(resolve, 10)); + reader2Recv += 1; + } + + // Ensure that each reader receives at least 1 times (greater than and not equal) + // the receiver queue size messages. + // This way any of the readers will not immediately empty all messages of a topic. + expect(reader1Recv).toBeGreaterThan(10); + expect(reader2Recv).toBeGreaterThan(10); + + await reader1.close(); + await reader2.close(); + await producer.close(); + await client.close(); + }); + test('acknowledgeCumulative', async () => { const client = new Pulsar.Client({ serviceUrl: 'pulsar://localhost:6650',
