heesung-sn commented on code in PR #20948:
URL: https://github.com/apache/pulsar/pull/20948#discussion_r1303335642
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -1449,21 +1449,28 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m
// discard message if chunk is out-of-order
if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
|| msgMetadata.getChunkId() != (chunkedMsgCtx.lastChunkedMessageId + 1)) {
- // means we lost the first chunk: should never happen
- log.info("Received unexpected chunk messageId {}, last-chunk-id{}, chunkId = {}", msgId,
- (chunkedMsgCtx != null ? chunkedMsgCtx.lastChunkedMessageId : null), msgMetadata.getChunkId());
- if (chunkedMsgCtx != null) {
- if (chunkedMsgCtx.chunkedMsgBuffer != null) {
- ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);
+ // Filter duplicated chunks instead of discard it.
+ if (chunkedMsgCtx == null || msgMetadata.getChunkId() <= chunkedMsgCtx.lastChunkedMessageId) {
+ log.warn("[{}] Receive a repeated chunk messageId {}, last-chunk-id{}, chunkId = {}",
+ msgMetadata.getProducerName(), chunkedMsgCtx == null ? null
+ : chunkedMsgCtx.lastChunkedMessageId, msgId, msgMetadata.getChunkId());
+ } else {
+ // means we lost the first chunk: should never happen
+ log.info("Received unexpected chunk messageId {}, last-chunk-id{}, chunkId = {}", msgId,
+ (chunkedMsgCtx != null ? chunkedMsgCtx.lastChunkedMessageId : null), msgMetadata.getChunkId());
+ if (chunkedMsgCtx != null) {
+ if (chunkedMsgCtx.chunkedMsgBuffer != null) {
+ ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);
+ }
+ chunkedMsgCtx.recycle();
}
- chunkedMsgCtx.recycle();
+ chunkedMessagesMap.remove(msgMetadata.getUuid());
}
- chunkedMessagesMap.remove(msgMetadata.getUuid());
compressedPayload.release();
increaseAvailablePermits(cnx);
if (expireTimeOfIncompleteChunkedMessageMillis > 0
&& System.currentTimeMillis() > (msgMetadata.getPublishTime()
- + expireTimeOfIncompleteChunkedMessageMillis)) {
+ + expireTimeOfIncompleteChunkedMessageMillis)) {
doAcknowledge(msgId, AckType.Individual, Collections.emptyMap(), null);
} else {
trackMessage(msgId);
Review Comment:
I believe we don't need to track the duplicated chunk's msgId for redelivery,
since this logic is supposed to `ignore` it.
I also think we need to discuss this consumer behavior change in the PIP.
If the broker correctly deduplicates chunked messages, this consumer change is
not required.
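
To make the suggestion concrete, here is a minimal sketch of the classification implied by the diff, with duplicates left out of redelivery tracking. `ChunkFilterSketch`, `ChunkKind`, `classify`, `onChunk`, and `isTracked` are hypothetical names used only for illustration; they are not part of `ConsumerImpl`. The sketch ignores the `chunkedMsgBuffer == null` and expiry checks, and it treats a missing context as a duplicate, as the condition in the diff does.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the consumer's chunk handling; not the real ConsumerImpl.
final class ChunkFilterSketch {
    enum ChunkKind { IN_ORDER, DUPLICATE, OUT_OF_ORDER }

    // Stand-in for the unacked-message tracker that drives redelivery.
    private final Set<String> trackedForRedelivery = new HashSet<>();

    // lastChunkId is null when no context exists for the chunked message's UUID.
    static ChunkKind classify(Integer lastChunkId, int chunkId) {
        if (lastChunkId != null && chunkId == lastChunkId + 1) {
            return ChunkKind.IN_ORDER;
        }
        // Same condition as the diff: missing context or an already-seen chunk id.
        if (lastChunkId == null || chunkId <= lastChunkId) {
            return ChunkKind.DUPLICATE;
        }
        return ChunkKind.OUT_OF_ORDER;
    }

    // Only an out-of-order chunk is tracked for redelivery; a duplicate is ignored.
    ChunkKind onChunk(String msgId, Integer lastChunkId, int chunkId) {
        ChunkKind kind = classify(lastChunkId, chunkId);
        if (kind == ChunkKind.OUT_OF_ORDER) {
            trackedForRedelivery.add(msgId);
        }
        return kind;
    }

    boolean isTracked(String msgId) {
        return trackedForRedelivery.contains(msgId);
    }
}
```

With this shape, a repeated chunk never enters the redelivery set, so it cannot be redelivered and filtered again.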
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -1449,21 +1449,28 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m
// discard message if chunk is out-of-order
if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
|| msgMetadata.getChunkId() != (chunkedMsgCtx.lastChunkedMessageId + 1)) {
- // means we lost the first chunk: should never happen
- log.info("Received unexpected chunk messageId {}, last-chunk-id{}, chunkId = {}", msgId,
- (chunkedMsgCtx != null ? chunkedMsgCtx.lastChunkedMessageId : null), msgMetadata.getChunkId());
- if (chunkedMsgCtx != null) {
- if (chunkedMsgCtx.chunkedMsgBuffer != null) {
- ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);
+ // Filter duplicated chunks instead of discard it.
+ if (chunkedMsgCtx == null || msgMetadata.getChunkId() <= chunkedMsgCtx.lastChunkedMessageId) {
+ log.warn("[{}] Receive a repeated chunk messageId {}, last-chunk-id{}, chunkId = {}",
+ msgMetadata.getProducerName(), chunkedMsgCtx == null ? null
+ : chunkedMsgCtx.lastChunkedMessageId, msgId, msgMetadata.getChunkId());
+ } else {
+ // means we lost the first chunk: should never happen
Review Comment:
We also need to release these resources (the buffer and the
`chunkedMessagesMap` entry) in the duplicate-chunk case,
`msgMetadata.getChunkId() <= chunkedMsgCtx.lastChunkedMessageId`, when the
message has expired: `expireTimeOfIncompleteChunkedMessageMillis > 0
&& System.currentTimeMillis() >
(msgMetadata.getPublishTime().`
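
A minimal sketch of that cleanup, under stated assumptions: `ExpiredChunkCleanupSketch`, `Ctx`, `isExpired`, and `onDuplicateChunk` are hypothetical names, `Ctx.release()` stands in for releasing the chunk buffer and recycling the context, and the expiry test is the one shown in the diff (publish time plus `expireTimeOfIncompleteChunkedMessageMillis`).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in; the real code holds a Netty buffer and a recyclable context.
final class ExpiredChunkCleanupSketch {
    static final class Ctx {
        final int lastChunkId;
        boolean released;

        Ctx(int lastChunkId) {
            this.lastChunkId = lastChunkId;
        }

        // Stand-in for releasing the chunk buffer and recycling the context.
        void release() {
            released = true;
        }
    }

    final Map<String, Ctx> chunkedMessages = new HashMap<>();
    private final long expireMillis;

    ExpiredChunkCleanupSketch(long expireMillis) {
        this.expireMillis = expireMillis;
    }

    // Expiry check as written in the diff; a non-positive setting disables expiry.
    static boolean isExpired(long expireMillis, long publishTime, long now) {
        return expireMillis > 0 && now > publishTime + expireMillis;
    }

    // Returns true when the duplicate belonged to an expired message and its state was freed.
    boolean onDuplicateChunk(String uuid, int chunkId, long publishTime, long now) {
        Ctx ctx = chunkedMessages.get(uuid);
        if (ctx == null || chunkId > ctx.lastChunkId) {
            return false; // nothing to free, or not a duplicate
        }
        if (!isExpired(expireMillis, publishTime, now)) {
            return false; // keep the partial message; later chunks may still arrive
        }
        chunkedMessages.remove(uuid);
        ctx.release();
        return true;
    }
}
```

The point of the sketch is only that the duplicate branch and the unexpected-chunk branch would share the same release path once the message has expired.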