This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ea4cc4f9b49d92d6433cee784f722b055da87493
Author: Zike Yang <[email protected]>
AuthorDate: Wed Aug 30 00:23:52 2023 +0800

    [fix][client] Fix consumer can't consume resent chunked messages (#21070)
    
    Currently, when the producer resends a chunk of a chunked message like this:
    - M1: UUID: 0, ChunkID: 0
    - M2: UUID: 0, ChunkID: 0 // Resend the first chunk
    - M3: UUID: 0, ChunkID: 1
    
    When the consumer receives M2, it finds that it is already tracking the 
chunked message with UUID 0, and it then discards both M1 and M2. As a result, 
the consumer is unable to consume the whole chunked message even though it is 
already persisted in the Pulsar topic.
    
    Here is the code logic:
    
https://github.com/apache/pulsar/blob/44a055b8a55078bcf93f4904991598541aa6c1ee/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1436-L1482
    
    The bug can be easily reproduced using the test case 
`testResendChunkMessages` introduced by this PR.
    
    - When receiving a new, duplicated first chunk of a chunked message, the 
consumer discards the current chunked message context and creates a new context 
to track the following messages. For the case mentioned in Motivation, M1 
will be released and the consumer will assemble M2 and M3 as the chunked 
message.
    
    (cherry picked from commit eb2e3a258b971cfeeb22f1cec254cafb49d0ae40)
---
 .../impl/MessageChunkingDeduplicationTest.java     |  3 +-
 .../pulsar/client/impl/MessageChunkingTest.java    | 54 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 24 ++++++++--
 3 files changed, 76 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingDeduplicationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingDeduplicationTest.java
