RobertIndie commented on a change in pull request #12403:
URL: https://github.com/apache/pulsar/pull/12403#discussion_r759004547
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1206,6 +1206,24 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ac
if (uncompressedPayload == null) {
return;
}
+
+ // last chunk received: so, stitch chunked-messages and clear up chunkedMsgBuffer
+ if (log.isDebugEnabled()) {
+ log.debug("Chunked message completed chunkId {}, total-chunks {}, msgId {} sequenceId {}",
+ msgMetadata.getChunkId(), msgMetadata.getNumChunksFromMsg(), msgId,
+ msgMetadata.getSequenceId());
+ }
+
+ // remove buffer from the map, set the chunk message id
+ ChunkedMessageCtx chunkedMsgCtx = chunkedMessagesMap.remove(msgMetadata.getUuid());
+ if (chunkedMsgCtx.chunkedMessageIds.length > 0) {
+ msgId = new ChunkMessageIdImpl(chunkedMsgCtx.chunkedMessageIds[0],
+ chunkedMsgCtx.chunkedMessageIds[chunkedMsgCtx.chunkedMessageIds.length - 1]);
Review comment:
@rdhabalia This is where the consumer sets the first and last chunk
message IDs. It takes both from the chunk IDs it recorded while receiving
the message, so it does not depend on the first chunk message ID set by
the producer.
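
To illustrate the point, here is a minimal, self-contained sketch of the consumer-side bookkeeping. It is not the ConsumerImpl code: `ChunkIdSketch`, `MsgId`, `ChunkMsgId`, `ChunkCtx` and `onChunk` are hypothetical stand-ins for the real classes, and it assumes chunks arrive in order with a valid chunk index. It only shows that the first and last IDs can be read from what the consumer itself recorded.

```java
import java.util.HashMap;
import java.util.Map;

public class ChunkIdSketch {

    /** Hypothetical stand-in for a message id: ledger id plus entry id. */
    static final class MsgId {
        final long ledgerId;
        final long entryId;

        MsgId(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public String toString() {
            return ledgerId + ":" + entryId;
        }
    }

    /** Hypothetical stand-in for ChunkMessageIdImpl: holds first and last chunk ids. */
    static final class ChunkMsgId {
        final MsgId first;
        final MsgId last;

        ChunkMsgId(MsgId first, MsgId last) {
            this.first = first;
            this.last = last;
        }
    }

    /** Per-message state, loosely modeled on ChunkedMessageCtx (an assumption). */
    static final class ChunkCtx {
        final MsgId[] chunkIds;
        int received;

        ChunkCtx(int totalChunks) {
            this.chunkIds = new MsgId[totalChunks];
        }
    }

    // Keyed by the message uuid, like chunkedMessagesMap in the diff.
    private final Map<String, ChunkCtx> pending = new HashMap<>();

    /**
     * Records the id of one received chunk. Returns the combined id once the
     * last chunk has arrived, otherwise null.
     */
    ChunkMsgId onChunk(String uuid, int chunkId, int totalChunks, MsgId id) {
        ChunkCtx ctx = pending.computeIfAbsent(uuid, k -> new ChunkCtx(totalChunks));
        ctx.chunkIds[chunkId] = id;
        ctx.received++;
        if (ctx.received < totalChunks) {
            return null; // still waiting for more chunks
        }
        // Last chunk: drop the buffer and build the id from the recorded ids only.
        pending.remove(uuid);
        return new ChunkMsgId(ctx.chunkIds[0], ctx.chunkIds[ctx.chunkIds.length - 1]);
    }

    public static void main(String[] args) {
        ChunkIdSketch consumer = new ChunkIdSketch();
        consumer.onChunk("u1", 0, 3, new MsgId(7, 10));
        consumer.onChunk("u1", 1, 3, new MsgId(7, 11));
        ChunkMsgId id = consumer.onChunk("u1", 2, 3, new MsgId(7, 12));
        System.out.println(id.first + " .. " + id.last); // prints "7:10 .. 7:12"
    }
}
```

Nothing in the sketch reads producer-supplied metadata beyond the uuid and chunk index used to group the chunks; the resulting ID pair comes entirely from the IDs the consumer observed.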