heesung-sn commented on code in PR #20948:
URL: https://github.com/apache/pulsar/pull/20948#discussion_r1303335642
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -1449,21 +1449,28 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m
 // discard message if chunk is out-of-order
 if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
         || msgMetadata.getChunkId() != (chunkedMsgCtx.lastChunkedMessageId + 1)) {
-    // means we lost the first chunk: should never happen
-    log.info("Received unexpected chunk messageId {}, last-chunk-id{}, chunkId = {}", msgId,
-            (chunkedMsgCtx != null ? chunkedMsgCtx.lastChunkedMessageId : null), msgMetadata.getChunkId());
-    if (chunkedMsgCtx != null) {
-        if (chunkedMsgCtx.chunkedMsgBuffer != null) {
-            ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);
+    // Filter duplicated chunks instead of discard it.
+    if (chunkedMsgCtx == null || msgMetadata.getChunkId() <= chunkedMsgCtx.lastChunkedMessageId) {
+        log.warn("[{}] Receive a repeated chunk messageId {}, last-chunk-id{}, chunkId = {}",
+                msgMetadata.getProducerName(), chunkedMsgCtx == null ? null
+                        : chunkedMsgCtx.lastChunkedMessageId, msgId, msgMetadata.getChunkId());
+    } else {
+        // means we lost the first chunk: should never happen
+        log.info("Received unexpected chunk messageId {}, last-chunk-id{}, chunkId = {}", msgId,
+                (chunkedMsgCtx != null ? chunkedMsgCtx.lastChunkedMessageId : null), msgMetadata.getChunkId());
+        if (chunkedMsgCtx != null) {
+            if (chunkedMsgCtx.chunkedMsgBuffer != null) {
+                ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);
+            }
+            chunkedMsgCtx.recycle();
         }
-        chunkedMsgCtx.recycle();
+        chunkedMessagesMap.remove(msgMetadata.getUuid());
     }
-    chunkedMessagesMap.remove(msgMetadata.getUuid());
     compressedPayload.release();
     increaseAvailablePermits(cnx);
     if (expireTimeOfIncompleteChunkedMessageMillis > 0
             && System.currentTimeMillis() > (msgMetadata.getPublishTime()
-            + expireTimeOfIncompleteChunkedMessageMillis)) {
+                    + expireTimeOfIncompleteChunkedMessageMillis)) {
         doAcknowledge(msgId, AckType.Individual, Collections.emptyMap(), null);
     } else {
         trackMessage(msgId);
Review Comment:
I believe we don't need to track the duplicated chunk msgId for redelivery
as this logic is supposed to `ignore` it.
I am fine with this PR change, but I think it would also be worth discussing
this consumer behavior change in the PIP. If the broker correctly dedups
chunked messages, this consumer change is not required.
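
To make the suggestion concrete, here is a minimal, standalone sketch of the decision I have in mind. It is not the actual `ConsumerImpl` code: `ChunkFilterSketch`, `Action`, `classify`, and `shouldTrackDroppedChunk` are hypothetical names, and the class only models the chunk-id comparison from the diff (a missing context is treated as a repeated chunk, as the new branch does). The one behavioral difference from the PR is that a repeated chunk is not tracked for redelivery.

```java
// Hypothetical, simplified model of the chunk check discussed above; not Pulsar code.
class ChunkFilterSketch {

    enum Action { APPEND, IGNORE_DUPLICATE, DISCARD_CONTEXT }

    /**
     * Classifies an incoming chunk.
     *
     * @param lastChunkId id of the last chunk buffered for this message UUID,
     *                    or null when no chunk context exists (assumption: mirrors
     *                    the "chunkedMsgCtx == null" case in the diff)
     * @param chunkId     id of the chunk that just arrived
     */
    static Action classify(Integer lastChunkId, int chunkId) {
        if (lastChunkId != null && chunkId == lastChunkId + 1) {
            // The expected next chunk: append it to the buffer.
            return Action.APPEND;
        }
        if (lastChunkId == null || chunkId <= lastChunkId) {
            // Already seen (or no context at all): drop the chunk, keep the context.
            return Action.IGNORE_DUPLICATE;
        }
        // A gap in chunk ids: the buffered chunks can no longer be completed.
        return Action.DISCARD_CONTEXT;
    }

    /**
     * Whether a dropped chunk should still be tracked for redelivery.
     * Suggested behavior: only chunks dropped because of a gap are tracked;
     * repeated chunks are simply ignored.
     */
    static boolean shouldTrackDroppedChunk(Action action) {
        return action == Action.DISCARD_CONTEXT;
    }

    public static void main(String[] args) {
        int[][] cases = {{2, 3}, {2, 2}, {2, 5}};
        for (int[] c : cases) {
            Action action = classify(c[0], c[1]);
            System.out.println("last=" + c[0] + " chunk=" + c[1] + " -> " + action
                    + ", track=" + shouldTrackDroppedChunk(action));
        }
    }
}
```

In the real method this would amount to skipping `trackMessage(msgId)` on the repeated-chunk branch; whether such a chunk should also be acknowledged is something the PIP discussion could settle.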