This is an automated email from the ASF dual-hosted git repository.
BewareMyPower pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new f2c0fec feat: expose replicated_from proto field to Message (#583)
f2c0fec is described below
commit f2c0fecd388318d7a940cc1aabcd3adead7d5879
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Jun 1 18:05:31 2026 +0800
feat: expose replicated_from proto field to Message (#583)
---
include/pulsar/Message.h | 8 ++++++++
include/pulsar/c/message.h | 9 +++++++++
lib/Message.cc | 7 +++++++
lib/c/c_Message.cc | 5 +++++
tests/MessageTest.cc | 12 ++++++++++++
tests/c/c_MessageTest.cc | 16 ++++++++++++++++
6 files changed, 57 insertions(+)
diff --git a/include/pulsar/Message.h b/include/pulsar/Message.h
index b92ec6a..1c906e8 100644
--- a/include/pulsar/Message.h
+++ b/include/pulsar/Message.h
@@ -220,6 +220,14 @@ class PULSAR_PUBLIC Message {
*/
const std::string& getProducerName() const noexcept;
+ /**
+ * Get the source cluster from which the message was replicated.
+ *
+ * @return the optional pointer to the source cluster name if the message
was replicated, the pointer is
+ * valid as the Message instance is alive
+ */
+ std::optional<const std::string*> getReplicatedFrom() const;
+
/**
* @return the optional encryption context that is present when the
message is encrypted, the pointer is
* valid as the Message instance is alive
diff --git a/include/pulsar/c/message.h b/include/pulsar/c/message.h
index 8aceca5..af22639 100644
--- a/include/pulsar/c/message.h
+++ b/include/pulsar/c/message.h
@@ -230,6 +230,15 @@ PULSAR_PUBLIC void
pulsar_message_set_schema_version(pulsar_message_t *message,
*/
PULSAR_PUBLIC const char *pulsar_message_get_producer_name(pulsar_message_t
*message);
+/**
+ * Get the source cluster from which the message was replicated.
+ *
+ * The pointer points to internal storage owned by the message wrapper, so the
caller should not free it.
+ *
+ * @return the source cluster name, or NULL if the message is not replicated
+ */
+PULSAR_PUBLIC const char *pulsar_message_get_replicated_from(pulsar_message_t
*message);
+
/**
* Check if the message has a null value.
*
diff --git a/lib/Message.cc b/lib/Message.cc
index f4e6d69..8bde683 100644
--- a/lib/Message.cc
+++ b/lib/Message.cc
@@ -239,6 +239,13 @@ const std::string& Message::getProducerName() const
noexcept {
return impl_->metadata.producer_name();
}
+std::optional<const std::string*> Message::getReplicatedFrom() const {
+ if (!impl_ || !impl_->metadata.has_replicated_from()) {
+ return std::nullopt;
+ }
+ return &impl_->metadata.replicated_from();
+}
+
std::optional<const EncryptionContext*> Message::getEncryptionContext() const {
if (!impl_ || !impl_->encryptionContext_.has_value()) {
return std::nullopt;
diff --git a/lib/c/c_Message.cc b/lib/c/c_Message.cc
index 51afa8e..6d36309 100644
--- a/lib/c/c_Message.cc
+++ b/lib/c/c_Message.cc
@@ -151,4 +151,9 @@ const char
*pulsar_message_get_producer_name(pulsar_message_t *message) {
return message->message.getProducerName().c_str();
}
+const char *pulsar_message_get_replicated_from(pulsar_message_t *message) {
+ const auto replicatedFrom = message->message.getReplicatedFrom();
+ return replicatedFrom ? replicatedFrom.value()->c_str() : nullptr;
+}
+
int pulsar_message_has_null_value(pulsar_message_t *message) { return
message->message.hasNullValue(); }
diff --git a/tests/MessageTest.cc b/tests/MessageTest.cc
index 0ffcc41..7f1ae4d 100644
--- a/tests/MessageTest.cc
+++ b/tests/MessageTest.cc
@@ -22,6 +22,7 @@
#include <string>
+#include "PulsarFriend.h"
#include "lib/MessageImpl.h"
using namespace pulsar;
@@ -154,6 +155,17 @@ TEST(MessageTest, testGetTopicNameOnProducerMessage) {
ASSERT_TRUE(msg.getTopicName().empty());
}
+TEST(MessageTest, testReplicationMetadataAccessors) {
+ auto msg = MessageBuilder().setContent("test").build();
+ ASSERT_FALSE(msg.getReplicatedFrom().has_value());
+
+ PulsarFriend::getMessageMetadata(msg).set_replicated_from("us-west1");
+
+ const auto replicatedFrom = msg.getReplicatedFrom();
+ ASSERT_TRUE(replicatedFrom.has_value());
+ ASSERT_EQ(*replicatedFrom.value(), "us-west1");
+}
+
TEST(MessageTest, testNullValueMessage) {
{
auto msg = MessageBuilder().setContent("test").build();
diff --git a/tests/c/c_MessageTest.cc b/tests/c/c_MessageTest.cc
index a64a990..7a2ee50 100644
--- a/tests/c/c_MessageTest.cc
+++ b/tests/c/c_MessageTest.cc
@@ -20,6 +20,8 @@
#include <lib/c/c_structs.h>
#include <pulsar/c/message.h>
+#include "../PulsarFriend.h"
+
TEST(c_MessageTest, MessageCopy) {
pulsar_message_t *from = pulsar_message_create();
pulsar_message_set_content(from, "hello", 5);
@@ -32,3 +34,17 @@ TEST(c_MessageTest, MessageCopy) {
pulsar_message_free(from);
pulsar_message_free(to);
}
+
+TEST(c_MessageTest, ReplicationMetadataAccessors) {
+ pulsar_message_t *message = pulsar_message_create();
+ pulsar_message_set_content(message, "hello", 5);
+ message->message = message->builder.build();
+
+ ASSERT_EQ(nullptr, pulsar_message_get_replicated_from(message));
+
+
PulsarFriend::getMessageMetadata(message->message).set_replicated_from("us-west1");
+
+ ASSERT_STREQ("us-west1", pulsar_message_get_replicated_from(message));
+
+ pulsar_message_free(message);
+}