index aa8c4d86c74..69296388475 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingDeduplicationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingDeduplicationTest.java
@@ -24,6 +24,7 @@ import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
@@ -77,7 +78,7 @@ public class MessageChunkingDeduplicationTest extends 
ProducerConsumerBase {
                 .enableBatching(false)
                 .create();
         int messageSize = 6000; // payload size in KB
-        String message = "a".repeat(messageSize * 1000);
+        String message = String.join("", Collections.nCopies(messageSize * 
1000, "a"));
         producer.newMessage().value(message).sequenceId(10).send();
         Message<String> msg = consumer.receive(10, TimeUnit.SECONDS);
         assertNotNull(msg);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
index c223208a8ef..95bdc9a24ed 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
@@ -53,6 +53,7 @@ import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.impl.MessageImpl.SchemaState;
 import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
@@ -332,6 +333,46 @@ public class MessageChunkingTest extends 
ProducerConsumerBase {
 
     }
 
+    @Test
+    public void testResendChunkMessages() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+        final String topicName = 
"persistent://my-property/my-ns/testResendChunkMessages";
+
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName("my-subscriber-name")
+                .maxPendingChunkedMessage(10)
+                .autoAckOldestChunkedMessageOnQueueFull(true)
+                .subscribe();
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .enableChunking(true)
+                .enableBatching(false)
+                .create();
+
+        sendSingleChunk(producer, "0", 0, 2);
+
+        sendSingleChunk(producer, "0", 0, 2); // Resending the first chunk
+        sendSingleChunk(producer, "1", 0, 3); // This is for testing the 
interwoven chunked message
+        sendSingleChunk(producer, "1", 1, 3);
+        sendSingleChunk(producer, "1", 0, 3); // Resending the UUID-1 chunked 
message
+
+        sendSingleChunk(producer, "0", 1, 2);
+
+        Message<String> receivedMsg = consumer.receive(5, TimeUnit.SECONDS);
+        assertEquals(receivedMsg.getValue(), "chunk-0-0|chunk-0-1|");
+        consumer.acknowledge(receivedMsg);
+
+        sendSingleChunk(producer, "1", 1, 3);
+        sendSingleChunk(producer, "1", 2, 3);
+
+        receivedMsg = consumer.receive(5, TimeUnit.SECONDS);
+        assertEquals(receivedMsg.getValue(), "chunk-1-0|chunk-1-1|chunk-1-2|");
+        consumer.acknowledge(receivedMsg);
+    }
+
     /**
      * Validate that chunking is not supported with batching and 
non-persistent topic
      *
@@ -514,4 +555,17 @@ public class MessageChunkingTest extends 
ProducerConsumerBase {
         return str.toString();
     }
 
+    private void sendSingleChunk(Producer<String> producer, String uuid, int 
chunkId, int totalChunks)
+            throws PulsarClientException {
+        TypedMessageBuilderImpl<String> msg = 
(TypedMessageBuilderImpl<String>) producer.newMessage()
+                .value(String.format("chunk-%s-%d|", uuid, chunkId));
+        MessageMetadata msgMetadata = msg.getMetadataBuilder();
+        msgMetadata.setUuid(uuid)
+                .setChunkId(chunkId)
+                .setNumChunksFromMsg(totalChunks)
+                .setTotalChunkMsgSize(100);
+        msg.send();
+    }
+
+
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 5c6c7c31351..e91408e81b5 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1430,11 +1430,27 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             expireChunkMessageTaskScheduled = true;
         }
 
+        ChunkedMessageCtx chunkedMsgCtx = 
chunkedMessagesMap.get(msgMetadata.getUuid());
+
         if (msgMetadata.getChunkId() == 0) {
+            if (chunkedMsgCtx != null) {
+                // The first chunk of a new chunked-message received before 
receiving other chunks of previous
+                // chunked-message
+                // so, remove previous chunked-message from map and release 
buffer
+                if (chunkedMsgCtx.chunkedMsgBuffer != null) {
+                    
ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);
+                }
+                chunkedMsgCtx.recycle();
+                chunkedMessagesMap.remove(msgMetadata.getUuid());
+            }
+            pendingChunkedMessageCount++;
+            if (maxPendingChunkedMessage > 0 && pendingChunkedMessageCount > 
maxPendingChunkedMessage) {
+                removeOldestPendingChunkedMessage();
+            }
+            int totalChunks = msgMetadata.getNumChunksFromMsg();
             ByteBuf chunkedMsgBuffer = 
PulsarByteBufAllocator.DEFAULT.buffer(msgMetadata.getTotalChunkMsgSize(),
                     msgMetadata.getTotalChunkMsgSize());
-            int totalChunks = msgMetadata.getNumChunksFromMsg();
-            chunkedMessagesMap.computeIfAbsent(msgMetadata.getUuid(),
+            chunkedMsgCtx = 
chunkedMessagesMap.computeIfAbsent(msgMetadata.getUuid(),
                     (key) -> ChunkedMessageCtx.get(totalChunks, 
chunkedMsgBuffer));
             pendingChunkedMessageCount++;
             if (maxPendingChunkedMessage > 0 && pendingChunkedMessageCount > 
maxPendingChunkedMessage) {
@@ -1443,7 +1459,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             pendingChunkedMessageUuidQueue.add(msgMetadata.getUuid());
         }
 
-        ChunkedMessageCtx chunkedMsgCtx = 
chunkedMessagesMap.get(msgMetadata.getUuid());
         // discard message if chunk is out-of-order
         if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
                 || msgMetadata.getChunkId() != 
(chunkedMsgCtx.lastChunkedMessageId + 1)) {
@@ -1472,7 +1487,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 return null;
             }
             // means we lost the first chunk: should never happen
-            log.info("Received unexpected chunk messageId {}, last-chunk-id{}, 
chunkId = {}", msgId,
+            log.info("[{}] [{}] Received unexpected chunk messageId {}, 
last-chunk-id = {}, chunkId = {}", topic,
+                    subscription, msgId,
                     (chunkedMsgCtx != null ? 
chunkedMsgCtx.lastChunkedMessageId : null), msgMetadata.getChunkId());
             if (chunkedMsgCtx != null) {
                 if (chunkedMsgCtx.chunkedMsgBuffer != null) {

Reply via email to