This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4c00702e5b0a196b3867ed876152edf972e59242
Author: Jiaqi Shen <[email protected]>
AuthorDate: Fri Aug 5 15:26:01 2022 +0800

    [fix][client] Remove redundant check for chunked message TotalChunkMsgSize 
in ConsumerImpl (#16797)
    
    ### Motivation
    
    There is an incorrect out-of-order check for chunked messages in 
`ConsumerImpl`. The last condition should compare the result of 
`chunkedMsgCtx.chunkedMsgBuffer.readableBytes() + 
compressedPayload.readableBytes()` with `TotalChunkMsgSize`, instead of 
comparing `ChunkId` with `TotalChunkMsgSize`.
    
    ### Modifications
    
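    Fix the out-of-order check for chunked messages in `ConsumerImpl` by removing the incorrect comparison of `ChunkId` with `TotalChunkMsgSize`, and drop `TotalChunkMsgSize` from the related log message.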
---
 .../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 0079492be07..04865d39995 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1282,12 +1282,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         ChunkedMessageCtx chunkedMsgCtx = 
chunkedMessagesMap.get(msgMetadata.getUuid());
         // discard message if chunk is out-of-order
         if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
-                || msgMetadata.getChunkId() != 
(chunkedMsgCtx.lastChunkedMessageId + 1)
-                || msgMetadata.getChunkId() >= 
msgMetadata.getTotalChunkMsgSize()) {
+                || msgMetadata.getChunkId() != 
(chunkedMsgCtx.lastChunkedMessageId + 1)) {
             // means we lost the first chunk: should never happen
-            log.info("Received unexpected chunk messageId {}, last-chunk-id{}, 
chunkId = {}, total-chunks {}", msgId,
-                    (chunkedMsgCtx != null ? 
chunkedMsgCtx.lastChunkedMessageId : null), msgMetadata.getChunkId(),
-                    msgMetadata.getTotalChunkMsgSize());
+            log.info("Received unexpected chunk messageId {}, last-chunk-id{}, 
chunkId = {}", msgId,
+                    (chunkedMsgCtx != null ? 
chunkedMsgCtx.lastChunkedMessageId : null), msgMetadata.getChunkId());
             if (chunkedMsgCtx != null) {
                 if (chunkedMsgCtx.chunkedMsgBuffer != null) {
                     
ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);

Reply via email to