heesung-sn commented on code in PR #20948:
URL: https://github.com/apache/pulsar/pull/20948#discussion_r1303335642


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -1449,21 +1449,28 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m
         // discard message if chunk is out-of-order
         if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
                 || msgMetadata.getChunkId() != (chunkedMsgCtx.lastChunkedMessageId + 1)) {
-            // means we lost the first chunk: should never happen
-            log.info("Received unexpected chunk messageId {}, last-chunk-id{}, chunkId = {}", msgId,
-                    (chunkedMsgCtx != null ? chunkedMsgCtx.lastChunkedMessageId : null), msgMetadata.getChunkId());
-            if (chunkedMsgCtx != null) {
-                if (chunkedMsgCtx.chunkedMsgBuffer != null) {
-                    ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);
+            // Filter duplicated chunks instead of discard it.
+            if (chunkedMsgCtx == null || msgMetadata.getChunkId() <= chunkedMsgCtx.lastChunkedMessageId) {
+                log.warn("[{}] Receive a repeated chunk messageId {}, last-chunk-id{}, chunkId = {}",
+                        msgMetadata.getProducerName(), chunkedMsgCtx == null ? null
+                                : chunkedMsgCtx.lastChunkedMessageId, msgId, msgMetadata.getChunkId());
+            } else {
+                // means we lost the first chunk: should never happen
+                log.info("Received unexpected chunk messageId {}, last-chunk-id{}, chunkId = {}", msgId,
+                        (chunkedMsgCtx != null ? chunkedMsgCtx.lastChunkedMessageId : null), msgMetadata.getChunkId());
+                if (chunkedMsgCtx != null) {
+                    if (chunkedMsgCtx.chunkedMsgBuffer != null) {
+                        ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);
+                    }
+                    chunkedMsgCtx.recycle();
                 }
-                chunkedMsgCtx.recycle();
+                chunkedMessagesMap.remove(msgMetadata.getUuid());
             }
-            chunkedMessagesMap.remove(msgMetadata.getUuid());
             compressedPayload.release();
             increaseAvailablePermits(cnx);
             if (expireTimeOfIncompleteChunkedMessageMillis > 0
                     && System.currentTimeMillis() > (msgMetadata.getPublishTime()
-                            + expireTimeOfIncompleteChunkedMessageMillis)) {
+                    + expireTimeOfIncompleteChunkedMessageMillis)) {
                 doAcknowledge(msgId, AckType.Individual, Collections.emptyMap(), null);
             } else {
                 trackMessage(msgId);

Review Comment:
   I don't think we need to track the duplicated chunk's msgId for redelivery, since this logic is supposed to `ignore` it.
   
   We should also discuss this consumer behavior change in the PIP. If the broker correctly deduplicates chunked messages, this consumer change is not required.
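To make the first suggestion concrete, here is a minimal sketch under simplifying assumptions: it models only the chunk-id checks from the diff (payload release, permits and expiry are left out), and `DuplicateChunkSketch`, `ChunkCtx`, `Outcome`, `onChunk` and `trackedForRedelivery` are hypothetical names, not Pulsar APIs. A repeated chunk is ignored without being tracked for redelivery, while a real gap in chunk ids still drops the partial message and tracks its id.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the chunk-ordering check in processMessageChunk.
// None of these types or methods are Pulsar APIs; they only illustrate the review suggestion.
class DuplicateChunkSketch {

    enum Outcome { APPENDED, IGNORED_DUPLICATE, DISCARDED_OUT_OF_ORDER }

    // Stand-in for ChunkedMessageCtx: remembers the last chunk id accepted for one uuid.
    static final class ChunkCtx {
        int lastChunkId = -1;
    }

    final Map<String, ChunkCtx> chunkedMessages = new HashMap<>();
    // Stand-in for the unacked-message tracker that would trigger redelivery.
    final Set<String> trackedForRedelivery = new HashSet<>();

    Outcome onChunk(String uuid, int chunkId, String msgId) {
        ChunkCtx ctx = chunkedMessages.get(uuid);
        if (chunkId == 0 && ctx == null) {
            // The first chunk opens a new context.
            ctx = new ChunkCtx();
            chunkedMessages.put(uuid, ctx);
        }
        if (ctx == null || chunkId != ctx.lastChunkId + 1) {
            // Same condition as the PR's "repeated chunk" branch.
            if (ctx == null || chunkId <= ctx.lastChunkId) {
                // Repeated chunk: ignore it and do not track its id for redelivery.
                return Outcome.IGNORED_DUPLICATE;
            }
            // Gap in the chunk ids: drop the partial message and track the id.
            chunkedMessages.remove(uuid);
            trackedForRedelivery.add(msgId);
            return Outcome.DISCARDED_OUT_OF_ORDER;
        }
        ctx.lastChunkId = chunkId;
        return Outcome.APPENDED;
    }
}
```

Keeping the duplicate path free of tracking means an ignored chunk can never be scheduled for redelivery on its own, which matches the intent of "ignoring" it.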



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -1449,21 +1449,28 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m
         // discard message if chunk is out-of-order
         if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
                 || msgMetadata.getChunkId() != (chunkedMsgCtx.lastChunkedMessageId + 1)) {
-            // means we lost the first chunk: should never happen
-            log.info("Received unexpected chunk messageId {}, last-chunk-id{}, chunkId = {}", msgId,
-                    (chunkedMsgCtx != null ? chunkedMsgCtx.lastChunkedMessageId : null), msgMetadata.getChunkId());
-            if (chunkedMsgCtx != null) {
-                if (chunkedMsgCtx.chunkedMsgBuffer != null) {
-                    ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);
+            // Filter duplicated chunks instead of discard it.
+            if (chunkedMsgCtx == null || msgMetadata.getChunkId() <= chunkedMsgCtx.lastChunkedMessageId) {
+                log.warn("[{}] Receive a repeated chunk messageId {}, last-chunk-id{}, chunkId = {}",
+                        msgMetadata.getProducerName(), chunkedMsgCtx == null ? null
+                                : chunkedMsgCtx.lastChunkedMessageId, msgId, msgMetadata.getChunkId());
+            } else {
+                // means we lost the first chunk: should never happen

Review Comment:
   We also need to release these resources (the chunked message buffer and its `chunkedMessagesMap` entry) in the `msgMetadata.getChunkId() <= chunkedMsgCtx.lastChunkedMessageId` case when the message has expired, i.e. when `expireTimeOfIncompleteChunkedMessageMillis > 0 && System.currentTimeMillis() > (msgMetadata.getPublishTime() + expireTimeOfIncompleteChunkedMessageMillis)`.
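A minimal sketch of the second suggestion, assuming a simplified model: `RefCountedBuffer` stands in for the Netty `ByteBuf`, the clock is passed in as `nowMillis` instead of calling `System.currentTimeMillis()`, and `ExpiredDuplicateChunkSketch`, `ChunkCtx` and `onRepeatedChunk` are hypothetical names. When a repeated chunk arrives after the incomplete message has expired (same condition as in the diff), the partial buffer is released and the map entry removed, so neither outlives the expired message.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: clean up an expired incomplete chunked message when a repeated chunk arrives.
// RefCountedBuffer, ChunkCtx and onRepeatedChunk are illustrative stand-ins, not Pulsar APIs.
class ExpiredDuplicateChunkSketch {

    // Stand-in for a reference-counted ByteBuf.
    static final class RefCountedBuffer {
        int refCnt = 1;

        void release() {
            if (refCnt > 0) {
                refCnt--;
            }
        }
    }

    // Stand-in for ChunkedMessageCtx.
    static final class ChunkCtx {
        final int lastChunkId;
        RefCountedBuffer buffer;

        ChunkCtx(int lastChunkId, RefCountedBuffer buffer) {
            this.lastChunkId = lastChunkId;
            this.buffer = buffer;
        }
    }

    final Map<String, ChunkCtx> chunkedMessagesMap = new HashMap<>();
    final long expireTimeOfIncompleteChunkedMessageMillis;

    ExpiredDuplicateChunkSketch(long expireTimeOfIncompleteChunkedMessageMillis) {
        this.expireTimeOfIncompleteChunkedMessageMillis = expireTimeOfIncompleteChunkedMessageMillis;
    }

    // Same expiry condition as the diff, with the current time passed in for testability.
    boolean isExpired(long publishTime, long nowMillis) {
        return expireTimeOfIncompleteChunkedMessageMillis > 0
                && nowMillis > publishTime + expireTimeOfIncompleteChunkedMessageMillis;
    }

    // Returns true if an expired partial message was cleaned up.
    boolean onRepeatedChunk(String uuid, long publishTime, long nowMillis) {
        if (!isExpired(publishTime, nowMillis)) {
            return false; // not expired yet: keep assembling the message
        }
        ChunkCtx ctx = chunkedMessagesMap.remove(uuid);
        if (ctx == null) {
            return false;
        }
        if (ctx.buffer != null) {
            ctx.buffer.release(); // plays the role of ReferenceCountUtil.safeRelease(chunkedMsgBuffer)
            ctx.buffer = null;
        }
        return true;
    }
}
```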



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
