This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
The following commit(s) were added to refs/heads/master by this push:
new c59f880 [Fix] Fix message listener doesn't respect receiver queue
size (#309)
c59f880 is described below
commit c59f8801a1093831ef354e02e0692c4ce5b8dcc0
Author: Zike Yang <[email protected]>
AuthorDate: Mon Apr 10 21:01:02 2023 +0800
[Fix] Fix message listener doesn't respect receiver queue size (#309)
Fixes #308
### Motivation
This bug is in the Node.js client, not in the C++ client. The root cause
is that the message listener invokes the user callback asynchronously: it
does not wait for the user callback to complete, and immediately goes on to
process the next messages.
### Modifications
* Use Napi::Promise to wait for the user callback to complete in the
message listener.
The message listener can detect whether the user function is asynchronous
or synchronous.
* If it is a synchronous function, it will behave the same as before. This
is not an issue because `Napi::Function::Call` will wait for the synchronous
function to complete.
* If it is an asynchronous function, the user function must return a
`Promise` object. We use `promise.then()` to determine when the user function
has finished. The message listener waits in the C++ client thread. It must
not wait in the Node thread, because that would block the entire Node main thread.
This way, we can also make use of the `MessageListenerThreads` feature.
Previously, the configuration `MessageListenerThreads` did not work for the
asynchronous callback. This PR also [...]
---
src/Consumer.cc | 31 +++++++++++++++++++-----
tests/end_to_end.test.js | 63 ++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 88 insertions(+), 6 deletions(-)
diff --git a/src/Consumer.cc b/src/Consumer.cc
index 91d7572..f5c3e04 100644
--- a/src/Consumer.cc
+++ b/src/Consumer.cc
@@ -25,6 +25,7 @@
#include <pulsar/c/result.h>
#include <atomic>
#include <thread>
+#include <future>
Napi::FunctionReference Consumer::constructor;
@@ -55,21 +56,34 @@ void Consumer::Init(Napi::Env env, Napi::Object exports) {
struct MessageListenerProxyData {
std::shared_ptr<pulsar_message_t> cMessage;
Consumer *consumer;
+ std::function<void(void)> callback;
- MessageListenerProxyData(std::shared_ptr<pulsar_message_t> cMessage,
Consumer *consumer)
- : cMessage(cMessage), consumer(consumer) {}
+ MessageListenerProxyData(std::shared_ptr<pulsar_message_t> cMessage,
Consumer *consumer,
+ std::function<void(void)> callback)
+ : cMessage(cMessage), consumer(consumer), callback(callback) {}
};
void MessageListenerProxy(Napi::Env env, Napi::Function jsCallback,
MessageListenerProxyData *data) {
Napi::Object msg = Message::NewInstance({}, data->cMessage);
Consumer *consumer = data->consumer;
- delete data;
// `consumer` might be null in certain cases, segmentation fault might
happend without this null check. We
// need to handle this rare case in future.
if (consumer) {
- jsCallback.Call({msg, consumer->Value()});
+ Napi::Value ret = jsCallback.Call({msg, consumer->Value()});
+ if (ret.IsPromise()) {
+ Napi::Promise promise = ret.As<Napi::Promise>();
+ Napi::Value thenValue = promise.Get("then");
+ if (thenValue.IsFunction()) {
+ Napi::Function then = thenValue.As<Napi::Function>();
+ Napi::Function callback =
+ Napi::Function::New(env, [data](const Napi::CallbackInfo &info) {
data->callback(); });
+ then.Call(promise, {callback});
+ return;
+ }
+ }
}
+ data->callback();
}
void MessageListener(pulsar_consumer_t *rawConsumer, pulsar_message_t
*rawMessage, void *ctx) {
@@ -82,9 +96,14 @@ void MessageListener(pulsar_consumer_t *rawConsumer,
pulsar_message_t *rawMessag
return;
}
- MessageListenerProxyData *dataPtr = new MessageListenerProxyData(cMessage,
consumer);
- listenerCallback->callback.BlockingCall(dataPtr, MessageListenerProxy);
+ std::promise<void> promise;
+ std::future<void> future = promise.get_future();
+ std::unique_ptr<MessageListenerProxyData> dataPtr(
+ new MessageListenerProxyData(cMessage, consumer, [&promise]() {
promise.set_value(); }));
+ listenerCallback->callback.BlockingCall(dataPtr.get(), MessageListenerProxy);
listenerCallback->callback.Release();
+
+ future.wait();
}
void Consumer::SetCConsumer(std::shared_ptr<pulsar_consumer_t> cConsumer) {
this->cConsumer = cConsumer; }
diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js
index ba4c448..373ee47 100644
--- a/tests/end_to_end.test.js
+++ b/tests/end_to_end.test.js
@@ -269,6 +269,69 @@ const Pulsar = require('../index.js');
await client.close();
});
+ test('Share consumers with message listener', async () => {
+ const client = new Pulsar.Client({
+ serviceUrl: 'pulsar://localhost:6650',
+ operationTimeoutSeconds: 30,
+ });
+
+ const topic = 'test-shared-consumer-listener';
+ const producer = await client.createProducer({
+ topic,
+ batchingEnabled: false,
+ });
+
+ for (let i = 0; i < 100; i += 1) {
+ await producer.send(i);
+ }
+
+ let consumer1Recv = 0;
+
+ const consumer1 = await client.subscribe({
+ topic,
+ subscription: 'sub',
+ subscriptionType: 'Shared',
+ subscriptionInitialPosition: 'Earliest',
+ receiverQueueSize: 10,
+ listener: async (message, messageConsumer) => {
+ await new Promise((resolve) => setTimeout(resolve, 10));
+ consumer1Recv += 1;
+ await messageConsumer.acknowledge(message);
+ },
+ });
+
+ const consumer2 = await client.subscribe({
+ topic,
+ subscription: 'sub',
+ subscriptionType: 'Shared',
+ subscriptionInitialPosition: 'Earliest',
+ receiverQueueSize: 10,
+ });
+
+ let consumer2Recv = 0;
+ while (true) {
+ await new Promise((resolve) => setTimeout(resolve, 10));
+ try {
+ const msg = await consumer2.receive(3000);
+ consumer2Recv += 1;
+ await consumer2.acknowledge(msg);
+ } catch (err) {
+ break;
+ }
+ }
+
+ // Ensure that each consumer receives at least 1 times (greater than and
not equal)
+ // the receiver queue size messages.
+ // This way any of the consumers will not immediately empty all messages
of a topic.
+ expect(consumer1Recv).toBeGreaterThan(10);
+ expect(consumer1Recv).toBeGreaterThan(10);
+
+ await consumer1.close();
+ await consumer2.close();
+ await producer.close();
+ await client.close();
+ });
+
test('acknowledgeCumulative', async () => {
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',