This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
The following commit(s) were added to refs/heads/master by this push:
new 1380086 feat: support batch index ack. (#332)
1380086 is described below
commit 13800863c7200107d872add71d1aa1e4e3a86f63
Author: Baodi Shi <[email protected]>
AuthorDate: Thu Jun 15 21:40:55 2023 +0800
feat: support batch index ack. (#332)
---
index.d.ts | 1 +
src/ConsumerConfig.cc | 8 +++++++
tests/conf/standalone.conf | 3 +++
tests/consumer.test.js | 58 ++++++++++++++++++++++++++++++++++++++++++++++
4 files changed, 70 insertions(+)
diff --git a/index.d.ts b/index.d.ts
index 97462dd..b394804 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -94,6 +94,7 @@ export interface ConsumerConfig {
maxPendingChunkedMessage?: number;
autoAckOldestChunkedMessageOnQueueFull?: number;
schema?: SchemaInfo;
+ batchIndexAckEnabled?: boolean;
}
export class Consumer {
diff --git a/src/ConsumerConfig.cc b/src/ConsumerConfig.cc
index 13e1a75..69f97d9 100644
--- a/src/ConsumerConfig.cc
+++ b/src/ConsumerConfig.cc
@@ -45,6 +45,7 @@ static const std::string CFG_CRYPTO_FAILURE_ACTION =
"cryptoFailureAction";
static const std::string CFG_MAX_PENDING_CHUNKED_MESSAGE =
"maxPendingChunkedMessage";
static const std::string CFG_AUTO_ACK_OLDEST_CHUNKED_MESSAGE_ON_QUEUE_FULL =
"autoAckOldestChunkedMessageOnQueueFull";
+static const std::string CFG_BATCH_INDEX_ACK_ENABLED = "batchIndexAckEnabled";
static const std::map<std::string, pulsar_consumer_type> SUBSCRIPTION_TYPE = {
{"Exclusive", pulsar_ConsumerExclusive},
@@ -215,6 +216,13 @@ ConsumerConfig::ConsumerConfig(const Napi::Object
&consumerConfig, pulsar_messag
pulsar_consumer_configuration_set_auto_ack_oldest_chunked_message_on_queue_full(
this->cConsumerConfig.get(), autoAckOldestChunkedMessageOnQueueFull);
}
+
+ if (consumerConfig.Has(CFG_BATCH_INDEX_ACK_ENABLED) &&
+ consumerConfig.Get(CFG_BATCH_INDEX_ACK_ENABLED).IsBoolean()) {
+ bool batchIndexAckEnabled =
consumerConfig.Get(CFG_BATCH_INDEX_ACK_ENABLED).ToBoolean();
+
pulsar_consumer_configuration_set_batch_index_ack_enabled(this->cConsumerConfig.get(),
+
batchIndexAckEnabled);
+ }
}
ConsumerConfig::~ConsumerConfig() {
diff --git a/tests/conf/standalone.conf b/tests/conf/standalone.conf
index 5b39772..2310724 100755
--- a/tests/conf/standalone.conf
+++ b/tests/conf/standalone.conf
@@ -59,6 +59,9 @@ backlogQuotaDefaultLimitGB=10
# Enable the deletion of inactive topics
brokerDeleteInactiveTopicsEnabled=true
+# Enable batch index ACK
+acknowledgmentAtBatchIndexLevelEnabled=true
+
# How often to check for inactive topics
brokerDeleteInactiveTopicsFrequencySeconds=60
diff --git a/tests/consumer.test.js b/tests/consumer.test.js
index e41c638..828cb88 100644
--- a/tests/consumer.test.js
+++ b/tests/consumer.test.js
@@ -140,5 +140,63 @@ const Pulsar = require('../index.js');
await expect(consumer.close()).rejects.toThrow('Failed to close
consumer: AlreadyClosed');
});
});
+
+ describe('Features', () => {
+ test('Batch index ack', async () => {
+ const topicName = 'test-batch-index-ack';
+ const producer = await client.createProducer({
+ topic: topicName,
+ batchingEnabled: true,
+ batchingMaxMessages: 100,
+ batchingMaxPublishDelayMs: 10000,
+ });
+
+ let consumer = await client.subscribe({
+ topic: topicName,
+ batchIndexAckEnabled: true,
+ subscription: 'test-batch-index-ack',
+ });
+
+ // Make sure send 0~5 is a batch msg.
+ for (let i = 0; i < 5; i += 1) {
+ const msg = `my-message-${i}`;
+ console.log(msg);
+ producer.send({
+ data: Buffer.from(msg),
+ });
+ }
+ await producer.flush();
+
+ // Receive msgs and just ack 0, 1 msgs
+ const results = [];
+ for (let i = 0; i < 5; i += 1) {
+ const msg = await consumer.receive();
+ results.push(msg);
+ }
+ expect(results.length).toEqual(5);
+ for (let i = 0; i < 2; i += 1) {
+ await consumer.acknowledge(results[i]);
+ await new Promise((resolve) => setTimeout(resolve, 200));
+ }
+
+ // Restart consumer after, just receive 2~5 msg.
+ await consumer.close();
+ consumer = await client.subscribe({
+ topic: topicName,
+ batchIndexAckEnabled: true,
+ subscription: 'test-batch-index-ack',
+ });
+ const results2 = [];
+ for (let i = 2; i < 5; i += 1) {
+ const msg = await consumer.receive();
+ results2.push(msg);
+ }
+ expect(results2.length).toEqual(3);
+ // assert no more msgs.
+ await expect(consumer.receive(1000)).rejects.toThrow(
+ 'Failed to receive message: TimeOut',
+ );
+ });
+ });
});
})();