This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ecd16d68e29 [fix][client] fix negative message re-delivery twice issue (#20750)
ecd16d68e29 is described below
commit ecd16d68e29700cc9521e6d77d124a403c3486c9
Author: AloysZhang <[email protected]>
AuthorDate: Mon Jul 17 15:43:52 2023 +0800
[fix][client] fix negative message re-delivery twice issue (#20750)
---
.../java/org/apache/pulsar/client/impl/NegativeAcksTest.java | 3 +++
.../java/org/apache/pulsar/client/impl/ConsumerBase.java | 4 ++++
.../java/org/apache/pulsar/client/impl/ConsumerImpl.java | 3 ++-
.../apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 3 +++
.../org/apache/pulsar/client/impl/NegativeAcksTracker.java | 12 +++++++++---
.../org/apache/pulsar/client/impl/UnAckedMessageTracker.java | 8 ++++++++
6 files changed, 29 insertions(+), 4 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index 0ae36b4ca90..876fa98bce4 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -146,6 +146,9 @@ public class NegativeAcksTest extends ProducerConsumerBase {
consumer.negativeAcknowledge(msg);
}
+ assertTrue(consumer instanceof ConsumerBase<String>);
+ assertEquals(((ConsumerBase<String>)
consumer).getUnAckedMessageTracker().size(), 0);
+
Set<String> receivedMessages = new HashSet<>();
// All the messages should be received again
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index e933005f2d6..fec428824c2 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -193,6 +193,10 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
initReceiverQueueSize();
}
+ protected UnAckedMessageTracker getUnAckedMessageTracker() {
+ return unAckedMessageTracker;
+ }
+
protected void triggerBatchReceiveTimeoutTask() {
if (!hasBatchReceiveTimeout() && batchReceivePolicy.getTimeoutMs() >
0) {
batchReceiveTimeout =
client.timer().newTimeout(this::pendingBatchReceiveTask,
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index b0d7d3a0f8b..a929fe9aa6b 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -385,6 +385,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
return connectionHandler;
}
+ @Override
public UnAckedMessageTracker getUnAckedMessageTracker() {
return unAckedMessageTracker;
}
@@ -756,7 +757,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
negativeAcksTracker.add(message);
// Ensure the message is not redelivered for ack-timeout, since we did
receive an "ack"
- unAckedMessageTracker.remove(message.getMessageId());
+
unAckedMessageTracker.remove(MessageIdAdvUtils.discardBatch(message.getMessageId()));
}
@Override
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index fb7be3c5a5e..8a515a9f9b8 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -546,6 +546,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
checkArgument(messageId instanceof TopicMessageId);
ConsumerImpl<T> consumer = consumers.get(((TopicMessageId)
messageId).getOwnerTopic());
consumer.negativeAcknowledge(messageId);
+ unAckedMessageTracker.remove(messageId);
}
@Override
@@ -554,6 +555,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
checkArgument(messageId instanceof TopicMessageId);
ConsumerImpl<T> consumer = consumers.get(((TopicMessageId)
messageId).getOwnerTopic());
consumer.negativeAcknowledge(message);
+ unAckedMessageTracker.remove(messageId);
}
@Override
@@ -852,6 +854,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
return stats;
}
+ @Override
public UnAckedMessageTracker getUnAckedMessageTracker() {
return unAckedMessageTracker;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
index 37f58a02180..d6b86e3593d 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -32,8 +32,11 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
class NegativeAcksTracker implements Closeable {
+ private static final Logger log =
LoggerFactory.getLogger(NegativeAcksTracker.class);
private HashMap<MessageId, Long> nackedMessages = null;
@@ -79,9 +82,12 @@ class NegativeAcksTracker implements Closeable {
}
});
- messagesToRedeliver.forEach(nackedMessages::remove);
- consumer.onNegativeAcksSend(messagesToRedeliver);
- consumer.redeliverUnacknowledgedMessages(messagesToRedeliver);
+ if (!messagesToRedeliver.isEmpty()) {
+ messagesToRedeliver.forEach(nackedMessages::remove);
+ consumer.onNegativeAcksSend(messagesToRedeliver);
+ log.info("[{}] {} messages will be re-delivered", consumer,
messagesToRedeliver.size());
+ consumer.redeliverUnacknowledgedMessages(messagesToRedeliver);
+ }
this.timeout = timer.newTimeout(this::triggerRedelivery,
timerIntervalNanos, TimeUnit.NANOSECONDS);
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
index 534f3335026..69f86a1a89f 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
@@ -189,6 +189,10 @@ public class UnAckedMessageTracker implements Closeable {
}
public boolean add(MessageId messageId) {
+ if (messageId == null) {
+ return false;
+ }
+
writeLock.lock();
try {
HashSet<MessageId> partition = timePartitions.peekLast();
@@ -217,6 +221,10 @@ public class UnAckedMessageTracker implements Closeable {
}
public boolean remove(MessageId messageId) {
+ if (messageId == null) {
+ return false;
+ }
+
writeLock.lock();
try {
boolean removed = false;