This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 7317c84421e [fix][client] Fix reach redeliverCount client can't send 
batch messages to DLQ (#17317)
7317c84421e is described below

commit 7317c84421ef33e5391139774d8f5847328d9714
Author: congbo <[email protected]>
AuthorDate: Sun Aug 28 20:10:57 2022 +0800

    [fix][client] Fix reach redeliverCount client can't send batch messages to 
DLQ (#17317)
    
    (cherry picked from commit 0909853873fc61395df7a68de13335d9f770383a)
---
 .../client/impl/TransactionEndToEndTest.java       | 66 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 11 +++-
 2 files changed, 75 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index cb943a8b0b9..7771c4ca0ed 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -1116,6 +1116,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
         @Cleanup
         ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) 
pulsarClient.newProducer()
                 .topic(topic)
+                .enableBatching(false)
                 .sendTimeout(1, TimeUnit.SECONDS)
                 .create();
 
@@ -1160,4 +1161,69 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
 
         assertEquals(value, new String(deadLetterConsumer.receive(3, 
TimeUnit.SECONDS).getValue()));
     }
+
+    @Test
+    public void testSendTxnAckBatchMessageToDLQ() throws Exception {
+        String topic = NAMESPACE1 + "/testSendTxnAckBatchMessageToDLQ";
+        String subName = "test";
+        String value1 = "test1";
+        String value2 = "test2";
+        @Cleanup
+        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) 
pulsarClient.newProducer()
+                .topic(topic)
+                .sendTimeout(1, TimeUnit.SECONDS)
+                .create();
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Shared)
+                // consumer can't receive the same message three times
+                
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build())
+                .subscriptionName(subName)
+                .subscribe();
+
+        @Cleanup
+        Consumer<byte[]> deadLetterConsumer = pulsarClient.newConsumer()
+                .topic(String.format("%s-%s" + 
RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX,
+                        topic, subName))
+                .subscriptionType(SubscriptionType.Shared)
+                
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build())
+                .subscriptionName("test")
+                .subscribe();
+
+        producer.sendAsync(value1.getBytes());
+        producer.sendAsync(value2.getBytes());
+        Transaction transaction = 
pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES)
+                .build().get();
+
+        Message<byte[]> message = consumer.receive();
+        assertEquals(value1, new String(message.getValue()));
+        // The consumer receives batch message one for the first time; 
redeliverCount = 0.
+        consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
+
+        transaction.abort().get();
+
+        // The consumer first receives batch message two, and then receives
+        // message one and message two again with redeliverCount = 1.
+        for (int i = 0; i < 3; i ++) {
+            message = consumer.receive();
+        }
+
+        transaction = pulsarClient.newTransaction().withTransactionTimeout(5, 
TimeUnit.MINUTES)
+                .build().get();
+
+        assertEquals(value2, new String(message.getValue()));
+        // The consumer receives batch message two for the second time; 
redeliverCount = 1, so it can still be received.
+        consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
+
+        transaction.abort().get();
+
+        // On the third delivery of the batch message, redeliverCount = 
2,
+        // so the messages are sent to the DLQ and cannot be received here.
+        assertNull(consumer.receive(3, TimeUnit.SECONDS));
+
+        assertEquals(value1, new String(deadLetterConsumer.receive(3, 
TimeUnit.SECONDS).getValue()));
+        assertEquals(value2, new String(deadLetterConsumer.receive(3, 
TimeUnit.SECONDS).getValue()));
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 8234ed216e3..789e82cc1b7 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1429,8 +1429,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             discardCorruptedMessage(messageId, cnx, 
ValidationError.BatchDeSerializeError);
         }
 
-        if (possibleToDeadLetter != null && 
possibleSendToDeadLetterTopicMessages != null) {
-            possibleSendToDeadLetterTopicMessages.put(batchMessage, 
possibleToDeadLetter);
+        if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages 
!= null) {
+            if (redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
+                possibleSendToDeadLetterTopicMessages.put(batchMessage,
+                        possibleToDeadLetter);
+                if (redeliveryCount > deadLetterPolicy.getMaxRedeliverCount()) 
{
+                    
redeliverUnacknowledgedMessages(Collections.singleton(batchMessage));
+                    return;
+                }
+            }
         }
 
         if (log.isDebugEnabled()) {

Reply via email to