This is an automated email from the ASF dual-hosted git repository.
massakam pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
The following commit(s) were added to refs/heads/master by this push:
new 52777a8 Adds deliverAt and deliverAfter support to Producer (#123)
52777a8 is described below
commit 52777a81a3c29c94d79d77430dcf946df5ded2cc
Author: savearray2 <[email protected]>
AuthorDate: Wed Sep 23 17:25:39 2020 +0900
Adds deliverAt and deliverAfter support to Producer (#123)
Updates Message.cc to include support for delayed delivery from the C++
client. (pulsar_message_set_deliver_after & pulsar_message_set_deliver_at)
Co-authored-by: savearray2 <savearray2>
---
src/Message.cc | 12 ++++++++++
tests/conf/standalone.conf | 10 +++++++++
tests/end_to_end.test.js | 56 ++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 78 insertions(+)
diff --git a/src/Message.cc b/src/Message.cc
index 5f9e409..d93264e 100644
--- a/src/Message.cc
+++ b/src/Message.cc
@@ -27,6 +27,8 @@ static const std::string CFG_EVENT_TIME = "eventTimestamp";
static const std::string CFG_SEQUENCE_ID = "sequenceId";
static const std::string CFG_PARTITION_KEY = "partitionKey";
static const std::string CFG_REPL_CLUSTERS = "replicationClusters";
+static const std::string CFG_DELIVER_AFTER = "deliverAfter";
+static const std::string CFG_DELIVER_AT = "deliverAt";
Napi::FunctionReference Message::constructor;
@@ -194,6 +196,16 @@ pulsar_message_t *Message::BuildMessage(Napi::Object conf)
{
FreeStringArray(arr, length);
}
}
+
+ if (conf.Has(CFG_DELIVER_AFTER) && conf.Get(CFG_DELIVER_AFTER).IsNumber()) {
+ Napi::Number deliverAfter = conf.Get(CFG_DELIVER_AFTER).ToNumber();
+ pulsar_message_set_deliver_after(cMessage, deliverAfter.Int64Value());
+ }
+
+ if (conf.Has(CFG_DELIVER_AT) && conf.Get(CFG_DELIVER_AT).IsNumber()) {
+ Napi::Number deliverAt = conf.Get(CFG_DELIVER_AT).ToNumber();
+ pulsar_message_set_deliver_at(cMessage, deliverAt.Int64Value());
+ }
return cMessage;
}
diff --git a/tests/conf/standalone.conf b/tests/conf/standalone.conf
index 57b7079..a5a31cc 100755
--- a/tests/conf/standalone.conf
+++ b/tests/conf/standalone.conf
@@ -80,6 +80,16 @@ maxUnackedMessagesPerConsumer=50000
subscriptionRedeliveryTrackerEnabled=true
+# Whether to enable the delayed delivery for messages.
+# If disabled, messages will be immediately delivered and there will
+# be no tracking overhead.
+delayedDeliveryEnabled=true
+
+# Control the tick time for when retrying on delayed delivery,
+# affecting the accuracy of the delivery time compared to the scheduled time.
+# Default is 1 second.
+delayedDeliveryTickTimeMillis=1000
+
### --- Authentication --- ###
# Enable authentication
diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js
index b17e67e..888d182 100644
--- a/tests/end_to_end.test.js
+++ b/tests/end_to_end.test.js
@@ -376,6 +376,62 @@ const Pulsar = require('../index.js');
await client.close();
});
+ test('Produce-Delayed/Consume', async () => {
+ const client = new Pulsar.Client({
+ serviceUrl: 'pulsar://localhost:6650',
+ operationTimeoutSeconds: 30,
+ });
+ expect(client).not.toBeNull();
+
+ const topic = 'persistent://public/default/produce-read-delayed';
+ const producer = await client.createProducer({
+ topic,
+ sendTimeoutMs: 30000,
+ batchingEnabled: true,
+ });
+ expect(producer).not.toBeNull();
+
+ const consumer = await client.subscribe({
+ topic,
+ subscription: 'sub',
+ subscriptionType: 'Shared',
+ });
+ expect(consumer).not.toBeNull();
+
+ const messages = [];
+ const time = (new Date()).getTime();
+ for (let i = 0; i < 5; i += 1) {
+ const msg = `my-message-${i}`;
+ producer.send({
+ data: Buffer.from(msg),
+ deliverAfter: 3000,
+ });
+ messages.push(msg);
+ }
+ for (let i = 5; i < 10; i += 1) {
+ const msg = `my-message-${i}`;
+ producer.send({
+ data: Buffer.from(msg),
+ deliverAt: (new Date()).getTime() + 3000,
+ });
+ messages.push(msg);
+ }
+ await producer.flush();
+
+ const results = [];
+ for (let i = 0; i < 10; i += 1) {
+ const msg = await consumer.receive();
+ results.push(msg.getData().toString());
+ consumer.acknowledge(msg);
+ }
+ expect(lodash.difference(messages, results)).toEqual([]);
+ expect((new Date()).getTime() - time).toBeGreaterThan(3000);
+
+ await producer.close();
+ await consumer.close();
+ await client.close();
+ });
+
test('Produce/Consume/Unsubscribe', async () => {
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',