This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new cbc3ea16936 [fix][client] fix negative message re-delivery twice issue
(#20750)
cbc3ea16936 is described below
commit cbc3ea169367e7e14cae3682f36e0da4fc9b587f
Author: AloysZhang <[email protected]>
AuthorDate: Mon Jul 17 15:43:52 2023 +0800
[fix][client] fix negative message re-delivery twice issue (#20750)
(cherry picked from commit ecd16d68e29700cc9521e6d77d124a403c3486c9)
---
.../java/org/apache/pulsar/client/impl/NegativeAcksTest.java | 3 +++
pulsar-client-cpp/python/setup.py | 2 +-
.../java/org/apache/pulsar/client/impl/ConsumerBase.java | 4 ++++
.../java/org/apache/pulsar/client/impl/ConsumerImpl.java | 3 ++-
.../apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 3 +++
.../org/apache/pulsar/client/impl/NegativeAcksTracker.java | 12 +++++++++---
.../org/apache/pulsar/client/impl/UnAckedMessageTracker.java | 8 ++++++++
7 files changed, 30 insertions(+), 5 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index f7acdf9baa6..3b9a3afcc1f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -143,6 +143,9 @@ public class NegativeAcksTest extends ProducerConsumerBase {
consumer.negativeAcknowledge(msg);
}
+ assertTrue(consumer instanceof ConsumerBase<String>);
+ assertEquals(((ConsumerBase<String>)
consumer).getUnAckedMessageTracker().size(), 0);
+
Set<String> receivedMessages = new HashSet<>();
// All the messages should be received again
diff --git a/pulsar-client-cpp/python/setup.py
b/pulsar-client-cpp/python/setup.py
index 3f172fe3d21..7d19376cd94 100644
--- a/pulsar-client-cpp/python/setup.py
+++ b/pulsar-client-cpp/python/setup.py
@@ -83,7 +83,7 @@ extras_require = {}
extras_require["functions"] = sorted(
{
"protobuf>=3.6.1,<=3.20.*",
- "grpcio<1.28,>=1.8.2",
+ "grpcio>=1.60",
"apache-bookkeeper-client>=4.9.2",
"prometheus_client",
"ratelimit"
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 883c5b1db87..27cacf53c12 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -150,6 +150,10 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
}
+ protected UnAckedMessageTracker getUnAckedMessageTracker() {
+ return unAckedMessageTracker;
+ }
+
protected void triggerBatchReceiveTimeoutTask() {
if (!hasBatchReceiveTimeout() && batchReceivePolicy.getTimeoutMs() >
0) {
batchReceiveTimeout =
client.timer().newTimeout(this::pendingBatchReceiveTask,
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 45ecb34f15d..0b785b87452 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -394,6 +394,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
return connectionHandler;
}
+ @Override
public UnAckedMessageTracker getUnAckedMessageTracker() {
return unAckedMessageTracker;
}
@@ -769,7 +770,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
negativeAcksTracker.add(message);
// Ensure the message is not redelivered for ack-timeout, since we did
receive an "ack"
- unAckedMessageTracker.remove(message.getMessageId());
+
unAckedMessageTracker.remove(MessageIdAdvUtils.discardBatch(message.getMessageId()));
}
@Override
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index b01c25d215b..0eea33e05d8 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -567,6 +567,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
ConsumerImpl<T> consumer =
consumers.get(topicMessageId.getTopicPartitionName());
consumer.negativeAcknowledge(topicMessageId.getInnerMessageId());
+ unAckedMessageTracker.remove(messageId);
}
@Override
@@ -577,6 +578,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
ConsumerImpl<T> consumer =
consumers.get(topicMessageId.getTopicPartitionName());
consumer.negativeAcknowledge(message);
+ unAckedMessageTracker.remove(messageId);
}
@Override
@@ -848,6 +850,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
return stats;
}
+ @Override
public UnAckedMessageTracker getUnAckedMessageTracker() {
return unAckedMessageTracker;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
index 6273f4d582e..f24872fb619 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -30,8 +30,11 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
class NegativeAcksTracker implements Closeable {
+ private static final Logger log =
LoggerFactory.getLogger(NegativeAcksTracker.class);
private HashMap<MessageId, Long> nackedMessages = null;
@@ -77,9 +80,12 @@ class NegativeAcksTracker implements Closeable {
}
});
- messagesToRedeliver.forEach(nackedMessages::remove);
- consumer.onNegativeAcksSend(messagesToRedeliver);
- consumer.redeliverUnacknowledgedMessages(messagesToRedeliver);
+ if (!messagesToRedeliver.isEmpty()) {
+ messagesToRedeliver.forEach(nackedMessages::remove);
+ consumer.onNegativeAcksSend(messagesToRedeliver);
+ log.info("[{}] {} messages will be re-delivered", consumer,
messagesToRedeliver.size());
+ consumer.redeliverUnacknowledgedMessages(messagesToRedeliver);
+ }
this.timeout = timer.newTimeout(this::triggerRedelivery,
timerIntervalNanos, TimeUnit.NANOSECONDS);
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
index 9ad30296c21..bd7329ea518 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
@@ -189,6 +189,10 @@ public class UnAckedMessageTracker implements Closeable {
}
public boolean add(MessageId messageId) {
+ if (messageId == null) {
+ return false;
+ }
+
writeLock.lock();
try {
HashSet<MessageId> partition = timePartitions.peekLast();
@@ -217,6 +221,10 @@ public class UnAckedMessageTracker implements Closeable {
}
public boolean remove(MessageId messageId) {
+ if (messageId == null) {
+ return false;
+ }
+
writeLock.lock();
try {
boolean removed = false;