This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 4fe0ddd0b45 [fix][client] fix negative message re-delivery twice issue
(#20750)
4fe0ddd0b45 is described below
commit 4fe0ddd0b459de542347d78448c57ec2021dc117
Author: AloysZhang <[email protected]>
AuthorDate: Mon Jul 17 15:43:52 2023 +0800
[fix][client] fix negative message re-delivery twice issue (#20750)
(cherry picked from commit ecd16d68e29700cc9521e6d77d124a403c3486c9)
---
.../java/org/apache/pulsar/client/impl/NegativeAcksTest.java | 3 +++
.../java/org/apache/pulsar/client/impl/ConsumerImpl.java | 3 ++-
.../apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 3 +++
.../org/apache/pulsar/client/impl/NegativeAcksTracker.java | 12 +++++++++---
.../org/apache/pulsar/client/impl/UnAckedMessageTracker.java | 8 ++++++++
5 files changed, 25 insertions(+), 4 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index d4ea058f50f..ee2fbce240f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -152,6 +152,9 @@ public class NegativeAcksTest extends ProducerConsumerBase {
}
+ assertTrue(consumer instanceof ConsumerBase<String>);
+ assertEquals(((ConsumerBase<String>)
consumer).getUnAckedMessageTracker().size(), 0);
+
Set<String> receivedMessages = new HashSet<>();
// All the messages should be received again
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index bf703ce047c..115067235cf 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -390,6 +390,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
return connectionHandler;
}
+ @Override
public UnAckedMessageTracker getUnAckedMessageTracker() {
return unAckedMessageTracker;
}
@@ -762,7 +763,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
negativeAcksTracker.add(message);
// Ensure the message is not redelivered for ack-timeout, since we did
receive an "ack"
- unAckedMessageTracker.remove(message.getMessageId());
+
unAckedMessageTracker.remove(MessageIdAdvUtils.discardBatch(message.getMessageId()));
}
@Override
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 418dd8561db..e7b876b9273 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -560,6 +560,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
checkArgument(messageId instanceof TopicMessageId);
ConsumerImpl<T> consumer = consumers.get(((TopicMessageId)
messageId).getOwnerTopic());
consumer.negativeAcknowledge(messageId);
+ unAckedMessageTracker.remove(messageId);
}
@Override
@@ -568,6 +569,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
checkArgument(messageId instanceof TopicMessageId);
ConsumerImpl<T> consumer = consumers.get(((TopicMessageId)
messageId).getOwnerTopic());
consumer.negativeAcknowledge(message);
+ unAckedMessageTracker.remove(messageId);
}
@Override
@@ -882,6 +884,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
return stats;
}
+ @Override
public UnAckedMessageTracker getUnAckedMessageTracker() {
return unAckedMessageTracker;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
index 37f58a02180..d6b86e3593d 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -32,8 +32,11 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
class NegativeAcksTracker implements Closeable {
+ private static final Logger log =
LoggerFactory.getLogger(NegativeAcksTracker.class);
private HashMap<MessageId, Long> nackedMessages = null;
@@ -79,9 +82,12 @@ class NegativeAcksTracker implements Closeable {
}
});
- messagesToRedeliver.forEach(nackedMessages::remove);
- consumer.onNegativeAcksSend(messagesToRedeliver);
- consumer.redeliverUnacknowledgedMessages(messagesToRedeliver);
+ if (!messagesToRedeliver.isEmpty()) {
+ messagesToRedeliver.forEach(nackedMessages::remove);
+ consumer.onNegativeAcksSend(messagesToRedeliver);
+ log.info("[{}] {} messages will be re-delivered", consumer,
messagesToRedeliver.size());
+ consumer.redeliverUnacknowledgedMessages(messagesToRedeliver);
+ }
this.timeout = timer.newTimeout(this::triggerRedelivery,
timerIntervalNanos, TimeUnit.NANOSECONDS);
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
index 220771e426f..20ec9c3d99a 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
@@ -192,6 +192,10 @@ public class UnAckedMessageTracker implements Closeable {
}
public boolean add(MessageId messageId) {
+ if (messageId == null) {
+ return false;
+ }
+
writeLock.lock();
try {
HashSet<MessageId> partition = timePartitions.peekLast();
@@ -220,6 +224,10 @@ public class UnAckedMessageTracker implements Closeable {
}
public boolean remove(MessageId messageId) {
+ if (messageId == null) {
+ return false;
+ }
+
writeLock.lock();
try {
boolean removed = false;