This is an automated email from the ASF dual-hosted git repository. shoothzj pushed a commit to branch add-get-index-method in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
commit a8c0827d7a16272d5bb66ceffa261e516880c0a3 Author: ZhangJian He <[email protected]> AuthorDate: Thu Jun 1 08:11:42 2023 +0800 Add getIndex method on Message --- include/pulsar/Message.h | 13 +++++++++++-- lib/ClientConnection.cc | 3 ++- lib/ConsumerImpl.cc | 4 ++-- lib/ConsumerImpl.h | 4 +++- lib/Message.cc | 17 +++++++++++++++-- lib/MessageImpl.cc | 3 ++- lib/MessageImpl.h | 1 + tests/brokermetadata/BrokerMetadataTest.cc | 1 + 8 files changed, 37 insertions(+), 9 deletions(-) diff --git a/include/pulsar/Message.h b/include/pulsar/Message.h index fe99131..13ba57b 100644 --- a/include/pulsar/Message.h +++ b/include/pulsar/Message.h @@ -31,6 +31,7 @@ namespace pulsar { namespace proto { class CommandMessage; +class BrokerEntryMetadata; class MessageMetadata; class SingleMessageMetadata; } // namespace proto @@ -124,6 +125,12 @@ class PULSAR_PUBLIC Message { */ void setMessageId(const MessageId& messageId) const; + /** + * Get the index of this message, if it doesn't exist, return -1 + * @return + */ + const int64_t getIndex() const; + /** * Get the partition key for this message * @return key string that is hashed to determine message's topic partition @@ -195,9 +202,11 @@ class PULSAR_PUBLIC Message { MessageImplPtr impl_; Message(MessageImplPtr& impl); - Message(const MessageId& messageId, proto::MessageMetadata& metadata, SharedBuffer& payload); + Message(const MessageId& messageId, proto::BrokerEntryMetadata& brokerEntryMetadata, + proto::MessageMetadata& metadata, SharedBuffer& payload); /// Used for Batch Messages - Message(const MessageId& messageId, proto::MessageMetadata& metadata, SharedBuffer& payload, + Message(const MessageId& messageId, proto::BrokerEntryMetadata& brokerEntryMetadata, + proto::MessageMetadata& metadata, SharedBuffer& payload, proto::SingleMessageMetadata& singleMetadata, const std::shared_ptr<std::string>& topicName); friend class PartitionedProducerImpl; friend class MultiTopicsConsumerImpl; diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index d955b32..356b1a4 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -817,7 +817,8 @@ void ClientConnection::handleIncomingMessage(const proto::CommandMessage& msg, b // Unlock the mutex before notifying the consumer of the // new received message lock.unlock(); - consumer->messageReceived(shared_from_this(), msg, isChecksumValid, msgMetadata, payload); + consumer->messageReceived(shared_from_this(), msg, isChecksumValid, brokerEntryMetadata, + msgMetadata, payload); } else { consumers_.erase(msg.consumer_id()); LOG_DEBUG(cnxString_ << "Ignoring incoming message for already destroyed consumer " diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 04f7142..d63b3cb 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -495,8 +495,8 @@ boost::optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuff } void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg, - bool& isChecksumValid, proto::MessageMetadata& metadata, - SharedBuffer& payload) { + bool& isChecksumValid, proto::BrokerEntryMetadata& brokerEntryMetadata, + proto::MessageMetadata& metadata, SharedBuffer& payload) { LOG_DEBUG(getName() << "Received Message -- Size: " << payload.readableBytes()); if (!decryptMessageIfNeeded(cnx, msg, metadata, payload)) { diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 158132f..def2543 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -60,6 +60,7 @@ using UnAckedMessageTrackerPtr = std::shared_ptr<UnAckedMessageTrackerInterface> namespace proto { class CommandMessage; +class BrokerEntryMetadata; class MessageMetadata; } // namespace proto @@ -87,7 +88,8 @@ class ConsumerImpl : public ConsumerImplBase { void sendFlowPermitsToBroker(const ClientConnectionPtr& cnx, int numMessages); uint64_t getConsumerId(); void messageReceived(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg, - bool& isChecksumValid, proto::MessageMetadata& msgMetadata, SharedBuffer& payload); + bool& isChecksumValid, proto::BrokerEntryMetadata& brokerEntryMetadata, + proto::MessageMetadata& msgMetadata, SharedBuffer& payload); void messageProcessed(Message& msg, bool track = true); void activeConsumerChanged(bool isActive); inline CommandSubscribe_SubType getSubType(); diff --git a/lib/Message.cc b/lib/Message.cc index bfd65f4..10c086b 100644 --- a/lib/Message.cc +++ b/lib/Message.cc @@ -70,17 +70,21 @@ Message::Message() : impl_() {} Message::Message(MessageImplPtr& impl) : impl_(impl) {} -Message::Message(const MessageId& messageId, proto::MessageMetadata& metadata, SharedBuffer& payload) +Message::Message(const MessageId& messageId, proto::BrokerEntryMetadata& brokerEntryMetadata, + proto::MessageMetadata& metadata, SharedBuffer& payload) : impl_(std::make_shared<MessageImpl>()) { impl_->messageId = messageId; + impl_->brokerEntryMetadata = brokerEntryMetadata; impl_->metadata = metadata; impl_->payload = payload; } -Message::Message(const MessageId& messageID, proto::MessageMetadata& metadata, SharedBuffer& payload, +Message::Message(const MessageId& messageID, proto::BrokerEntryMetadata& brokerEntryMetadata, + proto::MessageMetadata& metadata, SharedBuffer& payload, proto::SingleMessageMetadata& singleMetadata, const std::shared_ptr<std::string>& topicName) : impl_(std::make_shared<MessageImpl>()) { impl_->messageId = messageID; + impl_->brokerEntryMetadata = brokerEntryMetadata; impl_->metadata = metadata; impl_->payload = payload; impl_->metadata.mutable_properties()->CopyFrom(singleMetadata.properties()); @@ -136,6 +140,15 @@ void Message::setMessageId(const MessageId& messageID) const { return; } +const int64_t Message::getIndex() const { + if (!impl_) { + return -1; + } else { + // casting uint64_t to int64_t, server definition ensures that's safe + return static_cast<int64_t>(impl_->brokerEntryMetadata.index()); + } +} + bool Message::hasPartitionKey() const { if (impl_) { return impl_->hasPartitionKey(); diff --git a/lib/MessageImpl.cc b/lib/MessageImpl.cc index e70ef5d..c70dbc6 100644 --- a/lib/MessageImpl.cc +++ b/lib/MessageImpl.cc @@ -20,7 +20,8 @@ namespace pulsar { -MessageImpl::MessageImpl() : metadata(), payload(), messageId(), cnx_(0), topicName_(), redeliveryCount_() {} +MessageImpl::MessageImpl() + : metadata(), brokerEntryMetadata(), payload(), messageId(), cnx_(0), topicName_(), redeliveryCount_() {} const Message::StringMap& MessageImpl::properties() { if (properties_.size() == 0) { diff --git a/lib/MessageImpl.h b/lib/MessageImpl.h index 1046ede..d83fc8c 100644 --- a/lib/MessageImpl.h +++ b/lib/MessageImpl.h @@ -40,6 +40,7 @@ class MessageImpl { const Message::StringMap& properties(); proto::MessageMetadata metadata; + proto::BrokerEntryMetadata brokerEntryMetadata; SharedBuffer payload; std::shared_ptr<KeyValueImpl> keyValuePtr; MessageId messageId; diff --git a/tests/brokermetadata/BrokerMetadataTest.cc b/tests/brokermetadata/BrokerMetadataTest.cc index f012734..09428c3 100644 --- a/tests/brokermetadata/BrokerMetadataTest.cc +++ b/tests/brokermetadata/BrokerMetadataTest.cc @@ -38,6 +38,7 @@ TEST(BrokerMetadataTest, testConsumeSuccess) { Result receiveResult = consumer.receive(receivedMsg); ASSERT_EQ(receiveResult, ResultOk); ASSERT_EQ(receivedMsg.getDataAsString(), "testConsumeSuccess"); + ASSERT_NE(receivedMsg.getIndex(), -1); client.close(); }
