This is an automated email from the ASF dual-hosted git repository.

massakam pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git


The following commit(s) were added to refs/heads/master by this push:
     new 52777a8  Adds deliverAt and deliverAfter support to Producer (#123)
52777a8 is described below

commit 52777a81a3c29c94d79d77430dcf946df5ded2cc
Author: savearray2 <[email protected]>
AuthorDate: Wed Sep 23 17:25:39 2020 +0900

    Adds deliverAt and deliverAfter support to Producer (#123)
    
    Updates Message.cc to include support for delayed delivery from the C++ 
client. (pulsar_message_set_deliver_after & pulsar_message_set_deliver_at)
    
    Co-authored-by: savearray2 <savearray2>
---
 src/Message.cc             | 12 ++++++++++
 tests/conf/standalone.conf | 10 +++++++++
 tests/end_to_end.test.js   | 56 ++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 78 insertions(+)

diff --git a/src/Message.cc b/src/Message.cc
index 5f9e409..d93264e 100644
--- a/src/Message.cc
+++ b/src/Message.cc
@@ -27,6 +27,8 @@ static const std::string CFG_EVENT_TIME = "eventTimestamp";
 static const std::string CFG_SEQUENCE_ID = "sequenceId";
 static const std::string CFG_PARTITION_KEY = "partitionKey";
 static const std::string CFG_REPL_CLUSTERS = "replicationClusters";
+static const std::string CFG_DELIVER_AFTER = "deliverAfter";
+static const std::string CFG_DELIVER_AT = "deliverAt";
 
 Napi::FunctionReference Message::constructor;
 
@@ -194,6 +196,16 @@ pulsar_message_t *Message::BuildMessage(Napi::Object conf) 
{
       FreeStringArray(arr, length);
     }
   }
+
+  if (conf.Has(CFG_DELIVER_AFTER) && conf.Get(CFG_DELIVER_AFTER).IsNumber()) {
+    Napi::Number deliverAfter = conf.Get(CFG_DELIVER_AFTER).ToNumber();
+    pulsar_message_set_deliver_after(cMessage, deliverAfter.Int64Value());
+  }
+
+  if (conf.Has(CFG_DELIVER_AT) && conf.Get(CFG_DELIVER_AT).IsNumber()) {
+    Napi::Number deliverAt = conf.Get(CFG_DELIVER_AT).ToNumber();
+    pulsar_message_set_deliver_at(cMessage, deliverAt.Int64Value());
+  }
   return cMessage;
 }
 
diff --git a/tests/conf/standalone.conf b/tests/conf/standalone.conf
index 57b7079..a5a31cc 100755
--- a/tests/conf/standalone.conf
+++ b/tests/conf/standalone.conf
@@ -80,6 +80,16 @@ maxUnackedMessagesPerConsumer=50000
 
 subscriptionRedeliveryTrackerEnabled=true
 
+# Whether to enable the delayed delivery for messages.
+# If disabled, messages will be immediately delivered and there will
+# be no tracking overhead.
+delayedDeliveryEnabled=true
+
+# Control the tick time for when retrying on delayed delivery,
+# affecting the accuracy of the delivery time compared to the scheduled time.
+# Default is 1 second.
+delayedDeliveryTickTimeMillis=1000
+
 ### --- Authentication --- ###
 
 # Enable authentication
diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js
index b17e67e..888d182 100644
--- a/tests/end_to_end.test.js
+++ b/tests/end_to_end.test.js
@@ -376,6 +376,62 @@ const Pulsar = require('../index.js');
       await client.close();
     });
 
+    test('Produce-Delayed/Consume', async () => {
+      const client = new Pulsar.Client({
+        serviceUrl: 'pulsar://localhost:6650',
+        operationTimeoutSeconds: 30,
+      });
+      expect(client).not.toBeNull();
+
+      const topic = 'persistent://public/default/produce-read-delayed';
+      const producer = await client.createProducer({
+        topic,
+        sendTimeoutMs: 30000,
+        batchingEnabled: true,
+      });
+      expect(producer).not.toBeNull();
+
+      const consumer = await client.subscribe({
+        topic,
+        subscription: 'sub',
+        subscriptionType: 'Shared',
+      });
+      expect(consumer).not.toBeNull();
+
+      const messages = [];
+      const time = (new Date()).getTime();
+      for (let i = 0; i < 5; i += 1) {
+        const msg = `my-message-${i}`;
+        producer.send({
+          data: Buffer.from(msg),
+          deliverAfter: 3000,
+        });
+        messages.push(msg);
+      }
+      for (let i = 5; i < 10; i += 1) {
+        const msg = `my-message-${i}`;
+        producer.send({
+          data: Buffer.from(msg),
+          deliverAt: (new Date()).getTime() + 3000,
+        });
+        messages.push(msg);
+      }
+      await producer.flush();
+
+      const results = [];
+      for (let i = 0; i < 10; i += 1) {
+        const msg = await consumer.receive();
+        results.push(msg.getData().toString());
+        consumer.acknowledge(msg);
+      }
+      expect(lodash.difference(messages, results)).toEqual([]);
+      expect((new Date()).getTime() - time).toBeGreaterThan(3000);
+
+      await producer.close();
+      await consumer.close();
+      await client.close();
+    });
+
     test('Produce/Consume/Unsubscribe', async () => {
       const client = new Pulsar.Client({
         serviceUrl: 'pulsar://localhost:6650',

Reply via email to