This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new eeebed9dd3b [fix][client] Fix repeat consume when using n-ack and batched messages (#21116)
eeebed9dd3b is described below

commit eeebed9dd3baa9c194bb04a0752069c0a9163e27
Author: fengyubiao <[email protected]>
AuthorDate: Tue Sep 5 10:28:17 2023 +0800

    [fix][client] Fix repeat consume when using n-ack and batched messages (#21116)
    
    (cherry picked from commit 35bb021cdbd9cc9f4f801bd14f95d035a76a0043)
---
 .../BatchMessageWithBatchIndexLevelTest.java       | 88 ++++++++++++++++++++++
 .../PersistentAcknowledgmentsGroupingTracker.java  | 15 +++-
 2 files changed, 101 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
index 731c65dd33d..769b611b0bd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageWithBatchIndexLevelTest.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.broker.service;
 import com.google.common.collect.Lists;
 import lombok.Cleanup;
 import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
@@ -28,9 +30,13 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
+import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 import java.util.ArrayList;
@@ -41,6 +47,7 @@ import java.util.concurrent.TimeUnit;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
 
+@Slf4j
 @Test(groups = "broker")
 public class BatchMessageWithBatchIndexLevelTest extends BatchMessageTest {
 
@@ -281,4 +288,85 @@ public class BatchMessageWithBatchIndexLevelTest extends BatchMessageTest {
         Awaitility.await().until(() -> 
getPulsar().getBrokerService().getTopic(topicName, false)
                 
.get().get().getSubscription(subName).getConsumers().get(0).getUnackedMessages()
 == 0);
     }
+
+    @Test
+    public void testNegativeAckAndLongAckDelayWillNotLeadRepeatConsume() 
throws Exception {
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/tp_");
+        final String subscriptionName = "s1";
+        final int redeliveryDelaySeconds = 2;
+
+        // Create producer and consumer.
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .enableBatching(true)
+                .batchingMaxMessages(1000)
+                .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+                .create();
+        ConsumerImpl<String> consumer = (ConsumerImpl<String>) 
pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Shared)
+                .negativeAckRedeliveryDelay(redeliveryDelaySeconds, 
TimeUnit.SECONDS)
+                .enableBatchIndexAcknowledgment(true)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .acknowledgmentGroupTime(1, TimeUnit.HOURS)
+                .subscribe();
+
+        // Send 10 messages in batch.
+        ArrayList<String> messagesSent = new ArrayList<>();
+        List<CompletableFuture<MessageId>> sendTasks = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            String msg = Integer.valueOf(i).toString();
+            sendTasks.add(producer.sendAsync(Integer.valueOf(i).toString()));
+            messagesSent.add(msg);
+        }
+        producer.flush();
+        FutureUtil.waitForAll(sendTasks).join();
+
+        // Receive messages.
+        ArrayList<String> messagesReceived = new ArrayList<>();
+        // NegativeAck "batchMessageIdIndex1" once.
+        boolean index1HasBeenNegativeAcked = false;
+        while (true) {
+            Message<String> message = consumer.receive(2, TimeUnit.SECONDS);
+            if (message == null) {
+                break;
+            }
+            if (index1HasBeenNegativeAcked) {
+                messagesReceived.add(message.getValue());
+                consumer.acknowledge(message);
+                continue;
+            }
+            if (message.getMessageId() instanceof BatchMessageIdImpl) {
+                BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) 
message.getMessageId();
+                if (batchMessageId.getBatchIndex() == 1) {
+                    consumer.negativeAcknowledge(message);
+                    index1HasBeenNegativeAcked = true;
+                    continue;
+                }
+            }
+            messagesReceived.add(message.getValue());
+            consumer.acknowledge(message);
+        }
+
+        // Receive negative acked messages.
+        // Wait the message negative acknowledgment finished.
+        int tripleRedeliveryDelaySeconds = redeliveryDelaySeconds * 3;
+        while (true) {
+            Message<String> message = 
consumer.receive(tripleRedeliveryDelaySeconds, TimeUnit.SECONDS);
+            if (message == null) {
+                break;
+            }
+            messagesReceived.add(message.getValue());
+            consumer.acknowledge(message);
+        }
+
+        log.info("messagesSent: {}, messagesReceived: {}", messagesSent, 
messagesReceived);
+        Assert.assertEquals(messagesReceived.size(), messagesSent.size());
+
+        // cleanup.
+        producer.close();
+        consumer.close();
+        admin.topics().delete(topicName);
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index 10b65fed431..b3b2eb0e5ec 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -119,10 +119,21 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
             // Already included in a cumulative ack
             return true;
         } else {
-            final MessageIdImpl messageIdImpl = (messageId instanceof 
BatchMessageIdImpl)
+            final MessageIdImpl key = (messageId instanceof BatchMessageIdImpl)
                     ? ((BatchMessageIdImpl) messageId).toMessageIdImpl()
                     : (MessageIdImpl) messageId;
-            return pendingIndividualAcks.contains(messageIdImpl);
+            // If "batchIndexAckEnabled" is false, the batched messages 
acknowledgment will be traced by
+            // pendingIndividualAcks. So no matter what type the message ID 
is, check with "pendingIndividualAcks"
+            // first.
+            if (pendingIndividualAcks.contains(key)) {
+                return true;
+            }
+            if (messageId instanceof BatchMessageIdImpl) {
+                BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) 
messageId;
+                ConcurrentBitSetRecyclable bitSet = 
pendingIndividualBatchIndexAcks.get(key);
+                return bitSet != null && 
!bitSet.get(batchMessageId.getBatchIndex());
+            }
+            return false;
         }
     }
 

Reply via email to