This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 9c76b0902cd [fix][client]Fix deadlock issue of consumer while using
multiple IO threads (#20669)
9c76b0902cd is described below
commit 9c76b0902cdf4535f7cfbc4e8ee5833dc58c766d
Author: Penghui Li <[email protected]>
AuthorDate: Thu Jun 29 10:05:31 2023 +0800
[fix][client]Fix deadlock issue of consumer while using multiple IO threads
(#20669)
(cherry picked from commit d3a8c2289925edcabfa01b5bb69f716e084c7360)
---
.../pulsar/client/api/MultiTopicsConsumerTest.java | 20 ++++++++++++
.../client/impl/MultiTopicsConsumerImpl.java | 37 +++++++++++-----------
2 files changed, 38 insertions(+), 19 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
index b8ea87ab401..315ce378d69 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
@@ -76,6 +76,11 @@ public class MultiTopicsConsumerTest extends
ProducerConsumerBase {
super.internalCleanup();
}
+ @Override
+ protected void customizeNewPulsarClientBuilder(ClientBuilder
clientBuilder) {
+ clientBuilder.ioThreads(4).connectionsPerBroker(4);
+ }
+
// test that reproduces the issue
https://github.com/apache/pulsar/issues/12024
// where closing the consumer leads to an endless receive loop
@Test
@@ -351,4 +356,19 @@ public class MultiTopicsConsumerTest extends
ProducerConsumerBase {
}
consumer.close();
}
+
+ @Test(invocationCount = 10, timeOut = 30000)
+ public void testMultipleIOThreads() throws PulsarAdminException,
PulsarClientException {
+ final var topic = TopicName.get(newTopicName()).toString();
+ final var numPartitions = 100;
+ admin.topics().createPartitionedTopic(topic, numPartitions);
+ for (int i = 0; i < 100; i++) {
+ admin.topics().createNonPartitionedTopic(topic + "-" + i);
+ }
+ @Cleanup
+ final var consumer =
pulsarClient.newConsumer(Schema.INT32).topicsPattern(topic + ".*")
+ .subscriptionName("sub").subscribe();
+ assertTrue(consumer instanceof MultiTopicsConsumerImpl);
+ assertTrue(consumer.isConnected());
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index ef0345de919..a0cbcc18e65 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -92,7 +92,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
// sum of topicPartitions, simple topic has 1, partitioned topic equals to
partition number.
AtomicInteger allTopicPartitionsNumber;
- private boolean paused = false;
+ private volatile boolean paused = false;
private final Object pauseMutex = new Object();
// timeout related to auto check and subscribe partition increasement
private volatile Timeout partitionsAutoUpdateTimeout = null;
@@ -1059,29 +1059,28 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
CompletableFuture<Consumer<T>> subFuture = new
CompletableFuture<>();
- consumers.compute(topicName, (key, existingValue) -> {
- if (existingValue != null) {
- String errorMessage = String.format("[%s] Failed to
subscribe for topic [%s] in topics consumer. "
- + "Topic is already being subscribed for in other
thread.", topic, topicName);
- log.warn(errorMessage);
- subscribeResult.completeExceptionally(new
PulsarClientException(errorMessage));
- return existingValue;
- } else {
- internalConfig.setStartPaused(paused);
- ConsumerImpl<T> newConsumer =
createInternalConsumer(internalConfig, topicName,
- -1, subFuture, createIfDoesNotExist, schema);
-
- synchronized (pauseMutex) {
+ synchronized (pauseMutex) {
+ consumers.compute(topicName, (key, existingValue) -> {
+ if (existingValue != null) {
+ String errorMessage =
+ String.format("[%s] Failed to subscribe for
topic [%s] in topics consumer. "
+ + "Topic is already being subscribed for in
other thread.", topic, topicName);
+ log.warn(errorMessage);
+ subscribeResult.completeExceptionally(new
PulsarClientException(errorMessage));
+ return existingValue;
+ } else {
+ internalConfig.setStartPaused(paused);
+ ConsumerImpl<T> newConsumer =
createInternalConsumer(internalConfig, topicName,
+ -1, subFuture, createIfDoesNotExist, schema);
if (paused) {
newConsumer.pause();
} else {
newConsumer.resume();
}
+ return newConsumer;
}
- return newConsumer;
- }
- });
-
+ });
+ }
futureList = Collections.singletonList(subFuture);
}
@@ -1408,7 +1407,7 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
}
if (log.isDebugEnabled()) {
log.debug("[{}] create consumer {} for
partitionName: {}",
- topicName, newConsumer.getTopic(),
partitionName);
+ topicName, newConsumer.getTopic(),
partitionName);
}
return subFuture;
})