BewareMyPower commented on a change in pull request #14604:
URL: https://github.com/apache/pulsar/pull/14604#discussion_r833585927



##########
File path: pulsar-client-cpp/lib/ProducerImpl.h
##########
@@ -32,10 +32,10 @@
 #include "stats/ProducerStatsImpl.h"
 #include "PulsarApi.pb.h"
 #include "OpSendMsg.h"
-#include "BatchMessageContainerBase.h"
 #include "PendingFailures.h"
 #include "Semaphore.h"
-
+#include "ChunkMessageIdImpl.h"
+#include "BatchMessageContainerBase.h"

Review comment:
       Why do you add these two lines when no other code in this header 
changes?

##########
File path: pulsar-client-cpp/lib/MessageId.cc
##########
@@ -79,7 +83,12 @@ MessageId MessageId::deserialize(const std::string& 
serializedMessageId) {
     if (!idData.ParseFromString(serializedMessageId)) {
         throw std::invalid_argument("Failed to parse serialized message id");
     }
-
+    if (idData.has_first_chunk_message_id()) {
+        auto firData = idData.first_chunk_message_id();
+        return MessageId(
+            MessageId(firData.partition(), firData.ledgerid(), 
firData.entryid(), firData.batch_index()),
+            MessageId(idData.partition(), idData.ledgerid(), idData.entryid(), 
idData.batch_index()));
+    }
     return MessageId(idData.partition(), idData.ledgerid(), idData.entryid(), 
idData.batch_index());

Review comment:
       We have a `toMessageId` helper in `MessageIdUtil.h`, so I think we can 
simplify the code to
   
   ```c++
       if (idData.has_first_chunk_message_id()) {
           return {toMessageId(idData.first_chunk_message_id()), 
toMessageId(idData)};
       }
       return toMessageId(idData);
   ```

##########
File path: pulsar-client-cpp/lib/MessageId.cc
##########
@@ -92,8 +101,19 @@ int32_t MessageId::batchIndex() const { return 
impl_->batchIndex_; }
 int32_t MessageId::partition() const { return impl_->partition_; }
 
 PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const 
pulsar::MessageId& messageId) {
-    s << '(' << messageId.impl_->ledgerId_ << ',' << messageId.impl_->entryId_ 
<< ','
-      << messageId.impl_->partition_ << ',' << messageId.impl_->batchIndex_ << 
')';
+    std::function<void(std::ostream&, const MessageIdImpl&)> printMsgIdImpl = 
[](std::ostream& s,

Review comment:
       Use `auto` instead of the long `std::function` type. We can also capture 
`s` by reference in the lambda capture list.
   
   ```c++
       auto printMsgIdImpl = [&s](const MessageIdImpl& impl) {
           s << '(' << impl.ledgerId_ << ',' << impl.entryId_ << ',' << 
impl.partition_ << ','
             << impl.batchIndex_ << ')';
       };
   ```

##########
File path: pulsar-client-cpp/lib/ProducerImpl.cc
##########
@@ -856,7 +856,18 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, 
MessageId& rawMessageId) {
         lastSequenceIdPublished_ = sequenceId + op.messagesCount_ - 1;
 
         pendingMessagesQueue_.pop_front();
-
+        int totalChunks = op.metadata_.num_chunks_from_msg();
+        if (totalChunks > 1) {
+            if (op.metadata_.chunk_id() == 0) {
+                op.chunkedMessageId_ = MessageId(partition_, 
rawMessageId.ledgerId(), rawMessageId.entryId(),
+                                                 rawMessageId.batchIndex());
+            } else if (op.metadata_.chunk_id() == totalChunks - 1) {
+                MessageId lastMessageId(partition_, rawMessageId.ledgerId(), 
rawMessageId.entryId(),
+                                        rawMessageId.batchIndex());
+                op.chunkedMessageId_ = MessageId(op.chunkedMessageId_, 
lastMessageId);

Review comment:
       We have already created a message ID with the correct partition index; 
see the first few lines of `ackReceived`.
   
   ```c++
       MessageId messageId(partition_, rawMessageId.ledgerId(), 
rawMessageId.entryId(),
                           rawMessageId.batchIndex());
   ```
   
   Could we simplify the code to
   
   ```c++
               if (op.metadata_.chunk_id() == 0) {
                   op.chunkedMessageId_ = messageId;
               } else if (op.metadata_.chunk_id() == totalChunks - 1) {
                   op.chunkedMessageId_ = MessageId(op.chunkedMessageId_, 
messageId);
                   messageId = op.chunkedMessageId_;
               }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to