This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new cbc3ea16936 [fix][client] fix negative message re-delivery twice issue 
(#20750)
cbc3ea16936 is described below

commit cbc3ea169367e7e14cae3682f36e0da4fc9b587f
Author: AloysZhang <[email protected]>
AuthorDate: Mon Jul 17 15:43:52 2023 +0800

    [fix][client] fix negative message re-delivery twice issue (#20750)
    
    (cherry picked from commit ecd16d68e29700cc9521e6d77d124a403c3486c9)
---
 .../java/org/apache/pulsar/client/impl/NegativeAcksTest.java |  3 +++
 pulsar-client-cpp/python/setup.py                            |  2 +-
 .../java/org/apache/pulsar/client/impl/ConsumerBase.java     |  4 ++++
 .../java/org/apache/pulsar/client/impl/ConsumerImpl.java     |  3 ++-
 .../apache/pulsar/client/impl/MultiTopicsConsumerImpl.java   |  3 +++
 .../org/apache/pulsar/client/impl/NegativeAcksTracker.java   | 12 +++++++++---
 .../org/apache/pulsar/client/impl/UnAckedMessageTracker.java |  8 ++++++++
 7 files changed, 30 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index f7acdf9baa6..3b9a3afcc1f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -143,6 +143,9 @@ public class NegativeAcksTest extends ProducerConsumerBase {
             consumer.negativeAcknowledge(msg);
         }
 
+        assertTrue(consumer instanceof ConsumerBase<String>);
+        assertEquals(((ConsumerBase<String>) 
consumer).getUnAckedMessageTracker().size(), 0);
+
         Set<String> receivedMessages = new HashSet<>();
 
         // All the messages should be received again
diff --git a/pulsar-client-cpp/python/setup.py 
b/pulsar-client-cpp/python/setup.py
index 3f172fe3d21..7d19376cd94 100644
--- a/pulsar-client-cpp/python/setup.py
+++ b/pulsar-client-cpp/python/setup.py
@@ -83,7 +83,7 @@ extras_require = {}
 extras_require["functions"] = sorted(
     {
       "protobuf>=3.6.1,<=3.20.*",
-      "grpcio<1.28,>=1.8.2",
+      "grpcio>=1.60",
       "apache-bookkeeper-client>=4.9.2",
       "prometheus_client",
       "ratelimit"
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 883c5b1db87..27cacf53c12 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -150,6 +150,10 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
 
     }
 
+    protected UnAckedMessageTracker getUnAckedMessageTracker() {
+        return unAckedMessageTracker;
+    }
+
     protected void triggerBatchReceiveTimeoutTask() {
         if (!hasBatchReceiveTimeout() && batchReceivePolicy.getTimeoutMs() > 
0) {
             batchReceiveTimeout = 
client.timer().newTimeout(this::pendingBatchReceiveTask,
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 45ecb34f15d..0b785b87452 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -394,6 +394,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         return connectionHandler;
     }
 
+    @Override
     public UnAckedMessageTracker getUnAckedMessageTracker() {
         return unAckedMessageTracker;
     }
@@ -769,7 +770,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         negativeAcksTracker.add(message);
 
         // Ensure the message is not redelivered for ack-timeout, since we did 
receive an "ack"
-        unAckedMessageTracker.remove(message.getMessageId());
+        
unAckedMessageTracker.remove(MessageIdAdvUtils.discardBatch(message.getMessageId()));
     }
 
     @Override
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index b01c25d215b..0eea33e05d8 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -567,6 +567,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
 
         ConsumerImpl<T> consumer = 
consumers.get(topicMessageId.getTopicPartitionName());
         consumer.negativeAcknowledge(topicMessageId.getInnerMessageId());
+        unAckedMessageTracker.remove(messageId);
     }
 
     @Override
@@ -577,6 +578,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
 
         ConsumerImpl<T> consumer = 
consumers.get(topicMessageId.getTopicPartitionName());
         consumer.negativeAcknowledge(message);
+        unAckedMessageTracker.remove(messageId);
     }
 
     @Override
@@ -848,6 +850,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
         return stats;
     }
 
+    @Override
     public UnAckedMessageTracker getUnAckedMessageTracker() {
         return unAckedMessageTracker;
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
index 6273f4d582e..f24872fb619 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -30,8 +30,11 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.RedeliveryBackoff;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class NegativeAcksTracker implements Closeable {
+    private static final Logger log = 
LoggerFactory.getLogger(NegativeAcksTracker.class);
 
     private HashMap<MessageId, Long> nackedMessages = null;
 
@@ -77,9 +80,12 @@ class NegativeAcksTracker implements Closeable {
             }
         });
 
-        messagesToRedeliver.forEach(nackedMessages::remove);
-        consumer.onNegativeAcksSend(messagesToRedeliver);
-        consumer.redeliverUnacknowledgedMessages(messagesToRedeliver);
+        if (!messagesToRedeliver.isEmpty()) {
+            messagesToRedeliver.forEach(nackedMessages::remove);
+            consumer.onNegativeAcksSend(messagesToRedeliver);
+            log.info("[{}] {} messages will be re-delivered", consumer, 
messagesToRedeliver.size());
+            consumer.redeliverUnacknowledgedMessages(messagesToRedeliver);
+        }
 
         this.timeout = timer.newTimeout(this::triggerRedelivery, 
timerIntervalNanos, TimeUnit.NANOSECONDS);
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
index 9ad30296c21..bd7329ea518 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
@@ -189,6 +189,10 @@ public class UnAckedMessageTracker implements Closeable {
     }
 
     public boolean add(MessageId messageId) {
+        if (messageId == null) {
+            return false;
+        }
+
         writeLock.lock();
         try {
             HashSet<MessageId> partition = timePartitions.peekLast();
@@ -217,6 +221,10 @@ public class UnAckedMessageTracker implements Closeable {
     }
 
     public boolean remove(MessageId messageId) {
+        if (messageId == null) {
+            return false;
+        }
+
         writeLock.lock();
         try {
             boolean removed = false;

Reply via email to