This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
The following commit(s) were added to refs/heads/master by this push:
new 0d6fb72 Support set batchingMaxAllowedSizeInBytes on producer batch configuration (#436)
0d6fb72 is described below
commit 0d6fb726346d8f5f21f1753facc1178bf24a6326
Author: Baodi Shi <[email protected]>
AuthorDate: Sun Sep 28 10:36:20 2025 +0800
Support set batchingMaxAllowedSizeInBytes on producer batch configuration (#436)
---
index.d.ts | 1 +
src/ProducerConfig.cc | 11 ++++++++
tests/producer.test.js | 74 ++++++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 86 insertions(+)
diff --git a/index.d.ts b/index.d.ts
index 72c89af..e9bf8e8 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -69,6 +69,7 @@ export interface ProducerConfig {
accessMode?: ProducerAccessMode;
batchingType?: ProducerBatchType;
messageRouter?: MessageRouter;
+ batchingMaxAllowedSizeInBytes?: number;
}
export class Producer {
diff --git a/src/ProducerConfig.cc b/src/ProducerConfig.cc
index 3889120..83afb9c 100644
--- a/src/ProducerConfig.cc
+++ b/src/ProducerConfig.cc
@@ -40,6 +40,7 @@ static const std::string CFG_COMPRESS_TYPE = "compressionType";
static const std::string CFG_BATCH_ENABLED = "batchingEnabled";
static const std::string CFG_BATCH_MAX_DELAY = "batchingMaxPublishDelayMs";
static const std::string CFG_BATCH_MAX_MSG = "batchingMaxMessages";
+static const std::string CFG_BATCH_MAX_ALLOWED_SIZE_IN_BYTES = "batchingMaxAllowedSizeInBytes";
static const std::string CFG_SCHEMA = "schema";
static const std::string CFG_PROPS = "properties";
static const std::string CFG_PUBLIC_KEY_PATH = "publicKeyPath";
@@ -201,6 +202,16 @@ ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") {
}
}
+ if (producerConfig.Has(CFG_BATCH_MAX_ALLOWED_SIZE_IN_BYTES) &&
+ producerConfig.Get(CFG_BATCH_MAX_ALLOWED_SIZE_IN_BYTES).IsNumber()) {
+    int64_t batchingMaxAllowedSizeInBytes =
+        producerConfig.Get(CFG_BATCH_MAX_ALLOWED_SIZE_IN_BYTES).ToNumber().Int64Value();
+ if (batchingMaxAllowedSizeInBytes > 0) {
+ pulsar_producer_configuration_set_batching_max_allowed_size_in_bytes(
+          this->cProducerConfig.get(), (unsigned long)batchingMaxAllowedSizeInBytes);
+ }
+ }
+
   if (producerConfig.Has(CFG_SCHEMA) && producerConfig.Get(CFG_SCHEMA).IsObject()) {
     SchemaInfo* schemaInfo = new SchemaInfo(producerConfig.Get(CFG_SCHEMA).ToObject());
schemaInfo->SetProducerSchema(this->cProducerConfig);
diff --git a/tests/producer.test.js b/tests/producer.test.js
index d094505..061d827 100644
--- a/tests/producer.test.js
+++ b/tests/producer.test.js
@@ -239,5 +239,79 @@ function getPartition(msgId) {
expect(partitions.size).toBe(1);
}, 30000);
});
+ describe('Batching', () => {
+ function getBatchIndex(msgId) {
+ const parts = msgId.toString().split(':');
+ if (parts.length > 3) {
+ return Number(parts[3]);
+ }
+ return -1;
+ }
+
+    test('should batch messages based on max allowed size in bytes', async () => {
+      const topicName = `persistent://public/default/test-batch-size-in-bytes-${Date.now()}`;
+ const subName = 'subscription-name';
+ const numOfMessages = 30;
+ const prefix = '12345678'; // 8 bytes message prefix
+
+ let producer;
+ let consumer;
+
+ try {
+ // 1. Setup Producer with batching enabled and size limit
+ producer = await client.createProducer({
+ topic: topicName,
+ compressionType: 'LZ4',
+ batchingEnabled: true,
+ batchingMaxMessages: 10000,
+ batchingMaxAllowedSizeInBytes: 20,
+ });
+
+ // 2. Setup Consumer
+ consumer = await client.subscribe({
+ topic: topicName,
+ subscription: subName,
+ });
+
+ // 3. Send messages asynchronously
+ const sendPromises = [];
+ for (let i = 0; i < numOfMessages; i += 1) {
+ const messageContent = prefix + i;
+ const msg = {
+ data: Buffer.from(messageContent),
+ properties: { msgIndex: String(i) },
+ };
+ sendPromises.push(producer.send(msg));
+ }
+ await producer.flush();
+ await Promise.all(sendPromises);
+
+ // 4. Receive messages and run assertions
+ let receivedCount = 0;
+ for (let i = 0; i < numOfMessages; i += 1) {
+ const receivedMsg = await consumer.receive(5000);
+ const expectedMessageContent = prefix + i;
+
+ // Assert that batchIndex is 0 or 1, since batch size should be 2
+ const batchIndex = getBatchIndex(receivedMsg.getMessageId());
+ expect(batchIndex).toBeLessThan(2);
+
+ // Assert message properties and content
+ expect(receivedMsg.getProperties().msgIndex).toBe(String(i));
+          expect(receivedMsg.getData().toString()).toBe(expectedMessageContent);
+
+ await consumer.acknowledge(receivedMsg);
+ receivedCount += 1;
+ }
+
+ // 5. Final check on the number of consumed messages
+ expect(receivedCount).toBe(numOfMessages);
+ } finally {
+ // 6. Cleanup
+ if (producer) await producer.close();
+ if (consumer) await consumer.close();
+ }
+ }, 30000);
+ });
});
})();