This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 40579c5ab328c3e9c41a7e49a1c29271ddb61d89 Author: congbo <[email protected]> AuthorDate: Sun Jan 30 01:08:11 2022 +0800 Fix NPE of cumulative ack mode and incorrect unack message count (#14021) Link: https://github.com/apache/pulsar/pull/13383 ## Motivation #13383 fixed the problem that acknowledging a batch message did not decrease the unacked-message count. However, the count is now also decreased in cumulative ack mode; that path uses pendingAcks, which is not initialized in cumulative ack mode.   In addition, if the batch indexes are acked one by one, the last ack of a batch decreases the unacked-message count based on `batchSize` (see the final acked count in the output below): ``` ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 9 ================ batch size -> 10 ``` ### Modifications Add a `Subscription.isIndividualAckMode(subType)` check when computing the acked count. If the ack from the consumer does not carry an ack set, treat it as an empty ack set so that the acked count is calculated against the current ack set. 
(cherry picked from commit 618f17c155a8e28a99bbcc5e26f9ec3a6c7d9b08) --- .../org/apache/pulsar/broker/service/Consumer.java | 7 ++- .../BatchMessageWithBatchIndexLevelTest.java | 68 ++++++++++++++++++++++ 2 files changed, 72 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index cf9f5bf..678bdcf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -122,6 +122,7 @@ public class Consumer { private static final AtomicIntegerFieldUpdater<Consumer> AVG_MESSAGES_PER_ENTRY = AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "avgMessagesPerEntry"); private volatile int avgMessagesPerEntry = 1000; + private static final long [] EMPTY_ACK_SET = new long[0]; private static final double avgPercent = 0.9; private boolean preciseDispatcherFlowControl; @@ -413,10 +414,10 @@ public class Consumer { } } else { position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId()); - if (isAcknowledgmentAtBatchIndexLevelEnabled) { + if (Subscription.isIndividualAckMode(subType) && isAcknowledgmentAtBatchIndexLevelEnabled) { long[] cursorAckSet = getCursorAckSet(position); if (cursorAckSet != null) { - ackedCount = batchSize - BitSet.valueOf(cursorAckSet).cardinality(); + ackedCount = getAckedCountForBatchIndexLevelEnabled(position, batchSize, EMPTY_ACK_SET); } else { ackedCount = batchSize; } @@ -521,7 +522,7 @@ public class Consumer { private long getAckedCountForBatchIndexLevelEnabled(PositionImpl position, long batchSize, long[] ackSets) { long ackedCount = 0; - if (isAcknowledgmentAtBatchIndexLevelEnabled) { + if (isAcknowledgmentAtBatchIndexLevelEnabled && Subscription.isIndividualAckMode(subType)) { long[] cursorAckSet = getCursorAckSet(position); if (cursorAckSet != null) { BitSetRecyclable cursorBitSet = 
BitSetRecyclable.create().resetWords(cursorAckSet); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java index 20ba4f1..5e09def 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java @@ -19,21 +19,27 @@ package org.apache.pulsar.broker.service; import com.google.common.collect.Lists; +import lombok.Cleanup; import lombok.SneakyThrows; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.ConsumerImpl; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static org.testng.Assert.assertEquals; @@ -105,4 +111,66 @@ public class BatchMessageWithBatchIndexLevelTest extends BatchMessageTest { assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16); }); } + + @DataProvider(name = "testSubTypeAndEnableBatch") + public Object[][] testSubTypeAndEnableBatch() { + return new Object[][] { { 
SubscriptionType.Shared, Boolean.TRUE }, + { SubscriptionType.Failover, Boolean.TRUE }, + { SubscriptionType.Shared, Boolean.FALSE }, + { SubscriptionType.Failover, Boolean.FALSE }}; + } + + + @Test(dataProvider="testSubTypeAndEnableBatch") + private void testDecreaseUnAckMessageCountWithAckReceipt(SubscriptionType subType, + boolean enableBatch) throws Exception { + + final int messageCount = 50; + final String topicName = "persistent://prop/ns-abc/testDecreaseWithAckReceipt" + UUID.randomUUID(); + final String subscriptionName = "sub-batch-1"; + @Cleanup + ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient + .newConsumer(Schema.BYTES) + .topic(topicName) + .isAckReceiptEnabled(true) + .subscriptionName(subscriptionName) + .subscriptionType(subType) + .enableBatchIndexAcknowledgment(true) + .subscribe(); + + @Cleanup + Producer<byte[]> producer = pulsarClient + .newProducer() + .enableBatching(enableBatch) + .topic(topicName) + .batchingMaxMessages(10) + .create(); + + CountDownLatch countDownLatch = new CountDownLatch(messageCount); + for (int i = 0; i < messageCount; i++) { + producer.sendAsync((i + "").getBytes()).thenRun(countDownLatch::countDown); + } + + countDownLatch.await(); + + for (int i = 0; i < messageCount; i++) { + Message<byte[]> message = consumer.receive(); + // wait for receipt + if (i < messageCount / 2) { + consumer.acknowledgeAsync(message.getMessageId()).get(); + } + } + + String topic = TopicName.get(topicName).toString(); + PersistentSubscription persistentSubscription = (PersistentSubscription) pulsar.getBrokerService() + .getTopic(topic, false).get().get().getSubscription(subscriptionName); + + Awaitility.await().untilAsserted(() -> { + if (subType == SubscriptionType.Shared) { + assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), messageCount / 2); + } else { + assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), 0); + } + }); + } }
