This is an automated email from the ASF dual-hosted git repository. xiangying pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 33bd1164db1e1752da1690a7e5cf12e09726299b Author: Zike Yang <[email protected]> AuthorDate: Wed Aug 30 00:23:52 2023 +0800 [fix][client] Fix consumer can't consume resent chunked messages (#21070) Currently, when the producer resends a chunked message like this: - M1: UUID: 0, ChunkID: 0 - M2: UUID: 0, ChunkID: 0 // Resend the first chunk - M3: UUID: 0, ChunkID: 1 When the consumer receives M2, it finds that it is already tracking the UUID:0 chunked message, and then discards both M1 and M2. As a result, the consumer cannot consume the whole chunked message even though it has already been persisted in the Pulsar topic. Here is the code logic: https://github.com/apache/pulsar/blob/44a055b8a55078bcf93f4904991598541aa6c1ee/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1436-L1482 The bug can be easily reproduced using the test case `testResendChunkMessages` introduced by this PR. - When receiving a new duplicated first chunk of a chunked message, the consumer discards the current chunked message context and creates a new context to track the following messages. For the case mentioned in Motivation, M1 is released and the consumer assembles M2 and M3 into the chunked message. 
(cherry picked from commit eb2e3a258b971cfeeb22f1cec254cafb49d0ae40) --- .../pulsar/client/impl/MessageChunkingTest.java | 53 ++++++++++++++++++++++ .../apache/pulsar/client/impl/ConsumerImpl.java | 23 +++++++--- 2 files changed, 70 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java index 783e971f391..d1c79322f5d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java @@ -308,6 +308,18 @@ public class MessageChunkingTest extends ProducerConsumerBase { producer.close(); } + private void sendSingleChunk(Producer<String> producer, String uuid, int chunkId, int totalChunks) + throws PulsarClientException { + TypedMessageBuilderImpl<String> msg = (TypedMessageBuilderImpl<String>) producer.newMessage() + .value(String.format("chunk-%s-%d|", uuid, chunkId)); + MessageMetadata msgMetadata = msg.getMetadataBuilder(); + msgMetadata.setUuid(uuid) + .setChunkId(chunkId) + .setNumChunksFromMsg(totalChunks) + .setTotalChunkMsgSize(100); + msg.send(); + } + @Test(enabled = false) public void testMaxPendingChunkMessages() throws Exception { @@ -357,6 +369,47 @@ public class MessageChunkingTest extends ProducerConsumerBase { } + @Test + public void testResendChunkMessages() throws Exception { + log.info("-- Starting {} test --", methodName); + final String topicName = "persistent://my-property/my-ns/testResendChunkMessages"; + + @Cleanup + Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topicName) + .subscriptionName("my-subscriber-name") + .maxPendingChunkedMessage(10) + .autoAckOldestChunkedMessageOnQueueFull(true) + .subscribe(); + @Cleanup + Producer<String> producer = pulsarClient.newProducer(Schema.STRING) + .topic(topicName) + .chunkMaxMessageSize(100) + 
.enableChunking(true) + .enableBatching(false) + .create(); + + sendSingleChunk(producer, "0", 0, 2); + + sendSingleChunk(producer, "0", 0, 2); // Resending the first chunk + sendSingleChunk(producer, "1", 0, 3); // This is for testing the interwoven chunked message + sendSingleChunk(producer, "1", 1, 3); + sendSingleChunk(producer, "1", 0, 3); // Resending the UUID-1 chunked message + + sendSingleChunk(producer, "0", 1, 2); + + Message<String> receivedMsg = consumer.receive(5, TimeUnit.SECONDS); + assertEquals(receivedMsg.getValue(), "chunk-0-0|chunk-0-1|"); + consumer.acknowledge(receivedMsg); + + sendSingleChunk(producer, "1", 1, 3); + sendSingleChunk(producer, "1", 2, 3); + + receivedMsg = consumer.receive(5, TimeUnit.SECONDS); + assertEquals(receivedMsg.getValue(), "chunk-1-0|chunk-1-1|chunk-1-2|"); + consumer.acknowledge(receivedMsg); + } + /** * Validate that chunking is not supported with batching and non-persistent topic * diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index cd4fa1b6548..8d484cf625a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1464,20 +1464,31 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle expireChunkMessageTaskScheduled = true; } + ChunkedMessageCtx chunkedMsgCtx = chunkedMessagesMap.get(msgMetadata.getUuid()); + if (msgMetadata.getChunkId() == 0) { - ByteBuf chunkedMsgBuffer = PulsarByteBufAllocator.DEFAULT.buffer(msgMetadata.getTotalChunkMsgSize(), - msgMetadata.getTotalChunkMsgSize()); - int totalChunks = msgMetadata.getNumChunksFromMsg(); - chunkedMessagesMap.computeIfAbsent(msgMetadata.getUuid(), - (key) -> ChunkedMessageCtx.get(totalChunks, chunkedMsgBuffer)); + if (chunkedMsgCtx != null) { + // The first chunk of a new chunked-message received before 
receiving other chunks of previous + // chunked-message + // so, remove previous chunked-message from map and release buffer + if (chunkedMsgCtx.chunkedMsgBuffer != null) { + ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer); + } + chunkedMsgCtx.recycle(); + chunkedMessagesMap.remove(msgMetadata.getUuid()); + } pendingChunkedMessageCount++; if (maxPendingChunkedMessage > 0 && pendingChunkedMessageCount > maxPendingChunkedMessage) { removeOldestPendingChunkedMessage(); } + int totalChunks = msgMetadata.getNumChunksFromMsg(); + ByteBuf chunkedMsgBuffer = PulsarByteBufAllocator.DEFAULT.buffer(msgMetadata.getTotalChunkMsgSize(), + msgMetadata.getTotalChunkMsgSize()); + chunkedMsgCtx = chunkedMessagesMap.computeIfAbsent(msgMetadata.getUuid(), + (key) -> ChunkedMessageCtx.get(totalChunks, chunkedMsgBuffer)); pendingChunkedMessageUuidQueue.add(msgMetadata.getUuid()); } - ChunkedMessageCtx chunkedMsgCtx = chunkedMessagesMap.get(msgMetadata.getUuid()); // discard message if chunk is out-of-order if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null || msgMetadata.getChunkId() != (chunkedMsgCtx.lastChunkedMessageId + 1)) {
