This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5db06a1f5a6 [fix][broker] Fix deadlock in broker after race condition in topic creation failure (#15570)
5db06a1f5a6 is described below

commit 5db06a1f5a6ca776365825e4b32c8244914cc29a
Author: Matteo Merli <[email protected]>
AuthorDate: Thu May 12 21:06:46 2022 -0700

    [fix][broker] Fix deadlock in broker after race condition in topic creation failure (#15570)
    
    * Fix deadlock in broker after race condition in topic creation failure
    
    * Fixed checkstyle
---
 .../java/org/apache/pulsar/broker/service/BrokerService.java  | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 8873eb0992c..7325da6a51c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1400,9 +1400,10 @@ public class BrokerService implements Closeable {
                                             if 
(topicFuture.isCompletedExceptionally()) {
                                                 log.warn("{} future is already 
completed with failure {}, closing the"
                                                         + " topic", topic, 
FutureUtil.getException(topicFuture));
-                                                
persistentTopic.stopReplProducers().whenComplete((v, exception) -> {
-                                                    topics.remove(topic, 
topicFuture);
-                                                });
+                                                
persistentTopic.stopReplProducers()
+                                                        .whenCompleteAsync((v, 
exception) -> {
+                                                            
topics.remove(topic, topicFuture);
+                                                        }, executor());
                                             } else {
                                                 addTopicToStatsMaps(topicName, 
persistentTopic);
                                                 
topicFuture.complete(Optional.of(persistentTopic));
@@ -1411,10 +1412,10 @@ public class BrokerService implements Closeable {
                                         .exceptionally((ex) -> {
                                             log.warn("Replication or dedup 
check failed."
                                                     + " Removing topic from 
topics list {}, {}", topic, ex);
-                                            
persistentTopic.stopReplProducers().whenComplete((v, exception) -> {
+                                            
persistentTopic.stopReplProducers().whenCompleteAsync((v, exception) -> {
                                                 topics.remove(topic, 
topicFuture);
                                                 
topicFuture.completeExceptionally(ex);
-                                            });
+                                            }, executor());
                                             return null;
                                         });
                             } catch (PulsarServerException e) {

Reply via email to