This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7650612cd86 [fix][client] Remove redundant check for chunked message
TotalChunkMsgSize in ConsumerImpl (#16797)
7650612cd86 is described below
commit 7650612cd86ead01886300f9b31aba6946831736
Author: Jiaqi Shen <[email protected]>
AuthorDate: Fri Aug 5 15:26:01 2022 +0800
[fix][client] Remove redundant check for chunked message TotalChunkMsgSize
in ConsumerImpl (#16797)
### Motivation
There is a incorrect out-of-order check for chunked message in
`ConsumerImpl`. For the last check should compare the result of
`chunkedMsgCtx.chunkedMsgBuffer.readableBytes() +
compressedPayload.readableBytes()` with `TotalChunkMsgSize` instead of
`ChunkId` with `TotalChunkMsgSize`.
### Modifications
Fix the out-of-order check for chunked message in `ConsumerImpl`.
---
.../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 8 +++-----
1 file changed, 3 insertions(+), 5 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index c474da345c2..4cb334b38e4 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1433,12 +1433,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
ChunkedMessageCtx chunkedMsgCtx =
chunkedMessagesMap.get(msgMetadata.getUuid());
// discard message if chunk is out-of-order
if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
- || msgMetadata.getChunkId() !=
(chunkedMsgCtx.lastChunkedMessageId + 1)
- || msgMetadata.getChunkId() >=
msgMetadata.getTotalChunkMsgSize()) {
+ || msgMetadata.getChunkId() !=
(chunkedMsgCtx.lastChunkedMessageId + 1)) {
// means we lost the first chunk: should never happen
- log.info("Received unexpected chunk messageId {}, last-chunk-id{},
chunkId = {}, total-chunks {}", msgId,
- (chunkedMsgCtx != null ?
chunkedMsgCtx.lastChunkedMessageId : null), msgMetadata.getChunkId(),
- msgMetadata.getTotalChunkMsgSize());
+ log.info("Received unexpected chunk messageId {}, last-chunk-id{},
chunkId = {}", msgId,
+ (chunkedMsgCtx != null ?
chunkedMsgCtx.lastChunkedMessageId : null), msgMetadata.getChunkId());
if (chunkedMsgCtx != null) {
if (chunkedMsgCtx.chunkedMsgBuffer != null) {
ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);