This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 969871d6727b941fb99014156436adad1459f976 Author: Ruguo Yu <[email protected]> AuthorDate: Thu Dec 9 16:10:13 2021 +0800 [Authorization] Optimize the logic of allowing namespace operation (#13090) (cherry picked from commit f04139ffcc3358833b021da8e500e7e0b631e389) --- .../authorization/PulsarAuthorizationProvider.java | 124 +++++++++++---------- 1 file changed, 68 insertions(+), 56 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index b5ebf962ba1..aab4b2b5ac1 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -531,27 +531,44 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider { String role, NamespaceOperation operation, AuthenticationDataSource authData) { - CompletableFuture<Boolean> isAuthorizedFuture; - switch (operation) { - case PACKAGES: - isAuthorizedFuture = allowTheSpecifiedActionOpsAsync(namespaceName, role, authData, AuthAction.packages); - break; - case GET_TOPICS: - case UNSUBSCRIBE: - case CLEAR_BACKLOG: - isAuthorizedFuture = allowTheSpecifiedActionOpsAsync(namespaceName, role, authData, AuthAction.consume); - break; - default: - isAuthorizedFuture = CompletableFuture.completedFuture(false); + if (log.isDebugEnabled()) { + log.debug("Check allowNamespaceOperationAsync [{}] on [{}].", operation.name(), namespaceName); } - CompletableFuture<Boolean> isTenantAdminFuture = validateTenantAdminAccess(namespaceName.getTenant(), role, authData); - return isTenantAdminFuture.thenCombine(isAuthorizedFuture, (isTenantAdmin, isAuthorized) -> { - if (log.isDebugEnabled()) { - log.debug("Verify if role {} is allowed to {} to topic {}: isTenantAdmin={}, isAuthorized={}", - role, operation, namespaceName, isTenantAdmin, isAuthorized); - } - return isTenantAdmin || isAuthorized; - }); + + return validateTenantAdminAccess(namespaceName.getTenant(), role, authData) + .thenCompose(isSuperUserOrAdmin -> { + if (log.isDebugEnabled()) { + log.debug("Verify if role {} is allowed to {} to namespace {}: isSuperUserOrAdmin={}", + role, operation, namespaceName, isSuperUserOrAdmin); + } + if (isSuperUserOrAdmin) { + return CompletableFuture.completedFuture(true); + } else { + switch (operation) { + case PACKAGES: + return allowTheSpecifiedActionOpsAsync( + namespaceName, role, authData, AuthAction.packages); + case GET_TOPIC: + case GET_TOPICS: + case UNSUBSCRIBE: + case CLEAR_BACKLOG: + return allowTheSpecifiedActionOpsAsync( + namespaceName, role, authData, AuthAction.consume); + case CREATE_TOPIC: + case DELETE_TOPIC: + case ADD_BUNDLE: + case GET_BUNDLE: + case DELETE_BUNDLE: + case GRANT_PERMISSION: + case GET_PERMISSION: + case REVOKE_PERMISSION: + return CompletableFuture.completedFuture(false); + default: + return FutureUtil.failedFuture(new IllegalStateException( + "NamespaceOperation [" + operation.name() + "] is not supported.")); + } + } + }); } @Override @@ -568,41 +585,8 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider { String role, TopicOperation operation, AuthenticationDataSource authData) { - log.debug("Check allowTopicOperationAsync [" + operation.name() + "] on [" + topicName.toString() + "]."); - - CompletableFuture<Boolean> isAuthorizedFuture; - - switch (operation) { - case LOOKUP: - case GET_STATS: - case GET_METADATA: - isAuthorizedFuture = canLookupAsync(topicName, role, authData); - break; - case PRODUCE: - isAuthorizedFuture = canProduceAsync(topicName, role, authData); - break; - case GET_SUBSCRIPTIONS: - case CONSUME: - case SUBSCRIBE: - case UNSUBSCRIBE: - case SKIP: - case EXPIRE_MESSAGES: - case PEEK_MESSAGES: - case RESET_CURSOR: - case SET_REPLICATED_SUBSCRIPTION_STATUS: - isAuthorizedFuture = canConsumeAsync(topicName, role, authData, authData.getSubscription()); - break; - case TERMINATE: - case COMPACT: - case OFFLOAD: - case UNLOAD: - case ADD_BUNDLE_RANGE: - case GET_BUNDLE_RANGE: - case DELETE_BUNDLE_RANGE: - return validateTenantAdminAccess(topicName.getTenant(), role, authData); - default: - return FutureUtil.failedFuture( - new IllegalStateException("TopicOperation [" + operation.name() + "] is not supported.")); + if (log.isDebugEnabled()) { + log.debug("Check allowTopicOperationAsync [{}] on [{}].", operation.name(), topicName); } return validateTenantAdminAccess(topicName.getTenant(), role, authData) @@ -614,7 +598,35 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider { if (isSuperUserOrAdmin) { return CompletableFuture.completedFuture(true); } else { - return isAuthorizedFuture; + switch (operation) { + case LOOKUP: + case GET_STATS: + case GET_METADATA: + return canLookupAsync(topicName, role, authData); + case PRODUCE: + return canProduceAsync(topicName, role, authData); + case GET_SUBSCRIPTIONS: + case CONSUME: + case SUBSCRIBE: + case UNSUBSCRIBE: + case SKIP: + case EXPIRE_MESSAGES: + case PEEK_MESSAGES: + case RESET_CURSOR: + case SET_REPLICATED_SUBSCRIPTION_STATUS: + return canConsumeAsync(topicName, role, authData, authData.getSubscription()); + case TERMINATE: + case COMPACT: + case OFFLOAD: + case UNLOAD: + case ADD_BUNDLE_RANGE: + case GET_BUNDLE_RANGE: + case DELETE_BUNDLE_RANGE: + return CompletableFuture.completedFuture(false); + default: + return FutureUtil.failedFuture(new IllegalStateException( + "TopicOperation [" + operation.name() + "] is not supported.")); + } } }); }
