gaoran10 commented on code in PR #22537:
URL: https://github.com/apache/pulsar/pull/22537#discussion_r1576003194


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java:
##########
@@ -621,35 +625,82 @@ protected void 
internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
 
     private void 
internalCreatePartitionedTopicToReplicatedClustersInBackground(int 
numPartitions) {
         getNamespaceReplicatedClustersAsync(namespaceName)
-                .thenAccept(clusters -> {
-                    for (String cluster : clusters) {
-                        if 
(!cluster.equals(pulsar().getConfiguration().getClusterName())) {
-                            // this call happens in the background without 
async composition. completion is logged.
-                            pulsar().getPulsarResources().getClusterResources()
-                                    .getClusterAsync(cluster)
-                                    .thenCompose(clusterDataOp ->
-                                            ((TopicsImpl) 
pulsar().getBrokerService()
-                                                    
.getClusterPulsarAdmin(cluster,
-                                                            
clusterDataOp).topics())
-                                                    
.createPartitionedTopicAsync(
-                                                            
topicName.getPartitionedTopicName(),
-                                                            numPartitions,
-                                                            true, null))
-                                    .whenComplete((__, ex) -> {
-                                        if (ex != null) {
-                                            log.error(
-                                                    "[{}] Failed to create 
partitioned topic {} in cluster {}.",
-                                                    clientAppId(), topicName, 
cluster, ex);
-                                        } else {
-                                            log.info(
-                                                    "[{}] Successfully created 
partitioned topic {} in "
-                                                            + "cluster {}",
-                                                    clientAppId(), topicName, 
cluster);
-                                        }
-                                    });
-                        }
+            .thenAccept(clusters -> {
+                // this call happens in the background without async 
composition. completion is logged.
+                
internalCreatePartitionedTopicToReplicatedClustersInBackground(clusters, 
numPartitions);
+            });
+    }
+
+    protected Map<String, CompletableFuture<Void>> 
internalCreatePartitionedTopicToReplicatedClustersInBackground(
+            Set<String> clusters, int numPartitions) {
+        final String shortTopicName = topicName.getPartitionedTopicName();
+        Map<String, CompletableFuture<Void>> tasksForAllClusters = new 
HashMap<>();
+        for (String cluster : clusters) {
+            if (cluster.equals(pulsar().getConfiguration().getClusterName())) {
+                continue;
+            }
+            ClusterResources clusterResources = 
pulsar().getPulsarResources().getClusterResources();
+            CompletableFuture<Void> createRemoteTopicFuture = new 
CompletableFuture<>();
+            tasksForAllClusters.put(cluster, createRemoteTopicFuture);
+            
clusterResources.getClusterAsync(cluster).whenComplete((clusterData, ex1) -> {
+                if (ex1 != null) {
+                    // Unexpected error, such as NPE. Catch all error to avoid 
the "createRemoteTopicFuture" stuck.
+                    log.error("[{}] An un-expected error occurs when trying to 
create partitioned topic {} in cluster"
+                                    + " {}.", clientAppId(), topicName, 
cluster, ex1);
+                    createRemoteTopicFuture.completeExceptionally(new 
RestException(ex1));
+                    return;
+                }
+                // Get cluster data success.
+                TopicsImpl topics =
+                        (TopicsImpl) 
pulsar().getBrokerService().getClusterPulsarAdmin(cluster, 
clusterData).topics();
+                topics.createPartitionedTopicAsync(shortTopicName, 
numPartitions, true, null)
+                        .whenComplete((ignore, ex2) -> {
+                    if (ex2 == null) {
+                        // Create success.
+                        log.info("[{}] Successfully created partitioned topic 
{} in cluster {}",
+                                clientAppId(), topicName, cluster);
+                        createRemoteTopicFuture.complete(null);
+                        return;
+                    }
+                    // Create topic on the remote cluster error.
+                    Throwable unwrapEx2 = 
FutureUtil.unwrapCompletionException(ex2);
+                    // The topic has been created before, check the partitions 
count is expected.
+                    if (unwrapEx2 instanceof 
PulsarAdminException.ConflictException) {
+                        
topics.getPartitionedTopicMetadataAsync(shortTopicName).whenComplete((topicMeta,
 ex3) -> {
+                            if (ex3 != null) {
+                                // Unexpected error, such as NPE. Catch all 
error to avoid the
+                                // "createRemoteTopicFuture" stuck.
+                                log.error("[{}] Failed to check 
remote-cluster's topic metadata when creating"
+                                                + " partitioned topic {} in 
cluster {}.",
+                                        clientAppId(), topicName, cluster, 
ex3);
+                                
createRemoteTopicFuture.completeExceptionally(new RestException(ex3));
+                            }
+                            // Call get partitioned metadata of remote cluster 
success.
+                            if (topicMeta.partitions == numPartitions) {

Review Comment:
   Got it, thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to