This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git


The following commit(s) were added to refs/heads/master by this push:
     new 6457aef  feat: Support dead letter topic. (#335)
6457aef is described below

commit 6457aefd7c893940d3b7a32681b1a73867ab0244
Author: Baodi Shi <[email protected]>
AuthorDate: Mon Jun 26 11:17:30 2023 +0800

    feat: Support dead letter topic. (#335)
---
 index.d.ts             |  7 ++++++
 src/ConsumerConfig.cc  | 26 +++++++++++++++++++
 tests/consumer.test.js | 68 ++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 101 insertions(+)

diff --git a/index.d.ts b/index.d.ts
index bd47146..4e610b2 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -97,6 +97,7 @@ export interface ConsumerConfig {
   schema?: SchemaInfo;
   batchIndexAckEnabled?: boolean;
   regexSubscriptionMode?: RegexSubscriptionMode;
+  deadLetterPolicy?: DeadLetterPolicy;
 }
 
 export class Consumer {
@@ -174,6 +175,12 @@ export interface SchemaInfo {
   properties?: Record<string, string>;
 }
 
+export interface DeadLetterPolicy {
+  deadLetterTopic: string;
+  maxRedeliverCount?: number;
+  initialSubscriptionName?: string;
+}
+
 export class AuthenticationTls {
   constructor(params: { certificatePath: string, privateKeyPath: string });
 }
diff --git a/src/ConsumerConfig.cc b/src/ConsumerConfig.cc
index be646ae..2758649 100644
--- a/src/ConsumerConfig.cc
+++ b/src/ConsumerConfig.cc
@@ -47,6 +47,10 @@ static const std::string CFG_MAX_PENDING_CHUNKED_MESSAGE = "maxPendingChunkedMes
 static const std::string CFG_AUTO_ACK_OLDEST_CHUNKED_MESSAGE_ON_QUEUE_FULL =
     "autoAckOldestChunkedMessageOnQueueFull";
 static const std::string CFG_BATCH_INDEX_ACK_ENABLED = "batchIndexAckEnabled";
+static const std::string CFG_DEAD_LETTER_POLICY = "deadLetterPolicy";
+static const std::string CFG_DLQ_POLICY_TOPIC = "deadLetterTopic";
+static const std::string CFG_DLQ_POLICY_MAX_REDELIVER_COUNT = "maxRedeliverCount";
+static const std::string CFG_DLQ_POLICY_INIT_SUB_NAME = "initialSubscriptionName";
 
 static const std::map<std::string, pulsar_consumer_type> SUBSCRIPTION_TYPE = {
     {"Exclusive", pulsar_ConsumerExclusive},
@@ -239,6 +243,28 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig, pulsar_messag
     pulsar_consumer_configuration_set_batch_index_ack_enabled(this->cConsumerConfig.get(),
                                                               batchIndexAckEnabled);
   }
+
+  if (consumerConfig.Has(CFG_DEAD_LETTER_POLICY) && consumerConfig.Get(CFG_DEAD_LETTER_POLICY).IsObject()) {
+    pulsar_consumer_config_dead_letter_policy_t dlq_policy{};
+    Napi::Object dlqPolicyObject = consumerConfig.Get(CFG_DEAD_LETTER_POLICY).ToObject();
+    std::string dlq_topic_str;
+    std::string init_subscription_name;
+    if (dlqPolicyObject.Has(CFG_DLQ_POLICY_TOPIC) && dlqPolicyObject.Get(CFG_DLQ_POLICY_TOPIC).IsString()) {
+      dlq_topic_str = dlqPolicyObject.Get(CFG_DLQ_POLICY_TOPIC).ToString().Utf8Value();
+      dlq_policy.dead_letter_topic = dlq_topic_str.c_str();
+    }
+    if (dlqPolicyObject.Has(CFG_DLQ_POLICY_MAX_REDELIVER_COUNT) &&
+        dlqPolicyObject.Get(CFG_DLQ_POLICY_MAX_REDELIVER_COUNT).IsNumber()) {
+      dlq_policy.max_redeliver_count =
+          dlqPolicyObject.Get(CFG_DLQ_POLICY_MAX_REDELIVER_COUNT).ToNumber().Int32Value();
+    }
+    if (dlqPolicyObject.Has(CFG_DLQ_POLICY_INIT_SUB_NAME) &&
+        dlqPolicyObject.Get(CFG_DLQ_POLICY_INIT_SUB_NAME).IsString()) {
+      init_subscription_name = dlqPolicyObject.Get(CFG_DLQ_POLICY_INIT_SUB_NAME).ToString().Utf8Value();
+      dlq_policy.initial_subscription_name = init_subscription_name.c_str();
+    }
+    pulsar_consumer_configuration_set_dlq_policy(this->cConsumerConfig.get(), &dlq_policy);
+  }
 }
 
 ConsumerConfig::~ConsumerConfig() {
diff --git a/tests/consumer.test.js b/tests/consumer.test.js
index d976cb2..2579d1b 100644
--- a/tests/consumer.test.js
+++ b/tests/consumer.test.js
@@ -247,6 +247,74 @@ const Pulsar = require('../index.js');
         await producer4.close();
         await consumer.close();
       });
+
+      test('Dead Letter topic', async () => {
+        const topicName = 'test-dead_letter_topic';
+        const dlqTopicName = 'test-dead_letter_topic_customize';
+        const producer = await client.createProducer({
+          topic: topicName,
+        });
+
+        const maxRedeliverCountNum = 3;
+        const consumer = await client.subscribe({
+          topic: topicName,
+          subscription: 'sub-1',
+          subscriptionType: 'Shared',
+          deadLetterPolicy: {
+            deadLetterTopic: dlqTopicName,
+            maxRedeliverCount: maxRedeliverCountNum,
+            initialSubscriptionName: 'init-sub-1-dlq',
+          },
+          nAckRedeliverTimeoutMs: 50,
+        });
+
+        // Send messages.
+        const sendNum = 5;
+        const messages = [];
+        for (let i = 0; i < sendNum; i += 1) {
+          const msg = `my-message-${i}`;
+          await producer.send({ data: Buffer.from(msg) });
+          messages.push(msg);
+        }
+
+        // Redelivery all messages maxRedeliverCountNum time.
+        let results = [];
+        for (let i = 1; i <= maxRedeliverCountNum * sendNum + sendNum; i += 1) {
+          const msg = await consumer.receive();
+          results.push(msg);
+          if (i % sendNum === 0) {
+            results.forEach((message) => {
+              console.log(`Redeliver message ${message.getData().toString()} ${i} times ${message.getRedeliveryCount()} redeliver Count`);
+              consumer.negativeAcknowledge(message);
+            });
+            results = [];
+          }
+        }
+        // assert no more msgs.
+        await expect(consumer.receive(100)).rejects.toThrow(
+          'Failed to receive message: TimeOut',
+        );
+
+        const dlqConsumer = await client.subscribe({
+          topic: dlqTopicName,
+          subscription: 'sub-1',
+        });
+        const dlqResult = [];
+        for (let i = 0; i < sendNum; i += 1) {
+          const msg = await dlqConsumer.receive();
+          dlqResult.push(msg.getData().toString());
+        }
+        expect(dlqResult).toEqual(messages);
+
+        // assert no more msgs.
+        await expect(dlqConsumer.receive(500)).rejects.toThrow(
+          'Failed to receive message: TimeOut',
+        );
+
+        producer.close();
+        consumer.close();
+        dlqConsumer.close();
+      });
     });
   });
 })();

Reply via email to