This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new fa3ac76  [feat] PIP 107: Introduce chunk message ID (#148)
fa3ac76 is described below

commit fa3ac76eddb08c3eb9d865332214af5aa5a5fe88
Author: Zike Yang <[email protected]>
AuthorDate: Tue Dec 20 22:12:14 2022 +0800

    [feat] PIP 107: Introduce chunk message ID (#148)
    
    Fixes #79
    
    ### Motivation
    
    This is the C++ implementation for 
https://github.com/apache/pulsar/issues/12402
    
    ### Modifications
    
    * Add ChunkMessageIdImpl
    * Return ChunkMessageId when the Producer produces the chunk message or 
when the consumer consumes the chunk message.
    * In consumer.seek, use the first chunk's message ID stored in the chunk message ID. 
This solves the problem caused by seeking to chunked messages. This is also the 
only impact of this PIP on the original business logic.
---
 include/pulsar/Message.h     |  3 +-
 include/pulsar/MessageId.h   |  1 +
 lib/ChunkMessageIdImpl.h     | 48 ++++++++++++++++++++++++
 lib/Commands.cc              | 14 ++++++-
 lib/ConsumerImpl.cc          | 19 +++++++---
 lib/ConsumerImpl.h           |  5 +--
 lib/Message.cc               |  5 +--
 lib/MessageId.cc             | 29 ++++++++++++++-
 lib/OpSendMsg.h              |  9 +++--
 lib/ProducerImpl.cc          | 39 +++++++++++++-------
 tests/MessageChunkingTest.cc | 88 +++++++++++++++++++++++++++++++++++++++++++-
 tests/PulsarFriend.h         |  2 +
 12 files changed, 228 insertions(+), 34 deletions(-)

diff --git a/include/pulsar/Message.h b/include/pulsar/Message.h
index b7b3fdd..77f30d4 100644
--- a/include/pulsar/Message.h
+++ b/include/pulsar/Message.h
@@ -188,8 +188,7 @@ class PULSAR_PUBLIC Message {
     MessageImplPtr impl_;
 
     Message(MessageImplPtr& impl);
-    Message(const proto::CommandMessage& msg, proto::MessageMetadata& data, 
SharedBuffer& payload,
-            int32_t partition);
+    Message(const MessageId& messageId, proto::MessageMetadata& metadata, 
SharedBuffer& payload);
     /// Used for Batch Messages
     Message(const MessageId& messageId, proto::MessageMetadata& metadata, 
SharedBuffer& payload,
             proto::SingleMessageMetadata& singleMetadata, const std::string& 
topicName);
diff --git a/include/pulsar/MessageId.h b/include/pulsar/MessageId.h
index 28b88c8..3871e87 100644
--- a/include/pulsar/MessageId.h
+++ b/include/pulsar/MessageId.h
@@ -108,6 +108,7 @@ class PULSAR_PUBLIC MessageId {
     friend class PulsarFriend;
     friend class NegativeAcksTracker;
     friend class MessageIdBuilder;
+    friend class ChunkMessageIdImpl;
 
     friend PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const 
MessageId& messageId);
 
diff --git a/lib/ChunkMessageIdImpl.h b/lib/ChunkMessageIdImpl.h
new file mode 100644
index 0000000..3081ff0
--- /dev/null
+++ b/lib/ChunkMessageIdImpl.h
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <pulsar/MessageId.h>
+
+#include "MessageIdImpl.h"
+
+namespace pulsar {
+class ChunkMessageIdImpl;
+typedef std::shared_ptr<ChunkMessageIdImpl> ChunkMessageIdImplPtr;
+class ChunkMessageIdImpl : public MessageIdImpl, public 
std::enable_shared_from_this<ChunkMessageIdImpl> {
+   public:
+    ChunkMessageIdImpl() : firstChunkMsgId_(std::make_shared<MessageIdImpl>()) 
{}
+
+    void setFirstChunkMessageId(const MessageId& msgId) { *firstChunkMsgId_ = 
*msgId.impl_; }
+
+    void setLastChunkMessageId(const MessageId& msgId) {
+        this->ledgerId_ = msgId.ledgerId();
+        this->entryId_ = msgId.entryId();
+        this->partition_ = msgId.partition();
+    }
+
+    std::shared_ptr<const MessageIdImpl> getFirstChunkMessageId() const { 
return firstChunkMsgId_; }
+
+    MessageId build() { return 
MessageId{std::dynamic_pointer_cast<MessageIdImpl>(shared_from_this())}; }
+
+   private:
+    std::shared_ptr<MessageIdImpl> firstChunkMsgId_;
+};
+}  // namespace pulsar
diff --git a/lib/Commands.cc b/lib/Commands.cc
index 3cd97e2..43f4316 100644
--- a/lib/Commands.cc
+++ b/lib/Commands.cc
@@ -28,6 +28,7 @@
 
 #include "BatchMessageAcker.h"
 #include "BatchedMessageIdImpl.h"
+#include "ChunkMessageIdImpl.h"
 #include "LogUtils.h"
 #include "MessageImpl.h"
 #include "PulsarApi.pb.h"
@@ -512,8 +513,17 @@ SharedBuffer Commands::newSeek(uint64_t consumerId, 
uint64_t requestId, const Me
     commandSeek->set_request_id(requestId);
 
     MessageIdData& messageIdData = *commandSeek->mutable_message_id();
-    messageIdData.set_ledgerid(messageId.ledgerId());
-    messageIdData.set_entryid(messageId.entryId());
+
+    auto chunkMsgId = 
std::dynamic_pointer_cast<ChunkMessageIdImpl>(messageId.impl_);
+    if (chunkMsgId) {
+        auto firstId = chunkMsgId->getFirstChunkMessageId();
+        messageIdData.set_ledgerid(firstId->ledgerId_);
+        messageIdData.set_entryid(firstId->entryId_);
+    } else {
+        messageIdData.set_ledgerid(messageId.ledgerId());
+        messageIdData.set_entryid(messageId.entryId());
+    }
+
     return writeMessageWithSize(cmd);
 }
 
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index e277d99..8589d25 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -27,6 +27,7 @@
 #include "AckGroupingTrackerEnabled.h"
 #include "BatchMessageAcker.h"
 #include "BatchedMessageIdImpl.h"
+#include "ChunkMessageIdImpl.h"
 #include "ClientConnection.h"
 #include "ClientImpl.h"
 #include "Commands.h"
@@ -375,9 +376,9 @@ void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
 
 boost::optional<SharedBuffer> ConsumerImpl::processMessageChunk(const 
SharedBuffer& payload,
                                                                 const 
proto::MessageMetadata& metadata,
-                                                                const 
MessageId& messageId,
                                                                 const 
proto::MessageIdData& messageIdData,
-                                                                const 
ClientConnectionPtr& cnx) {
+                                                                const 
ClientConnectionPtr& cnx,
+                                                                MessageId& 
messageId) {
     const auto chunkId = metadata.chunk_id();
     const auto uuid = metadata.uuid();
     LOG_DEBUG("Process message chunk (chunkId: " << chunkId << ", uuid: " << 
uuid
@@ -432,6 +433,11 @@ boost::optional<SharedBuffer> 
ConsumerImpl::processMessageChunk(const SharedBuff
         return boost::none;
     }
 
+    ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>();
+    
chunkMsgId->setFirstChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().front());
+    
chunkMsgId->setLastChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().back());
+    messageId = chunkMsgId->build();
+
     LOG_DEBUG("Chunked message completed chunkId: " << chunkId << ", 
ChunkedMessageCtx: " << chunkedMsgCtx
                                                     << ", sequenceId: " << 
metadata.sequence_id());
 
@@ -472,11 +478,12 @@ void ConsumerImpl::messageReceived(const 
ClientConnectionPtr& cnx, const proto::
         }
     }
 
+    const auto& messageIdData = msg.message_id();
+    auto messageId = 
MessageIdBuilder::from(messageIdData).batchIndex(-1).build();
+
     // Only a non-batched messages can be a chunk
     if (!metadata.has_num_messages_in_batch() && isChunkedMessage) {
-        const auto& messageIdData = msg.message_id();
-        auto messageId = MessageIdBuilder::from(messageIdData).build();
-        auto optionalPayload = processMessageChunk(payload, metadata, 
messageId, messageIdData, cnx);
+        auto optionalPayload = processMessageChunk(payload, metadata, 
messageIdData, cnx, messageId);
         if (optionalPayload) {
             payload = optionalPayload.value();
         } else {
@@ -484,7 +491,7 @@ void ConsumerImpl::messageReceived(const 
ClientConnectionPtr& cnx, const proto::
         }
     }
 
-    Message m(msg, metadata, payload, partitionIndex_);
+    Message m(messageId, metadata, payload);
     m.impl_->cnx_ = cnx.get();
     m.impl_->setTopicName(topic_);
     m.impl_->setRedeliveryCount(msg.redelivery_count());
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index d2480b8..29d5d0b 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -298,18 +298,17 @@ class ConsumerImpl : public ConsumerImplBase {
      *
      * @param payload the payload of a chunk
      * @param metadata the message metadata
-     * @param messageId
      * @param messageIdData
      * @param cnx
+     * @param messageId
      *
      * @return the concatenated payload if chunks are concatenated into a 
completed message payload
      *   successfully, else Optional::empty()
      */
     boost::optional<SharedBuffer> processMessageChunk(const SharedBuffer& 
payload,
                                                       const 
proto::MessageMetadata& metadata,
-                                                      const MessageId& 
messageId,
                                                       const 
proto::MessageIdData& messageIdData,
-                                                      const 
ClientConnectionPtr& cnx);
+                                                      const 
ClientConnectionPtr& cnx, MessageId& messageId);
 
     friend class PulsarFriend;
 
diff --git a/lib/Message.cc b/lib/Message.cc
index 0f28f7d..545c893 100644
--- a/lib/Message.cc
+++ b/lib/Message.cc
@@ -69,10 +69,9 @@ Message::Message() : impl_() {}
 
 Message::Message(MessageImplPtr& impl) : impl_(impl) {}
 
-Message::Message(const proto::CommandMessage& msg, proto::MessageMetadata& 
metadata, SharedBuffer& payload,
-                 int32_t partition)
+Message::Message(const MessageId& messageId, proto::MessageMetadata& metadata, 
SharedBuffer& payload)
     : impl_(std::make_shared<MessageImpl>()) {
-    impl_->messageId = 
MessageIdBuilder::from(msg.message_id()).batchIndex(-1).build();
+    impl_->messageId = messageId;
     impl_->metadata = metadata;
     impl_->payload = payload;
 }
diff --git a/lib/MessageId.cc b/lib/MessageId.cc
index 9a1a38c..9b5205b 100644
--- a/lib/MessageId.cc
+++ b/lib/MessageId.cc
@@ -25,6 +25,7 @@
 #include <memory>
 #include <stdexcept>
 
+#include "ChunkMessageIdImpl.h"
 #include "MessageIdImpl.h"
 #include "PulsarApi.pb.h"
 
@@ -68,6 +69,17 @@ void MessageId::serialize(std::string& result) const {
         idData.set_batch_index(impl_->batchIndex_);
     }
 
+    auto chunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(impl_);
+    if (chunkMsgId) {
+        proto::MessageIdData& firstChunkIdData = 
*idData.mutable_first_chunk_message_id();
+        auto firstChunkId = chunkMsgId->getFirstChunkMessageId();
+        firstChunkIdData.set_ledgerid(firstChunkId->ledgerId_);
+        firstChunkIdData.set_entryid(firstChunkId->entryId_);
+        if (chunkMsgId->partition_ != -1) {
+            firstChunkIdData.set_partition(firstChunkId->partition_);
+        }
+    }
+
     idData.SerializeToString(&result);
 }
 
@@ -80,7 +92,16 @@ MessageId MessageId::deserialize(const std::string& 
serializedMessageId) {
         throw std::invalid_argument("Failed to parse serialized message id");
     }
 
-    return MessageIdBuilder::from(idData).build();
+    MessageId msgId = MessageIdBuilder::from(idData).build();
+
+    if (idData.has_first_chunk_message_id()) {
+        ChunkMessageIdImplPtr chunkMsgId = 
std::make_shared<ChunkMessageIdImpl>();
+        
chunkMsgId->setFirstChunkMessageId(MessageIdBuilder::from(idData.first_chunk_message_id()).build());
+        chunkMsgId->setLastChunkMessageId(msgId);
+        return chunkMsgId->build();
+    }
+
+    return msgId;
 }
 
 int64_t MessageId::ledgerId() const { return impl_->ledgerId_; }
@@ -94,6 +115,12 @@ int32_t MessageId::partition() const { return 
impl_->partition_; }
 int32_t MessageId::batchSize() const { return impl_->batchSize_; }
 
 PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const 
pulsar::MessageId& messageId) {
+    auto chunkMsgId = 
std::dynamic_pointer_cast<ChunkMessageIdImpl>(messageId.impl_);
+    if (chunkMsgId) {
+        auto firstId = chunkMsgId->getFirstChunkMessageId();
+        s << '(' << firstId->ledgerId_ << ',' << firstId->entryId_ << ',' << 
firstId->partition_ << ','
+          << firstId->batchIndex_ << ");";
+    }
     s << '(' << messageId.impl_->ledgerId_ << ',' << messageId.impl_->entryId_ 
<< ','
       << messageId.impl_->partition_ << ',' << messageId.impl_->batchIndex_ << 
')';
     return s;
diff --git a/lib/OpSendMsg.h b/lib/OpSendMsg.h
index d805dd3..d365b90 100644
--- a/lib/OpSendMsg.h
+++ b/lib/OpSendMsg.h
@@ -24,6 +24,7 @@
 
 #include <boost/date_time/posix_time/ptime.hpp>
 
+#include "ChunkMessageIdImpl.h"
 #include "PulsarApi.pb.h"
 #include "SharedBuffer.h"
 #include "TimeUtils.h"
@@ -40,13 +41,14 @@ struct OpSendMsg {
     uint32_t messagesCount_;
     uint64_t messagesSize_;
     std::vector<std::function<void(Result)>> trackerCallbacks_;
+    ChunkMessageIdImplPtr chunkedMessageId_;
 
     OpSendMsg() = default;
 
     OpSendMsg(const proto::MessageMetadata& metadata, const SharedBuffer& 
payload,
               const SendCallback& sendCallback, uint64_t producerId, uint64_t 
sequenceId, int sendTimeoutMs,
-              uint32_t messagesCount, uint64_t messagesSize)
-        : metadata_(metadata),  // the copy happens here because OpSendMsg of 
chunks are constructed with the
+              uint32_t messagesCount, uint64_t messagesSize, 
ChunkMessageIdImplPtr chunkedMessageId = nullptr)
+        : metadata_(metadata),  // the copy happens here because OpSendMsg of 
chunks are constructed with
                                 // a shared metadata object
           payload_(payload),
           sendCallback_(sendCallback),
@@ -54,7 +56,8 @@ struct OpSendMsg {
           sequenceId_(sequenceId),
           timeout_(TimeUtils::now() + milliseconds(sendTimeoutMs)),
           messagesCount_(messagesCount),
-          messagesSize_(messagesSize) {}
+          messagesSize_(messagesSize),
+          chunkedMessageId_(chunkedMessageId) {}
 
     void complete(Result result, const MessageId& messageId) const {
         if (sendCallback_) {
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index a3a5a95..cdcf14e 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -562,6 +562,8 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& 
msg, const SendCallba
             msgMetadata.set_total_chunk_msg_size(compressedSize);
         }
 
+        auto chunkMessageId = totalChunks > 1 ? 
std::make_shared<ChunkMessageIdImpl>() : nullptr;
+
         int beginIndex = 0;
         for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
             if (sendChunks) {
@@ -578,7 +580,7 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& 
msg, const SendCallba
             }
             OpSendMsg op{msgMetadata, encryptedPayload, (chunkId == 
totalChunks - 1) ? callback : nullptr,
                          producerId_, sequenceId,       conf_.getSendTimeout(),
-                         1,           uncompressedSize};
+                         1,           uncompressedSize, chunkMessageId};
 
             if (!chunkingEnabled_) {
                 const uint32_t msgMetadataSize = op.metadata_.ByteSize();
@@ -868,22 +870,33 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, 
MessageId& rawMessageId) {
                             << " -- MessageId - " << messageId << " last-seq: 
" << expectedSequenceId
                             << " producer: " << producerId_);
         return true;
-    } else {
-        // Message was persisted correctly
-        LOG_DEBUG(getName() << "Received ack for msg " << sequenceId);
-        releaseSemaphoreForSendOp(op);
-        lastSequenceIdPublished_ = sequenceId + op.messagesCount_ - 1;
+    }
 
-        pendingMessagesQueue_.pop_front();
+    // Message was persisted correctly
+    LOG_DEBUG(getName() << "Received ack for msg " << sequenceId);
 
-        lock.unlock();
-        try {
-            op.complete(ResultOk, messageId);
-        } catch (const std::exception& e) {
-            LOG_ERROR(getName() << "Exception thrown from callback " << 
e.what());
+    if (op.chunkedMessageId_) {
+        // Handling the chunk message id.
+        if (op.metadata_.chunk_id() == 0) {
+            op.chunkedMessageId_->setFirstChunkMessageId(messageId);
+        } else if (op.metadata_.chunk_id() == 
op.metadata_.num_chunks_from_msg() - 1) {
+            op.chunkedMessageId_->setLastChunkMessageId(messageId);
+            messageId = op.chunkedMessageId_->build();
         }
-        return true;
     }
+
+    releaseSemaphoreForSendOp(op);
+    lastSequenceIdPublished_ = sequenceId + op.messagesCount_ - 1;
+
+    pendingMessagesQueue_.pop_front();
+
+    lock.unlock();
+    try {
+        op.complete(ResultOk, messageId);
+    } catch (const std::exception& e) {
+        LOG_ERROR(getName() << "Exception thrown from callback " << e.what());
+    }
+    return true;
 }
 
 bool ProducerImpl::encryptMessage(proto::MessageMetadata& metadata, 
SharedBuffer& payload,
diff --git a/tests/MessageChunkingTest.cc b/tests/MessageChunkingTest.cc
index e3f0178..9a65fbc 100644
--- a/tests/MessageChunkingTest.cc
+++ b/tests/MessageChunkingTest.cc
@@ -18,10 +18,12 @@
  */
 #include <gtest/gtest.h>
 #include <pulsar/Client.h>
+#include <pulsar/MessageIdBuilder.h>
 
 #include <ctime>
 #include <random>
 
+#include "ChunkMessageIdImpl.h"
 #include "PulsarFriend.h"
 #include "WaitUtils.h"
 #include "lib/LogUtils.h"
@@ -116,6 +118,9 @@ TEST_P(MessageChunkingTest, testEndToEnd) {
     for (int i = 0; i < numMessages; i++) {
         MessageId messageId;
         ASSERT_EQ(ResultOk, 
producer.send(MessageBuilder().setContent(largeMessage).build(), messageId));
+        auto chunkMsgId =
+            
std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(messageId));
+        ASSERT_TRUE(chunkMsgId);
         LOG_INFO("Send " << i << " to " << messageId);
         sendMessageIds.emplace_back(messageId);
     }
@@ -128,7 +133,11 @@ TEST_P(MessageChunkingTest, testEndToEnd) {
         ASSERT_EQ(msg.getDataAsString(), largeMessage);
         ASSERT_EQ(msg.getMessageId().batchIndex(), -1);
         ASSERT_EQ(msg.getMessageId().batchSize(), 0);
-        receivedMessageIds.emplace_back(msg.getMessageId());
+        auto messageId = msg.getMessageId();
+        auto chunkMsgId =
+            
std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(messageId));
+        ASSERT_TRUE(chunkMsgId);
+        receivedMessageIds.emplace_back(messageId);
     }
     ASSERT_EQ(receivedMessageIds, sendMessageIds);
     ASSERT_EQ(receivedMessageIds.front().ledgerId(), 
receivedMessageIds.front().ledgerId());
@@ -255,6 +264,83 @@ TEST_P(MessageChunkingTest, testMaxPendingChunkMessages) {
     consumer.close();
 }
 
+TEST_P(MessageChunkingTest, testSeekChunkMessages) {
+    const std::string topic =
+        "MessageChunkingTest-testSeekChunkMessages-" + toString(GetParam()) + 
std::to_string(time(nullptr));
+
+    constexpr int numMessages = 10;
+
+    Consumer consumer1;
+    ConsumerConfiguration consumer1Conf;
+    consumer1Conf.setStartMessageIdInclusive(true);
+    createConsumer(topic, consumer1, consumer1Conf);
+
+    Producer producer;
+    createProducer(topic, producer);
+
+    for (int i = 0; i < numMessages; i++) {
+        MessageId messageId;
+        ASSERT_EQ(ResultOk, 
producer.send(MessageBuilder().setContent(largeMessage).build(), messageId));
+        LOG_INFO("Send " << i << " to " << messageId);
+    }
+
+    Message msg;
+    std::vector<MessageId> receivedMessageIds;
+    for (int i = 0; i < numMessages; i++) {
+        ASSERT_EQ(ResultOk, consumer1.receive(msg, 3000));
+        LOG_INFO("Receive " << msg.getLength() << " bytes from " << 
msg.getMessageId());
+        receivedMessageIds.emplace_back(msg.getMessageId());
+    }
+
+    consumer1.seek(receivedMessageIds[1]);
+    for (int i = 1; i < numMessages; i++) {
+        Message msgAfterSeek;
+        ASSERT_EQ(ResultOk, consumer1.receive(msgAfterSeek, 3000));
+        ASSERT_EQ(msgAfterSeek.getMessageId(), receivedMessageIds[i]);
+    }
+
+    consumer1.close();
+    Consumer consumer2;
+    createConsumer(topic, consumer2);
+
+    consumer2.seek(receivedMessageIds[1]);
+    for (int i = 2; i < numMessages; i++) {
+        Message msgAfterSeek;
+        ASSERT_EQ(ResultOk, consumer2.receive(msgAfterSeek, 3000));
+        ASSERT_EQ(msgAfterSeek.getMessageId(), receivedMessageIds[i]);
+    }
+
+    consumer2.close();
+    producer.close();
+}
+
+TEST(ChunkMessageIdTest, testSetChunkMessageId) {
+    MessageId msgId;
+    {
+        ChunkMessageIdImplPtr chunkMsgId = 
std::make_shared<ChunkMessageIdImpl>();
+        
chunkMsgId->setFirstChunkMessageId(MessageIdBuilder().ledgerId(1).entryId(2).partition(3).build());
+        
chunkMsgId->setLastChunkMessageId(MessageIdBuilder().ledgerId(4).entryId(5).partition(6).build());
+        msgId = chunkMsgId->build();
+        // Test the destructor of the underlying message id should also work 
for the generated messageId.
+    }
+
+    std::string msgIdData;
+    msgId.serialize(msgIdData);
+    MessageId deserializedMsgId = MessageId::deserialize(msgIdData);
+
+    ASSERT_EQ(deserializedMsgId.ledgerId(), 4);
+    ASSERT_EQ(deserializedMsgId.entryId(), 5);
+    ASSERT_EQ(deserializedMsgId.partition(), 6);
+
+    auto chunkMsgId =
+        
std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(deserializedMsgId));
+    ASSERT_TRUE(chunkMsgId);
+    auto firstChunkMsgId = chunkMsgId->getFirstChunkMessageId();
+    ASSERT_EQ(firstChunkMsgId->ledgerId_, 1);
+    ASSERT_EQ(firstChunkMsgId->entryId_, 2);
+    ASSERT_EQ(firstChunkMsgId->partition_, 3);
+}
+
 // The CI env is Ubuntu 16.04, the gtest-dev version is 1.8.0 that doesn't 
have INSTANTIATE_TEST_SUITE_P
 INSTANTIATE_TEST_CASE_P(Pulsar, MessageChunkingTest,
                         ::testing::Values(CompressionNone, CompressionLZ4, 
CompressionZLib, CompressionZSTD,
diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h
index 878d80c..fb6bb59 100644
--- a/tests/PulsarFriend.h
+++ b/tests/PulsarFriend.h
@@ -179,6 +179,8 @@ class PulsarFriend {
     }
 
     static proto::MessageMetadata& getMessageMetadata(Message& message) { 
return message.impl_->metadata; }
+
+    static std::shared_ptr<MessageIdImpl> getMessageIdImpl(MessageId& msgId) { 
return msgId.impl_; }
 };
 }  // namespace pulsar
 

Reply via email to