This is an automated email from the ASF dual-hosted git repository. mattisonchao pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d07752276665508a7571ea0afaaa0e5fbedb3281 Author: Qiang Zhao <[email protected]> AuthorDate: Wed Dec 4 10:16:21 2024 +0800 [fix][broker] support missing cluster level fine-granted permissions (#23675) (cherry picked from commit 1c1a5cc655511c22b3399005d8ff9102b3553627) --- .../authorization/AuthorizationProvider.java | 20 + .../broker/authorization/AuthorizationService.java | 45 +++ .../authorization/PulsarAuthorizationProvider.java | 15 + .../pulsar/broker/admin/impl/ClustersBase.java | 160 +++++++- .../admin/ClusterEndpointsAuthorizationTest.java | 428 +++++++++++++++++++++ .../{PolicyName.java => ClusterOperation.java} | 49 +-- .../pulsar/common/policies/data/PolicyName.java | 6 +- 7 files changed, 668 insertions(+), 55 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java index 48386265940..cb61292f8e3 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java @@ -33,6 +33,7 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.BrokerOperation; +import org.apache.pulsar.common.policies.data.ClusterOperation; import org.apache.pulsar.common.policies.data.NamespaceOperation; import org.apache.pulsar.common.policies.data.PolicyName; import org.apache.pulsar.common.policies.data.PolicyOperation; @@ -393,4 +394,23 @@ public interface AuthorizationProvider extends Closeable { return FutureUtil.failedFuture( new UnsupportedOperationException("allowBrokerOperationAsync is not supported yet.")); } + + + default CompletableFuture<Boolean> allowClusterOperationAsync(String clusterName, + ClusterOperation clusterOperation, + String role, + AuthenticationDataSource authData) { + return FutureUtil.failedFuture( + new UnsupportedOperationException("allowClusterOperationAsync is not supported yet.")); + } + + default CompletableFuture<Boolean> allowClusterPolicyOperationAsync(String clusterName, + String role, + PolicyName policy, + PolicyOperation operation, + AuthenticationDataSource authData) { + return FutureUtil.failedFuture( + new IllegalStateException("ClusterPolicyOperation [" + policy.name() + "/" + operation.name() + "] " + + "is not supported by the Authorization provider you are using.")); + } } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java index 1348a405b0d..40573d99d60 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java @@ -39,6 +39,7 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.BrokerOperation; +import org.apache.pulsar.common.policies.data.ClusterOperation; import org.apache.pulsar.common.policies.data.NamespaceOperation; import org.apache.pulsar.common.policies.data.PolicyName; import org.apache.pulsar.common.policies.data.PolicyOperation; @@ -567,6 +568,50 @@ public class AuthorizationService { } } + public CompletableFuture<Boolean> allowClusterOperationAsync(String clusterName, + ClusterOperation clusterOperation, + String originalRole, + String role, + AuthenticationDataSource authData) { + if (!isValidOriginalPrincipal(role, originalRole, authData)) { + return CompletableFuture.completedFuture(false); + } + + if (isProxyRole(role)) { + final var isRoleAuthorizedFuture = provider.allowClusterOperationAsync(clusterName, + clusterOperation, role, authData); + final var isOriginalAuthorizedFuture = provider.allowClusterOperationAsync(clusterName, + clusterOperation, originalRole, authData); + return isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture, + (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized && isOriginalAuthorized); + } else { + return provider.allowClusterOperationAsync(clusterName, clusterOperation, role, authData); + } + } + + public CompletableFuture<Boolean> allowClusterPolicyOperationAsync(String clusterName, + PolicyName policy, + PolicyOperation operation, + String originalRole, + String role, + AuthenticationDataSource authData) { + if (!isValidOriginalPrincipal(role, originalRole, authData)) { + return CompletableFuture.completedFuture(false); + } + + if (isProxyRole(role)) { + final var isRoleAuthorizedFuture = provider.allowClusterPolicyOperationAsync(clusterName, role, + policy, operation, authData); + final var isOriginalAuthorizedFuture = provider.allowClusterPolicyOperationAsync(clusterName, originalRole, + policy, operation, authData); + return isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture, + (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized && isOriginalAuthorized); + } else { + return provider.allowClusterPolicyOperationAsync(clusterName, role, policy, operation, authData); + } + } + + /** * @deprecated - will be removed after 2.12. Use async variant. */ diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index 50783c4d133..976e7b7ee12 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -41,6 +41,7 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.AuthPolicies; import org.apache.pulsar.common.policies.data.BrokerOperation; +import org.apache.pulsar.common.policies.data.ClusterOperation; import org.apache.pulsar.common.policies.data.NamespaceOperation; import org.apache.pulsar.common.policies.data.PolicyName; import org.apache.pulsar.common.policies.data.PolicyOperation; @@ -860,4 +861,18 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider { }); }); } + + @Override + public CompletableFuture<Boolean> allowClusterOperationAsync(String clusterName, ClusterOperation clusterOperation, + String role, AuthenticationDataSource authData) { + return isSuperUser(role, authData, conf); + } + + @Override + public CompletableFuture<Boolean> allowClusterPolicyOperationAsync(String clusterName, String role, + PolicyName policy, + PolicyOperation operation, + AuthenticationDataSource authData) { + return isSuperUser(role, authData, conf); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java index 132c99ce16b..43af241e2ea 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java @@ -63,11 +63,14 @@ import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData; import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.ClusterDataImpl; +import org.apache.pulsar.common.policies.data.ClusterOperation; import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl; import org.apache.pulsar.common.policies.data.ClusterPoliciesImpl; import org.apache.pulsar.common.policies.data.FailureDomainImpl; import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl; import org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope; +import org.apache.pulsar.common.policies.data.PolicyName; +import org.apache.pulsar.common.policies.data.PolicyOperation; import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies; import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicyImpl; import org.apache.pulsar.common.util.FutureUtil; @@ -117,7 +120,7 @@ public class ClustersBase extends AdminResource { public void getCluster(@Suspended AsyncResponse asyncResponse, @ApiParam(value = "The cluster name", required = true) @PathParam("cluster") String cluster) { - validateSuperUserAccessAsync() + validateBothSuperuserAndClusterOperation(cluster, ClusterOperation.GET_CLUSTER) .thenCompose(__ -> clusterResources().getClusterAsync(cluster)) .thenAccept(clusterData -> { asyncResponse.resume(clusterData @@ -162,7 +165,7 @@ public class ClustersBase extends AdminResource { ) ) ) ClusterDataImpl clusterData) { - validateSuperUserAccessAsync() + validateBothSuperuserAndClusterOperation(cluster, ClusterOperation.CREATE_CLUSTER) .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) .thenCompose(__ -> { NamedEntity.checkName(cluster); @@ -227,7 +230,7 @@ public class ClustersBase extends AdminResource { ) ) ) ClusterDataImpl clusterData) { - validateSuperUserAccessAsync() + validateBothSuperuserAndClusterOperation(cluster, ClusterOperation.UPDATE_CLUSTER) .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) .thenCompose(__ -> { try { @@ -271,7 +274,7 @@ public class ClustersBase extends AdminResource { required = true ) @PathParam("cluster") String cluster) { - validateSuperUserAccessAsync() + validateBothSuperuserAndClusterPolicyOperation(cluster, PolicyName.CLUSTER_MIGRATION, PolicyOperation.READ) .thenCompose(__ -> clusterResources().getClusterPoliciesResources().getClusterPoliciesAsync(cluster)) .thenAccept(policies -> { asyncResponse.resume( @@ -326,7 +329,7 @@ public class ClustersBase extends AdminResource { asyncResponse.resume(new RestException(Status.BAD_REQUEST, "Cluster url must not be empty")); return; } - validateSuperUserAccessAsync() + validateBothSuperuserAndClusterPolicyOperation(cluster, PolicyName.CLUSTER_MIGRATION, PolicyOperation.WRITE) .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) .thenCompose(__ -> clusterResources().getClusterPoliciesResources().setPoliciesWithCreateAsync(cluster, old -> { @@ -376,7 +379,7 @@ public class ClustersBase extends AdminResource { "cluster-b" ]"""))) LinkedHashSet<String> peerClusterNames) { - validateSuperUserAccessAsync() + validateBothSuperuserAndClusterOperation(cluster, ClusterOperation.UPDATE_PEER_CLUSTER) .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) .thenCompose(__ -> innerSetPeerClusterNamesAsync(cluster, peerClusterNames)) .thenAccept(__ -> { @@ -437,7 +440,7 @@ public class ClustersBase extends AdminResource { public void getPeerCluster(@Suspended AsyncResponse asyncResponse, @ApiParam(value = "The cluster name", required = true) @PathParam("cluster") String cluster) { - validateSuperUserAccessAsync() + validateBothSuperuserAndClusterOperation(cluster, ClusterOperation.GET_PEER_CLUSTER) .thenCompose(__ -> clusterResources().getClusterAsync(cluster)) .thenAccept(clusterOpt -> { ClusterData clusterData = @@ -466,7 +469,7 @@ public class ClustersBase extends AdminResource { public void deleteCluster(@Suspended AsyncResponse asyncResponse, @ApiParam(value = "The cluster name", required = true) @PathParam("cluster") String cluster) { - validateSuperUserAccessAsync() + validateBothSuperuserAndClusterOperation(cluster, ClusterOperation.DELETE_CLUSTER) .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) .thenCompose(__ -> internalDeleteClusterAsync(cluster)) .thenAccept(__ -> { @@ -525,7 +528,7 @@ public class ClustersBase extends AdminResource { @Suspended AsyncResponse asyncResponse, @ApiParam(value = "The cluster name", required = true) @PathParam("cluster") String cluster ) { - validateSuperUserAccessAsync() + validateBothSuperuserAndClusterPolicyOperation(cluster, PolicyName.NAMESPACE_ISOLATION, PolicyOperation.READ) .thenCompose(__ -> validateClusterExistAsync(cluster, Status.NOT_FOUND)) .thenCompose(__ -> internalGetNamespaceIsolationPolicies(cluster)) .thenAccept(asyncResponse::resume) @@ -583,7 +586,7 @@ public class ClustersBase extends AdminResource { @ApiParam(value = "The name of the namespace isolation policy", required = true) @PathParam("policyName") String policyName ) { - validateSuperUserAccessAsync() + validateBothSuperuserAndClusterPolicyOperation(cluster, PolicyName.NAMESPACE_ISOLATION, PolicyOperation.READ) .thenCompose(__ -> validateClusterExistAsync(cluster, Status.PRECONDITION_FAILED)) .thenCompose(__ -> internalGetNamespaceIsolationPolicies(cluster)) .thenAccept(policies -> { @@ -619,7 +622,7 @@ public class ClustersBase extends AdminResource { @Suspended AsyncResponse asyncResponse, @ApiParam(value = "The cluster name", required = true) @PathParam("cluster") String cluster) { - validateSuperUserAccessAsync() + validateBothSuperuserAndClusterPolicyOperation(cluster, PolicyName.NAMESPACE_ISOLATION, PolicyOperation.READ) .thenCompose(__ -> validateClusterExistAsync(cluster, Status.PRECONDITION_FAILED)) .thenCompose(__ -> pulsar().getLoadManager().get().getAvailableBrokersAsync()) .thenCompose(availableBrokers -> internalGetNamespaceIsolationPolicies(cluster) @@ -676,7 +679,7 @@ public class ClustersBase extends AdminResource { @ApiParam(value = "The broker name (<broker-hostname>:<web-service-port>)", required = true, example = "broker1:8080") @PathParam("broker") String broker) { - validateSuperUserAccessAsync() + validateBothSuperuserAndClusterPolicyOperation(cluster, PolicyName.NAMESPACE_ISOLATION, PolicyOperation.READ) .thenCompose(__ -> validateClusterExistAsync(cluster, PRECONDITION_FAILED)) .thenCompose(__ -> internalGetNamespaceIsolationPolicies(cluster)) .thenApply(policies -> internalGetBrokerNsIsolationData(broker, policies)) @@ -711,7 +714,7 @@ public class ClustersBase extends AdminResource { @ApiParam(value = "The namespace isolation policy data", required = true) NamespaceIsolationDataImpl policyData ) { - validateSuperUserAccessAsync() + validateBothSuperuserAndClusterPolicyOperation(cluster, PolicyName.NAMESPACE_ISOLATION, PolicyOperation.WRITE) .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) .thenCompose(__ -> validateClusterExistAsync(cluster, PRECONDITION_FAILED)) .thenCompose(__ -> { @@ -875,7 +878,7 @@ public class ClustersBase extends AdminResource { @ApiParam(value = "The namespace isolation policy name", required = true) @PathParam("policyName") String policyName ) { - validateSuperUserAccessAsync() + validateBothSuperuserAndClusterPolicyOperation(cluster, PolicyName.NAMESPACE_ISOLATION, PolicyOperation.WRITE) .thenCompose(__ -> validateClusterExistAsync(cluster, PRECONDITION_FAILED)) .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) .thenCompose(__ -> namespaceIsolationPolicies().getIsolationDataPoliciesAsync(cluster)) @@ -925,7 +928,7 @@ public class ClustersBase extends AdminResource { @PathParam("domainName") String domainName, @ApiParam(value = "The configuration data of a failure domain", required = true) FailureDomainImpl domain ) { - validateSuperUserAccessAsync() + validateBothSuperuserAndClusterOperation(cluster, ClusterOperation.UPDATE_FAILURE_DOMAIN) .thenCompose(__ -> validateClusterExistAsync(cluster, PRECONDITION_FAILED)) .thenCompose(__ -> validateBrokerExistsInOtherDomain(cluster, domainName, domain)) .thenCompose(__ -> clusterResources().getFailureDomainResources() @@ -968,7 +971,7 @@ public class ClustersBase extends AdminResource { @ApiParam(value = "The cluster name", required = true) @PathParam("cluster") String cluster ) { - validateSuperUserAccessAsync() + validateBothSuperuserAndClusterOperation(cluster, ClusterOperation.GET_FAILURE_DOMAIN) .thenCompose(__ -> clusterResources().getFailureDomainResources() .listFailureDomainsAsync(cluster) .thenCompose(domainNames -> { @@ -1024,7 +1027,7 @@ public class ClustersBase extends AdminResource { @ApiParam(value = "The failure domain name", required = true) @PathParam("domainName") String domainName ) { - validateSuperUserAccessAsync() + validateBothSuperuserAndClusterOperation(cluster, ClusterOperation.GET_FAILURE_DOMAIN) .thenCompose(__ -> validateClusterExistAsync(cluster, PRECONDITION_FAILED)) .thenCompose(__ -> clusterResources().getFailureDomainResources() .getFailureDomainAsync(cluster, domainName)) @@ -1059,7 +1062,7 @@ public class ClustersBase extends AdminResource { @ApiParam(value = "The failure domain name", required = true) @PathParam("domainName") String domainName ) { - validateSuperUserAccessAsync() + validateBothSuperuserAndClusterOperation(cluster, ClusterOperation.DELETE_FAILURE_DOMAIN) .thenCompose(__ -> validateClusterExistAsync(cluster, PRECONDITION_FAILED)) .thenCompose(__ -> clusterResources() .getFailureDomainResources().deleteFailureDomainAsync(cluster, domainName)) @@ -1125,5 +1128,126 @@ public class ClustersBase extends AdminResource { }); } + + + private CompletableFuture<Void> validateBothSuperuserAndClusterOperation(String clusterName, + ClusterOperation operation) { + final var superUserAccessValidation = validateSuperUserAccessAsync(); + final var clusterOperationValidation = validateClusterOperationAsync(clusterName, operation); + return FutureUtil.waitForAll(List.of(superUserAccessValidation, clusterOperationValidation)) + .handle((result, err) -> { + if (!superUserAccessValidation.isCompletedExceptionally() + || !clusterOperationValidation.isCompletedExceptionally()) { + return null; + } + if (log.isDebugEnabled()) { + Throwable superUserValidationException = null; + try { + superUserAccessValidation.join(); + } catch (Throwable ex) { + superUserValidationException = FutureUtil.unwrapCompletionException(ex); + } + Throwable clusterOperationValidationException = null; + try { + clusterOperationValidation.join(); + } catch (Throwable ex) { + clusterOperationValidationException = FutureUtil.unwrapCompletionException(ex); + } + log.debug("validateBothSuperuserAndClusterOperation failed." + + " originalPrincipal={} clientAppId={} operation={} cluster={} " + + "superuserValidationError={} clusterOperationValidationError={}", + originalPrincipal(), clientAppId(), operation.toString(), clusterName, + superUserValidationException, clusterOperationValidationException); + } + throw new RestException(Status.UNAUTHORIZED, + String.format("Unauthorized to validateBothSuperuserAndClusterOperation for" + + " originalPrincipal [%s] and clientAppId [%s] " + + "about operation [%s] on cluster [%s]", + originalPrincipal(), clientAppId(), operation.toString(), clusterName)); + }); + } + + + private CompletableFuture<Void> validateBothSuperuserAndClusterPolicyOperation(String clusterName, PolicyName name, + PolicyOperation operation) { + final var superUserAccessValidation = validateSuperUserAccessAsync(); + final var clusterOperationValidation = validateClusterPolicyOperationAsync(clusterName, name, operation); + return FutureUtil.waitForAll(List.of(superUserAccessValidation, clusterOperationValidation)) + .handle((result, err) -> { + if (!superUserAccessValidation.isCompletedExceptionally() + || !clusterOperationValidation.isCompletedExceptionally()) { + return null; + } + if (log.isDebugEnabled()) { + Throwable superUserValidationException = null; + try { + superUserAccessValidation.join(); + } catch (Throwable ex) { + superUserValidationException = FutureUtil.unwrapCompletionException(ex); + } + Throwable clusterOperationValidationException = null; + try { + clusterOperationValidation.join(); + } catch (Throwable ex) { + clusterOperationValidationException = FutureUtil.unwrapCompletionException(ex); + } + log.debug("validateBothSuperuserAndClusterPolicyOperation failed." + + " originalPrincipal={} clientAppId={} operation={} cluster={} " + + "superuserValidationError={} clusterOperationValidationError={}", + originalPrincipal(), clientAppId(), operation.toString(), clusterName, + superUserValidationException, clusterOperationValidationException); + } + throw new RestException(Status.UNAUTHORIZED, + String.format("Unauthorized to validateBothSuperuserAndClusterPolicyOperation for" + + " originalPrincipal [%s] and clientAppId [%s] " + + "about operation [%s] on cluster [%s]", + originalPrincipal(), clientAppId(), operation.toString(), clusterName)); + }); + } + + + + + private CompletableFuture<Void> validateClusterOperationAsync(String cluster, ClusterOperation operation) { + final var pulsar = pulsar(); + if (pulsar.getBrokerService().isAuthenticationEnabled() + && pulsar.getBrokerService().isAuthorizationEnabled()) { + return pulsar.getBrokerService().getAuthorizationService() + .allowClusterOperationAsync(cluster, operation, originalPrincipal(), + clientAppId(), clientAuthData()) + .thenAccept(isAuthorized -> { + if (!isAuthorized) { + throw new RestException(Status.UNAUTHORIZED, + String.format("Unauthorized to validateClusterOperation for" + + " originalPrincipal [%s] and clientAppId [%s] " + + "about operation [%s] on cluster [%s]", + originalPrincipal(), clientAppId(), operation.toString(), cluster)); + } + }); + } + return CompletableFuture.completedFuture(null); + } + + private CompletableFuture<Void> validateClusterPolicyOperationAsync(String cluster, PolicyName policyName, + PolicyOperation operation) { + final var pulsar = pulsar(); + if (pulsar.getBrokerService().isAuthenticationEnabled() + && pulsar.getBrokerService().isAuthorizationEnabled()) { + return pulsar.getBrokerService().getAuthorizationService() + .allowClusterPolicyOperationAsync(cluster, policyName, operation, originalPrincipal(), + clientAppId(), clientAuthData()) + .thenAccept(isAuthorized -> { + if (!isAuthorized) { + throw new RestException(Status.UNAUTHORIZED, + String.format("Unauthorized to validateClusterPolicyOperation for" + + " originalPrincipal [%s] and clientAppId [%s] " + + "about operation [%s] on cluster [%s]", + originalPrincipal(), clientAppId(), operation.toString(), cluster)); + } + }); + } + return CompletableFuture.completedFuture(null); + } + private static final Logger log = LoggerFactory.getLogger(ClustersBase.class); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ClusterEndpointsAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ClusterEndpointsAuthorizationTest.java new file mode 100644 index 00000000000..ccf5ccb9481 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ClusterEndpointsAuthorizationTest.java @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import lombok.SneakyThrows; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.pulsar.broker.authorization.AuthorizationService; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.impl.auth.AuthenticationToken; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.ClusterOperation; +import org.apache.pulsar.common.policies.data.ClusterPolicies; +import org.apache.pulsar.common.policies.data.FailureDomain; +import org.apache.pulsar.common.policies.data.NamespaceIsolationData; +import org.apache.pulsar.common.policies.data.PolicyName; +import org.apache.pulsar.common.policies.data.PolicyOperation; +import org.apache.pulsar.security.MockedPulsarStandalone; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import java.util.LinkedHashSet; +import java.util.UUID; + + +@Test(groups = "broker-admin") +public class ClusterEndpointsAuthorizationTest extends MockedPulsarStandalone { + + private AuthorizationService orignalAuthorizationService; + private AuthorizationService spyAuthorizationService; + + private PulsarAdmin superUserAdmin; + private PulsarAdmin nobodyAdmin; + + @SneakyThrows + @BeforeClass(alwaysRun = true) + public void setup() { + configureTokenAuthentication(); + configureDefaultAuthorization(); + start(); + this.superUserAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(SUPER_USER_TOKEN)) + .build(); + this.nobodyAdmin = PulsarAdmin.builder() + .serviceHttpUrl(getPulsarService().getWebServiceAddress()) + .authentication(new AuthenticationToken(NOBODY_TOKEN)) + .build(); + } + + @BeforeMethod(alwaysRun = true) + public void before() throws IllegalAccessException { + orignalAuthorizationService = getPulsarService().getBrokerService().getAuthorizationService(); + spyAuthorizationService = spy(orignalAuthorizationService); + FieldUtils.writeField(getPulsarService().getBrokerService(), "authorizationService", + spyAuthorizationService, true); + } + + @AfterMethod(alwaysRun = true) + public void after() throws IllegalAccessException { + if (orignalAuthorizationService != null) { + FieldUtils.writeField(getPulsarService().getBrokerService(), "authorizationService", orignalAuthorizationService, true); + } + } + + @SneakyThrows + @AfterClass(alwaysRun = true) + public void cleanup() { + if (superUserAdmin != null) { + superUserAdmin.close(); + superUserAdmin = null; + } + spyAuthorizationService = null; + orignalAuthorizationService = null; + super.close(); + } + + + @Test + public void testGetCluster() throws PulsarAdminException { + final String clusterName = getPulsarService().getConfiguration().getClusterName(); + superUserAdmin.clusters().getCluster(clusterName); + // test allow cluster operation + verify(spyAuthorizationService) + .allowClusterOperationAsync(eq(clusterName), eq(ClusterOperation.GET_CLUSTER), any(), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> nobodyAdmin.clusters().getCluster(clusterName)); + } + + + @Test + public void testCreateCluster() throws PulsarAdminException { + final String clusterName = UUID.randomUUID().toString(); + superUserAdmin.clusters().createCluster(clusterName, ClusterData.builder().build()); + // test allow cluster operation + verify(spyAuthorizationService) + .allowClusterOperationAsync(eq(clusterName), eq(ClusterOperation.CREATE_CLUSTER), any(), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> nobodyAdmin.clusters().createCluster(clusterName, ClusterData.builder().build())); + } + + @Test + public void testUpdateCluster() { + final String clusterName = UUID.randomUUID().toString(); + try { + superUserAdmin.clusters().updateCluster(clusterName, ClusterData.builder().serviceUrl("aaa").build()); + } catch (Throwable ignore) { + + } + // test allow cluster operation + verify(spyAuthorizationService) + .allowClusterOperationAsync(eq(clusterName), eq(ClusterOperation.UPDATE_CLUSTER), any(), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> nobodyAdmin.clusters().updateCluster(clusterName, ClusterData.builder().build())); + } + + + @Test + public void testGetClusterMigration() { + final String clusterName = UUID.randomUUID().toString(); + try { + superUserAdmin.clusters().getClusterMigration(clusterName); + } catch (Throwable ignore) { + + } + // test allow cluster operation + verify(spyAuthorizationService) + .allowClusterPolicyOperationAsync(eq(clusterName), eq(PolicyName.CLUSTER_MIGRATION), + eq(PolicyOperation.READ), any(), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> nobodyAdmin.clusters().getClusterMigration(clusterName)); + } + + + @Test + public void testUpdateClusterMigration() throws PulsarAdminException { + final String clusterName = UUID.randomUUID().toString(); + superUserAdmin.clusters().createCluster(clusterName, ClusterData.builder().build()); + Mockito.clearInvocations(spyAuthorizationService); + + superUserAdmin.clusters().updateClusterMigration(clusterName, false, new ClusterPolicies.ClusterUrl()); + // test allow cluster operation + verify(spyAuthorizationService) + .allowClusterPolicyOperationAsync(eq(clusterName), eq(PolicyName.CLUSTER_MIGRATION), + eq(PolicyOperation.WRITE), any(), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> nobodyAdmin.clusters() + .updateClusterMigration(clusterName, false, new ClusterPolicies.ClusterUrl())); + } + + @Test + public void testSetPeerClusterNames() throws PulsarAdminException { + final LinkedHashSet<String> linkedHashSet = new LinkedHashSet<>(); + linkedHashSet.add("a"); + final String clusterName = getPulsarService().getConfiguration().getClusterName(); + try { + superUserAdmin.clusters().updatePeerClusterNames(clusterName, linkedHashSet); + } catch (Throwable ignore) { + + } + // test allow cluster operation + verify(spyAuthorizationService) + .allowClusterOperationAsync(eq(clusterName), + eq(ClusterOperation.UPDATE_PEER_CLUSTER), any(), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> nobodyAdmin.clusters().updatePeerClusterNames(clusterName, linkedHashSet)); + } + + @Test + public void testGetPeerCluster() throws PulsarAdminException { + final String clusterName = getPulsarService().getConfiguration().getClusterName(); + superUserAdmin.clusters().getPeerClusterNames(clusterName); + + // test allow cluster operation + verify(spyAuthorizationService) + .allowClusterOperationAsync(eq(clusterName), + eq(ClusterOperation.GET_PEER_CLUSTER), any(), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> nobodyAdmin.clusters().getPeerClusterNames(clusterName)); + } + + @Test + public void testDeleteCluster() throws PulsarAdminException { + final String clusterName = UUID.randomUUID().toString(); + superUserAdmin.clusters().createCluster(clusterName, ClusterData.builder().build()); + Mockito.clearInvocations(spyAuthorizationService); + + superUserAdmin.clusters().deleteCluster(clusterName); + + // test allow cluster operation + verify(spyAuthorizationService) + .allowClusterOperationAsync(eq(clusterName), + eq(ClusterOperation.DELETE_CLUSTER), any(), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> nobodyAdmin.clusters().deleteCluster(clusterName)); + } + + + @Test + public void testGetNamespaceIsolationPolicies() throws PulsarAdminException { + final String clusterName = getPulsarService().getConfiguration().getClusterName(); + superUserAdmin.clusters().getNamespaceIsolationPolicies(clusterName); + + // test allow cluster operation + verify(spyAuthorizationService) + .allowClusterPolicyOperationAsync(eq(clusterName), + eq(PolicyName.NAMESPACE_ISOLATION), eq(PolicyOperation.READ), any(), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> nobodyAdmin.clusters().getNamespaceIsolationPolicies(clusterName)); + } + + + @Test + public void testGetNamespaceIsolationPolicy() throws PulsarAdminException { + final String clusterName = getPulsarService().getConfiguration().getClusterName(); + superUserAdmin.clusters().getNamespaceIsolationPolicy(clusterName, ""); + + // test allow cluster operation + verify(spyAuthorizationService) + .allowClusterPolicyOperationAsync(eq(clusterName), + eq(PolicyName.NAMESPACE_ISOLATION), eq(PolicyOperation.READ), any(), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> nobodyAdmin.clusters().getNamespaceIsolationPolicy(clusterName, "")); + } + + + @Test + public void testGetBrokersWithNamespaceIsolationPolicy() throws PulsarAdminException { + final String clusterName = getPulsarService().getConfiguration().getClusterName(); + superUserAdmin.clusters().getBrokersWithNamespaceIsolationPolicy(clusterName); + + // test allow cluster operation + verify(spyAuthorizationService) + .allowClusterPolicyOperationAsync(eq(clusterName), + eq(PolicyName.NAMESPACE_ISOLATION), eq(PolicyOperation.READ), any(), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> nobodyAdmin.clusters().getBrokersWithNamespaceIsolationPolicy(clusterName)); + } + + + @Test + public void testGetBrokerWithNamespaceIsolationPolicy() throws PulsarAdminException { + final String clusterName = getPulsarService().getConfiguration().getClusterName(); + superUserAdmin.clusters().getBrokerWithNamespaceIsolationPolicy(clusterName, getPulsarService().getBrokerId()); + + // test allow cluster operation + verify(spyAuthorizationService) + .allowClusterPolicyOperationAsync(eq(clusterName), + eq(PolicyName.NAMESPACE_ISOLATION), eq(PolicyOperation.READ), any(), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> nobodyAdmin.clusters().getBrokerWithNamespaceIsolationPolicy(clusterName, "")); + } + + + @Test + public void testSetNamespaceIsolationPolicy() { + final String clusterName = getPulsarService().getConfiguration().getClusterName(); + + try { + superUserAdmin.clusters().updateNamespaceIsolationPolicy(clusterName, "test", + NamespaceIsolationData.builder().build()); + } catch (Throwable ignore) { + + } + + // test allow cluster operation + verify(spyAuthorizationService) + .allowClusterPolicyOperationAsync(eq(clusterName), + eq(PolicyName.NAMESPACE_ISOLATION), eq(PolicyOperation.WRITE), any(), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> nobodyAdmin.clusters().updateNamespaceIsolationPolicy(clusterName, "test", + NamespaceIsolationData.builder().build())); + } + + @Test + public void testDeleteNamespaceIsolationPolicy() throws PulsarAdminException { + final String clusterName = getPulsarService().getConfiguration().getClusterName(); + superUserAdmin.clusters().deleteNamespaceIsolationPolicy(clusterName, "test"); + + // test allow cluster operation + verify(spyAuthorizationService) + .allowClusterPolicyOperationAsync(eq(clusterName), + eq(PolicyName.NAMESPACE_ISOLATION), eq(PolicyOperation.WRITE), any(), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> nobodyAdmin.clusters().deleteNamespaceIsolationPolicy(clusterName, "test")); + } + + + @Test + public void testSetFailureDomain() throws PulsarAdminException { + final String clusterName = getPulsarService().getConfiguration().getClusterName(); + superUserAdmin.clusters().updateFailureDomain(clusterName, "test", FailureDomain.builder().build()); + + // test allow cluster operation + verify(spyAuthorizationService) + .allowClusterOperationAsync(eq(clusterName), eq(ClusterOperation.UPDATE_FAILURE_DOMAIN), any(), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> nobodyAdmin.clusters().updateFailureDomain(clusterName, + "test", FailureDomain.builder().build())); + } + + @Test + public void testGetFailureDomains() throws PulsarAdminException { + final String clusterName = getPulsarService().getConfiguration().getClusterName(); + superUserAdmin.clusters().getFailureDomains(clusterName); + + // test allow cluster operation + verify(spyAuthorizationService) + .allowClusterOperationAsync(eq(clusterName), eq(ClusterOperation.GET_FAILURE_DOMAIN), any(), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> nobodyAdmin.clusters().getFailureDomains(clusterName)); + } + + + @Test + public void testGetDomain() throws PulsarAdminException { + final String clusterName = getPulsarService().getConfiguration().getClusterName(); + try { + superUserAdmin.clusters().getFailureDomain(clusterName, "test"); + } catch (Throwable ignore) { + + } + + // test allow cluster operation + verify(spyAuthorizationService) + .allowClusterOperationAsync(eq(clusterName), eq(ClusterOperation.GET_FAILURE_DOMAIN), any(), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> nobodyAdmin.clusters().getFailureDomain(clusterName, "test")); + } + + @Test + public void testDeleteFailureDomain() throws PulsarAdminException { + final String clusterName = getPulsarService().getConfiguration().getClusterName(); + try { + superUserAdmin.clusters().deleteFailureDomain(clusterName, "test"); + } catch (Throwable ignore) { + + } + + // test allow cluster operation + verify(spyAuthorizationService) + .allowClusterOperationAsync(eq(clusterName), eq(ClusterOperation.DELETE_FAILURE_DOMAIN), any(), any(), any()); + // fallback to superuser + verify(spyAuthorizationService).isSuperUser(any(), any()); + // ---- test nobody + Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class, + () -> nobodyAdmin.clusters().deleteFailureDomain(clusterName, "test")); + } + + +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterOperation.java similarity index 54% copy from pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java copy to pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterOperation.java index 86ab545215e..bbdc64f729e 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterOperation.java @@ -18,40 +18,17 @@ */ package org.apache.pulsar.common.policies.data; -/** - * PolicyName authorization operations. - */ -public enum PolicyName { - ALL, - ANTI_AFFINITY, - AUTO_SUBSCRIPTION_CREATION, - AUTO_TOPIC_CREATION, - BACKLOG, - COMPACTION, - DELAYED_DELIVERY, - INACTIVE_TOPIC, - DEDUPLICATION, - MAX_CONSUMERS, - MAX_PRODUCERS, - DEDUPLICATION_SNAPSHOT, - MAX_UNACKED, - MAX_SUBSCRIPTIONS, - OFFLOAD, - PARTITION, - PERSISTENCE, - RATE, - RETENTION, - REPLICATION, - REPLICATION_RATE, - SCHEMA_COMPATIBILITY_STRATEGY, - SUBSCRIPTION_AUTH_MODE, - SUBSCRIPTION_EXPIRATION_TIME, - ENCRYPTION, - TTL, - MAX_TOPICS, - RESOURCEGROUP, - ENTRY_FILTERS, - SHADOW_TOPIC, - DISPATCHER_PAUSE_ON_ACK_STATE_PERSISTENT, - ALLOW_CLUSTERS +public enum ClusterOperation { + LIST_CLUSTERS, + GET_CLUSTER, + CREATE_CLUSTER, + UPDATE_CLUSTER, + DELETE_CLUSTER, + + // detailed update + GET_PEER_CLUSTER, + UPDATE_PEER_CLUSTER, + GET_FAILURE_DOMAIN, + UPDATE_FAILURE_DOMAIN, + DELETE_FAILURE_DOMAIN } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java index 86ab545215e..d77f92eb032 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java @@ -53,5 +53,9 @@ public enum PolicyName { ENTRY_FILTERS, SHADOW_TOPIC, DISPATCHER_PAUSE_ON_ACK_STATE_PERSISTENT, - ALLOW_CLUSTERS + ALLOW_CLUSTERS, + + // cluster policies + CLUSTER_MIGRATION, + NAMESPACE_ISOLATION, }
