This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0a6c6b6576b [improve][broker] Make PulsarWebResource#getOwnerFromPeerClusterList async. (#15940)
0a6c6b6576b is described below
commit 0a6c6b6576bbae104e5b464b9a4898fc991569b1
Author: Jiwei Guo <[email protected]>
AuthorDate: Mon Jun 6 16:48:40 2022 +0800
[improve][broker] Make PulsarWebResource#getOwnerFromPeerClusterList async. (#15940)
---
.../pulsar/broker/web/PulsarWebResource.java | 82 ++++++++++++----------
1 file changed, 45 insertions(+), 37 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 22449e7f119..62f01d906f3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -838,19 +838,26 @@ public abstract class PulsarWebResource {
log.warn(msg);
validationFuture.completeExceptionally(new
RestException(Status.PRECONDITION_FAILED, msg));
} else if
(!policies.replication_clusters.contains(localCluster)) {
- ClusterDataImpl ownerPeerCluster =
getOwnerFromPeerClusterList(pulsarService,
- policies.replication_clusters);
- if (ownerPeerCluster != null) {
- // found a peer that own this namespace
- validationFuture.complete(ownerPeerCluster);
- return;
- }
- String msg = String.format(
- "Namespace missing local cluster name in clusters
list: local_cluster=%s ns=%s clusters=%s",
- localCluster, namespace.toString(),
policies.replication_clusters);
-
- log.warn(msg);
- validationFuture.completeExceptionally(new
RestException(Status.PRECONDITION_FAILED, msg));
+ getOwnerFromPeerClusterListAsync(pulsarService,
policies.replication_clusters)
+ .thenAccept(ownerPeerCluster -> {
+ if (ownerPeerCluster != null) {
+ // found a peer that own this namespace
+
validationFuture.complete(ownerPeerCluster);
+ } else {
+ String msg = String.format(
+ "Namespace missing local cluster
name in clusters list: local_cluster=%s"
+ + " ns=%s clusters=%s",
+ localCluster,
namespace.toString(), policies.replication_clusters);
+ log.warn(msg);
+ validationFuture.completeExceptionally(new
RestException(Status.PRECONDITION_FAILED,
+ msg));
+ }
+ })
+ .exceptionally(ex -> {
+ Throwable cause =
FutureUtil.unwrapCompletionException(ex);
+ validationFuture.completeExceptionally(new
RestException(cause));
+ return null;
+ });
} else {
validationFuture.complete(null);
}
@@ -869,34 +876,35 @@ public abstract class PulsarWebResource {
return validationFuture;
}
- private static ClusterDataImpl getOwnerFromPeerClusterList(PulsarService
pulsar, Set<String> replicationClusters) {
+ private static CompletableFuture<ClusterDataImpl>
getOwnerFromPeerClusterListAsync(PulsarService pulsar,
+ Set<String> replicationClusters) {
String currentCluster = pulsar.getConfiguration().getClusterName();
if (replicationClusters == null || replicationClusters.isEmpty() ||
isBlank(currentCluster)) {
- return null;
+ return CompletableFuture.completedFuture(null);
}
- try {
- Optional<ClusterData> cluster =
-
pulsar.getPulsarResources().getClusterResources().getCluster(currentCluster);
- if (!cluster.isPresent() || cluster.get().getPeerClusterNames() ==
null) {
- return null;
- }
- for (String peerCluster : cluster.get().getPeerClusterNames()) {
- if (replicationClusters.contains(peerCluster)) {
- return (ClusterDataImpl)
pulsar.getPulsarResources().getClusterResources().getCluster(peerCluster)
- .orElseThrow(() -> new
RestException(Status.NOT_FOUND,
- "Peer cluster " + peerCluster + " data not
found"));
- }
- }
- } catch (Exception e) {
- log.error("Failed to get peer-cluster {}-{}", currentCluster,
e.getMessage());
- if (e instanceof RestException) {
- throw (RestException) e;
- } else {
- throw new RestException(e);
- }
- }
- return null;
+ return
pulsar.getPulsarResources().getClusterResources().getClusterAsync(currentCluster)
+ .thenCompose(cluster -> {
+ if (!cluster.isPresent() ||
cluster.get().getPeerClusterNames() == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+ for (String peerCluster :
cluster.get().getPeerClusterNames()) {
+ if (replicationClusters.contains(peerCluster)) {
+ return
pulsar.getPulsarResources().getClusterResources().getClusterAsync(peerCluster)
+ .thenApply(ret -> {
+ if (!ret.isPresent()) {
+ throw new
RestException(Status.NOT_FOUND,
+ "Peer cluster " +
peerCluster + " data not found");
+ }
+ return (ClusterDataImpl) ret.get();
+ });
+ }
+ }
+ return CompletableFuture.completedFuture(null);
+ }).exceptionally(ex -> {
+ log.error("Failed to get peer-cluster {}-{}",
currentCluster, ex.getMessage());
+ throw FutureUtil.wrapToCompletionException(ex);
+ });
}
protected static void checkAuthorization(PulsarService pulsarService,
TopicName topicName, String role,