This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5dc5de8  [Issue 5720][authz] - add topics authz granularity (#7523)
5dc5de8 is described below

commit 5dc5de849b582c7f312764db043da9483c17146a
Author: Alexandre DUVAL <[email protected]>
AuthorDate: Sat May 8 10:23:32 2021 +0200

    [Issue 5720][authz] - add topics authz granularity (#7523)
    
    Fixes a part of #5720
    
    ### Motivation
    
    add granularity in topics api authz
---
 .../authentication/AuthenticationDataHttp.java     |  20 ++
 .../authorization/AuthorizationProvider.java       |  43 ++++-
 .../broker/authorization/AuthorizationService.java | 163 ++++++++++++----
 .../authorization/PulsarAuthorizationProvider.java | 108 +++++++----
 .../apache/pulsar/broker/admin/AdminResource.java  |  18 +-
 .../pulsar/broker/admin/impl/NamespacesBase.java   |   2 +-
 .../broker/admin/impl/PersistentTopicsBase.java    | 208 ++++++++++-----------
 .../broker/admin/v1/NonPersistentTopics.java       |  17 +-
 .../broker/admin/v2/NonPersistentTopics.java       |  19 +-
 .../pulsar/broker/admin/v2/PersistentTopics.java   |   9 +-
 .../pulsar/broker/lookup/TopicLookupBase.java      |   9 +-
 .../pulsar/broker/web/PulsarWebResource.java       |  53 +++++-
 .../pulsar/broker/admin/PersistentTopicsTest.java  |   1 -
 .../pulsar/broker/service/ServerCnxTest.java       |   1 +
 .../api/AuthenticatedProducerConsumerTest.java     |  11 +-
 .../api/AuthorizationProducerConsumerTest.java     |  21 ++-
 .../pulsar/common/policies/data/PolicyName.java    |   1 +
 .../common/policies/data/TopicOperation.java       |   3 +
 18 files changed, 471 insertions(+), 236 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttp.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttp.java
index a990b9e..958e5ea 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttp.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationDataHttp.java
@@ -28,6 +28,8 @@ public class AuthenticationDataHttp implements 
AuthenticationDataSource {
     protected final HttpServletRequest request;
     protected final SocketAddress remoteAddress;
 
+    protected String subscription;
+
     public AuthenticationDataHttp(HttpServletRequest request) {
         if (request == null) {
             throw new IllegalArgumentException();
@@ -69,4 +71,22 @@ public class AuthenticationDataHttp implements 
AuthenticationDataSource {
         return remoteAddress;
     }
 
+    /*
+     * Subscription
+     */
+    @Override
+    public boolean hasSubscription() {
+        return this.subscription != null;
+    }
+
+    @Override
+    public void setSubscription(String subscription) {
+        this.subscription = subscription;
+    }
+
+    @Override
+    public String getSubscription() {
+        return subscription;
+    }
+
 }
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
index 0403f34..f8e73c3 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.TenantOperation;
 import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.RestException;
 
@@ -293,7 +294,8 @@ public interface AuthorizationProvider extends Closeable {
                                                                     
NamespaceOperation operation,
                                                                     
AuthenticationDataSource authData) {
         return FutureUtil.failedFuture(
-            new IllegalStateException("NamespaceOperation is not supported by 
the Authorization provider you are using."));
+            new IllegalStateException("NamespaceOperation [" + 
operation.name() + "] is not supported by "
+                    + "the Authorization provider you are using."));
     }
 
     default Boolean allowNamespaceOperation(NamespaceName namespaceName,
@@ -363,7 +365,8 @@ public interface AuthorizationProvider extends Closeable {
                                                                           
String role,
                                                                           
AuthenticationDataSource authData) {
         return FutureUtil.failedFuture(
-                new IllegalStateException("NamespacePolicyOperation is not 
supported by the Authorization provider you are using."));
+                new IllegalStateException("NamespacePolicyOperation  [" + 
policy.name() + "/" + operation.name() + "] "
+                        + "is not supported by is not supported by the 
Authorization provider you are using."));
     }
 
     default Boolean allowNamespacePolicyOperation(NamespaceName namespaceName,
@@ -436,7 +439,8 @@ public interface AuthorizationProvider extends Closeable {
                                                                 TopicOperation 
operation,
                                                                 
AuthenticationDataSource authData) {
         return FutureUtil.failedFuture(
-            new IllegalStateException("TopicOperation is not supported by the 
Authorization provider you are using."));
+            new IllegalStateException("TopicOperation [" + operation.name() + 
"] is not supported by the Authorization"
+                    + "provider you are using."));
     }
 
     default Boolean allowTopicOperation(TopicName topicName,
@@ -489,4 +493,37 @@ public interface AuthorizationProvider extends Closeable {
             throw new RestException(e.getCause());
         }
     }
+
+    /**
+     * Check if a given <tt>role</tt> is allowed to execute a given topic 
<tt>operation</tt> on topic's <tt>policy</tt>.
+     *
+     * @param topic topic name
+     * @param role role name
+     * @param operation topic operation
+     * @param authData authenticated data
+     * @return CompletableFuture<Boolean>
+     */
+    default CompletableFuture<Boolean> 
allowTopicPolicyOperationAsync(TopicName topic,
+                                                                      String 
role,
+                                                                      
PolicyName policy,
+                                                                      
PolicyOperation operation,
+                                                                      
AuthenticationDataSource authData) {
+        return FutureUtil.failedFuture(
+                new IllegalStateException("TopicPolicyOperation [" + 
policy.name() + "/" + operation.name() + "] "
+                        + "is not supported by the Authorization provider you 
are using."));
+    }
+
+    default Boolean allowTopicPolicyOperation(TopicName topicName,
+                                              String role,
+                                              PolicyName policy,
+                                              PolicyOperation operation,
+                                              AuthenticationDataSource 
authData) {
+        try {
+            return allowTopicPolicyOperationAsync(topicName, role, policy, 
operation, authData).get();
+        } catch (InterruptedException e) {
+            throw new RestException(e);
+        } catch (ExecutionException e) {
+            throw new RestException(e.getCause());
+        }
+    }
 }
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 75c759f..403542f 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -18,10 +18,6 @@
  */
 package org.apache.pulsar.broker.authorization;
 
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import javax.ws.rs.core.Response;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -30,10 +26,10 @@ import 
org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.PolicyName;
 import org.apache.pulsar.common.policies.data.PolicyOperation;
 import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.TenantOperation;
 import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -41,6 +37,11 @@ import org.apache.pulsar.common.util.RestException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.ws.rs.core.Response;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
 import static java.util.concurrent.TimeUnit.SECONDS;
 
 /**
@@ -103,7 +104,7 @@ public class AuthorizationService {
      *             when failed to grant permission
      */
     public CompletableFuture<Void> grantPermissionAsync(NamespaceName 
namespace, Set<AuthAction> actions, String role,
-            String authDataJson) {
+                                                        String authDataJson) {
 
         if (provider != null) {
             return provider.grantPermissionAsync(namespace, actions, role, 
authDataJson);
@@ -122,7 +123,7 @@ public class AuthorizationService {
      * @return
      */
     public CompletableFuture<Void> 
grantSubscriptionPermissionAsync(NamespaceName namespace, String 
subscriptionName,
-            Set<String> roles, String authDataJson) {
+                                                                    
Set<String> roles, String authDataJson) {
 
         if (provider != null) {
             return provider.grantSubscriptionPermissionAsync(namespace, 
subscriptionName, roles, authDataJson);
@@ -139,7 +140,7 @@ public class AuthorizationService {
      * @return
      */
     public CompletableFuture<Void> 
revokeSubscriptionPermissionAsync(NamespaceName namespace, String 
subscriptionName,
-            String role, String authDataJson) {
+                                                                     String 
role, String authDataJson) {
         if (provider != null) {
             return provider.revokeSubscriptionPermissionAsync(namespace, 
subscriptionName, role, authDataJson);
         }
@@ -158,7 +159,7 @@ public class AuthorizationService {
      *             when failed to grant permission
      */
     public CompletableFuture<Void> grantPermissionAsync(TopicName topicname, 
Set<AuthAction> actions, String role,
-            String authDataJson) {
+                                                        String authDataJson) {
 
         if (provider != null) {
             return provider.grantPermissionAsync(topicname, actions, role, 
authDataJson);
@@ -176,7 +177,7 @@ public class AuthorizationService {
      *            the app id used to send messages to the topic.
      */
     public CompletableFuture<Boolean> canProduceAsync(TopicName topicName, 
String role,
-            AuthenticationDataSource authenticationData) {
+                                                      AuthenticationDataSource 
authenticationData) {
 
         if (!this.conf.isAuthorizationEnabled()) {
             return CompletableFuture.completedFuture(true);
@@ -204,7 +205,7 @@ public class AuthorizationService {
      *            the subscription name defined by the client
      */
     public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, 
String role,
-            AuthenticationDataSource authenticationData, String subscription) {
+                                                      AuthenticationDataSource 
authenticationData, String subscription) {
         if (!this.conf.isAuthorizationEnabled()) {
             return CompletableFuture.completedFuture(true);
         }
@@ -237,7 +238,7 @@ public class AuthorizationService {
     }
 
     public boolean canConsume(TopicName topicName, String role, 
AuthenticationDataSource authenticationData,
-            String subscription) throws Exception {
+                              String subscription) throws Exception {
         try {
             return canConsumeAsync(topicName, role, authenticationData, 
subscription)
                     .get(conf.getZooKeeperOperationTimeoutSeconds(), SECONDS);
@@ -289,7 +290,7 @@ public class AuthorizationService {
      * @throws Exception
      */
     public CompletableFuture<Boolean> canLookupAsync(TopicName topicName, 
String role,
-            AuthenticationDataSource authenticationData) {
+                                                     AuthenticationDataSource 
authenticationData) {
         CompletableFuture<Boolean> finalResult = new 
CompletableFuture<Boolean>();
         canProduceAsync(topicName, role, 
authenticationData).whenComplete((produceAuthorized, ex) -> {
             if (ex == null) {
@@ -327,7 +328,7 @@ public class AuthorizationService {
     }
 
     public CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName 
namespaceName, String role,
-                                                       
AuthenticationDataSource authenticationData) {
+                                                            
AuthenticationDataSource authenticationData) {
         return provider.allowFunctionOpsAsync(namespaceName, role, 
authenticationData);
     }
 
@@ -396,11 +397,11 @@ public class AuthorizationService {
         validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
         if (isProxyRole(role)) {
             CompletableFuture<Boolean> isRoleAuthorizedFuture = 
allowTenantOperationAsync(
-                tenantName, operation, role, authData);
+                    tenantName, operation, role, authData);
             CompletableFuture<Boolean> isOriginalAuthorizedFuture = 
allowTenantOperationAsync(
-                tenantName, operation, originalRole, authData);
+                    tenantName, operation, originalRole, authData);
             return 
isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture,
-                (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized 
&& isOriginalAuthorized);
+                    (isRoleAuthorized, isOriginalAuthorized) -> 
isRoleAuthorized && isOriginalAuthorized);
         } else {
             return allowTenantOperationAsync(tenantName, operation, role, 
authData);
         }
@@ -413,7 +414,7 @@ public class AuthorizationService {
                                         AuthenticationDataSource authData) {
         try {
             return allowTenantOperationAsync(
-                tenantName, operation, originalRole, role, authData).get();
+                    tenantName, operation, originalRole, role, authData).get();
         } catch (InterruptedException e) {
             throw new RestException(e);
         } catch (ExecutionException e) {
@@ -457,11 +458,11 @@ public class AuthorizationService {
         validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
         if (isProxyRole(role)) {
             CompletableFuture<Boolean> isRoleAuthorizedFuture = 
allowNamespaceOperationAsync(
-                namespaceName, operation, role, authData);
+                    namespaceName, operation, role, authData);
             CompletableFuture<Boolean> isOriginalAuthorizedFuture = 
allowNamespaceOperationAsync(
-                namespaceName, operation, originalRole, authData);
+                    namespaceName, operation, originalRole, authData);
             return 
isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture,
-                (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized 
&& isOriginalAuthorized);
+                    (isRoleAuthorized, isOriginalAuthorized) -> 
isRoleAuthorized && isOriginalAuthorized);
         } else {
             return allowNamespaceOperationAsync(namespaceName, operation, 
role, authData);
         }
@@ -474,7 +475,7 @@ public class AuthorizationService {
                                            AuthenticationDataSource authData) {
         try {
             return allowNamespaceOperationAsync(
-                namespaceName, operation, originalRole, role, authData).get();
+                    namespaceName, operation, originalRole, role, 
authData).get();
         } catch (InterruptedException e) {
             throw new RestException(e);
         } catch (ExecutionException e) {
@@ -520,11 +521,11 @@ public class AuthorizationService {
         validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
         if (isProxyRole(role)) {
             CompletableFuture<Boolean> isRoleAuthorizedFuture = 
allowNamespacePolicyOperationAsync(
-                namespaceName, policy, operation, role, authData);
+                    namespaceName, policy, operation, role, authData);
             CompletableFuture<Boolean> isOriginalAuthorizedFuture = 
allowNamespacePolicyOperationAsync(
-                namespaceName, policy, operation, originalRole, authData);
+                    namespaceName, policy, operation, originalRole, authData);
             return 
isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture,
-                (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized 
&& isOriginalAuthorized);
+                    (isRoleAuthorized, isOriginalAuthorized) -> 
isRoleAuthorized && isOriginalAuthorized);
         } else {
             return allowNamespacePolicyOperationAsync(namespaceName, policy, 
operation, role, authData);
         }
@@ -538,7 +539,71 @@ public class AuthorizationService {
                                                  AuthenticationDataSource 
authData) {
         try {
             return allowNamespacePolicyOperationAsync(
-                namespaceName, policy, operation, originalRole, role, 
authData).get();
+                    namespaceName, policy, operation, originalRole, role, 
authData).get();
+        } catch (InterruptedException e) {
+            throw new RestException(e);
+        } catch (ExecutionException e) {
+            throw new RestException(e.getCause());
+        }
+    }
+
+    /**
+     * Grant authorization-action permission on a topic to the given client
+     *
+     * @param topicName
+     * @param policy
+     * @param operation
+     * @param role
+     * @param authData additional authdata in json for targeted authorization 
provider
+     * @throws IllegalStateException when failed to grant permission
+     */
+    public CompletableFuture<Boolean> allowTopicPolicyOperationAsync(TopicName 
topicName,
+                                                                     
PolicyName policy,
+                                                                     
PolicyOperation operation,
+                                                                     String 
role,
+                                                                     
AuthenticationDataSource authData) {
+        if (!this.conf.isAuthorizationEnabled()) {
+            return CompletableFuture.completedFuture(true);
+        }
+
+        if (provider != null) {
+            return provider.allowTopicPolicyOperationAsync(topicName, role, 
policy, operation, authData);
+        }
+
+        return FutureUtil.failedFuture(new IllegalStateException("No 
authorization provider configured for " +
+                "allowTopicPolicyOperationAsync"));
+    }
+
+    public CompletableFuture<Boolean> allowTopicPolicyOperationAsync(TopicName 
topicName,
+                                                                     
PolicyName policy,
+                                                                     
PolicyOperation operation,
+                                                                     String 
originalRole,
+                                                                     String 
role,
+                                                                     
AuthenticationDataSource authData) {
+
+        validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
+        if (isProxyRole(role)) {
+            CompletableFuture<Boolean> isRoleAuthorizedFuture = 
allowTopicPolicyOperationAsync(
+                    topicName, policy, operation, role, authData);
+            CompletableFuture<Boolean> isOriginalAuthorizedFuture = 
allowTopicPolicyOperationAsync(
+                    topicName, policy, operation, originalRole, authData);
+            return 
isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture,
+                    (isRoleAuthorized, isOriginalAuthorized) -> 
isRoleAuthorized && isOriginalAuthorized);
+        } else {
+            return allowTopicPolicyOperationAsync(topicName, policy, 
operation, role, authData);
+        }
+    }
+
+
+    public Boolean allowTopicPolicyOperation(TopicName topicName,
+                                             PolicyName policy,
+                                             PolicyOperation operation,
+                                             String originalRole,
+                                             String role,
+                                             AuthenticationDataSource 
authData) {
+        try {
+            return allowTopicPolicyOperationAsync(
+                    topicName, policy, operation, originalRole, role, 
authData).get();
         } catch (InterruptedException e) {
             throw new RestException(e);
         } catch (ExecutionException e) {
@@ -564,7 +629,7 @@ public class AuthorizationService {
                                                                
AuthenticationDataSource authData) {
         if (log.isDebugEnabled()) {
             log.debug("Check if role {} is allowed to execute topic operation 
{} on topic {}",
-                role, operation, topicName);
+                    role, operation, topicName);
         }
         if (!this.conf.isAuthorizationEnabled()) {
             return CompletableFuture.completedFuture(true);
@@ -572,21 +637,21 @@ public class AuthorizationService {
 
         if (provider != null) {
             CompletableFuture<Boolean> allowFuture =
-                provider.allowTopicOperationAsync(topicName, role, operation, 
authData);
+                    provider.allowTopicOperationAsync(topicName, role, 
operation, authData);
             if (log.isDebugEnabled()) {
                 return allowFuture.whenComplete((allowed, exception) -> {
                     if (exception == null) {
                         if (allowed) {
                             log.debug("Topic operation {} on topic {} is 
allowed: role = {}",
-                                operation, topicName, role);
-                        } else{
+                                    operation, topicName, role);
+                        } else {
                             log.debug("Topic operation {} on topic {} is NOT 
allowed: role = {}",
-                                operation, topicName, role);
+                                    operation, topicName, role);
                         }
                     } else {
                         log.debug("Failed to check if topic operation {} on 
topic {} is allowed:"
-                                + " role = {}",
-                            operation, topicName, role, exception);
+                                        + " role = {}",
+                                operation, topicName, role, exception);
                     }
                 });
             } else {
@@ -597,4 +662,36 @@ public class AuthorizationService {
         return FutureUtil.failedFuture(new IllegalStateException("No 
authorization provider configured for " +
                 "allowTopicOperationAsync"));
     }
+
+    public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName 
topicName,
+                                                               TopicOperation 
operation,
+                                                               String 
originalRole,
+                                                               String role,
+                                                               
AuthenticationDataSource authData) {
+        validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
+        if (isProxyRole(role)) {
+            CompletableFuture<Boolean> isRoleAuthorizedFuture = 
allowTopicOperationAsync(
+                    topicName, operation, role, authData);
+            CompletableFuture<Boolean> isOriginalAuthorizedFuture = 
allowTopicOperationAsync(
+                    topicName, operation, originalRole, authData);
+            return 
isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture,
+                    (isRoleAuthorized, isOriginalAuthorized) -> 
isRoleAuthorized && isOriginalAuthorized);
+        } else {
+            return allowTopicOperationAsync(topicName, operation, role, 
authData);
+        }
+    }
+
+    public Boolean allowTopicOperation(TopicName topicName,
+                                       TopicOperation operation,
+                                       String originalRole,
+                                       String role,
+                                       AuthenticationDataSource authData) {
+        try {
+            return allowTopicOperationAsync(topicName, operation, 
originalRole, role, authData).get();
+        } catch (InterruptedException e) {
+            throw new RestException(e);
+        } catch (ExecutionException e) {
+            throw new RestException(e.getCause());
+        }
+    }
 }
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index f0118b6..05c9fb2 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -529,17 +529,18 @@ public class PulsarAuthorizationProvider implements 
AuthorizationProvider {
                                                                    
NamespaceOperation operation,
                                                                    
AuthenticationDataSource authData) {
         CompletableFuture<Boolean> isAuthorizedFuture;
-        if (operation == NamespaceOperation.PACKAGES) {
-            isAuthorizedFuture = 
allowTheSpecifiedActionOpsAsync(namespaceName, role, authData, 
AuthAction.packages);
-        } else {
-            isAuthorizedFuture = CompletableFuture.completedFuture(false);
+        switch (operation) {
+            case PACKAGES:
+                isAuthorizedFuture = 
allowTheSpecifiedActionOpsAsync(namespaceName, role, authData, 
AuthAction.packages);
+                break;
+            default:
+                isAuthorizedFuture = CompletableFuture.completedFuture(false);
         }
         CompletableFuture<Boolean> isTenantAdminFuture = 
validateTenantAdminAccess(namespaceName.getTenant(), role, authData);
         return isTenantAdminFuture.thenCombine(isAuthorizedFuture, 
(isTenantAdmin, isAuthorized) -> {
             if (log.isDebugEnabled()) {
-                log.debug("Verify if role {} is allowed to {} to topic {}:"
-                        + " isTenantAdmin={}, isAuthorized={}",
-                    role, operation, namespaceName, isTenantAdmin, 
isAuthorized);
+                log.debug("Verify if role {} is allowed to {} to topic {}: 
isTenantAdmin={}, isAuthorized={}",
+                        role, operation, namespaceName, isTenantAdmin, 
isAuthorized);
             }
             return isTenantAdmin || isAuthorized;
         });
@@ -559,29 +560,48 @@ public class PulsarAuthorizationProvider implements 
AuthorizationProvider {
                                                                String role,
                                                                TopicOperation 
operation,
                                                                
AuthenticationDataSource authData) {
+        log.debug("Check allowTopicOperationAsync [" + operation.name() + "] 
on [" + topicName.toString() + "].");
+
         CompletableFuture<Boolean> isAuthorizedFuture;
 
         switch (operation) {
-            case LOOKUP: isAuthorizedFuture = canLookupAsync(topicName, role, 
authData);
+            case LOOKUP:
+            case GET_STATS:
+                isAuthorizedFuture = canLookupAsync(topicName, role, authData);
                 break;
-            case PRODUCE: isAuthorizedFuture = canProduceAsync(topicName, 
role, authData);
+            case PRODUCE:
+                isAuthorizedFuture = canProduceAsync(topicName, role, 
authData);
                 break;
-            case CONSUME: isAuthorizedFuture = canConsumeAsync(topicName, 
role, authData, authData.getSubscription());
+            case GET_SUBSCRIPTIONS:
+            case CONSUME:
+            case SUBSCRIBE:
+            case UNSUBSCRIBE:
+            case SKIP:
+            case EXPIRE_MESSAGES:
+            case PEEK_MESSAGES:
+            case RESET_CURSOR:
+                isAuthorizedFuture = canConsumeAsync(topicName, role, 
authData, authData.getSubscription());
                 break;
+            case TERMINATE:
+            case COMPACT:
+            case OFFLOAD:
+            case UNLOAD:
+            case ADD_BUNDLE_RANGE:
+            case GET_BUNDLE_RANGE:
+            case DELETE_BUNDLE_RANGE:
+                return validateTenantAdminAccess(topicName.getTenant(), role, 
authData);
             default:
-                return FutureUtil.failedFuture(new 
IllegalStateException("TopicOperation is not supported."));
+                return FutureUtil.failedFuture(
+                        new IllegalStateException("TopicOperation [" + 
operation.name() + "] is not supported."));
         }
 
-        CompletableFuture<Boolean> isSuperUserFuture = isSuperUser(role, 
authData, conf);
-
-        // check isSuperUser first
-        return isSuperUserFuture
-                .thenCompose(isSuperUser -> {
+        return validateTenantAdminAccess(topicName.getTenant(), role, authData)
+                .thenCompose(isSuperUserOrAdmin -> {
                     if (log.isDebugEnabled()) {
-                        log.debug("Verify if role {} is allowed to {} to topic 
{}: isSuperUser={}",
-                                role, operation, topicName, isSuperUser);
+                        log.debug("Verify if role {} is allowed to {} to topic 
{}: isSuperUserOrAdmin={}",
+                                role, operation, topicName, 
isSuperUserOrAdmin);
                     }
-                    if (isSuperUser) {
+                    if (isSuperUserOrAdmin) {
                         return CompletableFuture.completedFuture(true);
                     } else {
                         return isAuthorizedFuture;
@@ -589,6 +609,15 @@ public class PulsarAuthorizationProvider implements 
AuthorizationProvider {
                 });
     }
 
+    @Override
+    public CompletableFuture<Boolean> allowTopicPolicyOperationAsync(TopicName 
topicName,
+                                                                     String 
role,
+                                                                     
PolicyName policyName,
+                                                                     
PolicyOperation policyOperation,
+                                                                     
AuthenticationDataSource authData) {
+        return validateTenantAdminAccess(topicName.getTenant(), role, 
authData);
+    }
+
     private static String path(String... parts) {
         StringBuilder sb = new StringBuilder();
         sb.append("/admin/");
@@ -596,27 +625,28 @@ public class PulsarAuthorizationProvider implements 
AuthorizationProvider {
         return sb.toString();
     }
 
-    private CompletableFuture<Boolean> validateTenantAdminAccess(String 
tenantName,
+    public CompletableFuture<Boolean> validateTenantAdminAccess(String 
tenantName,
                                                                  String role,
-                                                                
AuthenticationDataSource authData) {
-        try {
-            TenantInfo tenantInfo = pulsarResources.getTenantResources()
-                    .get(path(POLICIES, tenantName))
-                    .orElseThrow(() -> new 
RestException(Response.Status.NOT_FOUND, "Tenant does not exist"));
-
-            // role check
-            CompletableFuture<Boolean> isRoleSuperUserFuture = 
isSuperUser(role, authData, conf);
-            CompletableFuture<Boolean> isRoleTenantAdminFuture = 
isTenantAdmin(tenantName, role, tenantInfo, authData);
-            return isRoleSuperUserFuture
-                    .thenCombine(isRoleTenantAdminFuture, (isRoleSuperUser, 
isRoleTenantAdmin) ->
-                            isRoleSuperUser || isRoleTenantAdmin);
-        } catch (NotFoundException e) {
-            log.warn("Failed to get tenant info data for non existing tenant 
{}", tenantName);
-            throw new RestException(Response.Status.NOT_FOUND, "Tenant does 
not exist");
-        } catch (Exception e) {
-            log.error("Failed to get tenant {}", tenantName, e);
-            throw new RestException(e);
-        }
+                                                                 
AuthenticationDataSource authData) {
+        return isSuperUser(role, authData, conf)
+                .thenCompose(isSuperUser -> {
+                    if (isSuperUser) {
+                        return CompletableFuture.completedFuture(true);
+                    } else {
+                        try {
+                            TenantInfo tenantInfo = 
pulsarResources.getTenantResources()
+                                    .get(path(POLICIES, tenantName))
+                                    .orElseThrow(() -> new 
RestException(Response.Status.NOT_FOUND, "Tenant does not exist"));
+                            return isTenantAdmin(tenantName, role, tenantInfo, 
authData);
+                        } catch (NotFoundException e) {
+                            log.warn("Failed to get tenant info data for non 
existing tenant {}", tenantName);
+                            throw new RestException(Response.Status.NOT_FOUND, 
"Tenant does not exist");
+                        } catch (Exception e) {
+                            log.error("Failed to get tenant {}", tenantName, 
e);
+                            throw new RestException(e);
+                        }
+                    }
+                });
     }
 
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 2b9d67b..78792d1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -55,10 +55,12 @@ import 
org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
+import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -490,13 +492,9 @@ public abstract class AdminResource extends 
PulsarWebResource {
         }
 
         try {
-            checkConnect(topicName);
-        } catch (WebApplicationException e) {
-            try {
-                validateAdminAccessForTenant(topicName.getTenant());
-            } catch (Exception ex) {
-                return FutureUtil.failedFuture(ex);
-            }
+            validateTopicOperation(topicName, TopicOperation.LOOKUP);
+        } catch (RestException e) {
+            return FutureUtil.failedFuture(e);
         } catch (Exception e) {
             // unknown error marked as internal server error
             log.warn("Unexpected error while authorizing lookup. topic={}, 
role={}. Error: {}", topicName,
@@ -520,9 +518,7 @@ public abstract class AdminResource extends 
PulsarWebResource {
         validateGlobalNamespaceOwnership(topicName.getNamespaceObject());
 
         try {
-            checkConnect(topicName);
-        } catch (WebApplicationException e) {
-            validateAdminAccessForTenant(topicName.getTenant());
+            validateTopicOperation(topicName, TopicOperation.LOOKUP);
         } catch (Exception e) {
             // unknown error marked as internal server error
             log.warn("Unexpected error while authorizing lookup. topic={}, 
role={}. Error: {}", topicName,
@@ -694,7 +690,7 @@ public abstract class AdminResource extends 
PulsarWebResource {
 
         final int maxPartitions = 
pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
         try {
-            validateAdminAccessForTenant(topicName.getTenant());
+            validateNamespaceOperation(topicName.getNamespaceObject(), 
NamespaceOperation.CREATE_TOPIC);
         } catch (Exception e) {
             log.error("[{}] Failed to create partitioned topic {}", 
clientAppId(), topicName, e);
             resumeAsyncResponseExceptionally(asyncResponse, e);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index a58cb7d..013a9ea 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -831,7 +831,7 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected void internalSetAutoSubscriptionCreation(
-                AsyncResponse asyncResponse, AutoSubscriptionCreationOverride 
autoSubscriptionCreationOverride) {
+            AsyncResponse asyncResponse, AutoSubscriptionCreationOverride 
autoSubscriptionCreationOverride) {
         validateNamespacePolicyOperation(namespaceName, 
PolicyName.AUTO_SUBSCRIPTION_CREATION, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 76747e4..2c1af56 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -107,6 +107,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
 import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
@@ -114,10 +115,13 @@ import 
org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
 import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.protocol.Commands;
@@ -137,7 +141,7 @@ public class PersistentTopicsBase extends AdminResource {
     private static final Version LEAST_SUPPORTED_CLIENT_VERSION_PREFIX = 
Version.forIntegers(1, 21);
 
     protected List<String> internalGetList() {
-        validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespaceOperation(namespaceName, 
NamespaceOperation.GET_TOPICS);
 
         // Validate that namespace exists, throws 404 if it doesn't exist
         try {
@@ -173,6 +177,7 @@ public class PersistentTopicsBase extends AdminResource {
 
     protected List<String> internalGetPartitionedTopicList() {
         validateAdminAccessForTenant(namespaceName.getTenant());
+        validateNamespaceOperation(namespaceName, 
NamespaceOperation.GET_TOPICS);
         // Validate that namespace exists, throws 404 if it doesn't exist
         try {
             if (!namespaceResources().exists(path(POLICIES, 
namespaceName.toString()))) {
@@ -252,74 +257,6 @@ public class PersistentTopicsBase extends AdminResource {
         validateTopicOwnership(topicName, authoritative);
     }
 
-    public void validateReadOperationOnTopic(boolean authoritative) {
-        validateTopicOwnership(topicName, authoritative);
-        try {
-            validateAdminAccessForTenant(topicName.getTenant());
-        } catch (Exception e) {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] failed to validate admin access for {}", 
topicName, clientAppId());
-            }
-            validateAdminAccessForSubscriber("");
-        }
-    }
-
-    public void validateWriteOperationOnTopic(boolean authoritative) {
-        validateTopicOwnership(topicName, authoritative);
-        try {
-            validateAdminAccessForTenant(topicName.getTenant());
-        } catch (Exception e) {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] failed to validate admin access for {}", 
topicName, clientAppId());
-            }
-            try {
-                if 
(!pulsar().getBrokerService().getAuthorizationService().canProduce(topicName, 
clientAppId(),
-                  clientAuthData())) {
-                    log.warn("[{}} Subscriber {} is not authorized to access 
api", topicName, clientAppId());
-                    throw new RestException(Status.UNAUTHORIZED,
-                      String.format("Subscriber %s is not authorized to access 
this operation", clientAppId()));
-                }
-            } catch (RestException re) {
-                throw re;
-            } catch (Exception ex) {
-                // unknown error marked as internal server error
-                log.warn("Unexpected error while authorizing request. 
topic={}, role={}. Error: {}", topicName,
-                  clientAppId(), e.getMessage(), ex);
-                throw new RestException(ex);
-            }
-        }
-    }
-
-    protected void validateAdminAccessForSubscriber(String subscriptionName, 
boolean authoritative) {
-        validateTopicOwnership(topicName, authoritative);
-        try {
-            validateAdminAccessForTenant(topicName.getTenant());
-        } catch (Exception e) {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] failed to validate admin access for {}", 
topicName, clientAppId());
-            }
-            validateAdminAccessForSubscriber(subscriptionName);
-        }
-    }
-
-    private void validateAdminAccessForSubscriber(String subscriptionName) {
-        try {
-            if 
(!pulsar().getBrokerService().getAuthorizationService().canConsume(topicName, 
clientAppId(),
-                    clientAuthData(), subscriptionName)) {
-                log.warn("[{}} Subscriber {} is not authorized to access api", 
topicName, clientAppId());
-                throw new RestException(Status.UNAUTHORIZED,
-                        String.format("Subscriber %s is not authorized to 
access this operation", clientAppId()));
-            }
-        } catch (RestException re) {
-            throw re;
-        } catch (Exception e) {
-            // unknown error marked as internal server error
-            log.warn("Unexpected error while authorizing request. topic={}, 
role={}. Error: {}", topicName,
-                    clientAppId(), e.getMessage(), e);
-            throw new RestException(e);
-        }
-    }
-
     private void grantPermissions(String topicUri, String role, 
Set<AuthAction> actions) {
         try {
             namespaceResources().set(path(POLICIES, namespaceName.toString()), 
(policies) -> {
@@ -358,7 +295,8 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected void internalDeleteTopicForcefully(boolean authoritative, 
boolean deleteSchema) {
-        validateWriteOperationOnTopic(authoritative);
+        validateTopicOwnership(topicName, authoritative);
+        validateNamespaceOperation(topicName.getNamespaceObject(), 
NamespaceOperation.DELETE_TOPIC);
 
         try {
             pulsar().getBrokerService().deleteTopic(topicName.toString(), 
true, deleteSchema).get();
@@ -419,13 +357,12 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected void internalCreateNonPartitionedTopic(boolean authoritative) {
-        validateWriteOperationOnTopic(authoritative);
         validateNonPartitionTopicName(topicName.getLocalName());
         if (topicName.isGlobal()) {
             validateGlobalNamespaceOwnership(namespaceName);
         }
-
         validateTopicOwnership(topicName, authoritative);
+        validateNamespaceOperation(topicName.getNamespaceObject(), 
NamespaceOperation.CREATE_TOPIC);
 
         PartitionedTopicMetadata partitionMetadata = 
getPartitionedTopicMetadata(topicName, authoritative, false);
         if (partitionMetadata.partitions > 0) {
@@ -465,7 +402,9 @@ public class PersistentTopicsBase extends AdminResource {
      */
     protected void internalUpdatePartitionedTopic(int numPartitions,
                                                   boolean 
updateLocalTopicOnly, boolean authoritative) {
-        validateWriteOperationOnTopic(authoritative);
+        validateTopicOwnership(topicName, authoritative);
+        validateTopicPolicyOperation(topicName, PolicyName.PARTITION, 
PolicyOperation.WRITE);
+
         // Only do the validation if it's the first hop.
         if (!updateLocalTopicOnly) {
             validatePartitionTopicUpdate(topicName.getLocalName(), 
numPartitions);
@@ -613,7 +552,8 @@ public class PersistentTopicsBase extends AdminResource {
     protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, 
boolean authoritative,
                                                   boolean force, boolean 
deleteSchema) {
         try {
-            validateWriteOperationOnTopic(authoritative);
+            validateNamespaceOperation(topicName.getNamespaceObject(), 
NamespaceOperation.DELETE_TOPIC);
+            validateTopicOwnership(topicName, authoritative);
         } catch (WebApplicationException wae) {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Failed to delete partitioned topic {}, 
redirecting to other brokers.",
@@ -1005,8 +945,8 @@ public class PersistentTopicsBase extends AdminResource {
     private void internalUnloadNonPartitionedTopic(AsyncResponse 
asyncResponse, boolean authoritative) {
         Topic topic;
         try {
-            validateAdminAccessForTenant(topicName.getTenant());
             validateTopicOwnership(topicName, authoritative);
+            validateTopicOperation(topicName, TopicOperation.UNLOAD);
             topic = getTopicReference(topicName);
         } catch (Exception e) {
             log.error("[{}] Failed to unload topic {}", clientAppId(), 
topicName, e);
@@ -1034,7 +974,8 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected void internalDeleteTopic(boolean authoritative, boolean 
deleteSchema) {
-        validateWriteOperationOnTopic(authoritative);
+        validateNamespaceOperation(topicName.getNamespaceObject(), 
NamespaceOperation.DELETE_TOPIC);
+        validateTopicOwnership(topicName, authoritative);
 
         try {
             pulsar().getBrokerService().deleteTopic(topicName.toString(), 
false, deleteSchema).get();
@@ -1062,6 +1003,9 @@ public class PersistentTopicsBase extends AdminResource {
                 return;
             }
         }
+
+        validateTopicOwnership(topicName, authoritative);
+
         // If the topic name is a partition name, no need to get partition 
topic metadata again
         if (topicName.isPartitioned()) {
             internalGetSubscriptionsForNonPartitionedTopic(asyncResponse, 
authoritative);
@@ -1115,7 +1059,9 @@ public class PersistentTopicsBase extends AdminResource {
 
     private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse 
asyncResponse, boolean authoritative) {
         try {
-            validateReadOperationOnTopic(authoritative);
+            validateTopicOwnership(topicName, authoritative);
+            validateTopicOperation(topicName, 
TopicOperation.GET_SUBSCRIPTIONS);
+
             Topic topic = getTopicReference(topicName);
             final List<String> subscriptions = Lists.newArrayList();
             topic.getSubscriptions().forEach((subName, sub) -> 
subscriptions.add(subName));
@@ -1136,25 +1082,29 @@ public class PersistentTopicsBase extends AdminResource 
{
 
     protected TopicStats internalGetStats(boolean authoritative, boolean 
getPreciseBacklog,
                                           boolean subscriptionBacklogSize) {
-        validateAdminAndClientPermission();
         if (topicName.isGlobal()) {
             validateGlobalNamespaceOwnership(namespaceName);
         }
         validateTopicOwnership(topicName, authoritative);
+        validateTopicOperation(topicName, TopicOperation.GET_STATS);
+
         Topic topic = getTopicReference(topicName);
         return topic.getStats(getPreciseBacklog, subscriptionBacklogSize);
     }
 
     protected PersistentTopicInternalStats internalGetInternalStats(boolean 
authoritative, boolean metadata) {
-        validateAdminAndClientPermission();
         if (topicName.isGlobal()) {
             validateGlobalNamespaceOwnership(namespaceName);
         }
         validateTopicOwnership(topicName, authoritative);
+        validateTopicOperation(topicName, TopicOperation.GET_STATS);
+
         Topic topic = getTopicReference(topicName);
         try {
-            boolean includeMetadata = metadata && hasSuperUserAccess();
-            return topic.getInternalStats(includeMetadata).get();
+            if (metadata) {
+                validateTopicOperation(topicName, TopicOperation.GET_METADATA);
+            }
+            return topic.getInternalStats(metadata).get();
         } catch (Exception e) {
             throw new RestException(Status.INTERNAL_SERVER_ERROR,
                     (e instanceof ExecutionException) ? 
e.getCause().getMessage() : e.getMessage());
@@ -1240,7 +1190,7 @@ public class PersistentTopicsBase extends AdminResource {
     protected void 
internalGetManagedLedgerInfoForNonPartitionedTopic(AsyncResponse asyncResponse) 
{
         String managedLedger;
         try {
-            validateAdminAccessForTenant(topicName.getTenant());
+            validateTopicOperation(topicName, TopicOperation.GET_STATS);
             managedLedger = topicName.getPersistenceNamingEncoding();
         } catch (Exception e) {
             log.error("[{}] Failed to get managed info for {}", clientAppId(), 
topicName, e);
@@ -1409,6 +1359,7 @@ public class PersistentTopicsBase extends AdminResource {
                 return;
             }
         }
+        validateTopicOwnership(topicName, authoritative);
         // If the topic name is a partition name, no need to get partition 
topic metadata again
         if (topicName.isPartitioned()) {
             internalDeleteSubscriptionForNonPartitionedTopic(asyncResponse, 
subName, authoritative);
@@ -1468,7 +1419,9 @@ public class PersistentTopicsBase extends AdminResource {
     private void 
internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyncResponse,
                                                                   String 
subName, boolean authoritative) {
         try {
-            validateAdminAccessForSubscriber(subName, authoritative);
+            validateTopicOwnership(topicName, authoritative);
+            validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE);
+
             Topic topic = getTopicReference(topicName);
             Subscription sub = topic.getSubscription(subName);
             if (sub == null) {
@@ -1563,7 +1516,9 @@ public class PersistentTopicsBase extends AdminResource {
     private void 
internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncResponse 
asyncResponse,
                                                                             
String subName, boolean authoritative) {
         try {
-            validateAdminAccessForSubscriber(subName, authoritative);
+            validateTopicOwnership(topicName, authoritative);
+            validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE);
+
             Topic topic = getTopicReference(topicName);
             Subscription sub = topic.getSubscription(subName);
             if (sub == null) {
@@ -1600,6 +1555,10 @@ public class PersistentTopicsBase extends AdminResource {
                 return;
             }
         }
+
+        validateTopicOwnership(topicName, authoritative);
+        validateTopicOperation(topicName, TopicOperation.SKIP, subName);
+
         // If the topic name is a partition name, no need to get partition 
topic metadata again
         if (topicName.isPartitioned()) {
             internalSkipAllMessagesForNonPartitionedTopic(asyncResponse, 
subName, authoritative);
@@ -1657,7 +1616,9 @@ public class PersistentTopicsBase extends AdminResource {
     private void internalSkipAllMessagesForNonPartitionedTopic(AsyncResponse 
asyncResponse,
                                                                String subName, 
boolean authoritative) {
         try {
-            validateAdminAccessForSubscriber(subName, authoritative);
+            validateTopicOwnership(topicName, authoritative);
+            validateTopicOperation(topicName, TopicOperation.SKIP, subName);
+
             PersistentTopic topic = (PersistentTopic) 
getTopicReference(topicName);
             BiConsumer<Void, Throwable> biConsumer = (v, ex) -> {
                 if (ex != null) {
@@ -1707,7 +1668,10 @@ public class PersistentTopicsBase extends AdminResource {
         if (partitionMetadata.partitions > 0) {
             throw new RestException(Status.METHOD_NOT_ALLOWED, "Skip messages 
on a partitioned topic is not allowed");
         }
-        validateAdminAccessForSubscriber(subName, authoritative);
+
+        validateTopicOwnership(topicName, authoritative);
+        validateTopicOperation(topicName, TopicOperation.SKIP);
+
         PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
         try {
             if (subName.startsWith(topic.getReplicatorPrefix())) {
@@ -1802,8 +1766,8 @@ public class PersistentTopicsBase extends AdminResource {
         // validate ownership and redirect if current broker is not owner
         PersistentTopic topic;
         try {
-            validateWriteOperationOnTopic(authoritative);
-
+            validateTopicOwnership(topicName, authoritative);
+            validateTopicOperation(topicName, TopicOperation.EXPIRE_MESSAGES);
             topic = (PersistentTopic) getTopicReference(topicName);
         } catch (WebApplicationException wae) {
             if (log.isDebugEnabled()) {
@@ -1864,6 +1828,10 @@ public class PersistentTopicsBase extends AdminResource {
                 return;
             }
         }
+
+        validateTopicOwnership(topicName, authoritative);
+        validateTopicOperation(topicName, TopicOperation.RESET_CURSOR, 
subName);
+
         // If the topic name is a partition name, no need to get partition 
topic metadata again
         if (topicName.isPartitioned()) {
             internalResetCursorForNonPartitionedTopic(asyncResponse, subName, 
timestamp, authoritative);
@@ -1952,9 +1920,12 @@ public class PersistentTopicsBase extends AdminResource {
     private void internalResetCursorForNonPartitionedTopic(AsyncResponse 
asyncResponse, String subName, long timestamp,
                                        boolean authoritative) {
         try {
-            validateAdminAccessForSubscriber(subName, authoritative);
+            validateTopicOwnership(topicName, authoritative);
+            validateTopicOperation(topicName, TopicOperation.RESET_CURSOR);
+
             log.info("[{}] [{}] Received reset cursor on subscription {} to 
time {}",
                     clientAppId(), topicName, subName, timestamp);
+
             PersistentTopic topic = (PersistentTopic) 
getTopicReference(topicName);
             if (topic == null) {
                 asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Topic not found"));
@@ -2100,7 +2071,8 @@ public class PersistentTopicsBase extends AdminResource {
             AsyncResponse asyncResponse, String subscriptionName,
             MessageIdImpl targetMessageId, boolean authoritative, boolean 
replicated) {
         try {
-            validateAdminAccessForSubscriber(subscriptionName, authoritative);
+            validateTopicOwnership(topicName, authoritative);
+            validateTopicOperation(topicName, TopicOperation.SUBSCRIBE);
 
             boolean isAllowAutoTopicCreation = 
pulsar().getConfiguration().isAllowAutoTopicCreation();
             PersistentTopic topic = (PersistentTopic) 
pulsar().getBrokerService()
@@ -2168,9 +2140,9 @@ public class PersistentTopicsBase extends AdminResource {
                     "Reset-cursor at position is not allowed for 
partitioned-topic"));
             return;
         } else {
-            validateAdminAccessForSubscriber(subName, authoritative);
-            // will redirect if the topic not owned by current broker
-            validateReadOperationOnTopic(authoritative);
+            validateTopicOwnership(topicName, authoritative);
+            validateTopicOperation(topicName, TopicOperation.RESET_CURSOR);
+
             PersistentTopic topic = (PersistentTopic) 
getTopicReference(topicName);
             if (topic == null) {
                 asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Topic not found"));
@@ -2304,7 +2276,12 @@ public class PersistentTopicsBase extends AdminResource {
                                               boolean authoritative) {
         try {
             // will redirect if the topic not owned by current broker
-            validateReadOperationOnTopic(authoritative);
+            validateTopicOwnership(topicName, authoritative);
+            validateTopicOperation(topicName, TopicOperation.PEEK_MESSAGES);
+
+            if (topicName.isGlobal()) {
+                validateGlobalNamespaceOwnership(namespaceName);
+            }
             PersistentTopic topic = (PersistentTopic) 
getTopicReference(topicName);
             ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
topic.getManagedLedger();
             ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId), new 
AsyncCallbacks.ReadEntryCallback() {
@@ -2336,13 +2313,15 @@ public class PersistentTopicsBase extends AdminResource 
{
     }
 
     protected Response internalPeekNthMessage(String subName, int 
messagePosition, boolean authoritative) {
-        verifyReadOperation(authoritative);
         // If the topic name is a partition name, no need to get partition 
topic metadata again
         if (!topicName.isPartitioned() && 
getPartitionedTopicMetadata(topicName,
                 authoritative, false).partitions > 0) {
             throw new RestException(Status.METHOD_NOT_ALLOWED, "Peek messages 
on a partitioned topic is not allowed");
         }
-        validateAdminAccessForSubscriber(subName, authoritative);
+
+        validateTopicOwnership(topicName, authoritative);
+        validateTopicOperation(topicName, TopicOperation.PEEK_MESSAGES);
+
         if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
             log.error("[{}] Not supported operation of non-persistent topic {} 
{}", clientAppId(), topicName,
                     subName);
@@ -2496,7 +2475,7 @@ public class PersistentTopicsBase extends AdminResource {
             validateGlobalNamespaceOwnership(namespaceName);
         }
         // Validate that namespace exists, throw 404 if it doesn't exist
-        // note that we do not want to load the topic and hence skip 
validateAdminOperationOnTopic()
+        // note that we do not want to load the topic and hence skip 
authorization check
         try {
             namespaceResources().get(path(POLICIES, namespaceName.toString()));
         } catch 
(org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException e) {
@@ -2555,6 +2534,8 @@ public class PersistentTopicsBase extends AdminResource {
 
     protected void internalSetBacklogQuota(AsyncResponse asyncResponse,
                                            BacklogQuota.BacklogQuotaType 
backlogQuotaType, BacklogQuota backlogQuota) {
+        validateTopicPolicyOperation(topicName, PolicyName.BACKLOG, 
PolicyOperation.WRITE);
+        validatePoliciesReadOnlyAccess();
         if (backlogQuotaType == null) {
             backlogQuotaType = 
BacklogQuota.BacklogQuotaType.destination_storage;
         }
@@ -2903,7 +2884,8 @@ public class PersistentTopicsBase extends AdminResource {
         if (partitionMetadata.partitions > 0) {
             throw new RestException(Status.METHOD_NOT_ALLOWED, "Termination of 
a partitioned topic is not allowed");
         }
-        validateWriteOperationOnTopic(authoritative);
+        validateTopicOwnership(topicName, authoritative);
+        validateTopicOperation(topicName, TopicOperation.TERMINATE);
         Topic topic = getTopicReference(topicName);
         try {
             return ((PersistentTopic) topic).terminate().get();
@@ -2917,7 +2899,8 @@ public class PersistentTopicsBase extends AdminResource {
         if (topicName.isGlobal()) {
             validateGlobalNamespaceOwnership(namespaceName);
         }
-      validateAdminOperationOnTopic(authoritative);
+        validateTopicOwnership(topicName, authoritative);
+        validateTopicOperation(topicName, TopicOperation.TERMINATE);
 
       List<MessageId> messageIds = new ArrayList<>();
 
@@ -3042,8 +3025,8 @@ public class PersistentTopicsBase extends AdminResource {
             throw new IllegalStateException(msg);
         }
 
-        // validate ownership and redirect if current broker is not owner
-        validateAdminAccessForSubscriber(subName, authoritative);
+        validateTopicOwnership(topicName, authoritative);
+        validateTopicOperation(topicName, TopicOperation.EXPIRE_MESSAGES);
 
         if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
             log.error("[{}] Not supported operation of non-persistent topic {} 
{}", clientAppId(), topicName, subName);
@@ -3101,6 +3084,9 @@ public class PersistentTopicsBase extends AdminResource {
             }
         }
 
+        validateTopicOwnership(topicName, authoritative);
+        validateTopicOperation(topicName, TopicOperation.EXPIRE_MESSAGES);
+
         log.info("[{}][{}] received expire messages on subscription {} to 
position {}", clientAppId(), topicName,
                 subName, messageId);
 
@@ -3119,8 +3105,6 @@ public class PersistentTopicsBase extends AdminResource {
                     "Invalid parameter for expire message by position, 
partition index of message position "
                             + "passed in doesn't match partition index for the 
topic."));
         } else {
-            validateAdminAccessForSubscriber(subName, authoritative);
-            validateReadOperationOnTopic(authoritative);
             PersistentTopic topic = (PersistentTopic) 
getTopicReference(topicName);
             if (topic == null) {
                 asyncResponse.resume(new RestException(Status.NOT_FOUND, 
"Topic not found"));
@@ -3264,7 +3248,8 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected void internalTriggerCompactionNonPartitionedTopic(boolean 
authoritative) {
-        validateWriteOperationOnTopic(authoritative);
+        validateTopicOwnership(topicName, authoritative);
+        validateTopicOperation(topicName, TopicOperation.COMPACT);
 
         PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
         try {
@@ -3278,13 +3263,17 @@ public class PersistentTopicsBase extends AdminResource 
{
     }
 
     protected LongRunningProcessStatus internalCompactionStatus(boolean 
authoritative) {
-        validateReadOperationOnTopic(authoritative);
+        validateTopicOwnership(topicName, authoritative);
+        validateTopicOperation(topicName, TopicOperation.COMPACT);
+
         PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
         return topic.compactionStatus();
     }
 
     protected void internalTriggerOffload(boolean authoritative, MessageIdImpl 
messageId) {
-        validateWriteOperationOnTopic(authoritative);
+        validateTopicOwnership(topicName, authoritative);
+        validateTopicOperation(topicName, TopicOperation.OFFLOAD);
+
         PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
         try {
             topic.triggerOffload(messageId);
@@ -3297,7 +3286,9 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected OffloadProcessStatus internalOffloadStatus(boolean 
authoritative) {
-        validateReadOperationOnTopic(authoritative);
+        validateTopicOwnership(topicName, authoritative);
+        validateTopicOperation(topicName, TopicOperation.OFFLOAD);
+
         PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
         return topic.offloadStatus();
     }
@@ -3675,7 +3666,8 @@ public class PersistentTopicsBase extends AdminResource {
     protected void internalGetLastMessageId(AsyncResponse asyncResponse, 
boolean authoritative) {
         Topic topic;
         try {
-            validateReadOperationOnTopic(authoritative);
+            validateTopicOwnership(topicName, authoritative);
+            validateTopicOperation(topicName, TopicOperation.PEEK_MESSAGES);
             topic = getTopicReference(topicName);
         } catch (WebApplicationException wae) {
             if (log.isDebugEnabled()) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index d9cc845..20fd24d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -50,9 +50,11 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -98,7 +100,8 @@ public class NonPersistentTopics extends PersistentTopics {
                                             @PathParam("topic") @Encoded 
String encodedTopic,
                                             @QueryParam("authoritative") 
@DefaultValue("false") boolean authoritative) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        validateAdminOperationOnTopic(authoritative);
+        validateTopicOwnership(topicName, authoritative);
+        validateTopicOperation(topicName, TopicOperation.GET_STATS);
         Topic topic = getTopicReference(topicName);
         return ((NonPersistentTopic) topic).getStats(false, false);
     }
@@ -119,7 +122,8 @@ public class NonPersistentTopics extends PersistentTopics {
                                                          
@QueryParam("metadata") @DefaultValue("false")
                                                                      boolean 
metadata) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        validateAdminOperationOnTopic(authoritative);
+        validateTopicOwnership(topicName, authoritative);
+        validateTopicOperation(topicName, TopicOperation.GET_STATS);
         Topic topic = getTopicReference(topicName);
         try {
             boolean includeMetadata = metadata && hasSuperUserAccess();
@@ -188,7 +192,7 @@ public class NonPersistentTopics extends PersistentTopics {
         Policies policies = null;
         NamespaceName nsName = null;
         try {
-            validateAdminAccessForTenant(property);
+            validateNamespaceOperation(namespaceName, 
NamespaceOperation.GET_TOPICS);
             policies = getNamespacePolicies(property, cluster, namespace);
             nsName = NamespaceName.get(property, cluster, namespace);
 
@@ -254,7 +258,7 @@ public class NonPersistentTopics extends PersistentTopics {
                                           @PathParam("bundle") String 
bundleRange) {
         log.info("[{}] list of topics on namespace bundle {}/{}/{}/{}", 
clientAppId(), property, cluster, namespace,
                 bundleRange);
-        validateAdminAccessForTenant(property);
+        validateNamespaceOperation(namespaceName, 
NamespaceOperation.GET_BUNDLE);
         Policies policies = getNamespacePolicies(property, cluster, namespace);
         if (!cluster.equals(Constants.GLOBAL_CLUSTER)) {
             validateClusterOwnership(cluster);
@@ -289,11 +293,6 @@ public class NonPersistentTopics extends PersistentTopics {
         }
     }
 
-    protected void validateAdminOperationOnTopic(TopicName topicName, boolean 
authoritative) {
-        validateAdminAccessForTenant(topicName.getTenant());
-        validateTopicOwnership(topicName, authoritative);
-    }
-
     private Topic getTopicReference(TopicName topicName) {
         return 
pulsar().getBrokerService().getTopicIfExists(topicName.toString()).join()
                 .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Topic 
not found"));
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index cfc6f66..c1ed16f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -50,9 +50,11 @@ import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -118,7 +120,9 @@ public class NonPersistentTopics extends PersistentTopics {
                     + "not to use when there's heavy traffic.")
             @QueryParam("subscriptionBacklogSize") @DefaultValue("false") 
boolean subscriptionBacklogSize) {
         validateTopicName(tenant, namespace, encodedTopic);
-        validateAdminOperationOnTopic(topicName, authoritative);
+        validateTopicOwnership(topicName, authoritative);
+        validateTopicOperation(topicName, TopicOperation.GET_STATS);
+
         Topic topic = getTopicReference(topicName);
         return ((NonPersistentTopic) topic).getStats(getPreciseBacklog, 
subscriptionBacklogSize);
     }
@@ -145,7 +149,8 @@ public class NonPersistentTopics extends PersistentTopics {
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative,
             @QueryParam("metadata") @DefaultValue("false") boolean metadata) {
         validateTopicName(tenant, namespace, encodedTopic);
-        validateAdminOperationOnTopic(topicName, authoritative);
+        validateTopicOwnership(topicName, authoritative);
+        validateTopicOperation(topicName, TopicOperation.GET_STATS);
         Topic topic = getTopicReference(topicName);
         try {
             boolean includeMetadata = metadata && hasSuperUserAccess();
@@ -188,7 +193,6 @@ public class NonPersistentTopics extends PersistentTopics {
         try {
             validateGlobalNamespaceOwnership(tenant, namespace);
             validateTopicName(tenant, namespace, encodedTopic);
-            validateAdminAccessForTenant(topicName.getTenant());
             internalCreatePartitionedTopic(asyncResponse, numPartitions);
         } catch (Exception e) {
             log.error("[{}] Failed to create partitioned topic {}", 
clientAppId(), topicName, e);
@@ -252,7 +256,7 @@ public class NonPersistentTopics extends PersistentTopics {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] list of topics on namespace {}", 
clientAppId(), namespaceName);
             }
-            validateAdminAccessForTenant(tenant);
+            validateNamespaceOperation(namespaceName, 
NamespaceOperation.GET_TOPICS);
             policies = getNamespacePolicies(namespaceName);
 
             // check cluster ownership for a given global namespace: redirect 
if peer-cluster owns it
@@ -327,7 +331,7 @@ public class NonPersistentTopics extends PersistentTopics {
             log.debug("[{}] list of topics on namespace bundle {}/{}", 
clientAppId(), namespaceName, bundleRange);
         }
 
-        validateAdminAccessForTenant(tenant);
+        validateNamespaceOperation(namespaceName, 
NamespaceOperation.GET_BUNDLE);
         Policies policies = getNamespacePolicies(namespaceName);
 
         // check cluster ownership for a given global namespace: redirect if 
peer-cluster owns it
@@ -374,11 +378,6 @@ public class NonPersistentTopics extends PersistentTopics {
         });
     }
 
-    protected void validateAdminOperationOnTopic(TopicName topicName, boolean 
authoritative) {
-        validateAdminAccessForTenant(topicName.getTenant());
-        validateTopicOwnership(topicName, authoritative);
-    }
-
     private Topic getTopicReference(TopicName topicName) {
         return 
pulsar().getBrokerService().getTopicIfExists(topicName.toString()).join()
                 .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Topic 
not found"));
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 9ee0bf9..052eed6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -65,6 +65,8 @@ import org.apache.pulsar.common.policies.data.OffloadPolicies;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
 import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
@@ -232,7 +234,7 @@ public class PersistentTopics extends PersistentTopicsBase {
         try {
             validateGlobalNamespaceOwnership(tenant, namespace);
             validatePartitionedTopicName(tenant, namespace, encodedTopic);
-            validateAdminAccessForTenant(topicName.getTenant());
+            validateTopicPolicyOperation(topicName, PolicyName.PARTITION, 
PolicyOperation.WRITE);
             internalCreatePartitionedTopic(asyncResponse, numPartitions);
         } catch (Exception e) {
             log.error("[{}] Failed to create partitioned topic {}", 
clientAppId(), topicName, e);
@@ -525,6 +527,8 @@ public class PersistentTopics extends PersistentTopicsBase {
                     Integer maxUnackedNum) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation();
+        validateTopicPolicyOperation(topicName, PolicyName.MAX_UNACKED, 
PolicyOperation.WRITE);
+        validatePoliciesReadOnlyAccess();
         
internalSetMaxUnackedMessagesOnSubscription(maxUnackedNum).whenComplete((res, 
ex)
                 -> internalHandleResult(asyncResponse, res, ex, "Failed set 
MaxUnackedMessagesOnSubscription"));
     }
@@ -584,6 +588,8 @@ public class PersistentTopics extends PersistentTopicsBase {
                     DelayedDeliveryPolicies deliveryPolicies) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation();
+        validateTopicPolicyOperation(topicName, PolicyName.DELAYED_DELIVERY, 
PolicyOperation.WRITE);
+        validatePoliciesReadOnlyAccess();
         internalSetDelayedDeliveryPolicies(asyncResponse, deliveryPolicies);
     }
 
@@ -1021,6 +1027,7 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         try {
             validateTopicName(tenant, namespace, encodedTopic);
+            validateTopicOwnership(topicName, authoritative);
             internalDeleteSubscription(asyncResponse, decode(encodedSubName), 
authoritative, force);
         } catch (WebApplicationException wae) {
             asyncResponse.resume(wae);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index 3dc1be3..44d0ff2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -43,6 +43,8 @@ import org.apache.pulsar.common.lookup.data.LookupData;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.NamespaceOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.util.Codec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -130,14 +132,15 @@ public class TopicLookupBase extends PulsarWebResource {
 
     private void validateAdminAndClientPermission(TopicName topic) throws 
RestException, Exception {
         try {
-            validateAdminAccessForTenant(topic.getTenant());
+            validateTopicOperation(topic, TopicOperation.LOOKUP);
         } catch (Exception e) {
-            checkConnect(topic);
+            // unknown error marked as internal server error
+            throw new RestException(e);
         }
     }
 
     protected String internalGetNamespaceBundle(TopicName topicName) {
-        validateSuperUserAccess();
+        validateNamespaceOperation(topicName.getNamespaceObject(), 
NamespaceOperation.GET_BUNDLE);
         try {
             NamespaceBundle bundle = 
pulsar().getNamespaceService().getBundle(topicName);
             return bundle.getBundleRange();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 835988e..9566d8f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -81,6 +81,7 @@ import org.apache.pulsar.common.policies.data.PolicyName;
 import org.apache.pulsar.common.policies.data.PolicyOperation;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantOperation;
+import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.policies.path.PolicyPath;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -780,10 +781,6 @@ public abstract class PulsarWebResource {
         return null;
     }
 
-    protected void checkConnect(TopicName topicName) throws Exception {
-        checkAuthorization(pulsar(), topicName, clientAppId(), 
clientAuthData());
-    }
-
     protected static void checkAuthorization(PulsarService pulsarService, 
TopicName topicName, String role,
             AuthenticationDataSource authenticationData) throws Exception {
         if (!pulsarService.getConfiguration().isAuthorizationEnabled()) {
@@ -791,8 +788,8 @@ public abstract class PulsarWebResource {
             return;
         }
         // get zk policy manager
-        if 
(!pulsarService.getBrokerService().getAuthorizationService().canLookup(topicName,
 role,
-                authenticationData)) {
+        if 
(!pulsarService.getBrokerService().getAuthorizationService().allowTopicOperation(topicName,
+                TopicOperation.LOOKUP, null, role, authenticationData)) {
             log.warn("[{}] Role {} is not allowed to lookup topic", topicName, 
role);
             throw new RestException(Status.UNAUTHORIZED, "Don't have 
permission to connect to this namespace");
         }
@@ -1038,6 +1035,50 @@ public abstract class PulsarWebResource {
             URI redirect = 
UriBuilder.fromUri(uri.getRequestUri()).host(host).port(port).build();
             log.debug("[{}] Redirecting the rest call to {}: broker={}", 
clientAppId(), redirect, broker);
             throw new 
WebApplicationException(Response.temporaryRedirect(redirect).build());
+
+        }
+    }
+
+    public void validateTopicPolicyOperation(TopicName topicName, PolicyName 
policy, PolicyOperation operation) {
+        if (pulsar().getConfiguration().isAuthenticationEnabled()
+                && pulsar().getBrokerService().isAuthorizationEnabled()) {
+            if (!isClientAuthenticated(clientAppId())) {
+                throw new RestException(Status.FORBIDDEN, "Need to 
authenticate to perform the request");
+            }
+
+            Boolean isAuthorized = 
pulsar().getBrokerService().getAuthorizationService()
+                    .allowTopicPolicyOperation(topicName, policy, operation, 
originalPrincipal(), clientAppId(),
+                            clientAuthData());
+
+            if (!isAuthorized) {
+                throw new RestException(Status.FORBIDDEN, 
String.format("Unauthorized to validateTopicPolicyOperation"
+                        + " for operation [%s] on topic [%s] on policy [%s]", 
operation.toString(),
+                        topicName, policy.toString()));
+            }
+        }
+    }
+
+    public void validateTopicOperation(TopicName topicName, TopicOperation 
operation) {
+        validateTopicOperation(topicName, operation, null);
+    }
+
+    public void validateTopicOperation(TopicName topicName, TopicOperation 
operation, String subscription) {
+        if (pulsar().getConfiguration().isAuthenticationEnabled()
+                && pulsar().getBrokerService().isAuthorizationEnabled()) {
+            if (!isClientAuthenticated(clientAppId())) {
+                throw new RestException(Status.UNAUTHORIZED, "Need to 
authenticate to perform the request");
+            }
+
+            AuthenticationDataHttps authData = clientAuthData();
+            authData.setSubscription(subscription);
+
+            Boolean isAuthorized = 
pulsar().getBrokerService().getAuthorizationService()
+                    .allowTopicOperation(topicName, operation, 
originalPrincipal(), clientAppId(), authData);
+
+            if (!isAuthorized) {
+                throw new RestException(Status.UNAUTHORIZED, 
String.format("Unauthorized to validateTopicOperation for"
+                        + " operation [%s] on topic [%s]", 
operation.toString(), topicName));
+            }
         }
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 23015be..dbf7656 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -236,7 +236,6 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
         ArgumentCaptor<Response> responseCaptor = 
ArgumentCaptor.forClass(Response.class);
         verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
-
         TopicStats topicStats = persistentTopics.getStats(testTenant, 
testNamespace, testLocalTopicName, true, true, false);
         long msgBacklog = 
topicStats.subscriptions.get(SUB_EARLIEST).msgBacklog;
         System.out.println("Message back log for " + SUB_EARLIEST + " is :" + 
msgBacklog);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index c5a9335..a0a5917 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -549,6 +549,7 @@ public class ServerCnxTest {
         
doReturn(authorizationService).when(brokerService).getAuthorizationService();
         doReturn(true).when(brokerService).isAuthorizationEnabled();
         
doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider).isSuperUser(Mockito.anyString(),
 Mockito.any(), Mockito.any());
+        
doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider).validateTenantAdminAccess(Mockito.anyString(),
 Mockito.any(), Mockito.any());
         
doReturn(CompletableFuture.completedFuture(true)).when(authorizationProvider).checkPermission(any(TopicName.class),
 Mockito.anyString(),
                 any(AuthAction.class));
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
index 5647e00..f6fa07c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
@@ -245,15 +245,8 @@ public class AuthenticatedProducerConsumerTest extends 
ProducerConsumerBase {
         
replacePulsarClient(PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl())
                 .operationTimeout(1, TimeUnit.SECONDS));
 
-        // unauthorized topic test
-        Exception pulsarClientException = null;
-        try {
-            
pulsarClient.newConsumer().topic("persistent://my-property/my-ns/other-topic")
-                    .subscriptionName("my-subscriber-name").subscribe();
-        } catch (Exception e) {
-            pulsarClientException = e;
-        }
-        Assert.assertTrue(pulsarClientException instanceof 
PulsarClientException);
+        
pulsarClient.newConsumer().topic("persistent://my-property/my-ns/other-topic")
+                .subscriptionName("my-subscriber-name").subscribe();
 
         testSyncProducerAndConsumer(batchMessageDelayMs);
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index 895143e..c6751d6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -30,6 +30,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import javax.naming.AuthenticationException;
 import lombok.Cleanup;
@@ -52,6 +53,8 @@ import 
org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantOperation;
 import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.RestException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -541,13 +544,27 @@ public class AuthorizationProducerConsumerTest extends 
ProducerConsumerBase {
         @Override
         public CompletableFuture<Boolean> allowTopicOperationAsync(
             TopicName topic, String role, TopicOperation operation, 
AuthenticationDataSource authData) {
-            return CompletableFuture.completedFuture(true);
+            CompletableFuture<Boolean> isAuthorizedFuture;
+
+            if (role.equals("plugbleRole")) {
+                isAuthorizedFuture = CompletableFuture.completedFuture(true);
+            } else {
+                isAuthorizedFuture = CompletableFuture.completedFuture(false);
+            }
+
+            return isAuthorizedFuture;
         }
 
         @Override
         public Boolean allowTopicOperation(
             TopicName topicName, String role, TopicOperation operation, 
AuthenticationDataSource authData) {
-            return true;
+            try {
+                return allowTopicOperationAsync(topicName, role, operation, 
authData).get();
+            } catch (InterruptedException e) {
+                throw new RestException(e);
+            } catch (ExecutionException e) {
+                throw new RestException(e);
+            }
         }
     }
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
index b10990a..2b11502 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
@@ -38,6 +38,7 @@ public enum PolicyName {
     MAX_UNACKED,
     MAX_SUBSCRIPTIONS,
     OFFLOAD,
+    PARTITION,
     PERSISTENCE,
     RATE,
     RETENTION,
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java
index 7e54cca..a336dbd 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicOperation.java
@@ -47,4 +47,7 @@ public enum TopicOperation {
     SUBSCRIBE,
     GET_SUBSCRIPTIONS,
     UNSUBSCRIBE,
+
+    GET_STATS,
+    GET_METADATA,
 }

Reply via email to