This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git


The following commit(s) were added to refs/heads/master by this push:
     new c59f880  [Fix] Fix message listener doesn't respect receiver queue 
size (#309)
c59f880 is described below

commit c59f8801a1093831ef354e02e0692c4ce5b8dcc0
Author: Zike Yang <[email protected]>
AuthorDate: Mon Apr 10 21:01:02 2023 +0800

    [Fix] Fix message listener doesn't respect receiver queue size (#309)
    
    Fixes #308
    
    ### Motivation
    
    This bug is in the Node.js client, not in the C++ client. The root cause 
is that the message listener invokes the user callback asynchronously: it 
does not wait for the user callback to finish, so it goes on to process 
the next messages immediately.
    
    ### Modifications
    
    * Use Napi::Promise to wait for the user callback to complete in the 
message listener.
    
    The message listener can detect whether the user function is asynchronous 
or synchronous.
    * If it is a synchronous function, it will behave the same as before. This 
is not an issue because `Napi::Function::Call` will wait for the synchronous 
function to complete.
    * If it is an asynchronous function, the user function must return a 
`Promise` object. We use `promise.then()` to determine when the user function 
has finished. The message listener waits in the C++ client thread; we must 
not wait in the Node thread because that would block the entire Node main thread. 
This way, we can also make use of the `MessageListenerThreads` setting. 
Previously, the `MessageListenerThreads` configuration had no effect for 
asynchronous callbacks. This PR also [...]
---
 src/Consumer.cc          | 31 +++++++++++++++++++-----
 tests/end_to_end.test.js | 63 ++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 88 insertions(+), 6 deletions(-)

diff --git a/src/Consumer.cc b/src/Consumer.cc
index 91d7572..f5c3e04 100644
--- a/src/Consumer.cc
+++ b/src/Consumer.cc
@@ -25,6 +25,7 @@
 #include <pulsar/c/result.h>
 #include <atomic>
 #include <thread>
+#include <future>
 
 Napi::FunctionReference Consumer::constructor;
 
@@ -55,21 +56,34 @@ void Consumer::Init(Napi::Env env, Napi::Object exports) {
 struct MessageListenerProxyData {
   std::shared_ptr<pulsar_message_t> cMessage;
   Consumer *consumer;
+  std::function<void(void)> callback;
 
-  MessageListenerProxyData(std::shared_ptr<pulsar_message_t> cMessage, 
Consumer *consumer)
-      : cMessage(cMessage), consumer(consumer) {}
+  MessageListenerProxyData(std::shared_ptr<pulsar_message_t> cMessage, 
Consumer *consumer,
+                           std::function<void(void)> callback)
+      : cMessage(cMessage), consumer(consumer), callback(callback) {}
 };
 
 void MessageListenerProxy(Napi::Env env, Napi::Function jsCallback, 
MessageListenerProxyData *data) {
   Napi::Object msg = Message::NewInstance({}, data->cMessage);
   Consumer *consumer = data->consumer;
-  delete data;
 
   // `consumer` might be null in certain cases, segmentation fault might 
happen without this null check. We
   // need to handle this rare case in future.
   if (consumer) {
-    jsCallback.Call({msg, consumer->Value()});
+    Napi::Value ret = jsCallback.Call({msg, consumer->Value()});
+    if (ret.IsPromise()) {
+      Napi::Promise promise = ret.As<Napi::Promise>();
+      Napi::Value thenValue = promise.Get("then");
+      if (thenValue.IsFunction()) {
+        Napi::Function then = thenValue.As<Napi::Function>();
+        Napi::Function callback =
+            Napi::Function::New(env, [data](const Napi::CallbackInfo &info) { 
data->callback(); });
+        then.Call(promise, {callback});
+        return;
+      }
+    }
   }
+  data->callback();
 }
 
 void MessageListener(pulsar_consumer_t *rawConsumer, pulsar_message_t 
*rawMessage, void *ctx) {
@@ -82,9 +96,14 @@ void MessageListener(pulsar_consumer_t *rawConsumer, 
pulsar_message_t *rawMessag
     return;
   }
 
-  MessageListenerProxyData *dataPtr = new MessageListenerProxyData(cMessage, 
consumer);
-  listenerCallback->callback.BlockingCall(dataPtr, MessageListenerProxy);
+  std::promise<void> promise;
+  std::future<void> future = promise.get_future();
+  std::unique_ptr<MessageListenerProxyData> dataPtr(
+      new MessageListenerProxyData(cMessage, consumer, [&promise]() { 
promise.set_value(); }));
+  listenerCallback->callback.BlockingCall(dataPtr.get(), MessageListenerProxy);
   listenerCallback->callback.Release();
+
+  future.wait();
 }
 
 void Consumer::SetCConsumer(std::shared_ptr<pulsar_consumer_t> cConsumer) { 
this->cConsumer = cConsumer; }
diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js
index ba4c448..373ee47 100644
--- a/tests/end_to_end.test.js
+++ b/tests/end_to_end.test.js
@@ -269,6 +269,69 @@ const Pulsar = require('../index.js');
       await client.close();
     });
 
+    test('Share consumers with message listener', async () => {
+      const client = new Pulsar.Client({
+        serviceUrl: 'pulsar://localhost:6650',
+        operationTimeoutSeconds: 30,
+      });
+
+      const topic = 'test-shared-consumer-listener';
+      const producer = await client.createProducer({
+        topic,
+        batchingEnabled: false,
+      });
+
+      for (let i = 0; i < 100; i += 1) {
+        await producer.send(i);
+      }
+
+      let consumer1Recv = 0;
+
+      const consumer1 = await client.subscribe({
+        topic,
+        subscription: 'sub',
+        subscriptionType: 'Shared',
+        subscriptionInitialPosition: 'Earliest',
+        receiverQueueSize: 10,
+        listener: async (message, messageConsumer) => {
+          await new Promise((resolve) => setTimeout(resolve, 10));
+          consumer1Recv += 1;
+          await messageConsumer.acknowledge(message);
+        },
+      });
+
+      const consumer2 = await client.subscribe({
+        topic,
+        subscription: 'sub',
+        subscriptionType: 'Shared',
+        subscriptionInitialPosition: 'Earliest',
+        receiverQueueSize: 10,
+      });
+
+      let consumer2Recv = 0;
+      while (true) {
+        await new Promise((resolve) => setTimeout(resolve, 10));
+        try {
+          const msg = await consumer2.receive(3000);
+          consumer2Recv += 1;
+          await consumer2.acknowledge(msg);
+        } catch (err) {
+          break;
+        }
+      }
+
+      // Ensure that each consumer receives strictly more messages than the 
receiver queue size
+      // (greater than, not equal to).
+      // This way no single consumer can immediately drain all messages 
of the topic.
+      expect(consumer1Recv).toBeGreaterThan(10);
+      expect(consumer1Recv).toBeGreaterThan(10);
+
+      await consumer1.close();
+      await consumer2.close();
+      await producer.close();
+      await client.close();
+    });
+
     test('acknowledgeCumulative', async () => {
       const client = new Pulsar.Client({
         serviceUrl: 'pulsar://localhost:6650',

Reply via email to