poorbarcode commented on code in PR #20948:
URL: https://github.com/apache/pulsar/pull/20948#discussion_r1303758631


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -1449,6 +1449,15 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m
         // discard message if chunk is out-of-order
         if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
                 || msgMetadata.getChunkId() != (chunkedMsgCtx.lastChunkedMessageId + 1)) {
+            // Filter duplicated chunks instead of discard it.
+            if (chunkedMsgCtx == null || msgMetadata.getChunkId() <= chunkedMsgCtx.lastChunkedMessageId) {

Review Comment:
   We should take note of [this comment in the mailing list](https://lists.apache.org/thread/h0dz0mzt7rrh3hz01fbblldls57618o9):
   
   > 1. SequenceID: 0, ChunkID: 0
   > 2. SequenceID: 0, ChunkID: 1
   > 3. SequenceID: 0, ChunkID: 0 -> This message will be dropped
   > 4. SequenceID: 0, ChunkID: 1 -> Will also be dropped
   > 5. SequenceID: 0, ChunkID: 2 -> The last chunk of the message
   
   > For the existing behavior, the consumer assembles messages 3,4,5 into
   > the original large message. But the changes brought about by this PIP
   > will cause the consumer to use messages 1,2,5 for assembly. There is
   > no guarantee that the producer will split the message in the same way
   > twice before and after. For example, the producer's maxMessageSize may
   > be different. This may cause the consumer to receive a corrupt
   > message.
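
To make the concern concrete, here is a minimal sketch of the two assembly strategies described in the quote. It is a simplified model, not the actual `ConsumerImpl` logic: `ChunkAssemblySketch`, `Chunk`, `assemble`, and `scenario` are hypothetical names, payloads are plain strings, and the example assumes an 11-byte message that the producer first splits with a chunk size of 4 and then, on resend, with a chunk size of 5 (both yield 3 chunks).

```java
import java.util.List;

public class ChunkAssemblySketch {

    // Hypothetical stand-in for a received chunk; the payload is a String for readability.
    public static final class Chunk {
        final int chunkId;
        final String payload;

        public Chunk(int chunkId, String payload) {
            this.chunkId = chunkId;
            this.payload = payload;
        }
    }

    // Simplified model of assembling one chunked message on the consumer side.
    // filterDuplicates == false: a chunk with ID 0 restarts assembly (the "3,4,5" case).
    // filterDuplicates == true: chunks whose ID is at or below the last accepted ID
    // are skipped and the buffer is kept (the "1,2,5" case).
    public static String assemble(List<Chunk> received, boolean filterDuplicates) {
        StringBuilder buffer = new StringBuilder();
        int lastChunkId = -1;
        for (Chunk c : received) {
            if (c.chunkId == lastChunkId + 1) {
                // Expected next chunk: append it.
                buffer.append(c.payload);
                lastChunkId = c.chunkId;
            } else if (filterDuplicates && c.chunkId <= lastChunkId) {
                // Treated as a duplicate: ignore it and keep what was buffered.
            } else if (c.chunkId == 0) {
                // The message is being resent from the start: restart assembly.
                buffer.setLength(0);
                buffer.append(c.payload);
                lastChunkId = 0;
            } else {
                // Out-of-order chunk: drop the partially assembled message.
                buffer.setLength(0);
                lastChunkId = -1;
            }
        }
        return buffer.toString();
    }

    // The five deliveries from the quoted example, for the message "ABCDEFGHIJK".
    // Deliveries 1-2 come from a split with chunk size 4, deliveries 3-5 from a
    // resend with chunk size 5 (an assumed illustration of differing maxMessageSize).
    public static List<Chunk> scenario() {
        return List.of(
                new Chunk(0, "ABCD"),
                new Chunk(1, "EFGH"),
                new Chunk(0, "ABCDE"),
                new Chunk(1, "FGHIJ"),
                new Chunk(2, "K"));
    }

    public static void main(String[] args) {
        System.out.println(assemble(scenario(), false)); // prints "ABCDEFGHIJK"
        System.out.println(assemble(scenario(), true));  // prints "ABCDEFGHK"
    }
}
```

In this model, restarting on chunk 0 rebuilds the message from deliveries 3, 4, 5 and recovers the original payload, while filtering duplicates stitches deliveries 1, 2, 5 together and silently loses `IJ`. Whether this can happen in practice depends on whether a producer can resend the same sequence ID with different chunk boundaries.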



