This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 5269c2d411b [fix][client] Fix multi-topic consumer stuck after redeliver messages (#18491)
5269c2d411b is described below
commit 5269c2d411bac9c41a7856dc433fbbc6eb2dcd73
Author: fengyubiao <[email protected]>
AuthorDate: Fri Nov 18 14:40:55 2022 +0800
[fix][client] Fix multi-topic consumer stuck after redeliver messages (#18491)
(cherry picked from commit 7a93ff92335ea9a73c73dafb925cad9583440039)
---
.../pulsar/client/impl/NegativeAcksTest.java | 54 ++++++++++++++++++++++
.../client/impl/MultiTopicsConsumerImpl.java | 2 +-
2 files changed, 55 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index de130b78270..94727632a92 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -24,6 +24,7 @@ import static org.testng.Assert.assertNull;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
@@ -36,6 +37,8 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.awaitility.Awaitility;
+import org.testcontainers.shaded.org.awaitility.reflect.WhiteboxImpl;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -231,4 +234,55 @@ public class NegativeAcksTest extends ProducerConsumerBase
{
Assert.assertEquals(count, 9);
Assert.assertEquals(0, datas.size());
}
+
+ @Test(invocationCount = 5)
+ public void testMultiTopicConsumerConcurrentRedeliverAndReceive() throws
Exception {
+ final String topic = BrokerTestUtil.newUniqueName("my-topic");
+ admin.topics().createPartitionedTopic(topic, 2);
+
+ final int receiverQueueSize = 10;
+
+ @Cleanup
+ MultiTopicsConsumerImpl<Integer> consumer =
+ (MultiTopicsConsumerImpl<Integer>)
pulsarClient.newConsumer(Schema.INT32)
+ .topic(topic)
+ .subscriptionName("sub")
+ .receiverQueueSize(receiverQueueSize)
+ .subscribe();
+ ExecutorService internalPinnedExecutor =
+ WhiteboxImpl.getInternalState(consumer,
"internalPinnedExecutor");
+
+ @Cleanup
+ Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+ .topic(topic)
+ .enableBatching(false)
+ .create();
+
+ for (int i = 0; i < receiverQueueSize; i++){
+ producer.send(i);
+ }
+
+ Awaitility.await().until(() -> consumer.incomingMessages.size() ==
receiverQueueSize);
+
+ // For testing the race condition of issue #18491
+ // We need to inject a delay for the pinned internal thread
+ Thread.sleep(1000L);
+ internalPinnedExecutor.submit(() ->
consumer.redeliverUnacknowledgedMessages()).get();
+ // Make sure the message redelivery is completed. The incoming queue will be cleaned up during the redelivery.
+ internalPinnedExecutor.submit(() -> {}).get();
+
+ Set<Integer> receivedMsgs = new HashSet<>();
+ for (;;){
+ Message<Integer> msg = consumer.receive(2, TimeUnit.SECONDS);
+ if (msg == null){
+ break;
+ }
+ receivedMsgs.add(msg.getValue());
+ }
+ Assert.assertEquals(receivedMsgs.size(), 10);
+
+ producer.close();
+ consumer.close();
+ admin.topics().deletePartitionedTopic("persistent://public/default/" +
topic);
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 2dd6bb9e304..d0d88b068aa 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -640,8 +640,8 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
});
clearIncomingMessages();
unAckedMessageTracker.clear();
+ resumeReceivingFromPausedConsumersIfNeeded();
});
- resumeReceivingFromPausedConsumersIfNeeded();
}
@Override