This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 6ff1b6489c9 [fix][client] Fix the message present in incoming queue after go to DLQ (#17326)
6ff1b6489c9 is described below

commit 6ff1b6489c950bec0e86ee03661903aa2c05ed65
Author: congbo <[email protected]>
AuthorDate: Tue Aug 30 06:57:09 2022 +0800

    [fix][client] Fix the message present in incoming queue after go to DLQ (#17326)
    
    (cherry picked from commit 2c1d2130e65a075f5a0bb261f5bd617e88d2e945)
---
 .../org/apache/pulsar/client/impl/TransactionEndToEndTest.java |  4 ++++
 .../main/java/org/apache/pulsar/client/impl/ConsumerImpl.java  | 10 +++++++++-
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 7771c4ca0ed..f5788e8d4b6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -1159,6 +1159,8 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
         // the message will be sent to DLQ, can't receive
         assertNull(consumer.receive(3, TimeUnit.SECONDS));
 
+        assertEquals(((ConsumerImpl<?>) consumer).getAvailablePermits(), 3);
+
         assertEquals(value, new String(deadLetterConsumer.receive(3, 
TimeUnit.SECONDS).getValue()));
     }
 
@@ -1223,6 +1225,8 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
         // the message will be sent to DLQ, can't receive
         assertNull(consumer.receive(3, TimeUnit.SECONDS));
 
+        assertEquals(((ConsumerImpl<?>) consumer).getAvailablePermits(), 6);
+
         assertEquals(value1, new String(deadLetterConsumer.receive(3, 
TimeUnit.SECONDS).getValue()));
         assertEquals(value2, new String(deadLetterConsumer.receive(3, 
TimeUnit.SECONDS).getValue()));
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 789e82cc1b7..3bcf95e7813 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1244,6 +1244,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                             Collections.singletonList(message));
                     if (redeliveryCount > 
deadLetterPolicy.getMaxRedeliverCount()) {
                         
redeliverUnacknowledgedMessages(Collections.singleton(message.getMessageId()));
+                        // The message is skipped due to reaching the max 
redelivery count,
+                        // so we need to increase the available permits
+                        increaseAvailablePermits(cnx);
                         return;
                     }
                 }
@@ -1418,6 +1421,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 }
                 if (possibleToDeadLetter != null) {
                     possibleToDeadLetter.add(message);
+                    // Skip the message which reaches the max redelivery count.
+                    if (redeliveryCount > 
deadLetterPolicy.getMaxRedeliverCount()) {
+                        skippedMessages++;
+                        continue;
+                    }
+
                 }
                 executeNotifyCallback(message);
             }
@@ -1435,7 +1444,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                         possibleToDeadLetter);
                 if (redeliveryCount > deadLetterPolicy.getMaxRedeliverCount()) 
{
                     
redeliverUnacknowledgedMessages(Collections.singleton(batchMessage));
-                    return;
                 }
             }
         }

Reply via email to