mattisonchao commented on a change in pull request #13935: URL: https://github.com/apache/pulsar/pull/13935#discussion_r791344954
########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java ########## @@ -358,6 +375,25 @@ protected void validateClusterForTenant(String tenant, String cluster) { log.info("Successfully validated clusters on tenant [{}]", tenant); } + protected CompletableFuture<Void> validateClusterOwnershipAsync(String cluster){ Review comment: ```suggestion protected CompletableFuture<Void> validateClusterOwnershipAsync(String cluster) { ``` ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java ########## @@ -407,36 +432,28 @@ private URI getRedirectionUrl(ClusterData differentClusterData) throws Malformed protected static CompletableFuture<ClusterData> getClusterDataIfDifferentCluster(PulsarService pulsar, String cluster, String clientAppId) { - CompletableFuture<ClusterData> clusterDataFuture = new CompletableFuture<>(); - - if (!isValidCluster(pulsar, cluster)) { - try { + if (isValidCluster(pulsar, cluster) || // this code should only happen with a v1 namespace format prop/cluster/namespaces - if (!pulsar.getConfiguration().getClusterName().equals(cluster)) { - // redirect to the cluster requested - pulsar.getPulsarResources().getClusterResources().getClusterAsync(cluster) - .thenAccept(clusterDataResult -> { - if (clusterDataResult.isPresent()) { - clusterDataFuture.complete(clusterDataResult.get()); - } else { - log.warn("[{}] Cluster does not exist: requested={}", clientAppId, cluster); - clusterDataFuture.completeExceptionally(new RestException(Status.NOT_FOUND, - "Cluster does not exist: cluster=" + cluster)); - } - }).exceptionally(ex -> { - clusterDataFuture.completeExceptionally(ex); - return null; - }); - } else { - clusterDataFuture.complete(null); - } - } catch (Exception e) { - clusterDataFuture.completeExceptionally(e); - } - } else { + pulsar.getConfiguration().getClusterName().equals(cluster)) { clusterDataFuture.complete(null); + return clusterDataFuture; } + // redirect to the cluster requested + pulsar.getPulsarResources().getClusterResources().getClusterAsync(cluster) + .whenComplete((clusterDataResult, ex) -> { + if (ex != null){ Review comment: ```suggestion if (ex != null) { ``` ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java ########## @@ -358,6 +375,25 @@ protected void validateClusterForTenant(String tenant, String cluster) { log.info("Successfully validated clusters on tenant [{}]", tenant); } + protected CompletableFuture<Void> validateClusterOwnershipAsync(String cluster){ + return getClusterDataIfDifferentCluster(pulsar(), cluster, clientAppId()) + .thenAccept(differentClusterData -> { + if (differentClusterData != null){ Review comment: ```suggestion if (differentClusterData != null) { ``` ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java ########## @@ -85,16 +85,21 @@ @ApiResponse(code = 401, message = "Authentication required"), @ApiResponse(code = 403, message = "This operation requires super-user access"), @ApiResponse(code = 404, message = "Cluster does not exist: cluster={clustername}") }) - public Set<String> getActiveBrokers(@PathParam("cluster") String cluster) throws Exception { - validateSuperUserAccess(); - validateClusterOwnership(cluster); - - try { - return pulsar().getLoadManager().get().getAvailableBrokers(); - } catch (Exception e) { - LOG.error("[{}] Failed to get active broker list: cluster={}", clientAppId(), cluster, e); - throw new RestException(e); - } + public void getActiveBrokers(@Suspended final AsyncResponse asyncResponse, + @PathParam("cluster") String cluster) { + validateSuperUserAccessAsync() + .thenCompose(__ -> validateClusterOwnershipAsync(cluster)) + .thenCompose(__ -> pulsar().getLoadManager().get().getAvailableBrokersAsync()) + .thenAccept(asyncResponse::resume) + .exceptionally(ex ->{ Review comment: ```suggestion .exceptionally(ex -> { ``` ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java ########## @@ -85,16 +85,21 @@ @ApiResponse(code = 401, message = "Authentication required"), @ApiResponse(code = 403, message = "This operation requires super-user access"), @ApiResponse(code = 404, message = "Cluster does not exist: cluster={clustername}") }) - public Set<String> getActiveBrokers(@PathParam("cluster") String cluster) throws Exception { - validateSuperUserAccess(); - validateClusterOwnership(cluster); - - try { - return pulsar().getLoadManager().get().getAvailableBrokers(); - } catch (Exception e) { - LOG.error("[{}] Failed to get active broker list: cluster={}", clientAppId(), cluster, e); - throw new RestException(e); - } + public void getActiveBrokers(@Suspended final AsyncResponse asyncResponse, + @PathParam("cluster") String cluster) { + validateSuperUserAccessAsync() + .thenCompose(__ -> validateClusterOwnershipAsync(cluster)) + .thenCompose(__ -> pulsar().getLoadManager().get().getAvailableBrokersAsync()) + .thenAccept(asyncResponse::resume) + .exceptionally(ex ->{ + Throwable realCause = FutureUtil.unwrapCompletionException(ex); + if (realCause instanceof WebApplicationException){ Review comment: ```suggestion if (realCause instanceof WebApplicationException) { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org