This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new 3b164f5ac0c Revert "[improve][broker] Avoid reconnection when a 
partitioned topic was created concurrently (#16043)"
3b164f5ac0c is described below

commit 3b164f5ac0c900d6c162596892ec7e702e48f937
Author: Yunze Xu <[email protected]>
AuthorDate: Fri Jul 29 11:25:00 2022 +0800

    Revert "[improve][broker] Avoid reconnection when a partitioned topic was 
created concurrently (#16043)"
    
    This reverts commit b01d5d2da269c5f072167e8f21d569e25a35306f.
---
 .../pulsar/broker/service/BrokerService.java       | 48 +++++-----------------
 1 file changed, 10 insertions(+), 38 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index d943734e0a8..bdaded637b6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2412,44 +2412,16 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
                 .thenCompose(topicExists -> {
                     return fetchPartitionedTopicMetadataAsync(topicName)
                             .thenCompose(metadata -> {
-                                CompletableFuture<PartitionedTopicMetadata> 
future = new CompletableFuture<>();
-
-                                // There are a couple of potentially blocking 
calls, which we cannot make from the
-                                // MetadataStore callback thread.
-                                pulsar.getExecutor().execute(() -> {
-                                    // If topic is already exist, creating 
partitioned topic is not allowed.
-
-                                    if (metadata.partitions == 0
-                                            && !topicExists
-                                            && !topicName.isPartitioned()
-                                            && 
pulsar.getBrokerService().isAllowAutoTopicCreation(topicName)
-                                            && 
pulsar.getBrokerService().isDefaultTopicTypePartitioned(topicName)) {
-
-                                        
pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName)
-                                                .thenAccept(md -> 
future.complete(md))
-                                                .exceptionally(ex -> {
-                                                    if (ex.getCause()
-                                                            instanceof 
MetadataStoreException.AlreadyExistsException) {
-                                                        // The partitioned 
topic might be created concurrently
-                                                        
fetchPartitionedTopicMetadataAsync(topicName)
-                                                                
.whenComplete((metadata2, ex2) -> {
-                                                                    if (ex2 == 
null) {
-                                                                        
future.complete(metadata2);
-                                                                    } else {
-                                                                        
future.completeExceptionally(ex2);
-                                                                    }
-                                                                });
-                                                    } else {
-                                                        
future.completeExceptionally(ex);
-                                                    }
-                                                    return null;
-                                                });
-                                    } else {
-                                        future.complete(metadata);
-                                    }
-                                });
-
-                                return future;
+                                // If topic is already exist, creating 
partitioned topic is not allowed.
+                                if (metadata.partitions == 0
+                                        && !topicExists
+                                        && !topicName.isPartitioned()
+                                        && 
pulsar.getBrokerService().isAllowAutoTopicCreation(topicName)
+                                        && 
pulsar.getBrokerService().isDefaultTopicTypePartitioned(topicName)) {
+                                    return 
pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName);
+                                } else {
+                                    return 
CompletableFuture.completedFuture(metadata);
+                                }
                             });
                 });
     }

Reply via email to