This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 6748d93c2e3 [fix][client] Fix multi-topic consumer stuck after
redeliver messages (#18491)
6748d93c2e3 is described below
commit 6748d93c2e373f485fab7e0dbf1c0dbae396ae7a
Author: fengyubiao <[email protected]>
AuthorDate: Fri Nov 18 14:40:55 2022 +0800
[fix][client] Fix multi-topic consumer stuck after redeliver messages
(#18491)
(cherry picked from commit 7a93ff92335ea9a73c73dafb925cad9583440039)
---
.../pulsar/client/impl/NegativeAcksTest.java | 55 ++++++++++++++++++++++
.../client/impl/MultiTopicsConsumerImpl.java | 2 +-
2 files changed, 56 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
index 25a2539582e..e7cc8dfd314 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -25,6 +25,7 @@ import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
@@ -39,6 +40,9 @@ import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.powermock.reflect.Whitebox;
+import org.awaitility.Awaitility;
+import org.testcontainers.shaded.org.awaitility.reflect.WhiteboxImpl;
+import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
@@ -355,4 +359,55 @@ public class NegativeAcksTest extends ProducerConsumerBase
{
// There should be no more messages
assertNull(consumer.receive(100, TimeUnit.MILLISECONDS));
}
+
+ @Test(invocationCount = 5)
+ public void testMultiTopicConsumerConcurrentRedeliverAndReceive() throws
Exception {
+ final String topic = BrokerTestUtil.newUniqueName("my-topic");
+ admin.topics().createPartitionedTopic(topic, 2);
+
+ final int receiverQueueSize = 10;
+
+ @Cleanup
+ MultiTopicsConsumerImpl<Integer> consumer =
+ (MultiTopicsConsumerImpl<Integer>)
pulsarClient.newConsumer(Schema.INT32)
+ .topic(topic)
+ .subscriptionName("sub")
+ .receiverQueueSize(receiverQueueSize)
+ .subscribe();
+ ExecutorService internalPinnedExecutor =
+ WhiteboxImpl.getInternalState(consumer,
"internalPinnedExecutor");
+
+ @Cleanup
+ Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+ .topic(topic)
+ .enableBatching(false)
+ .create();
+
+ for (int i = 0; i < receiverQueueSize; i++){
+ producer.send(i);
+ }
+
+ Awaitility.await().until(() -> consumer.incomingMessages.size() ==
receiverQueueSize);
+
+ // For testing the race condition of issue #18491
+ // We need to inject a delay for the pinned internal thread
+ Thread.sleep(1000L);
+ internalPinnedExecutor.submit(() ->
consumer.redeliverUnacknowledgedMessages()).get();
+ // Make sure the message redelivery is completed. The incoming queue
will be cleaned up during the redelivery.
+ internalPinnedExecutor.submit(() -> {}).get();
+
+ Set<Integer> receivedMsgs = new HashSet<>();
+ for (;;){
+ Message<Integer> msg = consumer.receive(2, TimeUnit.SECONDS);
+ if (msg == null){
+ break;
+ }
+ receivedMsgs.add(msg.getValue());
+ }
+ Assert.assertEquals(receivedMsgs.size(), 10);
+
+ producer.close();
+ consumer.close();
+ admin.topics().deletePartitionedTopic("persistent://public/default/" +
topic);
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 138e8535bca..2068969d12a 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -709,8 +709,8 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
});
clearIncomingMessages();
unAckedMessageTracker.clear();
+ resumeReceivingFromPausedConsumersIfNeeded();
});
- resumeReceivingFromPausedConsumersIfNeeded();
}
@Override