codelipenghui commented on code in PR #15694:
URL: https://github.com/apache/pulsar/pull/15694#discussion_r878783911


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java:
##########
@@ -220,49 +220,52 @@ public static CompletableFuture<ByteBuf> 
lookupTopicAsync(PulsarService pulsarSe
                             cluster);
                 }
                 
validationFuture.complete(newLookupResponse(differentClusterData.getBrokerServiceUrl(),
-                        differentClusterData.getBrokerServiceUrlTls(), true, 
LookupType.Redirect, requestId, false));
+                        differentClusterData.getBrokerServiceUrlTls(), true, 
LookupType.Redirect,
+                        requestId, false));
             } else {
                 // (2) authorize client
-                try {
-                    checkAuthorization(pulsarService, topicName, clientAppId, 
authenticationData);
-                } catch (RestException authException) {
-                    log.warn("Failed to authorized {} on cluster {}", 
clientAppId, topicName.toString());
-                    
validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError,
-                            authException.getMessage(), requestId));
-                    return;
-                } catch (Exception e) {
-                    log.warn("Unknown error while authorizing {} on cluster 
{}", clientAppId, topicName.toString());
-                    validationFuture.completeExceptionally(e);
-                    return;
-                }
-                // (3) validate global namespace
-                checkLocalOrGetPeerReplicationCluster(pulsarService, 
topicName.getNamespaceObject())
-                        .thenAccept(peerClusterData -> {
-                            if (peerClusterData == null) {
-                                // (4) all validation passed: initiate lookup
-                                validationFuture.complete(null);
-                                return;
-                            }
-                            // if peer-cluster-data is present it means 
namespace is owned by that peer-cluster and
-                            // request should be redirect to the peer-cluster
-                            if 
(StringUtils.isBlank(peerClusterData.getBrokerServiceUrl())
-                                    && 
StringUtils.isBlank(peerClusterData.getBrokerServiceUrlTls())) {
-                                
validationFuture.complete(newLookupErrorResponse(ServerError.MetadataError,
-                                        "Redirected cluster's brokerService 
url is not configured", requestId));
-                                return;
+                checkAuthorizationAsync(pulsarService, topicName, clientAppId, 
authenticationData).thenRun(() ->
+                                validationFuture.complete(null))
+                        .exceptionally(e -> {
+                            if (FutureUtil.unwrapCompletionException(e) 
instanceof RestException) {
+                                log.warn("Failed to authorized {} on cluster 
{}", clientAppId, topicName);
+                                
validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError,
+                                        
FutureUtil.unwrapCompletionException(e).getMessage(), requestId));
+                            } else {
+                                log.warn("Unknown error while authorizing {} 
on cluster {}", clientAppId, topicName);
+                                validationFuture.completeExceptionally(e);

Review Comment:
   Fixed



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -3880,46 +3879,53 @@ public static 
CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMet
             PulsarService pulsar, String clientAppId, String originalPrincipal,
             AuthenticationDataSource authenticationData, TopicName topicName) {
         CompletableFuture<PartitionedTopicMetadata> metadataFuture = new 
CompletableFuture<>();
-        try {
-            // (1) authorize client
-            try {
-                checkAuthorization(pulsar, topicName, clientAppId, 
authenticationData);
-            } catch (RestException e) {
-                try {
-                    validateAdminAccessForTenant(pulsar,
-                            clientAppId, originalPrincipal, 
topicName.getTenant(), authenticationData,
-                            
pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
-                } catch (RestException authException) {
-                    log.warn("Failed to authorize {} on topic {}", 
clientAppId, topicName);
-                    throw new 
PulsarClientException(String.format("Authorization failed %s on topic %s with 
error %s",
-                            clientAppId, topicName, 
authException.getMessage()));
-                }
-            } catch (Exception ex) {
-                // throw without wrapping to PulsarClientException that 
considers: unknown error marked as internal
-                // server error
-                log.warn("Failed to authorize {} on topic {}", clientAppId, 
topicName, ex);
-                throw ex;
-            }
+        CompletableFuture<Void> authorizationFuture = new 
CompletableFuture<>();
+        checkAuthorizationAsync(pulsar, topicName, clientAppId, 
authenticationData)
+                .thenRun(() -> authorizationFuture.complete(null))
+                .exceptionally(e -> {
+                    if (FutureUtil.unwrapCompletionException(e) instanceof 
RestException) {
+                        validateAdminAccessForTenantAsync(pulsar,
+                                clientAppId, originalPrincipal, 
topicName.getTenant(), authenticationData)
+                                .thenRun(() -> {
+                                    authorizationFuture.complete(null);
+                                }).exceptionally(ex -> {
+                                    if 
(FutureUtil.unwrapCompletionException(ex) instanceof RestException) {
+                                        log.warn("Failed to authorize {} on 
topic {}", clientAppId, topicName);
+                                        
authorizationFuture.completeExceptionally(new PulsarClientException(
+                                                String.format("Authorization 
failed %s on topic %s with error %s",
+                                                clientAppId, topicName, 
ex.getCause().getMessage())));

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to