This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7650612cd86 [fix][client] Remove redundant check for chunked message 
TotalChunkMsgSize in ConsumerImpl (#16797)
7650612cd86 is described below

commit 7650612cd86ead01886300f9b31aba6946831736
Author: Jiaqi Shen <[email protected]>
AuthorDate: Fri Aug 5 15:26:01 2022 +0800

    [fix][client] Remove redundant check for chunked message TotalChunkMsgSize 
in ConsumerImpl (#16797)
    
    ### Motivation
    
    There is a incorrect out-of-order check for chunked message in 
`ConsumerImpl`. For the last check should compare the result of 
`chunkedMsgCtx.chunkedMsgBuffer.readableBytes() + 
compressedPayload.readableBytes()` with `TotalChunkMsgSize` instead of 
`ChunkId` with `TotalChunkMsgSize`.
    
    ### Modifications
    
    Fix the out-of-order check for chunked message in `ConsumerImpl`.
---
 .../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index c474da345c2..4cb334b38e4 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1433,12 +1433,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         ChunkedMessageCtx chunkedMsgCtx = 
chunkedMessagesMap.get(msgMetadata.getUuid());
         // discard message if chunk is out-of-order
         if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
-                || msgMetadata.getChunkId() != 
(chunkedMsgCtx.lastChunkedMessageId + 1)
-                || msgMetadata.getChunkId() >= 
msgMetadata.getTotalChunkMsgSize()) {
+                || msgMetadata.getChunkId() != 
(chunkedMsgCtx.lastChunkedMessageId + 1)) {
             // means we lost the first chunk: should never happen
-            log.info("Received unexpected chunk messageId {}, last-chunk-id{}, 
chunkId = {}, total-chunks {}", msgId,
-                    (chunkedMsgCtx != null ? 
chunkedMsgCtx.lastChunkedMessageId : null), msgMetadata.getChunkId(),
-                    msgMetadata.getTotalChunkMsgSize());
+            log.info("Received unexpected chunk messageId {}, last-chunk-id{}, 
chunkId = {}", msgId,
+                    (chunkedMsgCtx != null ? 
chunkedMsgCtx.lastChunkedMessageId : null), msgMetadata.getChunkId());
             if (chunkedMsgCtx != null) {
                 if (chunkedMsgCtx.chunkedMsgBuffer != null) {
                     
ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);

Reply via email to