This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new 83a522f Fix batch ack count is negtive issue. (#14288)
83a522f is described below
commit 83a522f3a17d41eb3727ffee67cdf035e8ea471b
Author: Jiwei Guo <[email protected]>
AuthorDate: Tue Feb 15 21:39:10 2022 +0800
Fix batch ack count is negtive issue. (#14288)
After #13383 fixed the batch ack issue, we found that the unacked-message count
could still become negative (#14246). At first we thought this was a normal case
caused by message redelivery, but after digging into the logic we found it is a bug.
The test below is copied from #14246:
```
for (int i = 0; i < 50; i++) {
Message<String> msg = consumer.receive();
if (i % 2 == 0) {
consumer.acknowledgeAsync(msg);
} else {
consumer.negativeAcknowledge(msg);
}
}
```
When a message is negatively acknowledged (`negativeAcknowledge`),
Consumer#redeliverUnacknowledgedMessages is invoked:
https://github.com/apache/pulsar/blob/b22f70658927e07e3726d32290065f47313070b9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java#L900-L912
When calculating `totalRedeliveryMessages`, it checks that `pendingAcks`
contains the message and then removes the message from `pendingAcks`.
(Dispatching messages adds them to `pendingAcks`.)
So in the test above, it can happen that `negativeAcknowledge` is processed
first and `acknowledgeAsync` afterwards.
`acknowledgeAsync` maps to `Consumer#individualAckNormal`, which decreases the
unacked-message count in:
https://github.com/apache/pulsar/blob/b22f70658927e07e3726d32290065f47313070b9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java#L543-L561
That code does not check `pendingAcks`; this is the root cause. The fix is to
move the `pendingAcks` check from line 556 up to line 545.
(cherry picked from commit 6b828b41382e5a94f89d628aca38871ccff8df9d)
---
.../org/apache/pulsar/broker/service/Consumer.java | 6 +-
.../BatchMessageWithBatchIndexLevelTest.java | 91 +++++++++++++++++++++-
2 files changed, 94 insertions(+), 3 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 9c75e11..5b70b4f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -522,7 +522,8 @@ public class Consumer {
private long getAckedCountForBatchIndexLevelEnabled(PositionImpl position,
long batchSize, long[] ackSets) {
long ackedCount = 0;
- if (isAcknowledgmentAtBatchIndexLevelEnabled &&
Subscription.isIndividualAckMode(subType)) {
+ if (isAcknowledgmentAtBatchIndexLevelEnabled &&
Subscription.isIndividualAckMode(subType)
+ && pendingAcks.get(position.getLedgerId(), position.getEntryId())
!= null) {
long[] cursorAckSet = getCursorAckSet(position);
if (cursorAckSet != null) {
BitSetRecyclable cursorBitSet =
BitSetRecyclable.create().resetWords(cursorAckSet);
@@ -533,7 +534,7 @@ public class Consumer {
int currentCardinality = cursorBitSet.cardinality();
ackedCount = lastCardinality - currentCardinality;
cursorBitSet.recycle();
- } else if (pendingAcks.get(position.getLedgerId(),
position.getEntryId()) != null) {
+ } else {
ackedCount = batchSize - BitSet.valueOf(ackSets).cardinality();
}
}
@@ -547,6 +548,7 @@ public class Consumer {
if (cursorAckSet != null) {
BitSetRecyclable cursorBitSet =
BitSetRecyclable.create().resetWords(cursorAckSet);
unAckedCount = cursorBitSet.cardinality();
+ cursorBitSet.recycle();
}
}
return unAckedCount;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
index 5e09def..da00b65 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
@@ -24,6 +24,7 @@ import lombok.SneakyThrows;
import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
@@ -60,7 +61,8 @@ public class BatchMessageWithBatchIndexLevelTest extends
BatchMessageTest {
final String topicName = "persistent://prop/ns-abc/batchMessageAck-" +
UUID.randomUUID();
final String subscriptionName = "sub-batch-1";
- ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient
.newConsumer()
.topic(topicName)
.subscriptionName(subscriptionName)
@@ -70,6 +72,7 @@ public class BatchMessageWithBatchIndexLevelTest extends
BatchMessageTest {
.negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
.subscribe();
+ @Cleanup
Producer<byte[]> producer = pulsarClient
.newProducer()
.topic(topicName)
@@ -173,4 +176,90 @@ public class BatchMessageWithBatchIndexLevelTest extends
BatchMessageTest {
}
});
}
+
+ @Test
+ public void testBatchMessageMultiNegtiveAck() throws Exception{
+ final String topicName =
"persistent://prop/ns-abc/batchMessageMultiNegtiveAck-" + UUID.randomUUID();
+ final String subscriptionName = "sub-negtive-1";
+
+ @Cleanup
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName(subscriptionName)
+ .subscriptionType(SubscriptionType.Shared)
+ .receiverQueueSize(10)
+ .enableBatchIndexAcknowledgment(true)
+ .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
+ .subscribe();
+
+ @Cleanup
+ Producer<String> producer = pulsarClient
+ .newProducer(Schema.STRING)
+ .topic(topicName)
+ .batchingMaxMessages(20)
+ .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+ .enableBatching(true)
+ .create();
+
+ final int N = 20;
+ for (int i = 0; i < N; i++) {
+ String value = "test-" + i;
+ producer.sendAsync(value);
+ }
+ producer.flush();
+ for (int i = 0; i < N; i++) {
+ Message<String> msg = consumer.receive();
+ if (i % 2 == 0) {
+ consumer.acknowledgeAsync(msg);
+ } else {
+ consumer.negativeAcknowledge(msg);
+ }
+ }
+ Awaitility.await().untilAsserted(() -> {
+ long unackedMessages =
admin.topics().getStats(topicName).getSubscriptions().get(subscriptionName)
+ .getUnackedMessages();
+ assertEquals(unackedMessages, 10);
+ });
+
+ // Test negtive ack with sleep
+ final String topicName2 =
"persistent://prop/ns-abc/batchMessageMultiNegtiveAck2-" + UUID.randomUUID();
+ final String subscriptionName2 = "sub-negtive-2";
+ @Cleanup
+ Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName2)
+ .subscriptionName(subscriptionName2)
+ .subscriptionType(SubscriptionType.Shared)
+ .receiverQueueSize(10)
+ .enableBatchIndexAcknowledgment(true)
+ .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
+ .subscribe();
+ @Cleanup
+ Producer<String> producer2 = pulsarClient
+ .newProducer(Schema.STRING)
+ .topic(topicName2)
+ .batchingMaxMessages(20)
+ .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+ .enableBatching(true)
+ .create();
+
+ for (int i = 0; i < N; i++) {
+ String value = "test-" + i;
+ producer2.sendAsync(value);
+ }
+ producer2.flush();
+ for (int i = 0; i < N; i++) {
+ Message<String> msg = consumer2.receive();
+ if (i % 2 == 0) {
+ consumer.acknowledgeAsync(msg);
+ } else {
+ consumer.negativeAcknowledge(msg);
+ Thread.sleep(100);
+ }
+ }
+ Awaitility.await().untilAsserted(() -> {
+ long unackedMessages =
admin.topics().getStats(topicName).getSubscriptions().get(subscriptionName)
+ .getUnackedMessages();
+ assertEquals(unackedMessages, 10);
+ });
+ }
}