This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new d03e46a Add getIndex method on Message (#277)
d03e46a is described below
commit d03e46a8dfbf573f622fccd917f6f2796185c233
Author: ZhangJian He <[email protected]>
AuthorDate: Wed Jun 7 11:54:30 2023 +0800
Add getIndex method on Message (#277)
### Motivation
After #276, we can already consume messages that contain an index. Now we
can add a method that allows users to get the index.
The index is an optional field of the broker entry metadata. If the index
does not exist, we return -1.
### Verifying this change
- Add assertions to verify that the index does not equal -1.
---
include/pulsar/Message.h | 13 +++++-
lib/ClientConnection.cc | 9 ++--
lib/Commands.cc | 4 +-
lib/ConsumerImpl.cc | 12 ++++--
lib/ConsumerImpl.h | 4 +-
lib/Message.cc | 17 +++++++-
lib/MessageImpl.cc | 2 -
lib/MessageImpl.h | 3 +-
tests/ConsumerTest.cc | 21 ++++++++++
tests/brokermetadata/BrokerMetadataTest.cc | 67 ++++++++++++++++++++++++++----
tests/brokermetadata/docker-compose.yml | 4 +-
11 files changed, 127 insertions(+), 29 deletions(-)
diff --git a/include/pulsar/Message.h b/include/pulsar/Message.h
index fe99131..c8c18fa 100644
--- a/include/pulsar/Message.h
+++ b/include/pulsar/Message.h
@@ -31,6 +31,7 @@
namespace pulsar {
namespace proto {
class CommandMessage;
+class BrokerEntryMetadata;
class MessageMetadata;
class SingleMessageMetadata;
} // namespace proto
@@ -124,6 +125,12 @@ class PULSAR_PUBLIC Message {
*/
void setMessageId(const MessageId& messageId) const;
+ /**
+ * Get the index of this message, if it doesn't exist, return -1
+ * @return
+ */
+ int64_t getIndex() const;
+
/**
* Get the partition key for this message
* @return key string that is hashed to determine message's topic partition
@@ -195,9 +202,11 @@ class PULSAR_PUBLIC Message {
MessageImplPtr impl_;
Message(MessageImplPtr& impl);
- Message(const MessageId& messageId, proto::MessageMetadata& metadata,
SharedBuffer& payload);
+ Message(const MessageId& messageId, proto::BrokerEntryMetadata&
brokerEntryMetadata,
+ proto::MessageMetadata& metadata, SharedBuffer& payload);
/// Used for Batch Messages
- Message(const MessageId& messageId, proto::MessageMetadata& metadata,
SharedBuffer& payload,
+ Message(const MessageId& messageId, proto::BrokerEntryMetadata&
brokerEntryMetadata,
+ proto::MessageMetadata& metadata, SharedBuffer& payload,
proto::SingleMessageMetadata& singleMetadata, const
std::shared_ptr<std::string>& topicName);
friend class PartitionedProducerImpl;
friend class MultiTopicsConsumerImpl;
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index d955b32..4af5da0 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -683,7 +683,6 @@ void ClientConnection::processIncomingBuffer() {
// read checksum
uint32_t remainingBytes = frameSize - (cmdSize + 4);
- bool isChecksumValid = verifyChecksum(incomingBuffer_,
remainingBytes, incomingCmd);
auto readerIndex = incomingBuffer_.readerIndex();
if (incomingBuffer_.readUnsignedShort() ==
Commands::magicBrokerEntryMetadata) {
@@ -698,13 +697,14 @@ void ClientConnection::processIncomingBuffer() {
close();
return;
}
-
- incomingBuffer_.consume(brokerEntryMetadataSize);
+ incomingBuffer_.setReaderIndex(readerIndex + 2 + 4 +
brokerEntryMetadataSize);
remainingBytes -= (2 + 4 + brokerEntryMetadataSize);
} else {
incomingBuffer_.setReaderIndex(readerIndex);
}
+ bool isChecksumValid = verifyChecksum(incomingBuffer_,
remainingBytes, incomingCmd);
+
uint32_t metadataSize = incomingBuffer_.readUnsignedInt();
if (!msgMetadata.ParseFromArray(incomingBuffer_.data(),
metadataSize)) {
LOG_ERROR(cnxString_ << "[consumer id " <<
incomingCmd.message().consumer_id() //
@@ -817,7 +817,8 @@ void ClientConnection::handleIncomingMessage(const
proto::CommandMessage& msg, b
// Unlock the mutex before notifying the consumer of the
// new received message
lock.unlock();
- consumer->messageReceived(shared_from_this(), msg,
isChecksumValid, msgMetadata, payload);
+ consumer->messageReceived(shared_from_this(), msg,
isChecksumValid, brokerEntryMetadata,
+ msgMetadata, payload);
} else {
consumers_.erase(msg.consumer_id());
LOG_DEBUG(cnxString_ << "Ignoring incoming message for already
destroyed consumer "
diff --git a/lib/Commands.cc b/lib/Commands.cc
index cec9d3b..3245b53 100644
--- a/lib/Commands.cc
+++ b/lib/Commands.cc
@@ -277,6 +277,7 @@ SharedBuffer Commands::newConnect(const AuthenticationPtr&
authentication, const
FeatureFlags* flags = connect->mutable_feature_flags();
flags->set_supports_auth_refresh(true);
+ flags->set_supports_broker_entry_metadata(true);
if (connectingThroughProxy) {
Url logicalAddressUrl;
Url::parse(logicalAddress, logicalAddressUrl);
@@ -908,7 +909,8 @@ Message Commands::deSerializeSingleMessageInBatch(Message&
batchedMessage, int32
const MessageId& m = batchedMessage.impl_->messageId;
auto messageId =
MessageIdBuilder::from(m).batchIndex(batchIndex).batchSize(batchSize).build();
auto batchedMessageId =
std::make_shared<BatchedMessageIdImpl>(*(messageId.impl_), acker);
- Message singleMessage(MessageId{batchedMessageId},
batchedMessage.impl_->metadata, payload, metadata,
+ Message singleMessage(MessageId{batchedMessageId},
batchedMessage.impl_->brokerEntryMetadata,
+ batchedMessage.impl_->metadata, payload, metadata,
batchedMessage.impl_->topicName_);
singleMessage.impl_->cnx_ = batchedMessage.impl_->cnx_;
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 04f7142..666361b 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -495,8 +495,8 @@ boost::optional<SharedBuffer>
ConsumerImpl::processMessageChunk(const SharedBuff
}
void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const
proto::CommandMessage& msg,
- bool& isChecksumValid,
proto::MessageMetadata& metadata,
- SharedBuffer& payload) {
+ bool& isChecksumValid,
proto::BrokerEntryMetadata& brokerEntryMetadata,
+ proto::MessageMetadata& metadata,
SharedBuffer& payload) {
LOG_DEBUG(getName() << "Received Message -- Size: " <<
payload.readableBytes());
if (!decryptMessageIfNeeded(cnx, msg, metadata, payload)) {
@@ -536,7 +536,7 @@ void ConsumerImpl::messageReceived(const
ClientConnectionPtr& cnx, const proto::
}
}
- Message m(messageId, metadata, payload);
+ Message m(messageId, brokerEntryMetadata, metadata, payload);
m.impl_->cnx_ = cnx.get();
m.impl_->setTopicName(topic_);
m.impl_->setRedeliveryCount(msg.redelivery_count());
@@ -565,7 +565,7 @@ void ConsumerImpl::messageReceived(const
ClientConnectionPtr& cnx, const proto::
Lock lock(mutex_);
numOfMessageReceived = receiveIndividualMessagesFromBatch(cnx, m,
ackSet, msg.redelivery_count());
} else {
- // try convery key value data.
+ // try convert key value data.
m.impl_->convertPayloadToKeyValue(config_.getSchema());
const auto startMessageId = startMessageId_.get();
@@ -706,6 +706,10 @@ uint32_t
ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
msg.impl_->setRedeliveryCount(redeliveryCount);
msg.impl_->setTopicName(batchedMessage.impl_->topicName_);
msg.impl_->convertPayloadToKeyValue(config_.getSchema());
+ if (msg.impl_->brokerEntryMetadata.has_index()) {
+
msg.impl_->brokerEntryMetadata.set_index(msg.impl_->brokerEntryMetadata.index()
- batchSize + i +
+ 1);
+ }
if (redeliveryCount >= deadLetterPolicy_.getMaxRedeliverCount()) {
possibleToDeadLetter.emplace_back(msg);
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 158132f..def2543 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -60,6 +60,7 @@ using UnAckedMessageTrackerPtr =
std::shared_ptr<UnAckedMessageTrackerInterface>
namespace proto {
class CommandMessage;
+class BrokerEntryMetadata;
class MessageMetadata;
} // namespace proto
@@ -87,7 +88,8 @@ class ConsumerImpl : public ConsumerImplBase {
void sendFlowPermitsToBroker(const ClientConnectionPtr& cnx, int
numMessages);
uint64_t getConsumerId();
void messageReceived(const ClientConnectionPtr& cnx, const
proto::CommandMessage& msg,
- bool& isChecksumValid, proto::MessageMetadata&
msgMetadata, SharedBuffer& payload);
+ bool& isChecksumValid, proto::BrokerEntryMetadata&
brokerEntryMetadata,
+ proto::MessageMetadata& msgMetadata, SharedBuffer&
payload);
void messageProcessed(Message& msg, bool track = true);
void activeConsumerChanged(bool isActive);
inline CommandSubscribe_SubType getSubType();
diff --git a/lib/Message.cc b/lib/Message.cc
index bfd65f4..06fa5db 100644
--- a/lib/Message.cc
+++ b/lib/Message.cc
@@ -70,17 +70,21 @@ Message::Message() : impl_() {}
Message::Message(MessageImplPtr& impl) : impl_(impl) {}
-Message::Message(const MessageId& messageId, proto::MessageMetadata& metadata,
SharedBuffer& payload)
+Message::Message(const MessageId& messageId, proto::BrokerEntryMetadata&
brokerEntryMetadata,
+ proto::MessageMetadata& metadata, SharedBuffer& payload)
: impl_(std::make_shared<MessageImpl>()) {
impl_->messageId = messageId;
+ impl_->brokerEntryMetadata = brokerEntryMetadata;
impl_->metadata = metadata;
impl_->payload = payload;
}
-Message::Message(const MessageId& messageID, proto::MessageMetadata& metadata,
SharedBuffer& payload,
+Message::Message(const MessageId& messageID, proto::BrokerEntryMetadata&
brokerEntryMetadata,
+ proto::MessageMetadata& metadata, SharedBuffer& payload,
proto::SingleMessageMetadata& singleMetadata, const
std::shared_ptr<std::string>& topicName)
: impl_(std::make_shared<MessageImpl>()) {
impl_->messageId = messageID;
+ impl_->brokerEntryMetadata = brokerEntryMetadata;
impl_->metadata = metadata;
impl_->payload = payload;
impl_->metadata.mutable_properties()->CopyFrom(singleMetadata.properties());
@@ -136,6 +140,15 @@ void Message::setMessageId(const MessageId& messageID)
const {
return;
}
+int64_t Message::getIndex() const {
+ if (!impl_ || !impl_->brokerEntryMetadata.has_index()) {
+ return -1;
+ } else {
+ // casting uint64_t to int64_t, server definition ensures that's safe
+ return static_cast<int64_t>(impl_->brokerEntryMetadata.index());
+ }
+}
+
bool Message::hasPartitionKey() const {
if (impl_) {
return impl_->hasPartitionKey();
diff --git a/lib/MessageImpl.cc b/lib/MessageImpl.cc
index e70ef5d..4650b99 100644
--- a/lib/MessageImpl.cc
+++ b/lib/MessageImpl.cc
@@ -20,8 +20,6 @@
namespace pulsar {
-MessageImpl::MessageImpl() : metadata(), payload(), messageId(), cnx_(0),
topicName_(), redeliveryCount_() {}
-
const Message::StringMap& MessageImpl::properties() {
if (properties_.size() == 0) {
for (int i = 0; i < metadata.properties_size(); i++) {
diff --git a/lib/MessageImpl.h b/lib/MessageImpl.h
index 1046ede..cc07c58 100644
--- a/lib/MessageImpl.h
+++ b/lib/MessageImpl.h
@@ -35,10 +35,9 @@ class BatchMessageContainer;
class MessageImpl {
public:
- MessageImpl();
-
const Message::StringMap& properties();
+ proto::BrokerEntryMetadata brokerEntryMetadata;
proto::MessageMetadata metadata;
SharedBuffer payload;
std::shared_ptr<KeyValueImpl> keyValuePtr;
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index b6956fe..9f28dfb 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -93,6 +93,27 @@ class ActiveInactiveListenerEvent : public
ConsumerEventListener {
std::mutex mutex_;
};
+TEST(ConsumerTest, testConsumerIndex) {
+ Client client(lookupUrl);
+ const std::string topicName = "testConsumerIndex-topic-" +
std::to_string(time(nullptr));
+ const std::string subName = "sub";
+ Producer producer;
+ Result producerResult = client.createProducer(topicName, producer);
+ ASSERT_EQ(producerResult, ResultOk);
+ Consumer consumer;
+ Result consumerResult = client.subscribe(topicName, subName, consumer);
+ ASSERT_EQ(consumerResult, ResultOk);
+ const auto msg = MessageBuilder().setContent("testConsumeSuccess").build();
+ Result sendResult = producer.send(msg);
+ ASSERT_EQ(sendResult, ResultOk);
+ Message receivedMsg;
+ Result receiveResult = consumer.receive(receivedMsg);
+ ASSERT_EQ(receiveResult, ResultOk);
+ ASSERT_EQ(receivedMsg.getDataAsString(), "testConsumeSuccess");
+ ASSERT_EQ(receivedMsg.getIndex(), -1);
+ client.close();
+}
+
typedef std::shared_ptr<ActiveInactiveListenerEvent>
ActiveInactiveListenerEventPtr;
TEST(ConsumerTest, testConsumerEventWithoutPartition) {
diff --git a/tests/brokermetadata/BrokerMetadataTest.cc
b/tests/brokermetadata/BrokerMetadataTest.cc
index f012734..ccf6eeb 100644
--- a/tests/brokermetadata/BrokerMetadataTest.cc
+++ b/tests/brokermetadata/BrokerMetadataTest.cc
@@ -20,24 +20,73 @@
#include <gtest/gtest.h>
#include <pulsar/Client.h>
+#include "lib/Latch.h"
+
using namespace pulsar;
TEST(BrokerMetadataTest, testConsumeSuccess) {
Client client{"pulsar://localhost:6650"};
Producer producer;
- Result producerResult =
client.createProducer("persistent://public/default/testConsumeSuccess",
producer);
+ ProducerConfiguration producerConfiguration;
+ producerConfiguration.setBatchingEnabled(false);
+ Result producerResult =
+ client.createProducer("persistent://public/default/topic-non-batch",
producerConfiguration, producer);
ASSERT_EQ(producerResult, ResultOk);
Consumer consumer;
- Result consumerResult =
- client.subscribe("persistent://public/default/testConsumeSuccess",
"testConsumeSuccess", consumer);
+ Result consumerResult =
client.subscribe("persistent://public/default/topic-non-batch", "sub",
consumer);
ASSERT_EQ(consumerResult, ResultOk);
- const auto msg = MessageBuilder().setContent("testConsumeSuccess").build();
- Result sendResult = producer.send(msg);
- ASSERT_EQ(sendResult, ResultOk);
+ for (int i = 0; i < 10; i++) {
+ std::string content = "testConsumeSuccess" + std::to_string(i);
+ const auto msg = MessageBuilder().setContent(content).build();
+ Result sendResult = producer.send(msg);
+ ASSERT_EQ(sendResult, ResultOk);
+ }
+
+ Message receivedMsg;
+ for (int i = 0; i < 10; i++) {
+ Result receiveResult =
+ consumer.receive(receivedMsg, 1000); // Assumed that we wait 1000
ms for each message
+ printf("receive index: %d\n", i);
+ ASSERT_EQ(receiveResult, ResultOk);
+ ASSERT_EQ(receivedMsg.getDataAsString(), "testConsumeSuccess" +
std::to_string(i));
+ ASSERT_EQ(receivedMsg.getIndex(), i);
+ Result ackResult = consumer.acknowledge(receivedMsg);
+ ASSERT_EQ(ackResult, ResultOk);
+ }
+ client.close();
+}
+
+TEST(BrokerMetadataTest, testConsumeBatchSuccess) {
+ Client client{"pulsar://localhost:6650"};
+ Producer producer;
+ Result producerResult =
client.createProducer("persistent://public/default/topic-batch", producer);
+ ASSERT_EQ(producerResult, ResultOk);
+ Consumer consumer;
+ Result consumerResult =
client.subscribe("persistent://public/default/topic-batch", "sub", consumer);
+ ASSERT_EQ(consumerResult, ResultOk);
+
+ Latch latch(10);
+ auto sendCallback = [&latch](Result result, const MessageId& id) {
+ ASSERT_EQ(result, ResultOk);
+ latch.countdown();
+ };
+
+ for (int i = 0; i < 10; i++) {
+ std::string content = "testConsumeSuccess" + std::to_string(i);
+ const auto msg = MessageBuilder().setContent(content).build();
+ producer.sendAsync(msg, sendCallback);
+ }
+
+ latch.wait();
+
Message receivedMsg;
- Result receiveResult = consumer.receive(receivedMsg);
- ASSERT_EQ(receiveResult, ResultOk);
- ASSERT_EQ(receivedMsg.getDataAsString(), "testConsumeSuccess");
+ for (int i = 0; i < 10; i++) {
+ Result receiveResult =
+ consumer.receive(receivedMsg, 1000); // Assumed that we wait 1000
ms for each message
+ ASSERT_EQ(receiveResult, ResultOk);
+ ASSERT_EQ(receivedMsg.getDataAsString(), "testConsumeSuccess" +
std::to_string(i));
+ ASSERT_EQ(receivedMsg.getIndex(), i);
+ }
client.close();
}
diff --git a/tests/brokermetadata/docker-compose.yml
b/tests/brokermetadata/docker-compose.yml
index bb719e0..6b1d36e 100644
--- a/tests/brokermetadata/docker-compose.yml
+++ b/tests/brokermetadata/docker-compose.yml
@@ -35,8 +35,8 @@ services:
- advertisedAddress=localhost
- advertisedListeners=external:pulsar://localhost:6650
- PULSAR_MEM=-Xms512m -Xmx512m -XX:MaxDirectMemorySize=256m
- -
PULSAR_PREFIX_BROKER_ENTRY_METADATA_INTERCEPTORS=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
- - PULSAR_PREFIX_EXPOSING_BROKER_ENTRY_METADATA_TO_CLIENT_ENABLED=true
+ -
PULSAR_PREFIX_brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
+ - PULSAR_PREFIX_exposingBrokerEntryMetadataToClientEnabled=true
ports:
- "6650:6650"
- "8080:8080"