This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d07752276665508a7571ea0afaaa0e5fbedb3281
Author: Qiang Zhao <[email protected]>
AuthorDate: Wed Dec 4 10:16:21 2024 +0800

    [fix][broker] support missing cluster level fine-granted permissions 
(#23675)
    
    (cherry picked from commit 1c1a5cc655511c22b3399005d8ff9102b3553627)
---
 .../authorization/AuthorizationProvider.java       |  20 +
 .../broker/authorization/AuthorizationService.java |  45 +++
 .../authorization/PulsarAuthorizationProvider.java |  15 +
 .../pulsar/broker/admin/impl/ClustersBase.java     | 160 +++++++-
 .../admin/ClusterEndpointsAuthorizationTest.java   | 428 +++++++++++++++++++++
 .../{PolicyName.java => ClusterOperation.java}     |  49 +--
 .../pulsar/common/policies/data/PolicyName.java    |   6 +-
 7 files changed, 668 insertions(+), 55 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
index 48386265940..cb61292f8e3 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.BrokerOperation;
+import org.apache.pulsar.common.policies.data.ClusterOperation;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.PolicyName;
 import org.apache.pulsar.common.policies.data.PolicyOperation;
@@ -393,4 +394,23 @@ public interface AuthorizationProvider extends Closeable {
         return FutureUtil.failedFuture(
                 new UnsupportedOperationException("allowBrokerOperationAsync 
is not supported yet."));
     }
+
+
+    default CompletableFuture<Boolean> allowClusterOperationAsync(String 
clusterName,
+                                                                  
ClusterOperation clusterOperation,
+                                                                  String role,
+                                                                  
AuthenticationDataSource authData) {
+        return FutureUtil.failedFuture(
+                new UnsupportedOperationException("allowClusterOperationAsync 
is not supported yet."));
+    }
+
+    default CompletableFuture<Boolean> allowClusterPolicyOperationAsync(String 
clusterName,
+                                                                        String 
role,
+                                                                        
PolicyName policy,
+                                                                        
PolicyOperation operation,
+                                                                        
AuthenticationDataSource authData) {
+        return FutureUtil.failedFuture(
+                new IllegalStateException("ClusterPolicyOperation [" + 
policy.name() + "/" + operation.name() + "] "
+                                          + "is not supported by the 
Authorization provider you are using."));
+    }
 }
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 1348a405b0d..40573d99d60 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -39,6 +39,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.BrokerOperation;
+import org.apache.pulsar.common.policies.data.ClusterOperation;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.PolicyName;
 import org.apache.pulsar.common.policies.data.PolicyOperation;
@@ -567,6 +568,50 @@ public class AuthorizationService {
         }
     }
 
+    /**
+     * Checks whether the caller may perform {@code clusterOperation} on {@code clusterName}.
+     *
+     * <p>The result is {@code false} when {@code isValidOriginalPrincipal} rejects the
+     * role / original-role pair. When {@code role} is a proxy role, the provider is asked about
+     * both the proxy role and the original role and both must be authorized; otherwise only
+     * {@code role} is checked.
+     *
+     * @param clusterName the cluster the operation targets
+     * @param clusterOperation the cluster operation being attempted
+     * @param originalRole the original principal, consulted when {@code role} is a proxy role
+     * @param role the role of the caller
+     * @param authData authentication data handed through to the provider
+     * @return a future holding {@code true} when the operation is allowed
+     */
+    public CompletableFuture<Boolean> allowClusterOperationAsync(String clusterName,
+                                                                ClusterOperation clusterOperation,
+                                                                String originalRole,
+                                                                String role,
+                                                                AuthenticationDataSource authData) {
+        if (!isValidOriginalPrincipal(role, originalRole, authData)) {
+            return CompletableFuture.completedFuture(false);
+        }
+        if (!isProxyRole(role)) {
+            // Direct (non-proxied) caller: a single provider check decides.
+            return provider.allowClusterOperationAsync(clusterName, clusterOperation, role, authData);
+        }
+        // Proxied caller: ask the provider about the proxy role first, then the original role.
+        final CompletableFuture<Boolean> proxyRoleCheck =
+                provider.allowClusterOperationAsync(clusterName, clusterOperation, role, authData);
+        final CompletableFuture<Boolean> originalRoleCheck =
+                provider.allowClusterOperationAsync(clusterName, clusterOperation, originalRole, authData);
+        return proxyRoleCheck.thenCombine(originalRoleCheck,
+                (proxyAllowed, originalAllowed) -> proxyAllowed && originalAllowed);
+    }
+
+    /**
+     * Checks whether the caller may perform {@code operation} on the cluster policy {@code policy}.
+     *
+     * <p>The result is {@code false} when {@code isValidOriginalPrincipal} rejects the
+     * role / original-role pair. When {@code role} is a proxy role, the provider is asked about
+     * both the proxy role and the original role and both must be authorized; otherwise only
+     * {@code role} is checked.
+     *
+     * @param clusterName the cluster the policy belongs to
+     * @param policy the policy being accessed
+     * @param operation the policy operation being attempted
+     * @param originalRole the original principal, consulted when {@code role} is a proxy role
+     * @param role the role of the caller
+     * @param authData authentication data handed through to the provider
+     * @return a future holding {@code true} when the operation is allowed
+     */
+    public CompletableFuture<Boolean> allowClusterPolicyOperationAsync(String clusterName,
+                                                                       PolicyName policy,
+                                                                       PolicyOperation operation,
+                                                                       String originalRole,
+                                                                       String role,
+                                                                       AuthenticationDataSource authData) {
+        if (!isValidOriginalPrincipal(role, originalRole, authData)) {
+            return CompletableFuture.completedFuture(false);
+        }
+        if (!isProxyRole(role)) {
+            // Direct (non-proxied) caller: a single provider check decides.
+            return provider.allowClusterPolicyOperationAsync(clusterName, role, policy, operation, authData);
+        }
+        // Proxied caller: ask the provider about the proxy role first, then the original role.
+        final CompletableFuture<Boolean> proxyRoleCheck =
+                provider.allowClusterPolicyOperationAsync(clusterName, role, policy, operation, authData);
+        final CompletableFuture<Boolean> originalRoleCheck =
+                provider.allowClusterPolicyOperationAsync(clusterName, originalRole, policy, operation, authData);
+        return proxyRoleCheck.thenCombine(originalRoleCheck,
+                (proxyAllowed, originalAllowed) -> proxyAllowed && originalAllowed);
+    }
+
+
     /**
      * @deprecated - will be removed after 2.12. Use async variant.
      */
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index 50783c4d133..976e7b7ee12 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.AuthPolicies;
 import org.apache.pulsar.common.policies.data.BrokerOperation;
+import org.apache.pulsar.common.policies.data.ClusterOperation;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.PolicyName;
 import org.apache.pulsar.common.policies.data.PolicyOperation;
@@ -860,4 +861,18 @@ public class PulsarAuthorizationProvider implements 
AuthorizationProvider {
                     });
         });
     }
+
+    /**
+     * Authorizes a cluster operation by delegating to {@code isSuperUser}; the cluster name and
+     * the requested operation do not take part in the decision in this implementation.
+     *
+     * @param clusterName the cluster the operation targets (not consulted)
+     * @param clusterOperation the operation being attempted (not consulted)
+     * @param role the role to check
+     * @param authData authentication data handed to {@code isSuperUser}
+     * @return the future returned by {@code isSuperUser(role, authData, conf)}
+     */
+    @Override
+    public CompletableFuture<Boolean> allowClusterOperationAsync(String clusterName,
+                                                                 ClusterOperation clusterOperation,
+                                                                 String role,
+                                                                 AuthenticationDataSource authData) {
+        final CompletableFuture<Boolean> superUserCheck = isSuperUser(role, authData, conf);
+        return superUserCheck;
+    }
+
+    /**
+     * Authorizes a cluster policy operation by delegating to {@code isSuperUser}; the cluster
+     * name, policy and operation do not take part in the decision in this implementation.
+     *
+     * @param clusterName the cluster the policy belongs to (not consulted)
+     * @param role the role to check
+     * @param policy the policy being accessed (not consulted)
+     * @param operation the policy operation being attempted (not consulted)
+     * @param authData authentication data handed to {@code isSuperUser}
+     * @return the future returned by {@code isSuperUser(role, authData, conf)}
+     */
+    @Override
+    public CompletableFuture<Boolean> allowClusterPolicyOperationAsync(String clusterName,
+                                                                       String role,
+                                                                       PolicyName policy,
+                                                                       PolicyOperation operation,
+                                                                       AuthenticationDataSource authData) {
+        final CompletableFuture<Boolean> superUserCheck = isSuperUser(role, authData, conf);
+        return superUserCheck;
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index 132c99ce16b..43af241e2ea 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -63,11 +63,14 @@ import 
org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
 import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.ClusterDataImpl;
+import org.apache.pulsar.common.policies.data.ClusterOperation;
 import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl;
 import org.apache.pulsar.common.policies.data.ClusterPoliciesImpl;
 import org.apache.pulsar.common.policies.data.FailureDomainImpl;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
 import 
org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
 import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
 import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicyImpl;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -117,7 +120,7 @@ public class ClustersBase extends AdminResource {
     public void getCluster(@Suspended AsyncResponse asyncResponse,
                            @ApiParam(value = "The cluster name", required = 
true)
                            @PathParam("cluster") String cluster) {
-        validateSuperUserAccessAsync()
+        validateBothSuperuserAndClusterOperation(cluster, 
ClusterOperation.GET_CLUSTER)
                 .thenCompose(__ -> clusterResources().getClusterAsync(cluster))
                 .thenAccept(clusterData -> {
                     asyncResponse.resume(clusterData
@@ -162,7 +165,7 @@ public class ClustersBase extends AdminResource {
                 )
             )
         ) ClusterDataImpl clusterData) {
-        validateSuperUserAccessAsync()
+        validateBothSuperuserAndClusterOperation(cluster, 
ClusterOperation.CREATE_CLUSTER)
                 .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
                 .thenCompose(__ -> {
                     NamedEntity.checkName(cluster);
@@ -227,7 +230,7 @@ public class ClustersBase extends AdminResource {
                 )
             )
         ) ClusterDataImpl clusterData) {
-        validateSuperUserAccessAsync()
+        validateBothSuperuserAndClusterOperation(cluster, 
ClusterOperation.UPDATE_CLUSTER)
                 .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
                 .thenCompose(__ -> {
                     try {
@@ -271,7 +274,7 @@ public class ClustersBase extends AdminResource {
             required = true
         )
         @PathParam("cluster") String cluster) {
-        validateSuperUserAccessAsync()
+        validateBothSuperuserAndClusterPolicyOperation(cluster, 
PolicyName.CLUSTER_MIGRATION, PolicyOperation.READ)
                 .thenCompose(__ -> 
clusterResources().getClusterPoliciesResources().getClusterPoliciesAsync(cluster))
                 .thenAccept(policies -> {
                     asyncResponse.resume(
@@ -326,7 +329,7 @@ public class ClustersBase extends AdminResource {
             asyncResponse.resume(new RestException(Status.BAD_REQUEST, 
"Cluster url must not be empty"));
             return;
         }
-        validateSuperUserAccessAsync()
+        validateBothSuperuserAndClusterPolicyOperation(cluster, 
PolicyName.CLUSTER_MIGRATION, PolicyOperation.WRITE)
                 .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
                 .thenCompose(__ -> 
clusterResources().getClusterPoliciesResources().setPoliciesWithCreateAsync(cluster,
                         old -> {
@@ -376,7 +379,7 @@ public class ClustersBase extends AdminResource {
                                                    "cluster-b"
                                                 ]""")))
                                     LinkedHashSet<String> peerClusterNames) {
-        validateSuperUserAccessAsync()
+        validateBothSuperuserAndClusterOperation(cluster, 
ClusterOperation.UPDATE_PEER_CLUSTER)
                 .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
                 .thenCompose(__ -> innerSetPeerClusterNamesAsync(cluster, 
peerClusterNames))
                 .thenAccept(__ -> {
@@ -437,7 +440,7 @@ public class ClustersBase extends AdminResource {
     public void getPeerCluster(@Suspended AsyncResponse asyncResponse,
                                @ApiParam(value = "The cluster name", required 
= true)
                                @PathParam("cluster") String cluster) {
-        validateSuperUserAccessAsync()
+        validateBothSuperuserAndClusterOperation(cluster, 
ClusterOperation.GET_PEER_CLUSTER)
                 .thenCompose(__ -> clusterResources().getClusterAsync(cluster))
                 .thenAccept(clusterOpt -> {
                     ClusterData clusterData =
@@ -466,7 +469,7 @@ public class ClustersBase extends AdminResource {
     public void deleteCluster(@Suspended AsyncResponse asyncResponse,
                               @ApiParam(value = "The cluster name", required = 
true)
                               @PathParam("cluster") String cluster) {
-        validateSuperUserAccessAsync()
+        validateBothSuperuserAndClusterOperation(cluster, 
ClusterOperation.DELETE_CLUSTER)
                 .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
                 .thenCompose(__ -> internalDeleteClusterAsync(cluster))
                 .thenAccept(__ -> {
@@ -525,7 +528,7 @@ public class ClustersBase extends AdminResource {
         @Suspended AsyncResponse asyncResponse,
         @ApiParam(value = "The cluster name", required = true) 
@PathParam("cluster") String cluster
     ) {
-        validateSuperUserAccessAsync()
+        validateBothSuperuserAndClusterPolicyOperation(cluster, 
PolicyName.NAMESPACE_ISOLATION, PolicyOperation.READ)
                 .thenCompose(__ -> validateClusterExistAsync(cluster, 
Status.NOT_FOUND))
                 .thenCompose(__ -> 
internalGetNamespaceIsolationPolicies(cluster))
                 .thenAccept(asyncResponse::resume)
@@ -583,7 +586,7 @@ public class ClustersBase extends AdminResource {
         @ApiParam(value = "The name of the namespace isolation policy", 
required = true)
         @PathParam("policyName") String policyName
     ) {
-        validateSuperUserAccessAsync()
+        validateBothSuperuserAndClusterPolicyOperation(cluster, 
PolicyName.NAMESPACE_ISOLATION, PolicyOperation.READ)
                 .thenCompose(__ -> validateClusterExistAsync(cluster, 
Status.PRECONDITION_FAILED))
                 .thenCompose(__ -> 
internalGetNamespaceIsolationPolicies(cluster))
                 .thenAccept(policies -> {
@@ -619,7 +622,7 @@ public class ClustersBase extends AdminResource {
             @Suspended AsyncResponse asyncResponse,
             @ApiParam(value = "The cluster name", required = true)
             @PathParam("cluster") String cluster) {
-        validateSuperUserAccessAsync()
+        validateBothSuperuserAndClusterPolicyOperation(cluster, 
PolicyName.NAMESPACE_ISOLATION, PolicyOperation.READ)
                 .thenCompose(__ -> validateClusterExistAsync(cluster, 
Status.PRECONDITION_FAILED))
                 .thenCompose(__ -> 
pulsar().getLoadManager().get().getAvailableBrokersAsync())
                 .thenCompose(availableBrokers -> 
internalGetNamespaceIsolationPolicies(cluster)
@@ -676,7 +679,7 @@ public class ClustersBase extends AdminResource {
         @ApiParam(value = "The broker name 
(<broker-hostname>:<web-service-port>)", required = true,
             example = "broker1:8080")
         @PathParam("broker") String broker) {
-        validateSuperUserAccessAsync()
+        validateBothSuperuserAndClusterPolicyOperation(cluster, 
PolicyName.NAMESPACE_ISOLATION, PolicyOperation.READ)
                 .thenCompose(__ -> validateClusterExistAsync(cluster, 
PRECONDITION_FAILED))
                 .thenCompose(__ -> 
internalGetNamespaceIsolationPolicies(cluster))
                 .thenApply(policies -> 
internalGetBrokerNsIsolationData(broker, policies))
@@ -711,7 +714,7 @@ public class ClustersBase extends AdminResource {
         @ApiParam(value = "The namespace isolation policy data", required = 
true)
         NamespaceIsolationDataImpl policyData
     ) {
-        validateSuperUserAccessAsync()
+        validateBothSuperuserAndClusterPolicyOperation(cluster, 
PolicyName.NAMESPACE_ISOLATION, PolicyOperation.WRITE)
                 .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
                 .thenCompose(__ -> validateClusterExistAsync(cluster, 
PRECONDITION_FAILED))
                 .thenCompose(__ -> {
@@ -875,7 +878,7 @@ public class ClustersBase extends AdminResource {
         @ApiParam(value = "The namespace isolation policy name", required = 
true)
         @PathParam("policyName") String policyName
     ) {
-        validateSuperUserAccessAsync()
+        validateBothSuperuserAndClusterPolicyOperation(cluster, 
PolicyName.NAMESPACE_ISOLATION, PolicyOperation.WRITE)
                 .thenCompose(__ -> validateClusterExistAsync(cluster, 
PRECONDITION_FAILED))
                 .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
                 .thenCompose(__ -> 
namespaceIsolationPolicies().getIsolationDataPoliciesAsync(cluster))
@@ -925,7 +928,7 @@ public class ClustersBase extends AdminResource {
         @PathParam("domainName") String domainName,
         @ApiParam(value = "The configuration data of a failure domain", 
required = true) FailureDomainImpl domain
     ) {
-        validateSuperUserAccessAsync()
+        validateBothSuperuserAndClusterOperation(cluster, 
ClusterOperation.UPDATE_FAILURE_DOMAIN)
                 .thenCompose(__ -> validateClusterExistAsync(cluster, 
PRECONDITION_FAILED))
                 .thenCompose(__ -> validateBrokerExistsInOtherDomain(cluster, 
domainName, domain))
                 .thenCompose(__ -> 
clusterResources().getFailureDomainResources()
@@ -968,7 +971,7 @@ public class ClustersBase extends AdminResource {
         @ApiParam(value = "The cluster name", required = true)
         @PathParam("cluster") String cluster
     ) {
-        validateSuperUserAccessAsync()
+        validateBothSuperuserAndClusterOperation(cluster, 
ClusterOperation.GET_FAILURE_DOMAIN)
                 .thenCompose(__ -> 
clusterResources().getFailureDomainResources()
                         .listFailureDomainsAsync(cluster)
                         .thenCompose(domainNames -> {
@@ -1024,7 +1027,7 @@ public class ClustersBase extends AdminResource {
         @ApiParam(value = "The failure domain name", required = true)
         @PathParam("domainName") String domainName
     ) {
-        validateSuperUserAccessAsync()
+        validateBothSuperuserAndClusterOperation(cluster, 
ClusterOperation.GET_FAILURE_DOMAIN)
                 .thenCompose(__ -> validateClusterExistAsync(cluster, 
PRECONDITION_FAILED))
                 .thenCompose(__ -> 
clusterResources().getFailureDomainResources()
                         .getFailureDomainAsync(cluster, domainName))
@@ -1059,7 +1062,7 @@ public class ClustersBase extends AdminResource {
         @ApiParam(value = "The failure domain name", required = true)
         @PathParam("domainName") String domainName
     ) {
-        validateSuperUserAccessAsync()
+        validateBothSuperuserAndClusterOperation(cluster, 
ClusterOperation.DELETE_FAILURE_DOMAIN)
                 .thenCompose(__ -> validateClusterExistAsync(cluster, 
PRECONDITION_FAILED))
                 .thenCompose(__ -> clusterResources()
                         
.getFailureDomainResources().deleteFailureDomainAsync(cluster, domainName))
@@ -1125,5 +1128,126 @@ public class ClustersBase extends AdminResource {
                 });
     }
 
+
+
+    private CompletableFuture<Void> 
validateBothSuperuserAndClusterOperation(String clusterName,
+                                                                             
ClusterOperation operation) {
+        final var superUserAccessValidation = validateSuperUserAccessAsync();
+        final var clusterOperationValidation = 
validateClusterOperationAsync(clusterName, operation);
+        return FutureUtil.waitForAll(List.of(superUserAccessValidation, 
clusterOperationValidation))
+                .handle((result, err) -> {
+                    if (!superUserAccessValidation.isCompletedExceptionally()
+                        || 
!clusterOperationValidation.isCompletedExceptionally()) {
+                        return null;
+                    }
+                    if (log.isDebugEnabled()) {
+                        Throwable superUserValidationException = null;
+                        try {
+                            superUserAccessValidation.join();
+                        } catch (Throwable ex) {
+                            superUserValidationException = 
FutureUtil.unwrapCompletionException(ex);
+                        }
+                        Throwable clusterOperationValidationException = null;
+                        try {
+                            clusterOperationValidation.join();
+                        } catch (Throwable ex) {
+                            clusterOperationValidationException = 
FutureUtil.unwrapCompletionException(ex);
+                        }
+                        log.debug("validateBothSuperuserAndClusterOperation 
failed."
+                                  + " originalPrincipal={} clientAppId={} 
operation={} cluster={} "
+                                  + "superuserValidationError={} 
clusterOperationValidationError={}",
+                                originalPrincipal(), clientAppId(), 
operation.toString(), clusterName,
+                                superUserValidationException, 
clusterOperationValidationException);
+                    }
+                    throw new RestException(Status.UNAUTHORIZED,
+                            String.format("Unauthorized to 
validateBothSuperuserAndClusterOperation for"
+                                          + " originalPrincipal [%s] and 
clientAppId [%s] "
+                                          + "about operation [%s] on cluster 
[%s]",
+                                    originalPrincipal(), clientAppId(), 
operation.toString(), clusterName));
+                });
+    }
+
+
+    /**
+     * Authorizes a cluster policy operation, accepting the request as soon as either the superuser
+     * validation or the cluster-policy validation did not fail.
+     *
+     * <p>Both validations are started up front. Once {@code FutureUtil.waitForAll} completes, the
+     * request is rejected with {@code Status.UNAUTHORIZED} only when both validation futures
+     * completed exceptionally. The policy name is included in the debug log and in the rejection
+     * message so that failures on different policies with the same operation can be told apart.
+     *
+     * @param clusterName the cluster the policy belongs to
+     * @param name the policy being accessed
+     * @param operation the policy operation being attempted
+     * @return a future that completes normally when access is granted, or exceptionally with a
+     *         {@code RestException} when both validations failed
+     */
+    private CompletableFuture<Void> validateBothSuperuserAndClusterPolicyOperation(String clusterName,
+                                                                                   PolicyName name,
+                                                                                   PolicyOperation operation) {
+        final var superUserAccessValidation = validateSuperUserAccessAsync();
+        final var clusterPolicyValidation = validateClusterPolicyOperationAsync(clusterName, name, operation);
+        return FutureUtil.waitForAll(List.of(superUserAccessValidation, clusterPolicyValidation))
+                .handle((result, err) -> {
+                    // One passing validation is sufficient to grant access.
+                    if (!superUserAccessValidation.isCompletedExceptionally()
+                        || !clusterPolicyValidation.isCompletedExceptionally()) {
+                        return null;
+                    }
+                    if (log.isDebugEnabled()) {
+                        // join() is used only to surface the failure cause of each validation.
+                        Throwable superUserValidationException = null;
+                        try {
+                            superUserAccessValidation.join();
+                        } catch (Throwable ex) {
+                            superUserValidationException = FutureUtil.unwrapCompletionException(ex);
+                        }
+                        Throwable clusterPolicyValidationException = null;
+                        try {
+                            clusterPolicyValidation.join();
+                        } catch (Throwable ex) {
+                            clusterPolicyValidationException = FutureUtil.unwrapCompletionException(ex);
+                        }
+                        log.debug("validateBothSuperuserAndClusterPolicyOperation failed."
+                                  + " originalPrincipal={} clientAppId={} policy={} operation={} cluster={} "
+                                  + "superuserValidationError={} clusterOperationValidationError={}",
+                                originalPrincipal(), clientAppId(), name, operation, clusterName,
+                                superUserValidationException, clusterPolicyValidationException);
+                    }
+                    throw new RestException(Status.UNAUTHORIZED,
+                            String.format("Unauthorized to validateBothSuperuserAndClusterPolicyOperation for"
+                                          + " originalPrincipal [%s] and clientAppId [%s] "
+                                          + "about policy [%s] operation [%s] on cluster [%s]",
+                                    originalPrincipal(), clientAppId(), name, operation, clusterName));
+                });
+    }
+
+
+
+
+    /**
+     * Asks the authorization service whether the current caller may perform {@code operation} on
+     * {@code cluster}.
+     *
+     * <p>The check is skipped, and an already-completed future returned, unless the broker
+     * service reports both authentication and authorization as enabled.
+     *
+     * @param cluster the cluster the operation targets
+     * @param operation the cluster operation being attempted
+     * @return a future that completes normally when allowed (or when the check is skipped) and
+     *         exceptionally with a {@code Status.UNAUTHORIZED} {@code RestException} when the
+     *         authorization service answers {@code false}
+     */
+    private CompletableFuture<Void> validateClusterOperationAsync(String cluster, ClusterOperation operation) {
+        final var brokerService = pulsar().getBrokerService();
+        if (!brokerService.isAuthenticationEnabled() || !brokerService.isAuthorizationEnabled()) {
+            return CompletableFuture.completedFuture(null);
+        }
+        return brokerService.getAuthorizationService()
+                .allowClusterOperationAsync(cluster, operation, originalPrincipal(),
+                        clientAppId(), clientAuthData())
+                .thenAccept(allowed -> {
+                    if (allowed) {
+                        return;
+                    }
+                    final String rejection = String.format(
+                            "Unauthorized to validateClusterOperation for originalPrincipal [%s] "
+                                    + "and clientAppId [%s] about operation [%s] on cluster [%s]",
+                            originalPrincipal(), clientAppId(), operation.toString(), cluster);
+                    throw new RestException(Status.UNAUTHORIZED, rejection);
+                });
+    }
+
+    /**
+     * Asks the authorization service whether the current caller may perform {@code operation} on
+     * the cluster policy {@code policyName}.
+     *
+     * <p>The check is skipped, and an already-completed future returned, unless the broker
+     * service reports both authentication and authorization as enabled. The rejection message
+     * names the policy as well as the operation, so a denied READ or WRITE can be attributed to
+     * the policy it was attempted on.
+     *
+     * @param cluster the cluster the policy belongs to
+     * @param policyName the policy being accessed
+     * @param operation the policy operation being attempted
+     * @return a future that completes normally when allowed (or when the check is skipped) and
+     *         exceptionally with a {@code Status.UNAUTHORIZED} {@code RestException} when the
+     *         authorization service answers {@code false}
+     */
+    private CompletableFuture<Void> validateClusterPolicyOperationAsync(String cluster, PolicyName policyName,
+                                                                        PolicyOperation operation) {
+        final var pulsar = pulsar();
+        if (pulsar.getBrokerService().isAuthenticationEnabled()
+            && pulsar.getBrokerService().isAuthorizationEnabled()) {
+            return pulsar.getBrokerService().getAuthorizationService()
+                    .allowClusterPolicyOperationAsync(cluster, policyName, operation, originalPrincipal(),
+                            clientAppId(), clientAuthData())
+                    .thenAccept(isAuthorized -> {
+                        if (!isAuthorized) {
+                            throw new RestException(Status.UNAUTHORIZED,
+                                    String.format("Unauthorized to validateClusterPolicyOperation for"
+                                                  + " originalPrincipal [%s] and clientAppId [%s] "
+                                                  + "about policy [%s] operation [%s] on cluster [%s]",
+                                            originalPrincipal(), clientAppId(), policyName, operation, cluster));
+                        }
+                    });
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(ClustersBase.class);
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ClusterEndpointsAuthorizationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ClusterEndpointsAuthorizationTest.java
new file mode 100644
index 00000000000..ccf5ccb9481
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ClusterEndpointsAuthorizationTest.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import lombok.SneakyThrows;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.ClusterOperation;
+import org.apache.pulsar.common.policies.data.ClusterPolicies;
+import org.apache.pulsar.common.policies.data.FailureDomain;
+import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
+import org.apache.pulsar.security.MockedPulsarStandalone;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import java.util.LinkedHashSet;
+import java.util.UUID;
+
+
+@Test(groups = "broker-admin")
+public class ClusterEndpointsAuthorizationTest extends MockedPulsarStandalone {
+
+    private AuthorizationService orignalAuthorizationService;
+    private AuthorizationService spyAuthorizationService;
+
+    private PulsarAdmin superUserAdmin;
+    private PulsarAdmin nobodyAdmin;
+
+    @SneakyThrows
+    @BeforeClass(alwaysRun = true)
+    public void setup() {
+        configureTokenAuthentication();
+        configureDefaultAuthorization();
+        start(); // presumably boots the mocked standalone broker (confirm in MockedPulsarStandalone)
+        this.superUserAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(SUPER_USER_TOKEN)) // admin client using the superuser token
+                .build();
+        this.nobodyAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(getPulsarService().getWebServiceAddress())
+                .authentication(new AuthenticationToken(NOBODY_TOKEN)) // admin client using the nobody token
+                .build();
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void before() throws IllegalAccessException {
+        orignalAuthorizationService = 
getPulsarService().getBrokerService().getAuthorizationService();
+        spyAuthorizationService = spy(orignalAuthorizationService); // wrap the real service so its calls can be verified
+        FieldUtils.writeField(getPulsarService().getBrokerService(), 
"authorizationService",
+                spyAuthorizationService, true); // install the spy in the broker; true forces access to the field
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void after() throws IllegalAccessException {
+        if (orignalAuthorizationService != null) { // nothing to restore if before() never captured the service
+            FieldUtils.writeField(getPulsarService().getBrokerService(), 
"authorizationService", orignalAuthorizationService, true); // put the real service back
+        }
+    }
+
+    /**
+     * Releases what {@link #setup()} created: closes both admin clients, drops the
+     * authorization-service references and then calls {@code super.close()}.
+     */
+    @SneakyThrows
+    @AfterClass(alwaysRun = true)
+    public void cleanup() {
+        if (superUserAdmin != null) {
+            superUserAdmin.close();
+            superUserAdmin = null;
+        }
+        // nobodyAdmin is also built in setup(); it was never closed before, leaking the client
+        if (nobodyAdmin != null) {
+            nobodyAdmin.close();
+            nobodyAdmin = null;
+        }
+        spyAuthorizationService = null;
+        orignalAuthorizationService = null;
+        super.close();
+    }
+
+
+    @Test
+    public void testGetCluster() throws PulsarAdminException {
+        final String clusterName = 
getPulsarService().getConfiguration().getClusterName();
+        superUserAdmin.clusters().getCluster(clusterName);
+        // the GET_CLUSTER check must have been made once for this cluster
+        verify(spyAuthorizationService)
+                .allowClusterOperationAsync(eq(clusterName), 
eq(ClusterOperation.GET_CLUSTER), any(), any(), any());
+        // the superuser check must have been consulted as well (fallback path)
+        verify(spyAuthorizationService).isSuperUser(any(), any());
+
+        // the nobody client must be rejected as not authorized
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.clusters().getCluster(clusterName));
+    }
+
+
+    @Test
+    public void testCreateCluster() throws PulsarAdminException {
+        final String clusterName = UUID.randomUUID().toString();
+        superUserAdmin.clusters().createCluster(clusterName, 
ClusterData.builder().build());
+        // the CREATE_CLUSTER check must have been made once for the new cluster name
+        verify(spyAuthorizationService)
+                .allowClusterOperationAsync(eq(clusterName), 
eq(ClusterOperation.CREATE_CLUSTER), any(), any(), any());
+        // the superuser check must have been consulted as well (fallback path)
+        verify(spyAuthorizationService).isSuperUser(any(), any());
+
+        // the nobody client must be rejected as not authorized
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.clusters().createCluster(clusterName, 
ClusterData.builder().build()));
+    }
+
+    @Test
+    public void testUpdateCluster() {
+        final String clusterName = UUID.randomUUID().toString();
+        try {
+            superUserAdmin.clusters().updateCluster(clusterName, 
ClusterData.builder().serviceUrl("aaa").build());
+        } catch (Throwable ignore) {
+            // deliberately ignored: the cluster name is random, only the authorization calls verified below matter
+        }
+        // the UPDATE_CLUSTER check must have been made once for this cluster
+        verify(spyAuthorizationService)
+                .allowClusterOperationAsync(eq(clusterName), 
eq(ClusterOperation.UPDATE_CLUSTER), any(), any(), any());
+        // the superuser check must have been consulted as well (fallback path)
+        verify(spyAuthorizationService).isSuperUser(any(), any());
+        // the nobody client must be rejected as not authorized
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.clusters().updateCluster(clusterName, 
ClusterData.builder().build()));
+    }
+
+
+    @Test
+    public void testGetClusterMigration() {
+        final String clusterName = UUID.randomUUID().toString();
+        try {
+            superUserAdmin.clusters().getClusterMigration(clusterName);
+        } catch (Throwable ignore) {
+            // deliberately ignored: the cluster name is random, only the authorization calls verified below matter
+        }
+        // the READ check on the CLUSTER_MIGRATION policy must have been made once for this cluster
+        verify(spyAuthorizationService)
+                .allowClusterPolicyOperationAsync(eq(clusterName), 
eq(PolicyName.CLUSTER_MIGRATION),
+                        eq(PolicyOperation.READ), any(), any(), any());
+        // the superuser check must have been consulted as well (fallback path)
+        verify(spyAuthorizationService).isSuperUser(any(), any());
+        // the nobody client must be rejected as not authorized
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.clusters().getClusterMigration(clusterName));
+    }
+
+
+    @Test
+    public void testUpdateClusterMigration() throws PulsarAdminException {
+        final String clusterName = UUID.randomUUID().toString();
+        superUserAdmin.clusters().createCluster(clusterName, 
ClusterData.builder().build());
+        Mockito.clearInvocations(spyAuthorizationService); // forget the calls recorded while creating the cluster
+
+        superUserAdmin.clusters().updateClusterMigration(clusterName, false, 
new ClusterPolicies.ClusterUrl());
+        // the WRITE check on the CLUSTER_MIGRATION policy must have been made once for this cluster
+        verify(spyAuthorizationService)
+                .allowClusterPolicyOperationAsync(eq(clusterName), 
eq(PolicyName.CLUSTER_MIGRATION),
+                        eq(PolicyOperation.WRITE), any(), any(), any());
+        // the superuser check must have been consulted as well (fallback path)
+        verify(spyAuthorizationService).isSuperUser(any(), any());
+        // the nobody client must be rejected as not authorized
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.clusters()
+                        .updateClusterMigration(clusterName, false, new 
ClusterPolicies.ClusterUrl()));
+    }
+
+    @Test
+    public void testSetPeerClusterNames() throws PulsarAdminException {
+        final LinkedHashSet<String> linkedHashSet = new LinkedHashSet<>();
+        linkedHashSet.add("a");
+        final String clusterName = 
getPulsarService().getConfiguration().getClusterName();
+        try {
+            superUserAdmin.clusters().updatePeerClusterNames(clusterName, 
linkedHashSet);
+        } catch (Throwable ignore) {
+            // deliberately ignored: only the authorization calls verified below matter here
+        }
+        // the UPDATE_PEER_CLUSTER check must have been made once for this cluster
+        verify(spyAuthorizationService)
+                .allowClusterOperationAsync(eq(clusterName),
+                        eq(ClusterOperation.UPDATE_PEER_CLUSTER), any(), 
any(), any());
+        // the superuser check must have been consulted as well (fallback path)
+        verify(spyAuthorizationService).isSuperUser(any(), any());
+        // the nobody client must be rejected as not authorized
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
nobodyAdmin.clusters().updatePeerClusterNames(clusterName, linkedHashSet));
+    }
+
+    @Test
+    public void testGetPeerCluster() throws PulsarAdminException {
+        final String clusterName = 
getPulsarService().getConfiguration().getClusterName();
+        superUserAdmin.clusters().getPeerClusterNames(clusterName);
+
+        // the GET_PEER_CLUSTER check must have been made once for this cluster
+        verify(spyAuthorizationService)
+                .allowClusterOperationAsync(eq(clusterName),
+                        eq(ClusterOperation.GET_PEER_CLUSTER), any(), any(), 
any());
+        // the superuser check must have been consulted as well (fallback path)
+        verify(spyAuthorizationService).isSuperUser(any(), any());
+        // the nobody client must be rejected as not authorized
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.clusters().getPeerClusterNames(clusterName));
+    }
+
+    @Test
+    public void testDeleteCluster() throws PulsarAdminException {
+        final String clusterName = UUID.randomUUID().toString();
+        superUserAdmin.clusters().createCluster(clusterName, 
ClusterData.builder().build());
+        Mockito.clearInvocations(spyAuthorizationService); // forget the calls recorded while creating the cluster
+
+        superUserAdmin.clusters().deleteCluster(clusterName);
+
+        // the DELETE_CLUSTER check must have been made once for this cluster
+        verify(spyAuthorizationService)
+                .allowClusterOperationAsync(eq(clusterName),
+                        eq(ClusterOperation.DELETE_CLUSTER), any(), any(), 
any());
+        // the superuser check must have been consulted as well (fallback path)
+        verify(spyAuthorizationService).isSuperUser(any(), any());
+        // the nobody client must be rejected as not authorized
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.clusters().deleteCluster(clusterName));
+    }
+
+
+    @Test
+    public void testGetNamespaceIsolationPolicies() throws 
PulsarAdminException {
+        final String clusterName = 
getPulsarService().getConfiguration().getClusterName();
+        superUserAdmin.clusters().getNamespaceIsolationPolicies(clusterName);
+
+        // the READ check on the NAMESPACE_ISOLATION policy must have been made once for this cluster
+        verify(spyAuthorizationService)
+                .allowClusterPolicyOperationAsync(eq(clusterName),
+                        eq(PolicyName.NAMESPACE_ISOLATION), 
eq(PolicyOperation.READ), any(), any(), any());
+        // the superuser check must have been consulted as well (fallback path)
+        verify(spyAuthorizationService).isSuperUser(any(), any());
+        // the nobody client must be rejected as not authorized
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
nobodyAdmin.clusters().getNamespaceIsolationPolicies(clusterName));
+    }
+
+
+    @Test
+    public void testGetNamespaceIsolationPolicy() throws PulsarAdminException {
+        final String clusterName = 
getPulsarService().getConfiguration().getClusterName();
+        superUserAdmin.clusters().getNamespaceIsolationPolicy(clusterName, "");
+
+        // the READ check on the NAMESPACE_ISOLATION policy must have been made once for this cluster
+        verify(spyAuthorizationService)
+                .allowClusterPolicyOperationAsync(eq(clusterName),
+                        eq(PolicyName.NAMESPACE_ISOLATION), 
eq(PolicyOperation.READ), any(), any(), any());
+        // the superuser check must have been consulted as well (fallback path)
+        verify(spyAuthorizationService).isSuperUser(any(), any());
+        // the nobody client must be rejected as not authorized
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
nobodyAdmin.clusters().getNamespaceIsolationPolicy(clusterName, ""));
+    }
+
+
+    @Test
+    public void testGetBrokersWithNamespaceIsolationPolicy() throws 
PulsarAdminException {
+        final String clusterName = 
getPulsarService().getConfiguration().getClusterName();
+        
superUserAdmin.clusters().getBrokersWithNamespaceIsolationPolicy(clusterName);
+
+        // the READ check on the NAMESPACE_ISOLATION policy must have been made once for this cluster
+        verify(spyAuthorizationService)
+                .allowClusterPolicyOperationAsync(eq(clusterName),
+                        eq(PolicyName.NAMESPACE_ISOLATION), 
eq(PolicyOperation.READ), any(), any(), any());
+        // the superuser check must have been consulted as well (fallback path)
+        verify(spyAuthorizationService).isSuperUser(any(), any());
+        // the nobody client must be rejected as not authorized
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
nobodyAdmin.clusters().getBrokersWithNamespaceIsolationPolicy(clusterName));
+    }
+
+
+    @Test
+    public void testGetBrokerWithNamespaceIsolationPolicy() throws 
PulsarAdminException {
+        final String clusterName = 
getPulsarService().getConfiguration().getClusterName();
+        
superUserAdmin.clusters().getBrokerWithNamespaceIsolationPolicy(clusterName, 
getPulsarService().getBrokerId());
+
+        // the READ check on the NAMESPACE_ISOLATION policy must have been made once for this cluster
+        verify(spyAuthorizationService)
+                .allowClusterPolicyOperationAsync(eq(clusterName),
+                        eq(PolicyName.NAMESPACE_ISOLATION), 
eq(PolicyOperation.READ), any(), any(), any());
+        // the superuser check must have been consulted as well (fallback path)
+        verify(spyAuthorizationService).isSuperUser(any(), any());
+        // the nobody client must be rejected as not authorized
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
nobodyAdmin.clusters().getBrokerWithNamespaceIsolationPolicy(clusterName, ""));
+    }
+
+
+    @Test
+    public void testSetNamespaceIsolationPolicy() {
+        final String clusterName = 
getPulsarService().getConfiguration().getClusterName();
+
+        try {
+            
superUserAdmin.clusters().updateNamespaceIsolationPolicy(clusterName, "test",
+                    NamespaceIsolationData.builder().build());
+        } catch (Throwable ignore) {
+            // deliberately ignored: only the authorization calls verified below matter here
+        }
+
+        // the WRITE check on the NAMESPACE_ISOLATION policy must have been made once for this cluster
+        verify(spyAuthorizationService)
+                .allowClusterPolicyOperationAsync(eq(clusterName),
+                        eq(PolicyName.NAMESPACE_ISOLATION), 
eq(PolicyOperation.WRITE), any(), any(), any());
+        // the superuser check must have been consulted as well (fallback path)
+        verify(spyAuthorizationService).isSuperUser(any(), any());
+        // the nobody client must be rejected as not authorized
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
nobodyAdmin.clusters().updateNamespaceIsolationPolicy(clusterName, "test",
+                        NamespaceIsolationData.builder().build()));
+    }
+
+    @Test
+    public void testDeleteNamespaceIsolationPolicy() throws 
PulsarAdminException {
+        final String clusterName = 
getPulsarService().getConfiguration().getClusterName();
+        superUserAdmin.clusters().deleteNamespaceIsolationPolicy(clusterName, 
"test");
+
+        // the WRITE check on the NAMESPACE_ISOLATION policy must have been made once for this cluster
+        verify(spyAuthorizationService)
+                .allowClusterPolicyOperationAsync(eq(clusterName),
+                        eq(PolicyName.NAMESPACE_ISOLATION), 
eq(PolicyOperation.WRITE), any(), any(), any());
+        // the superuser check must have been consulted as well (fallback path)
+        verify(spyAuthorizationService).isSuperUser(any(), any());
+        // the nobody client must be rejected as not authorized
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> 
nobodyAdmin.clusters().deleteNamespaceIsolationPolicy(clusterName, "test"));
+    }
+
+
+    @Test
+    public void testSetFailureDomain() throws PulsarAdminException {
+        final String clusterName = 
getPulsarService().getConfiguration().getClusterName();
+        superUserAdmin.clusters().updateFailureDomain(clusterName, "test", 
FailureDomain.builder().build());
+
+        // the UPDATE_FAILURE_DOMAIN check must have been made once for this cluster
+        verify(spyAuthorizationService)
+                .allowClusterOperationAsync(eq(clusterName), 
eq(ClusterOperation.UPDATE_FAILURE_DOMAIN), any(), any(), any());
+        // the superuser check must have been consulted as well (fallback path)
+        verify(spyAuthorizationService).isSuperUser(any(), any());
+        // the nobody client must be rejected as not authorized
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.clusters().updateFailureDomain(clusterName,
+                        "test", FailureDomain.builder().build()));
+    }
+
+    @Test
+    public void testGetFailureDomains() throws PulsarAdminException {
+        final String clusterName = 
getPulsarService().getConfiguration().getClusterName();
+        superUserAdmin.clusters().getFailureDomains(clusterName);
+
+        // the GET_FAILURE_DOMAIN check must have been made once for this cluster
+        verify(spyAuthorizationService)
+                .allowClusterOperationAsync(eq(clusterName), 
eq(ClusterOperation.GET_FAILURE_DOMAIN), any(), any(), any());
+        // the superuser check must have been consulted as well (fallback path)
+        verify(spyAuthorizationService).isSuperUser(any(), any());
+        // the nobody client must be rejected as not authorized
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.clusters().getFailureDomains(clusterName));
+    }
+
+
+    @Test
+    public void testGetDomain() throws PulsarAdminException {
+        final String clusterName = 
getPulsarService().getConfiguration().getClusterName();
+        try {
+            superUserAdmin.clusters().getFailureDomain(clusterName, "test");
+        } catch (Throwable ignore) {
+            // deliberately ignored: only the authorization calls verified below matter here
+        }
+
+        // the GET_FAILURE_DOMAIN check must have been made once for this cluster
+        verify(spyAuthorizationService)
+                .allowClusterOperationAsync(eq(clusterName), 
eq(ClusterOperation.GET_FAILURE_DOMAIN), any(), any(), any());
+        // the superuser check must have been consulted as well (fallback path)
+        verify(spyAuthorizationService).isSuperUser(any(), any());
+        // the nobody client must be rejected as not authorized
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.clusters().getFailureDomain(clusterName, 
"test"));
+    }
+
+    @Test
+    public void testDeleteFailureDomain() throws PulsarAdminException {
+        final String clusterName = 
getPulsarService().getConfiguration().getClusterName();
+        try {
+            superUserAdmin.clusters().deleteFailureDomain(clusterName, "test");
+        } catch (Throwable ignore) {
+            // deliberately ignored: only the authorization calls verified below matter here
+        }
+
+        // the DELETE_FAILURE_DOMAIN check must have been made once for this cluster
+        verify(spyAuthorizationService)
+                .allowClusterOperationAsync(eq(clusterName), 
eq(ClusterOperation.DELETE_FAILURE_DOMAIN), any(), any(), any());
+        // the superuser check must have been consulted as well (fallback path)
+        verify(spyAuthorizationService).isSuperUser(any(), any());
+        // the nobody client must be rejected as not authorized
+        Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> nobodyAdmin.clusters().deleteFailureDomain(clusterName, 
"test"));
+    }
+
+
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterOperation.java
similarity index 54%
copy from 
pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
copy to 
pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterOperation.java
index 86ab545215e..bbdc64f729e 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterOperation.java
@@ -18,40 +18,17 @@
  */
 package org.apache.pulsar.common.policies.data;
 
-/**
- * PolicyName authorization operations.
- */
-public enum PolicyName {
-    ALL,
-    ANTI_AFFINITY,
-    AUTO_SUBSCRIPTION_CREATION,
-    AUTO_TOPIC_CREATION,
-    BACKLOG,
-    COMPACTION,
-    DELAYED_DELIVERY,
-    INACTIVE_TOPIC,
-    DEDUPLICATION,
-    MAX_CONSUMERS,
-    MAX_PRODUCERS,
-    DEDUPLICATION_SNAPSHOT,
-    MAX_UNACKED,
-    MAX_SUBSCRIPTIONS,
-    OFFLOAD,
-    PARTITION,
-    PERSISTENCE,
-    RATE,
-    RETENTION,
-    REPLICATION,
-    REPLICATION_RATE,
-    SCHEMA_COMPATIBILITY_STRATEGY,
-    SUBSCRIPTION_AUTH_MODE,
-    SUBSCRIPTION_EXPIRATION_TIME,
-    ENCRYPTION,
-    TTL,
-    MAX_TOPICS,
-    RESOURCEGROUP,
-    ENTRY_FILTERS,
-    SHADOW_TOPIC,
-    DISPATCHER_PAUSE_ON_ACK_STATE_PERSISTENT,
-    ALLOW_CLUSTERS
+public enum ClusterOperation { // cluster-level operations passed to the cluster authorization checks
+    LIST_CLUSTERS,
+    GET_CLUSTER,
+    CREATE_CLUSTER,
+    UPDATE_CLUSTER,
+    DELETE_CLUSTER,
+
+    // finer-grained operations on peer clusters and failure domains
+    GET_PEER_CLUSTER,
+    UPDATE_PEER_CLUSTER,
+    GET_FAILURE_DOMAIN,
+    UPDATE_FAILURE_DOMAIN,
+    DELETE_FAILURE_DOMAIN
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
index 86ab545215e..d77f92eb032 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
@@ -53,5 +53,9 @@ public enum PolicyName {
     ENTRY_FILTERS,
     SHADOW_TOPIC,
     DISPATCHER_PAUSE_ON_ACK_STATE_PERSISTENT,
-    ALLOW_CLUSTERS
+    ALLOW_CLUSTERS,
+
+    // cluster policies
+    CLUSTER_MIGRATION,
+    NAMESPACE_ISOLATION,
 }

Reply via email to