This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 23d795c  Fix pending batchIndexAcks bitSet batchSize in 
PersistentAcknowledgmentsGroupingTracker (#7828)
23d795c is described below

commit 23d795c08ed54606e71ac8b1de1bc32420e2fbf1
Author: ran <[email protected]>
AuthorDate: Tue Aug 18 17:11:19 2020 +0800

    Fix pending batchIndexAcks bitSet batchSize in 
PersistentAcknowledgmentsGroupingTracker (#7828)
    
    ### Motivation
    
    The pending batchIndexAcks bitSet batchSize is not correct.
    
    ### Modifications
    
    Fix the bitSet batchSize.
---
 .../apache/pulsar/client/impl/BatchMessageIndexAckTest.java  | 12 ++++++++++--
 .../impl/PersistentAcknowledgmentsGroupingTracker.java       |  3 +--
 2 files changed, 11 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
index 8f76561..ae10141 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BatchMessageIndexAckTest.java
@@ -28,6 +28,7 @@ import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -58,13 +59,14 @@ public class BatchMessageIndexAckTest extends 
ProducerConsumerBase {
     }
 
     @Test
-    public void testBatchMessageIndexAckForSharedSubscription() throws 
PulsarClientException, ExecutionException, InterruptedException {
+    public void testBatchMessageIndexAckForSharedSubscription() throws 
Exception {
         final String topic = "testBatchMessageIndexAckForSharedSubscription";
+        final String subscriptionName = "sub";
 
         @Cleanup
         Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
             .topic(topic)
-            .subscriptionName("sub")
+            .subscriptionName(subscriptionName)
             .receiverQueueSize(100)
             .subscriptionType(SubscriptionType.Shared)
             .enableBatchIndexAcknowledgment(true)
@@ -115,6 +117,12 @@ public class BatchMessageIndexAckTest extends 
ProducerConsumerBase {
         Message<Integer> moreMessage = consumer.receive(2, TimeUnit.SECONDS);
         Assert.assertNull(moreMessage);
 
+        // check the mark delete position was changed
+        BatchMessageIdImpl ackedMessageId = (BatchMessageIdImpl) 
received.get(0);
+        PersistentTopicInternalStats stats = 
admin.topics().getInternalStats(topic);
+        String markDeletePosition = 
stats.cursors.get(subscriptionName).markDeletePosition;
+        Assert.assertEquals(ackedMessageId.ledgerId + ":" + 
ackedMessageId.entryId, markDeletePosition);
+
         futures.clear();
         for (int i = 0; i < 50; i++) {
             futures.add(producer.sendAsync(i));
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index 17e8c22..6a4deef 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -169,8 +169,7 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
             ConcurrentBitSetRecyclable bitSet = 
pendingIndividualBatchIndexAcks.computeIfAbsent(
                 new MessageIdImpl(msgId.getLedgerId(), msgId.getEntryId(), 
msgId.getPartitionIndex()), (v) -> {
                     ConcurrentBitSetRecyclable value = 
ConcurrentBitSetRecyclable.create();
-                    value.set(0, batchSize + 1);
-                    value.clear(batchIndex);
+                    value.set(0, batchSize);
                     return value;
                 });
             bitSet.clear(batchIndex);

Reply via email to