This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 567a7b9bb6e [fix][broker] Remove synchronous method call in async call chain in PersistentTopicsBase (#19387)
567a7b9bb6e is described below
commit 567a7b9bb6e464aac24043a41a37771f14187ba7
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Feb 2 13:05:04 2023 +0200
[fix][broker] Remove synchronous method call in async call chain in PersistentTopicsBase (#19387)
---
.../org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 78834b24fd1..158c949e271 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3390,20 +3390,22 @@ public class PersistentTopicsBase extends AdminResource
{
return validateTopicPolicyOperationAsync(topicName,
PolicyName.REPLICATION, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
- .thenAccept(__ -> {
+ .thenCompose(__ -> {
Set<String> replicationClusters =
Sets.newHashSet(clusterIds);
if (replicationClusters.contains("global")) {
throw new RestException(Status.PRECONDITION_FAILED,
"Cannot specify global in the list of
replication clusters");
}
Set<String> clusters = clusters();
+ List<CompletableFuture<Void>> futures = new
ArrayList<>(replicationClusters.size());
for (String clusterId : replicationClusters) {
if (!clusters.contains(clusterId)) {
throw new RestException(Status.FORBIDDEN, "Invalid
cluster id: " + clusterId);
}
- validatePeerClusterConflict(clusterId,
replicationClusters);
- validateClusterForTenant(namespaceName.getTenant(),
clusterId);
+
futures.add(validatePeerClusterConflictAsync(clusterId, replicationClusters));
+
futures.add(validateClusterForTenantAsync(namespaceName.getTenant(),
clusterId));
}
+ return FutureUtil.waitForAll(futures);
}).thenCompose(__ ->
getTopicPoliciesAsyncWithRetry(topicName).thenCompose(op
-> {
TopicPolicies topicPolicies =
op.orElseGet(TopicPolicies::new);