This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
The following commit(s) were added to refs/heads/master by this push:
new 6457aef feat: Support dead letter topic. (#335)
6457aef is described below
commit 6457aefd7c893940d3b7a32681b1a73867ab0244
Author: Baodi Shi <[email protected]>
AuthorDate: Mon Jun 26 11:17:30 2023 +0800
feat: Support dead letter topic. (#335)
---
index.d.ts | 7 ++++++
src/ConsumerConfig.cc | 26 +++++++++++++++++++
tests/consumer.test.js | 68 ++++++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 101 insertions(+)
diff --git a/index.d.ts b/index.d.ts
index bd47146..4e610b2 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -97,6 +97,7 @@ export interface ConsumerConfig {
schema?: SchemaInfo;
batchIndexAckEnabled?: boolean;
regexSubscriptionMode?: RegexSubscriptionMode;
+ deadLetterPolicy?: DeadLetterPolicy;
}
export class Consumer {
@@ -174,6 +175,12 @@ export interface SchemaInfo {
properties?: Record<string, string>;
}
+export interface DeadLetterPolicy {
+ deadLetterTopic: string;
+ maxRedeliverCount?: number;
+ initialSubscriptionName?: string;
+}
+
export class AuthenticationTls {
constructor(params: { certificatePath: string, privateKeyPath: string });
}
diff --git a/src/ConsumerConfig.cc b/src/ConsumerConfig.cc
index be646ae..2758649 100644
--- a/src/ConsumerConfig.cc
+++ b/src/ConsumerConfig.cc
@@ -47,6 +47,10 @@ static const std::string CFG_MAX_PENDING_CHUNKED_MESSAGE =
"maxPendingChunkedMes
static const std::string CFG_AUTO_ACK_OLDEST_CHUNKED_MESSAGE_ON_QUEUE_FULL =
"autoAckOldestChunkedMessageOnQueueFull";
static const std::string CFG_BATCH_INDEX_ACK_ENABLED = "batchIndexAckEnabled";
+static const std::string CFG_DEAD_LETTER_POLICY = "deadLetterPolicy";
+static const std::string CFG_DLQ_POLICY_TOPIC = "deadLetterTopic";
+static const std::string CFG_DLQ_POLICY_MAX_REDELIVER_COUNT =
"maxRedeliverCount";
+static const std::string CFG_DLQ_POLICY_INIT_SUB_NAME =
"initialSubscriptionName";
static const std::map<std::string, pulsar_consumer_type> SUBSCRIPTION_TYPE = {
{"Exclusive", pulsar_ConsumerExclusive},
@@ -239,6 +243,28 @@ ConsumerConfig::ConsumerConfig(const Napi::Object
&consumerConfig, pulsar_messag
pulsar_consumer_configuration_set_batch_index_ack_enabled(this->cConsumerConfig.get(),
batchIndexAckEnabled);
}
+
+ if (consumerConfig.Has(CFG_DEAD_LETTER_POLICY) &&
consumerConfig.Get(CFG_DEAD_LETTER_POLICY).IsObject()) {
+ pulsar_consumer_config_dead_letter_policy_t dlq_policy{};
+ Napi::Object dlqPolicyObject =
consumerConfig.Get(CFG_DEAD_LETTER_POLICY).ToObject();
+ std::string dlq_topic_str;
+ std::string init_subscription_name;
+ if (dlqPolicyObject.Has(CFG_DLQ_POLICY_TOPIC) &&
dlqPolicyObject.Get(CFG_DLQ_POLICY_TOPIC).IsString()) {
+ dlq_topic_str =
dlqPolicyObject.Get(CFG_DLQ_POLICY_TOPIC).ToString().Utf8Value();
+ dlq_policy.dead_letter_topic = dlq_topic_str.c_str();
+ }
+ if (dlqPolicyObject.Has(CFG_DLQ_POLICY_MAX_REDELIVER_COUNT) &&
+ dlqPolicyObject.Get(CFG_DLQ_POLICY_MAX_REDELIVER_COUNT).IsNumber()) {
+ dlq_policy.max_redeliver_count =
+
dlqPolicyObject.Get(CFG_DLQ_POLICY_MAX_REDELIVER_COUNT).ToNumber().Int32Value();
+ }
+ if (dlqPolicyObject.Has(CFG_DLQ_POLICY_INIT_SUB_NAME) &&
+ dlqPolicyObject.Get(CFG_DLQ_POLICY_INIT_SUB_NAME).IsString()) {
+ init_subscription_name =
dlqPolicyObject.Get(CFG_DLQ_POLICY_INIT_SUB_NAME).ToString().Utf8Value();
+ dlq_policy.initial_subscription_name = init_subscription_name.c_str();
+ }
+ pulsar_consumer_configuration_set_dlq_policy(this->cConsumerConfig.get(),
&dlq_policy);
+ }
}
ConsumerConfig::~ConsumerConfig() {
diff --git a/tests/consumer.test.js b/tests/consumer.test.js
index d976cb2..2579d1b 100644
--- a/tests/consumer.test.js
+++ b/tests/consumer.test.js
@@ -247,6 +247,74 @@ const Pulsar = require('../index.js');
await producer4.close();
await consumer.close();
});
+
+ test('Dead Letter topic', async () => {
+ const topicName = 'test-dead_letter_topic';
+ const dlqTopicName = 'test-dead_letter_topic_customize';
+ const producer = await client.createProducer({
+ topic: topicName,
+ });
+
+ const maxRedeliverCountNum = 3;
+ const consumer = await client.subscribe({
+ topic: topicName,
+ subscription: 'sub-1',
+ subscriptionType: 'Shared',
+ deadLetterPolicy: {
+ deadLetterTopic: dlqTopicName,
+ maxRedeliverCount: maxRedeliverCountNum,
+ initialSubscriptionName: 'init-sub-1-dlq',
+ },
+ nAckRedeliverTimeoutMs: 50,
+ });
+
+ // Send messages.
+ const sendNum = 5;
+ const messages = [];
+ for (let i = 0; i < sendNum; i += 1) {
+ const msg = `my-message-${i}`;
+ await producer.send({ data: Buffer.from(msg) });
+ messages.push(msg);
+ }
+
+ // Redelivery all messages maxRedeliverCountNum time.
+ let results = [];
+ for (let i = 1; i <= maxRedeliverCountNum * sendNum + sendNum; i += 1)
{
+ const msg = await consumer.receive();
+ results.push(msg);
+ if (i % sendNum === 0) {
+ results.forEach((message) => {
+ console.log(`Redeliver message ${message.getData().toString()}
${i} times ${message.getRedeliveryCount()} redeliver Count`);
+ consumer.negativeAcknowledge(message);
+ });
+ results = [];
+ }
+ }
+ // assert no more msgs.
+ await expect(consumer.receive(100)).rejects.toThrow(
+ 'Failed to receive message: TimeOut',
+ );
+
+ const dlqConsumer = await client.subscribe({
+ topic: dlqTopicName,
+ subscription: 'sub-1',
+ });
+ const dlqResult = [];
+ for (let i = 0; i < sendNum; i += 1) {
+ const msg = await dlqConsumer.receive();
+ dlqResult.push(msg.getData().toString());
+ }
+ expect(dlqResult).toEqual(messages);
+
+ // assert no more msgs.
+ await expect(dlqConsumer.receive(500)).rejects.toThrow(
+ 'Failed to receive message: TimeOut',
+ );
+
+ producer.close();
+ consumer.close();
+ dlqConsumer.close();
+ });
});
});
})();