This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 1de29156052 [fix][client] Fix failover/exclusive consumer with batch cumulate ack issue. (#18454)
1de29156052 is described below

commit 1de291560524218ad35be6bbbc30e3acc95b53ee
Author: Jiwei Guo <[email protected]>
AuthorDate: Tue Nov 15 11:37:16 2022 +0800

    [fix][client] Fix failover/exclusive consumer with batch cumulate ack issue. (#18454)
    
    (cherry picked from commit 7712aa396bfca55a79a45761e0a405e90185e0f8)
---
 .../impl/ConsumerDedupPermitsUpdateTest.java       | 21 ++++--
 .../pulsar/client/impl/NegativeAcksTest.java       | 77 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  9 ++-
 .../client/impl/MultiTopicsConsumerImpl.java       |  3 -
 4 files changed, 100 insertions(+), 10 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java
index 4c9922acbec..ceb7d7fd484 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdateTest.java
@@ -116,10 +116,23 @@ public class ConsumerDedupPermitsUpdateTest extends ProducerConsumerBase {
         }
         producer.flush();
 
-        for (int i = 0; i < 30; i++) {
-            Message<String> msg = consumer.receive();
-            assertEquals(msg.getValue(), "new-message-" + i);
-            consumer.acknowledge(msg);
+        if (batchingEnabled) {
+            for (int i = 0; i < 30; i++) {
+                Message<String> msg = consumer.receive();
+                assertEquals(msg.getValue(), "hello-" + i);
+                consumer.acknowledge(msg);
+            }
+            for (int i = 0; i < 30; i++) {
+                Message<String> msg = consumer.receive();
+                assertEquals(msg.getValue(), "new-message-" + i);
+                consumer.acknowledge(msg);
+            }
+        } else {
+            for (int i = 0; i < 30; i++) {
+                Message<String> msg = consumer.receive();
+                assertEquals(msg.getValue(), "new-message-" + i);
+                consumer.acknowledge(msg);
+            }
         }
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index 5eb43af38f7..de130b78270 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -23,6 +23,7 @@ import static org.testng.Assert.assertNull;
 
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import lombok.Cleanup;
@@ -35,6 +36,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
@@ -154,4 +156,79 @@ public class NegativeAcksTest extends ProducerConsumerBase {
         consumer.close();
         producer.close();
     }
+
+    @Test
+    public void testFailoverConsumerBatchCumulateAck() throws Exception {
+        final String topic = BrokerTestUtil.newUniqueName("my-topic");
+        admin.topics().createPartitionedTopic(topic, 2);
+
+        @Cleanup
+        Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName("sub")
+                .subscriptionType(SubscriptionType.Failover)
+                .enableBatchIndexAcknowledgment(true)
+                .acknowledgmentGroupTime(100, TimeUnit.MILLISECONDS)
+                .receiverQueueSize(10)
+                .subscribe();
+
+        @Cleanup
+        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+                .topic(topic)
+                .batchingMaxMessages(10)
+                .batchingMaxPublishDelay(3, TimeUnit.SECONDS)
+                .blockIfQueueFull(true)
+                .create();
+
+        int count = 0;
+        Set<Integer> datas = new HashSet<>();
+        CountDownLatch producerLatch = new CountDownLatch(10);
+        while (count < 10) {
+            datas.add(count);
+            producer.sendAsync(count).whenComplete((m, e) -> {
+                producerLatch.countDown();
+            });
+            count++;
+        }
+        producerLatch.await();
+        CountDownLatch consumerLatch = new CountDownLatch(1);
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                consumer.receiveAsync()
+                        .thenCompose(m -> {
+                            log.info("received one msg : {}", m.getMessageId());
+                            datas.remove(m.getValue());
+                            return consumer.acknowledgeCumulativeAsync(m);
+                        })
+                        .thenAccept(ignore -> {
+                            try {
+                                Thread.sleep(500);
+                                consumer.redeliverUnacknowledgedMessages();
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        })
+                        .whenComplete((r, e) -> {
+                            consumerLatch.countDown();
+                        });
+            }
+        }).start();
+        consumerLatch.await();
+        Thread.sleep(500);
+        count = 0;
+        while(true) {
+            Message<Integer> msg = consumer.receive(5, TimeUnit.SECONDS);
+            if (msg == null) {
+                break;
+            }
+            consumer.acknowledgeCumulative(msg);
+            Thread.sleep(200);
+            datas.remove(msg.getValue());
+            log.info("received msg : {}", msg.getMessageId());
+            count++;
+        }
+        Assert.assertEquals(count, 9);
+        Assert.assertEquals(0, datas.size());
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 1a185d4c17d..5381b280330 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1174,9 +1174,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         final int numMessages = msgMetadata.getNumMessagesInBatch();
         final int numChunks = msgMetadata.hasNumChunksFromMsg() ? msgMetadata.getNumChunksFromMsg() : 0;
         final boolean isChunkedMessage = numChunks > 1 && conf.getSubscriptionType() != SubscriptionType.Shared;
-
         MessageIdImpl msgId = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), getPartitionIndex());
-        if (acknowledgmentsGroupingTracker.isDuplicate(msgId)) {
+        if (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch()
+                && acknowledgmentsGroupingTracker.isDuplicate(msgId)) {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] [{}] Ignoring message as it was already being acked earlier by same consumer {}/{}",
                         topic, subscription, consumerName, msgId);
@@ -1429,7 +1429,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                         skippedMessages++;
                         continue;
                     }
-
+                }
+                if (acknowledgmentsGroupingTracker.isDuplicate(message.getMessageId())) {
+                    skippedMessages++;
+                    continue;
                 }
                 executeNotifyCallback(message);
             }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index e5cc32af8fe..b2de0e3b92c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -985,7 +985,6 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                     partitionIndex -> {
                         String partitionName = TopicName.get(topicName).getPartition(partitionIndex).toString();
                         CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>();
-                        configurationData.setStartPaused(paused);
                         ConsumerImpl<T> newConsumer = createInternalConsumer(configurationData, partitionName,
                                 partitionIndex, subFuture, createIfDoesNotExist, schema);
                         synchronized (pauseMutex) {
@@ -1012,7 +1011,6 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                     subscribeResult.completeExceptionally(new PulsarClientException(errorMessage));
                     return existingValue;
                 } else {
-                    internalConfig.setStartPaused(paused);
                     ConsumerImpl<T> newConsumer = createInternalConsumer(internalConfig, topicName,
                             -1, subFuture, createIfDoesNotExist, schema);
 
@@ -1328,7 +1326,6 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                         int partitionIndex = TopicName.getPartitionIndex(partitionName);
                         CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>();
                         ConsumerConfigurationData<T> configurationData = getInternalConsumerConfig();
-                        configurationData.setStartPaused(paused);
                         ConsumerImpl<T> newConsumer = createInternalConsumer(configurationData, partitionName,
                                 partitionIndex, subFuture, true, schema);
                         synchronized (pauseMutex) {

Reply via email to