This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 93f10ef Support get the producer name of a message (#524)
93f10ef is described below
commit 93f10ef76f4e2301472a0ec39ddad8287c1205ee
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Nov 19 17:04:59 2025 +0800
Support get the producer name of a message (#524)
---
include/pulsar/Message.h | 7 +++++++
include/pulsar/c/message.h | 6 ++++++
lib/Message.cc | 7 +++++++
lib/c/c_Message.cc | 4 ++++
tests/BasicEndToEndTest.cc | 13 ++++++++++++-
tests/MessageTest.cc | 1 +
tests/c/c_BasicEndToEndTest.cc | 11 +++++++++--
7 files changed, 46 insertions(+), 3 deletions(-)
diff --git a/include/pulsar/Message.h b/include/pulsar/Message.h
index 2e78e98..ea4c4ab 100644
--- a/include/pulsar/Message.h
+++ b/include/pulsar/Message.h
@@ -195,6 +195,13 @@ class PULSAR_PUBLIC Message {
*/
const std::string& getSchemaVersion() const;
+ /**
+ * Get the producer name which produced this message.
+ *
+ * @return the producer name or empty string if not available
+ */
+ const std::string& getProducerName() const noexcept;
+
bool operator==(const Message& msg) const;
protected:
diff --git a/include/pulsar/c/message.h b/include/pulsar/c/message.h
index 30f1bad..1f1f91f 100644
--- a/include/pulsar/c/message.h
+++ b/include/pulsar/c/message.h
@@ -215,6 +215,12 @@ PULSAR_PUBLIC const char *pulsar_message_get_schemaVersion(pulsar_message_t *mes
PULSAR_PUBLIC void pulsar_message_set_schema_version(pulsar_message_t *message, const char *schemaVersion);
+/**
+ * Returns the producer name which produced this message. The pointer points to an internal string, so the
+ * caller should not free it.
+ */
+PULSAR_PUBLIC const char *pulsar_message_get_producer_name(pulsar_message_t *message);
+
#ifdef __cplusplus
}
#endif
diff --git a/lib/Message.cc b/lib/Message.cc
index 2a6e8a3..1e26b52 100644
--- a/lib/Message.cc
+++ b/lib/Message.cc
@@ -213,6 +213,13 @@ uint64_t Message::getPublishTimestamp() const { return impl_ ? impl_->getPublish
uint64_t Message::getEventTimestamp() const { return impl_ ? impl_->getEventTimestamp() : 0ull; }
+const std::string& Message::getProducerName() const noexcept {
+ if (!impl_) {
+ return emptyString;
+ }
+ return impl_->metadata.producer_name();
+}
+
bool Message::operator==(const Message& msg) const { return getMessageId() == msg.getMessageId(); }
KeyValue Message::getKeyValueData() const { return KeyValue(impl_->keyValuePtr); }
diff --git a/lib/c/c_Message.cc b/lib/c/c_Message.cc
index 3d30095..02c4ce3 100644
--- a/lib/c/c_Message.cc
+++ b/lib/c/c_Message.cc
@@ -140,3 +140,7 @@ const char *pulsar_message_get_schemaVersion(pulsar_message_t *message) {
int pulsar_message_has_schema_version(pulsar_message_t *message) {
return message->message.hasSchemaVersion();
}
+
+const char *pulsar_message_get_producer_name(pulsar_message_t *message) {
+ return message->message.getProducerName().c_str();
+}
diff --git a/tests/BasicEndToEndTest.cc b/tests/BasicEndToEndTest.cc
index 5cf478b..c9a8faa 100644
--- a/tests/BasicEndToEndTest.cc
+++ b/tests/BasicEndToEndTest.cc
@@ -242,9 +242,20 @@ TEST(BasicEndToEndTest, testProduceConsume) {
Message receivedMsg;
consumer.receive(receivedMsg);
ASSERT_EQ(content, receivedMsg.getDataAsString());
+ ASSERT_FALSE(receivedMsg.getProducerName().empty());
+ ASSERT_EQ(ResultOk, producer.close());
+
+ ProducerConfiguration conf;
+ conf.setProducerName("test-producer");
+ ASSERT_EQ(ResultOk, client.createProducer(topicName, conf, producer));
+ producer.send(MessageBuilder().setContent("msg-2-content").build());
+ consumer.receive(receivedMsg);
+ ASSERT_EQ("msg-2-content", receivedMsg.getDataAsString());
+ ASSERT_EQ("test-producer", receivedMsg.getProducerName());
+ consumer.acknowledge(receivedMsg);
ASSERT_EQ(ResultOk, consumer.unsubscribe());
ASSERT_EQ(ResultOk, consumer.close());
- ASSERT_EQ(ResultOk, producer.close());
+ ASSERT_EQ(ResultOk, consumer.close());
ASSERT_EQ(ResultOk, client.close());
}
diff --git a/tests/MessageTest.cc b/tests/MessageTest.cc
index 513ea8d..688cb33 100644
--- a/tests/MessageTest.cc
+++ b/tests/MessageTest.cc
@@ -42,6 +42,7 @@ TEST(MessageTest, testMessageContents) {
ASSERT_NE(myContents.c_str(), (char*)msg.getData());
ASSERT_EQ(myContents, msg.getDataAsString());
ASSERT_EQ(std::string("mycontents").length(), msg.getLength());
+ ASSERT_TRUE(msg.getProducerName().empty());
}
TEST(MessageTest, testAllocatedContents) {
diff --git a/tests/c/c_BasicEndToEndTest.cc b/tests/c/c_BasicEndToEndTest.cc
index b319727..34acd14 100644
--- a/tests/c/c_BasicEndToEndTest.cc
+++ b/tests/c/c_BasicEndToEndTest.cc
@@ -34,6 +34,7 @@ struct receive_ctx {
pulsar_result result;
pulsar_consumer_t *consumer;
char *data;
+ char *producer_name;
std::promise<void> *promise;
};
@@ -57,6 +58,9 @@ static void receive_callback(pulsar_result async_result, pulsar_message_t *msg,
const char *data = (const char *)pulsar_message_get_data(msg);
receive_ctx->data = (char *)malloc(strlen(data) * sizeof(char) + 1);
strcpy(receive_ctx->data, data);
+ const char *producer_name = pulsar_message_get_producer_name(msg);
+ receive_ctx->producer_name = (char *)malloc(strlen(producer_name) * sizeof(char) + 1);
+ strcpy(receive_ctx->producer_name, producer_name);
}
receive_ctx->promise->set_value();
pulsar_message_free(msg);
@@ -71,6 +75,7 @@ TEST(c_BasicEndToEndTest, testAsyncProduceConsume) {
pulsar_client_t *client = pulsar_client_create(lookup_url, conf);
pulsar_producer_configuration_t *producer_conf = pulsar_producer_configuration_create();
+ pulsar_producer_configuration_set_producer_name(producer_conf, "test-producer");
pulsar_producer_t *producer;
pulsar_result result = pulsar_client_create_producer(client, topic_name, producer_conf, &producer);
ASSERT_EQ(pulsar_result_Ok, result);
@@ -101,12 +106,14 @@ TEST(c_BasicEndToEndTest, testAsyncProduceConsume) {
// receive asynchronously
std::promise<void> receive_promise;
std::future<void> receive_future = receive_promise.get_future();
- struct receive_ctx receive_ctx = {pulsar_result_UnknownError, consumer, NULL, &receive_promise};
+ struct receive_ctx receive_ctx = {pulsar_result_UnknownError, consumer, NULL, NULL, &receive_promise};
pulsar_consumer_receive_async(consumer, receive_callback, &receive_ctx);
receive_future.get();
ASSERT_EQ(pulsar_result_Ok, receive_ctx.result);
+ ASSERT_STREQ("test-producer", receive_ctx.producer_name);
ASSERT_STREQ(content, receive_ctx.data);
- delete receive_ctx.data;
+ free(receive_ctx.data);
+ free(receive_ctx.producer_name);
ASSERT_EQ(pulsar_result_Ok, pulsar_consumer_unsubscribe(consumer));
ASSERT_EQ(pulsar_result_Ok, pulsar_consumer_close(consumer));