This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ecd16d68e29 [fix][client] fix negative message re-delivery twice issue (#20750)
ecd16d68e29 is described below

commit ecd16d68e29700cc9521e6d77d124a403c3486c9
Author: AloysZhang <[email protected]>
AuthorDate: Mon Jul 17 15:43:52 2023 +0800

    [fix][client] fix negative message re-delivery twice issue (#20750)
---
 .../java/org/apache/pulsar/client/impl/NegativeAcksTest.java |  3 +++
 .../java/org/apache/pulsar/client/impl/ConsumerBase.java     |  4 ++++
 .../java/org/apache/pulsar/client/impl/ConsumerImpl.java     |  3 ++-
 .../apache/pulsar/client/impl/MultiTopicsConsumerImpl.java   |  3 +++
 .../org/apache/pulsar/client/impl/NegativeAcksTracker.java   | 12 +++++++++---
 .../org/apache/pulsar/client/impl/UnAckedMessageTracker.java |  8 ++++++++
 6 files changed, 29 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index 0ae36b4ca90..876fa98bce4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -146,6 +146,9 @@ public class NegativeAcksTest extends ProducerConsumerBase {
             consumer.negativeAcknowledge(msg);
         }
 
+        assertTrue(consumer instanceof ConsumerBase<String>);
+        assertEquals(((ConsumerBase<String>) consumer).getUnAckedMessageTracker().size(), 0);
+
         Set<String> receivedMessages = new HashSet<>();
 
         // All the messages should be received again
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index e933005f2d6..fec428824c2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -193,6 +193,10 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         initReceiverQueueSize();
     }
 
+    protected UnAckedMessageTracker getUnAckedMessageTracker() {
+        return unAckedMessageTracker;
+    }
+
     protected void triggerBatchReceiveTimeoutTask() {
         if (!hasBatchReceiveTimeout() && batchReceivePolicy.getTimeoutMs() > 0) {
             batchReceiveTimeout = client.timer().newTimeout(this::pendingBatchReceiveTask,
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index b0d7d3a0f8b..a929fe9aa6b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -385,6 +385,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         return connectionHandler;
     }
 
+    @Override
     public UnAckedMessageTracker getUnAckedMessageTracker() {
         return unAckedMessageTracker;
     }
@@ -756,7 +757,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         negativeAcksTracker.add(message);
 
         // Ensure the message is not redelivered for ack-timeout, since we did receive an "ack"
-        unAckedMessageTracker.remove(message.getMessageId());
+        unAckedMessageTracker.remove(MessageIdAdvUtils.discardBatch(message.getMessageId()));
     }
 
     @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index fb7be3c5a5e..8a515a9f9b8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -546,6 +546,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         checkArgument(messageId instanceof TopicMessageId);
         ConsumerImpl<T> consumer = consumers.get(((TopicMessageId) messageId).getOwnerTopic());
         consumer.negativeAcknowledge(messageId);
+        unAckedMessageTracker.remove(messageId);
     }
 
     @Override
@@ -554,6 +555,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         checkArgument(messageId instanceof TopicMessageId);
         ConsumerImpl<T> consumer = consumers.get(((TopicMessageId) messageId).getOwnerTopic());
         consumer.negativeAcknowledge(message);
+        unAckedMessageTracker.remove(messageId);
     }
 
     @Override
@@ -852,6 +854,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         return stats;
     }
 
+    @Override
     public UnAckedMessageTracker getUnAckedMessageTracker() {
         return unAckedMessageTracker;
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
index 37f58a02180..d6b86e3593d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -32,8 +32,11 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.RedeliveryBackoff;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class NegativeAcksTracker implements Closeable {
+    private static final Logger log = LoggerFactory.getLogger(NegativeAcksTracker.class);
 
     private HashMap<MessageId, Long> nackedMessages = null;
 
@@ -79,9 +82,12 @@ class NegativeAcksTracker implements Closeable {
             }
         });
 
-        messagesToRedeliver.forEach(nackedMessages::remove);
-        consumer.onNegativeAcksSend(messagesToRedeliver);
-        consumer.redeliverUnacknowledgedMessages(messagesToRedeliver);
+        if (!messagesToRedeliver.isEmpty()) {
+            messagesToRedeliver.forEach(nackedMessages::remove);
+            consumer.onNegativeAcksSend(messagesToRedeliver);
+            log.info("[{}] {} messages will be re-delivered", consumer, messagesToRedeliver.size());
+            consumer.redeliverUnacknowledgedMessages(messagesToRedeliver);
+        }
 
         this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS);
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
index 534f3335026..69f86a1a89f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
@@ -189,6 +189,10 @@ public class UnAckedMessageTracker implements Closeable {
     }
 
     public boolean add(MessageId messageId) {
+        if (messageId == null) {
+            return false;
+        }
+
         writeLock.lock();
         try {
             HashSet<MessageId> partition = timePartitions.peekLast();
@@ -217,6 +221,10 @@ public class UnAckedMessageTracker implements Closeable {
     }
 
     public boolean remove(MessageId messageId) {
+        if (messageId == null) {
+            return false;
+        }
+
         writeLock.lock();
         try {
             boolean removed = false;

Reply via email to