This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
The following commit(s) were added to refs/heads/master by this push:
new 5e76c68 Support custom message router for partitioned topic producer
(#435)
5e76c68 is described below
commit 5e76c6861f768775368306b8de48a5a44b5c4829
Author: Yunze Xu <[email protected]>
AuthorDate: Sun Sep 28 09:40:10 2025 +0800
Support custom message router for partitioned topic producer (#435)
* Support custom message router for partitioned topic producer
* Add test
* Fix lint
* simplify code
* Add tests for exceptional cases
* Fix router signature
* Fix interface
* Fix tests
* Add documents
* Test conflicts of messageRoutingMode and messageRouter
---
index.d.ts | 18 +++++++++++
src/Producer.cc | 4 ++-
src/Producer.h | 5 +++
src/ProducerConfig.cc | 37 ++++++++++++++++++++++
src/ProducerConfig.h | 7 +++++
tests/client.test.js | 10 +++---
tests/http_utils.js | 16 +++++++++-
tests/producer.test.js | 83 ++++++++++++++++++++++++++++++++++++++++++++++++++
tests/reader.test.js | 4 +--
9 files changed, 175 insertions(+), 9 deletions(-)
diff --git a/index.d.ts b/index.d.ts
index 270c5e6..72c89af 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -68,6 +68,7 @@ export interface ProducerConfig {
schema?: SchemaInfo;
accessMode?: ProducerAccessMode;
batchingType?: ProducerBatchType;
+ messageRouter?: MessageRouter;
}
export class Producer {
@@ -176,6 +177,23 @@ export class MessageId {
toString(): string;
}
+export interface TopicMetadata {
+ numPartitions: number;
+}
+
+/**
+ * @callback MessageRouter
+ * @description When producing messages to a partitioned topic, this router is
used to select the
+ * target partition for each message. The router only works when the
`messageRoutingMode` is set to
+ * `CustomPartition`. Please note that `getTopicName()` cannot be called on
the `message`, otherwise
+ * the behavior will be undefined because the topic is unknown before sending
the message.
+ * @param message The message to be routed.
+ * @param topicMetadata Metadata for the partitioned topic the message is
being routed to.
+ * @returns {number} The index of the target partition (must be an integer
between 0 and
+ * topicMetadata.numPartitions - 1, inclusive).
+ */
+export type MessageRouter = (message: Message, topicMetadata: TopicMetadata)
=> number;
+
export interface SchemaInfo {
schemaType: SchemaType;
name?: string;
diff --git a/src/Producer.cc b/src/Producer.cc
index c827f9f..5874b22 100644
--- a/src/Producer.cc
+++ b/src/Producer.cc
@@ -73,6 +73,7 @@ Napi::Value Producer::NewInstance(const Napi::CallbackInfo
&info, std::shared_pt
auto instanceContext = static_cast<ProducerNewInstanceContext *>(ctx);
auto deferred = instanceContext->deferred;
auto cClient = instanceContext->cClient;
+ auto producerConfig = instanceContext->producerConfig;
delete instanceContext;
if (result != pulsar_result_Ok) {
@@ -81,10 +82,11 @@ Napi::Value Producer::NewInstance(const Napi::CallbackInfo
&info, std::shared_pt
std::shared_ptr<pulsar_producer_t> cProducer(rawProducer,
pulsar_producer_free);
- deferred->Resolve([cProducer](const Napi::Env env) {
+ deferred->Resolve([cProducer, producerConfig](const Napi::Env env) {
Napi::Object obj = Producer::constructor.New({});
Producer *producer = Producer::Unwrap(obj);
producer->SetCProducer(cProducer);
+ producer->producerConfig = producerConfig;
return obj;
});
},
diff --git a/src/Producer.h b/src/Producer.h
index 70c2342..98849ed 100644
--- a/src/Producer.h
+++ b/src/Producer.h
@@ -23,6 +23,8 @@
#include <napi.h>
#include <pulsar/c/client.h>
#include <pulsar/c/producer.h>
+#include <memory>
+#include "ProducerConfig.h"
class Producer : public Napi::ObjectWrap<Producer> {
public:
@@ -35,6 +37,9 @@ class Producer : public Napi::ObjectWrap<Producer> {
private:
std::shared_ptr<pulsar_producer_t> cProducer;
+ // Extend the lifetime of the producer config since its env and router
function could be used when sending
+ // messages
+ std::shared_ptr<ProducerConfig> producerConfig;
Napi::Value Send(const Napi::CallbackInfo &info);
Napi::Value Flush(const Napi::CallbackInfo &info);
Napi::Value Close(const Napi::CallbackInfo &info);
diff --git a/src/ProducerConfig.cc b/src/ProducerConfig.cc
index 2c704bf..3889120 100644
--- a/src/ProducerConfig.cc
+++ b/src/ProducerConfig.cc
@@ -18,8 +18,14 @@
*/
#include "SchemaInfo.h"
#include "ProducerConfig.h"
+#include "Message.h"
+#include <cstdio>
#include <map>
+#include "napi-inl.h"
+#include "napi.h"
#include "pulsar/ProducerConfiguration.h"
+#include "pulsar/c/message.h"
+#include "pulsar/c/message_router.h"
static const std::string CFG_TOPIC = "topic";
static const std::string CFG_PRODUCER_NAME = "producerName";
@@ -42,6 +48,7 @@ static const std::string CFG_CRYPTO_FAILURE_ACTION =
"cryptoFailureAction";
static const std::string CFG_CHUNK_ENABLED = "chunkingEnabled";
static const std::string CFG_ACCESS_MODE = "accessMode";
static const std::string CFG_BATCHING_TYPE = "batchingType";
+static const std::string CFG_MESSAGE_ROUTER = "messageRouter";
struct _pulsar_producer_configuration {
pulsar::ProducerConfiguration conf;
@@ -82,6 +89,25 @@ static std::map<std::string,
pulsar::ProducerConfiguration::BatchingType> PRODUC
{"KeyBasedBatching", pulsar::ProducerConfiguration::KeyBasedBatching},
};
+static int choosePartition(pulsar_message_t* msg, pulsar_topic_metadata_t*
metadata, void* ctx) {
+ auto router = static_cast<Napi::FunctionReference*>(ctx);
+ const auto& env = router->Env();
+ auto jsMessage = Message::NewInstance(Napi::Object::New(env),
+ std::shared_ptr<pulsar_message_t>(msg,
[](pulsar_message_t*) {}));
+ int numPartitions = pulsar_topic_metadata_get_num_partitions(metadata);
+
+ Napi::Object jsTopicMetadata = Napi::Object::New(env);
+ jsTopicMetadata.Set("numPartitions", Napi::Number::New(env, numPartitions));
+
+ try {
+ return router->Call({jsMessage, jsTopicMetadata}).ToNumber().Int32Value();
+ } catch (const Napi::Error& e) {
+ // TODO: how to handle the error properly? For now, return an invalid
partition to fail the send
+ fprintf(stderr, "Error when calling messageRouter: %s\n", e.what());
+ return numPartitions;
+ }
+}
+
ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("")
{
this->cProducerConfig = std::shared_ptr<pulsar_producer_configuration_t>(
pulsar_producer_configuration_create(),
pulsar_producer_configuration_free);
@@ -131,8 +157,10 @@ ProducerConfig::ProducerConfig(const Napi::Object&
producerConfig) : topic("") {
pulsar_producer_configuration_set_block_if_queue_full(this->cProducerConfig.get(),
blockIfQueueFull);
}
+ bool useCustomPartition = false;
if (producerConfig.Has(CFG_ROUTING_MODE) &&
producerConfig.Get(CFG_ROUTING_MODE).IsString()) {
std::string messageRoutingMode =
producerConfig.Get(CFG_ROUTING_MODE).ToString().Utf8Value();
+ useCustomPartition = (messageRoutingMode == "CustomPartition");
if (MESSAGE_ROUTING_MODE.count(messageRoutingMode))
pulsar_producer_configuration_set_partitions_routing_mode(this->cProducerConfig.get(),
MESSAGE_ROUTING_MODE.at(messageRoutingMode));
@@ -224,6 +252,15 @@ ProducerConfig::ProducerConfig(const Napi::Object&
producerConfig) : topic("") {
if (PRODUCER_BATCHING_TYPE.count(batchingType)) {
this->cProducerConfig.get()->conf.setBatchingType(PRODUCER_BATCHING_TYPE.at(batchingType));
}
+
+ if (useCustomPartition && producerConfig.Has(CFG_MESSAGE_ROUTER)) {
+ auto value = producerConfig.Get(CFG_MESSAGE_ROUTER);
+ if (value.IsFunction()) {
+ messageRouter = Napi::Persistent(value.As<Napi::Function>());
+
pulsar_producer_configuration_set_message_router(this->cProducerConfig.get(),
choosePartition,
+ &messageRouter);
+ }
+ }
}
ProducerConfig::~ProducerConfig() {}
diff --git a/src/ProducerConfig.h b/src/ProducerConfig.h
index 3d49557..0437768 100644
--- a/src/ProducerConfig.h
+++ b/src/ProducerConfig.h
@@ -22,6 +22,11 @@
#include <napi.h>
#include <pulsar/c/producer_configuration.h>
+#include <memory>
+
+struct MessageRouterContext {
+ Napi::FunctionReference messageRouter;
+};
class ProducerConfig {
public:
@@ -33,6 +38,8 @@ class ProducerConfig {
private:
std::shared_ptr<pulsar_producer_configuration_t> cProducerConfig;
std::string topic;
+ std::unique_ptr<MessageRouterContext> routerContext;
+ Napi::FunctionReference messageRouter;
};
#endif
diff --git a/tests/client.test.js b/tests/client.test.js
index f7bc6d5..d97763e 100644
--- a/tests/client.test.js
+++ b/tests/client.test.js
@@ -17,7 +17,7 @@
* under the License.
*/
-const httpRequest = require('./http_utils');
+const httpUtils = require('./http_utils');
const Pulsar = require('../index');
const baseUrl = 'http://localhost:8080';
@@ -74,7 +74,7 @@ const baseUrl = 'http://localhost:8080';
const nonPartitionedTopicName = 'test-non-partitioned-topic';
const nonPartitionedTopic =
`persistent://public/default/${nonPartitionedTopicName}`;
const nonPartitionedTopicAdminURL =
`${baseUrl}/admin/v2/persistent/public/default/${nonPartitionedTopicName}`;
- const createNonPartitionedTopicRes = await httpRequest(
+ const createNonPartitionedTopicRes = await httpUtils.request(
nonPartitionedTopicAdminURL, {
headers: {
'Content-Type': 'application/json',
@@ -91,7 +91,7 @@ const baseUrl = 'http://localhost:8080';
const partitionedTopicName = 'test-partitioned-topic-1';
const partitionedTopic =
`persistent://public/default/${partitionedTopicName}`;
const partitionedTopicAdminURL =
`${baseUrl}/admin/v2/persistent/public/default/${partitionedTopicName}/partitions`;
- const createPartitionedTopicRes = await httpRequest(
+ const createPartitionedTopicRes = await httpUtils.request(
partitionedTopicAdminURL, {
headers: {
'Content-Type': 'text/plain',
@@ -110,9 +110,9 @@ const baseUrl = 'http://localhost:8080';
'persistent://public/default/test-partitioned-topic-1-partition-3',
]);
- const deleteNonPartitionedTopicRes = await
httpRequest(nonPartitionedTopicAdminURL, { method: 'DELETE' });
+ const deleteNonPartitionedTopicRes = await
httpUtils.request(nonPartitionedTopicAdminURL, { method: 'DELETE' });
expect(deleteNonPartitionedTopicRes.statusCode).toBe(204);
- const deletePartitionedTopicRes = await
httpRequest(partitionedTopicAdminURL, { method: 'DELETE' });
+ const deletePartitionedTopicRes = await
httpUtils.request(partitionedTopicAdminURL, { method: 'DELETE' });
expect(deletePartitionedTopicRes.statusCode).toBe(204);
await client.close();
diff --git a/tests/http_utils.js b/tests/http_utils.js
index 8fa94f2..81d09de 100644
--- a/tests/http_utils.js
+++ b/tests/http_utils.js
@@ -42,4 +42,18 @@ const request = (url, { headers, data = {}, method }) => new
Promise((resolve, r
req.end();
});
-module.exports = request;
+function createPartitionedTopic(topic, numPartitions) {
+ const url =
`http://localhost:8080/admin/v2/persistent/public/default/${topic}/partitions`;
+ return request(url, {
+ headers: {
+ 'Content-Type': 'application/json',
+ },
+ data: numPartitions,
+ method: 'PUT',
+ });
+}
+
+module.exports = {
+ createPartitionedTopic,
+ request,
+};
diff --git a/tests/producer.test.js b/tests/producer.test.js
index e6908cb..d094505 100644
--- a/tests/producer.test.js
+++ b/tests/producer.test.js
@@ -18,6 +18,12 @@
*/
const Pulsar = require('../index');
+const httpUtils = require('./http_utils');
+
+function getPartition(msgId) {
+ // The message id string is in the format of
"ledgerId,entryId,partition,batchIndex"
+ return Number(msgId.toString().split(',')[2]);
+}
(() => {
describe('Producer', () => {
@@ -156,5 +162,82 @@ const Pulsar = require('../index');
await producer2.close();
});
});
+ describe('Message Routing', () => {
+ test('Custom Message Router', async () => {
+ const topic = `test-custom-router-${Date.now()}`;
+ const numPartitions = 3;
+ const response = await httpUtils.createPartitionedTopic(topic,
numPartitions);
+ expect(response.statusCode).toBe(204);
+
+ const producer = await client.createProducer({
+ topic,
+ batchingMaxMessages: 2,
+ messageRouter: (message, topicMetadata) =>
parseInt(message.getPartitionKey(), 10)
+ % topicMetadata.numPartitions,
+ messageRoutingMode: 'CustomPartition',
+ });
+
+ const promises = [];
+ const numMessages = 5;
+ for (let i = 0; i < numMessages; i += 1) {
+ const sendPromise = producer.send({
+ partitionKey: `${i}`,
+ data: Buffer.from(`msg-${i}`),
+ });
+ await sendPromise;
+ promises.push(sendPromise);
+ }
+ try {
+ const allMsgIds = await Promise.all(promises);
+ console.log(`All messages have been sent. IDs: ${allMsgIds.join(',
')}`);
+ for (let i = 0; i < allMsgIds.length; i += 1) {
+ // The message id string is in the format of
"ledgerId,entryId,partition,batchIndex"
+ const partition = getPartition(allMsgIds[i]);
+ expect(i % numPartitions).toBe(partition);
+ }
+ } catch (error) {
+ console.error('One or more messages failed to send:', error);
+ }
+ }, 30000);
+ test('Exception in router', async () => {
+ const topic = `test-exception-in-router-${Date.now()}`;
+ const numPartitions = 2;
+ const response = await httpUtils.createPartitionedTopic(topic,
numPartitions);
+ expect(response.statusCode).toBe(204);
+ const producer = await client.createProducer({
+ topic,
+ messageRouter: (message, topicMetadata) => {
+ throw new Error('Custom error in message router');
+ },
+ messageRoutingMode: 'CustomPartition',
+ });
+ await expect(
+ producer.send({ data: Buffer.from('test') }),
+ ).rejects.toThrow('Failed to send message: UnknownError');
+ }, 30000);
+ test('Not CustomPartition', async () => {
+ const topic = `test-not-custom-part-${Date.now()}`;
+ const numPartitions = 2;
+ const response = await httpUtils.createPartitionedTopic(topic,
numPartitions);
+ expect(response.statusCode).toBe(204);
+
+ let index = 0;
+ const producer = await client.createProducer({
+ topic,
+ messageRouter: (_, topicMetadata) => {
+ const result = index % topicMetadata.numPartitions;
+ index += 1;
+ return result;
+ },
+ messageRoutingMode: 'UseSinglePartition',
+ });
+ const partitions = new Set();
+ for (let i = 0; i < 10; i += 1) {
+ const msgId = await producer.send({ data: Buffer.from('msg') });
+ partitions.add(getPartition(msgId));
+ }
+ expect(partitions.size).toBe(1);
+ }, 30000);
+ });
});
})();
diff --git a/tests/reader.test.js b/tests/reader.test.js
index 56d1b48..fb0842b 100644
--- a/tests/reader.test.js
+++ b/tests/reader.test.js
@@ -19,7 +19,7 @@
const lodash = require('lodash');
const Pulsar = require('../index');
-const httpRequest = require('./http_utils');
+const httpUtils = require('./http_utils');
const baseUrl = 'http://localhost:8080';
@@ -81,7 +81,7 @@ const baseUrl = 'http://localhost:8080';
const partitionedTopicName = 'test-reader-partitioned-topic';
const partitionedTopic =
`persistent://public/default/${partitionedTopicName}`;
const partitionedTopicAdminURL =
`${baseUrl}/admin/v2/persistent/public/default/${partitionedTopicName}/partitions`;
- const createPartitionedTopicRes = await httpRequest(
+ const createPartitionedTopicRes = await httpUtils.request(
partitionedTopicAdminURL, {
headers: {
'Content-Type': 'text/plain',