This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new dae1e7dae19 [fix][broker] support missing cluster level fine-grained
permissions (#23675)
dae1e7dae19 is described below
commit dae1e7dae193907b7ec4af79049e463344c09131
Author: Qiang Zhao <[email protected]>
AuthorDate: Wed Dec 4 10:16:21 2024 +0800
[fix][broker] support missing cluster level fine-grained permissions
(#23675)
(cherry picked from commit 1c1a5cc655511c22b3399005d8ff9102b3553627)
---
.../authorization/AuthorizationProvider.java | 20 +
.../broker/authorization/AuthorizationService.java | 45 +++
.../authorization/PulsarAuthorizationProvider.java | 15 +
.../pulsar/broker/admin/impl/ClustersBase.java | 160 +++++++-
.../admin/ClusterEndpointsAuthorizationTest.java | 428 +++++++++++++++++++++
.../{PolicyName.java => ClusterOperation.java} | 49 +--
.../pulsar/common/policies/data/PolicyName.java | 6 +-
7 files changed, 668 insertions(+), 55 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
index 48386265940..cb61292f8e3 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BrokerOperation;
+import org.apache.pulsar.common.policies.data.ClusterOperation;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
@@ -393,4 +394,23 @@ public interface AuthorizationProvider extends Closeable {
return FutureUtil.failedFuture(
new UnsupportedOperationException("allowBrokerOperationAsync
is not supported yet."));
}
+
+
+ default CompletableFuture<Boolean> allowClusterOperationAsync(String
clusterName,
+
ClusterOperation clusterOperation,
+ String role,
+
AuthenticationDataSource authData) {
+ return FutureUtil.failedFuture(
+ new UnsupportedOperationException("allowClusterOperationAsync
is not supported yet."));
+ }
+
+ default CompletableFuture<Boolean> allowClusterPolicyOperationAsync(String
clusterName,
+ String
role,
+
PolicyName policy,
+
PolicyOperation operation,
+
AuthenticationDataSource authData) {
+ return FutureUtil.failedFuture(
+ new IllegalStateException("ClusterPolicyOperation [" +
policy.name() + "/" + operation.name() + "] "
+ + "is not supported by the
Authorization provider you are using."));
+ }
}
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 1348a405b0d..40573d99d60 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -39,6 +39,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BrokerOperation;
+import org.apache.pulsar.common.policies.data.ClusterOperation;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
@@ -567,6 +568,50 @@ public class AuthorizationService {
}
}
+ public CompletableFuture<Boolean> allowClusterOperationAsync(String
clusterName,
+
ClusterOperation clusterOperation,
+ String
originalRole,
+ String role,
+
AuthenticationDataSource authData) {
+ if (!isValidOriginalPrincipal(role, originalRole, authData)) {
+ return CompletableFuture.completedFuture(false);
+ }
+
+ if (isProxyRole(role)) {
+ final var isRoleAuthorizedFuture =
provider.allowClusterOperationAsync(clusterName,
+ clusterOperation, role, authData);
+ final var isOriginalAuthorizedFuture =
provider.allowClusterOperationAsync(clusterName,
+ clusterOperation, originalRole, authData);
+ return
isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture,
+ (isRoleAuthorized, isOriginalAuthorized) ->
isRoleAuthorized && isOriginalAuthorized);
+ } else {
+ return provider.allowClusterOperationAsync(clusterName,
clusterOperation, role, authData);
+ }
+ }
+
+ public CompletableFuture<Boolean> allowClusterPolicyOperationAsync(String
clusterName,
+
PolicyName policy,
+
PolicyOperation operation,
+ String
originalRole,
+ String role,
+
AuthenticationDataSource authData) {
+ if (!isValidOriginalPrincipal(role, originalRole, authData)) {
+ return CompletableFuture.completedFuture(false);
+ }
+
+ if (isProxyRole(role)) {
+ final var isRoleAuthorizedFuture =
provider.allowClusterPolicyOperationAsync(clusterName, role,
+ policy, operation, authData);
+ final var isOriginalAuthorizedFuture =
provider.allowClusterPolicyOperationAsync(clusterName, originalRole,
+ policy, operation, authData);
+ return
isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture,
+ (isRoleAuthorized, isOriginalAuthorized) ->
isRoleAuthorized && isOriginalAuthorized);
+ } else {
+ return provider.allowClusterPolicyOperationAsync(clusterName,
role, policy, operation, authData);
+ }
+ }
+
+
/**
* @deprecated - will be removed after 2.12. Use async variant.
*/
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index 50783c4d133..976e7b7ee12 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AuthPolicies;
import org.apache.pulsar.common.policies.data.BrokerOperation;
+import org.apache.pulsar.common.policies.data.ClusterOperation;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
@@ -860,4 +861,18 @@ public class PulsarAuthorizationProvider implements
AuthorizationProvider {
});
});
}
+
+ @Override
+ public CompletableFuture<Boolean> allowClusterOperationAsync(String
clusterName, ClusterOperation clusterOperation,
+ String role,
AuthenticationDataSource authData) {
+ return isSuperUser(role, authData, conf);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> allowClusterPolicyOperationAsync(String
clusterName, String role,
+
PolicyName policy,
+
PolicyOperation operation,
+
AuthenticationDataSource authData) {
+ return isSuperUser(role, authData, conf);
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index b261033ca52..d24a3255b55 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -63,11 +63,14 @@ import
org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
+import org.apache.pulsar.common.policies.data.ClusterOperation;
import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
import org.apache.pulsar.common.policies.data.ClusterPoliciesImpl;
import org.apache.pulsar.common.policies.data.FailureDomainImpl;
import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
import
org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicyImpl;
import org.apache.pulsar.common.util.FutureUtil;
@@ -117,7 +120,7 @@ public class ClustersBase extends AdminResource {
public void getCluster(@Suspended AsyncResponse asyncResponse,
@ApiParam(value = "The cluster name", required =
true)
@PathParam("cluster") String cluster) {
- validateSuperUserAccessAsync()
+ validateBothSuperuserAndClusterOperation(cluster,
ClusterOperation.GET_CLUSTER)
.thenCompose(__ -> clusterResources().getClusterAsync(cluster))
.thenAccept(clusterData -> {
asyncResponse.resume(clusterData
@@ -162,7 +165,7 @@ public class ClustersBase extends AdminResource {
)
)
) ClusterDataImpl clusterData) {
- validateSuperUserAccessAsync()
+ validateBothSuperuserAndClusterOperation(cluster,
ClusterOperation.CREATE_CLUSTER)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> {
NamedEntity.checkName(cluster);
@@ -227,7 +230,7 @@ public class ClustersBase extends AdminResource {
)
)
) ClusterDataImpl clusterData) {
- validateSuperUserAccessAsync()
+ validateBothSuperuserAndClusterOperation(cluster,
ClusterOperation.UPDATE_CLUSTER)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> {
try {
@@ -271,7 +274,7 @@ public class ClustersBase extends AdminResource {
required = true
)
@PathParam("cluster") String cluster) {
- validateSuperUserAccessAsync()
+ validateBothSuperuserAndClusterPolicyOperation(cluster,
PolicyName.CLUSTER_MIGRATION, PolicyOperation.READ)
.thenCompose(__ ->
clusterResources().getClusterPoliciesResources().getClusterPoliciesAsync(cluster))
.thenAccept(policies -> {
asyncResponse.resume(
@@ -326,7 +329,7 @@ public class ClustersBase extends AdminResource {
asyncResponse.resume(new RestException(Status.BAD_REQUEST,
"Cluster url must not be empty"));
return;
}
- validateSuperUserAccessAsync()
+ validateBothSuperuserAndClusterPolicyOperation(cluster,
PolicyName.CLUSTER_MIGRATION, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ ->
clusterResources().getClusterPoliciesResources().setPoliciesWithCreateAsync(cluster,
old -> {
@@ -376,7 +379,7 @@ public class ClustersBase extends AdminResource {
"cluster-b"
]""")))
LinkedHashSet<String> peerClusterNames) {
- validateSuperUserAccessAsync()
+ validateBothSuperuserAndClusterOperation(cluster,
ClusterOperation.UPDATE_PEER_CLUSTER)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> innerSetPeerClusterNamesAsync(cluster,
peerClusterNames))
.thenAccept(__ -> {
@@ -437,7 +440,7 @@ public class ClustersBase extends AdminResource {
public void getPeerCluster(@Suspended AsyncResponse asyncResponse,
@ApiParam(value = "The cluster name", required
= true)
@PathParam("cluster") String cluster) {
- validateSuperUserAccessAsync()
+ validateBothSuperuserAndClusterOperation(cluster,
ClusterOperation.GET_PEER_CLUSTER)
.thenCompose(__ -> clusterResources().getClusterAsync(cluster))
.thenAccept(clusterOpt -> {
ClusterData clusterData =
@@ -466,7 +469,7 @@ public class ClustersBase extends AdminResource {
public void deleteCluster(@Suspended AsyncResponse asyncResponse,
@ApiParam(value = "The cluster name", required =
true)
@PathParam("cluster") String cluster) {
- validateSuperUserAccessAsync()
+ validateBothSuperuserAndClusterOperation(cluster,
ClusterOperation.DELETE_CLUSTER)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> internalDeleteClusterAsync(cluster))
.thenAccept(__ -> {
@@ -525,7 +528,7 @@ public class ClustersBase extends AdminResource {
@Suspended AsyncResponse asyncResponse,
@ApiParam(value = "The cluster name", required = true)
@PathParam("cluster") String cluster
) {
- validateSuperUserAccessAsync()
+ validateBothSuperuserAndClusterPolicyOperation(cluster,
PolicyName.NAMESPACE_ISOLATION, PolicyOperation.READ)
.thenCompose(__ -> validateClusterExistAsync(cluster,
Status.NOT_FOUND))
.thenCompose(__ ->
internalGetNamespaceIsolationPolicies(cluster))
.thenAccept(asyncResponse::resume)
@@ -583,7 +586,7 @@ public class ClustersBase extends AdminResource {
@ApiParam(value = "The name of the namespace isolation policy",
required = true)
@PathParam("policyName") String policyName
) {
- validateSuperUserAccessAsync()
+ validateBothSuperuserAndClusterPolicyOperation(cluster,
PolicyName.NAMESPACE_ISOLATION, PolicyOperation.READ)
.thenCompose(__ -> validateClusterExistAsync(cluster,
Status.PRECONDITION_FAILED))
.thenCompose(__ ->
internalGetNamespaceIsolationPolicies(cluster))
.thenAccept(policies -> {
@@ -619,7 +622,7 @@ public class ClustersBase extends AdminResource {
@Suspended AsyncResponse asyncResponse,
@ApiParam(value = "The cluster name", required = true)
@PathParam("cluster") String cluster) {
- validateSuperUserAccessAsync()
+ validateBothSuperuserAndClusterPolicyOperation(cluster,
PolicyName.NAMESPACE_ISOLATION, PolicyOperation.READ)
.thenCompose(__ -> validateClusterExistAsync(cluster,
Status.PRECONDITION_FAILED))
.thenCompose(__ ->
pulsar().getLoadManager().get().getAvailableBrokersAsync())
.thenCompose(availableBrokers ->
internalGetNamespaceIsolationPolicies(cluster)
@@ -676,7 +679,7 @@ public class ClustersBase extends AdminResource {
@ApiParam(value = "The broker name
(<broker-hostname>:<web-service-port>)", required = true,
example = "broker1:8080")
@PathParam("broker") String broker) {
- validateSuperUserAccessAsync()
+ validateBothSuperuserAndClusterPolicyOperation(cluster,
PolicyName.NAMESPACE_ISOLATION, PolicyOperation.READ)
.thenCompose(__ -> validateClusterExistAsync(cluster,
PRECONDITION_FAILED))
.thenCompose(__ ->
internalGetNamespaceIsolationPolicies(cluster))
.thenApply(policies ->
internalGetBrokerNsIsolationData(broker, policies))
@@ -711,7 +714,7 @@ public class ClustersBase extends AdminResource {
@ApiParam(value = "The namespace isolation policy data", required =
true)
NamespaceIsolationDataImpl policyData
) {
- validateSuperUserAccessAsync()
+ validateBothSuperuserAndClusterPolicyOperation(cluster,
PolicyName.NAMESPACE_ISOLATION, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> validateClusterExistAsync(cluster,
PRECONDITION_FAILED))
.thenCompose(__ -> {
@@ -874,7 +877,7 @@ public class ClustersBase extends AdminResource {
@ApiParam(value = "The namespace isolation policy name", required =
true)
@PathParam("policyName") String policyName
) {
- validateSuperUserAccessAsync()
+ validateBothSuperuserAndClusterPolicyOperation(cluster,
PolicyName.NAMESPACE_ISOLATION, PolicyOperation.WRITE)
.thenCompose(__ -> validateClusterExistAsync(cluster,
PRECONDITION_FAILED))
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ ->
namespaceIsolationPolicies().getIsolationDataPoliciesAsync(cluster))
@@ -924,7 +927,7 @@ public class ClustersBase extends AdminResource {
@PathParam("domainName") String domainName,
@ApiParam(value = "The configuration data of a failure domain",
required = true) FailureDomainImpl domain
) {
- validateSuperUserAccessAsync()
+ validateBothSuperuserAndClusterOperation(cluster,
ClusterOperation.UPDATE_FAILURE_DOMAIN)
.thenCompose(__ -> validateClusterExistAsync(cluster,
PRECONDITION_FAILED))
.thenCompose(__ -> validateBrokerExistsInOtherDomain(cluster,
domainName, domain))
.thenCompose(__ ->
clusterResources().getFailureDomainResources()
@@ -967,7 +970,7 @@ public class ClustersBase extends AdminResource {
@ApiParam(value = "The cluster name", required = true)
@PathParam("cluster") String cluster
) {
- validateSuperUserAccessAsync()
+ validateBothSuperuserAndClusterOperation(cluster,
ClusterOperation.GET_FAILURE_DOMAIN)
.thenCompose(__ ->
clusterResources().getFailureDomainResources()
.listFailureDomainsAsync(cluster)
.thenCompose(domainNames -> {
@@ -1023,7 +1026,7 @@ public class ClustersBase extends AdminResource {
@ApiParam(value = "The failure domain name", required = true)
@PathParam("domainName") String domainName
) {
- validateSuperUserAccessAsync()
+ validateBothSuperuserAndClusterOperation(cluster,
ClusterOperation.GET_FAILURE_DOMAIN)
.thenCompose(__ -> validateClusterExistAsync(cluster,
PRECONDITION_FAILED))
.thenCompose(__ ->
clusterResources().getFailureDomainResources()
.getFailureDomainAsync(cluster, domainName))
@@ -1058,7 +1061,7 @@ public class ClustersBase extends AdminResource {
@ApiParam(value = "The failure domain name", required = true)
@PathParam("domainName") String domainName
) {
- validateSuperUserAccessAsync()
+ validateBothSuperuserAndClusterOperation(cluster,
ClusterOperation.DELETE_FAILURE_DOMAIN)
.thenCompose(__ -> validateClusterExistAsync(cluster,
PRECONDITION_FAILED))
.thenCompose(__ -> clusterResources()
.getFailureDomainResources().deleteFailureDomainAsync(cluster, domainName))
@@ -1124,5 +1127,126 @@ public class ClustersBase extends AdminResource {
});
}
+
+
+ private CompletableFuture<Void>
validateBothSuperuserAndClusterOperation(String clusterName,
+
ClusterOperation operation) {
+ final var superUserAccessValidation = validateSuperUserAccessAsync();
+ final var clusterOperationValidation =
validateClusterOperationAsync(clusterName, operation);
+ return FutureUtil.waitForAll(List.of(superUserAccessValidation,
clusterOperationValidation))
+ .handle((result, err) -> {
+ if (!superUserAccessValidation.isCompletedExceptionally()
+ ||
!clusterOperationValidation.isCompletedExceptionally()) {
+ return null;
+ }
+ if (log.isDebugEnabled()) {
+ Throwable superUserValidationException = null;
+ try {
+ superUserAccessValidation.join();
+ } catch (Throwable ex) {
+ superUserValidationException =
FutureUtil.unwrapCompletionException(ex);
+ }
+ Throwable clusterOperationValidationException = null;
+ try {
+ clusterOperationValidation.join();
+ } catch (Throwable ex) {
+ clusterOperationValidationException =
FutureUtil.unwrapCompletionException(ex);
+ }
+ log.debug("validateBothSuperuserAndClusterOperation
failed."
+ + " originalPrincipal={} clientAppId={}
operation={} cluster={} "
+ + "superuserValidationError={}
clusterOperationValidationError={}",
+ originalPrincipal(), clientAppId(),
operation.toString(), clusterName,
+ superUserValidationException,
clusterOperationValidationException);
+ }
+ throw new RestException(Status.UNAUTHORIZED,
+ String.format("Unauthorized to
validateBothSuperuserAndClusterOperation for"
+ + " originalPrincipal [%s] and
clientAppId [%s] "
+ + "about operation [%s] on cluster
[%s]",
+ originalPrincipal(), clientAppId(),
operation.toString(), clusterName));
+ });
+ }
+
+
+ private CompletableFuture<Void>
validateBothSuperuserAndClusterPolicyOperation(String clusterName, PolicyName
name,
+
PolicyOperation operation) {
+ final var superUserAccessValidation = validateSuperUserAccessAsync();
+ final var clusterOperationValidation =
validateClusterPolicyOperationAsync(clusterName, name, operation);
+ return FutureUtil.waitForAll(List.of(superUserAccessValidation,
clusterOperationValidation))
+ .handle((result, err) -> {
+ if (!superUserAccessValidation.isCompletedExceptionally()
+ ||
!clusterOperationValidation.isCompletedExceptionally()) {
+ return null;
+ }
+ if (log.isDebugEnabled()) {
+ Throwable superUserValidationException = null;
+ try {
+ superUserAccessValidation.join();
+ } catch (Throwable ex) {
+ superUserValidationException =
FutureUtil.unwrapCompletionException(ex);
+ }
+ Throwable clusterOperationValidationException = null;
+ try {
+ clusterOperationValidation.join();
+ } catch (Throwable ex) {
+ clusterOperationValidationException =
FutureUtil.unwrapCompletionException(ex);
+ }
+
log.debug("validateBothSuperuserAndClusterPolicyOperation failed."
+ + " originalPrincipal={} clientAppId={}
operation={} cluster={} "
+ + "superuserValidationError={}
clusterOperationValidationError={}",
+ originalPrincipal(), clientAppId(),
operation.toString(), clusterName,
+ superUserValidationException,
clusterOperationValidationException);
+ }
+ throw new RestException(Status.UNAUTHORIZED,
+ String.format("Unauthorized to
validateBothSuperuserAndClusterPolicyOperation for"
+ + " originalPrincipal [%s] and
clientAppId [%s] "
+ + "about operation [%s] on cluster
[%s]",
+ originalPrincipal(), clientAppId(),
operation.toString(), clusterName));
+ });
+ }
+
+
+
+
+ private CompletableFuture<Void> validateClusterOperationAsync(String
cluster, ClusterOperation operation) {
+ final var pulsar = pulsar();
+ if (pulsar.getBrokerService().isAuthenticationEnabled()
+ && pulsar.getBrokerService().isAuthorizationEnabled()) {
+ return pulsar.getBrokerService().getAuthorizationService()
+ .allowClusterOperationAsync(cluster, operation,
originalPrincipal(),
+ clientAppId(), clientAuthData())
+ .thenAccept(isAuthorized -> {
+ if (!isAuthorized) {
+ throw new RestException(Status.UNAUTHORIZED,
+ String.format("Unauthorized to
validateClusterOperation for"
+ + " originalPrincipal [%s]
and clientAppId [%s] "
+ + "about operation [%s] on
cluster [%s]",
+ originalPrincipal(),
clientAppId(), operation.toString(), cluster));
+ }
+ });
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+
+ private CompletableFuture<Void> validateClusterPolicyOperationAsync(String
cluster, PolicyName policyName,
+
PolicyOperation operation) {
+ final var pulsar = pulsar();
+ if (pulsar.getBrokerService().isAuthenticationEnabled()
+ && pulsar.getBrokerService().isAuthorizationEnabled()) {
+ return pulsar.getBrokerService().getAuthorizationService()
+ .allowClusterPolicyOperationAsync(cluster, policyName,
operation, originalPrincipal(),
+ clientAppId(), clientAuthData())
+ .thenAccept(isAuthorized -> {
+ if (!isAuthorized) {
+ throw new RestException(Status.UNAUTHORIZED,
+ String.format("Unauthorized to
validateClusterPolicyOperation for"
+ + " originalPrincipal [%s]
and clientAppId [%s] "
+ + "about operation [%s] on
cluster [%s]",
+ originalPrincipal(),
clientAppId(), operation.toString(), cluster));
+ }
+ });
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+
private static final Logger log =
LoggerFactory.getLogger(ClustersBase.class);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ClusterEndpointsAuthorizationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ClusterEndpointsAuthorizationTest.java
new file mode 100644
index 00000000000..ccf5ccb9481
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ClusterEndpointsAuthorizationTest.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import lombok.SneakyThrows;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.ClusterOperation;
+import org.apache.pulsar.common.policies.data.ClusterPolicies;
+import org.apache.pulsar.common.policies.data.FailureDomain;
+import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
+import org.apache.pulsar.security.MockedPulsarStandalone;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import java.util.LinkedHashSet;
+import java.util.UUID;
+
+
+@Test(groups = "broker-admin")
+public class ClusterEndpointsAuthorizationTest extends MockedPulsarStandalone {
+
+ private AuthorizationService orignalAuthorizationService;
+ private AuthorizationService spyAuthorizationService;
+
+ private PulsarAdmin superUserAdmin;
+ private PulsarAdmin nobodyAdmin;
+
+ @SneakyThrows
+ @BeforeClass(alwaysRun = true)
+ public void setup() {
+ configureTokenAuthentication();
+ configureDefaultAuthorization();
+ start();
+ this.superUserAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(SUPER_USER_TOKEN))
+ .build();
+ this.nobodyAdmin = PulsarAdmin.builder()
+ .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+ .authentication(new AuthenticationToken(NOBODY_TOKEN))
+ .build();
+ }
+
+ @BeforeMethod(alwaysRun = true)
+ public void before() throws IllegalAccessException {
+ orignalAuthorizationService =
getPulsarService().getBrokerService().getAuthorizationService();
+ spyAuthorizationService = spy(orignalAuthorizationService);
+ FieldUtils.writeField(getPulsarService().getBrokerService(),
"authorizationService",
+ spyAuthorizationService, true);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void after() throws IllegalAccessException {
+ if (orignalAuthorizationService != null) {
+ FieldUtils.writeField(getPulsarService().getBrokerService(),
"authorizationService", orignalAuthorizationService, true);
+ }
+ }
+
+ @SneakyThrows
+ @AfterClass(alwaysRun = true)
+ public void cleanup() {
+ if (superUserAdmin != null) {
+ superUserAdmin.close();
+ superUserAdmin = null;
+ }
+ spyAuthorizationService = null;
+ orignalAuthorizationService = null;
+ super.close();
+ }
+
+
+ @Test
+ public void testGetCluster() throws PulsarAdminException {
+ final String clusterName =
getPulsarService().getConfiguration().getClusterName();
+ superUserAdmin.clusters().getCluster(clusterName);
+ // test allow cluster operation
+ verify(spyAuthorizationService)
+ .allowClusterOperationAsync(eq(clusterName),
eq(ClusterOperation.GET_CLUSTER), any(), any(), any());
+ // fallback to superuser
+ verify(spyAuthorizationService).isSuperUser(any(), any());
+
+ // ---- test nobody
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> nobodyAdmin.clusters().getCluster(clusterName));
+ }
+
+
+ @Test
+ public void testCreateCluster() throws PulsarAdminException {
+ final String clusterName = UUID.randomUUID().toString();
+ superUserAdmin.clusters().createCluster(clusterName,
ClusterData.builder().build());
+ // test allow cluster operation
+ verify(spyAuthorizationService)
+ .allowClusterOperationAsync(eq(clusterName),
eq(ClusterOperation.CREATE_CLUSTER), any(), any(), any());
+ // fallback to superuser
+ verify(spyAuthorizationService).isSuperUser(any(), any());
+
+ // ---- test nobody
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> nobodyAdmin.clusters().createCluster(clusterName,
ClusterData.builder().build()));
+ }
+
+ @Test
+ public void testUpdateCluster() {
+ final String clusterName = UUID.randomUUID().toString();
+ try {
+ superUserAdmin.clusters().updateCluster(clusterName,
ClusterData.builder().serviceUrl("aaa").build());
+ } catch (Throwable ignore) {
+
+ }
+ // test allow cluster operation
+ verify(spyAuthorizationService)
+ .allowClusterOperationAsync(eq(clusterName),
eq(ClusterOperation.UPDATE_CLUSTER), any(), any(), any());
+ // fallback to superuser
+ verify(spyAuthorizationService).isSuperUser(any(), any());
+ // ---- test nobody
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> nobodyAdmin.clusters().updateCluster(clusterName,
ClusterData.builder().build()));
+ }
+
+
+ @Test
+ public void testGetClusterMigration() {
+ final String clusterName = UUID.randomUUID().toString();
+ try {
+ superUserAdmin.clusters().getClusterMigration(clusterName);
+ } catch (Throwable ignore) {
+
+ }
+ // test allow cluster operation
+ verify(spyAuthorizationService)
+ .allowClusterPolicyOperationAsync(eq(clusterName),
eq(PolicyName.CLUSTER_MIGRATION),
+ eq(PolicyOperation.READ), any(), any(), any());
+ // fallback to superuser
+ verify(spyAuthorizationService).isSuperUser(any(), any());
+ // ---- test nobody
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> nobodyAdmin.clusters().getClusterMigration(clusterName));
+ }
+
+
+ @Test
+ public void testUpdateClusterMigration() throws PulsarAdminException {
+ final String clusterName = UUID.randomUUID().toString();
+ superUserAdmin.clusters().createCluster(clusterName,
ClusterData.builder().build());
+ Mockito.clearInvocations(spyAuthorizationService);
+
+ superUserAdmin.clusters().updateClusterMigration(clusterName, false,
new ClusterPolicies.ClusterUrl());
+ // test allow cluster operation
+ verify(spyAuthorizationService)
+ .allowClusterPolicyOperationAsync(eq(clusterName),
eq(PolicyName.CLUSTER_MIGRATION),
+ eq(PolicyOperation.WRITE), any(), any(), any());
+ // fallback to superuser
+ verify(spyAuthorizationService).isSuperUser(any(), any());
+ // ---- test nobody
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> nobodyAdmin.clusters()
+ .updateClusterMigration(clusterName, false, new
ClusterPolicies.ClusterUrl()));
+ }
+
+ @Test
+ public void testSetPeerClusterNames() throws PulsarAdminException {
+ final LinkedHashSet<String> linkedHashSet = new LinkedHashSet<>();
+ linkedHashSet.add("a");
+ final String clusterName =
getPulsarService().getConfiguration().getClusterName();
+ try {
+ superUserAdmin.clusters().updatePeerClusterNames(clusterName,
linkedHashSet);
+ } catch (Throwable ignore) {
+
+ }
+ // test allow cluster operation
+ verify(spyAuthorizationService)
+ .allowClusterOperationAsync(eq(clusterName),
+ eq(ClusterOperation.UPDATE_PEER_CLUSTER), any(),
any(), any());
+ // fallback to superuser
+ verify(spyAuthorizationService).isSuperUser(any(), any());
+ // ---- test nobody
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
nobodyAdmin.clusters().updatePeerClusterNames(clusterName, linkedHashSet));
+ }
+
+ @Test
+ public void testGetPeerCluster() throws PulsarAdminException {
+ final String clusterName =
getPulsarService().getConfiguration().getClusterName();
+ superUserAdmin.clusters().getPeerClusterNames(clusterName);
+
+ // the superUserAdmin call must reach the cluster operation check
+ verify(spyAuthorizationService)
+ .allowClusterOperationAsync(eq(clusterName),
+ eq(ClusterOperation.GET_PEER_CLUSTER), any(), any(),
any());
+ // isSuperUser must be consulted as well (fallback to superuser)
+ verify(spyAuthorizationService).isSuperUser(any(), any());
+ // the call through nobodyAdmin must fail with NotAuthorizedException
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> nobodyAdmin.clusters().getPeerClusterNames(clusterName));
+ }
+
+ @Test
+ public void testDeleteCluster() throws PulsarAdminException {
+ final String clusterName = UUID.randomUUID().toString();
+ superUserAdmin.clusters().createCluster(clusterName,
ClusterData.builder().build());
+ Mockito.clearInvocations(spyAuthorizationService);
+
+ superUserAdmin.clusters().deleteCluster(clusterName);
+
+ // the superUserAdmin call must reach the cluster operation check
+ verify(spyAuthorizationService)
+ .allowClusterOperationAsync(eq(clusterName),
+ eq(ClusterOperation.DELETE_CLUSTER), any(), any(),
any());
+ // isSuperUser must be consulted as well (fallback to superuser)
+ verify(spyAuthorizationService).isSuperUser(any(), any());
+ // the call through nobodyAdmin must fail with NotAuthorizedException
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> nobodyAdmin.clusters().deleteCluster(clusterName));
+ }
+
+
+ @Test
+ public void testGetNamespaceIsolationPolicies() throws
PulsarAdminException {
+ final String clusterName =
getPulsarService().getConfiguration().getClusterName();
+ superUserAdmin.clusters().getNamespaceIsolationPolicies(clusterName);
+
+ // the superUserAdmin call must reach the cluster policy operation check
+ verify(spyAuthorizationService)
+ .allowClusterPolicyOperationAsync(eq(clusterName),
+ eq(PolicyName.NAMESPACE_ISOLATION),
eq(PolicyOperation.READ), any(), any(), any());
+ // isSuperUser must be consulted as well (fallback to superuser)
+ verify(spyAuthorizationService).isSuperUser(any(), any());
+ // the call through nobodyAdmin must fail with NotAuthorizedException
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
nobodyAdmin.clusters().getNamespaceIsolationPolicies(clusterName));
+ }
+
+
+ @Test
+ public void testGetNamespaceIsolationPolicy() throws PulsarAdminException {
+ final String clusterName =
getPulsarService().getConfiguration().getClusterName();
+ superUserAdmin.clusters().getNamespaceIsolationPolicy(clusterName, "");
+
+ // the superUserAdmin call must reach the cluster policy operation check
+ verify(spyAuthorizationService)
+ .allowClusterPolicyOperationAsync(eq(clusterName),
+ eq(PolicyName.NAMESPACE_ISOLATION),
eq(PolicyOperation.READ), any(), any(), any());
+ // isSuperUser must be consulted as well (fallback to superuser)
+ verify(spyAuthorizationService).isSuperUser(any(), any());
+ // the call through nobodyAdmin must fail with NotAuthorizedException
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
nobodyAdmin.clusters().getNamespaceIsolationPolicy(clusterName, ""));
+ }
+
+
+ @Test
+ public void testGetBrokersWithNamespaceIsolationPolicy() throws
PulsarAdminException {
+ final String clusterName =
getPulsarService().getConfiguration().getClusterName();
+
superUserAdmin.clusters().getBrokersWithNamespaceIsolationPolicy(clusterName);
+
+ // the superUserAdmin call must reach the cluster policy operation check
+ verify(spyAuthorizationService)
+ .allowClusterPolicyOperationAsync(eq(clusterName),
+ eq(PolicyName.NAMESPACE_ISOLATION),
eq(PolicyOperation.READ), any(), any(), any());
+ // isSuperUser must be consulted as well (fallback to superuser)
+ verify(spyAuthorizationService).isSuperUser(any(), any());
+ // the call through nobodyAdmin must fail with NotAuthorizedException
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
nobodyAdmin.clusters().getBrokersWithNamespaceIsolationPolicy(clusterName));
+ }
+
+
+ @Test
+ public void testGetBrokerWithNamespaceIsolationPolicy() throws
PulsarAdminException {
+ final String clusterName =
getPulsarService().getConfiguration().getClusterName();
+
superUserAdmin.clusters().getBrokerWithNamespaceIsolationPolicy(clusterName,
getPulsarService().getBrokerId());
+
+ // the superUserAdmin call must reach the cluster policy operation check
+ verify(spyAuthorizationService)
+ .allowClusterPolicyOperationAsync(eq(clusterName),
+ eq(PolicyName.NAMESPACE_ISOLATION),
eq(PolicyOperation.READ), any(), any(), any());
+ // isSuperUser must be consulted as well (fallback to superuser)
+ verify(spyAuthorizationService).isSuperUser(any(), any());
+ // nobodyAdmin (passing "" as broker) must fail with NotAuthorizedException
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
nobodyAdmin.clusters().getBrokerWithNamespaceIsolationPolicy(clusterName, ""));
+ }
+
+
+ @Test
+ public void testSetNamespaceIsolationPolicy() {
+ final String clusterName =
getPulsarService().getConfiguration().getClusterName();
+
+ try {
+
superUserAdmin.clusters().updateNamespaceIsolationPolicy(clusterName, "test",
+ NamespaceIsolationData.builder().build());
+ } catch (Throwable ignore) {
+ // any failure is ignored; only the authorization calls are verified below
+ }
+
+ // the superUserAdmin call must reach the cluster policy operation check
+ verify(spyAuthorizationService)
+ .allowClusterPolicyOperationAsync(eq(clusterName),
+ eq(PolicyName.NAMESPACE_ISOLATION),
eq(PolicyOperation.WRITE), any(), any(), any());
+ // isSuperUser must be consulted as well (fallback to superuser)
+ verify(spyAuthorizationService).isSuperUser(any(), any());
+ // the call through nobodyAdmin must fail with NotAuthorizedException
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
nobodyAdmin.clusters().updateNamespaceIsolationPolicy(clusterName, "test",
+ NamespaceIsolationData.builder().build()));
+ }
+
+ @Test
+ public void testDeleteNamespaceIsolationPolicy() throws
PulsarAdminException {
+ final String clusterName =
getPulsarService().getConfiguration().getClusterName();
+ superUserAdmin.clusters().deleteNamespaceIsolationPolicy(clusterName,
"test");
+
+ // the superUserAdmin call must reach the cluster policy operation check
+ verify(spyAuthorizationService)
+ .allowClusterPolicyOperationAsync(eq(clusterName),
+ eq(PolicyName.NAMESPACE_ISOLATION),
eq(PolicyOperation.WRITE), any(), any(), any());
+ // isSuperUser must be consulted as well (fallback to superuser)
+ verify(spyAuthorizationService).isSuperUser(any(), any());
+ // the call through nobodyAdmin must fail with NotAuthorizedException
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () ->
nobodyAdmin.clusters().deleteNamespaceIsolationPolicy(clusterName, "test"));
+ }
+
+
+ @Test
+ public void testSetFailureDomain() throws PulsarAdminException {
+ final String clusterName =
getPulsarService().getConfiguration().getClusterName();
+ superUserAdmin.clusters().updateFailureDomain(clusterName, "test",
FailureDomain.builder().build());
+
+ // the superUserAdmin call must reach the cluster operation check
+ verify(spyAuthorizationService)
+ .allowClusterOperationAsync(eq(clusterName),
eq(ClusterOperation.UPDATE_FAILURE_DOMAIN), any(), any(), any());
+ // isSuperUser must be consulted as well (fallback to superuser)
+ verify(spyAuthorizationService).isSuperUser(any(), any());
+ // the call through nobodyAdmin must fail with NotAuthorizedException
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> nobodyAdmin.clusters().updateFailureDomain(clusterName,
+ "test", FailureDomain.builder().build()));
+ }
+
+ @Test
+ public void testGetFailureDomains() throws PulsarAdminException {
+ final String clusterName =
getPulsarService().getConfiguration().getClusterName();
+ superUserAdmin.clusters().getFailureDomains(clusterName);
+
+ // the superUserAdmin call must reach the cluster operation check
+ verify(spyAuthorizationService)
+ .allowClusterOperationAsync(eq(clusterName),
eq(ClusterOperation.GET_FAILURE_DOMAIN), any(), any(), any());
+ // isSuperUser must be consulted as well (fallback to superuser)
+ verify(spyAuthorizationService).isSuperUser(any(), any());
+ // the call through nobodyAdmin must fail with NotAuthorizedException
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> nobodyAdmin.clusters().getFailureDomains(clusterName));
+ }
+
+
+ @Test
+ public void testGetDomain() throws PulsarAdminException {
+ final String clusterName =
getPulsarService().getConfiguration().getClusterName();
+ try {
+ superUserAdmin.clusters().getFailureDomain(clusterName, "test");
+ } catch (Throwable ignore) {
+ // any failure is ignored; only the authorization calls are verified below
+ }
+
+ // the superUserAdmin call must reach the cluster operation check
+ verify(spyAuthorizationService)
+ .allowClusterOperationAsync(eq(clusterName),
eq(ClusterOperation.GET_FAILURE_DOMAIN), any(), any(), any());
+ // isSuperUser must be consulted as well (fallback to superuser)
+ verify(spyAuthorizationService).isSuperUser(any(), any());
+ // the call through nobodyAdmin must fail with NotAuthorizedException
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> nobodyAdmin.clusters().getFailureDomain(clusterName,
"test"));
+ }
+
+ @Test
+ public void testDeleteFailureDomain() throws PulsarAdminException {
+ final String clusterName =
getPulsarService().getConfiguration().getClusterName();
+ try {
+ superUserAdmin.clusters().deleteFailureDomain(clusterName, "test");
+ } catch (Throwable ignore) {
+ // any failure is ignored; only the authorization calls are verified below
+ }
+
+ // the superUserAdmin call must reach the cluster operation check
+ verify(spyAuthorizationService)
+ .allowClusterOperationAsync(eq(clusterName),
eq(ClusterOperation.DELETE_FAILURE_DOMAIN), any(), any(), any());
+ // isSuperUser must be consulted as well (fallback to superuser)
+ verify(spyAuthorizationService).isSuperUser(any(), any());
+ // the call through nobodyAdmin must fail with NotAuthorizedException
+ Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+ () -> nobodyAdmin.clusters().deleteFailureDomain(clusterName,
"test"));
+ }
+
+
+}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterOperation.java
similarity index 54%
copy from
pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
copy to
pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterOperation.java
index 86ab545215e..bbdc64f729e 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterOperation.java
@@ -18,40 +18,17 @@
*/
package org.apache.pulsar.common.policies.data;
-/**
- * PolicyName authorization operations.
- */
-public enum PolicyName {
- ALL,
- ANTI_AFFINITY,
- AUTO_SUBSCRIPTION_CREATION,
- AUTO_TOPIC_CREATION,
- BACKLOG,
- COMPACTION,
- DELAYED_DELIVERY,
- INACTIVE_TOPIC,
- DEDUPLICATION,
- MAX_CONSUMERS,
- MAX_PRODUCERS,
- DEDUPLICATION_SNAPSHOT,
- MAX_UNACKED,
- MAX_SUBSCRIPTIONS,
- OFFLOAD,
- PARTITION,
- PERSISTENCE,
- RATE,
- RETENTION,
- REPLICATION,
- REPLICATION_RATE,
- SCHEMA_COMPATIBILITY_STRATEGY,
- SUBSCRIPTION_AUTH_MODE,
- SUBSCRIPTION_EXPIRATION_TIME,
- ENCRYPTION,
- TTL,
- MAX_TOPICS,
- RESOURCEGROUP,
- ENTRY_FILTERS,
- SHADOW_TOPIC,
- DISPATCHER_PAUSE_ON_ACK_STATE_PERSISTENT,
- ALLOW_CLUSTERS
+public enum ClusterOperation {
+ LIST_CLUSTERS,
+ GET_CLUSTER,
+ CREATE_CLUSTER,
+ UPDATE_CLUSTER,
+ DELETE_CLUSTER,
+
+ // peer-cluster and failure-domain operations
+ GET_PEER_CLUSTER,
+ UPDATE_PEER_CLUSTER,
+ GET_FAILURE_DOMAIN,
+ UPDATE_FAILURE_DOMAIN,
+ DELETE_FAILURE_DOMAIN
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
index 86ab545215e..d77f92eb032 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
@@ -53,5 +53,9 @@ public enum PolicyName {
ENTRY_FILTERS,
SHADOW_TOPIC,
DISPATCHER_PAUSE_ON_ACK_STATE_PERSISTENT,
- ALLOW_CLUSTERS
+ ALLOW_CLUSTERS,
+
+ // cluster policies
+ CLUSTER_MIGRATION,
+ NAMESPACE_ISOLATION,
}