gaoran10 commented on code in PR #22537: URL: https://github.com/apache/pulsar/pull/22537#discussion_r1574055874
########## pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java: ########## @@ -621,35 +625,82 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n private void internalCreatePartitionedTopicToReplicatedClustersInBackground(int numPartitions) { getNamespaceReplicatedClustersAsync(namespaceName) - .thenAccept(clusters -> { - for (String cluster : clusters) { - if (!cluster.equals(pulsar().getConfiguration().getClusterName())) { - // this call happens in the background without async composition. completion is logged. - pulsar().getPulsarResources().getClusterResources() - .getClusterAsync(cluster) - .thenCompose(clusterDataOp -> - ((TopicsImpl) pulsar().getBrokerService() - .getClusterPulsarAdmin(cluster, - clusterDataOp).topics()) - .createPartitionedTopicAsync( - topicName.getPartitionedTopicName(), - numPartitions, - true, null)) - .whenComplete((__, ex) -> { - if (ex != null) { - log.error( - "[{}] Failed to create partitioned topic {} in cluster {}.", - clientAppId(), topicName, cluster, ex); - } else { - log.info( - "[{}] Successfully created partitioned topic {} in " - + "cluster {}", - clientAppId(), topicName, cluster); - } - }); - } + .thenAccept(clusters -> { + // this call happens in the background without async composition. completion is logged. + internalCreatePartitionedTopicToReplicatedClustersInBackground(clusters, numPartitions); + }); + } + + protected Map<String, CompletableFuture<Void>> internalCreatePartitionedTopicToReplicatedClustersInBackground( + Set<String> clusters, int numPartitions) { + final String shortTopicName = topicName.getPartitionedTopicName(); + Map<String, CompletableFuture<Void>> tasksForAllClusters = new HashMap<>(); + for (String cluster : clusters) { + if (cluster.equals(pulsar().getConfiguration().getClusterName())) { + continue; + } + ClusterResources clusterResources = pulsar().getPulsarResources().getClusterResources(); + CompletableFuture<Void> createRemoteTopicFuture = new CompletableFuture<>(); + tasksForAllClusters.put(cluster, createRemoteTopicFuture); + clusterResources.getClusterAsync(cluster).whenComplete((clusterData, ex1) -> { + if (ex1 != null) { + // Unexpected error, such as NPE. Catch all error to avoid the "createRemoteTopicFuture" stuck. + log.error("[{}] An un-expected error occurs when trying to create partitioned topic {} in cluster" + + " {}.", clientAppId(), topicName, cluster, ex1); + createRemoteTopicFuture.completeExceptionally(new RestException(ex1)); + return; + } + // Get cluster data success. + TopicsImpl topics = + (TopicsImpl) pulsar().getBrokerService().getClusterPulsarAdmin(cluster, clusterData).topics(); + topics.createPartitionedTopicAsync(shortTopicName, numPartitions, true, null) + .whenComplete((ignore, ex2) -> { + if (ex2 == null) { + // Create success. + log.info("[{}] Successfully created partitioned topic {} in cluster {}", + clientAppId(), topicName, cluster); + createRemoteTopicFuture.complete(null); + return; + } + // Create topic on the remote cluster error. + Throwable unwrapEx2 = FutureUtil.unwrapCompletionException(ex2); + // The topic has been created before, check the partitions count is expected. + if (unwrapEx2 instanceof PulsarAdminException.ConflictException) { + topics.getPartitionedTopicMetadataAsync(shortTopicName).whenComplete((topicMeta, ex3) -> { + if (ex3 != null) { + // Unexpected error, such as NPE. Catch all error to avoid the + // "createRemoteTopicFuture" stuck. + log.error("[{}] Failed to check remote-cluster's topic metadata when creating" + + " partitioned topic {} in cluster {}.", + clientAppId(), topicName, cluster, ex3); + createRemoteTopicFuture.completeExceptionally(new RestException(ex3)); + } + // Call get partitioned metadata of remote cluster success. + if (topicMeta.partitions == numPartitions) { Review Comment: Maybe we can improve the method `Topics#createPartitionedTopicAsync(String topic, int numPartitions, Map<String, String> properties)` to make it provide an error code for the partition count conflict problem, if so we don't need to check the partition count. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org