This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5db06a1f5a6 [fix][broker] Fix deadlock in broker after race condition in topic creation failure (#15570)
5db06a1f5a6 is described below
commit 5db06a1f5a6ca776365825e4b32c8244914cc29a
Author: Matteo Merli <[email protected]>
AuthorDate: Thu May 12 21:06:46 2022 -0700
[fix][broker] Fix deadlock in broker after race condition in topic creation failure (#15570)
* Fix deadlock in broker after race condition in topic creation failure
* Fixed checkstyle
---
.../java/org/apache/pulsar/broker/service/BrokerService.java | 11 ++++++-----
1 file changed, 6 insertions(+), 5 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 8873eb0992c..7325da6a51c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1400,9 +1400,10 @@ public class BrokerService implements Closeable {
if
(topicFuture.isCompletedExceptionally()) {
log.warn("{} future is already
completed with failure {}, closing the"
+ " topic", topic,
FutureUtil.getException(topicFuture));
-
persistentTopic.stopReplProducers().whenComplete((v, exception) -> {
- topics.remove(topic,
topicFuture);
- });
+
persistentTopic.stopReplProducers()
+ .whenCompleteAsync((v,
exception) -> {
+
topics.remove(topic, topicFuture);
+ }, executor());
} else {
addTopicToStatsMaps(topicName,
persistentTopic);
topicFuture.complete(Optional.of(persistentTopic));
@@ -1411,10 +1412,10 @@ public class BrokerService implements Closeable {
.exceptionally((ex) -> {
log.warn("Replication or dedup
check failed."
+ " Removing topic from
topics list {}, {}", topic, ex);
-
persistentTopic.stopReplProducers().whenComplete((v, exception) -> {
+
persistentTopic.stopReplProducers().whenCompleteAsync((v, exception) -> {
topics.remove(topic,
topicFuture);
topicFuture.completeExceptionally(ex);
- });
+ }, executor());
return null;
});
} catch (PulsarServerException e) {