This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new e4a71ef88ca [fix][broker] Fix chunked messages being filtered out by deduplication (#20948)
e4a71ef88ca is described below

commit e4a71ef88ca2c07a8234192b42d8c4aba270af33
Author: Xiangying Meng <[email protected]>
AuthorDate: Thu Aug 31 09:58:46 2023 +0800

    [fix][broker] Fix chunked messages being filtered out by deduplication (#20948)
    
    ## Motivation
    Make chunked messages work correctly when deduplication is enabled.
    ## Modification
    ### Only check and store the sequence ID of the last chunk in a chunked message.
     For example:
     ```markdown
         Chunk-1 sequence ID: 0, chunk ID: 0, total chunk: 2
         Chunk-2 sequence ID: 0, chunk ID: 1
         Chunk-3 sequence ID: 1, chunk ID: 0, total chunk: 3
         Chunk-4 sequence ID: 1, chunk ID: 1
         Chunk-5 sequence ID: 1, chunk ID: 1
         Chunk-6 sequence ID: 1, chunk ID: 2
    ```
    Only check and store the sequence IDs of Chunk-2 and Chunk-6.
    **Add a property in the publishContext to indicate whether this chunk is the last chunk, so that this can be checked when persistence completes.**
    ```java
    publishContext.setProperty(IS_LAST_CHUNK, Boolean.FALSE);
    ```
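    In outline, the broker-side check reduces to the sketch below. This is a simplified view of the `isDuplicate` change in the diff that follows; the replication path and the reader-index save/restore around `parseMessageMetadata` are omitted, and the surrounding members (`IS_LAST_CHUNK`, `MessageDupStatus`, `highestSequencedPersisted`) are assumed from that diff.
    ```java
    if (publishContext.isChunked()) {
        MessageMetadata md = Commands.parseMessageMetadata(headersAndPayload);
        if (md.getChunkId() != md.getNumChunksFromMsg() - 1) {
            // Intermediate chunk: accept it, but make sure persistence does not
            // advance highestSequencedPersisted for this entry.
            publishContext.setProperty(IS_LAST_CHUNK, Boolean.FALSE);
            return MessageDupStatus.NotDup;
        }
        // Last chunk: fall through to the usual per-producer sequence-ID comparison
        // and mark the context so the sequence ID is recorded once persisted.
        publishContext.setProperty(IS_LAST_CHUNK, Boolean.TRUE);
    }
    ```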
    ### Filter and ack duplicated chunks in a chunked message instead of discarding the chunked-message context (ctx).
     For example:
     ```markdown
         Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
         Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
         Chunk-3 sequence ID: 0, chunk ID: 2, msgID: 1:3
         Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
         Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
         Chunk-6 sequence ID: 0, chunk ID: 3, msgID: 1:6
    ```
    We should filter and ack chunk-4 and chunk-5.
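    In outline, the consumer-side handling reduces to the sketch below. This is a condensed view of the ConsumerImpl change in the diff that follows; permit accounting and the surrounding out-of-order handling come from the existing code and are assumed here.
    ```java
    if (chunkedMsgCtx != null && msgMetadata.getChunkId() <= chunkedMsgCtx.lastChunkedMessageId) {
        // Duplicated chunk: release it and ack it instead of discarding the whole
        // chunked-message context.
        compressedPayload.release();
        increaseAvailablePermits(cnx);
        // Ack individually unless we already hold this (ledgerId, entryId) in the context.
        boolean alreadyHeld = Arrays.stream(chunkedMsgCtx.chunkedMessageIds)
                .anyMatch(id -> id != null
                        && id.ledgerId == messageId.getLedgerId()
                        && id.entryId == messageId.getEntryId());
        if (!alreadyHeld) {
            doAcknowledge(msgId, AckType.Individual, Collections.emptyMap(), null);
        }
        return null;
    }
    ```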
---
 .../service/persistent/MessageDeduplication.java   |  34 +++++-
 .../impl/MessageChunkingDeduplicationTest.java     | 114 +++++++++++++++++++++
 .../client/impl/MessageChunkingSharedTest.java     |  40 ++++++--
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  25 +++++
 4 files changed, 198 insertions(+), 15 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 490be4a8876..e75e22ff58c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -55,6 +55,8 @@ public class MessageDeduplication {
     private final ManagedLedger managedLedger;
     private ManagedCursor managedCursor;
 
+    private static final String IS_LAST_CHUNK = "isLastChunk";
+
     enum Status {
 
         // Deduplication is initialized
@@ -324,11 +326,12 @@ public class MessageDeduplication {
         String producerName = publishContext.getProducerName();
         long sequenceId = publishContext.getSequenceId();
         long highestSequenceId = Math.max(publishContext.getHighestSequenceId(), sequenceId);
+        MessageMetadata md = null;
         if (producerName.startsWith(replicatorPrefix)) {
             // Message is coming from replication, we need to use the original producer name and sequence id
             // for the purpose of deduplication and not rely on the "replicator" name.
             int readerIndex = headersAndPayload.readerIndex();
-            MessageMetadata md = Commands.parseMessageMetadata(headersAndPayload);
+            md = Commands.parseMessageMetadata(headersAndPayload);
             producerName = md.getProducerName();
             sequenceId = md.getSequenceId();
             highestSequenceId = Math.max(md.getHighestSequenceId(), sequenceId);
@@ -337,7 +340,23 @@ public class MessageDeduplication {
             publishContext.setOriginalHighestSequenceId(highestSequenceId);
             headersAndPayload.readerIndex(readerIndex);
         }
-
+        long chunkID = -1;
+        long totalChunk = -1;
+        if (publishContext.isChunked()) {
+            if (md == null) {
+                int readerIndex = headersAndPayload.readerIndex();
+                md = Commands.parseMessageMetadata(headersAndPayload);
+                headersAndPayload.readerIndex(readerIndex);
+            }
+            chunkID = md.getChunkId();
+            totalChunk = md.getNumChunksFromMsg();
+        }
+        // All chunks of a message use the same message metadata and sequence ID,
+        // so we only need to check the sequence ID for the last chunk in a chunk message.
+        if (chunkID != -1 && chunkID != totalChunk - 1) {
+            publishContext.setProperty(IS_LAST_CHUNK, Boolean.FALSE);
+            return MessageDupStatus.NotDup;
+        }
         // Synchronize the get() and subsequent put() on the map. This would only be relevant if the producer
         // disconnects and re-connects very quickly. At that point the call can be coming from a different thread
         synchronized (highestSequencedPushed) {
@@ -363,6 +382,11 @@ public class MessageDeduplication {
             }
             highestSequencedPushed.put(producerName, highestSequenceId);
         }
+        // Only put sequence ID into highestSequencedPushed and
+        // highestSequencedPersisted until receive and persistent the last chunk.
+        if (chunkID != -1 && chunkID == totalChunk - 1) {
+            publishContext.setProperty(IS_LAST_CHUNK, Boolean.TRUE);
+        }
         return MessageDupStatus.NotDup;
     }
 
@@ -383,8 +407,10 @@ public class MessageDeduplication {
             sequenceId = publishContext.getOriginalSequenceId();
             highestSequenceId = publishContext.getOriginalHighestSequenceId();
         }
-
-        highestSequencedPersisted.put(producerName, Math.max(highestSequenceId, sequenceId));
+        Boolean isLastChunk = (Boolean) publishContext.getProperty(IS_LAST_CHUNK);
+        if (isLastChunk == null || isLastChunk) {
+            highestSequencedPersisted.put(producerName, Math.max(highestSequenceId, sequenceId));
+        }
         if (++snapshotCounter >= snapshotInterval) {
             snapshotCounter = 0;
             takeSnapshot(position);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingDeduplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingDeduplicationTest.java
new file mode 100644
index 00000000000..5e590414132
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingDeduplicationTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.apache.pulsar.client.impl.MessageChunkingSharedTest.sendChunk;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.Schema;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-impl")
+public class MessageChunkingDeduplicationTest extends ProducerConsumerBase {
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        this.conf.setBrokerDeduplicationEnabled(true);
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testSendChunkMessageWithSameSequenceID() throws Exception {
+        String topicName = "persistent://my-property/my-ns/testSendChunkMessageWithSameSequenceID";
+        String producerName = "test-producer";
+        @Cleanup
+        Consumer<String> consumer = pulsarClient
+                .newConsumer(Schema.STRING)
+                .subscriptionName("test-sub")
+                .topic(topicName)
+                .subscribe();
+        @Cleanup
+        Producer<String> producer = pulsarClient
+                .newProducer(Schema.STRING)
+                .producerName(producerName)
+                .topic(topicName)
+                .enableChunking(true)
+                .enableBatching(false)
+                .create();
+        int messageSize = 6000; // payload size in KB
+        String message = "a".repeat(messageSize * 1000);
+        producer.newMessage().value(message).sequenceId(10).send();
+        Message<String> msg = consumer.receive(10, TimeUnit.SECONDS);
+        assertNotNull(msg);
+        assertTrue(msg.getMessageId() instanceof ChunkMessageIdImpl);
+        assertEquals(msg.getValue(), message);
+        producer.newMessage().value(message).sequenceId(10).send();
+        msg = consumer.receive(3, TimeUnit.SECONDS);
+        assertNull(msg);
+    }
+
+    @Test
+    public void testDeduplicateChunksInSingleChunkMessages() throws Exception {
+        String topicName = "persistent://my-property/my-ns/testDeduplicateChunksInSingleChunkMessage";
+        String producerName = "test-producer";
+        @Cleanup
+        Consumer<String> consumer = pulsarClient
+                .newConsumer(Schema.STRING)
+                .subscriptionName("test-sub")
+                .topic(topicName)
+                .subscribe();
+        final PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService()
+                .getTopicIfExists(topicName).get().orElse(null);
+        assertNotNull(persistentTopic);
+        sendChunk(persistentTopic, producerName, 1, 0, 2);
+        sendChunk(persistentTopic, producerName, 1, 1, 2);
+        sendChunk(persistentTopic, producerName, 1, 1, 2);
+
+        Message<String> message = consumer.receive(15, TimeUnit.SECONDS);
+        assertEquals(message.getData().length, 2);
+
+        sendChunk(persistentTopic, producerName, 2, 0, 3);
+        sendChunk(persistentTopic, producerName, 2, 1, 3);
+        sendChunk(persistentTopic, producerName, 2, 1, 3);
+        sendChunk(persistentTopic, producerName, 2, 2, 3);
+        message = consumer.receive(20, TimeUnit.SECONDS);
+        assertEquals(message.getData().length, 3);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
index 163c42d835b..3d24d3746d6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingSharedTest.java
@@ -23,6 +23,7 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -34,6 +35,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -45,7 +47,6 @@ import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.protocol.Commands;
 import org.awaitility.Awaitility;
@@ -217,7 +218,7 @@ public class MessageChunkingSharedTest extends ProducerConsumerBase {
         sendChunk(persistentTopic, producerName, sequenceId, null, null);
     }
 
-    private static void sendChunk(final PersistentTopic persistentTopic,
+    protected static void sendChunk(final PersistentTopic persistentTopic,
                                   final String producerName,
                                   final long sequenceId,
                                   final Integer chunkId,
@@ -233,16 +234,33 @@ public class MessageChunkingSharedTest extends ProducerConsumerBase {
             metadata.setTotalChunkMsgSize(numChunks);
         }
         final ByteBuf buf = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, metadata,
-                PulsarByteBufAllocator.DEFAULT.buffer(1));
-        persistentTopic.publishMessage(buf, (e, ledgerId, entryId) -> {
-            String name = producerName + "-" + sequenceId;
-            if (chunkId != null) {
-                name += "-" + chunkId + "-" + numChunks;
+                Unpooled.wrappedBuffer("a".getBytes()));
+        persistentTopic.publishMessage(buf, new Topic.PublishContext() {
+            @Override
+            public boolean isChunked() {
+                return chunkId != null;
             }
-            if (e == null) {
-                log.info("Sent {} to ({}, {})", name, ledgerId, entryId);
-            } else {
-                log.error("Failed to send {}: {}", name, e.getMessage());
+
+            @Override
+            public String getProducerName() {
+                return producerName;
+            }
+
+            public long getSequenceId() {
+                return sequenceId;
+            }
+
+            @Override
+            public void completed(Exception e, long ledgerId, long entryId) {
+                String name = producerName + "-" + sequenceId;
+                if (chunkId != null) {
+                    name += "-" + chunkId + "-" + numChunks;
+                }
+                if (e == null) {
+                    log.info("Sent {} to ({}, {})", name, ledgerId, entryId);
+                } else {
+                    log.error("Failed to send {}: {}", name, e.getMessage());
+                }
             }
         });
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 72932467d7f..ca3d2727ef0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -35,6 +35,7 @@ import io.netty.util.Timeout;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collections;
 import java.util.HashMap;
@@ -1460,6 +1461,30 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         // discard message if chunk is out-of-order
         if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
                 || msgMetadata.getChunkId() != (chunkedMsgCtx.lastChunkedMessageId + 1)) {
+            // Filter and ack duplicated chunks instead of discard ctx.
+            // For example:
+            //     Chunk-1 sequence ID: 0, chunk ID: 0, msgID: 1:1
+            //     Chunk-2 sequence ID: 0, chunk ID: 1, msgID: 1:2
+            //     Chunk-3 sequence ID: 0, chunk ID: 2, msgID: 1:3
+            //     Chunk-4 sequence ID: 0, chunk ID: 1, msgID: 1:4
+            //     Chunk-5 sequence ID: 0, chunk ID: 2, msgID: 1:5
+            //     Chunk-6 sequence ID: 0, chunk ID: 3, msgID: 1:6
+            // We should filter and ack chunk-4 and chunk-5.
+            if (chunkedMsgCtx != null && msgMetadata.getChunkId() <= chunkedMsgCtx.lastChunkedMessageId) {
+                log.warn("[{}] Receive a duplicated chunk message with messageId [{}], last-chunk-Id [{}], "
+                                + "chunkId [{}], sequenceId [{}]",
+                        msgMetadata.getProducerName(), msgId, chunkedMsgCtx.lastChunkedMessageId,
+                        msgMetadata.getChunkId(), msgMetadata.getSequenceId());
+                compressedPayload.release();
+                increaseAvailablePermits(cnx);
+                boolean repeatedlyReceived = Arrays.stream(chunkedMsgCtx.chunkedMessageIds)
+                        .anyMatch(messageId1 -> messageId1 != null && messageId1.ledgerId == messageId.getLedgerId()
+                                && messageId1.entryId == messageId.getEntryId());
+                if (!repeatedlyReceived) {
+                    doAcknowledge(msgId, AckType.Individual, Collections.emptyMap(), null);
+                }
+                return null;
+            }
             // means we lost the first chunk: should never happen
             log.info("[{}] [{}] Received unexpected chunk messageId {}, 
last-chunk-id = {}, chunkId = {}", topic,
                     subscription, msgId,
