This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 567a7b9bb6e [fix][broker] Remove synchronous method call in async call chain in PersistentTopicsBase (#19387)
567a7b9bb6e is described below

commit 567a7b9bb6e464aac24043a41a37771f14187ba7
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Feb 2 13:05:04 2023 +0200

    [fix][broker] Remove synchronous method call in async call chain in PersistentTopicsBase (#19387)
---
 .../org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 78834b24fd1..158c949e271 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3390,20 +3390,22 @@ public class PersistentTopicsBase extends AdminResource 
{
 
         return validateTopicPolicyOperationAsync(topicName, 
PolicyName.REPLICATION, PolicyOperation.WRITE)
                 .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
-                .thenAccept(__ -> {
+                .thenCompose(__ -> {
                     Set<String> replicationClusters = 
Sets.newHashSet(clusterIds);
                     if (replicationClusters.contains("global")) {
                         throw new RestException(Status.PRECONDITION_FAILED,
                                 "Cannot specify global in the list of 
replication clusters");
                     }
                     Set<String> clusters = clusters();
+                    List<CompletableFuture<Void>> futures = new 
ArrayList<>(replicationClusters.size());
                     for (String clusterId : replicationClusters) {
                         if (!clusters.contains(clusterId)) {
                             throw new RestException(Status.FORBIDDEN, "Invalid 
cluster id: " + clusterId);
                         }
-                        validatePeerClusterConflict(clusterId, 
replicationClusters);
-                        validateClusterForTenant(namespaceName.getTenant(), 
clusterId);
+                        
futures.add(validatePeerClusterConflictAsync(clusterId, replicationClusters));
+                        
futures.add(validateClusterForTenantAsync(namespaceName.getTenant(), 
clusterId));
                     }
+                    return FutureUtil.waitForAll(futures);
                 }).thenCompose(__ ->
                     getTopicPoliciesAsyncWithRetry(topicName).thenCompose(op 
-> {
                             TopicPolicies topicPolicies = 
op.orElseGet(TopicPolicies::new);

Reply via email to