This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new c1aab487f19 [fix][client] Fix negative acknowledgement by messageId (#23060)
c1aab487f19 is described below
commit c1aab487f199bdc2cc671694272fd733b441e829
Author: Hideaki Oguni <[email protected]>
AuthorDate: Mon Jul 29 16:29:59 2024 +0900
[fix][client] Fix negative acknowledgement by messageId (#23060)
(cherry picked from commit d4bbf10f58771e2d43e576dc3422e502834b1de4)
---
.../org/apache/pulsar/client/impl/NegativeAcksTest.java | 13 ++++++++-----
.../java/org/apache/pulsar/client/impl/ConsumerImpl.java | 2 +-
2 files changed, 9 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index a6b77a1c727..a41b7f05a8e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -134,7 +134,7 @@ public class NegativeAcksTest extends ProducerConsumerBase {
Set<String> sentMessages = new HashSet<>();
final int N = 10;
- for (int i = 0; i < N; i++) {
+ for (int i = 0; i < N * 2; i++) {
String value = "test-" + i;
producer.sendAsync(value);
sentMessages.add(value);
@@ -146,13 +146,18 @@ public class NegativeAcksTest extends ProducerConsumerBase {
consumer.negativeAcknowledge(msg);
}
+ for (int i = 0; i < N; i++) {
+ Message<String> msg = consumer.receive();
+ consumer.negativeAcknowledge(msg.getMessageId());
+ }
+
assertTrue(consumer instanceof ConsumerBase<String>);
assertEquals(((ConsumerBase<String>) consumer).getUnAckedMessageTracker().size(), 0);
Set<String> receivedMessages = new HashSet<>();
// All the messages should be received again
- for (int i = 0; i < N; i++) {
+ for (int i = 0; i < N * 2; i++) {
Message<String> msg = consumer.receive();
receivedMessages.add(msg.getValue());
consumer.acknowledge(msg);
@@ -310,9 +315,7 @@ public class NegativeAcksTest extends ProducerConsumerBase {
assertEquals(unAckedMessageTracker.size(), 0);
negativeAcksTracker.close();
// negative batch message id
- unAckedMessageTracker.add(batchMessageId);
- unAckedMessageTracker.add(batchMessageId2);
- unAckedMessageTracker.add(batchMessageId3);
+ unAckedMessageTracker.add(messageId);
consumer.negativeAcknowledge(batchMessageId);
consumer.negativeAcknowledge(batchMessageId2);
consumer.negativeAcknowledge(batchMessageId3);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 6744a65d556..4b21d0908bc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -765,7 +765,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
negativeAcksTracker.add(messageId);
// Ensure the message is not redelivered for ack-timeout, since we did receive an "ack"
- unAckedMessageTracker.remove(messageId);
+ unAckedMessageTracker.remove(MessageIdAdvUtils.discardBatch(messageId));
}
@Override