This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new fa3ac76 [feat] PIP 107: Introduce chunk message ID (#148)
fa3ac76 is described below
commit fa3ac76eddb08c3eb9d865332214af5aa5a5fe88
Author: Zike Yang <[email protected]>
AuthorDate: Tue Dec 20 22:12:14 2022 +0800
[feat] PIP 107: Introduce chunk message ID (#148)
Fixes #79
### Motivation
This is the C++ implementation for
https://github.com/apache/pulsar/issues/12402
### Modifications
* Add ChunkMessageIdImpl
* Return ChunkMessageId when the Producer produces the chunk message or
when the consumer consumes the chunk message.
* In consumer.seek, seek to the first chunk's message-id when the given
message-id is a chunk message-id. This solves the problem caused by seeking
to chunk messages. This is also how this PIP affects the original business logic.
---
include/pulsar/Message.h | 3 +-
include/pulsar/MessageId.h | 1 +
lib/ChunkMessageIdImpl.h | 48 ++++++++++++++++++++++++
lib/Commands.cc | 14 ++++++-
lib/ConsumerImpl.cc | 19 +++++++---
lib/ConsumerImpl.h | 5 +--
lib/Message.cc | 5 +--
lib/MessageId.cc | 29 ++++++++++++++-
lib/OpSendMsg.h | 9 +++--
lib/ProducerImpl.cc | 39 +++++++++++++-------
tests/MessageChunkingTest.cc | 88 +++++++++++++++++++++++++++++++++++++++++++-
tests/PulsarFriend.h | 2 +
12 files changed, 228 insertions(+), 34 deletions(-)
diff --git a/include/pulsar/Message.h b/include/pulsar/Message.h
index b7b3fdd..77f30d4 100644
--- a/include/pulsar/Message.h
+++ b/include/pulsar/Message.h
@@ -188,8 +188,7 @@ class PULSAR_PUBLIC Message {
MessageImplPtr impl_;
Message(MessageImplPtr& impl);
- Message(const proto::CommandMessage& msg, proto::MessageMetadata& data,
SharedBuffer& payload,
- int32_t partition);
+ Message(const MessageId& messageId, proto::MessageMetadata& metadata,
SharedBuffer& payload);
/// Used for Batch Messages
Message(const MessageId& messageId, proto::MessageMetadata& metadata,
SharedBuffer& payload,
proto::SingleMessageMetadata& singleMetadata, const std::string&
topicName);
diff --git a/include/pulsar/MessageId.h b/include/pulsar/MessageId.h
index 28b88c8..3871e87 100644
--- a/include/pulsar/MessageId.h
+++ b/include/pulsar/MessageId.h
@@ -108,6 +108,7 @@ class PULSAR_PUBLIC MessageId {
friend class PulsarFriend;
friend class NegativeAcksTracker;
friend class MessageIdBuilder;
+ friend class ChunkMessageIdImpl;
friend PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const
MessageId& messageId);
diff --git a/lib/ChunkMessageIdImpl.h b/lib/ChunkMessageIdImpl.h
new file mode 100644
index 0000000..3081ff0
--- /dev/null
+++ b/lib/ChunkMessageIdImpl.h
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <pulsar/MessageId.h>
+
+#include "MessageIdImpl.h"
+
+namespace pulsar {
+class ChunkMessageIdImpl;
+typedef std::shared_ptr<ChunkMessageIdImpl> ChunkMessageIdImplPtr;
+class ChunkMessageIdImpl : public MessageIdImpl, public
std::enable_shared_from_this<ChunkMessageIdImpl> {
+ public:
+ ChunkMessageIdImpl() : firstChunkMsgId_(std::make_shared<MessageIdImpl>())
{}
+
+ void setFirstChunkMessageId(const MessageId& msgId) { *firstChunkMsgId_ =
*msgId.impl_; }
+
+ void setLastChunkMessageId(const MessageId& msgId) {
+ this->ledgerId_ = msgId.ledgerId();
+ this->entryId_ = msgId.entryId();
+ this->partition_ = msgId.partition();
+ }
+
+ std::shared_ptr<const MessageIdImpl> getFirstChunkMessageId() const {
return firstChunkMsgId_; }
+
+ MessageId build() { return
MessageId{std::dynamic_pointer_cast<MessageIdImpl>(shared_from_this())}; }
+
+ private:
+ std::shared_ptr<MessageIdImpl> firstChunkMsgId_;
+};
+} // namespace pulsar
diff --git a/lib/Commands.cc b/lib/Commands.cc
index 3cd97e2..43f4316 100644
--- a/lib/Commands.cc
+++ b/lib/Commands.cc
@@ -28,6 +28,7 @@
#include "BatchMessageAcker.h"
#include "BatchedMessageIdImpl.h"
+#include "ChunkMessageIdImpl.h"
#include "LogUtils.h"
#include "MessageImpl.h"
#include "PulsarApi.pb.h"
@@ -512,8 +513,17 @@ SharedBuffer Commands::newSeek(uint64_t consumerId,
uint64_t requestId, const Me
commandSeek->set_request_id(requestId);
MessageIdData& messageIdData = *commandSeek->mutable_message_id();
- messageIdData.set_ledgerid(messageId.ledgerId());
- messageIdData.set_entryid(messageId.entryId());
+
+ auto chunkMsgId =
std::dynamic_pointer_cast<ChunkMessageIdImpl>(messageId.impl_);
+ if (chunkMsgId) {
+ auto firstId = chunkMsgId->getFirstChunkMessageId();
+ messageIdData.set_ledgerid(firstId->ledgerId_);
+ messageIdData.set_entryid(firstId->entryId_);
+ } else {
+ messageIdData.set_ledgerid(messageId.ledgerId());
+ messageIdData.set_entryid(messageId.entryId());
+ }
+
return writeMessageWithSize(cmd);
}
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index e277d99..8589d25 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -27,6 +27,7 @@
#include "AckGroupingTrackerEnabled.h"
#include "BatchMessageAcker.h"
#include "BatchedMessageIdImpl.h"
+#include "ChunkMessageIdImpl.h"
#include "ClientConnection.h"
#include "ClientImpl.h"
#include "Commands.h"
@@ -375,9 +376,9 @@ void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
boost::optional<SharedBuffer> ConsumerImpl::processMessageChunk(const
SharedBuffer& payload,
const
proto::MessageMetadata& metadata,
- const
MessageId& messageId,
const
proto::MessageIdData& messageIdData,
- const
ClientConnectionPtr& cnx) {
+ const
ClientConnectionPtr& cnx,
+ MessageId&
messageId) {
const auto chunkId = metadata.chunk_id();
const auto uuid = metadata.uuid();
LOG_DEBUG("Process message chunk (chunkId: " << chunkId << ", uuid: " <<
uuid
@@ -432,6 +433,11 @@ boost::optional<SharedBuffer>
ConsumerImpl::processMessageChunk(const SharedBuff
return boost::none;
}
+ ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>();
+
chunkMsgId->setFirstChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().front());
+
chunkMsgId->setLastChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().back());
+ messageId = chunkMsgId->build();
+
LOG_DEBUG("Chunked message completed chunkId: " << chunkId << ",
ChunkedMessageCtx: " << chunkedMsgCtx
<< ", sequenceId: " <<
metadata.sequence_id());
@@ -472,11 +478,12 @@ void ConsumerImpl::messageReceived(const
ClientConnectionPtr& cnx, const proto::
}
}
+ const auto& messageIdData = msg.message_id();
+ auto messageId =
MessageIdBuilder::from(messageIdData).batchIndex(-1).build();
+
// Only a non-batched messages can be a chunk
if (!metadata.has_num_messages_in_batch() && isChunkedMessage) {
- const auto& messageIdData = msg.message_id();
- auto messageId = MessageIdBuilder::from(messageIdData).build();
- auto optionalPayload = processMessageChunk(payload, metadata,
messageId, messageIdData, cnx);
+ auto optionalPayload = processMessageChunk(payload, metadata,
messageIdData, cnx, messageId);
if (optionalPayload) {
payload = optionalPayload.value();
} else {
@@ -484,7 +491,7 @@ void ConsumerImpl::messageReceived(const
ClientConnectionPtr& cnx, const proto::
}
}
- Message m(msg, metadata, payload, partitionIndex_);
+ Message m(messageId, metadata, payload);
m.impl_->cnx_ = cnx.get();
m.impl_->setTopicName(topic_);
m.impl_->setRedeliveryCount(msg.redelivery_count());
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index d2480b8..29d5d0b 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -298,18 +298,17 @@ class ConsumerImpl : public ConsumerImplBase {
*
* @param payload the payload of a chunk
* @param metadata the message metadata
- * @param messageId
* @param messageIdData
* @param cnx
+ * @param messageId
*
* @return the concatenated payload if chunks are concatenated into a
completed message payload
* successfully, else Optional::empty()
*/
boost::optional<SharedBuffer> processMessageChunk(const SharedBuffer&
payload,
const
proto::MessageMetadata& metadata,
- const MessageId&
messageId,
const
proto::MessageIdData& messageIdData,
- const
ClientConnectionPtr& cnx);
+ const
ClientConnectionPtr& cnx, MessageId& messageId);
friend class PulsarFriend;
diff --git a/lib/Message.cc b/lib/Message.cc
index 0f28f7d..545c893 100644
--- a/lib/Message.cc
+++ b/lib/Message.cc
@@ -69,10 +69,9 @@ Message::Message() : impl_() {}
Message::Message(MessageImplPtr& impl) : impl_(impl) {}
-Message::Message(const proto::CommandMessage& msg, proto::MessageMetadata&
metadata, SharedBuffer& payload,
- int32_t partition)
+Message::Message(const MessageId& messageId, proto::MessageMetadata& metadata,
SharedBuffer& payload)
: impl_(std::make_shared<MessageImpl>()) {
- impl_->messageId =
MessageIdBuilder::from(msg.message_id()).batchIndex(-1).build();
+ impl_->messageId = messageId;
impl_->metadata = metadata;
impl_->payload = payload;
}
diff --git a/lib/MessageId.cc b/lib/MessageId.cc
index 9a1a38c..9b5205b 100644
--- a/lib/MessageId.cc
+++ b/lib/MessageId.cc
@@ -25,6 +25,7 @@
#include <memory>
#include <stdexcept>
+#include "ChunkMessageIdImpl.h"
#include "MessageIdImpl.h"
#include "PulsarApi.pb.h"
@@ -68,6 +69,17 @@ void MessageId::serialize(std::string& result) const {
idData.set_batch_index(impl_->batchIndex_);
}
+ auto chunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(impl_);
+ if (chunkMsgId) {
+ proto::MessageIdData& firstChunkIdData =
*idData.mutable_first_chunk_message_id();
+ auto firstChunkId = chunkMsgId->getFirstChunkMessageId();
+ firstChunkIdData.set_ledgerid(firstChunkId->ledgerId_);
+ firstChunkIdData.set_entryid(firstChunkId->entryId_);
+ if (chunkMsgId->partition_ != -1) {
+ firstChunkIdData.set_partition(firstChunkId->partition_);
+ }
+ }
+
idData.SerializeToString(&result);
}
@@ -80,7 +92,16 @@ MessageId MessageId::deserialize(const std::string&
serializedMessageId) {
throw std::invalid_argument("Failed to parse serialized message id");
}
- return MessageIdBuilder::from(idData).build();
+ MessageId msgId = MessageIdBuilder::from(idData).build();
+
+ if (idData.has_first_chunk_message_id()) {
+ ChunkMessageIdImplPtr chunkMsgId =
std::make_shared<ChunkMessageIdImpl>();
+
chunkMsgId->setFirstChunkMessageId(MessageIdBuilder::from(idData.first_chunk_message_id()).build());
+ chunkMsgId->setLastChunkMessageId(msgId);
+ return chunkMsgId->build();
+ }
+
+ return msgId;
}
int64_t MessageId::ledgerId() const { return impl_->ledgerId_; }
@@ -94,6 +115,12 @@ int32_t MessageId::partition() const { return
impl_->partition_; }
int32_t MessageId::batchSize() const { return impl_->batchSize_; }
PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const
pulsar::MessageId& messageId) {
+ auto chunkMsgId =
std::dynamic_pointer_cast<ChunkMessageIdImpl>(messageId.impl_);
+ if (chunkMsgId) {
+ auto firstId = chunkMsgId->getFirstChunkMessageId();
+ s << '(' << firstId->ledgerId_ << ',' << firstId->entryId_ << ',' <<
firstId->partition_ << ','
+ << firstId->batchIndex_ << ");";
+ }
s << '(' << messageId.impl_->ledgerId_ << ',' << messageId.impl_->entryId_
<< ','
<< messageId.impl_->partition_ << ',' << messageId.impl_->batchIndex_ <<
')';
return s;
diff --git a/lib/OpSendMsg.h b/lib/OpSendMsg.h
index d805dd3..d365b90 100644
--- a/lib/OpSendMsg.h
+++ b/lib/OpSendMsg.h
@@ -24,6 +24,7 @@
#include <boost/date_time/posix_time/ptime.hpp>
+#include "ChunkMessageIdImpl.h"
#include "PulsarApi.pb.h"
#include "SharedBuffer.h"
#include "TimeUtils.h"
@@ -40,13 +41,14 @@ struct OpSendMsg {
uint32_t messagesCount_;
uint64_t messagesSize_;
std::vector<std::function<void(Result)>> trackerCallbacks_;
+ ChunkMessageIdImplPtr chunkedMessageId_;
OpSendMsg() = default;
OpSendMsg(const proto::MessageMetadata& metadata, const SharedBuffer&
payload,
const SendCallback& sendCallback, uint64_t producerId, uint64_t
sequenceId, int sendTimeoutMs,
- uint32_t messagesCount, uint64_t messagesSize)
- : metadata_(metadata), // the copy happens here because OpSendMsg of
chunks are constructed with the
+ uint32_t messagesCount, uint64_t messagesSize,
ChunkMessageIdImplPtr chunkedMessageId = nullptr)
+ : metadata_(metadata), // the copy happens here because OpSendMsg of
chunks are constructed with
// a shared metadata object
payload_(payload),
sendCallback_(sendCallback),
@@ -54,7 +56,8 @@ struct OpSendMsg {
sequenceId_(sequenceId),
timeout_(TimeUtils::now() + milliseconds(sendTimeoutMs)),
messagesCount_(messagesCount),
- messagesSize_(messagesSize) {}
+ messagesSize_(messagesSize),
+ chunkedMessageId_(chunkedMessageId) {}
void complete(Result result, const MessageId& messageId) const {
if (sendCallback_) {
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index a3a5a95..cdcf14e 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -562,6 +562,8 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message&
msg, const SendCallba
msgMetadata.set_total_chunk_msg_size(compressedSize);
}
+ auto chunkMessageId = totalChunks > 1 ?
std::make_shared<ChunkMessageIdImpl>() : nullptr;
+
int beginIndex = 0;
for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
if (sendChunks) {
@@ -578,7 +580,7 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message&
msg, const SendCallba
}
OpSendMsg op{msgMetadata, encryptedPayload, (chunkId ==
totalChunks - 1) ? callback : nullptr,
producerId_, sequenceId, conf_.getSendTimeout(),
- 1, uncompressedSize};
+ 1, uncompressedSize, chunkMessageId};
if (!chunkingEnabled_) {
const uint32_t msgMetadataSize = op.metadata_.ByteSize();
@@ -868,22 +870,33 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId,
MessageId& rawMessageId) {
<< " -- MessageId - " << messageId << " last-seq:
" << expectedSequenceId
<< " producer: " << producerId_);
return true;
- } else {
- // Message was persisted correctly
- LOG_DEBUG(getName() << "Received ack for msg " << sequenceId);
- releaseSemaphoreForSendOp(op);
- lastSequenceIdPublished_ = sequenceId + op.messagesCount_ - 1;
+ }
- pendingMessagesQueue_.pop_front();
+ // Message was persisted correctly
+ LOG_DEBUG(getName() << "Received ack for msg " << sequenceId);
- lock.unlock();
- try {
- op.complete(ResultOk, messageId);
- } catch (const std::exception& e) {
- LOG_ERROR(getName() << "Exception thrown from callback " <<
e.what());
+ if (op.chunkedMessageId_) {
+ // Handling the chunk message id.
+ if (op.metadata_.chunk_id() == 0) {
+ op.chunkedMessageId_->setFirstChunkMessageId(messageId);
+ } else if (op.metadata_.chunk_id() ==
op.metadata_.num_chunks_from_msg() - 1) {
+ op.chunkedMessageId_->setLastChunkMessageId(messageId);
+ messageId = op.chunkedMessageId_->build();
}
- return true;
}
+
+ releaseSemaphoreForSendOp(op);
+ lastSequenceIdPublished_ = sequenceId + op.messagesCount_ - 1;
+
+ pendingMessagesQueue_.pop_front();
+
+ lock.unlock();
+ try {
+ op.complete(ResultOk, messageId);
+ } catch (const std::exception& e) {
+ LOG_ERROR(getName() << "Exception thrown from callback " << e.what());
+ }
+ return true;
}
bool ProducerImpl::encryptMessage(proto::MessageMetadata& metadata,
SharedBuffer& payload,
diff --git a/tests/MessageChunkingTest.cc b/tests/MessageChunkingTest.cc
index e3f0178..9a65fbc 100644
--- a/tests/MessageChunkingTest.cc
+++ b/tests/MessageChunkingTest.cc
@@ -18,10 +18,12 @@
*/
#include <gtest/gtest.h>
#include <pulsar/Client.h>
+#include <pulsar/MessageIdBuilder.h>
#include <ctime>
#include <random>
+#include "ChunkMessageIdImpl.h"
#include "PulsarFriend.h"
#include "WaitUtils.h"
#include "lib/LogUtils.h"
@@ -116,6 +118,9 @@ TEST_P(MessageChunkingTest, testEndToEnd) {
for (int i = 0; i < numMessages; i++) {
MessageId messageId;
ASSERT_EQ(ResultOk,
producer.send(MessageBuilder().setContent(largeMessage).build(), messageId));
+ auto chunkMsgId =
+
std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(messageId));
+ ASSERT_TRUE(chunkMsgId);
LOG_INFO("Send " << i << " to " << messageId);
sendMessageIds.emplace_back(messageId);
}
@@ -128,7 +133,11 @@ TEST_P(MessageChunkingTest, testEndToEnd) {
ASSERT_EQ(msg.getDataAsString(), largeMessage);
ASSERT_EQ(msg.getMessageId().batchIndex(), -1);
ASSERT_EQ(msg.getMessageId().batchSize(), 0);
- receivedMessageIds.emplace_back(msg.getMessageId());
+ auto messageId = msg.getMessageId();
+ auto chunkMsgId =
+
std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(messageId));
+ ASSERT_TRUE(chunkMsgId);
+ receivedMessageIds.emplace_back(messageId);
}
ASSERT_EQ(receivedMessageIds, sendMessageIds);
ASSERT_EQ(receivedMessageIds.front().ledgerId(),
receivedMessageIds.front().ledgerId());
@@ -255,6 +264,83 @@ TEST_P(MessageChunkingTest, testMaxPendingChunkMessages) {
consumer.close();
}
+TEST_P(MessageChunkingTest, testSeekChunkMessages) {
+ const std::string topic =
+ "MessageChunkingTest-testSeekChunkMessages-" + toString(GetParam()) +
std::to_string(time(nullptr));
+
+ constexpr int numMessages = 10;
+
+ Consumer consumer1;
+ ConsumerConfiguration consumer1Conf;
+ consumer1Conf.setStartMessageIdInclusive(true);
+ createConsumer(topic, consumer1, consumer1Conf);
+
+ Producer producer;
+ createProducer(topic, producer);
+
+ for (int i = 0; i < numMessages; i++) {
+ MessageId messageId;
+ ASSERT_EQ(ResultOk,
producer.send(MessageBuilder().setContent(largeMessage).build(), messageId));
+ LOG_INFO("Send " << i << " to " << messageId);
+ }
+
+ Message msg;
+ std::vector<MessageId> receivedMessageIds;
+ for (int i = 0; i < numMessages; i++) {
+ ASSERT_EQ(ResultOk, consumer1.receive(msg, 3000));
+ LOG_INFO("Receive " << msg.getLength() << " bytes from " <<
msg.getMessageId());
+ receivedMessageIds.emplace_back(msg.getMessageId());
+ }
+
+ consumer1.seek(receivedMessageIds[1]);
+ for (int i = 1; i < numMessages; i++) {
+ Message msgAfterSeek;
+ ASSERT_EQ(ResultOk, consumer1.receive(msgAfterSeek, 3000));
+ ASSERT_EQ(msgAfterSeek.getMessageId(), receivedMessageIds[i]);
+ }
+
+ consumer1.close();
+ Consumer consumer2;
+ createConsumer(topic, consumer2);
+
+ consumer2.seek(receivedMessageIds[1]);
+ for (int i = 2; i < numMessages; i++) {
+ Message msgAfterSeek;
+ ASSERT_EQ(ResultOk, consumer2.receive(msgAfterSeek, 3000));
+ ASSERT_EQ(msgAfterSeek.getMessageId(), receivedMessageIds[i]);
+ }
+
+ consumer2.close();
+ producer.close();
+}
+
+TEST(ChunkMessageIdTest, testSetChunkMessageId) {
+ MessageId msgId;
+ {
+ ChunkMessageIdImplPtr chunkMsgId =
std::make_shared<ChunkMessageIdImpl>();
+
chunkMsgId->setFirstChunkMessageId(MessageIdBuilder().ledgerId(1).entryId(2).partition(3).build());
+
chunkMsgId->setLastChunkMessageId(MessageIdBuilder().ledgerId(4).entryId(5).partition(6).build());
+ msgId = chunkMsgId->build();
+ // Test the destructor of the underlying message id should also work
for the generated messageId.
+ }
+
+ std::string msgIdData;
+ msgId.serialize(msgIdData);
+ MessageId deserializedMsgId = MessageId::deserialize(msgIdData);
+
+ ASSERT_EQ(deserializedMsgId.ledgerId(), 4);
+ ASSERT_EQ(deserializedMsgId.entryId(), 5);
+ ASSERT_EQ(deserializedMsgId.partition(), 6);
+
+ auto chunkMsgId =
+
std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(deserializedMsgId));
+ ASSERT_TRUE(chunkMsgId);
+ auto firstChunkMsgId = chunkMsgId->getFirstChunkMessageId();
+ ASSERT_EQ(firstChunkMsgId->ledgerId_, 1);
+ ASSERT_EQ(firstChunkMsgId->entryId_, 2);
+ ASSERT_EQ(firstChunkMsgId->partition_, 3);
+}
+
// The CI env is Ubuntu 16.04, the gtest-dev version is 1.8.0 that doesn't
have INSTANTIATE_TEST_SUITE_P
INSTANTIATE_TEST_CASE_P(Pulsar, MessageChunkingTest,
::testing::Values(CompressionNone, CompressionLZ4,
CompressionZLib, CompressionZSTD,
diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h
index 878d80c..fb6bb59 100644
--- a/tests/PulsarFriend.h
+++ b/tests/PulsarFriend.h
@@ -179,6 +179,8 @@ class PulsarFriend {
}
static proto::MessageMetadata& getMessageMetadata(Message& message) {
return message.impl_->metadata; }
+
+ static std::shared_ptr<MessageIdImpl> getMessageIdImpl(MessageId& msgId) {
return msgId.impl_; }
};
} // namespace pulsar