codelipenghui commented on code in PR #21101:
URL: https://github.com/apache/pulsar/pull/21101#discussion_r1312917750
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -1437,6 +1437,39 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m
if (msgMetadata.getChunkId() == 0) {
if (chunkedMsgCtx != null) {
+ // Handle the ack hole case when receiving duplicated chunks.
+ // There are two situations in which we receive chunks with the same sequence ID and chunk ID.
+ // Situation 1 - Message redelivery:
+ // For example:
+ // Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+ // Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+ // Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:1
+ // Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:2
+ // Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:3
+ // In this case, chunk-3 and chunk-4 have the same msgIDs as chunk-1 and chunk-2.
+ // This may be caused by message redelivery, so we can't ack any chunk here.
+ // Situation 2 - Message duplication:
+ // For example:
+ // Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+ // Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+ // Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:3
+ // Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
+ // Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
+ // In this case, all the chunks have different msgIDs. Chunk-1/Chunk-2 and Chunk-3/Chunk-4 are
+ // duplicates that are all persisted in the topic.
+ // The consumer acks a chunk message via a ChunkMessageIdImpl that covers all the chunks of that
+ // message (Chunk-3, Chunk-4, Chunk-5). Chunk-1 and Chunk-2 are not included in that
+ // ChunkMessageIdImpl, so we should ack them here to avoid an ack hole.
+ boolean messageDuplication = Arrays.stream(chunkedMsgCtx.chunkedMessageIds)
Review Comment:
```suggestion
boolean isCorruptedChunkMessageDetected = Arrays.stream(chunkedMsgCtx.chunkedMessageIds)
```
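For reference, a minimal sketch of the check the renamed `isCorruptedChunkMessageDetected` is meant to perform when a chunk with chunk ID 0 arrives while chunks are already buffered. `ChunkId` and `CorruptedChunkCheck` are hypothetical stand-ins for the entries of `chunkedMsgCtx.chunkedMessageIds`, not Pulsar classes, and comparing on both `ledgerId` and `entryId` is an assumption (the removed line in the later hunk only shows the `ledgerId` comparison before it is cut off).

```java
import java.util.Arrays;
import java.util.Objects;

// Hypothetical stand-in for a chunk's position in the topic (ledgerId:entryId).
record ChunkId(long ledgerId, long entryId) { }

final class CorruptedChunkCheck {
    private CorruptedChunkCheck() { }

    /**
     * Returns true when the newly received first chunk matches none of the buffered chunk
     * positions (Situation 2): the buffered chunks belong to a message that will never be
     * delivered. A match means the same entries are being redelivered (Situation 1),
     * in which case nothing should be acked.
     */
    static boolean isCorruptedChunkMessageDetected(ChunkId[] bufferedChunkIds,
                                                   ChunkId newFirstChunkId) {
        return Arrays.stream(bufferedChunkIds)
                .filter(Objects::nonNull) // the buffer may contain unused (null) slots
                .noneMatch(id -> id.ledgerId() == newFirstChunkId.ledgerId()
                        && id.entryId() == newFirstChunkId.entryId());
    }
}
```

With the examples from the code comment, buffered IDs `{1:1, 1:2}` and an incoming first chunk at `1:1` is classified as redelivery, while an incoming first chunk at `1:3` is classified as a corrupted chunk message.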
##########
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java:
##########
@@ -395,6 +427,7 @@ public void testResendChunkMessages() throws Exception {
receivedMsg = consumer.receive(5, TimeUnit.SECONDS);
assertEquals(receivedMsg.getValue(), "chunk-1-0|chunk-1-1|chunk-1-2|");
consumer.acknowledge(receivedMsg);
+ Assert.assertEquals(((ConsumerImpl<String>) consumer).getAvailablePermits(), 10);
Review Comment:
Why should the available permits be 10?
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -1437,6 +1437,39 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m
if (msgMetadata.getChunkId() == 0) {
if (chunkedMsgCtx != null) {
+ // Handle the ack hole case when receiving duplicated chunks.
+ // There are two situations in which we receive chunks with the same sequence ID and chunk ID.
+ // Situation 1 - Message redelivery:
+ // For example:
+ // Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+ // Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+ // Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:1
+ // Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:2
+ // Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:3
+ // In this case, chunk-3 and chunk-4 have the same msgIDs as chunk-1 and chunk-2.
+ // This may be caused by message redelivery, so we can't ack any chunk here.
+ // Situation 2 - Message duplication:
+ // For example:
+ // Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+ // Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+ // Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:3
+ // Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
+ // Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
+ // In this case, all the chunks have different msgIDs. Chunk-1/Chunk-2 and Chunk-3/Chunk-4 are
+ // duplicates that are all persisted in the topic.
+ // The consumer acks a chunk message via a ChunkMessageIdImpl that covers all the chunks of that
+ // message (Chunk-3, Chunk-4, Chunk-5). Chunk-1 and Chunk-2 are not included in that
+ // ChunkMessageIdImpl, so we should ack them here to avoid an ack hole.
Review Comment:
```suggestion
// Situation 2 - Corrupted chunk message:
// For example:
// Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
// Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
// Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:3
// Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
// Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
// In this case, all the chunks have different msgIDs and are persisted in the topic.
// But Chunk-1 and Chunk-2 belong to a corrupted chunk message that must be skipped since
// they will never be delivered to end users. So we should ack them here to avoid an ack hole.
```
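To make the two situations concrete, a toy walk-through of both five-chunk sequences from the comment, assuming a new chunk ID 0 discards any buffered incomplete message. `Chunk` and `ChunkAssemblySketch` are hypothetical; the sketch only records which positions would be acked early versus which would form the delivered chunk message, and it does not model out-of-order non-first chunks, permits, or real acknowledgments.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a received chunk: chunk ID, total chunk count, position in the topic.
record Chunk(int chunkId, int numChunks, long ledgerId, long entryId) { }

final class ChunkAssemblySketch {
    // Positions acked immediately because they belong to a corrupted chunk message.
    final List<String> eagerlyAcked = new ArrayList<>();
    // Positions that make up the chunk message finally delivered to the application.
    final List<String> deliveredMessageIds = new ArrayList<>();
    private final List<String> buffered = new ArrayList<>();

    private static String id(Chunk c) {
        return c.ledgerId() + ":" + c.entryId();
    }

    void onChunk(Chunk c) {
        if (c.chunkId() == 0 && !buffered.isEmpty()) {
            if (!buffered.contains(id(c))) {
                // Situation 2: different positions, so the buffered chunks will never be
                // delivered; ack them now to avoid an ack hole.
                eagerlyAcked.addAll(buffered);
            }
            // Situation 1 (same position): redelivery, ack nothing. Either way start over.
            buffered.clear();
        }
        buffered.add(id(c));
        if (c.chunkId() == c.numChunks() - 1) {
            // Last chunk received: the whole buffered message is delivered.
            deliveredMessageIds.addAll(buffered);
            buffered.clear();
        }
    }
}
```

Under these assumptions, the Situation 1 sequence acks nothing early and delivers `1:1, 1:2, 1:3`, while the Situation 2 sequence acks `1:1` and `1:2` immediately and delivers `1:3, 1:4, 1:5`.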
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -1477,10 +1511,12 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m
msgMetadata.getChunkId(), msgMetadata.getSequenceId());
compressedPayload.release();
increaseAvailablePermits(cnx);
- boolean repeatedlyReceived = Arrays.stream(chunkedMsgCtx.chunkedMessageIds)
- .anyMatch(messageId1 -> messageId1 != null && messageId1.ledgerId == messageId.getLedgerId()
+ // As with the logic above for receiving the first chunk again, we only ack this chunk in the
+ // message duplication case.
+ boolean messageDuplication = Arrays.stream(chunkedMsgCtx.chunkedMessageIds)
Review Comment:
```suggestion
boolean isDuplicatedChunk = Arrays.stream(chunkedMsgCtx.chunkedMessageIds)
```
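The same idea applied to a single unexpected (non-first) chunk, as the comment in this hunk describes: ack the chunk only in the duplication case, i.e. when its position is not already held by the buffered message, and leave it alone when it is a redelivery of a buffered chunk. This is a sketch under assumptions: `ChunkId` and `DuplicatedChunkCheck` are hypothetical, the truth value of `isDuplicatedChunk` is inferred from the comment rather than from the truncated diff, and comparing both `ledgerId` and `entryId` is assumed.

```java
import java.util.Arrays;
import java.util.Objects;

// Hypothetical stand-in for a chunk's position in the topic (ledgerId:entryId).
record ChunkId(long ledgerId, long entryId) { }

final class DuplicatedChunkCheck {
    private DuplicatedChunkCheck() { }

    /**
     * True when an unexpected chunk was persisted separately (its position matches none of the
     * buffered chunk positions); only then would it be acked individually. False means the
     * chunk is a redelivery of an already buffered entry and must not be acked here.
     */
    static boolean isDuplicatedChunk(ChunkId[] bufferedChunkIds, ChunkId chunkId) {
        return Arrays.stream(bufferedChunkIds)
                .filter(Objects::nonNull) // skip unused buffer slots
                .noneMatch(chunkId::equals); // record equality compares ledgerId and entryId
    }
}
```

For buffered IDs `{1:1, 1:2}`, a stray chunk ID 1 at `1:2` is a redelivery and is not acked, whereas the same chunk ID persisted again at `1:4` is a duplicate and is acked.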
--
This is an automated message from the Apache Git Service.