RobertIndie commented on a change in pull request #12403:
URL: https://github.com/apache/pulsar/pull/12403#discussion_r759004547



##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1206,6 +1206,24 @@ void messageReceived(MessageIdData messageId, int 
redeliveryCount, List<Long> ac
                 if (uncompressedPayload == null) {
                     return;
                 }
+
+                // last chunk received: so, stitch chunked-messages and clear 
up chunkedMsgBuffer
+                if (log.isDebugEnabled()) {
+                    log.debug("Chunked message completed chunkId {}, 
total-chunks {}, msgId {} sequenceId {}",
+                            msgMetadata.getChunkId(), 
msgMetadata.getNumChunksFromMsg(), msgId,
+                            msgMetadata.getSequenceId());
+                }
+
+                // remove buffer from the map, set the chunk message id
+                ChunkedMessageCtx chunkedMsgCtx = 
chunkedMessagesMap.remove(msgMetadata.getUuid());
+                if (chunkedMsgCtx.chunkedMessageIds.length > 0) {
+                    msgId = new 
ChunkMessageIdImpl(chunkedMsgCtx.chunkedMessageIds[0],
+                            
chunkedMsgCtx.chunkedMessageIds[chunkedMsgCtx.chunkedMessageIds.length - 1]);

Review comment:
       @rdhabalia Here is where the consumer sets the first chunk message id 
and the last chunk message-id. It does not depend on the first chunk message-id 
set by the producer.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to