This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 999db2c7f5a [fix] [broker] Fix acknowledgeCumulativeAsync block when ackReceipt is enabled (#23841) 999db2c7f5a is described below commit 999db2c7f5a8a3c09e2055fb47f9e3cb65e7d1d2 Author: feynmanlin <315157...@qq.com> AuthorDate: Tue Jan 14 10:41:59 2025 +0800 [fix] [broker] Fix acknowledgeCumulativeAsync block when ackReceipt is enabled (#23841) --- .../apache/pulsar/client/impl/ConsumerAckTest.java | 34 ++++++++++++++++++++++ .../PersistentAcknowledgmentsGroupingTracker.java | 7 ++++- 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java index a83283bc267..6d9025fd966 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java @@ -116,6 +116,40 @@ public class ConsumerAckTest extends ProducerConsumerBase { Assert.assertTrue(e.getCause() instanceof PulsarClientException.NotAllowedException); } } + @Test(timeOut = 30000) + public void testAckReceipt() throws Exception { + String topic = "testAckReceipt"; + @Cleanup + Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .enableBatching(false) + .create(); + @Cleanup + ConsumerImpl<Integer> consumer = (ConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .subscriptionName("sub") + .isAckReceiptEnabled(true) + .subscribe(); + for (int i = 0; i < 10; i++) { + producer.send(i); + } + Message<Integer> message = consumer.receive(); + MessageId messageId = message.getMessageId(); + consumer.acknowledgeCumulativeAsync(messageId).get(); + consumer.acknowledgeCumulativeAsync(messageId).get(); + consumer.close(); + @Cleanup + ConsumerImpl<Integer> consumer2 = (ConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .subscriptionName("sub") + .isAckReceiptEnabled(true) + .acknowledgmentGroupTime(0, TimeUnit.SECONDS) + .subscribe(); + message = consumer2.receive(); + messageId = message.getMessageId(); + consumer2.acknowledgeCumulativeAsync(messageId).get(); + consumer2.acknowledgeCumulativeAsync(messageId).get(); + } @Test public void testIndividualAck() throws Exception { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java index c0ee13b346a..d30c3de0fd7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java @@ -312,7 +312,12 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments Optional<Lock> readLock = acquireReadLock(); try { doCumulativeAckAsync(messageId, bitSet); - return readLock.map(__ -> currentCumulativeAckFuture).orElse(CompletableFuture.completedFuture(null)); + return readLock.map(__ -> { + if (consumer.isAckReceiptEnabled() && lastCumulativeAck.compareTo(messageId) == 0) { + return CompletableFuture.<Void>completedFuture(null); + } + return currentCumulativeAckFuture; + }).orElse(CompletableFuture.completedFuture(null)); } finally { readLock.ifPresent(Lock::unlock); }