mattisonchao commented on a change in pull request #13904:
URL: https://github.com/apache/pulsar/pull/13904#discussion_r801237356



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4414,74 +4414,79 @@ protected void handleTopicPolicyException(String 
methodName, Throwable thr, Asyn
     }
 
     protected void internalTruncateNonPartitionedTopic(AsyncResponse 
asyncResponse, boolean authoritative) {
-        Topic topic;
-        try {
-            validateAdminAccessForTenant(topicName.getTenant());
-            validateTopicOwnership(topicName, authoritative);
-            topic = getTopicReference(topicName);
-        } catch (Exception e) {
-            log.error("[{}] Failed to truncate topic {}", clientAppId(), 
topicName, e);
-            resumeAsyncResponseExceptionally(asyncResponse, e);
-            return;
-        }
-        CompletableFuture<Void> future = topic.truncate();
-        future.thenAccept(a -> {
-            asyncResponse.resume(new 
RestException(Response.Status.NO_CONTENT.getStatusCode(),
-                    Response.Status.NO_CONTENT.getReasonPhrase()));
-        }).exceptionally(e -> {
-            asyncResponse.resume(e);
-            return null;
-        });
+        validateAdminAccessForTenantAsync(topicName.getTenant())
+                .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+                .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                .thenCompose(topic -> topic.truncate())
+                .thenAccept(__ -> {
+                    asyncResponse.resume(new 
RestException(Response.Status.NO_CONTENT.getStatusCode(),

Review comment:
       Why do we ``resume`` with an exception here?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
##########
@@ -247,64 +247,141 @@ protected void validateAdminAccessForTenant(String 
tenant) {
 
     protected static void validateAdminAccessForTenant(PulsarService pulsar, 
String clientAppId,
                                                        String 
originalPrincipal, String tenant,
-                                                       
AuthenticationDataSource authenticationData)
-            throws Exception {
+                                                       
AuthenticationDataSource authenticationData) {
+        try {
+            validateAdminAccessForTenantAsync(pulsar, clientAppId, 
originalPrincipal, tenant, authenticationData).get();
+        } catch (InterruptedException | ExecutionException e) {
+            Throwable cause = e.getCause();

Review comment:
       ``InterruptedException`` does not carry a cause, so ``getCause()`` will 
normally return null here.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4414,74 +4414,79 @@ protected void handleTopicPolicyException(String 
methodName, Throwable thr, Asyn
     }
 
     protected void internalTruncateNonPartitionedTopic(AsyncResponse 
asyncResponse, boolean authoritative) {
-        Topic topic;
-        try {
-            validateAdminAccessForTenant(topicName.getTenant());
-            validateTopicOwnership(topicName, authoritative);
-            topic = getTopicReference(topicName);
-        } catch (Exception e) {
-            log.error("[{}] Failed to truncate topic {}", clientAppId(), 
topicName, e);
-            resumeAsyncResponseExceptionally(asyncResponse, e);
-            return;
-        }
-        CompletableFuture<Void> future = topic.truncate();
-        future.thenAccept(a -> {
-            asyncResponse.resume(new 
RestException(Response.Status.NO_CONTENT.getStatusCode(),
-                    Response.Status.NO_CONTENT.getReasonPhrase()));
-        }).exceptionally(e -> {
-            asyncResponse.resume(e);
-            return null;
-        });
+        validateAdminAccessForTenantAsync(topicName.getTenant())
+                .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+                .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                .thenCompose(topic -> topic.truncate())
+                .thenAccept(__ -> {
+                    asyncResponse.resume(new 
RestException(Response.Status.NO_CONTENT.getStatusCode(),
+                            Response.Status.NO_CONTENT.getReasonPhrase()));
+                }).exceptionally(ex -> {
+                    Throwable cause = ex.getCause();
+                    if (cause instanceof WebApplicationException
+                            && ((WebApplicationException) 
cause).getResponse().getStatus()
+                                    == 
Status.TEMPORARY_REDIRECT.getStatusCode()) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] Failed to truncate non-partitioned 
topic {}, redirecting to other brokers.",
+                                    clientAppId(), topicName, cause);
+                        }
+                    } else {
+                        log.error("[{}] Failed to truncate non-partitioned 
topic {}", clientAppId(), topicName, cause);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, cause);
+                    return null;
+                });
     }
 
     protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean 
authoritative) {
-
         // If the topic name is a partition name, no need to get partition 
topic metadata again
         if (topicName.isPartitioned()) {
             internalTruncateNonPartitionedTopic(asyncResponse, authoritative);
         } else {
-            getPartitionedTopicMetadataAsync(topicName, authoritative, 
false).whenComplete((meta, t) -> {
-                if (meta.partitions > 0) {
-                    final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
-                    for (int i = 0; i < meta.partitions; i++) {
-                        TopicName topicNamePartition = 
topicName.getPartition(i);
-                        try {
-                            futures.add(pulsar().getAdminClient().topics()
-                                .truncateAsync(topicNamePartition.toString()));
-                        } catch (Exception e) {
-                            log.error("[{}] Failed to truncate topic {}", 
clientAppId(), topicNamePartition, e);
-                            asyncResponse.resume(new RestException(e));
-                            return;
-                        }
-                    }
-                    FutureUtil.waitForAll(futures).handle((result, exception) 
-> {
-                        if (exception != null) {
-                            Throwable th = exception.getCause();
-                            if (th instanceof NotFoundException) {
-                                asyncResponse.resume(new 
RestException(Status.NOT_FOUND, th.getMessage()));
-                            } else if (th instanceof WebApplicationException) {
-                                asyncResponse.resume(th);
-                            } else {
-                                log.error("[{}] Failed to truncate topic {}", 
clientAppId(), topicName, exception);
-                                asyncResponse.resume(new 
RestException(exception));
+            getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+                    .whenComplete((meta, t) -> {
+                        if (meta.partitions > 0) {
+                            final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
+                            for (int i = 0; i < meta.partitions; i++) {
+                                TopicName topicNamePartition = 
topicName.getPartition(i);
+                                try {
+                                    
futures.add(pulsar().getAdminClient().topics()
+                                            
.truncateAsync(topicNamePartition.toString()));
+                                } catch (Exception e) {

Review comment:
       ```suggestion
                                   } catch (PulsarServerException e) {
   ```

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4414,74 +4414,79 @@ protected void handleTopicPolicyException(String 
methodName, Throwable thr, Asyn
     }
 
     protected void internalTruncateNonPartitionedTopic(AsyncResponse 
asyncResponse, boolean authoritative) {
-        Topic topic;
-        try {
-            validateAdminAccessForTenant(topicName.getTenant());
-            validateTopicOwnership(topicName, authoritative);
-            topic = getTopicReference(topicName);
-        } catch (Exception e) {
-            log.error("[{}] Failed to truncate topic {}", clientAppId(), 
topicName, e);
-            resumeAsyncResponseExceptionally(asyncResponse, e);
-            return;
-        }
-        CompletableFuture<Void> future = topic.truncate();
-        future.thenAccept(a -> {
-            asyncResponse.resume(new 
RestException(Response.Status.NO_CONTENT.getStatusCode(),
-                    Response.Status.NO_CONTENT.getReasonPhrase()));
-        }).exceptionally(e -> {
-            asyncResponse.resume(e);
-            return null;
-        });
+        validateAdminAccessForTenantAsync(topicName.getTenant())
+                .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+                .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                .thenCompose(topic -> topic.truncate())
+                .thenAccept(__ -> {
+                    asyncResponse.resume(new 
RestException(Response.Status.NO_CONTENT.getStatusCode(),
+                            Response.Status.NO_CONTENT.getReasonPhrase()));
+                }).exceptionally(ex -> {
+                    Throwable cause = ex.getCause();
+                    if (cause instanceof WebApplicationException
+                            && ((WebApplicationException) 
cause).getResponse().getStatus()
+                                    == 
Status.TEMPORARY_REDIRECT.getStatusCode()) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] Failed to truncate non-partitioned 
topic {}, redirecting to other brokers.",
+                                    clientAppId(), topicName, cause);
+                        }
+                    } else {
+                        log.error("[{}] Failed to truncate non-partitioned 
topic {}", clientAppId(), topicName, cause);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, cause);
+                    return null;
+                });
     }
 
     protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean 
authoritative) {
-
         // If the topic name is a partition name, no need to get partition 
topic metadata again
         if (topicName.isPartitioned()) {
             internalTruncateNonPartitionedTopic(asyncResponse, authoritative);
         } else {
-            getPartitionedTopicMetadataAsync(topicName, authoritative, 
false).whenComplete((meta, t) -> {
-                if (meta.partitions > 0) {
-                    final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
-                    for (int i = 0; i < meta.partitions; i++) {
-                        TopicName topicNamePartition = 
topicName.getPartition(i);
-                        try {
-                            futures.add(pulsar().getAdminClient().topics()
-                                .truncateAsync(topicNamePartition.toString()));
-                        } catch (Exception e) {
-                            log.error("[{}] Failed to truncate topic {}", 
clientAppId(), topicNamePartition, e);
-                            asyncResponse.resume(new RestException(e));
-                            return;
-                        }
-                    }
-                    FutureUtil.waitForAll(futures).handle((result, exception) 
-> {
-                        if (exception != null) {
-                            Throwable th = exception.getCause();
-                            if (th instanceof NotFoundException) {
-                                asyncResponse.resume(new 
RestException(Status.NOT_FOUND, th.getMessage()));
-                            } else if (th instanceof WebApplicationException) {
-                                asyncResponse.resume(th);
-                            } else {
-                                log.error("[{}] Failed to truncate topic {}", 
clientAppId(), topicName, exception);
-                                asyncResponse.resume(new 
RestException(exception));
+            getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+                    .whenComplete((meta, t) -> {
+                        if (meta.partitions > 0) {
+                            final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
+                            for (int i = 0; i < meta.partitions; i++) {
+                                TopicName topicNamePartition = 
topicName.getPartition(i);
+                                try {
+                                    
futures.add(pulsar().getAdminClient().topics()
+                                            
.truncateAsync(topicNamePartition.toString()));
+                                } catch (Exception e) {
+                                    log.error("[{}] Failed to truncate topic 
{}", clientAppId(), topicNamePartition, e);
+                                    asyncResponse.resume(new RestException(e));
+                                    return;
+                                }
                             }
+                            FutureUtil.waitForAll(futures).handle((result, 
exception) -> {

Review comment:
       Could we use ``whenComplete`` instead of ``handle``, or return this 
future to the upper-layer future?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
##########
@@ -247,64 +247,141 @@ protected void validateAdminAccessForTenant(String 
tenant) {
 
     protected static void validateAdminAccessForTenant(PulsarService pulsar, 
String clientAppId,
                                                        String 
originalPrincipal, String tenant,
-                                                       
AuthenticationDataSource authenticationData)
-            throws Exception {
+                                                       
AuthenticationDataSource authenticationData) {
+        try {
+            validateAdminAccessForTenantAsync(pulsar, clientAppId, 
originalPrincipal, tenant, authenticationData).get();

Review comment:
       Does ``get()`` need a timeout?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
##########
@@ -247,64 +247,141 @@ protected void validateAdminAccessForTenant(String 
tenant) {
 
     protected static void validateAdminAccessForTenant(PulsarService pulsar, 
String clientAppId,
                                                        String 
originalPrincipal, String tenant,
-                                                       
AuthenticationDataSource authenticationData)
-            throws Exception {
+                                                       
AuthenticationDataSource authenticationData) {
+        try {
+            validateAdminAccessForTenantAsync(pulsar, clientAppId, 
originalPrincipal, tenant, authenticationData).get();
+        } catch (InterruptedException | ExecutionException e) {
+            Throwable cause = e.getCause();
+            if (cause instanceof WebApplicationException){
+                throw (WebApplicationException) cause;
+            } else {
+                throw new RestException(cause);
+            }
+        }
+    }
+
+    /**
+     * Checks that the http client role has admin access to the specified 
tenant async.
+     *
+     * @param tenant the tenant id
+     */
+    protected CompletableFuture<Void> validateAdminAccessForTenantAsync(String 
tenant) {
+        return validateAdminAccessForTenantAsync(pulsar(), clientAppId(), 
originalPrincipal(), tenant,
+                clientAuthData());
+    }
+
+    protected static CompletableFuture<Void> validateAdminAccessForTenantAsync(
+            PulsarService pulsar, String clientAppId,
+            String originalPrincipal, String tenant,
+            AuthenticationDataSource authenticationData) {
+        CompletableFuture<Void> future = new CompletableFuture<>();

Review comment:
       Could we use ``combine`` or another method to avoid a shared future 
that is completed in many different places? I think it makes the code hard to read.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
##########
@@ -247,64 +247,141 @@ protected void validateAdminAccessForTenant(String 
tenant) {
 
     protected static void validateAdminAccessForTenant(PulsarService pulsar, 
String clientAppId,
                                                        String 
originalPrincipal, String tenant,
-                                                       
AuthenticationDataSource authenticationData)
-            throws Exception {
+                                                       
AuthenticationDataSource authenticationData) {
+        try {
+            validateAdminAccessForTenantAsync(pulsar, clientAppId, 
originalPrincipal, tenant, authenticationData).get();
+        } catch (InterruptedException | ExecutionException e) {
+            Throwable cause = e.getCause();
+            if (cause instanceof WebApplicationException){
+                throw (WebApplicationException) cause;
+            } else {
+                throw new RestException(cause);
+            }
+        }
+    }
+
+    /**
+     * Checks that the http client role has admin access to the specified 
tenant async.
+     *
+     * @param tenant the tenant id
+     */
+    protected CompletableFuture<Void> validateAdminAccessForTenantAsync(String 
tenant) {
+        return validateAdminAccessForTenantAsync(pulsar(), clientAppId(), 
originalPrincipal(), tenant,
+                clientAuthData());
+    }
+
+    protected static CompletableFuture<Void> validateAdminAccessForTenantAsync(
+            PulsarService pulsar, String clientAppId,
+            String originalPrincipal, String tenant,
+            AuthenticationDataSource authenticationData) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
         if (log.isDebugEnabled()) {
             log.debug("check admin access on tenant: {} - Authenticated: {} -- 
role: {}", tenant,
                     (isClientAuthenticated(clientAppId)), clientAppId);
         }
 
-        TenantInfo tenantInfo = 
pulsar.getPulsarResources().getTenantResources().getTenant(tenant)
-                .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Tenant 
does not exist"));
+        pulsar.getPulsarResources().getTenantResources().getTenantAsync(tenant)
+                .thenCompose(tenantInfoOptional -> {
+                    if (!tenantInfoOptional.isPresent()) {
+                        throw new RestException(Status.NOT_FOUND, "Tenant does 
not exist");
+                    }
+                    TenantInfo tenantInfo = tenantInfoOptional.get();
+                    if (pulsar.getConfiguration().isAuthenticationEnabled() && 
pulsar.getConfiguration()
+                            .isAuthorizationEnabled()) {
+                        if (!isClientAuthenticated(clientAppId)) {
+                            throw new RestException(Status.FORBIDDEN, "Need to 
authenticate to perform the request");
+                        }
 
-        if (pulsar.getConfiguration().isAuthenticationEnabled() && 
pulsar.getConfiguration().isAuthorizationEnabled()) {
-            if (!isClientAuthenticated(clientAppId)) {
-                throw new RestException(Status.FORBIDDEN, "Need to 
authenticate to perform the request");
-            }
+                        
validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(), 
clientAppId,
+                                originalPrincipal);
 
-            
validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(), 
clientAppId, originalPrincipal);
+                        if 
(pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
+                            AuthorizationService authorizationService =
+                                    
pulsar.getBrokerService().getAuthorizationService();
+                            CompletableFuture<Boolean> isProxySuperUserFuture =
+                                    
authorizationService.isSuperUser(clientAppId, authenticationData);
+                            CompletableFuture<Boolean> 
isOriginalPrincipalSuperUserFuture =
+                                    
authorizationService.isSuperUser(originalPrincipal, authenticationData);
 
-            if 
(pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
-                CompletableFuture<Boolean> isProxySuperUserFuture;
-                CompletableFuture<Boolean> isOriginalPrincipalSuperUserFuture;
-                try {
-                    AuthorizationService authorizationService = 
pulsar.getBrokerService().getAuthorizationService();
-                    isProxySuperUserFuture = 
authorizationService.isSuperUser(clientAppId, authenticationData);
-
-                    isOriginalPrincipalSuperUserFuture =
-                            
authorizationService.isSuperUser(originalPrincipal, authenticationData);
-
-                    boolean proxyAuthorized = isProxySuperUserFuture.get()
-                            || authorizationService.isTenantAdmin(tenant, 
clientAppId,
-                            tenantInfo, authenticationData).get();
-                    boolean originalPrincipalAuthorized =
-                            isOriginalPrincipalSuperUserFuture.get() || 
authorizationService.isTenantAdmin(tenant,
-                                    originalPrincipal, tenantInfo, 
authenticationData).get();
-                    if (!proxyAuthorized || !originalPrincipalAuthorized) {
-                        throw new RestException(Status.UNAUTHORIZED,
-                                String.format("Proxy not authorized to access 
resource (proxy:%s,original:%s)",
-                                        clientAppId, originalPrincipal));
-                    }
-                } catch (InterruptedException | ExecutionException e) {
-                    throw new RestException(Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
-                }
-                log.debug("Successfully authorized {} (proxied by {}) on 
tenant {}",
-                          originalPrincipal, clientAppId, tenant);
-            } else {
-                if (!pulsar.getBrokerService()
-                        .getAuthorizationService()
-                        .isSuperUser(clientAppId, authenticationData)
-                        .join()) {
-                    if (!pulsar.getBrokerService().getAuthorizationService()
-                            .isTenantAdmin(tenant, clientAppId, tenantInfo, 
authenticationData).get()) {
-                        throw new RestException(Status.UNAUTHORIZED,
-                                "Don't have permission to administrate 
resources on this tenant");
-                    }
-                }
+                            
isProxySuperUserFuture.thenCompose(isProxySuperUser -> {
+                                if (isProxySuperUser) {
+                                    return 
CompletableFuture.completedFuture(true);
+                                } else {
+                                    return 
authorizationService.isTenantAdmin(tenant, clientAppId, tenantInfo,
+                                            authenticationData);
+                                }
+                            }).thenCompose(authorized -> {
+                                if (!authorized) {
+                                    throw new 
RestException(Status.UNAUTHORIZED,
+                                            String.format(
+                                                    "Proxy not authorized to 
access resource (proxy:%s,original:%s)",
+                                                    clientAppId, 
originalPrincipal));
+                                }
+                                return null;
+                            }).exceptionally(ex -> {
+                                future.completeExceptionally(new 
RestException(Status.INTERNAL_SERVER_ERROR,
+                                        ex.getMessage()));
+                                return null;
+                            });
 
-                log.debug("Successfully authorized {} on tenant {}", 
clientAppId, tenant);
-            }
-        }
+                            
isOriginalPrincipalSuperUserFuture.thenCompose(isOriginalPrincipalSuperUser -> {
+                                if (isOriginalPrincipalSuperUser) {
+                                    return 
CompletableFuture.completedFuture(true);
+                                } else {
+                                    return 
authorizationService.isTenantAdmin(tenant, originalPrincipal, tenantInfo,
+                                            authenticationData);
+                                }
+                            }).thenCompose(originalPrincipalAuthorized -> {
+                                if (!originalPrincipalAuthorized) {
+                                    throw new 
RestException(Status.UNAUTHORIZED,
+                                            String.format(
+                                                    "Proxy not authorized to 
access resource (proxy:%s,original:%s)",
+                                                    clientAppId, 
originalPrincipal));
+                                } else {
+                                    log.debug("Successfully authorized {} 
(proxied by {}) on tenant {}",
+                                            originalPrincipal, clientAppId, 
tenant);
+                                    future.complete(null);
+                                }
+                                return null;
+                            }).exceptionally(ex -> {
+                                future.completeExceptionally(new 
RestException(Status.INTERNAL_SERVER_ERROR,
+                                        ex.getMessage()));
+                                return null;
+                            });
+                        } else {
+                            pulsar.getBrokerService()
+                                    .getAuthorizationService()
+                                    .isSuperUser(clientAppId, 
authenticationData)
+                                    .thenCompose(isSuperUser -> {
+                                        if (!isSuperUser) {
+                                            return 
pulsar.getBrokerService().getAuthorizationService()
+                                                    .isTenantAdmin(tenant, 
clientAppId, tenantInfo, authenticationData);
+                                        } else {
+                                            return 
CompletableFuture.completedFuture(true);
+                                        }
+                                    }).thenCompose(authorized -> {
+                                        if (!authorized) {
+                                            throw new 
RestException(Status.UNAUTHORIZED,
+                                                    "Don't have permission to 
administrate resources on this tenant");
+                                        } else {
+                                            log.debug("Successfully authorized 
{} on tenant {}", clientAppId, tenant);
+                                            future.complete(null);
+                                        }
+                                        return null;
+                                    }).exceptionally(ex -> {
+                                        future.completeExceptionally(new 
RestException(Status.INTERNAL_SERVER_ERROR,
+                                                ex.getMessage()));
+                                        return null;
+                                    });
+                        }
+                    }
+                    return null;

Review comment:
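       Could this cause an NPE? The function passed to ``thenCompose`` returns 
null instead of a ``CompletionStage``.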

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
##########
@@ -247,64 +247,141 @@ protected void validateAdminAccessForTenant(String 
tenant) {
 
     protected static void validateAdminAccessForTenant(PulsarService pulsar, 
String clientAppId,
                                                        String 
originalPrincipal, String tenant,
-                                                       
AuthenticationDataSource authenticationData)
-            throws Exception {
+                                                       
AuthenticationDataSource authenticationData) {
+        try {
+            validateAdminAccessForTenantAsync(pulsar, clientAppId, 
originalPrincipal, tenant, authenticationData).get();
+        } catch (InterruptedException | ExecutionException e) {
+            Throwable cause = e.getCause();
+            if (cause instanceof WebApplicationException){
+                throw (WebApplicationException) cause;
+            } else {
+                throw new RestException(cause);
+            }
+        }
+    }
+
+    /**
+     * Checks that the http client role has admin access to the specified 
tenant async.
+     *
+     * @param tenant the tenant id
+     */
+    protected CompletableFuture<Void> validateAdminAccessForTenantAsync(String 
tenant) {
+        return validateAdminAccessForTenantAsync(pulsar(), clientAppId(), 
originalPrincipal(), tenant,
+                clientAuthData());
+    }
+
+    protected static CompletableFuture<Void> validateAdminAccessForTenantAsync(
+            PulsarService pulsar, String clientAppId,
+            String originalPrincipal, String tenant,
+            AuthenticationDataSource authenticationData) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
         if (log.isDebugEnabled()) {
             log.debug("check admin access on tenant: {} - Authenticated: {} -- 
role: {}", tenant,
                     (isClientAuthenticated(clientAppId)), clientAppId);
         }
 
-        TenantInfo tenantInfo = 
pulsar.getPulsarResources().getTenantResources().getTenant(tenant)
-                .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Tenant 
does not exist"));
+        pulsar.getPulsarResources().getTenantResources().getTenantAsync(tenant)
+                .thenCompose(tenantInfoOptional -> {
+                    if (!tenantInfoOptional.isPresent()) {
+                        throw new RestException(Status.NOT_FOUND, "Tenant does 
not exist");
+                    }
+                    TenantInfo tenantInfo = tenantInfoOptional.get();
+                    if (pulsar.getConfiguration().isAuthenticationEnabled() && 
pulsar.getConfiguration()
+                            .isAuthorizationEnabled()) {
+                        if (!isClientAuthenticated(clientAppId)) {
+                            throw new RestException(Status.FORBIDDEN, "Need to 
authenticate to perform the request");
+                        }
 
-        if (pulsar.getConfiguration().isAuthenticationEnabled() && 
pulsar.getConfiguration().isAuthorizationEnabled()) {
-            if (!isClientAuthenticated(clientAppId)) {
-                throw new RestException(Status.FORBIDDEN, "Need to 
authenticate to perform the request");
-            }
+                        
validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(), 
clientAppId,
+                                originalPrincipal);
 
-            
validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(), 
clientAppId, originalPrincipal);
+                        if 
(pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
+                            AuthorizationService authorizationService =
+                                    
pulsar.getBrokerService().getAuthorizationService();
+                            CompletableFuture<Boolean> isProxySuperUserFuture =
+                                    
authorizationService.isSuperUser(clientAppId, authenticationData);
+                            CompletableFuture<Boolean> 
isOriginalPrincipalSuperUserFuture =
+                                    
authorizationService.isSuperUser(originalPrincipal, authenticationData);
 
-            if 
(pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
-                CompletableFuture<Boolean> isProxySuperUserFuture;
-                CompletableFuture<Boolean> isOriginalPrincipalSuperUserFuture;
-                try {
-                    AuthorizationService authorizationService = 
pulsar.getBrokerService().getAuthorizationService();
-                    isProxySuperUserFuture = 
authorizationService.isSuperUser(clientAppId, authenticationData);
-
-                    isOriginalPrincipalSuperUserFuture =
-                            
authorizationService.isSuperUser(originalPrincipal, authenticationData);
-
-                    boolean proxyAuthorized = isProxySuperUserFuture.get()
-                            || authorizationService.isTenantAdmin(tenant, 
clientAppId,
-                            tenantInfo, authenticationData).get();
-                    boolean originalPrincipalAuthorized =
-                            isOriginalPrincipalSuperUserFuture.get() || 
authorizationService.isTenantAdmin(tenant,
-                                    originalPrincipal, tenantInfo, 
authenticationData).get();
-                    if (!proxyAuthorized || !originalPrincipalAuthorized) {
-                        throw new RestException(Status.UNAUTHORIZED,
-                                String.format("Proxy not authorized to access 
resource (proxy:%s,original:%s)",
-                                        clientAppId, originalPrincipal));
-                    }
-                } catch (InterruptedException | ExecutionException e) {
-                    throw new RestException(Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
-                }
-                log.debug("Successfully authorized {} (proxied by {}) on 
tenant {}",
-                          originalPrincipal, clientAppId, tenant);
-            } else {
-                if (!pulsar.getBrokerService()
-                        .getAuthorizationService()
-                        .isSuperUser(clientAppId, authenticationData)
-                        .join()) {
-                    if (!pulsar.getBrokerService().getAuthorizationService()
-                            .isTenantAdmin(tenant, clientAppId, tenantInfo, 
authenticationData).get()) {
-                        throw new RestException(Status.UNAUTHORIZED,
-                                "Don't have permission to administrate 
resources on this tenant");
-                    }
-                }
+                            
isProxySuperUserFuture.thenCompose(isProxySuperUser -> {
+                                if (isProxySuperUser) {
+                                    return 
CompletableFuture.completedFuture(true);
+                                } else {
+                                    return 
authorizationService.isTenantAdmin(tenant, clientAppId, tenantInfo,
+                                            authenticationData);
+                                }
+                            }).thenCompose(authorized -> {
+                                if (!authorized) {
+                                    throw new 
RestException(Status.UNAUTHORIZED,
+                                            String.format(
+                                                    "Proxy not authorized to 
access resource (proxy:%s,original:%s)",
+                                                    clientAppId, 
originalPrincipal));
+                                }
+                                return null;

Review comment:
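       NPE? This lambda is passed to `thenCompose`, which requires a non-null
       `CompletionStage`. When the proxy is authorized it falls through to
       `return null`, so the composed future completes exceptionally with a
       `NullPointerException`. Use `thenAccept`, or return
       `CompletableFuture.completedFuture(null)`.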

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4414,74 +4414,79 @@ protected void handleTopicPolicyException(String 
methodName, Throwable thr, Asyn
     }
 
     protected void internalTruncateNonPartitionedTopic(AsyncResponse 
asyncResponse, boolean authoritative) {
-        Topic topic;
-        try {
-            validateAdminAccessForTenant(topicName.getTenant());
-            validateTopicOwnership(topicName, authoritative);
-            topic = getTopicReference(topicName);
-        } catch (Exception e) {
-            log.error("[{}] Failed to truncate topic {}", clientAppId(), 
topicName, e);
-            resumeAsyncResponseExceptionally(asyncResponse, e);
-            return;
-        }
-        CompletableFuture<Void> future = topic.truncate();
-        future.thenAccept(a -> {
-            asyncResponse.resume(new 
RestException(Response.Status.NO_CONTENT.getStatusCode(),
-                    Response.Status.NO_CONTENT.getReasonPhrase()));
-        }).exceptionally(e -> {
-            asyncResponse.resume(e);
-            return null;
-        });
+        validateAdminAccessForTenantAsync(topicName.getTenant())
+                .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+                .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                .thenCompose(topic -> topic.truncate())
+                .thenAccept(__ -> {
+                    asyncResponse.resume(new 
RestException(Response.Status.NO_CONTENT.getStatusCode(),
+                            Response.Status.NO_CONTENT.getReasonPhrase()));
+                }).exceptionally(ex -> {
+                    Throwable cause = ex.getCause();
+                    if (cause instanceof WebApplicationException
+                            && ((WebApplicationException) 
cause).getResponse().getStatus()
+                                    == 
Status.TEMPORARY_REDIRECT.getStatusCode()) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] Failed to truncate non-partitioned 
topic {}, redirecting to other brokers.",
+                                    clientAppId(), topicName, cause);
+                        }
+                    } else {
+                        log.error("[{}] Failed to truncate non-partitioned 
topic {}", clientAppId(), topicName, cause);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, cause);
+                    return null;
+                });
     }
 
     protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean 
authoritative) {
-
         // If the topic name is a partition name, no need to get partition 
topic metadata again
         if (topicName.isPartitioned()) {
             internalTruncateNonPartitionedTopic(asyncResponse, authoritative);
         } else {
-            getPartitionedTopicMetadataAsync(topicName, authoritative, 
false).whenComplete((meta, t) -> {
-                if (meta.partitions > 0) {
-                    final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
-                    for (int i = 0; i < meta.partitions; i++) {
-                        TopicName topicNamePartition = 
topicName.getPartition(i);
-                        try {
-                            futures.add(pulsar().getAdminClient().topics()
-                                .truncateAsync(topicNamePartition.toString()));
-                        } catch (Exception e) {
-                            log.error("[{}] Failed to truncate topic {}", 
clientAppId(), topicNamePartition, e);
-                            asyncResponse.resume(new RestException(e));
-                            return;
-                        }
-                    }
-                    FutureUtil.waitForAll(futures).handle((result, exception) 
-> {
-                        if (exception != null) {
-                            Throwable th = exception.getCause();
-                            if (th instanceof NotFoundException) {
-                                asyncResponse.resume(new 
RestException(Status.NOT_FOUND, th.getMessage()));
-                            } else if (th instanceof WebApplicationException) {
-                                asyncResponse.resume(th);
-                            } else {
-                                log.error("[{}] Failed to truncate topic {}", 
clientAppId(), topicName, exception);
-                                asyncResponse.resume(new 
RestException(exception));
+            getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+                    .whenComplete((meta, t) -> {
+                        if (meta.partitions > 0) {
+                            final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
+                            for (int i = 0; i < meta.partitions; i++) {
+                                TopicName topicNamePartition = 
topicName.getPartition(i);
+                                try {
+                                    
futures.add(pulsar().getAdminClient().topics()
+                                            
.truncateAsync(topicNamePartition.toString()));
+                                } catch (Exception e) {
+                                    log.error("[{}] Failed to truncate topic 
{}", clientAppId(), topicNamePartition, e);

Review comment:
       ```suggestion
                                       log.error("[{}] Failed to truncate 
topic, while getting admin client {}", clientAppId(), topicNamePartition, e);
   ```

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
##########
@@ -247,64 +247,141 @@ protected void validateAdminAccessForTenant(String 
tenant) {
 
     protected static void validateAdminAccessForTenant(PulsarService pulsar, 
String clientAppId,
                                                        String 
originalPrincipal, String tenant,
-                                                       
AuthenticationDataSource authenticationData)
-            throws Exception {
+                                                       
AuthenticationDataSource authenticationData) {
+        try {
+            validateAdminAccessForTenantAsync(pulsar, clientAppId, 
originalPrincipal, tenant, authenticationData).get();
+        } catch (InterruptedException | ExecutionException e) {
+            Throwable cause = e.getCause();
+            if (cause instanceof WebApplicationException){
+                throw (WebApplicationException) cause;
+            } else {
+                throw new RestException(cause);
+            }
+        }
+    }
+
+    /**
+     * Checks that the http client role has admin access to the specified 
tenant async.
+     *
+     * @param tenant the tenant id
+     */
+    protected CompletableFuture<Void> validateAdminAccessForTenantAsync(String 
tenant) {
+        return validateAdminAccessForTenantAsync(pulsar(), clientAppId(), 
originalPrincipal(), tenant,
+                clientAuthData());
+    }
+
+    protected static CompletableFuture<Void> validateAdminAccessForTenantAsync(
+            PulsarService pulsar, String clientAppId,
+            String originalPrincipal, String tenant,
+            AuthenticationDataSource authenticationData) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
         if (log.isDebugEnabled()) {
             log.debug("check admin access on tenant: {} - Authenticated: {} -- 
role: {}", tenant,
                     (isClientAuthenticated(clientAppId)), clientAppId);
         }
 
-        TenantInfo tenantInfo = 
pulsar.getPulsarResources().getTenantResources().getTenant(tenant)
-                .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Tenant 
does not exist"));
+        pulsar.getPulsarResources().getTenantResources().getTenantAsync(tenant)
+                .thenCompose(tenantInfoOptional -> {
+                    if (!tenantInfoOptional.isPresent()) {
+                        throw new RestException(Status.NOT_FOUND, "Tenant does 
not exist");
+                    }
+                    TenantInfo tenantInfo = tenantInfoOptional.get();
+                    if (pulsar.getConfiguration().isAuthenticationEnabled() && 
pulsar.getConfiguration()
+                            .isAuthorizationEnabled()) {
+                        if (!isClientAuthenticated(clientAppId)) {
+                            throw new RestException(Status.FORBIDDEN, "Need to 
authenticate to perform the request");
+                        }
 
-        if (pulsar.getConfiguration().isAuthenticationEnabled() && 
pulsar.getConfiguration().isAuthorizationEnabled()) {
-            if (!isClientAuthenticated(clientAppId)) {
-                throw new RestException(Status.FORBIDDEN, "Need to 
authenticate to perform the request");
-            }
+                        
validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(), 
clientAppId,
+                                originalPrincipal);
 
-            
validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(), 
clientAppId, originalPrincipal);
+                        if 
(pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
+                            AuthorizationService authorizationService =
+                                    
pulsar.getBrokerService().getAuthorizationService();
+                            CompletableFuture<Boolean> isProxySuperUserFuture =
+                                    
authorizationService.isSuperUser(clientAppId, authenticationData);
+                            CompletableFuture<Boolean> 
isOriginalPrincipalSuperUserFuture =
+                                    
authorizationService.isSuperUser(originalPrincipal, authenticationData);
 
-            if 
(pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
-                CompletableFuture<Boolean> isProxySuperUserFuture;
-                CompletableFuture<Boolean> isOriginalPrincipalSuperUserFuture;
-                try {
-                    AuthorizationService authorizationService = 
pulsar.getBrokerService().getAuthorizationService();
-                    isProxySuperUserFuture = 
authorizationService.isSuperUser(clientAppId, authenticationData);
-
-                    isOriginalPrincipalSuperUserFuture =
-                            
authorizationService.isSuperUser(originalPrincipal, authenticationData);
-
-                    boolean proxyAuthorized = isProxySuperUserFuture.get()
-                            || authorizationService.isTenantAdmin(tenant, 
clientAppId,
-                            tenantInfo, authenticationData).get();
-                    boolean originalPrincipalAuthorized =
-                            isOriginalPrincipalSuperUserFuture.get() || 
authorizationService.isTenantAdmin(tenant,
-                                    originalPrincipal, tenantInfo, 
authenticationData).get();
-                    if (!proxyAuthorized || !originalPrincipalAuthorized) {
-                        throw new RestException(Status.UNAUTHORIZED,
-                                String.format("Proxy not authorized to access 
resource (proxy:%s,original:%s)",
-                                        clientAppId, originalPrincipal));
-                    }
-                } catch (InterruptedException | ExecutionException e) {
-                    throw new RestException(Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
-                }
-                log.debug("Successfully authorized {} (proxied by {}) on 
tenant {}",
-                          originalPrincipal, clientAppId, tenant);
-            } else {
-                if (!pulsar.getBrokerService()
-                        .getAuthorizationService()
-                        .isSuperUser(clientAppId, authenticationData)
-                        .join()) {
-                    if (!pulsar.getBrokerService().getAuthorizationService()
-                            .isTenantAdmin(tenant, clientAppId, tenantInfo, 
authenticationData).get()) {
-                        throw new RestException(Status.UNAUTHORIZED,
-                                "Don't have permission to administrate 
resources on this tenant");
-                    }
-                }
+                            
isProxySuperUserFuture.thenCompose(isProxySuperUser -> {
+                                if (isProxySuperUser) {
+                                    return 
CompletableFuture.completedFuture(true);
+                                } else {
+                                    return 
authorizationService.isTenantAdmin(tenant, clientAppId, tenantInfo,
+                                            authenticationData);
+                                }
+                            }).thenCompose(authorized -> {
+                                if (!authorized) {
+                                    throw new 
RestException(Status.UNAUTHORIZED,
+                                            String.format(
+                                                    "Proxy not authorized to 
access resource (proxy:%s,original:%s)",
+                                                    clientAppId, 
originalPrincipal));
+                                }
+                                return null;
+                            }).exceptionally(ex -> {
+                                future.completeExceptionally(new 
RestException(Status.INTERNAL_SERVER_ERROR,
+                                        ex.getMessage()));
+                                return null;
+                            });
 
-                log.debug("Successfully authorized {} on tenant {}", 
clientAppId, tenant);
-            }
-        }
+                            
isOriginalPrincipalSuperUserFuture.thenCompose(isOriginalPrincipalSuperUser -> {
+                                if (isOriginalPrincipalSuperUser) {
+                                    return 
CompletableFuture.completedFuture(true);
+                                } else {
+                                    return 
authorizationService.isTenantAdmin(tenant, originalPrincipal, tenantInfo,
+                                            authenticationData);
+                                }
+                            }).thenCompose(originalPrincipalAuthorized -> {
+                                if (!originalPrincipalAuthorized) {
+                                    throw new 
RestException(Status.UNAUTHORIZED,
+                                            String.format(
+                                                    "Proxy not authorized to 
access resource (proxy:%s,original:%s)",
+                                                    clientAppId, 
originalPrincipal));
+                                } else {
+                                    log.debug("Successfully authorized {} 
(proxied by {}) on tenant {}",
+                                            originalPrincipal, clientAppId, 
tenant);
+                                    future.complete(null);
+                                }
+                                return null;
+                            }).exceptionally(ex -> {
+                                future.completeExceptionally(new 
RestException(Status.INTERNAL_SERVER_ERROR,
+                                        ex.getMessage()));
+                                return null;
+                            });
+                        } else {
+                            pulsar.getBrokerService()
+                                    .getAuthorizationService()
+                                    .isSuperUser(clientAppId, 
authenticationData)
+                                    .thenCompose(isSuperUser -> {
+                                        if (!isSuperUser) {
+                                            return 
pulsar.getBrokerService().getAuthorizationService()
+                                                    .isTenantAdmin(tenant, 
clientAppId, tenantInfo, authenticationData);
+                                        } else {
+                                            return 
CompletableFuture.completedFuture(true);
+                                        }
+                                    }).thenCompose(authorized -> {
+                                        if (!authorized) {
+                                            throw new 
RestException(Status.UNAUTHORIZED,
+                                                    "Don't have permission to 
administrate resources on this tenant");
+                                        } else {
+                                            log.debug("Successfully authorized 
{} on tenant {}", clientAppId, tenant);
+                                            future.complete(null);
+                                        }
+                                        return null;

Review comment:
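       NPE? This lambda is passed to `thenCompose`, which requires a non-null
       `CompletionStage`. Returning `null` here makes the composed future
       complete exceptionally with a `NullPointerException`. Use `thenAccept`,
       or return `CompletableFuture.completedFuture(null)`.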

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
##########
@@ -247,64 +247,141 @@ protected void validateAdminAccessForTenant(String 
tenant) {
 
     protected static void validateAdminAccessForTenant(PulsarService pulsar, 
String clientAppId,
                                                        String 
originalPrincipal, String tenant,
-                                                       
AuthenticationDataSource authenticationData)
-            throws Exception {
+                                                       
AuthenticationDataSource authenticationData) {
+        try {
+            validateAdminAccessForTenantAsync(pulsar, clientAppId, 
originalPrincipal, tenant, authenticationData).get();
+        } catch (InterruptedException | ExecutionException e) {
+            Throwable cause = e.getCause();
+            if (cause instanceof WebApplicationException){
+                throw (WebApplicationException) cause;
+            } else {
+                throw new RestException(cause);
+            }
+        }
+    }
+
+    /**
+     * Checks that the http client role has admin access to the specified 
tenant async.
+     *
+     * @param tenant the tenant id
+     */
+    protected CompletableFuture<Void> validateAdminAccessForTenantAsync(String 
tenant) {
+        return validateAdminAccessForTenantAsync(pulsar(), clientAppId(), 
originalPrincipal(), tenant,
+                clientAuthData());
+    }
+
+    protected static CompletableFuture<Void> validateAdminAccessForTenantAsync(
+            PulsarService pulsar, String clientAppId,
+            String originalPrincipal, String tenant,
+            AuthenticationDataSource authenticationData) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
         if (log.isDebugEnabled()) {
             log.debug("check admin access on tenant: {} - Authenticated: {} -- 
role: {}", tenant,
                     (isClientAuthenticated(clientAppId)), clientAppId);
         }
 
-        TenantInfo tenantInfo = 
pulsar.getPulsarResources().getTenantResources().getTenant(tenant)
-                .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Tenant 
does not exist"));
+        pulsar.getPulsarResources().getTenantResources().getTenantAsync(tenant)
+                .thenCompose(tenantInfoOptional -> {
+                    if (!tenantInfoOptional.isPresent()) {
+                        throw new RestException(Status.NOT_FOUND, "Tenant does 
not exist");
+                    }
+                    TenantInfo tenantInfo = tenantInfoOptional.get();
+                    if (pulsar.getConfiguration().isAuthenticationEnabled() && 
pulsar.getConfiguration()
+                            .isAuthorizationEnabled()) {
+                        if (!isClientAuthenticated(clientAppId)) {
+                            throw new RestException(Status.FORBIDDEN, "Need to 
authenticate to perform the request");
+                        }
 
-        if (pulsar.getConfiguration().isAuthenticationEnabled() && 
pulsar.getConfiguration().isAuthorizationEnabled()) {
-            if (!isClientAuthenticated(clientAppId)) {
-                throw new RestException(Status.FORBIDDEN, "Need to 
authenticate to perform the request");
-            }
+                        
validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(), 
clientAppId,
+                                originalPrincipal);
 
-            
validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(), 
clientAppId, originalPrincipal);
+                        if 
(pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
+                            AuthorizationService authorizationService =
+                                    
pulsar.getBrokerService().getAuthorizationService();
+                            CompletableFuture<Boolean> isProxySuperUserFuture =
+                                    
authorizationService.isSuperUser(clientAppId, authenticationData);
+                            CompletableFuture<Boolean> 
isOriginalPrincipalSuperUserFuture =
+                                    
authorizationService.isSuperUser(originalPrincipal, authenticationData);
 
-            if 
(pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
-                CompletableFuture<Boolean> isProxySuperUserFuture;
-                CompletableFuture<Boolean> isOriginalPrincipalSuperUserFuture;
-                try {
-                    AuthorizationService authorizationService = 
pulsar.getBrokerService().getAuthorizationService();
-                    isProxySuperUserFuture = 
authorizationService.isSuperUser(clientAppId, authenticationData);
-
-                    isOriginalPrincipalSuperUserFuture =
-                            
authorizationService.isSuperUser(originalPrincipal, authenticationData);
-
-                    boolean proxyAuthorized = isProxySuperUserFuture.get()
-                            || authorizationService.isTenantAdmin(tenant, 
clientAppId,
-                            tenantInfo, authenticationData).get();
-                    boolean originalPrincipalAuthorized =
-                            isOriginalPrincipalSuperUserFuture.get() || 
authorizationService.isTenantAdmin(tenant,
-                                    originalPrincipal, tenantInfo, 
authenticationData).get();
-                    if (!proxyAuthorized || !originalPrincipalAuthorized) {
-                        throw new RestException(Status.UNAUTHORIZED,
-                                String.format("Proxy not authorized to access 
resource (proxy:%s,original:%s)",
-                                        clientAppId, originalPrincipal));
-                    }
-                } catch (InterruptedException | ExecutionException e) {
-                    throw new RestException(Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
-                }
-                log.debug("Successfully authorized {} (proxied by {}) on 
tenant {}",
-                          originalPrincipal, clientAppId, tenant);
-            } else {
-                if (!pulsar.getBrokerService()
-                        .getAuthorizationService()
-                        .isSuperUser(clientAppId, authenticationData)
-                        .join()) {
-                    if (!pulsar.getBrokerService().getAuthorizationService()
-                            .isTenantAdmin(tenant, clientAppId, tenantInfo, 
authenticationData).get()) {
-                        throw new RestException(Status.UNAUTHORIZED,
-                                "Don't have permission to administrate 
resources on this tenant");
-                    }
-                }
+                            
isProxySuperUserFuture.thenCompose(isProxySuperUser -> {
+                                if (isProxySuperUser) {
+                                    return 
CompletableFuture.completedFuture(true);
+                                } else {
+                                    return 
authorizationService.isTenantAdmin(tenant, clientAppId, tenantInfo,
+                                            authenticationData);
+                                }
+                            }).thenCompose(authorized -> {
+                                if (!authorized) {
+                                    throw new 
RestException(Status.UNAUTHORIZED,
+                                            String.format(
+                                                    "Proxy not authorized to 
access resource (proxy:%s,original:%s)",
+                                                    clientAppId, 
originalPrincipal));
+                                }
+                                return null;
+                            }).exceptionally(ex -> {
+                                future.completeExceptionally(new 
RestException(Status.INTERNAL_SERVER_ERROR,
+                                        ex.getMessage()));
+                                return null;
+                            });
 
-                log.debug("Successfully authorized {} on tenant {}", 
clientAppId, tenant);
-            }
-        }
+                            
isOriginalPrincipalSuperUserFuture.thenCompose(isOriginalPrincipalSuperUser -> {
+                                if (isOriginalPrincipalSuperUser) {
+                                    return 
CompletableFuture.completedFuture(true);
+                                } else {
+                                    return 
authorizationService.isTenantAdmin(tenant, originalPrincipal, tenantInfo,
+                                            authenticationData);
+                                }
+                            }).thenCompose(originalPrincipalAuthorized -> {
+                                if (!originalPrincipalAuthorized) {
+                                    throw new 
RestException(Status.UNAUTHORIZED,
+                                            String.format(
+                                                    "Proxy not authorized to 
access resource (proxy:%s,original:%s)",
+                                                    clientAppId, 
originalPrincipal));
+                                } else {
+                                    log.debug("Successfully authorized {} 
(proxied by {}) on tenant {}",
+                                            originalPrincipal, clientAppId, 
tenant);
+                                    future.complete(null);
+                                }
+                                return null;

Review comment:
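       NPE? This lambda is passed to `thenCompose`, which requires a non-null
       `CompletionStage`. Returning `null` here makes the composed future
       complete exceptionally with a `NullPointerException`. Use `thenAccept`,
       or return `CompletableFuture.completedFuture(null)`.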

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -4414,74 +4414,79 @@ protected void handleTopicPolicyException(String 
methodName, Throwable thr, Asyn
     }
 
     protected void internalTruncateNonPartitionedTopic(AsyncResponse 
asyncResponse, boolean authoritative) {
-        Topic topic;
-        try {
-            validateAdminAccessForTenant(topicName.getTenant());
-            validateTopicOwnership(topicName, authoritative);
-            topic = getTopicReference(topicName);
-        } catch (Exception e) {
-            log.error("[{}] Failed to truncate topic {}", clientAppId(), 
topicName, e);
-            resumeAsyncResponseExceptionally(asyncResponse, e);
-            return;
-        }
-        CompletableFuture<Void> future = topic.truncate();
-        future.thenAccept(a -> {
-            asyncResponse.resume(new 
RestException(Response.Status.NO_CONTENT.getStatusCode(),
-                    Response.Status.NO_CONTENT.getReasonPhrase()));
-        }).exceptionally(e -> {
-            asyncResponse.resume(e);
-            return null;
-        });
+        validateAdminAccessForTenantAsync(topicName.getTenant())
+                .thenCompose(__ -> validateTopicOwnershipAsync(topicName, 
authoritative))
+                .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                .thenCompose(topic -> topic.truncate())
+                .thenAccept(__ -> {
+                    asyncResponse.resume(new 
RestException(Response.Status.NO_CONTENT.getStatusCode(),
+                            Response.Status.NO_CONTENT.getReasonPhrase()));
+                }).exceptionally(ex -> {
+                    Throwable cause = ex.getCause();
+                    if (cause instanceof WebApplicationException
+                            && ((WebApplicationException) 
cause).getResponse().getStatus()
+                                    == 
Status.TEMPORARY_REDIRECT.getStatusCode()) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] Failed to truncate non-partitioned 
topic {}, redirecting to other brokers.",
+                                    clientAppId(), topicName, cause);
+                        }
+                    } else {
+                        log.error("[{}] Failed to truncate non-partitioned 
topic {}", clientAppId(), topicName, cause);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, cause);
+                    return null;
+                });
     }
 
     protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean 
authoritative) {
-
         // If the topic name is a partition name, no need to get partition 
topic metadata again
         if (topicName.isPartitioned()) {
             internalTruncateNonPartitionedTopic(asyncResponse, authoritative);
         } else {
-            getPartitionedTopicMetadataAsync(topicName, authoritative, 
false).whenComplete((meta, t) -> {
-                if (meta.partitions > 0) {
-                    final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
-                    for (int i = 0; i < meta.partitions; i++) {
-                        TopicName topicNamePartition = 
topicName.getPartition(i);
-                        try {
-                            futures.add(pulsar().getAdminClient().topics()
-                                .truncateAsync(topicNamePartition.toString()));
-                        } catch (Exception e) {
-                            log.error("[{}] Failed to truncate topic {}", 
clientAppId(), topicNamePartition, e);
-                            asyncResponse.resume(new RestException(e));
-                            return;
-                        }
-                    }
-                    FutureUtil.waitForAll(futures).handle((result, exception) 
-> {
-                        if (exception != null) {
-                            Throwable th = exception.getCause();
-                            if (th instanceof NotFoundException) {
-                                asyncResponse.resume(new 
RestException(Status.NOT_FOUND, th.getMessage()));
-                            } else if (th instanceof WebApplicationException) {
-                                asyncResponse.resume(th);
-                            } else {
-                                log.error("[{}] Failed to truncate topic {}", 
clientAppId(), topicName, exception);
-                                asyncResponse.resume(new 
RestException(exception));
+            getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+                    .whenComplete((meta, t) -> {
+                        if (meta.partitions > 0) {
+                            final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();

Review comment:
       The number of partitions is known here, so the list can be pre-sized:
       ```suggestion
                               final List<CompletableFuture<Void>> futures = Lists.newArrayListWithCapacity(meta.partitions);
       ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to