This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 67d9c056f3b [fix][client] Fix cannot retry chunk messages and send to DLQ (#21048)
67d9c056f3b is described below

commit 67d9c056f3b681df542f23ee2f6c602885e7c989
Author: Zike Yang <[email protected]>
AuthorDate: Thu Aug 31 07:44:34 2023 +0800

    [fix][client] Fix cannot retry chunk messages and send to DLQ (#21048)
    
    (cherry picked from commit 99e3fea2e3adecb3346d1fc8e5422a3fde4236e3)
---
 .../pulsar/client/api/DeadLetterTopicTest.java     | 48 ++++++++++++++++++----
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  3 ++
 .../apache/pulsar/client/impl/ProducerImpl.java    |  9 +++-
 .../pulsar/client/impl/UnAckedMessageTracker.java  |  7 +++-
 .../client/impl/UnAckedMessageTrackerTest.java     | 48 +++++++++++++++++++++-
 5 files changed, 103 insertions(+), 12 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index cb9a4c3c104..3731e6cd5b2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -46,9 +47,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-@Test(groups = "flaky")
+@Test(groups = "broker-impl")
 public class DeadLetterTopicTest extends ProducerConsumerBase {
 
     private static final Logger log = 
LoggerFactory.getLogger(DeadLetterTopicTest.class);
@@ -56,6 +58,7 @@ public class DeadLetterTopicTest extends ProducerConsumerBase 
{
     @BeforeMethod(alwaysRun = true)
     @Override
     protected void setup() throws Exception {
+        this.conf.setMaxMessageSize(5 * 1024);
         super.internalSetup();
         super.producerBaseSetup();
     }
@@ -66,6 +69,15 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
         super.internalCleanup();
     }
 
+    private String createMessagePayload(int size) {
+        StringBuilder str = new StringBuilder();
+        Random rand = new Random();
+        for (int i = 0; i < size; i++) {
+            str.append(rand.nextInt(10));
+        }
+        return str.toString();
+    }
+
     @Test
     public void testDeadLetterTopicWithMessageKey() throws Exception {
         final String topic = 
"persistent://my-property/my-ns/dead-letter-topic";
@@ -125,9 +137,13 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
         consumer.close();
     }
 
+    @DataProvider(name = "produceLargeMessages")
+    public Object[][] produceLargeMessages() {
+        return new Object[][] { { false }, { true } };
+    }
 
-    @Test(groups = "quarantine")
-    public void testDeadLetterTopic() throws Exception {
+    @Test(dataProvider = "produceLargeMessages")
+    public void testDeadLetterTopic(boolean produceLargeMessages) throws 
Exception {
         final String topic = 
"persistent://my-property/my-ns/dead-letter-topic";
 
         final int maxRedeliveryCount = 2;
@@ -154,28 +170,44 @@ public class DeadLetterTopicTest extends 
ProducerConsumerBase {
 
         Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
                 .topic(topic)
+                .enableChunking(produceLargeMessages)
+                .enableBatching(!produceLargeMessages)
                 .create();
 
+        Map<Integer, String> messageContent = new HashMap<>();
+
         for (int i = 0; i < sendMessages; i++) {
-            producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
+            String data;
+            if (!produceLargeMessages) {
+                data = String.format("Hello Pulsar [%d]", i);
+            } else {
+                data = createMessagePayload(1024 * 10);
+            }
+            
producer.newMessage().key(String.valueOf(i)).value(data.getBytes()).send();
+            messageContent.put(i, data);
         }
 
         producer.close();
 
         int totalReceived = 0;
         do {
-            Message<byte[]> message = consumer.receive();
-            log.info("consumer received message : {} {}", 
message.getMessageId(), new String(message.getData()));
+            Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
+            assertNotNull(message, "The consumer should be able to receive 
messages.");
+            log.info("consumer received message : {}", message.getMessageId());
             totalReceived++;
         } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
 
         int totalInDeadLetter = 0;
         do {
-            Message message = deadLetterConsumer.receive();
-            log.info("dead letter consumer received message : {} {}", 
message.getMessageId(), new String(message.getData()));
+            Message message = deadLetterConsumer.receive(5, TimeUnit.SECONDS);
+            assertNotNull(message, "the deadLetterConsumer should receive 
messages.");
+            assertEquals(new String(message.getData()), 
messageContent.get(Integer.parseInt(message.getKey())));
+            messageContent.remove(Integer.parseInt(message.getKey()));
+            log.info("dead letter consumer received message : {}", 
message.getMessageId());
             deadLetterConsumer.acknowledge(message);
             totalInDeadLetter++;
         } while (totalInDeadLetter < sendMessages);
+        assertTrue(messageContent.isEmpty());
 
         deadLetterConsumer.close();
         consumer.close();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index a5e32854434..ac453281d0d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -653,6 +653,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                     retryLetterProducer = client.newProducer(schema)
                             .topic(this.deadLetterPolicy.getRetryLetterTopic())
                             .enableBatching(false)
+                            .enableChunking(true)
                             .blockIfQueueFull(false)
                             .create();
                 }
@@ -2206,6 +2207,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                                     
.initialSubscriptionName(this.deadLetterPolicy.getInitialSubscriptionName())
                                     
.topic(this.deadLetterPolicy.getDeadLetterTopic())
                                     .blockIfQueueFull(false)
+                                    .enableBatching(false)
+                                    .enableChunking(true)
                                     .createAsync();
                 }
             } finally {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 869aad0fc01..3c481e641c5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -687,7 +687,14 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                 op = OpSendMsg.create(msg, null, sequenceId, callback);
                 final MessageMetadata finalMsgMetadata = msgMetadata;
                 op.rePopulate = () -> {
-                    op.cmd = sendMessage(producerId, sequenceId, numMessages, 
finalMsgMetadata, encryptedPayload);
+                    if (msgMetadata.hasChunkId()) {
+                        // The message metadata is shared between all chunks 
in a large message
+                        // We need to reset the chunk id for each call of this 
method
+                        // It's safe to do that because there is only 1 thread 
to manipulate this message metadata
+                        finalMsgMetadata.setChunkId(chunkId);
+                    }
+                    op.cmd = sendMessage(producerId, sequenceId, numMessages, 
finalMsgMetadata,
+                            encryptedPayload);
                 };
             }
             op.setNumMessagesInBatch(numMessages);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
index 9ad30296c21..5a706720dc5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
@@ -138,8 +138,11 @@ public class UnAckedMessageTracker implements Closeable {
                         if (!headPartition.isEmpty()) {
                             log.info("[{}] {} messages will be re-delivered", 
consumerBase, headPartition.size());
                             headPartition.forEach(messageId -> {
-                                
addChunkedMessageIdsAndRemoveFromSequenceMap(messageId, messageIds, 
consumerBase);
-                                messageIds.add(messageId);
+                                if (messageId instanceof ChunkMessageIdImpl) {
+                                    
addChunkedMessageIdsAndRemoveFromSequenceMap(messageId, messageIds, 
consumerBase);
+                                } else {
+                                    messageIds.add(messageId);
+                                }
                                 messageIdPartitionMap.remove(messageId);
                             });
                         }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java
index 323333afc74..029ad0fc34c 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/UnAckedMessageTrackerTest.java
@@ -29,12 +29,14 @@ import static org.mockito.Mockito.when;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
 import io.netty.util.concurrent.DefaultThreadFactory;
-
+import java.time.Duration;
 import java.util.HashSet;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.awaitility.Awaitility;
 import org.testng.annotations.Test;
 
 public class UnAckedMessageTrackerTest  {
@@ -77,4 +79,48 @@ public class UnAckedMessageTrackerTest  {
         timer.stop();
     }
 
+    @Test
+    public void testTrackChunkedMessageId() {
+        PulsarClientImpl client = mock(PulsarClientImpl.class);
+        Timer timer = new HashedWheelTimer(new 
DefaultThreadFactory("pulsar-timer", Thread.currentThread().isDaemon()),
+                1, TimeUnit.MILLISECONDS);
+        when(client.timer()).thenReturn(timer);
+
+        ConsumerBase<byte[]> consumer = mock(ConsumerBase.class);
+        doNothing().when(consumer).onAckTimeoutSend(any());
+        doNothing().when(consumer).redeliverUnacknowledgedMessages(any());
+        ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>();
+        conf.setAckTimeoutMillis(1000);
+        conf.setTickDurationMillis(1000);
+        UnAckedMessageTracker tracker = new UnAckedMessageTracker(client, 
consumer, conf);
+
+        assertTrue(tracker.isEmpty());
+        assertEquals(tracker.size(), 0);
+
+        // Build chunked message ID
+        MessageIdImpl[] chunkMsgIds = new MessageIdImpl[5];
+        for (int i = 0; i < 5; i++) {
+            chunkMsgIds[i] = new MessageIdImpl(1L, i, -1);
+        }
+        ChunkMessageIdImpl chunkedMessageId =
+                new ChunkMessageIdImpl(chunkMsgIds[0], 
chunkMsgIds[chunkMsgIds.length - 1]);
+
+        consumer.unAckedChunkedMessageIdSequenceMap =
+                ConcurrentOpenHashMap.<MessageIdImpl, 
MessageIdImpl[]>newBuilder().build();
+        consumer.unAckedChunkedMessageIdSequenceMap.put(chunkedMessageId, 
chunkMsgIds);
+
+        // Redeliver chunked message
+        tracker.add(chunkedMessageId);
+
+        Awaitility.await()
+                .pollInterval(Duration.ofMillis(200))
+                .atMost(Duration.ofSeconds(3))
+                .untilAsserted(() -> assertEquals(tracker.size(), 0));
+
+        // Assert that all chunk message ID are removed from 
unAckedChunkedMessageIdSequenceMap
+        assertEquals(consumer.unAckedChunkedMessageIdSequenceMap.size(), 0);
+
+        timer.stop();
+    }
+
 }

Reply via email to