This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new eea59bb  [fix] Fix consumer doesn't acknowledge all chunk message Ids (#321)
eea59bb is described below

commit eea59bb4458c6ea190f51732d41fdb32c9ad6327
Author: Zike Yang <[email protected]>
AuthorDate: Sat Oct 7 13:40:24 2023 +0800

    [fix] Fix consumer doesn't acknowledge all chunk message Ids (#321)
---
 lib/AckGroupingTracker.cc    | 33 +++++++++++++++++++++++++++----
 lib/ChunkMessageIdImpl.h     | 18 ++++++++---------
 lib/Commands.cc              |  6 +++---
 lib/ConsumerImpl.cc          |  8 ++++----
 lib/ConsumerImpl.h           |  4 ++--
 lib/MessageId.cc             | 19 +++++++++---------
 lib/OpSendMsg.h              |  8 +++++---
 lib/ProducerImpl.cc          | 19 +++++++++---------
 tests/MessageChunkingTest.cc | 47 +++++++++++++++++++++++++++++---------------
 9 files changed, 100 insertions(+), 62 deletions(-)

diff --git a/lib/AckGroupingTracker.cc b/lib/AckGroupingTracker.cc
index 9a47135..ab7381f 100644
--- a/lib/AckGroupingTracker.cc
+++ b/lib/AckGroupingTracker.cc
@@ -21,8 +21,10 @@
 
 #include <atomic>
 #include <limits>
+#include <set>
 
 #include "BitSet.h"
+#include "ChunkMessageIdImpl.h"
 #include "ClientConnection.h"
 #include "Commands.h"
 #include "LogUtils.h"
@@ -42,6 +44,17 @@ void AckGroupingTracker::doImmediateAck(const MessageId& 
msgId, ResultCallback c
         }
         return;
     }
