This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 969871d6727b941fb99014156436adad1459f976
Author: Ruguo Yu <[email protected]>
AuthorDate: Thu Dec 9 16:10:13 2021 +0800

    [Authorization] Optimize the logic of allowing namespace operation (#13090)
    
    (cherry picked from commit f04139ffcc3358833b021da8e500e7e0b631e389)
---
 .../authorization/PulsarAuthorizationProvider.java | 124 +++++++++++----------
 1 file changed, 68 insertions(+), 56 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index b5ebf962ba1..aab4b2b5ac1 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -531,27 +531,44 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
                                                                    String role,
                                                                    
NamespaceOperation operation,
                                                                    
AuthenticationDataSource authData) {
-        CompletableFuture<Boolean> isAuthorizedFuture;
-        switch (operation) {
-            case PACKAGES:
-                isAuthorizedFuture = 
allowTheSpecifiedActionOpsAsync(namespaceName, role, authData, 
AuthAction.packages);
-                break;
-            case GET_TOPICS:
-            case UNSUBSCRIBE:
-            case CLEAR_BACKLOG:
-                isAuthorizedFuture = 
allowTheSpecifiedActionOpsAsync(namespaceName, role, authData, 
AuthAction.consume);
-                break;
-            default:
-                isAuthorizedFuture = CompletableFuture.completedFuture(false);
+        if (log.isDebugEnabled()) {
+            log.debug("Check allowNamespaceOperationAsync [{}] on [{}].", 
operation.name(), namespaceName);
         }
-        CompletableFuture<Boolean> isTenantAdminFuture = 
validateTenantAdminAccess(namespaceName.getTenant(), role, authData);
-        return isTenantAdminFuture.thenCombine(isAuthorizedFuture, 
(isTenantAdmin, isAuthorized) -> {
-            if (log.isDebugEnabled()) {
-                log.debug("Verify if role {} is allowed to {} to topic {}: 
isTenantAdmin={}, isAuthorized={}",
-                        role, operation, namespaceName, isTenantAdmin, 
isAuthorized);
-            }
-            return isTenantAdmin || isAuthorized;
-        });
+
+        return validateTenantAdminAccess(namespaceName.getTenant(), role, 
authData)
+                .thenCompose(isSuperUserOrAdmin -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Verify if role {} is allowed to {} to 
namespace {}: isSuperUserOrAdmin={}",
+                                role, operation, namespaceName, 
isSuperUserOrAdmin);
+                    }
+                    if (isSuperUserOrAdmin) {
+                        return CompletableFuture.completedFuture(true);
+                    } else {
+                        switch (operation) {
+                            case PACKAGES:
+                                return allowTheSpecifiedActionOpsAsync(
+                                        namespaceName, role, authData, 
AuthAction.packages);
+                            case GET_TOPIC:
+                            case GET_TOPICS:
+                            case UNSUBSCRIBE:
+                            case CLEAR_BACKLOG:
+                                return allowTheSpecifiedActionOpsAsync(
+                                        namespaceName, role, authData, 
AuthAction.consume);
+                            case CREATE_TOPIC:
+                            case DELETE_TOPIC:
+                            case ADD_BUNDLE:
+                            case GET_BUNDLE:
+                            case DELETE_BUNDLE:
+                            case GRANT_PERMISSION:
+                            case GET_PERMISSION:
+                            case REVOKE_PERMISSION:
+                                return 
CompletableFuture.completedFuture(false);
+                            default:
+                                return FutureUtil.failedFuture(new 
IllegalStateException(
+                                        "NamespaceOperation [" + 
operation.name() + "] is not supported."));
+                        }
+                    }
+                });
     }
 
     @Override
@@ -568,41 +585,8 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
                                                                String role,
                                                                TopicOperation 
operation,
                                                                
AuthenticationDataSource authData) {
-        log.debug("Check allowTopicOperationAsync [" + operation.name() + "] 
on [" + topicName.toString() + "].");
-
-        CompletableFuture<Boolean> isAuthorizedFuture;
-
-        switch (operation) {
-            case LOOKUP:
-            case GET_STATS:
-            case GET_METADATA:
-                isAuthorizedFuture = canLookupAsync(topicName, role, authData);
-                break;
-            case PRODUCE:
-                isAuthorizedFuture = canProduceAsync(topicName, role, 
authData);
-                break;
-            case GET_SUBSCRIPTIONS:
-            case CONSUME:
-            case SUBSCRIBE:
-            case UNSUBSCRIBE:
-            case SKIP:
-            case EXPIRE_MESSAGES:
-            case PEEK_MESSAGES:
-            case RESET_CURSOR:
-            case SET_REPLICATED_SUBSCRIPTION_STATUS:
-                isAuthorizedFuture = canConsumeAsync(topicName, role, 
authData, authData.getSubscription());
-                break;
-            case TERMINATE:
-            case COMPACT:
-            case OFFLOAD:
-            case UNLOAD:
-            case ADD_BUNDLE_RANGE:
-            case GET_BUNDLE_RANGE:
-            case DELETE_BUNDLE_RANGE:
-                return validateTenantAdminAccess(topicName.getTenant(), role, 
authData);
-            default:
-                return FutureUtil.failedFuture(
-                        new IllegalStateException("TopicOperation [" + 
operation.name() + "] is not supported."));
+        if (log.isDebugEnabled()) {
+            log.debug("Check allowTopicOperationAsync [{}] on [{}].", 
operation.name(), topicName);
         }
 
         return validateTenantAdminAccess(topicName.getTenant(), role, authData)
@@ -614,7 +598,35 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
                     if (isSuperUserOrAdmin) {
                         return CompletableFuture.completedFuture(true);
                     } else {
-                        return isAuthorizedFuture;
+                        switch (operation) {
+                            case LOOKUP:
+                            case GET_STATS:
+                            case GET_METADATA:
+                                return canLookupAsync(topicName, role, 
authData);
+                            case PRODUCE:
+                                return canProduceAsync(topicName, role, 
authData);
+                            case GET_SUBSCRIPTIONS:
+                            case CONSUME:
+                            case SUBSCRIBE:
+                            case UNSUBSCRIBE:
+                            case SKIP:
+                            case EXPIRE_MESSAGES:
+                            case PEEK_MESSAGES:
+                            case RESET_CURSOR:
+                            case SET_REPLICATED_SUBSCRIPTION_STATUS:
+                                return canConsumeAsync(topicName, role, 
authData, authData.getSubscription());
+                            case TERMINATE:
+                            case COMPACT:
+                            case OFFLOAD:
+                            case UNLOAD:
+                            case ADD_BUNDLE_RANGE:
+                            case GET_BUNDLE_RANGE:
+                            case DELETE_BUNDLE_RANGE:
+                                return 
CompletableFuture.completedFuture(false);
+                            default:
+                                return FutureUtil.failedFuture(new 
IllegalStateException(
+                                        "TopicOperation [" + operation.name() 
+ "] is not supported."));
+                        }
                     }
                 });
     }

Reply via email to