This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git


The following commit(s) were added to refs/heads/master by this push:
     new 1380086  feat: support batch index ack. (#332)
1380086 is described below

commit 13800863c7200107d872add71d1aa1e4e3a86f63
Author: Baodi Shi <[email protected]>
AuthorDate: Thu Jun 15 21:40:55 2023 +0800

    feat: support batch index ack. (#332)
---
 index.d.ts                 |  1 +
 src/ConsumerConfig.cc      |  8 +++++++
 tests/conf/standalone.conf |  3 +++
 tests/consumer.test.js     | 58 ++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 70 insertions(+)

diff --git a/index.d.ts b/index.d.ts
index 97462dd..b394804 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -94,6 +94,7 @@ export interface ConsumerConfig {
   maxPendingChunkedMessage?: number;
   autoAckOldestChunkedMessageOnQueueFull?: number;
   schema?: SchemaInfo;
+  batchIndexAckEnabled?: boolean;
 }
 
 export class Consumer {
diff --git a/src/ConsumerConfig.cc b/src/ConsumerConfig.cc
index 13e1a75..69f97d9 100644
--- a/src/ConsumerConfig.cc
+++ b/src/ConsumerConfig.cc
@@ -45,6 +45,7 @@ static const std::string CFG_CRYPTO_FAILURE_ACTION = "cryptoFailureAction";
 static const std::string CFG_MAX_PENDING_CHUNKED_MESSAGE = "maxPendingChunkedMessage";
 static const std::string CFG_AUTO_ACK_OLDEST_CHUNKED_MESSAGE_ON_QUEUE_FULL =
     "autoAckOldestChunkedMessageOnQueueFull";
+static const std::string CFG_BATCH_INDEX_ACK_ENABLED = "batchIndexAckEnabled";
 
 static const std::map<std::string, pulsar_consumer_type> SUBSCRIPTION_TYPE = {
     {"Exclusive", pulsar_ConsumerExclusive},
@@ -215,6 +216,13 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig, pulsar_messag
     pulsar_consumer_configuration_set_auto_ack_oldest_chunked_message_on_queue_full(
         this->cConsumerConfig.get(), autoAckOldestChunkedMessageOnQueueFull);
   }
+
+  if (consumerConfig.Has(CFG_BATCH_INDEX_ACK_ENABLED) &&
+      consumerConfig.Get(CFG_BATCH_INDEX_ACK_ENABLED).IsBoolean()) {
+    bool batchIndexAckEnabled = consumerConfig.Get(CFG_BATCH_INDEX_ACK_ENABLED).ToBoolean();
+    pulsar_consumer_configuration_set_batch_index_ack_enabled(this->cConsumerConfig.get(),
+                                                              batchIndexAckEnabled);
+  }
 }
 
 ConsumerConfig::~ConsumerConfig() {
diff --git a/tests/conf/standalone.conf b/tests/conf/standalone.conf
index 5b39772..2310724 100755
--- a/tests/conf/standalone.conf
+++ b/tests/conf/standalone.conf
@@ -59,6 +59,9 @@ backlogQuotaDefaultLimitGB=10
 # Enable the deletion of inactive topics
 brokerDeleteInactiveTopicsEnabled=true
 
+# Enable batch index ACK
+acknowledgmentAtBatchIndexLevelEnabled=true
+
 # How often to check for inactive topics
 brokerDeleteInactiveTopicsFrequencySeconds=60
 
diff --git a/tests/consumer.test.js b/tests/consumer.test.js
index e41c638..828cb88 100644
--- a/tests/consumer.test.js
+++ b/tests/consumer.test.js
@@ -140,5 +140,63 @@ const Pulsar = require('../index.js');
         await expect(consumer.close()).rejects.toThrow('Failed to close consumer: AlreadyClosed');
       });
     });
+
+    describe('Features', () => {
+      test('Batch index ack', async () => {
+        const topicName = 'test-batch-index-ack';
+        const producer = await client.createProducer({
+          topic: topicName,
+          batchingEnabled: true,
+          batchingMaxMessages: 100,
+          batchingMaxPublishDelayMs: 10000,
+        });
+
+        let consumer = await client.subscribe({
+          topic: topicName,
+          batchIndexAckEnabled: true,
+          subscription: 'test-batch-index-ack',
+        });
+
+        // Send messages 0~4 without awaiting each send so that they end up in a single batch.
+        for (let i = 0; i < 5; i += 1) {
+          const msg = `my-message-${i}`;
+          console.log(msg);
+          producer.send({
+            data: Buffer.from(msg),
+          });
+        }
+        await producer.flush();
+
+        // Receive all 5 messages, but acknowledge only messages 0 and 1.
+        const results = [];
+        for (let i = 0; i < 5; i += 1) {
+          const msg = await consumer.receive();
+          results.push(msg);
+        }
+        expect(results.length).toEqual(5);
+        for (let i = 0; i < 2; i += 1) {
+          await consumer.acknowledge(results[i]);
+          await new Promise((resolve) => setTimeout(resolve, 200));
+        }
+
+        // After restarting the consumer, only messages 2~4 should be received.
+        await consumer.close();
+        consumer = await client.subscribe({
+          topic: topicName,
+          batchIndexAckEnabled: true,
+          subscription: 'test-batch-index-ack',
+        });
+        const results2 = [];
+        for (let i = 2; i < 5; i += 1) {
+          const msg = await consumer.receive();
+          results2.push(msg);
+        }
+        expect(results2.length).toEqual(3);
+        // assert no more msgs.
+        await expect(consumer.receive(1000)).rejects.toThrow(
+          'Failed to receive message: TimeOut',
+        );
+      });
+    });
   });
 })();

Reply via email to