This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 02f3ecc1d20 [fix][client] Fix negative acknowledgement by messageId
(#23060)
02f3ecc1d20 is described below
commit 02f3ecc1d20cda17edd4c308f401b3c15463753c
Author: Hideaki Oguni <[email protected]>
AuthorDate: Mon Jul 29 16:29:59 2024 +0900
[fix][client] Fix negative acknowledgement by messageId (#23060)
(cherry picked from commit d4bbf10f58771e2d43e576dc3422e502834b1de4)
---
.../org/apache/pulsar/client/impl/NegativeAcksTest.java | 13 ++++++++-----
.../java/org/apache/pulsar/client/impl/ConsumerImpl.java | 2 +-
2 files changed, 9 insertions(+), 6 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index 7812844bdc2..d4ea058f50f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -135,7 +135,7 @@ public class NegativeAcksTest extends ProducerConsumerBase {
Set<String> sentMessages = new HashSet<>();
final int N = 10;
- for (int i = 0; i < N; i++) {
+ for (int i = 0; i < N * 2; i++) {
String value = "test-" + i;
producer.sendAsync(value);
sentMessages.add(value);
@@ -146,11 +146,16 @@ public class NegativeAcksTest extends
ProducerConsumerBase {
Message<String> msg = consumer.receive();
consumer.negativeAcknowledge(msg);
}
+ for (int i = 0; i < N; i++) {
+ Message<String> msg = consumer.receive();
+ consumer.negativeAcknowledge(msg.getMessageId());
+ }
+
Set<String> receivedMessages = new HashSet<>();
// All the messages should be received again
- for (int i = 0; i < N; i++) {
+ for (int i = 0; i < N * 2; i++) {
Message<String> msg = consumer.receive();
receivedMessages.add(msg.getValue());
consumer.acknowledge(msg);
@@ -308,9 +313,7 @@ public class NegativeAcksTest extends ProducerConsumerBase {
assertEquals(unAckedMessageTracker.size(), 0);
negativeAcksTracker.close();
// negative batch message id
- unAckedMessageTracker.add(batchMessageId);
- unAckedMessageTracker.add(batchMessageId2);
- unAckedMessageTracker.add(batchMessageId3);
+ unAckedMessageTracker.add(messageId);
consumer.negativeAcknowledge(batchMessageId);
consumer.negativeAcknowledge(batchMessageId2);
consumer.negativeAcknowledge(batchMessageId3);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index ca5d0f34676..bf703ce047c 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -754,7 +754,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
negativeAcksTracker.add(messageId);
// Ensure the message is not redelivered for ack-timeout, since we did
receive an "ack"
- unAckedMessageTracker.remove(messageId);
+
unAckedMessageTracker.remove(MessageIdAdvUtils.discardBatch(messageId));
}
@Override