This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 48dbf01f3e981856fa3b2129ae8d2963f494dc52 Author: Jiwei Guo <[email protected]> AuthorDate: Tue Feb 15 21:39:10 2022 +0800 Fix batch ack count is negtive issue. (#14288) ### Motivation After #13383 fixed the batch ack issue, we found that the unacked-message count could become negative (#14246). At first we thought this was a normal case caused by message redelivery, but after digging into the logic we found that it is a bug. The following test is copied from #14246: ``` for (int i = 0; i < 50; i++) { Message<String> msg = consumer.receive(); if (i % 2 == 0) { consumer.acknowledgeAsync(msg); } else { consumer.negativeAcknowledge(msg); } } ``` When a message is negatively acknowledged (`negativeAcknowledge`), `Consumer#redeliverUnacknowledgedMessages` is invoked: https://github.com/apache/pulsar/blob/b22f70658927e07e3726d32290065f47313070b9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java#L900-L912 When calculating `totalRedeliveryMessages`, it must check that `pendingAcks` contains the message, and it removes the message from `pendingAcks` afterwards. (Dispatching messages adds them to `pendingAcks`.) In the test above, a `negativeAcknowledge` can therefore be processed first, followed by an `acknowledgeAsync`. `acknowledgeAsync` maps to `Consumer#individualAckNormal`, which decreases the unacked-message count in: https://github.com/apache/pulsar/blob/b22f70658927e07e3726d32290065f47313070b9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java#L543-L561 That code does not check `pendingAcks`; this is the root cause. The `pendingAcks` check on line 556 should be moved to line 545. 
(cherry picked from commit 6b828b41382e5a94f89d628aca38871ccff8df9d) --- .../org/apache/pulsar/broker/service/Consumer.java | 6 +- .../BatchMessageWithBatchIndexLevelTest.java | 96 +++++++++++++++++++++- 2 files changed, 96 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 00511f4..bfaa660 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -526,7 +526,8 @@ public class Consumer { private long getAckedCountForBatchIndexLevelEnabled(PositionImpl position, long batchSize, long[] ackSets) { long ackedCount = 0; - if (isAcknowledgmentAtBatchIndexLevelEnabled && Subscription.isIndividualAckMode(subType)) { + if (isAcknowledgmentAtBatchIndexLevelEnabled && Subscription.isIndividualAckMode(subType) + && pendingAcks.get(position.getLedgerId(), position.getEntryId()) != null) { long[] cursorAckSet = getCursorAckSet(position); if (cursorAckSet != null) { BitSetRecyclable cursorBitSet = BitSetRecyclable.create().resetWords(cursorAckSet); @@ -537,7 +538,7 @@ public class Consumer { int currentCardinality = cursorBitSet.cardinality(); ackedCount = lastCardinality - currentCardinality; cursorBitSet.recycle(); - } else if (pendingAcks.get(position.getLedgerId(), position.getEntryId()) != null) { + } else { ackedCount = batchSize - BitSet.valueOf(ackSets).cardinality(); } } @@ -558,6 +559,7 @@ public class Consumer { if (cursorAckSet != null) { BitSetRecyclable cursorBitSet = BitSetRecyclable.create().resetWords(cursorAckSet); unAckedCount = cursorBitSet.cardinality(); + cursorBitSet.recycle(); } } return unAckedCount; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java index c3785c7..b953772 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java @@ -19,24 +19,24 @@ package org.apache.pulsar.broker.service; import com.google.common.collect.Lists; +import lombok.Cleanup; import lombok.SneakyThrows; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; - import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; - import static org.testng.Assert.assertEquals; @Test(groups = "broker") @@ -56,7 +56,8 @@ public class BatchMessageWithBatchIndexLevelTest extends BatchMessageTest { final String topicName = "persistent://prop/ns-abc/batchMessageAck-" + UUID.randomUUID(); final String subscriptionName = "sub-batch-1"; - ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient + @Cleanup + Consumer<byte[]> consumer = pulsarClient .newConsumer() .topic(topicName) .subscriptionName(subscriptionName) @@ -66,6 +67,7 @@ public class BatchMessageWithBatchIndexLevelTest extends BatchMessageTest { .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS) .subscribe(); + @Cleanup Producer<byte[]> producer = 
pulsarClient .newProducer() .topic(topicName) @@ -107,4 +109,90 @@ public class BatchMessageWithBatchIndexLevelTest extends BatchMessageTest { assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16); }); } + + @Test + public void testBatchMessageMultiNegtiveAck() throws Exception{ + final String topicName = "persistent://prop/ns-abc/batchMessageMultiNegtiveAck-" + UUID.randomUUID(); + final String subscriptionName = "sub-negtive-1"; + + @Cleanup + Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topicName) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .receiverQueueSize(10) + .enableBatchIndexAcknowledgment(true) + .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS) + .subscribe(); + + @Cleanup + Producer<String> producer = pulsarClient + .newProducer(Schema.STRING) + .topic(topicName) + .batchingMaxMessages(20) + .batchingMaxPublishDelay(1, TimeUnit.HOURS) + .enableBatching(true) + .create(); + + final int N = 20; + for (int i = 0; i < N; i++) { + String value = "test-" + i; + producer.sendAsync(value); + } + producer.flush(); + for (int i = 0; i < N; i++) { + Message<String> msg = consumer.receive(); + if (i % 2 == 0) { + consumer.acknowledgeAsync(msg); + } else { + consumer.negativeAcknowledge(msg); + } + } + Awaitility.await().untilAsserted(() -> { + long unackedMessages = admin.topics().getStats(topicName).getSubscriptions().get(subscriptionName) + .getUnackedMessages(); + assertEquals(unackedMessages, 10); + }); + + // Test negtive ack with sleep + final String topicName2 = "persistent://prop/ns-abc/batchMessageMultiNegtiveAck2-" + UUID.randomUUID(); + final String subscriptionName2 = "sub-negtive-2"; + @Cleanup + Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING) + .topic(topicName2) + .subscriptionName(subscriptionName2) + .subscriptionType(SubscriptionType.Shared) + .receiverQueueSize(10) + .enableBatchIndexAcknowledgment(true) + 
.negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS) + .subscribe(); + @Cleanup + Producer<String> producer2 = pulsarClient + .newProducer(Schema.STRING) + .topic(topicName2) + .batchingMaxMessages(20) + .batchingMaxPublishDelay(1, TimeUnit.HOURS) + .enableBatching(true) + .create(); + + for (int i = 0; i < N; i++) { + String value = "test-" + i; + producer2.sendAsync(value); + } + producer2.flush(); + for (int i = 0; i < N; i++) { + Message<String> msg = consumer2.receive(); + if (i % 2 == 0) { + consumer.acknowledgeAsync(msg); + } else { + consumer.negativeAcknowledge(msg); + Thread.sleep(100); + } + } + Awaitility.await().untilAsserted(() -> { + long unackedMessages = admin.topics().getStats(topicName).getSubscriptions().get(subscriptionName) + .getUnackedMessages(); + assertEquals(unackedMessages, 10); + }); + } }
