This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e76c68  Support custom message router for partitioned topic producer (#435)
5e76c68 is described below

commit 5e76c6861f768775368306b8de48a5a44b5c4829
Author: Yunze Xu <[email protected]>
AuthorDate: Sun Sep 28 09:40:10 2025 +0800

    Support custom message router for partitioned topic producer (#435)
    
    * Support custom message router for partitioned topic producer
    
    * Add test
    
    * Fix lint
    
    * simplify code
    
    * Add tests for exceptional cases
    
    * Fix router signature
    
    * Fix interface
    
    * Fix tests
    
    * Add documents
    
    * Test conflicts of messageRoutingMode and messageRouter
---
 index.d.ts             | 18 +++++++++++
 src/Producer.cc        |  4 ++-
 src/Producer.h         |  5 +++
 src/ProducerConfig.cc  | 37 ++++++++++++++++++++++
 src/ProducerConfig.h   |  7 +++++
 tests/client.test.js   | 10 +++---
 tests/http_utils.js    | 16 +++++++++-
 tests/producer.test.js | 83 ++++++++++++++++++++++++++++++++++++++++++++++++++
 tests/reader.test.js   |  4 +--
 9 files changed, 175 insertions(+), 9 deletions(-)

diff --git a/index.d.ts b/index.d.ts
index 270c5e6..72c89af 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -68,6 +68,7 @@ export interface ProducerConfig {
   schema?: SchemaInfo;
   accessMode?: ProducerAccessMode;
   batchingType?: ProducerBatchType;
+  messageRouter?: MessageRouter;
 }
 
 export class Producer {
@@ -176,6 +177,23 @@ export class MessageId {
   toString(): string;
 }
 
+export interface TopicMetadata {
+  numPartitions: number;
+}
+
+/**
+ * @callback MessageRouter
+ * @description When producing messages to a partitioned topic, this router is used to select the
+ * target partition for each message. The router only works when the `messageRoutingMode` is set to
+ * `CustomPartition`. Please note that `getTopicName()` cannot be called on the `message`, otherwise
+ * the behavior will be undefined because the topic is unknown before sending the message.
+ * @param message The message to be routed.
+ * @param topicMetadata Metadata for the partitioned topic the message is being routed to.
+ * @returns {number} The index of the target partition (must be a number between 0 and
+ * topicMetadata.numPartitions - 1).
+ */
+export type MessageRouter = (message: Message, topicMetadata: TopicMetadata) 
=> number;
+
 export interface SchemaInfo {
   schemaType: SchemaType;
   name?: string;
diff --git a/src/Producer.cc b/src/Producer.cc
index c827f9f..5874b22 100644
--- a/src/Producer.cc
+++ b/src/Producer.cc
@@ -73,6 +73,7 @@ Napi::Value Producer::NewInstance(const Napi::CallbackInfo 
&info, std::shared_pt
         auto instanceContext = static_cast<ProducerNewInstanceContext *>(ctx);
         auto deferred = instanceContext->deferred;
         auto cClient = instanceContext->cClient;
+        auto producerConfig = instanceContext->producerConfig;
         delete instanceContext;
 
         if (result != pulsar_result_Ok) {
@@ -81,10 +82,11 @@ Napi::Value Producer::NewInstance(const Napi::CallbackInfo 
&info, std::shared_pt
 
         std::shared_ptr<pulsar_producer_t> cProducer(rawProducer, 
pulsar_producer_free);
 
-        deferred->Resolve([cProducer](const Napi::Env env) {
+        deferred->Resolve([cProducer, producerConfig](const Napi::Env env) {
           Napi::Object obj = Producer::constructor.New({});
           Producer *producer = Producer::Unwrap(obj);
           producer->SetCProducer(cProducer);
+          producer->producerConfig = producerConfig;
           return obj;
         });
       },
diff --git a/src/Producer.h b/src/Producer.h
index 70c2342..98849ed 100644
--- a/src/Producer.h
+++ b/src/Producer.h
@@ -23,6 +23,8 @@
 #include <napi.h>
 #include <pulsar/c/client.h>
 #include <pulsar/c/producer.h>
+#include <memory>
+#include "ProducerConfig.h"
 
 class Producer : public Napi::ObjectWrap<Producer> {
  public:
@@ -35,6 +37,9 @@ class Producer : public Napi::ObjectWrap<Producer> {
 
  private:
   std::shared_ptr<pulsar_producer_t> cProducer;
+  // Extend the lifetime of the producer config since its env and router function could be used when sending
+  // messages
+  std::shared_ptr<ProducerConfig> producerConfig;
   Napi::Value Send(const Napi::CallbackInfo &info);
   Napi::Value Flush(const Napi::CallbackInfo &info);
   Napi::Value Close(const Napi::CallbackInfo &info);
diff --git a/src/ProducerConfig.cc b/src/ProducerConfig.cc
index 2c704bf..3889120 100644
--- a/src/ProducerConfig.cc
+++ b/src/ProducerConfig.cc
@@ -18,8 +18,14 @@
  */
 #include "SchemaInfo.h"
 #include "ProducerConfig.h"
+#include "Message.h"
+#include <cstdio>
 #include <map>
+#include "napi-inl.h"
+#include "napi.h"
 #include "pulsar/ProducerConfiguration.h"
+#include "pulsar/c/message.h"
+#include "pulsar/c/message_router.h"
 
 static const std::string CFG_TOPIC = "topic";
 static const std::string CFG_PRODUCER_NAME = "producerName";
@@ -42,6 +48,7 @@ static const std::string CFG_CRYPTO_FAILURE_ACTION = 
"cryptoFailureAction";
 static const std::string CFG_CHUNK_ENABLED = "chunkingEnabled";
 static const std::string CFG_ACCESS_MODE = "accessMode";
 static const std::string CFG_BATCHING_TYPE = "batchingType";
+static const std::string CFG_MESSAGE_ROUTER = "messageRouter";
 
 struct _pulsar_producer_configuration {
   pulsar::ProducerConfiguration conf;
@@ -82,6 +89,25 @@ static std::map<std::string, 
pulsar::ProducerConfiguration::BatchingType> PRODUC
     {"KeyBasedBatching", pulsar::ProducerConfiguration::KeyBasedBatching},
 };
 
+static int choosePartition(pulsar_message_t* msg, pulsar_topic_metadata_t* 
metadata, void* ctx) {
+  auto router = static_cast<Napi::FunctionReference*>(ctx);
+  const auto& env = router->Env();
+  auto jsMessage = Message::NewInstance(Napi::Object::New(env),
+                                        std::shared_ptr<pulsar_message_t>(msg, 
[](pulsar_message_t*) {}));
+  int numPartitions = pulsar_topic_metadata_get_num_partitions(metadata);
+
+  Napi::Object jsTopicMetadata = Napi::Object::New(env);
+  jsTopicMetadata.Set("numPartitions", Napi::Number::New(env, numPartitions));
+
+  try {
+    return router->Call({jsMessage, jsTopicMetadata}).ToNumber().Int32Value();
+  } catch (const Napi::Error& e) {
+    // TODO: how to handle the error properly? For now, return an invalid 
partition to fail the send
+    fprintf(stderr, "Error when calling messageRouter: %s\n", e.what());
+    return numPartitions;
+  }
+}
+
 ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") 
{
   this->cProducerConfig = std::shared_ptr<pulsar_producer_configuration_t>(
       pulsar_producer_configuration_create(), 
pulsar_producer_configuration_free);
@@ -131,8 +157,10 @@ ProducerConfig::ProducerConfig(const Napi::Object& 
producerConfig) : topic("") {
     
pulsar_producer_configuration_set_block_if_queue_full(this->cProducerConfig.get(),
 blockIfQueueFull);
   }
 
+  bool useCustomPartition = false;
   if (producerConfig.Has(CFG_ROUTING_MODE) && 
producerConfig.Get(CFG_ROUTING_MODE).IsString()) {
     std::string messageRoutingMode = 
producerConfig.Get(CFG_ROUTING_MODE).ToString().Utf8Value();
+    useCustomPartition = (messageRoutingMode == "CustomPartition");
     if (MESSAGE_ROUTING_MODE.count(messageRoutingMode))
       
pulsar_producer_configuration_set_partitions_routing_mode(this->cProducerConfig.get(),
                                                                 
MESSAGE_ROUTING_MODE.at(messageRoutingMode));
@@ -224,6 +252,15 @@ ProducerConfig::ProducerConfig(const Napi::Object& 
producerConfig) : topic("") {
   if (PRODUCER_BATCHING_TYPE.count(batchingType)) {
     
this->cProducerConfig.get()->conf.setBatchingType(PRODUCER_BATCHING_TYPE.at(batchingType));
   }
+
+  if (useCustomPartition && producerConfig.Has(CFG_MESSAGE_ROUTER)) {
+    auto value = producerConfig.Get(CFG_MESSAGE_ROUTER);
+    if (value.IsFunction()) {
+      messageRouter = Napi::Persistent(value.As<Napi::Function>());
+      
pulsar_producer_configuration_set_message_router(this->cProducerConfig.get(), 
choosePartition,
+                                                       &messageRouter);
+    }
+  }
 }
 
 ProducerConfig::~ProducerConfig() {}
diff --git a/src/ProducerConfig.h b/src/ProducerConfig.h
index 3d49557..0437768 100644
--- a/src/ProducerConfig.h
+++ b/src/ProducerConfig.h
@@ -22,6 +22,11 @@
 
 #include <napi.h>
 #include <pulsar/c/producer_configuration.h>
+#include <memory>
+
+struct MessageRouterContext {
+  Napi::FunctionReference messageRouter;
+};
 
 class ProducerConfig {
  public:
@@ -33,6 +38,8 @@ class ProducerConfig {
  private:
   std::shared_ptr<pulsar_producer_configuration_t> cProducerConfig;
   std::string topic;
+  std::unique_ptr<MessageRouterContext> routerContext;
+  Napi::FunctionReference messageRouter;
 };
 
 #endif
diff --git a/tests/client.test.js b/tests/client.test.js
index f7bc6d5..d97763e 100644
--- a/tests/client.test.js
+++ b/tests/client.test.js
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-const httpRequest = require('./http_utils');
+const httpUtils = require('./http_utils');
 const Pulsar = require('../index');
 
 const baseUrl = 'http://localhost:8080';
@@ -74,7 +74,7 @@ const baseUrl = 'http://localhost:8080';
         const nonPartitionedTopicName = 'test-non-partitioned-topic';
         const nonPartitionedTopic = 
`persistent://public/default/${nonPartitionedTopicName}`;
         const nonPartitionedTopicAdminURL = 
`${baseUrl}/admin/v2/persistent/public/default/${nonPartitionedTopicName}`;
-        const createNonPartitionedTopicRes = await httpRequest(
+        const createNonPartitionedTopicRes = await httpUtils.request(
           nonPartitionedTopicAdminURL, {
             headers: {
               'Content-Type': 'application/json',
@@ -91,7 +91,7 @@ const baseUrl = 'http://localhost:8080';
         const partitionedTopicName = 'test-partitioned-topic-1';
         const partitionedTopic = 
`persistent://public/default/${partitionedTopicName}`;
         const partitionedTopicAdminURL = 
`${baseUrl}/admin/v2/persistent/public/default/${partitionedTopicName}/partitions`;
-        const createPartitionedTopicRes = await httpRequest(
+        const createPartitionedTopicRes = await httpUtils.request(
           partitionedTopicAdminURL, {
             headers: {
               'Content-Type': 'text/plain',
@@ -110,9 +110,9 @@ const baseUrl = 'http://localhost:8080';
           'persistent://public/default/test-partitioned-topic-1-partition-3',
         ]);
 
-        const deleteNonPartitionedTopicRes = await 
httpRequest(nonPartitionedTopicAdminURL, { method: 'DELETE' });
+        const deleteNonPartitionedTopicRes = await 
httpUtils.request(nonPartitionedTopicAdminURL, { method: 'DELETE' });
         expect(deleteNonPartitionedTopicRes.statusCode).toBe(204);
-        const deletePartitionedTopicRes = await 
httpRequest(partitionedTopicAdminURL, { method: 'DELETE' });
+        const deletePartitionedTopicRes = await 
httpUtils.request(partitionedTopicAdminURL, { method: 'DELETE' });
         expect(deletePartitionedTopicRes.statusCode).toBe(204);
 
         await client.close();
diff --git a/tests/http_utils.js b/tests/http_utils.js
index 8fa94f2..81d09de 100644
--- a/tests/http_utils.js
+++ b/tests/http_utils.js
@@ -42,4 +42,18 @@ const request = (url, { headers, data = {}, method }) => new 
Promise((resolve, r
   req.end();
 });
 
-module.exports = request;
+function createPartitionedTopic(topic, numPartitions) {
+  const url = 
`http://localhost:8080/admin/v2/persistent/public/default/${topic}/partitions`;
+  return request(url, {
+    headers: {
+      'Content-Type': 'application/json',
+    },
+    data: numPartitions,
+    method: 'PUT',
+  });
+}
+
+module.exports = {
+  createPartitionedTopic,
+  request,
+};
diff --git a/tests/producer.test.js b/tests/producer.test.js
index e6908cb..d094505 100644
--- a/tests/producer.test.js
+++ b/tests/producer.test.js
@@ -18,6 +18,12 @@
  */
 
 const Pulsar = require('../index');
+const httpUtils = require('./http_utils');
+
+function getPartition(msgId) {
+  // The message id string is in the format of 
"entryId,ledgerId,partition,batchIndex"
+  return Number(msgId.toString().split(',')[2]);
+}
 
 (() => {
   describe('Producer', () => {
@@ -156,5 +162,82 @@ const Pulsar = require('../index');
         await producer2.close();
       });
     });
+    describe('Message Routing', () => {
+      test('Custom Message Router', async () => {
+        const topic = `test-custom-router-${Date.now()}`;
+        const numPartitions = 3;
+        const response = await httpUtils.createPartitionedTopic(topic, 
numPartitions);
+        expect(response.statusCode).toBe(204);
+
+        const producer = await client.createProducer({
+          topic,
+          batchingMaxMessages: 2,
+          messageRouter: (message, topicMetadata) => 
parseInt(message.getPartitionKey(), 10)
+            % topicMetadata.numPartitions,
+          messageRoutingMode: 'CustomPartition',
+        });
+
+        const promises = [];
+        const numMessages = 5;
+        for (let i = 0; i < numMessages; i += 1) {
+          const sendPromise = producer.send({
+            partitionKey: `${i}`,
+            data: Buffer.from(`msg-${i}`),
+          });
+          await sendPromise;
+          promises.push(sendPromise);
+        }
+        try {
+          const allMsgIds = await Promise.all(promises);
+          console.log(`All messages have been sent. IDs: ${allMsgIds.join(', 
')}`);
+          for (let i = 0; i < allMsgIds.length; i += 1) {
+            // The message id string is in the format of 
"entryId,ledgerId,partition,batchIndex"
+            const partition = getPartition(allMsgIds[i]);
+            expect(i % numPartitions).toBe(partition);
+          }
+        } catch (error) {
+          console.error('One or more messages failed to send:', error);
+        }
+      }, 30000);
+      test('Exception in router', async () => {
+        const topic = `test-exception-in-router-${Date.now()}`;
+        const numPartitions = 2;
+        const response = await httpUtils.createPartitionedTopic(topic, 
numPartitions);
+        expect(response.statusCode).toBe(204);
+        const producer = await client.createProducer({
+          topic,
+          messageRouter: (message, topicMetadata) => {
+            throw new Error('Custom error in message router');
+          },
+          messageRoutingMode: 'CustomPartition',
+        });
+        await expect(
+          producer.send({ data: Buffer.from('test') }),
+        ).rejects.toThrow('Failed to send message: UnknownError');
+      }, 30000);
+      test('Not CustomPartition', async () => {
+        const topic = `test-not-custom-part-${Date.now()}`;
+        const numPartitions = 2;
+        const response = await httpUtils.createPartitionedTopic(topic, 
numPartitions);
+        expect(response.statusCode).toBe(204);
+
+        let index = 0;
+        const producer = await client.createProducer({
+          topic,
+          messageRouter: (_, topicMetadata) => {
+            const result = index % topicMetadata.numPartitions;
+            index += 1;
+            return result;
+          },
+          messageRoutingMode: 'UseSinglePartition',
+        });
+        const partitions = new Set();
+        for (let i = 0; i < 10; i += 1) {
+          const msgId = await producer.send({ data: Buffer.from('msg') });
+          partitions.add(getPartition(msgId));
+        }
+        expect(partitions.size).toBe(1);
+      }, 30000);
+    });
   });
 })();
diff --git a/tests/reader.test.js b/tests/reader.test.js
index 56d1b48..fb0842b 100644
--- a/tests/reader.test.js
+++ b/tests/reader.test.js
@@ -19,7 +19,7 @@
 
 const lodash = require('lodash');
 const Pulsar = require('../index');
-const httpRequest = require('./http_utils');
+const httpUtils = require('./http_utils');
 
 const baseUrl = 'http://localhost:8080';
 
@@ -81,7 +81,7 @@ const baseUrl = 'http://localhost:8080';
       const partitionedTopicName = 'test-reader-partitioned-topic';
       const partitionedTopic = 
`persistent://public/default/${partitionedTopicName}`;
       const partitionedTopicAdminURL = 
`${baseUrl}/admin/v2/persistent/public/default/${partitionedTopicName}/partitions`;
-      const createPartitionedTopicRes = await httpRequest(
+      const createPartitionedTopicRes = await httpUtils.request(
         partitionedTopicAdminURL, {
           headers: {
             'Content-Type': 'text/plain',

Reply via email to