This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new d03e46a  Add getIndex method on Message (#277)
d03e46a is described below

commit d03e46a8dfbf573f622fccd917f6f2796185c233
Author: ZhangJian He <[email protected]>
AuthorDate: Wed Jun 7 11:54:30 2023 +0800

    Add getIndex method on Message (#277)
    
    ### Motivation
    After #276, we can already consume messages that contain an index. Now we
can add a method that allows users to retrieve it: getIndex.
    The index is an optional field of the broker entry metadata. If the index
does not exist, getIndex returns -1.
    
    ### Verifying this change
    - Add assertions to verify that the index is not equal to -1.
---
 include/pulsar/Message.h                   | 13 +++++-
 lib/ClientConnection.cc                    |  9 ++--
 lib/Commands.cc                            |  4 +-
 lib/ConsumerImpl.cc                        | 12 ++++--
 lib/ConsumerImpl.h                         |  4 +-
 lib/Message.cc                             | 17 +++++++-
 lib/MessageImpl.cc                         |  2 -
 lib/MessageImpl.h                          |  3 +-
 tests/ConsumerTest.cc                      | 21 ++++++++++
 tests/brokermetadata/BrokerMetadataTest.cc | 67 ++++++++++++++++++++++++++----
 tests/brokermetadata/docker-compose.yml    |  4 +-
 11 files changed, 127 insertions(+), 29 deletions(-)

diff --git a/include/pulsar/Message.h b/include/pulsar/Message.h
index fe99131..c8c18fa 100644
--- a/include/pulsar/Message.h
+++ b/include/pulsar/Message.h
@@ -31,6 +31,7 @@
 namespace pulsar {
 namespace proto {
 class CommandMessage;
+class BrokerEntryMetadata;
 class MessageMetadata;
 class SingleMessageMetadata;
 }  // namespace proto
@@ -124,6 +125,12 @@ class PULSAR_PUBLIC Message {
      */
     void setMessageId(const MessageId& messageId) const;
 
+    /**
+     * Get the index of this message, if it doesn't exist, return -1
+     * @return
+     */
+    int64_t getIndex() const;
+
     /**
      * Get the partition key for this message
      * @return key string that is hashed to determine message's topic partition
@@ -195,9 +202,11 @@ class PULSAR_PUBLIC Message {
     MessageImplPtr impl_;
 
     Message(MessageImplPtr& impl);
-    Message(const MessageId& messageId, proto::MessageMetadata& metadata, 
SharedBuffer& payload);
+    Message(const MessageId& messageId, proto::BrokerEntryMetadata& 
brokerEntryMetadata,
+            proto::MessageMetadata& metadata, SharedBuffer& payload);
     /// Used for Batch Messages
-    Message(const MessageId& messageId, proto::MessageMetadata& metadata, 
SharedBuffer& payload,
+    Message(const MessageId& messageId, proto::BrokerEntryMetadata& 
brokerEntryMetadata,
+            proto::MessageMetadata& metadata, SharedBuffer& payload,
             proto::SingleMessageMetadata& singleMetadata, const 
std::shared_ptr<std::string>& topicName);
     friend class PartitionedProducerImpl;
     friend class MultiTopicsConsumerImpl;
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index d955b32..4af5da0 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -683,7 +683,6 @@ void ClientConnection::processIncomingBuffer() {
 
             // read checksum
             uint32_t remainingBytes = frameSize - (cmdSize + 4);
-            bool isChecksumValid = verifyChecksum(incomingBuffer_, 
remainingBytes, incomingCmd);
 
             auto readerIndex = incomingBuffer_.readerIndex();
             if (incomingBuffer_.readUnsignedShort() == 
Commands::magicBrokerEntryMetadata) {
@@ -698,13 +697,14 @@ void ClientConnection::processIncomingBuffer() {
                     close();
                     return;
                 }
-
-                incomingBuffer_.consume(brokerEntryMetadataSize);
+                incomingBuffer_.setReaderIndex(readerIndex + 2 + 4 + 
brokerEntryMetadataSize);
                 remainingBytes -= (2 + 4 + brokerEntryMetadataSize);
             } else {
                 incomingBuffer_.setReaderIndex(readerIndex);
             }
 
+            bool isChecksumValid = verifyChecksum(incomingBuffer_, 
remainingBytes, incomingCmd);
+
             uint32_t metadataSize = incomingBuffer_.readUnsignedInt();
             if (!msgMetadata.ParseFromArray(incomingBuffer_.data(), 
metadataSize)) {
                 LOG_ERROR(cnxString_ << "[consumer id " << 
incomingCmd.message().consumer_id()  //
@@ -817,7 +817,8 @@ void ClientConnection::handleIncomingMessage(const 
proto::CommandMessage& msg, b
             // Unlock the mutex before notifying the consumer of the
             // new received message
             lock.unlock();
-            consumer->messageReceived(shared_from_this(), msg, 
isChecksumValid, msgMetadata, payload);
+            consumer->messageReceived(shared_from_this(), msg, 
isChecksumValid, brokerEntryMetadata,
+                                      msgMetadata, payload);
         } else {
             consumers_.erase(msg.consumer_id());
             LOG_DEBUG(cnxString_ << "Ignoring incoming message for already 
destroyed consumer "
diff --git a/lib/Commands.cc b/lib/Commands.cc
index cec9d3b..3245b53 100644
--- a/lib/Commands.cc
+++ b/lib/Commands.cc
@@ -277,6 +277,7 @@ SharedBuffer Commands::newConnect(const AuthenticationPtr& 
authentication, const
 
     FeatureFlags* flags = connect->mutable_feature_flags();
     flags->set_supports_auth_refresh(true);
+    flags->set_supports_broker_entry_metadata(true);
     if (connectingThroughProxy) {
         Url logicalAddressUrl;
         Url::parse(logicalAddress, logicalAddressUrl);
@@ -908,7 +909,8 @@ Message Commands::deSerializeSingleMessageInBatch(Message& 
batchedMessage, int32
     const MessageId& m = batchedMessage.impl_->messageId;
     auto messageId = 
MessageIdBuilder::from(m).batchIndex(batchIndex).batchSize(batchSize).build();
     auto batchedMessageId = 
std::make_shared<BatchedMessageIdImpl>(*(messageId.impl_), acker);
-    Message singleMessage(MessageId{batchedMessageId}, 
batchedMessage.impl_->metadata, payload, metadata,
+    Message singleMessage(MessageId{batchedMessageId}, 
batchedMessage.impl_->brokerEntryMetadata,
+                          batchedMessage.impl_->metadata, payload, metadata,
                           batchedMessage.impl_->topicName_);
     singleMessage.impl_->cnx_ = batchedMessage.impl_->cnx_;
 
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 04f7142..666361b 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -495,8 +495,8 @@ boost::optional<SharedBuffer> 
ConsumerImpl::processMessageChunk(const SharedBuff
 }
 
 void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const 
proto::CommandMessage& msg,
-                                   bool& isChecksumValid, 
proto::MessageMetadata& metadata,
-                                   SharedBuffer& payload) {
+                                   bool& isChecksumValid, 
proto::BrokerEntryMetadata& brokerEntryMetadata,
+                                   proto::MessageMetadata& metadata, 
SharedBuffer& payload) {
     LOG_DEBUG(getName() << "Received Message -- Size: " << 
payload.readableBytes());
 
     if (!decryptMessageIfNeeded(cnx, msg, metadata, payload)) {
@@ -536,7 +536,7 @@ void ConsumerImpl::messageReceived(const 
ClientConnectionPtr& cnx, const proto::
         }
     }
 
-    Message m(messageId, metadata, payload);
+    Message m(messageId, brokerEntryMetadata, metadata, payload);
     m.impl_->cnx_ = cnx.get();
     m.impl_->setTopicName(topic_);
     m.impl_->setRedeliveryCount(msg.redelivery_count());
@@ -565,7 +565,7 @@ void ConsumerImpl::messageReceived(const 
ClientConnectionPtr& cnx, const proto::
         Lock lock(mutex_);
         numOfMessageReceived = receiveIndividualMessagesFromBatch(cnx, m, 
ackSet, msg.redelivery_count());
     } else {
-        // try convery key value data.
+        // try convert key value data.
         m.impl_->convertPayloadToKeyValue(config_.getSchema());
 
         const auto startMessageId = startMessageId_.get();
@@ -706,6 +706,10 @@ uint32_t 
ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
         msg.impl_->setRedeliveryCount(redeliveryCount);
         msg.impl_->setTopicName(batchedMessage.impl_->topicName_);
         msg.impl_->convertPayloadToKeyValue(config_.getSchema());
+        if (msg.impl_->brokerEntryMetadata.has_index()) {
+            
msg.impl_->brokerEntryMetadata.set_index(msg.impl_->brokerEntryMetadata.index() 
- batchSize + i +
+                                                     1);
+        }
 
         if (redeliveryCount >= deadLetterPolicy_.getMaxRedeliverCount()) {
             possibleToDeadLetter.emplace_back(msg);
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 158132f..def2543 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -60,6 +60,7 @@ using UnAckedMessageTrackerPtr = 
std::shared_ptr<UnAckedMessageTrackerInterface>
 
 namespace proto {
 class CommandMessage;
+class BrokerEntryMetadata;
 class MessageMetadata;
 }  // namespace proto
 
@@ -87,7 +88,8 @@ class ConsumerImpl : public ConsumerImplBase {
     void sendFlowPermitsToBroker(const ClientConnectionPtr& cnx, int 
numMessages);
     uint64_t getConsumerId();
     void messageReceived(const ClientConnectionPtr& cnx, const 
proto::CommandMessage& msg,
-                         bool& isChecksumValid, proto::MessageMetadata& 
msgMetadata, SharedBuffer& payload);
+                         bool& isChecksumValid, proto::BrokerEntryMetadata& 
brokerEntryMetadata,
+                         proto::MessageMetadata& msgMetadata, SharedBuffer& 
payload);
     void messageProcessed(Message& msg, bool track = true);
     void activeConsumerChanged(bool isActive);
     inline CommandSubscribe_SubType getSubType();
diff --git a/lib/Message.cc b/lib/Message.cc
index bfd65f4..06fa5db 100644
--- a/lib/Message.cc
+++ b/lib/Message.cc
@@ -70,17 +70,21 @@ Message::Message() : impl_() {}
 
 Message::Message(MessageImplPtr& impl) : impl_(impl) {}
 
-Message::Message(const MessageId& messageId, proto::MessageMetadata& metadata, 
SharedBuffer& payload)
+Message::Message(const MessageId& messageId, proto::BrokerEntryMetadata& 
brokerEntryMetadata,
+                 proto::MessageMetadata& metadata, SharedBuffer& payload)
     : impl_(std::make_shared<MessageImpl>()) {
     impl_->messageId = messageId;
+    impl_->brokerEntryMetadata = brokerEntryMetadata;
     impl_->metadata = metadata;
     impl_->payload = payload;
 }
 
-Message::Message(const MessageId& messageID, proto::MessageMetadata& metadata, 
SharedBuffer& payload,
+Message::Message(const MessageId& messageID, proto::BrokerEntryMetadata& 
brokerEntryMetadata,
+                 proto::MessageMetadata& metadata, SharedBuffer& payload,
                  proto::SingleMessageMetadata& singleMetadata, const 
std::shared_ptr<std::string>& topicName)
     : impl_(std::make_shared<MessageImpl>()) {
     impl_->messageId = messageID;
+    impl_->brokerEntryMetadata = brokerEntryMetadata;
     impl_->metadata = metadata;
     impl_->payload = payload;
     
impl_->metadata.mutable_properties()->CopyFrom(singleMetadata.properties());
@@ -136,6 +140,15 @@ void Message::setMessageId(const MessageId& messageID) 
const {
     return;
 }
 
+int64_t Message::getIndex() const {
+    if (!impl_ || !impl_->brokerEntryMetadata.has_index()) {
+        return -1;
+    } else {
+        // casting uint64_t to int64_t, server definition ensures that's safe
+        return static_cast<int64_t>(impl_->brokerEntryMetadata.index());
+    }
+}
+
 bool Message::hasPartitionKey() const {
     if (impl_) {
         return impl_->hasPartitionKey();
diff --git a/lib/MessageImpl.cc b/lib/MessageImpl.cc
index e70ef5d..4650b99 100644
--- a/lib/MessageImpl.cc
+++ b/lib/MessageImpl.cc
@@ -20,8 +20,6 @@
 
 namespace pulsar {
 
-MessageImpl::MessageImpl() : metadata(), payload(), messageId(), cnx_(0), 
topicName_(), redeliveryCount_() {}
-
 const Message::StringMap& MessageImpl::properties() {
     if (properties_.size() == 0) {
         for (int i = 0; i < metadata.properties_size(); i++) {
diff --git a/lib/MessageImpl.h b/lib/MessageImpl.h
index 1046ede..cc07c58 100644
--- a/lib/MessageImpl.h
+++ b/lib/MessageImpl.h
@@ -35,10 +35,9 @@ class BatchMessageContainer;
 
 class MessageImpl {
    public:
-    MessageImpl();
-
     const Message::StringMap& properties();
 
+    proto::BrokerEntryMetadata brokerEntryMetadata;
     proto::MessageMetadata metadata;
     SharedBuffer payload;
     std::shared_ptr<KeyValueImpl> keyValuePtr;
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index b6956fe..9f28dfb 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -93,6 +93,27 @@ class ActiveInactiveListenerEvent : public 
ConsumerEventListener {
     std::mutex mutex_;
 };
 
+TEST(ConsumerTest, testConsumerIndex) {
+    Client client(lookupUrl);
+    const std::string topicName = "testConsumerIndex-topic-" + 
std::to_string(time(nullptr));
+    const std::string subName = "sub";
+    Producer producer;
+    Result producerResult = client.createProducer(topicName, producer);
+    ASSERT_EQ(producerResult, ResultOk);
+    Consumer consumer;
+    Result consumerResult = client.subscribe(topicName, subName, consumer);
+    ASSERT_EQ(consumerResult, ResultOk);
+    const auto msg = MessageBuilder().setContent("testConsumeSuccess").build();
+    Result sendResult = producer.send(msg);
+    ASSERT_EQ(sendResult, ResultOk);
+    Message receivedMsg;
+    Result receiveResult = consumer.receive(receivedMsg);
+    ASSERT_EQ(receiveResult, ResultOk);
+    ASSERT_EQ(receivedMsg.getDataAsString(), "testConsumeSuccess");
+    ASSERT_EQ(receivedMsg.getIndex(), -1);
+    client.close();
+}
+
 typedef std::shared_ptr<ActiveInactiveListenerEvent> 
ActiveInactiveListenerEventPtr;
 
 TEST(ConsumerTest, testConsumerEventWithoutPartition) {
diff --git a/tests/brokermetadata/BrokerMetadataTest.cc 
b/tests/brokermetadata/BrokerMetadataTest.cc
index f012734..ccf6eeb 100644
--- a/tests/brokermetadata/BrokerMetadataTest.cc
+++ b/tests/brokermetadata/BrokerMetadataTest.cc
@@ -20,24 +20,73 @@
 #include <gtest/gtest.h>
 #include <pulsar/Client.h>
 
+#include "lib/Latch.h"
+
 using namespace pulsar;
 
 TEST(BrokerMetadataTest, testConsumeSuccess) {
     Client client{"pulsar://localhost:6650"};
     Producer producer;
-    Result producerResult = 
client.createProducer("persistent://public/default/testConsumeSuccess", 
producer);
+    ProducerConfiguration producerConfiguration;
+    producerConfiguration.setBatchingEnabled(false);
+    Result producerResult =
+        client.createProducer("persistent://public/default/topic-non-batch", 
producerConfiguration, producer);
     ASSERT_EQ(producerResult, ResultOk);
     Consumer consumer;
-    Result consumerResult =
-        client.subscribe("persistent://public/default/testConsumeSuccess", 
"testConsumeSuccess", consumer);
+    Result consumerResult = 
client.subscribe("persistent://public/default/topic-non-batch", "sub", 
consumer);
     ASSERT_EQ(consumerResult, ResultOk);
-    const auto msg = MessageBuilder().setContent("testConsumeSuccess").build();
-    Result sendResult = producer.send(msg);
-    ASSERT_EQ(sendResult, ResultOk);
+    for (int i = 0; i < 10; i++) {
+        std::string content = "testConsumeSuccess" + std::to_string(i);
+        const auto msg = MessageBuilder().setContent(content).build();
+        Result sendResult = producer.send(msg);
+        ASSERT_EQ(sendResult, ResultOk);
+    }
+
+    Message receivedMsg;
+    for (int i = 0; i < 10; i++) {
+        Result receiveResult =
+            consumer.receive(receivedMsg, 1000);  // Assumed that we wait 1000 
ms for each message
+        printf("receive index: %d\n", i);
+        ASSERT_EQ(receiveResult, ResultOk);
+        ASSERT_EQ(receivedMsg.getDataAsString(), "testConsumeSuccess" + 
std::to_string(i));
+        ASSERT_EQ(receivedMsg.getIndex(), i);
+        Result ackResult = consumer.acknowledge(receivedMsg);
+        ASSERT_EQ(ackResult, ResultOk);
+    }
+    client.close();
+}
+
+TEST(BrokerMetadataTest, testConsumeBatchSuccess) {
+    Client client{"pulsar://localhost:6650"};
+    Producer producer;
+    Result producerResult = 
client.createProducer("persistent://public/default/topic-batch", producer);
+    ASSERT_EQ(producerResult, ResultOk);
+    Consumer consumer;
+    Result consumerResult = 
client.subscribe("persistent://public/default/topic-batch", "sub", consumer);
+    ASSERT_EQ(consumerResult, ResultOk);
+
+    Latch latch(10);
+    auto sendCallback = [&latch](Result result, const MessageId& id) {
+        ASSERT_EQ(result, ResultOk);
+        latch.countdown();
+    };
+
+    for (int i = 0; i < 10; i++) {
+        std::string content = "testConsumeSuccess" + std::to_string(i);
+        const auto msg = MessageBuilder().setContent(content).build();
+        producer.sendAsync(msg, sendCallback);
+    }
+
+    latch.wait();
+
     Message receivedMsg;
-    Result receiveResult = consumer.receive(receivedMsg);
-    ASSERT_EQ(receiveResult, ResultOk);
-    ASSERT_EQ(receivedMsg.getDataAsString(), "testConsumeSuccess");
+    for (int i = 0; i < 10; i++) {
+        Result receiveResult =
+            consumer.receive(receivedMsg, 1000);  // Assumed that we wait 1000 
ms for each message
+        ASSERT_EQ(receiveResult, ResultOk);
+        ASSERT_EQ(receivedMsg.getDataAsString(), "testConsumeSuccess" + 
std::to_string(i));
+        ASSERT_EQ(receivedMsg.getIndex(), i);
+    }
     client.close();
 }
 
diff --git a/tests/brokermetadata/docker-compose.yml 
b/tests/brokermetadata/docker-compose.yml
index bb719e0..6b1d36e 100644
--- a/tests/brokermetadata/docker-compose.yml
+++ b/tests/brokermetadata/docker-compose.yml
@@ -35,8 +35,8 @@ services:
       - advertisedAddress=localhost
       - advertisedListeners=external:pulsar://localhost:6650
       - PULSAR_MEM=-Xms512m -Xmx512m -XX:MaxDirectMemorySize=256m
-      - 
PULSAR_PREFIX_BROKER_ENTRY_METADATA_INTERCEPTORS=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
-      - PULSAR_PREFIX_EXPOSING_BROKER_ENTRY_METADATA_TO_CLIENT_ENABLED=true
+      - 
PULSAR_PREFIX_brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
+      - PULSAR_PREFIX_exposingBrokerEntryMetadataToClientEnabled=true
     ports:
       - "6650:6650"
       - "8080:8080"

Reply via email to