This is an automated email from the ASF dual-hosted git repository.

shoothzj pushed a commit to branch add-get-index-method
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git

commit 7d5e6ae6f94e326b2836bbc54f09ee8fcae68c0c
Author: ZhangJian He <[email protected]>
AuthorDate: Thu Jun 1 08:11:42 2023 +0800

    Add getIndex method on Message
---
 include/pulsar/Message.h                   | 13 +++++++++++--
 lib/ClientConnection.cc                    |  3 ++-
 lib/ConsumerImpl.cc                        |  4 ++--
 lib/ConsumerImpl.h                         |  4 +++-
 lib/Message.cc                             | 16 ++++++++++++++--
 lib/MessageImpl.cc                         |  3 ++-
 lib/MessageImpl.h                          |  1 +
 tests/brokermetadata/BrokerMetadataTest.cc |  1 +
 8 files changed, 36 insertions(+), 9 deletions(-)

diff --git a/include/pulsar/Message.h b/include/pulsar/Message.h
index fe99131..13ba57b 100644
--- a/include/pulsar/Message.h
+++ b/include/pulsar/Message.h
@@ -31,6 +31,7 @@
 namespace pulsar {
 namespace proto {
 class CommandMessage;
+class BrokerEntryMetadata;
 class MessageMetadata;
 class SingleMessageMetadata;
 }  // namespace proto
@@ -124,6 +125,12 @@ class PULSAR_PUBLIC Message {
      */
     void setMessageId(const MessageId& messageId) const;
 
+    /**
+     * Get the index of this message, taken from its broker entry metadata.
+     * @return the index of this message, or -1 if it doesn't exist
+     */
+    const int64_t getIndex() const;
+
     /**
      * Get the partition key for this message
      * @return key string that is hashed to determine message's topic partition
@@ -195,9 +202,11 @@ class PULSAR_PUBLIC Message {
     MessageImplPtr impl_;
 
     Message(MessageImplPtr& impl);
-    Message(const MessageId& messageId, proto::MessageMetadata& metadata, 
SharedBuffer& payload);
+    Message(const MessageId& messageId, proto::BrokerEntryMetadata& 
brokerEntryMetadata,
+            proto::MessageMetadata& metadata, SharedBuffer& payload);
     /// Used for Batch Messages
-    Message(const MessageId& messageId, proto::MessageMetadata& metadata, 
SharedBuffer& payload,
+    Message(const MessageId& messageId, proto::BrokerEntryMetadata& 
brokerEntryMetadata,
+            proto::MessageMetadata& metadata, SharedBuffer& payload,
             proto::SingleMessageMetadata& singleMetadata, const 
std::shared_ptr<std::string>& topicName);
     friend class PartitionedProducerImpl;
     friend class MultiTopicsConsumerImpl;
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index d955b32..356b1a4 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -817,7 +817,8 @@ void ClientConnection::handleIncomingMessage(const 
proto::CommandMessage& msg, b
             // Unlock the mutex before notifying the consumer of the
             // new received message
             lock.unlock();
-            consumer->messageReceived(shared_from_this(), msg, 
isChecksumValid, msgMetadata, payload);
+            consumer->messageReceived(shared_from_this(), msg, 
isChecksumValid, brokerEntryMetadata,
+                                      msgMetadata, payload);
         } else {
             consumers_.erase(msg.consumer_id());
             LOG_DEBUG(cnxString_ << "Ignoring incoming message for already 
destroyed consumer "
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 04f7142..d63b3cb 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -495,8 +495,8 @@ boost::optional<SharedBuffer> 
ConsumerImpl::processMessageChunk(const SharedBuff
 }
 
 void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const 
proto::CommandMessage& msg,
-                                   bool& isChecksumValid, 
proto::MessageMetadata& metadata,
-                                   SharedBuffer& payload) {
+                                   bool& isChecksumValid, 
proto::BrokerEntryMetadata& brokerEntryMetadata,
+                                   proto::MessageMetadata& metadata, 
SharedBuffer& payload) {
     LOG_DEBUG(getName() << "Received Message -- Size: " << 
payload.readableBytes());
 
     if (!decryptMessageIfNeeded(cnx, msg, metadata, payload)) {
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 158132f..def2543 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -60,6 +60,7 @@ using UnAckedMessageTrackerPtr = 
std::shared_ptr<UnAckedMessageTrackerInterface>
 
 namespace proto {
 class CommandMessage;
+class BrokerEntryMetadata;
 class MessageMetadata;
 }  // namespace proto
 
@@ -87,7 +88,8 @@ class ConsumerImpl : public ConsumerImplBase {
     void sendFlowPermitsToBroker(const ClientConnectionPtr& cnx, int 
numMessages);
     uint64_t getConsumerId();
     void messageReceived(const ClientConnectionPtr& cnx, const 
proto::CommandMessage& msg,
-                         bool& isChecksumValid, proto::MessageMetadata& 
msgMetadata, SharedBuffer& payload);
+                         bool& isChecksumValid, proto::BrokerEntryMetadata& 
brokerEntryMetadata,
+                         proto::MessageMetadata& msgMetadata, SharedBuffer& 
payload);
     void messageProcessed(Message& msg, bool track = true);
     void activeConsumerChanged(bool isActive);
     inline CommandSubscribe_SubType getSubType();
diff --git a/lib/Message.cc b/lib/Message.cc
index bfd65f4..8b0480d 100644
--- a/lib/Message.cc
+++ b/lib/Message.cc
@@ -70,17 +70,21 @@ Message::Message() : impl_() {}
 
 Message::Message(MessageImplPtr& impl) : impl_(impl) {}
 
-Message::Message(const MessageId& messageId, proto::MessageMetadata& metadata, 
SharedBuffer& payload)
+Message::Message(const MessageId& messageId, proto::BrokerEntryMetadata& 
brokerEntryMetadata,
+                 proto::MessageMetadata& metadata, SharedBuffer& payload)
     : impl_(std::make_shared<MessageImpl>()) {
     impl_->messageId = messageId;
+    impl_->brokerEntryMetadata = brokerEntryMetadata;
     impl_->metadata = metadata;
     impl_->payload = payload;
 }
 
-Message::Message(const MessageId& messageID, proto::MessageMetadata& metadata, 
SharedBuffer& payload,
+Message::Message(const MessageId& messageID, proto::BrokerEntryMetadata& 
brokerEntryMetadata,
+                 proto::MessageMetadata& metadata, SharedBuffer& payload,
                  proto::SingleMessageMetadata& singleMetadata, const 
std::shared_ptr<std::string>& topicName)
     : impl_(std::make_shared<MessageImpl>()) {
     impl_->messageId = messageID;
+    impl_->brokerEntryMetadata = brokerEntryMetadata;
     impl_->metadata = metadata;
     impl_->payload = payload;
     
impl_->metadata.mutable_properties()->CopyFrom(singleMetadata.properties());
@@ -136,6 +140,14 @@ void Message::setMessageId(const MessageId& messageID) 
const {
     return;
 }
 
+const int64_t Message::getIndex() const {
+    // -1 means "no index": empty message, or the broker attached no index.
+    if (!impl_ || !impl_->brokerEntryMetadata.has_index()) {
+        return -1;
+    }
+    return static_cast<int64_t>(impl_->brokerEntryMetadata.index());
+}
+
 bool Message::hasPartitionKey() const {
     if (impl_) {
         return impl_->hasPartitionKey();
diff --git a/lib/MessageImpl.cc b/lib/MessageImpl.cc
index e70ef5d..c70dbc6 100644
--- a/lib/MessageImpl.cc
+++ b/lib/MessageImpl.cc
@@ -20,7 +20,8 @@
 
 namespace pulsar {
 
-MessageImpl::MessageImpl() : metadata(), payload(), messageId(), cnx_(0), 
topicName_(), redeliveryCount_() {}
+MessageImpl::MessageImpl()
+    : metadata(), brokerEntryMetadata(), payload(), messageId(), cnx_(0), 
topicName_(), redeliveryCount_() {}
 
 const Message::StringMap& MessageImpl::properties() {
     if (properties_.size() == 0) {
diff --git a/lib/MessageImpl.h b/lib/MessageImpl.h
index 1046ede..d83fc8c 100644
--- a/lib/MessageImpl.h
+++ b/lib/MessageImpl.h
@@ -40,6 +40,7 @@ class MessageImpl {
     const Message::StringMap& properties();
 
     proto::MessageMetadata metadata;
+    proto::BrokerEntryMetadata brokerEntryMetadata;
     SharedBuffer payload;
     std::shared_ptr<KeyValueImpl> keyValuePtr;
     MessageId messageId;
diff --git a/tests/brokermetadata/BrokerMetadataTest.cc 
b/tests/brokermetadata/BrokerMetadataTest.cc
index f012734..09428c3 100644
--- a/tests/brokermetadata/BrokerMetadataTest.cc
+++ b/tests/brokermetadata/BrokerMetadataTest.cc
@@ -38,6 +38,7 @@ TEST(BrokerMetadataTest, testConsumeSuccess) {
     Result receiveResult = consumer.receive(receivedMsg);
     ASSERT_EQ(receiveResult, ResultOk);
     ASSERT_EQ(receivedMsg.getDataAsString(), "testConsumeSuccess");
+    ASSERT_NE(receivedMsg.getIndex(), -1);
     client.close();
 }
 

Reply via email to