This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 999db2c7f5a [fix] [broker] Fix acknowledgeCumulativeAsync block when ackReceipt is enabled (#23841)
999db2c7f5a is described below

commit 999db2c7f5a8a3c09e2055fb47f9e3cb65e7d1d2
Author: feynmanlin <315157...@qq.com>
AuthorDate: Tue Jan 14 10:41:59 2025 +0800

    [fix] [broker] Fix acknowledgeCumulativeAsync block when ackReceipt is enabled (#23841)
---
 .../apache/pulsar/client/impl/ConsumerAckTest.java | 34 ++++++++++++++++++++++
 .../PersistentAcknowledgmentsGroupingTracker.java  |  7 ++++-
 2 files changed, 40 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java
index a83283bc267..6d9025fd966 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java
@@ -116,6 +116,40 @@ public class ConsumerAckTest extends ProducerConsumerBase {
             Assert.assertTrue(e.getCause() instanceof PulsarClientException.NotAllowedException);
         }
     }
+    @Test(timeOut = 30000)
+    public void testAckReceipt() throws Exception {
+        String topic = "testAckReceipt";
+        @Cleanup
+        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+        @Cleanup
+        ConsumerImpl<Integer> consumer = (ConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName("sub")
+                .isAckReceiptEnabled(true)
+                .subscribe();
+        for (int i = 0; i < 10; i++) {
+            producer.send(i);
+        }
+        Message<Integer> message = consumer.receive();
+        MessageId messageId = message.getMessageId();
+        consumer.acknowledgeCumulativeAsync(messageId).get();
+        consumer.acknowledgeCumulativeAsync(messageId).get();
+        consumer.close();
+        @Cleanup
+        ConsumerImpl<Integer> consumer2 = (ConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName("sub")
+                .isAckReceiptEnabled(true)
+                .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
+                .subscribe();
+        message = consumer2.receive();
+        messageId = message.getMessageId();
+        consumer2.acknowledgeCumulativeAsync(messageId).get();
+        consumer2.acknowledgeCumulativeAsync(messageId).get();
+    }
 
     @Test
     public void testIndividualAck() throws Exception {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index c0ee13b346a..d30c3de0fd7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -312,7 +312,12 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
             Optional<Lock> readLock = acquireReadLock();
             try {
                 doCumulativeAckAsync(messageId, bitSet);
-                return readLock.map(__ -> currentCumulativeAckFuture).orElse(CompletableFuture.completedFuture(null));
+                return readLock.map(__ -> {
+                    if (consumer.isAckReceiptEnabled() && lastCumulativeAck.compareTo(messageId) == 0) {
+                        return CompletableFuture.<Void>completedFuture(null);
+                    }
+                    return currentCumulativeAckFuture;
+                }).orElse(CompletableFuture.completedFuture(null));
             } finally {
                 readLock.ifPresent(Lock::unlock);
             }

Reply via email to