codelipenghui commented on a change in pull request #12403:
URL: https://github.com/apache/pulsar/pull/12403#discussion_r759774021
##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1206,6 +1206,24 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ac
if (uncompressedPayload == null) {
return;
}
+
+ // last chunk received: so, stitch chunked-messages and clear up chunkedMsgBuffer
+ if (log.isDebugEnabled()) {
+ log.debug("Chunked message completed chunkId {}, total-chunks {}, msgId {} sequenceId {}",
+ msgMetadata.getChunkId(), msgMetadata.getNumChunksFromMsg(), msgId,
+ msgMetadata.getSequenceId());
+ }
+
+ // remove buffer from the map, set the chunk message id
+ ChunkedMessageCtx chunkedMsgCtx = chunkedMessagesMap.remove(msgMetadata.getUuid());
+ if (chunkedMsgCtx.chunkedMessageIds.length > 0) {
+ msgId = new ChunkMessageIdImpl(chunkedMsgCtx.chunkedMessageIds[0],
+ chunkedMsgCtx.chunkedMessageIds[chunkedMsgCtx.chunkedMessageIds.length - 1]);
Review comment:
@rdhabalia The producer does not set the message ID of the chunk; the
producer-side change only returns a chunk message ID (start, end) to users.
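
To make the (start, end) shape concrete, here is a minimal sketch of the idea. It does not use Pulsar's classes: `SimpleId`, `ChunkSpanId`, and `spanOf` are hypothetical stand-ins introduced only for illustration, and the real `ChunkMessageIdImpl` may differ. It assumes the chunk IDs are collected in order as the chunks arrive, as the `chunkedMessageIds` array in the diff above suggests.

```java
import java.util.Arrays;
import java.util.List;

public class ChunkIdSketch {

    /** Hypothetical stand-in for the message ID of a single chunk. */
    static final class SimpleId {
        final long ledgerId;
        final long entryId;

        SimpleId(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public String toString() {
            return ledgerId + ":" + entryId;
        }
    }

    /** Hypothetical stand-in for a chunk message ID that covers (start, end). */
    static final class ChunkSpanId {
        final SimpleId first;
        final SimpleId last;

        ChunkSpanId(SimpleId first, SimpleId last) {
            this.first = first;
            this.last = last;
        }
    }

    /** Returns the (first, last) span, or null when no chunk IDs were recorded. */
    static ChunkSpanId spanOf(List<SimpleId> chunkIds) {
        if (chunkIds == null || chunkIds.isEmpty()) {
            return null; // the caller keeps the original message ID in this case
        }
        return new ChunkSpanId(chunkIds.get(0), chunkIds.get(chunkIds.size() - 1));
    }

    public static void main(String[] args) {
        // Three chunks of one logical message, in arrival order.
        List<SimpleId> ids = Arrays.asList(
                new SimpleId(5, 0), new SimpleId(5, 1), new SimpleId(5, 2));
        ChunkSpanId span = spanOf(ids);
        System.out.println(span.first + " .. " + span.last);
    }
}
```

Only the first and last chunk IDs are kept in the span; the intermediate IDs are not part of the value handed back to users in this sketch.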