This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 975f643b97f63fc66c0fdc4db0d3f0c07ffb4b61
Author: fengyubiao <[email protected]>
AuthorDate: Fri Nov 18 14:40:55 2022 +0800

    [fix][client] Fix multi-topic consumer stuck after redeliver messages 
(#18491)
    
    (cherry picked from commit 7a93ff92335ea9a73c73dafb925cad9583440039)
---
 .../pulsar/client/impl/NegativeAcksTest.java       | 54 ++++++++++++++++++++++
 .../client/impl/MultiTopicsConsumerImpl.java       |  2 +-
 2 files changed, 55 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index ced0bec8fcb..a340423a6d1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import lombok.Cleanup;
@@ -33,6 +34,8 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.awaitility.Awaitility;
+import org.testcontainers.shaded.org.awaitility.reflect.WhiteboxImpl;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -231,4 +234,55 @@ public class NegativeAcksTest extends ProducerConsumerBase 
{
         assertTrue(count > 9);
         Assert.assertEquals(0, datas.size());
     }
+
+    @Test(invocationCount = 5)
+    public void testMultiTopicConsumerConcurrentRedeliverAndReceive() throws 
Exception {
+        final String topic = BrokerTestUtil.newUniqueName("my-topic");
+        admin.topics().createPartitionedTopic(topic, 2);
+
+        final int receiverQueueSize = 10;
+
+        @Cleanup
+        MultiTopicsConsumerImpl<Integer> consumer =
+                (MultiTopicsConsumerImpl<Integer>) 
pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName("sub")
+                .receiverQueueSize(receiverQueueSize)
+                .subscribe();
+        ExecutorService internalPinnedExecutor =
+                WhiteboxImpl.getInternalState(consumer, 
"internalPinnedExecutor");
+
+        @Cleanup
+        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+
+        for (int i = 0; i < receiverQueueSize; i++){
+            producer.send(i);
+        }
+
+        Awaitility.await().until(() -> consumer.incomingMessages.size() == 
receiverQueueSize);
+
+        // For testing the race condition of issue #18491
+        // We need to inject a delay for the pinned internal thread
+        Thread.sleep(1000L);
+        internalPinnedExecutor.submit(() -> 
consumer.redeliverUnacknowledgedMessages()).get();
+        // Make sure the message redelivery is completed. The incoming queue 
will be cleaned up during the redelivery.
+        internalPinnedExecutor.submit(() -> {}).get();
+
+        Set<Integer> receivedMsgs = new HashSet<>();
+        for (;;){
+            Message<Integer> msg = consumer.receive(2, TimeUnit.SECONDS);
+            if (msg == null){
+                break;
+            }
+            receivedMsgs.add(msg.getValue());
+        }
+        Assert.assertEquals(receivedMsgs.size(), 10);
+
+        producer.close();
+        consumer.close();
+        admin.topics().deletePartitionedTopic("persistent://public/default/" + 
topic);
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 226e2a5055d..071889a177b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -639,8 +639,8 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
             });
             clearIncomingMessages();
             unAckedMessageTracker.clear();
+            resumeReceivingFromPausedConsumersIfNeeded();
         });
-        resumeReceivingFromPausedConsumersIfNeeded();
     }
 
     @Override

Reply via email to