This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7604d4aef67 [fix][broker] Replace sync call in async call chain of 
AdminResource#internalCreatePartitionedTopic (#19399)
7604d4aef67 is described below

commit 7604d4aef670e31177d803e9f3c4a912be94267f
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Feb 2 15:03:56 2023 +0200

    [fix][broker] Replace sync call in async call chain of 
AdminResource#internalCreatePartitionedTopic (#19399)
---
 .../apache/pulsar/broker/admin/AdminResource.java  | 71 +++++++++++-----------
 1 file changed, 35 insertions(+), 36 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 5a8eaab8e6b..dd3ea8535b1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -503,23 +503,6 @@ public abstract class AdminResource extends 
PulsarWebResource {
         return getNamespacePolicies(ns);
     }
 
-    protected boolean isNamespaceReplicated(NamespaceName namespaceName) {
-        return getNamespaceReplicatedClusters(namespaceName).size() > 1;
-    }
-
-    protected Set<String> getNamespaceReplicatedClusters(NamespaceName 
namespaceName) {
-        try {
-            final Policies policies = 
namespaceResources().getPolicies(namespaceName)
-                    .orElseThrow(() -> new RestException(Status.NOT_FOUND, 
"Namespace does not exist"));
-            return policies.replication_clusters;
-        } catch (RestException re) {
-            throw re;
-        } catch (Exception e) {
-            log.error("[{}] Failed to get namespace policies {}", 
clientAppId(), namespaceName, e);
-            throw new RestException(e);
-        }
-    }
-
     protected CompletableFuture<Set<String>> 
getNamespaceReplicatedClustersAsync(NamespaceName namespaceName) {
         return namespaceResources().getPoliciesAsync(namespaceName)
                 .thenApply(policies -> {
@@ -616,26 +599,9 @@ public abstract class AdminResource extends 
PulsarWebResource {
                 .thenCompose(__ -> 
provisionPartitionedTopicPath(numPartitions, createLocalTopicOnly, properties))
                 .thenCompose(__ -> tryCreatePartitionsAsync(numPartitions))
                 .thenRun(() -> {
-                    List<String> replicatedClusters = new ArrayList<>();
-                    if (!createLocalTopicOnly && topicName.isGlobal() && 
isNamespaceReplicated(namespaceName)) {
-                        getNamespaceReplicatedClusters(namespaceName)
-                                .stream()
-                                .filter(cluster -> 
!cluster.equals(pulsar().getConfiguration().getClusterName()))
-                                .forEach(replicatedClusters::add);
+                    if (!createLocalTopicOnly && topicName.isGlobal()) {
+                        
internalCreatePartitionedTopicToReplicatedClustersInBackground(numPartitions);
                     }
-                    replicatedClusters.forEach(cluster -> {
-                        
pulsar().getPulsarResources().getClusterResources().getClusterAsync(cluster)
-                                .thenAccept(clusterDataOp ->
-                                        ((TopicsImpl) 
pulsar().getBrokerService()
-                                                
.getClusterPulsarAdmin(cluster, clusterDataOp).topics())
-                                                .createPartitionedTopicAsync(
-                                                        
topicName.getPartitionedTopicName(), numPartitions,
-                                                        true, null))
-                                .exceptionally(ex -> {
-                                    log.error("Failed to create partition 
topic in cluster {}.", cluster, ex);
-                                    return null;
-                                });
-                    });
                     log.info("[{}] Successfully created partitions for topic 
{} in cluster {}",
                             clientAppId(), topicName, 
pulsar().getConfiguration().getClusterName());
                     asyncResponse.resume(Response.noContent().build());
@@ -647,6 +613,39 @@ public abstract class AdminResource extends 
PulsarWebResource {
                 });
     }
 
+    private void 
internalCreatePartitionedTopicToReplicatedClustersInBackground(int 
numPartitions) {
+        getNamespaceReplicatedClustersAsync(namespaceName)
+                .thenAccept(clusters -> {
+                    for (String cluster : clusters) {
+                        if 
(!cluster.equals(pulsar().getConfiguration().getClusterName())) {
+                            // this call happens in the background without 
async composition. completion is logged.
+                            pulsar().getPulsarResources().getClusterResources()
+                                    .getClusterAsync(cluster)
+                                    .thenCompose(clusterDataOp ->
+                                            ((TopicsImpl) 
pulsar().getBrokerService()
+                                                    
.getClusterPulsarAdmin(cluster,
+                                                            
clusterDataOp).topics())
+                                                    
.createPartitionedTopicAsync(
+                                                            
topicName.getPartitionedTopicName(),
+                                                            numPartitions,
+                                                            true, null))
+                                    .whenComplete((__, ex) -> {
+                                        if (ex != null) {
+                                            log.error(
+                                                    "[{}] Failed to create 
partitioned topic {} in cluster {}.",
+                                                    clientAppId(), topicName, 
cluster, ex);
+                                        } else {
+                                            log.info(
+                                                    "[{}] Successfully created 
partitioned topic {} in "
+                                                            + "cluster {}",
+                                                    clientAppId(), topicName, 
cluster);
+                                        }
+                                    });
+                        }
+                    }
+                });
+    }
+
     /**
      * Check the exists topics contains the given topic.
      * Since there are topic partitions and non-partitioned topics in Pulsar, 
must ensure both partitions

Reply via email to