This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 4fe0ddd0b45 [fix][client] fix negative message re-delivery twice issue (#20750)
4fe0ddd0b45 is described below

commit 4fe0ddd0b459de542347d78448c57ec2021dc117
Author: AloysZhang <[email protected]>
AuthorDate: Mon Jul 17 15:43:52 2023 +0800

    [fix][client] fix negative message re-delivery twice issue (#20750)
    
    (cherry picked from commit ecd16d68e29700cc9521e6d77d124a403c3486c9)
---
 .../java/org/apache/pulsar/client/impl/NegativeAcksTest.java |  3 +++
 .../java/org/apache/pulsar/client/impl/ConsumerImpl.java     |  3 ++-
 .../apache/pulsar/client/impl/MultiTopicsConsumerImpl.java   |  3 +++
 .../org/apache/pulsar/client/impl/NegativeAcksTracker.java   | 12 +++++++++---
 .../org/apache/pulsar/client/impl/UnAckedMessageTracker.java |  8 ++++++++
 5 files changed, 25 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index d4ea058f50f..ee2fbce240f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -152,6 +152,9 @@ public class NegativeAcksTest extends ProducerConsumerBase {
         }
 
 
+        assertTrue(consumer instanceof ConsumerBase<String>);
+        assertEquals(((ConsumerBase<String>) consumer).getUnAckedMessageTracker().size(), 0);
+
         Set<String> receivedMessages = new HashSet<>();
 
         // All the messages should be received again
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index bf703ce047c..115067235cf 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -390,6 +390,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         return connectionHandler;
     }
 
+    @Override
     public UnAckedMessageTracker getUnAckedMessageTracker() {
         return unAckedMessageTracker;
     }
@@ -762,7 +763,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         negativeAcksTracker.add(message);
 
         // Ensure the message is not redelivered for ack-timeout, since we did receive an "ack"
-        unAckedMessageTracker.remove(message.getMessageId());
+        unAckedMessageTracker.remove(MessageIdAdvUtils.discardBatch(message.getMessageId()));
     }
 
     @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 418dd8561db..e7b876b9273 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -560,6 +560,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         checkArgument(messageId instanceof TopicMessageId);
         ConsumerImpl<T> consumer = consumers.get(((TopicMessageId) messageId).getOwnerTopic());
         consumer.negativeAcknowledge(messageId);
+        unAckedMessageTracker.remove(messageId);
     }
 
     @Override
@@ -568,6 +569,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         checkArgument(messageId instanceof TopicMessageId);
         ConsumerImpl<T> consumer = consumers.get(((TopicMessageId) messageId).getOwnerTopic());
         consumer.negativeAcknowledge(message);
+        unAckedMessageTracker.remove(messageId);
     }
 
     @Override
@@ -882,6 +884,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         return stats;
     }
 
+    @Override
     public UnAckedMessageTracker getUnAckedMessageTracker() {
         return unAckedMessageTracker;
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
index 37f58a02180..d6b86e3593d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -32,8 +32,11 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.RedeliveryBackoff;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class NegativeAcksTracker implements Closeable {
+    private static final Logger log = LoggerFactory.getLogger(NegativeAcksTracker.class);
 
     private HashMap<MessageId, Long> nackedMessages = null;
 
@@ -79,9 +82,12 @@ class NegativeAcksTracker implements Closeable {
             }
         });
 
-        messagesToRedeliver.forEach(nackedMessages::remove);
-        consumer.onNegativeAcksSend(messagesToRedeliver);
-        consumer.redeliverUnacknowledgedMessages(messagesToRedeliver);
+        if (!messagesToRedeliver.isEmpty()) {
+            messagesToRedeliver.forEach(nackedMessages::remove);
+            consumer.onNegativeAcksSend(messagesToRedeliver);
+            log.info("[{}] {} messages will be re-delivered", consumer, messagesToRedeliver.size());
+            consumer.redeliverUnacknowledgedMessages(messagesToRedeliver);
+        }
 
         this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS);
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
index 220771e426f..20ec9c3d99a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
@@ -192,6 +192,10 @@ public class UnAckedMessageTracker implements Closeable {
     }
 
     public boolean add(MessageId messageId) {
+        if (messageId == null) {
+            return false;
+        }
+
         writeLock.lock();
         try {
             HashSet<MessageId> partition = timePartitions.peekLast();
@@ -220,6 +224,10 @@ public class UnAckedMessageTracker implements Closeable {
     }
 
     public boolean remove(MessageId messageId) {
+        if (messageId == null) {
+            return false;
+        }
+
         writeLock.lock();
         try {
             boolean removed = false;

Reply via email to