This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new bcdcae29d64 [fix][client] Fix repeat consume when using n-ack and
batched messages (#21116)
bcdcae29d64 is described below
commit bcdcae29d64dd33bc3f0586a9d72167916a4a095
Author: fengyubiao <[email protected]>
AuthorDate: Tue Sep 5 10:28:17 2023 +0800
[fix][client] Fix repeat consume when using n-ack and batched messages
(#21116)
(cherry picked from commit 35bb021cdbd9cc9f4f801bd14f95d035a76a0043)
---
.../BatchMessageWithBatchIndexLevelTest.java | 88 ++++++++++++++++++++++
.../PersistentAcknowledgmentsGroupingTracker.java | 15 +++-
2 files changed, 101 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
index 411a53aa821..9c9264c80cf 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
@@ -19,8 +19,11 @@
package org.apache.pulsar.broker.service;
import com.google.common.collect.Lists;
+import java.util.ArrayList;
import lombok.Cleanup;
import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
@@ -28,9 +31,13 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
+import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.util.List;
@@ -39,6 +46,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static org.testng.Assert.assertEquals;
+@Slf4j
@Test(groups = "broker")
public class BatchMessageWithBatchIndexLevelTest extends BatchMessageTest {
@@ -199,4 +207,84 @@ public class BatchMessageWithBatchIndexLevelTest extends
BatchMessageTest {
assertEquals(unackedMessages, 10);
});
}
+
+ @Test
+ public void testNegativeAckAndLongAckDelayWillNotLeadRepeatConsume()
throws Exception {
+ final String topicName =
BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/tp_");
+ final String subscriptionName = "s1";
+ final int redeliveryDelaySeconds = 2;
+
+ // Create producer and consumer.
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topicName)
+ .enableBatching(true)
+ .batchingMaxMessages(1000)
+ .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+ .create();
+ ConsumerImpl<String> consumer = (ConsumerImpl<String>)
pulsarClient.newConsumer(Schema.STRING)
+ .topic(topicName)
+ .subscriptionName(subscriptionName)
+ .subscriptionType(SubscriptionType.Shared)
+ .negativeAckRedeliveryDelay(redeliveryDelaySeconds,
TimeUnit.SECONDS)
+ .enableBatchIndexAcknowledgment(true)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .acknowledgmentGroupTime(1, TimeUnit.HOURS)
+ .subscribe();
+
+ // Send 10 messages in batch.
+ ArrayList<String> messagesSent = new ArrayList<>();
+ List<CompletableFuture<MessageId>> sendTasks = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ String msg = Integer.valueOf(i).toString();
+ sendTasks.add(producer.sendAsync(Integer.valueOf(i).toString()));
+ messagesSent.add(msg);
+ }
+ producer.flush();
+ FutureUtil.waitForAll(sendTasks).join();
+
+ // Receive messages.
+ ArrayList<String> messagesReceived = new ArrayList<>();
+ // Negatively acknowledge the message whose batch index is 1, exactly once.
+ boolean index1HasBeenNegativeAcked = false;
+ while (true) {
+ Message<String> message = consumer.receive(2, TimeUnit.SECONDS);
+ if (message == null) {
+ break;
+ }
+ if (index1HasBeenNegativeAcked) {
+ messagesReceived.add(message.getValue());
+ consumer.acknowledge(message);
+ continue;
+ }
+ if (message.getMessageId() instanceof BatchMessageIdImpl
batchMessageId) {
+ if (batchMessageId.getBatchIndex() == 1) {
+ consumer.negativeAcknowledge(message);
+ index1HasBeenNegativeAcked = true;
+ continue;
+ }
+ }
+ messagesReceived.add(message.getValue());
+ consumer.acknowledge(message);
+ }
+
+ // Receive the negatively acknowledged message.
+ // Wait up to three times the redelivery delay for its redelivery to arrive.
+ int tripleRedeliveryDelaySeconds = redeliveryDelaySeconds * 3;
+ while (true) {
+ Message<String> message =
consumer.receive(tripleRedeliveryDelaySeconds, TimeUnit.SECONDS);
+ if (message == null) {
+ break;
+ }
+ messagesReceived.add(message.getValue());
+ consumer.acknowledge(message);
+ }
+
+ log.info("messagesSent: {}, messagesReceived: {}", messagesSent,
messagesReceived);
+ Assert.assertEquals(messagesReceived.size(), messagesSent.size());
+
+ // cleanup.
+ producer.close();
+ consumer.close();
+ admin.topics().delete(topicName);
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index 74519583d56..5b45f9ed13e 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -120,10 +120,21 @@ public class PersistentAcknowledgmentsGroupingTracker
implements Acknowledgments
// Already included in a cumulative ack
return true;
} else {
- final MessageIdImpl messageIdImpl = (messageId instanceof
BatchMessageIdImpl)
+ final MessageIdImpl key = (messageId instanceof BatchMessageIdImpl)
? ((BatchMessageIdImpl) messageId).toMessageIdImpl()
: (MessageIdImpl) messageId;
- return pendingIndividualAcks.contains(messageIdImpl);
+ // If "batchIndexAckEnabled" is false, the batched messages
acknowledgment will be tracked by
+ // pendingIndividualAcks. So no matter what type the message ID
is, check with "pendingIndividualAcks"
+ // first.
+ if (pendingIndividualAcks.contains(key)) {
+ return true;
+ }
+ if (messageId instanceof BatchMessageIdImpl) {
+ BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl)
messageId;
+ ConcurrentBitSetRecyclable bitSet =
pendingIndividualBatchIndexAcks.get(key);
+ return bitSet != null &&
!bitSet.get(batchMessageId.getBatchIndex());
+ }
+ return false;
}
}