This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6c27d6887aeecc033aac88b00ad10aed5bd507d0
Author: Xiangying Meng <[email protected]>
AuthorDate: Mon Sep 4 08:50:49 2023 +0800

    [fix][client] Avoid ack hole for chunk message (#21101)

    ## Motivation
    Handle the ack hole case. For example:
    ```markdown
    Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
    Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
    Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:3
    Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
    Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
    ```
    The consumer acks the chunk message via a ChunkMessageIdImpl that consists of all the
    chunks of the delivered chunk message (Chunk-3, Chunk-4, Chunk-5). Chunk-1 and Chunk-2
    are not included in that ChunkMessageIdImpl, so we need to handle them here.

    ## Modification
    Ack Chunk-1 and Chunk-2.

    (cherry picked from commit 59a8e724692999094b04747727f78693e3ff8eaf)
---
 .../pulsar/client/impl/MessageChunkingTest.java   | 33 +++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java   | 49 ++++++++++++++++++----
 2 files changed, 74 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
index d1c79322f5d..61f9473e3c1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
@@ -369,6 +369,38 @@ public class MessageChunkingTest extends ProducerConsumerBase {
     }
 
+    @Test
+    public void testResendChunkMessagesWithoutAckHole() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+        final String topicName = "persistent://my-property/my-ns/testResendChunkMessagesWithoutAckHole";
+        final String subName = "my-subscriber-name";
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(subName)
+                .maxPendingChunkedMessage(10)
+                .autoAckOldestChunkedMessageOnQueueFull(true)
+                .subscribe();
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .chunkMaxMessageSize(100)
+                .enableChunking(true)
+                .enableBatching(false)
+                .create();
+
+        sendSingleChunk(producer, "0", 0, 2);
+
+        sendSingleChunk(producer, "0", 0, 2); // Resending the first chunk
+        sendSingleChunk(producer, "0", 1, 2);
+
+        Message<String> receivedMsg = consumer.receive(5, TimeUnit.SECONDS);
+        assertEquals(receivedMsg.getValue(), "chunk-0-0|chunk-0-1|");
+        consumer.acknowledge(receivedMsg);
+        assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName)
+                .getNonContiguousDeletedMessagesRanges(), 0);
+    }
+
     @Test
     public void testResendChunkMessages() throws Exception {
         log.info("-- Starting {} test --", methodName);
@@ -408,6 +440,7 @@ public class MessageChunkingTest extends ProducerConsumerBase {
         receivedMsg = consumer.receive(5, TimeUnit.SECONDS);
         assertEquals(receivedMsg.getValue(), "chunk-1-0|chunk-1-1|chunk-1-2|");
         consumer.acknowledge(receivedMsg);
+        Assert.assertEquals(((ConsumerImpl<String>) consumer).getAvailablePermits(), 8);
     }
 
     /**
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 8d484cf625a..a5e32854434 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1452,7 +1452,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata msgMetadata, MessageIdImpl msgId,
                                         MessageIdData messageId, ClientCnx cnx) {
-
+        if (msgMetadata.getChunkId() != (msgMetadata.getNumChunksFromMsg() - 1)) {
+            increaseAvailablePermits(cnx);
+        }
         // Lazy task scheduling to expire incomplete chunk message
         if (!expireChunkMessageTaskScheduled && expireTimeOfIncompleteChunkedMessageMillis > 0) {
             ((ScheduledExecutorService) client.getScheduledExecutorProvider().getExecutor()).scheduleAtFixedRate(
@@ -1468,6 +1470,37 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
         if (msgMetadata.getChunkId() == 0) {
             if (chunkedMsgCtx != null) {
+                // Handle the ack hole case when duplicated chunks are received.
+                // There are two situations in which chunks with the same sequence ID and chunk ID are received.
+                // Situation 1 - Message redelivery:
+                // For example:
+                // Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+                // Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+                // Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:1
+                // Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:2
+                // Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:3
+                // In this case, chunk-3 and chunk-4 have the same msgIDs as chunk-1 and chunk-2.
+                // This may be caused by message redelivery, so we can't ack any chunk here.
+                // Situation 2 - Corrupted chunk message:
+                // For example:
+                // Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+                // Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+                // Chunk-3 sequence ID: 0, chunk ID: 0, msgID: 1:3
+                // Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
+                // Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
+                // In this case, all the chunks have different msgIDs and are persisted in the topic.
+                // But Chunk-1 and Chunk-2 belong to a corrupted chunk message that must be skipped since
+                // they will not be delivered to end users. So we should ack them here to avoid an ack hole.
+                boolean isCorruptedChunkMessageDetected = Arrays.stream(chunkedMsgCtx.chunkedMessageIds)
+                        .noneMatch(messageId1 -> messageId1 != null && messageId1.ledgerId == messageId.getLedgerId()
+                                && messageId1.entryId == messageId.getEntryId());
+                if (isCorruptedChunkMessageDetected) {
+                    Arrays.stream(chunkedMsgCtx.chunkedMessageIds).forEach(messageId1 -> {
+                        if (messageId1 != null) {
+                            doAcknowledge(messageId1, AckType.Individual, Collections.emptyMap(), null);
+                        }
+                    });
+                }
                 // The first chunk of a new chunked-message received before receiving other chunks of previous
                 // chunked-message
                 // so, remove previous chunked-message from map and release buffer
@@ -1507,17 +1540,19 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                         msgMetadata.getProducerName(), msgId, chunkedMsgCtx.lastChunkedMessageId,
                         msgMetadata.getChunkId(), msgMetadata.getSequenceId());
                 compressedPayload.release();
-                increaseAvailablePermits(cnx);
-                boolean repeatedlyReceived = Arrays.stream(chunkedMsgCtx.chunkedMessageIds)
-                        .anyMatch(messageId1 -> messageId1 != null && messageId1.ledgerId == messageId.getLedgerId()
+                // Just like the above logic of receiving the first chunk again. We only ack this chunk in the message
+                // duplication case.
+                boolean isDuplicatedChunk = Arrays.stream(chunkedMsgCtx.chunkedMessageIds)
+                        .noneMatch(messageId1 -> messageId1 != null && messageId1.ledgerId == messageId.getLedgerId()
                                 && messageId1.entryId == messageId.getEntryId());
-                if (!repeatedlyReceived) {
+                if (isDuplicatedChunk) {
                     doAcknowledge(msgId, AckType.Individual, Collections.emptyMap(), null);
                 }
                 return null;
             }
             // means we lost the first chunk: should never happen
-            log.info("Received unexpected chunk messageId {}, last-chunk-id{}, chunkId = {}", msgId,
+            log.info("[{}] [{}] Received unexpected chunk messageId {}, last-chunk-id = {}, chunkId = {}", topic,
+                    subscription, msgId,
                     (chunkedMsgCtx != null ? chunkedMsgCtx.lastChunkedMessageId : null), msgMetadata.getChunkId());
             if (chunkedMsgCtx != null) {
                 if (chunkedMsgCtx.chunkedMsgBuffer != null) {
@@ -1527,7 +1562,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             }
             chunkedMessagesMap.remove(msgMetadata.getUuid());
             compressedPayload.release();
-            increaseAvailablePermits(cnx);
             if (expireTimeOfIncompleteChunkedMessageMillis > 0
                     && System.currentTimeMillis() > (msgMetadata.getPublishTime()
                             + expireTimeOfIncompleteChunkedMessageMillis)) {
@@ -1546,7 +1580,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         // if final chunk is not received yet then release payload and return
         if (msgMetadata.getChunkId() != (msgMetadata.getNumChunksFromMsg() - 1)) {
             compressedPayload.release();
-            increaseAvailablePermits(cnx);
             return null;
         }
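Editor's note: to make the detection logic in the ConsumerImpl change easier to follow, here is a minimal, self-contained sketch of the decision the fix makes when chunk ID 0 arrives while an incomplete chunk context is still buffered. This is not the patched ConsumerImpl code; the class `ChunkAckHoleSketch`, the record `ChunkMsgId`, and the method `chunksToAck` are hypothetical names introduced only for illustration (Java 17 assumed).

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical standalone sketch of the corrupted-chunk vs. redelivery decision.
public class ChunkAckHoleSketch {

    // Simplified stand-in for MessageIdImpl: only the fields the comparison needs.
    record ChunkMsgId(long ledgerId, long entryId) {}

    // Returns the buffered chunk message IDs that should be acked individually.
    // If the incoming first chunk matches one of the buffered IDs, it is a redelivery and
    // nothing may be acked; otherwise the buffered chunks belong to a corrupted chunked
    // message that will never be delivered, so they are acked to avoid an ack hole.
    static List<ChunkMsgId> chunksToAck(ChunkMsgId[] bufferedChunkIds, ChunkMsgId incomingFirstChunk) {
        boolean isCorrupted = Arrays.stream(bufferedChunkIds)
                .noneMatch(id -> id != null
                        && id.ledgerId() == incomingFirstChunk.ledgerId()
                        && id.entryId() == incomingFirstChunk.entryId());
        if (!isCorrupted) {
            return List.of(); // redelivery: the broker resends the same entries, ack nothing
        }
        return Arrays.stream(bufferedChunkIds).filter(id -> id != null).toList();
    }

    public static void main(String[] args) {
        // Situation 2 from the commit message: Chunk-1 (1:1) and Chunk-2 (1:2) are buffered,
        // then a new first chunk arrives at 1:3 -> both buffered IDs are returned for acking.
        ChunkMsgId[] buffered = {new ChunkMsgId(1, 1), new ChunkMsgId(1, 2), null};
        System.out.println(chunksToAck(buffered, new ChunkMsgId(1, 3)));

        // Situation 1: the same first chunk (1:1) is redelivered -> an empty list, ack nothing.
        System.out.println(chunksToAck(buffered, new ChunkMsgId(1, 1)));
    }
}
```

The same (ledgerId, entryId) comparison appears twice in the patch: once for a repeated first chunk and once for a repeated non-first chunk, which is why both branches use `noneMatch` over `chunkedMsgCtx.chunkedMessageIds`.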

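Editor's note: the new test asserts the absence of an ack hole through the subscription stats. Outside the test harness, roughly the same check could look like the sketch below; this is an illustrative snippet rather than part of the patch, and the service URL, topic, and subscription names are placeholders taken from the test.

```java
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.TopicStats;

// Hypothetical snippet: check a subscription for ack holes after consuming chunked messages.
public class AckHoleCheck {
    public static void main(String[] args) throws Exception {
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080")
                .build()) {
            TopicStats stats = admin.topics()
                    .getStats("persistent://my-property/my-ns/testResendChunkMessagesWithoutAckHole");
            long ranges = stats.getSubscriptions().get("my-subscriber-name")
                    .getNonContiguousDeletedMessagesRanges();
            // 0 means the subscription has no ack hole; before this fix, the orphaned
            // Chunk-1/Chunk-2 of a corrupted chunked message left this value above 0.
            System.out.println("nonContiguousDeletedMessagesRanges = " + ranges);
        }
    }
}
```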