This is an automated email from the ASF dual-hosted git repository. mattisonchao pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4c00702e5b0a196b3867ed876152edf972e59242 Author: Jiaqi Shen <[email protected]> AuthorDate: Fri Aug 5 15:26:01 2022 +0800 [fix][client] Remove redundant check for chunked message TotalChunkMsgSize in ConsumerImpl (#16797) ### Motivation There is an incorrect out-of-order check for chunked messages in `ConsumerImpl`. The last check should compare the result of `chunkedMsgCtx.chunkedMsgBuffer.readableBytes() + compressedPayload.readableBytes()` with `TotalChunkMsgSize`, instead of comparing `ChunkId` with `TotalChunkMsgSize`. ### Modifications Fix the out-of-order check for chunked message in `ConsumerImpl`. --- .../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 0079492be07..04865d39995 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1282,12 +1282,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle ChunkedMessageCtx chunkedMsgCtx = chunkedMessagesMap.get(msgMetadata.getUuid()); // discard message if chunk is out-of-order if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null - || msgMetadata.getChunkId() != (chunkedMsgCtx.lastChunkedMessageId + 1) - || msgMetadata.getChunkId() >= msgMetadata.getTotalChunkMsgSize()) { + || msgMetadata.getChunkId() != (chunkedMsgCtx.lastChunkedMessageId + 1)) { // means we lost the first chunk: should never happen - log.info("Received unexpected chunk messageId {}, last-chunk-id{}, chunkId = {}, total-chunks {}", msgId, - (chunkedMsgCtx != null ? 
chunkedMsgCtx.lastChunkedMessageId : null), msgMetadata.getChunkId(), - msgMetadata.getTotalChunkMsgSize()); + log.info("Received unexpected chunk messageId {}, last-chunk-id{}, chunkId = {}", msgId, + (chunkedMsgCtx != null ? chunkedMsgCtx.lastChunkedMessageId : null), msgMetadata.getChunkId()); if (chunkedMsgCtx != null) { if (chunkedMsgCtx.chunkedMsgBuffer != null) { ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);
