mattisonchao commented on a change in pull request #13935:
URL: https://github.com/apache/pulsar/pull/13935#discussion_r791344954



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
##########
@@ -358,6 +375,25 @@ protected void validateClusterForTenant(String tenant, 
String cluster) {
         log.info("Successfully validated clusters on tenant [{}]", tenant);
     }
 
+    protected CompletableFuture<Void> validateClusterOwnershipAsync(String 
cluster){

Review comment:
```suggestion
    protected CompletableFuture<Void> validateClusterOwnershipAsync(String cluster) {
```

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
##########
@@ -407,36 +432,28 @@ private URI getRedirectionUrl(ClusterData 
differentClusterData) throws Malformed
     protected static CompletableFuture<ClusterData> 
getClusterDataIfDifferentCluster(PulsarService pulsar,
                                                                                
      String cluster,
                                                                                
      String clientAppId) {
-
         CompletableFuture<ClusterData> clusterDataFuture = new 
CompletableFuture<>();
-
-        if (!isValidCluster(pulsar, cluster)) {
-            try {
+        if (isValidCluster(pulsar, cluster) ||
                 // this code should only happen with a v1 namespace format 
prop/cluster/namespaces
-                if 
(!pulsar.getConfiguration().getClusterName().equals(cluster)) {
-                    // redirect to the cluster requested
-                    
pulsar.getPulsarResources().getClusterResources().getClusterAsync(cluster)
-                            .thenAccept(clusterDataResult -> {
-                                if (clusterDataResult.isPresent()) {
-                                    
clusterDataFuture.complete(clusterDataResult.get());
-                                } else {
-                                    log.warn("[{}] Cluster does not exist: 
requested={}", clientAppId, cluster);
-                                    
clusterDataFuture.completeExceptionally(new RestException(Status.NOT_FOUND,
-                                            "Cluster does not exist: cluster=" 
+ cluster));
-                                }
-                            }).exceptionally(ex -> {
-                                clusterDataFuture.completeExceptionally(ex);
-                                return null;
-                            });
-                } else {
-                    clusterDataFuture.complete(null);
-                }
-            } catch (Exception e) {
-                clusterDataFuture.completeExceptionally(e);
-            }
-        } else {
+                pulsar.getConfiguration().getClusterName().equals(cluster)) {
             clusterDataFuture.complete(null);
+            return clusterDataFuture;
         }
+        // redirect to the cluster requested
+        
pulsar.getPulsarResources().getClusterResources().getClusterAsync(cluster)
+                .whenComplete((clusterDataResult, ex) -> {
+                    if (ex != null){

Review comment:
```suggestion
                    if (ex != null) {
```

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
##########
@@ -358,6 +375,25 @@ protected void validateClusterForTenant(String tenant, 
String cluster) {
         log.info("Successfully validated clusters on tenant [{}]", tenant);
     }
 
+    protected CompletableFuture<Void> validateClusterOwnershipAsync(String 
cluster){
+        return getClusterDataIfDifferentCluster(pulsar(), cluster, 
clientAppId())
+                .thenAccept(differentClusterData -> {
+                    if (differentClusterData != null){

Review comment:
```suggestion
                    if (differentClusterData != null) {
```

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
##########
@@ -85,16 +85,21 @@
             @ApiResponse(code = 401, message = "Authentication required"),
             @ApiResponse(code = 403, message = "This operation requires 
super-user access"),
             @ApiResponse(code = 404, message = "Cluster does not exist: 
cluster={clustername}") })
-    public Set<String> getActiveBrokers(@PathParam("cluster") String cluster) 
throws Exception {
-        validateSuperUserAccess();
-        validateClusterOwnership(cluster);
-
-        try {
-            return pulsar().getLoadManager().get().getAvailableBrokers();
-        } catch (Exception e) {
-            LOG.error("[{}] Failed to get active broker list: cluster={}", 
clientAppId(), cluster, e);
-            throw new RestException(e);
-        }
+    public void getActiveBrokers(@Suspended final AsyncResponse asyncResponse,
+                                 @PathParam("cluster") String cluster) {
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> validateClusterOwnershipAsync(cluster))
+                .thenCompose(__ -> 
pulsar().getLoadManager().get().getAvailableBrokersAsync())
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex ->{

Review comment:
```suggestion
                .exceptionally(ex -> {
```

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
##########
@@ -85,16 +85,21 @@
             @ApiResponse(code = 401, message = "Authentication required"),
             @ApiResponse(code = 403, message = "This operation requires 
super-user access"),
             @ApiResponse(code = 404, message = "Cluster does not exist: 
cluster={clustername}") })
-    public Set<String> getActiveBrokers(@PathParam("cluster") String cluster) 
throws Exception {
-        validateSuperUserAccess();
-        validateClusterOwnership(cluster);
-
-        try {
-            return pulsar().getLoadManager().get().getAvailableBrokers();
-        } catch (Exception e) {
-            LOG.error("[{}] Failed to get active broker list: cluster={}", 
clientAppId(), cluster, e);
-            throw new RestException(e);
-        }
+    public void getActiveBrokers(@Suspended final AsyncResponse asyncResponse,
+                                 @PathParam("cluster") String cluster) {
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> validateClusterOwnershipAsync(cluster))
+                .thenCompose(__ -> 
pulsar().getLoadManager().get().getAvailableBrokersAsync())
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex ->{
+                    Throwable realCause = 
FutureUtil.unwrapCompletionException(ex);
+                    if (realCause instanceof WebApplicationException){

Review comment:
```suggestion
                    if (realCause instanceof WebApplicationException) {
```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to