This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4a02d47e269e9d005db1197cb707704fb0ea80db Author: Jiwei Guo <[email protected]> AuthorDate: Mon Jun 6 16:48:40 2022 +0800 [improve][broker] Make PulsarWebResource#getOwnerFromPeerClusterList async. (#15940) (cherry picked from commit 0a6c6b6576bbae104e5b464b9a4898fc991569b1) --- .../pulsar/broker/web/PulsarWebResource.java | 82 ++++++++++++---------- 1 file changed, 45 insertions(+), 37 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index d810de85bf4..0c07ba7a091 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -830,19 +830,26 @@ public abstract class PulsarWebResource { log.warn(msg); validationFuture.completeExceptionally(new RestException(Status.PRECONDITION_FAILED, msg)); } else if (!policies.replication_clusters.contains(localCluster)) { - ClusterDataImpl ownerPeerCluster = getOwnerFromPeerClusterList(pulsarService, - policies.replication_clusters); - if (ownerPeerCluster != null) { - // found a peer that own this namespace - validationFuture.complete(ownerPeerCluster); - return; - } - String msg = String.format( - "Namespace missing local cluster name in clusters list: local_cluster=%s ns=%s clusters=%s", - localCluster, namespace.toString(), policies.replication_clusters); - - log.warn(msg); - validationFuture.completeExceptionally(new RestException(Status.PRECONDITION_FAILED, msg)); + getOwnerFromPeerClusterListAsync(pulsarService, policies.replication_clusters) + .thenAccept(ownerPeerCluster -> { + if (ownerPeerCluster != null) { + // found a peer that own this namespace + validationFuture.complete(ownerPeerCluster); + } else { + String msg = String.format( + "Namespace missing local cluster name in clusters list: local_cluster=%s" + + " ns=%s clusters=%s", + localCluster, namespace.toString(), policies.replication_clusters); + log.warn(msg); + validationFuture.completeExceptionally(new RestException(Status.PRECONDITION_FAILED, + msg)); + } + }) + .exceptionally(ex -> { + Throwable cause = FutureUtil.unwrapCompletionException(ex); + validationFuture.completeExceptionally(new RestException(cause)); + return null; + }); } else { validationFuture.complete(null); } @@ -861,34 +868,35 @@ public abstract class PulsarWebResource { return validationFuture; } - private static ClusterDataImpl getOwnerFromPeerClusterList(PulsarService pulsar, Set<String> replicationClusters) { + private static CompletableFuture<ClusterDataImpl> getOwnerFromPeerClusterListAsync(PulsarService pulsar, + Set<String> replicationClusters) { String currentCluster = pulsar.getConfiguration().getClusterName(); if (replicationClusters == null || replicationClusters.isEmpty() || isBlank(currentCluster)) { - return null; + return CompletableFuture.completedFuture(null); } - try { - Optional<ClusterData> cluster = - pulsar.getPulsarResources().getClusterResources().getCluster(currentCluster); - if (!cluster.isPresent() || cluster.get().getPeerClusterNames() == null) { - return null; - } - for (String peerCluster : cluster.get().getPeerClusterNames()) { - if (replicationClusters.contains(peerCluster)) { - return (ClusterDataImpl) pulsar.getPulsarResources().getClusterResources().getCluster(peerCluster) - .orElseThrow(() -> new RestException(Status.NOT_FOUND, - "Peer cluster " + peerCluster + " data not found")); - } - } - } catch (Exception e) { - log.error("Failed to get peer-cluster {}-{}", currentCluster, e.getMessage()); - if (e instanceof RestException) { - throw (RestException) e; - } else { - throw new RestException(e); - } - } - return null; + return pulsar.getPulsarResources().getClusterResources().getClusterAsync(currentCluster) + .thenCompose(cluster -> { + if (!cluster.isPresent() || cluster.get().getPeerClusterNames() == null) { + return CompletableFuture.completedFuture(null); + } + for (String peerCluster : cluster.get().getPeerClusterNames()) { + if (replicationClusters.contains(peerCluster)) { + return pulsar.getPulsarResources().getClusterResources().getClusterAsync(peerCluster) + .thenApply(ret -> { + if (!ret.isPresent()) { + throw new RestException(Status.NOT_FOUND, + "Peer cluster " + peerCluster + " data not found"); + } + return (ClusterDataImpl) ret.get(); + }); + } + } + return CompletableFuture.completedFuture(null); + }).exceptionally(ex -> { + log.error("Failed to get peer-cluster {}-{}", currentCluster, ex.getMessage()); + throw FutureUtil.wrapToCompletionException(ex); + }); } protected static CompletableFuture<Void> checkAuthorizationAsync(PulsarService pulsarService, TopicName topicName,
