This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 9c76b0902cd [fix][client]Fix deadlock issue of consumer while using multiple IO threads (#20669)
9c76b0902cd is described below

commit 9c76b0902cdf4535f7cfbc4e8ee5833dc58c766d
Author: Penghui Li <[email protected]>
AuthorDate: Thu Jun 29 10:05:31 2023 +0800

    [fix][client]Fix deadlock issue of consumer while using multiple IO threads (#20669)
    
    (cherry picked from commit d3a8c2289925edcabfa01b5bb69f716e084c7360)
---
 .../pulsar/client/api/MultiTopicsConsumerTest.java | 20 ++++++++++++
 .../client/impl/MultiTopicsConsumerImpl.java       | 37 +++++++++++-----------
 2 files changed, 38 insertions(+), 19 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
index b8ea87ab401..315ce378d69 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
@@ -76,6 +76,11 @@ public class MultiTopicsConsumerTest extends ProducerConsumerBase {
         super.internalCleanup();
     }
 
+    @Override
+    protected void customizeNewPulsarClientBuilder(ClientBuilder 
clientBuilder) {
+       clientBuilder.ioThreads(4).connectionsPerBroker(4);
+    }
+
     // test that reproduces the issue 
https://github.com/apache/pulsar/issues/12024
     // where closing the consumer leads to an endless receive loop
     @Test
@@ -351,4 +356,19 @@ public class MultiTopicsConsumerTest extends 
ProducerConsumerBase {
         }
         consumer.close();
     }
+
+    @Test(invocationCount = 10, timeOut = 30000)
+    public void testMultipleIOThreads() throws PulsarAdminException, 
PulsarClientException {
+        final var topic = TopicName.get(newTopicName()).toString();
+        final var numPartitions = 100;
+        admin.topics().createPartitionedTopic(topic, numPartitions);
+        for (int i = 0; i < 100; i++) {
+            admin.topics().createNonPartitionedTopic(topic + "-" + i);
+        }
+        @Cleanup
+        final var consumer = 
pulsarClient.newConsumer(Schema.INT32).topicsPattern(topic + ".*")
+                .subscriptionName("sub").subscribe();
+        assertTrue(consumer instanceof MultiTopicsConsumerImpl);
+        assertTrue(consumer.isConnected());
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index ef0345de919..a0cbcc18e65 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -92,7 +92,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
     // sum of topicPartitions, simple topic has 1, partitioned topic equals to 
partition number.
     AtomicInteger allTopicPartitionsNumber;
 
-    private boolean paused = false;
+    private volatile boolean paused = false;
     private final Object pauseMutex = new Object();
     // timeout related to auto check and subscribe partition increasement
     private volatile Timeout partitionsAutoUpdateTimeout = null;
@@ -1059,29 +1059,28 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
 
             CompletableFuture<Consumer<T>> subFuture = new 
CompletableFuture<>();
 
-            consumers.compute(topicName, (key, existingValue) -> {
-                if (existingValue != null) {
-                    String errorMessage = String.format("[%s] Failed to 
subscribe for topic [%s] in topics consumer. "
-                            + "Topic is already being subscribed for in other 
thread.", topic, topicName);
-                    log.warn(errorMessage);
-                    subscribeResult.completeExceptionally(new 
PulsarClientException(errorMessage));
-                    return existingValue;
-                } else {
-                    internalConfig.setStartPaused(paused);
-                    ConsumerImpl<T> newConsumer = 
createInternalConsumer(internalConfig, topicName,
-                            -1, subFuture, createIfDoesNotExist, schema);
-
-                    synchronized (pauseMutex) {
+            synchronized (pauseMutex) {
+                consumers.compute(topicName, (key, existingValue) -> {
+                    if (existingValue != null) {
+                        String errorMessage =
+                                String.format("[%s] Failed to subscribe for 
topic [%s] in topics consumer. "
+                                + "Topic is already being subscribed for in 
other thread.", topic, topicName);
+                        log.warn(errorMessage);
+                        subscribeResult.completeExceptionally(new 
PulsarClientException(errorMessage));
+                        return existingValue;
+                    } else {
+                        internalConfig.setStartPaused(paused);
+                        ConsumerImpl<T> newConsumer = 
createInternalConsumer(internalConfig, topicName,
+                                -1, subFuture, createIfDoesNotExist, schema);
                         if (paused) {
                             newConsumer.pause();
                         } else {
                             newConsumer.resume();
                         }
+                        return newConsumer;
                     }
-                    return newConsumer;
-                }
-            });
-
+                });
+            }
             futureList = Collections.singletonList(subFuture);
         }
 
@@ -1408,7 +1407,7 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                         }
                         if (log.isDebugEnabled()) {
                             log.debug("[{}] create consumer {} for 
partitionName: {}",
-                                topicName, newConsumer.getTopic(), 
partitionName);
+                                    topicName, newConsumer.getTopic(), 
partitionName);
                         }
                         return subFuture;
                     })

Reply via email to