+    if (ackType == CommandAck_AckType_Individual) {
+        // If it's individual ack, we need to acknowledge all message IDs in a chunked message Id
+        // If it's cumulative ack, we only need to ack the last message ID of a chunked message.
+        // ChunkedMessageId return last chunk message ID by default, so we don't need to handle it.
+        if (auto chunkMessageId =
+                
std::dynamic_pointer_cast<ChunkMessageIdImpl>(Commands::getMessageIdImpl(msgId)))
 {
+            auto msgIdList = chunkMessageId->getChunkedMessageIds();
+            doImmediateAck(std::set<MessageId>(msgIdList.begin(), 
msgIdList.end()), callback);
+            return;
+        }
+    }
     const auto& ackSet = Commands::getMessageIdImpl(msgId)->getBitSet();
     if (waitResponse_) {
         const auto requestId = requestIdSupplier_();
@@ -84,29 +97,41 @@ void AckGroupingTracker::doImmediateAck(const 
std::set<MessageId>& msgIds, Resul
         return;
     }
 
+    std::set<MessageId> ackMsgIds;
+
+    for (const auto& msgId : msgIds) {
+        if (auto chunkMessageId =
+                
std::dynamic_pointer_cast<ChunkMessageIdImpl>(Commands::getMessageIdImpl(msgId)))
 {
+            auto msgIdList = chunkMessageId->getChunkedMessageIds();
+            ackMsgIds.insert(msgIdList.begin(), msgIdList.end());
+        } else {
+            ackMsgIds.insert(msgId);
+        }
+    }
+
     if 
(Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion()))
 {
         if (waitResponse_) {
             const auto requestId = requestIdSupplier_();
-            cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, 
msgIds, requestId), requestId)
+            cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, 
ackMsgIds, requestId), requestId)
                 .addListener([callback](Result result, const ResponseData&) {
                     if (callback) {
                         callback(result);
                     }
                 });
         } else {
-            cnx->sendCommand(Commands::newMultiMessageAck(consumerId_, 
msgIds));
+            cnx->sendCommand(Commands::newMultiMessageAck(consumerId_, 
ackMsgIds));
             if (callback) {
                 callback(ResultOk);
             }
         }
     } else {
-        auto count = std::make_shared<std::atomic<size_t>>(msgIds.size());
+        auto count = std::make_shared<std::atomic<size_t>>(ackMsgIds.size());
         auto wrappedCallback = [callback, count](Result result) {
             if (--*count == 0 && callback) {
                 callback(result);
             }
         };
-        for (auto&& msgId : msgIds) {
+        for (auto&& msgId : ackMsgIds) {
             doImmediateAck(msgId, wrappedCallback, 
CommandAck_AckType_Individual);
         }
     }
diff --git a/lib/ChunkMessageIdImpl.h b/lib/ChunkMessageIdImpl.h
index 3081ff0..3fb0f13 100644
--- a/lib/ChunkMessageIdImpl.h
+++ b/lib/ChunkMessageIdImpl.h
@@ -28,21 +28,19 @@ class ChunkMessageIdImpl;
 typedef std::shared_ptr<ChunkMessageIdImpl> ChunkMessageIdImplPtr;
 class ChunkMessageIdImpl : public MessageIdImpl, public 
std::enable_shared_from_this<ChunkMessageIdImpl> {
    public:
-    ChunkMessageIdImpl() : firstChunkMsgId_(std::make_shared<MessageIdImpl>()) 
{}
-
-    void setFirstChunkMessageId(const MessageId& msgId) { *firstChunkMsgId_ = 
*msgId.impl_; }
-
-    void setLastChunkMessageId(const MessageId& msgId) {
-        this->ledgerId_ = msgId.ledgerId();
-        this->entryId_ = msgId.entryId();
-        this->partition_ = msgId.partition();
+    explicit ChunkMessageIdImpl(std::vector<MessageId>&& chunkedMessageIds)
+        : chunkedMessageIds_(std::move(chunkedMessageIds)) {
+        auto lastChunkMsgId = chunkedMessageIds_.back();
+        this->ledgerId_ = lastChunkMsgId.ledgerId();
+        this->entryId_ = lastChunkMsgId.entryId();
+        this->partition_ = lastChunkMsgId.partition();
     }
 
-    std::shared_ptr<const MessageIdImpl> getFirstChunkMessageId() const { 
return firstChunkMsgId_; }
+    const std::vector<MessageId>& getChunkedMessageIds() const noexcept { 
return chunkedMessageIds_; }
 
     MessageId build() { return 
MessageId{std::dynamic_pointer_cast<MessageIdImpl>(shared_from_this())}; }
 
    private:
-    std::shared_ptr<MessageIdImpl> firstChunkMsgId_;
+    std::vector<MessageId> chunkedMessageIds_;
 };
 }  // namespace pulsar
diff --git a/lib/Commands.cc b/lib/Commands.cc
index f2e6c6d..4b10b73 100644
--- a/lib/Commands.cc
+++ b/lib/Commands.cc
@@ -583,9 +583,9 @@ SharedBuffer Commands::newSeek(uint64_t consumerId, 
uint64_t requestId, const Me
 
     auto chunkMsgId = 
std::dynamic_pointer_cast<ChunkMessageIdImpl>(messageId.impl_);
     if (chunkMsgId) {
-        auto firstId = chunkMsgId->getFirstChunkMessageId();
-        messageIdData.set_ledgerid(firstId->ledgerId_);
-        messageIdData.set_entryid(firstId->entryId_);
+        const auto& firstId = chunkMsgId->getChunkedMessageIds().front();
+        messageIdData.set_ledgerid(firstId.ledgerId());
+        messageIdData.set_entryid(firstId.entryId());
     } else {
         messageIdData.set_ledgerid(messageId.ledgerId());
         messageIdData.set_entryid(messageId.entryId());
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index d82cf78..52f0440 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -479,10 +479,7 @@ boost::optional<SharedBuffer> 
ConsumerImpl::processMessageChunk(const SharedBuff
         return boost::none;
     }
 
-    ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>();
-    
chunkMsgId->setFirstChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().front());
-    
chunkMsgId->setLastChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().back());
-    messageId = chunkMsgId->build();
+    messageId = 
std::make_shared<ChunkMessageIdImpl>(chunkedMsgCtx.moveChunkedMessageIds())->build();
 
     LOG_DEBUG("Chunked message completed chunkId: " << chunkId << ", 
ChunkedMessageCtx: " << chunkedMsgCtx
                                                     << ", sequenceId: " << 
metadata.sequence_id());
@@ -1174,6 +1171,9 @@ std::pair<MessageId, bool> 
ConsumerImpl::prepareIndividualAck(const MessageId& m
                                                    (batchSize > 0) ? batchSize 
: 1);
         unAckedMessageTrackerPtr_->remove(messageId);
         possibleSendToDeadLetterTopicMessages_.remove(messageId);
+        if (std::dynamic_pointer_cast<ChunkMessageIdImpl>(messageIdImpl)) {
+            return std::make_pair(messageId, true);
+        }
         return std::make_pair(discardBatch(messageId), true);
     } else if (config_.isBatchIndexAckEnabled()) {
         return std::make_pair(messageId, true);
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index def2543..690d8fc 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -270,6 +270,8 @@ class ConsumerImpl : public ConsumerImplBase {
 
         const std::vector<MessageId>& getChunkedMessageIds() const noexcept { 
return chunkedMessageIds_; }
 
+        std::vector<MessageId> moveChunkedMessageIds() noexcept { return 
std::move(chunkedMessageIds_); }
+
         long getReceivedTimeMs() const noexcept { return receivedTimeMs_; }
 
         friend std::ostream& operator<<(std::ostream& os, const 
ChunkedMessageCtx& ctx) {
@@ -292,8 +294,6 @@ class ConsumerImpl : public ConsumerImplBase {
     // concurrently on the topic) then it guards against broken chunked message which was not fully published
     const bool autoAckOldestChunkedMessageOnQueueFull_;
 
-    // The key is UUID, value is the associated ChunkedMessageCtx of the 
chunked message.
-    std::unordered_map<std::string, ChunkedMessageCtx> chunkedMessagesMap_;
     // This list contains all the keys of `chunkedMessagesMap_`, each key is an UUID that identifies a pending
     // chunked message. Once the number of pending chunked messages exceeds the limit, the oldest UUIDs and
     // the associated ChunkedMessageCtx will be removed.
diff --git a/lib/MessageId.cc b/lib/MessageId.cc
index 12b6f40..b51fd9f 100644
--- a/lib/MessageId.cc
+++ b/lib/MessageId.cc
@@ -76,11 +76,11 @@ void MessageId::serialize(std::string& result) const {
     auto chunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(impl_);
     if (chunkMsgId) {
         proto::MessageIdData& firstChunkIdData = 
*idData.mutable_first_chunk_message_id();
-        auto firstChunkId = chunkMsgId->getFirstChunkMessageId();
-        firstChunkIdData.set_ledgerid(firstChunkId->ledgerId_);
-        firstChunkIdData.set_entryid(firstChunkId->entryId_);
+        const auto& firstChunkId = chunkMsgId->getChunkedMessageIds().front();
+        firstChunkIdData.set_ledgerid(firstChunkId.ledgerId());
+        firstChunkIdData.set_entryid(firstChunkId.entryId());
         if (chunkMsgId->partition_ != -1) {
-            firstChunkIdData.set_partition(firstChunkId->partition_);
+            firstChunkIdData.set_partition(firstChunkId.partition());
         }
     }
 
@@ -99,9 +99,8 @@ MessageId MessageId::deserialize(const std::string& 
serializedMessageId) {
     MessageId msgId = MessageIdBuilder::from(idData).build();
 
     if (idData.has_first_chunk_message_id()) {
-        ChunkMessageIdImplPtr chunkMsgId = 
std::make_shared<ChunkMessageIdImpl>();
-        
chunkMsgId->setFirstChunkMessageId(MessageIdBuilder::from(idData.first_chunk_message_id()).build());
-        chunkMsgId->setLastChunkMessageId(msgId);
+        ChunkMessageIdImplPtr chunkMsgId = 
std::make_shared<ChunkMessageIdImpl>(
+            
std::vector<MessageId>({MessageIdBuilder::from(idData.first_chunk_message_id()).build(),
 msgId}));
         return chunkMsgId->build();
     }
 
@@ -121,9 +120,9 @@ int32_t MessageId::batchSize() const { return 
impl_->batchSize_; }
 PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const 
pulsar::MessageId& messageId) {
     auto chunkMsgId = 
std::dynamic_pointer_cast<ChunkMessageIdImpl>(messageId.impl_);
     if (chunkMsgId) {
-        auto firstId = chunkMsgId->getFirstChunkMessageId();
-        s << '(' << firstId->ledgerId_ << ',' << firstId->entryId_ << ',' << 
firstId->partition_ << ','
-          << firstId->batchIndex_ << ");";
+        const auto& firstId = chunkMsgId->getChunkedMessageIds().front();
+        s << '(' << firstId.ledgerId() << ',' << firstId.entryId() << ',' << 
firstId.partition() << ','
+          << firstId.batchIndex() << ");";
     }
     s << '(' << messageId.impl_->ledgerId_ << ',' << messageId.impl_->entryId_ 
<< ','
       << messageId.impl_->partition_ << ',' << messageId.impl_->batchIndex_ << 
')';
diff --git a/lib/OpSendMsg.h b/lib/OpSendMsg.h
index 06fa77f..a1319e1 100644
--- a/lib/OpSendMsg.h
+++ b/lib/OpSendMsg.h
@@ -45,6 +45,8 @@ struct SendArguments {
     SendArguments& operator=(const SendArguments&) = delete;
 };
 
+typedef std::shared_ptr<std::vector<MessageId>> ChunkMessageIdListPtr;
+
 struct OpSendMsg {
     const Result result;
     const int32_t chunkId;
@@ -54,7 +56,7 @@ struct OpSendMsg {
     const boost::posix_time::ptime timeout;
     const SendCallback sendCallback;
     std::vector<std::function<void(Result)>> trackerCallbacks;
-    ChunkMessageIdImplPtr chunkedMessageId;
+    ChunkMessageIdListPtr chunkMessageIdList;
     // Use shared_ptr here because producer might resend the message with the 
same arguments
     const std::shared_ptr<SendArguments> sendArgs;
 
@@ -89,7 +91,7 @@ struct OpSendMsg {
           sendArgs(nullptr) {}
 
     OpSendMsg(const proto::MessageMetadata& metadata, uint32_t messagesCount, 
uint64_t messagesSize,
-              int sendTimeoutMs, SendCallback&& callback, 
ChunkMessageIdImplPtr chunkedMessageId,
+              int sendTimeoutMs, SendCallback&& callback, 
ChunkMessageIdListPtr chunkMessageIdList,
               uint64_t producerId, SharedBuffer payload)
         : result(ResultOk),
           chunkId(metadata.chunk_id()),
@@ -98,7 +100,7 @@ struct OpSendMsg {
           messagesSize(messagesSize),
           timeout(TimeUtils::now() + 
boost::posix_time::milliseconds(sendTimeoutMs)),
           sendCallback(std::move(callback)),
-          chunkedMessageId(chunkedMessageId),
+          chunkMessageIdList(std::move(chunkMessageIdList)),
           sendArgs(new SendArguments(producerId, metadata.sequence_id(), 
metadata, payload)) {}
 };
 
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 3166c19..8bd14f2 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -572,14 +572,14 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const 
Message& msg, SendCallback&& c
         }
     } else {
         const bool sendChunks = (totalChunks > 1);
+        ChunkMessageIdListPtr chunkMessageIdList;
         if (sendChunks) {
             msgMetadata.set_uuid(producerName_ + "-" + 
std::to_string(sequenceId));
             msgMetadata.set_num_chunks_from_msg(totalChunks);
             msgMetadata.set_total_chunk_msg_size(compressedSize);
+            chunkMessageIdList = std::make_shared<std::vector<MessageId>>();
         }
 
-        auto chunkMessageId = totalChunks > 1 ? 
std::make_shared<ChunkMessageIdImpl>() : nullptr;
-
         int beginIndex = 0;
         for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
             if (sendChunks) {
@@ -596,7 +596,7 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& 
msg, SendCallback&& c
             }
 
             auto op = OpSendMsg::create(msgMetadata, 1, uncompressedSize, 
conf_.getSendTimeout(),
-                                        (chunkId == totalChunks - 1) ? 
callback : nullptr, chunkMessageId,
+                                        (chunkId == totalChunks - 1) ? 
callback : nullptr, chunkMessageIdList,
                                         producerId_, encryptedPayload);
 
             if (!chunkingEnabled_) {
@@ -887,7 +887,7 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, 
MessageId& rawMessageId) {
         return true;
     }
 
-    const auto& op = *pendingMessagesQueue_.front();
+    auto& op = *pendingMessagesQueue_.front();
     if (op.result != ResultOk) {
         LOG_ERROR("Unexpected OpSendMsg whose result is " << op.result << " 
for " << sequenceId << " and "
                                                           << rawMessageId);
@@ -911,13 +911,12 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, 
MessageId& rawMessageId) {
     // Message was persisted correctly
     LOG_DEBUG(getName() << "Received ack for msg " << sequenceId);
 
-    if (op.chunkedMessageId) {
+    if (op.chunkMessageIdList) {
         // Handling the chunk message id.
-        if (op.chunkId == 0) {
-            op.chunkedMessageId->setFirstChunkMessageId(messageId);
-        } else if (op.chunkId == op.numChunks - 1) {
-            op.chunkedMessageId->setLastChunkMessageId(messageId);
-            messageId = op.chunkedMessageId->build();
+        op.chunkMessageIdList->push_back(messageId);
+        if (op.chunkId == op.numChunks - 1) {
+            auto chunkedMessageId = 
std::make_shared<ChunkMessageIdImpl>(std::move(*op.chunkMessageIdList));
+            messageId = chunkedMessageId->build();
         }
     }
 
diff --git a/tests/MessageChunkingTest.cc b/tests/MessageChunkingTest.cc
index 6d54a69..f68dd3d 100644
--- a/tests/MessageChunkingTest.cc
+++ b/tests/MessageChunkingTest.cc
@@ -81,7 +81,9 @@ class MessageChunkingTest : public 
::testing::TestWithParam<CompressionType> {
     }
 
     void createConsumer(const std::string& topic, Consumer& consumer) {
-        ASSERT_EQ(ResultOk, client_.subscribe(topic, "my-sub", consumer));
+        ConsumerConfiguration conf;
+        conf.setBrokerConsumerStatsCacheTimeInMs(1000);
+        ASSERT_EQ(ResultOk, client_.subscribe(topic, "my-sub", conf, 
consumer));
     }
 
     void createConsumer(const std::string& topic, Consumer& consumer, 
ConsumerConfiguration& conf) {
@@ -118,9 +120,6 @@ TEST_P(MessageChunkingTest, testEndToEnd) {
     for (int i = 0; i < numMessages; i++) {
         MessageId messageId;
         ASSERT_EQ(ResultOk, 
producer.send(MessageBuilder().setContent(largeMessage).build(), messageId));
-        auto chunkMsgId =
-            
std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(messageId));
-        ASSERT_TRUE(chunkMsgId);
         LOG_INFO("Send " << i << " to " << messageId);
         sendMessageIds.emplace_back(messageId);
     }
@@ -134,19 +133,35 @@ TEST_P(MessageChunkingTest, testEndToEnd) {
         ASSERT_EQ(msg.getMessageId().batchIndex(), -1);
         ASSERT_EQ(msg.getMessageId().batchSize(), 0);
         auto messageId = msg.getMessageId();
-        auto chunkMsgId =
-            
std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(messageId));
-        ASSERT_TRUE(chunkMsgId);
         receivedMessageIds.emplace_back(messageId);
+        consumer.acknowledge(messageId);
     }
     ASSERT_EQ(receivedMessageIds, sendMessageIds);
-    ASSERT_EQ(receivedMessageIds.front().ledgerId(), 
receivedMessageIds.front().ledgerId());
+    for (int i = 0; i < sendMessageIds.size(); ++i) {
+        auto sendChunkMsgId =
+            
std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(sendMessageIds[i]));
+        ASSERT_TRUE(sendChunkMsgId);
+        auto receiveChunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(
+            PulsarFriend::getMessageIdImpl(receivedMessageIds[i]));
+        ASSERT_TRUE(receiveChunkMsgId);
+        ASSERT_EQ(sendChunkMsgId->getChunkedMessageIds(), 
receiveChunkMsgId->getChunkedMessageIds());
+    }
     ASSERT_GT(receivedMessageIds.back().entryId(), numMessages);
 
     // Verify the cache has been cleared
     auto& chunkedMessageCache = PulsarFriend::getChunkedMessageCache(consumer);
     ASSERT_EQ(chunkedMessageCache.size(), 0);
 
+    BrokerConsumerStats consumerStats;
+    waitUntil(
+        std::chrono::seconds(10),
+        [&] {
+            return consumer.getBrokerConsumerStats(consumerStats) == ResultOk 
&&
+                   consumerStats.getMsgBacklog() == 0;
+        },
+        1000);
+    ASSERT_EQ(consumerStats.getMsgBacklog(), 0);
+
     producer.close();
     consumer.close();
 }
@@ -317,9 +332,9 @@ TEST_P(MessageChunkingTest, testSeekChunkMessages) {
 TEST(ChunkMessageIdTest, testSetChunkMessageId) {
     MessageId msgId;
     {
-        ChunkMessageIdImplPtr chunkMsgId = 
std::make_shared<ChunkMessageIdImpl>();
-        
chunkMsgId->setFirstChunkMessageId(MessageIdBuilder().ledgerId(1).entryId(2).partition(3).build());
-        
chunkMsgId->setLastChunkMessageId(MessageIdBuilder().ledgerId(4).entryId(5).partition(6).build());
+        ChunkMessageIdImplPtr chunkMsgId = 
std::make_shared<ChunkMessageIdImpl>(
+            
std::vector<MessageId>({MessageIdBuilder().ledgerId(1).entryId(2).partition(3).build(),
+                                    
MessageIdBuilder().ledgerId(4).entryId(5).partition(6).build()}));
         msgId = chunkMsgId->build();
         // Test the destructor of the underlying message id should also work for the generated messageId.
     }
@@ -332,13 +347,13 @@ TEST(ChunkMessageIdTest, testSetChunkMessageId) {
     ASSERT_EQ(deserializedMsgId.entryId(), 5);
     ASSERT_EQ(deserializedMsgId.partition(), 6);
 
-    auto chunkMsgId =
+    const auto& chunkMsgId =
         
std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(deserializedMsgId));
     ASSERT_TRUE(chunkMsgId);
-    auto firstChunkMsgId = chunkMsgId->getFirstChunkMessageId();
-    ASSERT_EQ(firstChunkMsgId->ledgerId_, 1);
-    ASSERT_EQ(firstChunkMsgId->entryId_, 2);
-    ASSERT_EQ(firstChunkMsgId->partition_, 3);
+    auto firstChunkMsgId = chunkMsgId->getChunkedMessageIds().front();
+    ASSERT_EQ(firstChunkMsgId.ledgerId(), 1);
+    ASSERT_EQ(firstChunkMsgId.entryId(), 2);
+    ASSERT_EQ(firstChunkMsgId.partition(), 3);
 }
 
 // The CI env is Ubuntu 16.04, the gtest-dev version is 1.8.0 that doesn't have INSTANTIATE_TEST_SUITE_P

Reply via email to