This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 48f5a2f  allowTopicOperationAsync should check the original role is 
super user (#1355) (#7788)
48f5a2f is described below

commit 48f5a2f62c148b3df617be060fefed51f3145979
Author: Sijie Guo <[email protected]>
AuthorDate: Wed Aug 12 06:02:51 2020 -0700

    allowTopicOperationAsync should check the original role is super user 
(#1355) (#7788)
    
    * Fix allowTopicOperationAsync logic (#1355)
    
    *Modifications*
    
    - We should use the original role to verify whether it is allowed to 
perform a given topic operation
    - use the original authentication data
    - Authz provider doesn't have to be aware of proxyRole
    - Fix authorization test
    
    * Refactor authorization logic to provide uniform authorization behavior
---
 .../authorization/AuthorizationProvider.java       | 230 +++++++++++++++++----
 .../broker/authorization/AuthorizationService.java | 217 +++++++++++++------
 .../authorization/PulsarAuthorizationProvider.java | 103 ++++-----
 .../pulsar/broker/web/AuthenticationFilter.java    |   2 -
 .../broker/admin/impl/PersistentTopicsBase.java    |  30 +++
 .../org/apache/pulsar/broker/service/Consumer.java |   2 +-
 .../org/apache/pulsar/broker/service/Producer.java |   2 +-
 .../apache/pulsar/broker/service/ServerCnx.java    | 229 ++++++++++----------
 .../pulsar/broker/web/PulsarWebResource.java       |  66 +++---
 .../pulsar/broker/service/ServerCnxTest.java       |  11 +-
 .../api/AuthorizationProducerConsumerTest.java     |  47 ++---
 .../pulsar/proxy/server/AdminProxyHandler.java     |   6 +-
 12 files changed, 598 insertions(+), 347 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
index 57147e7..0403f34 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
@@ -225,12 +226,19 @@ public interface AuthorizationProvider extends Closeable {
      * @param authData
      * @return CompletableFuture<Boolean>
      */
+    @Deprecated
     default CompletableFuture<Boolean> allowTenantOperationAsync(String 
tenantName, String originalRole, String role,
                                                             TenantOperation 
operation,
                                                             
AuthenticationDataSource authData) {
-        return isTenantAdmin(tenantName, role, null, authData);
+        return allowTenantOperationAsync(
+            tenantName,
+            StringUtils.isBlank(originalRole) ? role : originalRole,
+            operation,
+            authData
+        );
     }
 
+    @Deprecated
     default Boolean allowTenantOperation(String tenantName, String 
originalRole, String role, TenantOperation operation,
                                       AuthenticationDataSource authData) {
         try {
@@ -243,26 +251,93 @@ public interface AuthorizationProvider extends Closeable {
     }
 
     /**
+     * Check if a given <tt>role</tt> is allowed to execute a given 
<tt>operation</tt> on the tenant.
+     *
+     * @param tenantName tenant name
+     * @param role role name
+     * @param operation tenant operation
+     * @param authData authenticated data of the role
+     * @return a completable future represents check result
+     */
+    default CompletableFuture<Boolean> allowTenantOperationAsync(String 
tenantName, String role,
+                                                                 
TenantOperation operation,
+                                                                 
AuthenticationDataSource authData) {
+        return FutureUtil.failedFuture(new IllegalStateException(
+            String.format("allowTenantOperation(%s) on tenant %s is not 
supported by the Authorization" +
+                    " provider you are using.",
+                operation.toString(), tenantName)));
+    }
+
+    default Boolean allowTenantOperation(String tenantName, String role, 
TenantOperation operation,
+                                         AuthenticationDataSource authData) {
+        try {
+            return allowTenantOperationAsync(tenantName, role, operation, 
authData).get();
+        } catch (InterruptedException e) {
+            throw new RestException(e);
+        } catch (ExecutionException e) {
+            throw new RestException(e.getCause());
+        }
+    }
+
+    /**
+     * Check if a given <tt>role</tt> is allowed to execute a given 
<tt>operation</tt> on the namespace.
+     *
+     * @param namespaceName namespace name
+     * @param role role name
+     * @param operation namespace operation
+     * @param authData authenticated data
+     * @return a completable future represents check result
+     */
+    default CompletableFuture<Boolean> 
allowNamespaceOperationAsync(NamespaceName namespaceName,
+                                                                    String 
role,
+                                                                    
NamespaceOperation operation,
+                                                                    
AuthenticationDataSource authData) {
+        return FutureUtil.failedFuture(
+            new IllegalStateException("NamespaceOperation is not supported by 
the Authorization provider you are using."));
+    }
+
+    default Boolean allowNamespaceOperation(NamespaceName namespaceName,
+                                            String role,
+                                            NamespaceOperation operation,
+                                            AuthenticationDataSource authData) 
{
+        try {
+            return allowNamespaceOperationAsync(namespaceName, role, 
operation, authData).get();
+        } catch (InterruptedException e) {
+            throw new RestException(e);
+        } catch (ExecutionException e) {
+            throw new RestException(e.getCause());
+        }
+    }
+
+    /**
      * Grant authorization-action permission on a namespace to the given client
+     *
      * @param namespaceName
-     * @param originalRole role not overriden by proxy role if request do pass 
through proxy
-     * @param role originalRole | proxyRole if the request didn't pass through 
proxy
+     * @param role
      * @param operation
      * @param authData
      * @return CompletableFuture<Boolean>
      */
-    default CompletableFuture<Boolean> 
allowNamespaceOperationAsync(NamespaceName namespaceName, String originalRole,
-                                                                 String role, 
NamespaceOperation operation,
-                                                                 
AuthenticationDataSource authData) {
-        return FutureUtil.failedFuture(
-            new IllegalStateException(
-                    String.format("NamespaceOperation(%s) on namespace(%s) by 
role(%s) is not supported" +
-                    " by the Authorization provider you are using.",
-                            operation.toString(), namespaceName.toString(), 
role == null ? "null" : role)));
+    @Deprecated
+    default CompletableFuture<Boolean> 
allowNamespaceOperationAsync(NamespaceName namespaceName,
+                                                                    String 
originalRole,
+                                                                    String 
role,
+                                                                    
NamespaceOperation operation,
+                                                                    
AuthenticationDataSource authData) {
+        return allowNamespaceOperationAsync(
+            namespaceName,
+            StringUtils.isBlank(originalRole) ? role : originalRole,
+            operation,
+            authData
+        );
     }
 
-    default Boolean allowNamespaceOperation(NamespaceName namespaceName, 
String originalRole, String role,
-                                         NamespaceOperation operation, 
AuthenticationDataSource authData) {
+    @Deprecated
+    default Boolean allowNamespaceOperation(NamespaceName namespaceName,
+                                            String originalRole,
+                                            String role,
+                                            NamespaceOperation operation,
+                                            AuthenticationDataSource authData) 
{
         try {
             return allowNamespaceOperationAsync(namespaceName, originalRole, 
role, operation, authData).get();
         } catch (InterruptedException e) {
@@ -273,6 +348,39 @@ public interface AuthorizationProvider extends Closeable {
     }
 
     /**
+     * Check if a given <tt>role</tt> is allowed to execute a given policy 
<tt>operation</tt> on the namespace.
+     *
+     * @param namespaceName namespace name
+     * @param policy policy name
+     * @param operation policy operation
+     * @param role role name
+     * @param authData authenticated data
+     * @return a completable future represents check result
+     */
+    default CompletableFuture<Boolean> 
allowNamespacePolicyOperationAsync(NamespaceName namespaceName,
+                                                                          
PolicyName policy,
+                                                                          
PolicyOperation operation,
+                                                                          
String role,
+                                                                          
AuthenticationDataSource authData) {
+        return FutureUtil.failedFuture(
+                new IllegalStateException("NamespacePolicyOperation is not 
supported by the Authorization provider you are using."));
+    }
+
+    default Boolean allowNamespacePolicyOperation(NamespaceName namespaceName,
+                                                  PolicyName policy,
+                                                  PolicyOperation operation,
+                                                  String role,
+                                                  AuthenticationDataSource 
authData) {
+        try {
+            return allowNamespacePolicyOperationAsync(namespaceName, policy, 
operation, role, authData).get();
+        } catch (InterruptedException e) {
+            throw new RestException(e);
+        } catch (ExecutionException e) {
+            throw new RestException(e.getCause());
+        }
+    }
+
+    /**
      * Grant authorization-action permission on a namespace to the given client
      * @param namespaceName
      * @param originalRole role not overriden by proxy role if request do pass 
through proxy
@@ -281,16 +389,32 @@ public interface AuthorizationProvider extends Closeable {
      * @param authData
      * @return CompletableFuture<Boolean>
      */
-    default CompletableFuture<Boolean> 
allowNamespacePolicyOperationAsync(NamespaceName namespaceName, PolicyName 
policy,
-                                                                          
PolicyOperation operation, String originalRole,
-                                                                          
String role, AuthenticationDataSource authData) {
-        return isTenantAdmin(namespaceName.getTenant(), role, null, authData);
+    @Deprecated
+    default CompletableFuture<Boolean> 
allowNamespacePolicyOperationAsync(NamespaceName namespaceName,
+                                                                          
PolicyName policy,
+                                                                          
PolicyOperation operation,
+                                                                          
String originalRole,
+                                                                          
String role,
+                                                                          
AuthenticationDataSource authData) {
+        return allowNamespacePolicyOperationAsync(
+            namespaceName,
+            policy,
+            operation,
+            StringUtils.isBlank(originalRole) ? role : originalRole,
+            authData
+        );
     }
 
-    default Boolean allowNamespacePolicyOperation(NamespaceName namespaceName, 
PolicyName policy, PolicyOperation operation,
-                                                  String originalRole, String 
role, AuthenticationDataSource authData) {
+    @Deprecated
+    default Boolean allowNamespacePolicyOperation(NamespaceName namespaceName,
+                                                  PolicyName policy,
+                                                  PolicyOperation operation,
+                                                  String originalRole,
+                                                  String role,
+                                                  AuthenticationDataSource 
authData) {
         try {
-            return allowNamespacePolicyOperationAsync(namespaceName, policy, 
operation, originalRole, role, authData).get();
+            return allowNamespacePolicyOperationAsync(
+                namespaceName, policy, operation, originalRole, role, 
authData).get();
         } catch (InterruptedException e) {
             throw new RestException(e);
         } catch (ExecutionException e) {
@@ -298,6 +422,35 @@ public interface AuthorizationProvider extends Closeable {
         }
     }
 
+    /**
+     * Check if a given <tt>role</tt> is allowed to execute a given topic 
<tt>operation</tt> on the topic.
+     *
+     * @param topic topic name
+     * @param role role name
+     * @param operation topic operation
+     * @param authData authenticated data
+     * @return CompletableFuture<Boolean>
+     */
+    default CompletableFuture<Boolean> allowTopicOperationAsync(TopicName 
topic,
+                                                                String role,
+                                                                TopicOperation 
operation,
+                                                                
AuthenticationDataSource authData) {
+        return FutureUtil.failedFuture(
+            new IllegalStateException("TopicOperation is not supported by the 
Authorization provider you are using."));
+    }
+
+    default Boolean allowTopicOperation(TopicName topicName,
+                                        String role,
+                                        TopicOperation operation,
+                                        AuthenticationDataSource authData) {
+        try {
+            return allowTopicOperationAsync(topicName, role, operation, 
authData).get();
+        } catch (InterruptedException e) {
+            throw new RestException(e);
+        } catch (ExecutionException e) {
+            throw new RestException(e.getCause());
+        }
+    }
 
     /**
      * Grant authorization-action permission on a topic to the given client
@@ -308,27 +461,26 @@ public interface AuthorizationProvider extends Closeable {
      * @param authData
      * @return CompletableFuture<Boolean>
      */
-    default CompletableFuture<Boolean> allowTopicOperationAsync(TopicName 
topic, String originalRole, String role,
-                                                             TopicOperation 
operation,
-                                                             
AuthenticationDataSource authData) {
-        switch (operation) {
-            case PRODUCE:
-                return canProduceAsync(topic, role, authData);
-            case CONSUME:
-                return canConsumeAsync(topic, role, authData, null);
-            case LOOKUP:
-                return canLookupAsync(topic, role, authData);
-            default:
-                return FutureUtil.failedFuture(
-                        new IllegalStateException(
-                                String.format("TopicOperation(%s) on topic(%s) 
by role(%s) is not supported" +
-                                                " by the Authorization 
provider you are using.",
-                                        operation.toString(), 
topic.toString(), role == null ? "null" : null)));
-        }
+    @Deprecated
+    default CompletableFuture<Boolean> allowTopicOperationAsync(TopicName 
topic,
+                                                                String 
originalRole,
+                                                                String role,
+                                                                TopicOperation 
operation,
+                                                                
AuthenticationDataSource authData) {
+        return allowTopicOperationAsync(
+            topic,
+            StringUtils.isBlank(originalRole) ? role : originalRole,
+            operation,
+            authData
+        );
     }
 
-    default Boolean allowTopicOperation(TopicName topicName, String 
originalRole, String role, TopicOperation operation,
-                                     AuthenticationDataSource authData) {
+    @Deprecated
+    default Boolean allowTopicOperation(TopicName topicName,
+                                        String originalRole,
+                                        String role,
+                                        TopicOperation operation,
+                                        AuthenticationDataSource authData) {
         try {
             return allowTopicOperationAsync(topicName, originalRole, role, 
operation, authData).get();
         } catch (InterruptedException e) {
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index b91d616..afa85ee 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -18,11 +18,13 @@
  */
 package org.apache.pulsar.broker.authorization;
 
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import javax.ws.rs.core.Response;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
-import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.common.naming.NamespaceName;
@@ -35,12 +37,10 @@ import 
org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.TenantOperation;
 import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.RestException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-
 import static java.util.concurrent.TimeUnit.SECONDS;
 
 /**
@@ -341,45 +341,84 @@ public class AuthorizationService {
         return provider.allowSinkOpsAsync(namespaceName, role, 
authenticationData);
     }
 
+    private static void validateOriginalPrincipal(Set<String> proxyRoles, 
String authenticatedPrincipal,
+                                                  String originalPrincipal) {
+        if (proxyRoles.contains(authenticatedPrincipal)) {
+            // Request has come from a proxy
+            if (StringUtils.isBlank(originalPrincipal)) {
+                log.warn("Original principal empty in request authenticated as 
{}", authenticatedPrincipal);
+                throw new RestException(Response.Status.UNAUTHORIZED, 
"Original principal cannot be empty if the request is via proxy.");
+            }
+            if (proxyRoles.contains(originalPrincipal)) {
+                log.warn("Original principal {} cannot be a proxy role ({})", 
originalPrincipal, proxyRoles);
+                throw new RestException(Response.Status.UNAUTHORIZED, 
"Original principal cannot be a proxy role");
+            }
+        }
+    }
+
+    private boolean isProxyRole(String role) {
+        return role != null && conf.getProxyRoles().contains(role);
+    }
+
     /**
      * Grant authorization-action permission on a tenant to the given client
      *
-     * @param tenantName
-     * @param operation
-     * @param originalRole
-     * @param role
+     * @param tenantName tenant name
+     * @param operation tenant operation
+     * @param role role name
      * @param authData
      *            additional authdata in json for targeted authorization 
provider
      * @return IllegalArgumentException when tenant not found
      * @throws IllegalStateException
      *             when failed to grant permission
      */
-    public CompletableFuture<Boolean> allowTenantOperationAsync(String 
tenantName, TenantOperation operation,
-                                                                 String 
originalRole, String role,
-                                                                 
AuthenticationDataSource authData) {
+    public CompletableFuture<Boolean> allowTenantOperationAsync(String 
tenantName,
+                                                                
TenantOperation operation,
+                                                                String role,
+                                                                
AuthenticationDataSource authData) {
         if (!this.conf.isAuthorizationEnabled()) {
             return CompletableFuture.completedFuture(true);
         }
 
         if (provider != null) {
-            return provider.allowTenantOperationAsync(tenantName, 
originalRole, role, operation, authData);
+            return provider.allowTenantOperationAsync(tenantName, role, 
operation, authData);
         }
 
         return FutureUtil.failedFuture(new IllegalStateException("No 
authorization provider configured for " +
                 "allowTenantOperationAsync"));
     }
 
-    public Boolean allowTenantOperation(String tenantName, TenantOperation 
operation, String orignalRole, String role,
-                                        AuthenticationDataSource authData) {
-        if (!this.conf.isAuthorizationEnabled()) {
-            return true;
+    public CompletableFuture<Boolean> allowTenantOperationAsync(String 
tenantName,
+                                                                
TenantOperation operation,
+                                                                String 
originalRole,
+                                                                String role,
+                                                                
AuthenticationDataSource authData) {
+        validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
+        if (isProxyRole(role)) {
+            CompletableFuture<Boolean> isRoleAuthorizedFuture = 
allowTenantOperationAsync(
+                tenantName, operation, role, authData);
+            CompletableFuture<Boolean> isOriginalAuthorizedFuture = 
allowTenantOperationAsync(
+                tenantName, operation, originalRole, authData);
+            return 
isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture,
+                (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized 
&& isOriginalAuthorized);
+        } else {
+            return allowTenantOperationAsync(tenantName, operation, role, 
authData);
         }
+    }
 
-        if (provider != null) {
-            return provider.allowTenantOperation(tenantName, orignalRole, 
role, operation, authData);
+    public boolean allowTenantOperation(String tenantName,
+                                        TenantOperation operation,
+                                        String originalRole,
+                                        String role,
+                                        AuthenticationDataSource authData) {
+        try {
+            return allowTenantOperationAsync(
+                tenantName, operation, originalRole, role, authData).get();
+        } catch (InterruptedException e) {
+            throw new RestException(e);
+        } catch (ExecutionException e) {
+            throw new RestException(e.getCause());
         }
-
-        throw new IllegalStateException("No authorization provider configured 
for allowTenantOperation");
     }
 
     /**
@@ -387,7 +426,6 @@ public class AuthorizationService {
      *
      * @param namespaceName
      * @param operation
-     * @param originalRole
      * @param role
      * @param authData
      *            additional authdata in json for targeted authorization 
provider
@@ -397,31 +435,51 @@ public class AuthorizationService {
      */
     public CompletableFuture<Boolean> 
allowNamespaceOperationAsync(NamespaceName namespaceName,
                                                                    
NamespaceOperation operation,
-                                                                   String 
originalRole, String role,
+                                                                   String role,
                                                                    
AuthenticationDataSource authData) {
         if (!this.conf.isAuthorizationEnabled()) {
             return CompletableFuture.completedFuture(true);
         }
 
         if (provider != null) {
-            return provider.allowNamespaceOperationAsync(namespaceName, 
originalRole, role, operation, authData);
+            return provider.allowNamespaceOperationAsync(namespaceName, role, 
operation, authData);
         }
 
         return FutureUtil.failedFuture(new IllegalStateException("No 
authorization provider configured for " +
                 "allowNamespaceOperationAsync"));
     }
 
-    public Boolean allowNamespaceOperation(NamespaceName namespaceName, 
NamespaceOperation operation,
-                                           String originalPrincipal, String 
role, AuthenticationDataSource authData) {
-        if (!this.conf.isAuthorizationEnabled()) {
-            return true;
+    public CompletableFuture<Boolean> 
allowNamespaceOperationAsync(NamespaceName namespaceName,
+                                                                   
NamespaceOperation operation,
+                                                                   String 
originalRole,
+                                                                   String role,
+                                                                   
AuthenticationDataSource authData) {
+        validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
+        if (isProxyRole(role)) {
+            CompletableFuture<Boolean> isRoleAuthorizedFuture = 
allowNamespaceOperationAsync(
+                namespaceName, operation, role, authData);
+            CompletableFuture<Boolean> isOriginalAuthorizedFuture = 
allowNamespaceOperationAsync(
+                namespaceName, operation, originalRole, authData);
+            return 
isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture,
+                (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized 
&& isOriginalAuthorized);
+        } else {
+            return allowNamespaceOperationAsync(namespaceName, operation, 
role, authData);
         }
+    }
 
-        if (provider != null) {
-            return provider.allowNamespaceOperation(namespaceName, 
originalPrincipal, role, operation, authData);
+    public boolean allowNamespaceOperation(NamespaceName namespaceName,
+                                           NamespaceOperation operation,
+                                           String originalRole,
+                                           String role,
+                                           AuthenticationDataSource authData) {
+        try {
+            return allowNamespaceOperationAsync(
+                namespaceName, operation, originalRole, role, authData).get();
+        } catch (InterruptedException e) {
+            throw new RestException(e);
+        } catch (ExecutionException e) {
+            throw new RestException(e.getCause());
         }
-
-        throw new IllegalStateException("No authorization provider configured 
for allowNamespaceOperation");
     }
 
     /**
@@ -429,7 +487,6 @@ public class AuthorizationService {
      *
      * @param namespaceName
      * @param operation
-     * @param originalRole
      * @param role
      * @param authData
      *            additional authdata in json for targeted authorization 
provider
@@ -437,33 +494,56 @@ public class AuthorizationService {
      * @throws IllegalStateException
      *             when failed to grant permission
      */
-    public CompletableFuture<Boolean> 
allowNamespacePolicyOperationAsync(NamespaceName namespaceName, PolicyName 
policy,
-                                                                         
PolicyOperation operation, String originalRole,
-                                                                         
String role, AuthenticationDataSource authData) {
+    public CompletableFuture<Boolean> 
allowNamespacePolicyOperationAsync(NamespaceName namespaceName,
+                                                                         
PolicyName policy,
+                                                                         
PolicyOperation operation,
+                                                                         
String role,
+                                                                         
AuthenticationDataSource authData) {
         if (!this.conf.isAuthorizationEnabled()) {
             return CompletableFuture.completedFuture(true);
         }
 
         if (provider != null) {
-            return provider.allowNamespacePolicyOperationAsync(namespaceName, 
policy, operation, originalRole, role, authData);
+            return provider.allowNamespacePolicyOperationAsync(namespaceName, 
policy, operation, role, authData);
         }
 
         return FutureUtil.failedFuture(new IllegalStateException("No 
authorization provider configured for " +
                 "allowNamespacePolicyOperationAsync"));
     }
 
-    public Boolean allowNamespacePolicyOperation(NamespaceName namespaceName, 
PolicyName policy,
-                                                 PolicyOperation operation, 
String originalPrincipal, String role,
-                                                 AuthenticationDataHttps 
authData) {
-        if (!this.conf.isAuthorizationEnabled()) {
-            return true;
+    public CompletableFuture<Boolean> 
allowNamespacePolicyOperationAsync(NamespaceName namespaceName,
+                                                                         
PolicyName policy,
+                                                                         
PolicyOperation operation,
+                                                                         
String originalRole,
+                                                                         
String role,
+                                                                         
AuthenticationDataSource authData) {
+        validateOriginalPrincipal(conf.getProxyRoles(), role, originalRole);
+        if (isProxyRole(role)) {
+            CompletableFuture<Boolean> isRoleAuthorizedFuture = 
allowNamespacePolicyOperationAsync(
+                namespaceName, policy, operation, role, authData);
+            CompletableFuture<Boolean> isOriginalAuthorizedFuture = 
allowNamespacePolicyOperationAsync(
+                namespaceName, policy, operation, originalRole, authData);
+            return 
isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture,
+                (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized 
&& isOriginalAuthorized);
+        } else {
+            return allowNamespacePolicyOperationAsync(namespaceName, policy, 
operation, role, authData);
         }
+    }
 
-        if (provider != null) {
-            return provider.allowNamespacePolicyOperation(namespaceName, 
policy, operation, originalPrincipal, role, authData);
+    public boolean allowNamespacePolicyOperation(NamespaceName namespaceName,
+                                                 PolicyName policy,
+                                                 PolicyOperation operation,
+                                                 String originalRole,
+                                                 String role,
+                                                 AuthenticationDataSource 
authData) {
+        try {
+            return allowNamespacePolicyOperationAsync(
+                namespaceName, policy, operation, originalRole, role, 
authData).get();
+        } catch (InterruptedException e) {
+            throw new RestException(e);
+        } catch (ExecutionException e) {
+            throw new RestException(e.getCause());
         }
-
-        throw new IllegalStateException("No authorization provider configured 
for allowNamespacePolicyOperation");
     }
 
     /**
@@ -478,32 +558,43 @@ public class AuthorizationService {
      * @throws IllegalStateException
      *             when failed to grant permission
      */
-    public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName 
topicName, TopicOperation operation,
-                                                               String 
originalRole, String role,
+    public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName 
topicName,
+                                                               TopicOperation 
operation,
+                                                               String role,
                                                                
AuthenticationDataSource authData) {
+        if (log.isDebugEnabled()) {
+            log.debug("Check if role {} is allowed to execute topic operation 
{} on topic {}",
+                role, operation, topicName);
+        }
         if (!this.conf.isAuthorizationEnabled()) {
             return CompletableFuture.completedFuture(true);
         }
 
         if (provider != null) {
-            return provider.allowTopicOperationAsync(topicName, originalRole, 
role, operation, authData);
+            CompletableFuture<Boolean> allowFuture =
+                provider.allowTopicOperationAsync(topicName, role, operation, 
authData);
+            if (log.isDebugEnabled()) {
+                return allowFuture.whenComplete((allowed, exception) -> {
+                    if (exception == null) {
+                        if (allowed) {
+                            log.debug("Topic operation {} on topic {} is 
allowed: role = {}",
+                                operation, topicName, role);
+                        } else{
+                            log.debug("Topic operation {} on topic {} is NOT 
allowed: role = {}",
+                                operation, topicName, role);
+                        }
+                    } else {
+                        log.debug("Failed to check if topic operation {} on 
topic {} is allowed:"
+                                + " role = {}",
+                            operation, topicName, role, exception);
+                    }
+                });
+            } else {
+                return allowFuture;
+            }
         }
 
         return FutureUtil.failedFuture(new IllegalStateException("No 
authorization provider configured for " +
                 "allowTopicOperationAsync"));
     }
-
-    public Boolean allowTopicOperation(TopicName topicName, TopicOperation 
operation,
-                                         String orignalRole, String role,
-                                         AuthenticationDataSource authData) {
-        if (!this.conf.isAuthorizationEnabled()) {
-            return true;
-        }
-
-        if (provider != null) {
-            return provider.allowTopicOperation(topicName, orignalRole, role, 
operation, authData);
-        }
-
-        throw new IllegalStateException("No authorization provider configured 
for allowTopicOperation");
-    }
 }
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index d7cea44..66d0c2e 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.authorization;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import static org.apache.pulsar.common.util.ObjectMapperFactory.getThreadLocal;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -29,7 +30,6 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
 import com.google.common.base.Joiner;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -39,8 +39,6 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.Policies;
-import static org.apache.pulsar.common.util.ObjectMapperFactory.getThreadLocal;
-
 import org.apache.pulsar.common.policies.data.PolicyName;
 import org.apache.pulsar.common.policies.data.PolicyOperation;
 import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -528,38 +526,43 @@ public class PulsarAuthorizationProvider implements 
AuthorizationProvider {
     }
 
     @Override
-    public CompletableFuture<Boolean> allowTenantOperationAsync(String 
tenantName, String originalRole, String role,
-                                                           TenantOperation 
operation,
-                                                           
AuthenticationDataSource authData) {
-        return validateTenantAdminAccess(tenantName, originalRole, role, 
authData);
+    public CompletableFuture<Boolean> allowTenantOperationAsync(String 
tenantName,
+                                                                String role,
+                                                                
TenantOperation operation,
+                                                                
AuthenticationDataSource authData) {
+        return validateTenantAdminAccess(tenantName, role, authData);
     }
 
     @Override
-    public CompletableFuture<Boolean> 
allowNamespaceOperationAsync(NamespaceName namespaceName, String originalRole,
-                                                              String role, 
NamespaceOperation operation,
-                                                              
AuthenticationDataSource authData) {
-        return validateTenantAdminAccess(namespaceName.getTenant(), 
originalRole, role, authData);
+    public CompletableFuture<Boolean> 
allowNamespaceOperationAsync(NamespaceName namespaceName,
+                                                                   String role,
+                                                                   
NamespaceOperation operation,
+                                                                   
AuthenticationDataSource authData) {
+        return validateTenantAdminAccess(namespaceName.getTenant(), role, 
authData);
     }
 
     @Override
-    public CompletableFuture<Boolean> 
allowNamespacePolicyOperationAsync(NamespaceName namespaceName, PolicyName 
policy,
-                                                                         
PolicyOperation operation, String originalRole,
-                                                                         
String role, AuthenticationDataSource authData) {
-        return validateTenantAdminAccess(namespaceName.getTenant(), 
originalRole, role, authData);
+    public CompletableFuture<Boolean> 
allowNamespacePolicyOperationAsync(NamespaceName namespaceName,
+                                                                         
PolicyName policy,
+                                                                         
PolicyOperation operation,
+                                                                         
String role,
+                                                                         
AuthenticationDataSource authData) {
+        return validateTenantAdminAccess(namespaceName.getTenant(), role, 
authData);
     }
 
     @Override
-    public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName 
topicName, String originalRole, String role,
+    public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName 
topicName,
+                                                               String role,
                                                                TopicOperation 
operation,
                                                                
AuthenticationDataSource authData) {
         CompletableFuture<Boolean> isAuthorizedFuture;
 
         switch (operation) {
-            case LOOKUP: isAuthorizedFuture = canLookupAsync(topicName, 
StringUtils.isBlank(originalRole) ? role : originalRole, authData);
+            case LOOKUP: isAuthorizedFuture = canLookupAsync(topicName, role, 
authData);
                 break;
-            case PRODUCE: isAuthorizedFuture = canProduceAsync(topicName, 
StringUtils.isBlank(originalRole) ? role : originalRole, authData);
+            case PRODUCE: isAuthorizedFuture = canProduceAsync(topicName, 
role, authData);
                 break;
-            case CONSUME: isAuthorizedFuture = canConsumeAsync(topicName, 
StringUtils.isBlank(originalRole) ? role : originalRole, authData, 
authData.getSubscription());
+            case CONSUME: isAuthorizedFuture = canConsumeAsync(topicName, 
role, authData, authData.getSubscription());
                 break;
             default: isAuthorizedFuture = FutureUtil.failedFuture(
                     new IllegalStateException("TopicOperation is not 
supported."));
@@ -568,7 +571,14 @@ public class PulsarAuthorizationProvider implements 
AuthorizationProvider {
         CompletableFuture<Boolean> isSuperUserFuture = isSuperUser(role, 
authData, conf);
 
         return isSuperUserFuture
-                .thenCombine(isAuthorizedFuture, (isSuperUser, isAuthorized) 
-> isSuperUser || isAuthorized);
+                .thenCombine(isAuthorizedFuture, (isSuperUser, isAuthorized) 
-> {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Verify if role {} is allowed to {} to topic 
{}:"
+                                + " isSuperUser={}, isAuthorized={}",
+                            role, operation, topicName, isSuperUser, 
isAuthorized);
+                    }
+                    return isSuperUser || isAuthorized;
+                });
     }
 
     private static String path(String... parts) {
@@ -578,43 +588,20 @@ public class PulsarAuthorizationProvider implements 
AuthorizationProvider {
         return sb.toString();
     }
 
-    private CompletableFuture<Boolean> validateTenantAdminAccess(String 
tenantName, String originalRole, String role,
+    private CompletableFuture<Boolean> validateTenantAdminAccess(String 
tenantName,
+                                                                 String role,
                                                                 
AuthenticationDataSource authData) {
         try {
             TenantInfo tenantInfo = configCache.propertiesCache()
                     .get(path(POLICIES, tenantName))
                     .orElseThrow(() -> new 
RestException(Response.Status.NOT_FOUND, "Tenant does not exist"));
 
-            validateOriginalPrincipal(conf.getProxyRoles(), role, 
originalRole);
-
-            if (role != null && conf.getProxyRoles().contains(role)) {
-                // role check
-                CompletableFuture<Boolean> isRoleSuperUserFuture = 
isSuperUser(role, authData, conf);
-                CompletableFuture<Boolean> isRoleTenantAdminFuture = 
isTenantAdmin(tenantName, role, tenantInfo, authData);
-                CompletableFuture<Boolean> isRoleAuthorizedFuture = 
isRoleSuperUserFuture
-                        .thenCombine(isRoleTenantAdminFuture, 
(isRoleSuperUser, isRoleTenantAdmin) ->
-                                isRoleSuperUser || isRoleTenantAdmin);
-
-                // originalRole check
-                CompletableFuture<Boolean> isOriginalRoleSuperUserFuture = 
isSuperUser(originalRole, authData, conf);
-                CompletableFuture<Boolean> isOriginalRoleTenantAdminFuture = 
isTenantAdmin(tenantName, originalRole,
-                        tenantInfo, authData);
-                CompletableFuture<Boolean> isOriginalRoleAuthorizedFuture = 
isOriginalRoleSuperUserFuture
-                        .thenCombine(isOriginalRoleTenantAdminFuture, 
(isOriginalRoleSuperUser, isOriginalRoleTenantAdmin) ->
-                                isOriginalRoleSuperUser || 
isOriginalRoleTenantAdmin);
-
-                // merging
-                return isRoleAuthorizedFuture
-                        .thenCombine(isOriginalRoleAuthorizedFuture, 
(isRoleAuthorized, isOriginalRoleAuthorized) ->
-                                isRoleAuthorized && isOriginalRoleAuthorized);
-            } else {
-                // role check
-                CompletableFuture<Boolean> isRoleSuperUserFuture = 
isSuperUser(role, authData, conf);
-                CompletableFuture<Boolean> isRoleTenantAdminFuture = 
isTenantAdmin(tenantName, role, tenantInfo, authData);
-                return isRoleSuperUserFuture
-                        .thenCombine(isRoleTenantAdminFuture, 
(isRoleSuperUser, isRoleTenantAdmin) ->
-                                isRoleSuperUser || isRoleTenantAdmin);
-            }
+            // role check
+            CompletableFuture<Boolean> isRoleSuperUserFuture = 
isSuperUser(role, authData, conf);
+            CompletableFuture<Boolean> isRoleTenantAdminFuture = 
isTenantAdmin(tenantName, role, tenantInfo, authData);
+            return isRoleSuperUserFuture
+                    .thenCombine(isRoleTenantAdminFuture, (isRoleSuperUser, 
isRoleTenantAdmin) ->
+                            isRoleSuperUser || isRoleTenantAdmin);
         } catch (KeeperException.NoNodeException e) {
             log.warn("Failed to get tenant info data for non existing tenant 
{}", tenantName);
             throw new RestException(Response.Status.NOT_FOUND, "Tenant does 
not exist");
@@ -624,18 +611,4 @@ public class PulsarAuthorizationProvider implements 
AuthorizationProvider {
         }
     }
 
-    private static void validateOriginalPrincipal(Set<String> proxyRoles, 
String authenticatedPrincipal,
-                                                  String originalPrincipal) {
-        if (proxyRoles.contains(authenticatedPrincipal)) {
-            // Request has come from a proxy
-            if (StringUtils.isBlank(originalPrincipal)) {
-                log.warn("Original principal empty in request authenticated as 
{}", authenticatedPrincipal);
-                throw new RestException(Response.Status.UNAUTHORIZED, 
"Original principal cannot be empty if the request is via proxy.");
-            }
-            if (proxyRoles.contains(originalPrincipal)) {
-                log.warn("Original principal {} cannot be a proxy role ({})", 
originalPrincipal, proxyRoles);
-                throw new RestException(Response.Status.UNAUTHORIZED, 
"Original principal cannot be a proxy role");
-            }
-        }
-    }
 }
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java
index 6b4fc8c..8555f34 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/AuthenticationFilter.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pulsar.broker.web;
 
-import static com.google.common.base.Preconditions.checkState;
-
 import java.io.IOException;
 
 import javax.servlet.Filter;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 612c7a3..e66d29c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2617,6 +2617,36 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     /**
+     * Get partitioned topic metadata without checking the permission.
+     */
+    public static CompletableFuture<PartitionedTopicMetadata> 
unsafeGetPartitionedTopicMetadataAsync(
+        PulsarService pulsar, TopicName topicName) {
+        CompletableFuture<PartitionedTopicMetadata> metadataFuture = new 
CompletableFuture();
+
+        String path = path(PARTITIONED_TOPIC_PATH_ZNODE, 
topicName.getNamespace(),
+            topicName.getDomain().toString(), topicName.getEncodedLocalName());
+
+        // validates global-namespace contains local/peer cluster: if 
peer/local cluster present then lookup can
+        // serve/redirect request else fail partitioned-metadata-request so, 
client fails while creating
+        // producer/consumer
+        checkLocalOrGetPeerReplicationCluster(pulsar, 
topicName.getNamespaceObject())
+            .thenCompose(res -> pulsar.getBrokerService()
+                
.fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
+            .thenAccept(metadata -> {
+                if (log.isDebugEnabled()) {
+                    log.debug("Total number of partitions for topic {} is {}", 
topicName,
+                        metadata.partitions);
+                }
+                metadataFuture.complete(metadata);
+            }).exceptionally(ex -> {
+            metadataFuture.completeExceptionally(ex.getCause());
+            return null;
+        });
+
+        return metadataFuture;
+    }
+
+    /**
      * Get the Topic object reference from the Pulsar broker
      */
     private Topic getTopicReference(TopicName topicName) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 2131b18..03bbe12 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -152,7 +152,7 @@ public class Consumer {
         this.bytesOutCounter = new LongAdder();
         this.msgOutCounter = new LongAdder();
         this.appId = appId;
-        this.authenticationData = cnx.authenticationData;
+        this.authenticationData = cnx.getAuthenticationData();
         this.preciseDispatcherFlowControl = 
cnx.isPreciseDispatcherFlowControl();
 
         PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.set(this, 0);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 771cd21..258186b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -101,7 +101,7 @@ public class Producer {
         this.epoch = epoch;
         this.closeFuture = new CompletableFuture<>();
         this.appId = appId;
-        this.authenticationData = cnx.authenticationData;
+        this.authenticationData = cnx.getAuthenticationData();
         this.msgIn = new Rate();
         this.chuckedMessageRate = new Rate();
         this.isNonPersistentTopic = topic instanceof NonPersistentTopic;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 15b3c97..8949db4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static 
org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.getPartitionedTopicMetadata;
+import static 
org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.unsafeGetPartitionedTopicMetadataAsync;
 import static org.apache.pulsar.broker.lookup.TopicLookupBase.lookupTopicAsync;
 import static org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion.v5;
 import static 
org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse;
@@ -141,6 +142,7 @@ public class ServerCnx extends PulsarHandler {
     // In case of proxy, if the authentication credentials are forwardable,
     // it will hold the credentials of the original client
     AuthenticationState originalAuthState;
+    AuthenticationDataSource originalAuthData;
     private boolean pendingAuthChallengeResponse = false;
 
     // Max number of pending requests per connections. If multiple producers 
are sharing the same connection the flow
@@ -273,6 +275,65 @@ public class ServerCnx extends PulsarHandler {
     // // Incoming commands handling
     // ////
 
+    private CompletableFuture<Boolean> isTopicOperationAllowed(TopicName 
topicName, TopicOperation operation) {
+        CompletableFuture<Boolean> isProxyAuthorizedFuture;
+        CompletableFuture<Boolean> isAuthorizedFuture;
+        if (service.isAuthorizationEnabled()) {
+            if (originalPrincipal != null) {
+                isProxyAuthorizedFuture = 
service.getAuthorizationService().allowTopicOperationAsync(
+                    topicName, operation, originalPrincipal, 
getAuthenticationData());
+            } else {
+                isProxyAuthorizedFuture = 
CompletableFuture.completedFuture(true);
+            }
+            isAuthorizedFuture = 
service.getAuthorizationService().allowTopicOperationAsync(
+                topicName, operation, authRole, authenticationData);
+        } else {
+            isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
+            isAuthorizedFuture = CompletableFuture.completedFuture(true);
+        }
+        return isProxyAuthorizedFuture.thenCombine(isAuthorizedFuture, 
(isProxyAuthorized, isAuthorized) -> {
+            if (!isProxyAuthorized) {
+                log.error("OriginalRole {} is not authorized to perform 
operation {} on topic {}",
+                    originalPrincipal, operation, topicName);
+            }
+            if (!isAuthorized) {
+                log.error("Role {} is not authorized to perform operation {} 
on topic {}",
+                    authRole, operation, topicName);
+            }
+            return isProxyAuthorized && isAuthorized;
+        });
+    }
+
+    private CompletableFuture<Boolean> isTopicOperationAllowed(TopicName 
topicName, String subscriptionName, TopicOperation operation) {
+        CompletableFuture<Boolean> isProxyAuthorizedFuture;
+        CompletableFuture<Boolean> isAuthorizedFuture;
+        if (service.isAuthorizationEnabled()) {
+            if (authenticationData == null) {
+                authenticationData = new AuthenticationDataCommand("", 
subscriptionName);
+            } else {
+                authenticationData.setSubscription(subscriptionName);
+            }
+            if (originalAuthData != null) {
+                originalAuthData.setSubscription(subscriptionName);
+            }
+            return isTopicOperationAllowed(topicName, operation);
+        } else {
+            isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
+            isAuthorizedFuture = CompletableFuture.completedFuture(true);
+        }
+        return isProxyAuthorizedFuture.thenCombine(isAuthorizedFuture, 
(isProxyAuthorized, isAuthorized) -> {
+            if (!isProxyAuthorized) {
+                log.error("OriginalRole {} is not authorized to perform 
operation {} on topic {}, subscription {}",
+                    originalPrincipal, operation, topicName, subscriptionName);
+            }
+            if (!isAuthorized) {
+                log.error("Role {} is not authorized to perform operation {} 
on topic {}, subscription {}",
+                    authRole, operation, topicName, subscriptionName);
+            }
+            return isProxyAuthorized && isAuthorized;
+        });
+    }
+
     @Override
     protected void handleLookup(CommandLookupTopic lookup) {
         final long requestId = lookup.getRequestId();
@@ -297,18 +358,10 @@ public class ServerCnx extends PulsarHandler {
                 lookupSemaphore.release();
                 return;
             }
-            CompletableFuture<Boolean> isProxyAuthorizedFuture;
-            if (service.isAuthorizationEnabled() && originalPrincipal != null) 
{
-                isProxyAuthorizedFuture = 
service.getAuthorizationService().allowTopicOperationAsync(topicName,
-                        TopicOperation.LOOKUP, originalPrincipal, authRole, 
authenticationData);
-            } else {
-                isProxyAuthorizedFuture = 
CompletableFuture.completedFuture(true);
-            }
-            String finalOriginalPrincipal = originalPrincipal;
-            isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
-                if (isProxyAuthorized) {
+            isTopicOperationAllowed(topicName, 
TopicOperation.LOOKUP).thenApply(isAuthorized -> {
+                if (isAuthorized) {
                     lookupTopicAsync(getBrokerService().pulsar(), topicName, 
authoritative,
-                            finalOriginalPrincipal != null ? 
finalOriginalPrincipal : authRole, authenticationData,
+                            getPrincipal(), getAuthenticationData(),
                             requestId, 
advertisedListenerName).handle((lookupResponse, ex) -> {
                                 if (ex == null) {
                                     ctx.writeAndFlush(lookupResponse);
@@ -324,14 +377,14 @@ public class ServerCnx extends PulsarHandler {
                             });
                 } else {
                     final String msg = "Proxy Client is not authorized to 
Lookup";
-                    log.warn("[{}] {} with role {} on topic {}", 
remoteAddress, msg, authRole, topicName);
+                    log.warn("[{}] {} with role {} on topic {}", 
remoteAddress, msg, getPrincipal(), topicName);
                     
ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg, 
requestId));
                     lookupSemaphore.release();
                 }
                 return null;
             }).exceptionally(ex -> {
                 final String msg = "Exception occured while trying to 
authorize lookup";
-                log.warn("[{}] {} with role {} on topic {}", remoteAddress, 
msg, authRole, topicName, ex);
+                log.warn("[{}] {} with role {} on topic {}", remoteAddress, 
msg, getPrincipal(), topicName, ex);
                 
ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthorizationError, msg, 
requestId));
                 lookupSemaphore.release();
                 return null;
@@ -369,19 +422,10 @@ public class ServerCnx extends PulsarHandler {
                 lookupSemaphore.release();
                 return;
             }
-            CompletableFuture<Boolean> isProxyAuthorizedFuture;
-            if (service.isAuthorizationEnabled() && originalPrincipal != null) 
{
-                isProxyAuthorizedFuture = 
service.getAuthorizationService().allowTopicOperationAsync(topicName,
-                        TopicOperation.LOOKUP, originalPrincipal, authRole, 
authenticationData);
-            } else {
-                isProxyAuthorizedFuture = 
CompletableFuture.completedFuture(true);
-            }
-            String finalOriginalPrincipal = originalPrincipal;
-            isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
-                if (isProxyAuthorized) {
-                    getPartitionedTopicMetadata(getBrokerService().pulsar(),
-                            authRole, finalOriginalPrincipal, 
authenticationData,
-                            topicName).handle((metadata, ex) -> {
+            isTopicOperationAllowed(topicName, 
TopicOperation.LOOKUP).thenApply(isAuthorized -> {
+                if (isAuthorized) {
+                    
unsafeGetPartitionedTopicMetadataAsync(getBrokerService().pulsar(), topicName)
+                        .handle((metadata, ex) -> {
                                 if (ex == null) {
                                     int partitions = metadata.partitions;
                                     
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(partitions, requestId));
@@ -407,7 +451,7 @@ public class ServerCnx extends PulsarHandler {
                             });
                 } else {
                     final String msg = "Proxy Client is not authorized to Get 
Partition Metadata";
-                    log.warn("[{}] {} with role {} on topic {}", 
remoteAddress, msg, authRole, topicName);
+                    log.warn("[{}] {} with role {} on topic {}", 
remoteAddress, msg, getPrincipal(), topicName);
                     ctx.writeAndFlush(
                             
Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, msg, 
requestId));
                     lookupSemaphore.release();
@@ -415,7 +459,7 @@ public class ServerCnx extends PulsarHandler {
                 return null;
             }).exceptionally(ex -> {
                 final String msg = "Exception occured while trying to 
authorize get Partition Metadata";
-                log.warn("[{}] {} with role {} on topic {}", remoteAddress, 
msg, authRole, topicName);
+                log.warn("[{}] {} with role {} on topic {}", remoteAddress, 
msg, getPrincipal(), topicName);
                 
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError,
 msg, requestId));
                 lookupSemaphore.release();
                 return null;
@@ -505,6 +549,9 @@ public class ServerCnx extends PulsarHandler {
         String authRole = useOriginalAuthState ? originalPrincipal : 
this.authRole;
         AuthData brokerData = authState.authenticate(clientData);
 
+        if (log.isDebugEnabled()) {
+            log.debug("Authenticate using original auth state : {}, role = 
{}", useOriginalAuthState, authRole);
+        }
 
         if (authState.isComplete()) {
             // Authentication has completed. It was either:
@@ -520,7 +567,7 @@ public class ServerCnx extends PulsarHandler {
 
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Client successfully authenticated with {} role 
{} and originalPrincipal {}",
-                        remoteAddress, authMethod, authRole, 
originalPrincipal);
+                        remoteAddress, authMethod, this.authRole, 
originalPrincipal);
             }
 
             if (state != State.Connected) {
@@ -607,8 +654,12 @@ public class ServerCnx extends PulsarHandler {
         checkArgument(state == State.Start);
 
         if (log.isDebugEnabled()) {
-            log.debug("Received CONNECT from {}, auth enabled: {}",
-                remoteAddress, service.isAuthenticationEnabled());
+            log.debug("Received CONNECT from {}, auth enabled: {}:"
+                    + " has original principal = {}, original principal = {}",
+                remoteAddress,
+                service.isAuthenticationEnabled(),
+                connect.hasOriginalPrincipal(),
+                connect.getOriginalPrincipal());
         }
 
         String clientVersion = connect.getClientVersion();
@@ -656,6 +707,12 @@ public class ServerCnx extends PulsarHandler {
 
             authState = authenticationProvider.newAuthState(clientData, 
remoteAddress, sslSession);
             authenticationData = authState.getAuthDataSource();
+
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Authenticate role : {}", remoteAddress,
+                    authState != null ? authState.getAuthRole() : null);
+            }
+
             state = doAuthentication(clientData, clientProtocolVersion, 
clientVersion);
 
             // This will fail the check if:
@@ -684,9 +741,18 @@ public class ServerCnx extends PulsarHandler {
                         AuthData.of(connect.getOriginalAuthData().getBytes()),
                         remoteAddress,
                         sslSession);
+                originalAuthData = originalAuthState.getAuthDataSource();
                 originalPrincipal = originalAuthState.getAuthRole();
+
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Authenticate original role : {}", 
remoteAddress, originalPrincipal);
+                }
             } else {
                 originalPrincipal = connect.hasOriginalPrincipal() ? 
connect.getOriginalPrincipal() : null;
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Authenticate original role (forwarded from 
proxy): {}",
+                        remoteAddress, originalPrincipal);
+                }
             }
         } catch (Exception e) {
             String msg = "Unable to authenticate";
@@ -737,6 +803,11 @@ public class ServerCnx extends PulsarHandler {
             return;
         }
 
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Handle subscribe command: auth role = {}, original 
auth role = {}",
+                remoteAddress, authRole, originalPrincipal);
+        }
+
         if (invalidOriginalPrincipal(originalPrincipal)) {
             final String msg = "Valid Proxy Client role should be provided 
while subscribing ";
             log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic 
{}", remoteAddress, msg, authRole,
@@ -765,33 +836,15 @@ public class ServerCnx extends PulsarHandler {
         final boolean forceTopicCreation = subscribe.getForceTopicCreation();
         final PulsarApi.KeySharedMeta keySharedMeta = 
subscribe.hasKeySharedMeta() ? subscribe.getKeySharedMeta() : null;
 
-        CompletableFuture<Boolean> isProxyAuthorizedFuture;
-        if (service.isAuthorizationEnabled() && originalPrincipal != null) {
-            authenticationData.setSubscription(subscriptionName);
-            isProxyAuthorizedFuture = 
service.getAuthorizationService().allowTopicOperationAsync(topicName,
-                    TopicOperation.CONSUME, originalPrincipal, authRole, 
authenticationData);
-        } else {
-            isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
-        }
-        isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
-            if (isProxyAuthorized) {
-                CompletableFuture<Boolean> authorizationFuture;
-                if (service.isAuthorizationEnabled()) {
-                    if (authenticationData == null) {
-                        authenticationData = new AuthenticationDataCommand("", 
subscriptionName);
-                    } else {
-                        authenticationData.setSubscription(subscriptionName);
-                    }
-                    authorizationFuture = 
service.getAuthorizationService().allowTopicOperationAsync(topicName,
-                            TopicOperation.CONSUME, originalPrincipal, 
authRole, authenticationData);
-                } else {
-                    authorizationFuture = 
CompletableFuture.completedFuture(true);
-                }
-
-                authorizationFuture.thenApply(isAuthorized -> {
+        CompletableFuture<Boolean> isAuthorizedFuture = 
isTopicOperationAllowed(
+            topicName,
+            subscriptionName,
+            TopicOperation.CONSUME
+        );
+        isAuthorizedFuture.thenApply(isAuthorized -> {
                     if (isAuthorized) {
                         if (log.isDebugEnabled()) {
-                            log.debug("[{}] Client is authorized to subscribe 
with role {}", remoteAddress, authRole);
+                            log.debug("[{}] Client is authorized to subscribe 
with role {}", remoteAddress, getPrincipal());
                         }
 
                         log.info("[{}] Subscribing on topic {} / {}", 
remoteAddress, topicName, subscriptionName);
@@ -919,24 +972,12 @@ public class ServerCnx extends PulsarHandler {
                                 });
                     } else {
                         String msg = "Client is not authorized to subscribe";
-                        log.warn("[{}] {} with role {}", remoteAddress, msg, 
authRole);
+                        log.warn("[{}] {} with role {}", remoteAddress, msg, 
getPrincipal());
                         ctx.writeAndFlush(Commands.newError(requestId, 
ServerError.AuthorizationError, msg));
                     }
                     return null;
-                }).exceptionally(e -> {
-                    String msg = String.format("[%s] %s with role %s", 
remoteAddress, e.getMessage(), authRole);
-                    log.warn(msg);
-                    ctx.writeAndFlush(Commands.newError(requestId, 
ServerError.AuthorizationError, e.getMessage()));
-                    return null;
-                });
-            } else {
-                final String msg = "Proxy Client is not authorized to 
subscribe";
-                log.warn("[{}] {} with role {} on topic {}", remoteAddress, 
msg, authRole, topicName);
-                ctx.writeAndFlush(Commands.newError(requestId, 
ServerError.AuthorizationError, msg));
-            }
-            return null;
         }).exceptionally(ex -> {
-            String msg = String.format("[%s] %s with role %s", remoteAddress, 
ex.getMessage(), authRole);
+            String msg = String.format("[%s] %s with role %s", remoteAddress, 
ex.getMessage(), getPrincipal());
             if (ex.getCause() instanceof PulsarServerException) {
                 log.info(msg);
             } else {
@@ -989,27 +1030,13 @@ public class ServerCnx extends PulsarHandler {
             return;
         }
 
-        CompletableFuture<Boolean> isProxyAuthorizedFuture;
-        if (service.isAuthorizationEnabled() && originalPrincipal != null) {
-            isProxyAuthorizedFuture = 
service.getAuthorizationService().allowTopicOperationAsync(topicName,
-                    TopicOperation.PRODUCE, originalPrincipal, authRole, 
authenticationData);
-        } else {
-            isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
-        }
-        isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
-            if (isProxyAuthorized) {
-                CompletableFuture<Boolean> authorizationFuture;
-                if (service.isAuthorizationEnabled()) {
-                    authorizationFuture = 
service.getAuthorizationService().allowTopicOperationAsync(topicName,
-                            TopicOperation.PRODUCE, originalPrincipal, 
authRole, authenticationData);
-                } else {
-                    authorizationFuture = 
CompletableFuture.completedFuture(true);
-                }
-
-                authorizationFuture.thenApply(isAuthorized -> {
+        CompletableFuture<Boolean> isAuthorizedFuture = 
isTopicOperationAllowed(
+            topicName, TopicOperation.PRODUCE
+        );
+        isAuthorizedFuture.thenApply(isAuthorized -> {
                     if (isAuthorized) {
                         if (log.isDebugEnabled()) {
-                            log.debug("[{}] Client is authorized to Produce 
with role {}", remoteAddress, authRole);
+                            log.debug("[{}] Client is authorized to Produce 
with role {}", remoteAddress, getPrincipal());
                         }
                         CompletableFuture<Producer> producerFuture = new 
CompletableFuture<>();
                         CompletableFuture<Producer> existingProducerFuture = 
producers.putIfAbsent(producerId,
@@ -1092,7 +1119,7 @@ public class ServerCnx extends PulsarHandler {
                             });
 
                             schemaVersionFuture.thenAccept(schemaVersion -> {
-                                Producer producer = new Producer(topic, 
ServerCnx.this, producerId, producerName, authRole,
+                                Producer producer = new Producer(topic, 
ServerCnx.this, producerId, producerName, getPrincipal(),
                                     isEncrypted, metadata, schemaVersion, 
epoch, userProvidedProducerName);
 
                                 try {
@@ -1153,24 +1180,12 @@ public class ServerCnx extends PulsarHandler {
                         });
                     } else {
                         String msg = "Client is not authorized to Produce";
-                        log.warn("[{}] {} with role {}", remoteAddress, msg, 
authRole);
+                        log.warn("[{}] {} with role {}", remoteAddress, msg, 
getPrincipal());
                         ctx.writeAndFlush(Commands.newError(requestId, 
ServerError.AuthorizationError, msg));
                     }
                     return null;
-                }).exceptionally(e -> {
-                    String msg = String.format("[%s] %s with role %s", 
remoteAddress, e.getMessage(), authRole);
-                    log.warn(msg);
-                    ctx.writeAndFlush(Commands.newError(requestId, 
ServerError.AuthorizationError, e.getMessage()));
-                    return null;
-                });
-            } else {
-                final String msg = "Proxy Client is not authorized to Produce";
-                log.warn("[{}] {} with role {} on topic {}", remoteAddress, 
msg, authRole, topicName);
-                ctx.writeAndFlush(Commands.newError(requestId, 
ServerError.AuthorizationError, msg));
-            }
-            return null;
         }).exceptionally(ex -> {
-            String msg = String.format("[%s] %s with role %s", remoteAddress, 
ex.getMessage(), authRole);
+            String msg = String.format("[%s] %s with role %s", remoteAddress, 
ex.getMessage(), getPrincipal());
             log.warn(msg);
             ctx.writeAndFlush(Commands.newError(requestId, 
ServerError.AuthorizationError, ex.getMessage()));
             return null;
@@ -2043,7 +2058,11 @@ public class ServerCnx extends PulsarHandler {
     }
 
     public AuthenticationDataSource getAuthenticationData() {
-        return authenticationData;
+        return originalAuthData != null ? originalAuthData : 
authenticationData;
+    }
+
+    public String getPrincipal() {
+        return originalPrincipal != null ? originalPrincipal : authRole;
     }
 
     public AuthenticationProvider getAuthenticationProvider() {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index fb7877e..edbc93c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -68,7 +68,6 @@ import org.apache.pulsar.common.policies.data.PolicyName;
 import org.apache.pulsar.common.policies.data.PolicyOperation;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantOperation;
-import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -422,10 +421,12 @@ public abstract class PulsarWebResource {
      * will throw an exception to redirect to assigned owner or leader; if 
authoritative is true then it will try to
      * acquire all the namespace bundles.
      *
-     * @param fqnn
-     * @param authoritative
-     * @param readOnly
-     * @param bundleData
+     * @param tenant tenant name
+     * @param cluster cluster name
+     * @param namespace namespace name
+     * @param authoritative if it is an authoritative request
+     * @param readOnly if the request is read-only
+     * @param bundleData bundle data
      */
     protected void validateNamespaceOwnershipWithBundles(String tenant, String 
cluster, String namespace,
             boolean authoritative, boolean readOnly, BundlesData bundleData) {
@@ -582,11 +583,8 @@ public abstract class PulsarWebResource {
      * client to the appropriate broker. If no broker owns the namespace yet, 
this function will try to acquire the
      * ownership by default.
      *
+     * @param topicName topic name
      * @param authoritative
-     *
-     * @param tenant
-     * @param cluster
-     * @param namespace
      */
     protected void validateTopicOwnership(TopicName topicName, boolean 
authoritative) {
         NamespaceService nsService = pulsar().getNamespaceService();
@@ -794,31 +792,33 @@ public abstract class PulsarWebResource {
     protected static final int NOT_IMPLEMENTED = 501;
 
     public void validateTenantOperation(String tenant, TenantOperation 
operation) {
-        if (pulsar().getConfiguration().isAuthenticationEnabled() && 
pulsar().getBrokerService().isAuthorizationEnabled()) {
+        if (pulsar().getConfiguration().isAuthenticationEnabled()
+            && pulsar().getBrokerService().isAuthorizationEnabled()) {
             if (!isClientAuthenticated(clientAppId())) {
                 throw new RestException(Status.UNAUTHORIZED, "Need to 
authenticate to perform the request");
             }
 
-            Boolean isAuthorized = 
pulsar().getBrokerService().getAuthorizationService()
-                    .allowTenantOperation(
-                            tenant, operation, originalPrincipal(), 
clientAppId(), clientAuthData());
-
+            boolean isAuthorized = 
pulsar().getBrokerService().getAuthorizationService()
+                .allowTenantOperation(tenant, operation, originalPrincipal(), 
clientAppId(), clientAuthData());
             if (!isAuthorized) {
-                throw new RestException(Status.UNAUTHORIZED, 
String.format("Unauthorized to validateTenantOperation for" +
-                                " originalPrincipal [%s] and clientAppId [%s] 
about operation [%s] on tenant [%s]",
+                throw new RestException(Status.UNAUTHORIZED,
+                    String.format("Unauthorized to validateTenantOperation for"
+                            + " originalPrincipal [%s] and clientAppId [%s] 
about operation [%s] on tenant [%s]",
                         originalPrincipal(), clientAppId(), 
operation.toString(), tenant));
             }
         }
     }
 
     public void validateNamespaceOperation(NamespaceName namespaceName, 
NamespaceOperation operation) {
-        if (pulsar().getConfiguration().isAuthenticationEnabled() && 
pulsar().getBrokerService().isAuthorizationEnabled()) {
+        if (pulsar().getConfiguration().isAuthenticationEnabled()
+            && pulsar().getBrokerService().isAuthorizationEnabled()) {
             if (!isClientAuthenticated(clientAppId())) {
                 throw new RestException(Status.FORBIDDEN, "Need to 
authenticate to perform the request");
             }
 
-            Boolean isAuthorized = 
pulsar().getBrokerService().getAuthorizationService()
-                    .allowNamespaceOperation(namespaceName, operation, 
originalPrincipal(), clientAppId(), clientAuthData());
+            boolean isAuthorized = 
pulsar().getBrokerService().getAuthorizationService()
+                    .allowNamespaceOperation(namespaceName, operation, 
originalPrincipal(),
+                        clientAppId(), clientAuthData());
 
             if (!isAuthorized) {
                 throw new RestException(Status.FORBIDDEN, 
String.format("Unauthorized to validateNamespaceOperation for" +
@@ -827,14 +827,18 @@ public abstract class PulsarWebResource {
         }
     }
 
-    public void validateNamespacePolicyOperation(NamespaceName namespaceName, 
PolicyName policy, PolicyOperation operation) {
-        if (pulsar().getConfiguration().isAuthenticationEnabled() && 
pulsar().getBrokerService().isAuthorizationEnabled()) {
+    public void validateNamespacePolicyOperation(NamespaceName namespaceName,
+                                                 PolicyName policy,
+                                                 PolicyOperation operation) {
+        if (pulsar().getConfiguration().isAuthenticationEnabled()
+            && pulsar().getBrokerService().isAuthorizationEnabled()) {
             if (!isClientAuthenticated(clientAppId())) {
                 throw new RestException(Status.FORBIDDEN, "Need to 
authenticate to perform the request");
             }
 
-            Boolean isAuthorized = 
pulsar().getBrokerService().getAuthorizationService()
-                    .allowNamespacePolicyOperation(namespaceName, policy, 
operation, originalPrincipal(), clientAppId(), clientAuthData());
+            boolean isAuthorized = 
pulsar().getBrokerService().getAuthorizationService()
+                    .allowNamespacePolicyOperation(namespaceName, policy, 
operation,
+                        originalPrincipal(), clientAppId(), clientAuthData());
 
             if (!isAuthorized) {
                 throw new RestException(Status.FORBIDDEN, 
String.format("Unauthorized to validateNamespacePolicyOperation for" +
@@ -842,20 +846,4 @@ public abstract class PulsarWebResource {
             }
         }
     }
-
-    public void validateTopicOperation(TopicName topicName, TopicOperation 
operation) {
-        if (pulsar().getConfiguration().isAuthenticationEnabled() && 
pulsar().getBrokerService().isAuthorizationEnabled()) {
-            if (!isClientAuthenticated(clientAppId())) {
-                throw new RestException(Status.UNAUTHORIZED, "Need to 
authenticate to perform the request");
-            }
-
-            Boolean isAuthorized = 
pulsar().getBrokerService().getAuthorizationService()
-                    .allowTopicOperation(topicName, operation, 
originalPrincipal(), clientAppId(), clientAuthData());
-
-            if (!isAuthorized) {
-                throw new RestException(Status.UNAUTHORIZED, 
String.format("Unauthorized to validateTopicOperation for" +
-                        " operation [%s] on topic [%s]", operation.toString(), 
topicName));
-            }
-        }
-    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 94a02f6..fd56340 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -75,6 +75,7 @@ import 
org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.ServerCnx.State;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -180,6 +181,8 @@ public class ServerCnxTest {
         doReturn(zkCache).when(pulsar).getLocalZkCacheService();
 
         brokerService = spy(new BrokerService(pulsar));
+        BrokerInterceptor interceptor = mock(BrokerInterceptor.class);
+        doReturn(interceptor).when(brokerService).getInterceptor();
         doReturn(brokerService).when(pulsar).getBrokerService();
         doReturn(executor).when(pulsar).getOrderedExecutor();
 
@@ -474,7 +477,7 @@ public class ServerCnxTest {
     public void testProducerCommandWithAuthorizationPositive() throws 
Exception {
         AuthorizationService authorizationService = 
mock(AuthorizationService.class);
         
doReturn(CompletableFuture.completedFuture(true)).when(authorizationService).allowTopicOperationAsync(Mockito.any(),
-                Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+                Mockito.any(), Mockito.any(), Mockito.any());
         
doReturn(authorizationService).when(brokerService).getAuthorizationService();
         doReturn(true).when(brokerService).isAuthenticationEnabled();
         resetChannel();
@@ -605,7 +608,7 @@ public class ServerCnxTest {
     public void testProducerCommandWithAuthorizationNegative() throws 
Exception {
         AuthorizationService authorizationService = 
mock(AuthorizationService.class);
         
doReturn(CompletableFuture.completedFuture(false)).when(authorizationService).allowTopicOperationAsync(Mockito.any(),
-                Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+                Mockito.any(), Mockito.any(), Mockito.any());
         
doReturn(authorizationService).when(brokerService).getAuthorizationService();
         doReturn(true).when(brokerService).isAuthenticationEnabled();
         doReturn(true).when(brokerService).isAuthorizationEnabled();
@@ -1195,7 +1198,7 @@ public class ServerCnxTest {
     public void testSubscribeCommandWithAuthorizationPositive() throws 
Exception {
         AuthorizationService authorizationService = 
mock(AuthorizationService.class);
         
doReturn(CompletableFuture.completedFuture(true)).when(authorizationService).allowTopicOperationAsync(Mockito.any(),
-                Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+                Mockito.any(), Mockito.any(), Mockito.any());
         
doReturn(authorizationService).when(brokerService).getAuthorizationService();
         doReturn(true).when(brokerService).isAuthenticationEnabled();
         doReturn(true).when(brokerService).isAuthorizationEnabled();
@@ -1217,7 +1220,7 @@ public class ServerCnxTest {
     public void testSubscribeCommandWithAuthorizationNegative() throws 
Exception {
         AuthorizationService authorizationService = 
mock(AuthorizationService.class);
         
doReturn(CompletableFuture.completedFuture(false)).when(authorizationService).allowTopicOperationAsync(Mockito.any(),
-                Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+                Mockito.any(), Mockito.any(), Mockito.any());
         
doReturn(authorizationService).when(brokerService).getAuthorizationService();
         doReturn(true).when(brokerService).isAuthenticationEnabled();
         doReturn(true).when(brokerService).isAuthorizationEnabled();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index 180142c..687b08d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -22,20 +22,18 @@ import static 
org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.fail;
 
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import java.io.IOException;
-import java.net.URI;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-
 import javax.naming.AuthenticationException;
 
 import lombok.Cleanup;
-
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
@@ -54,16 +52,12 @@ import 
org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantOperation;
 import org.apache.pulsar.common.policies.data.TopicOperation;
-import org.apache.pulsar.common.util.RestException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
 public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
     private static final Logger log = 
LoggerFactory.getLogger(AuthorizationProducerConsumerTest.class);
 
@@ -435,7 +429,8 @@ public class AuthorizationProducerConsumerTest extends 
ProducerConsumerBase {
         }
 
         @Override
-        public CompletableFuture<Boolean> isSuperUser(String role, 
ServiceConfiguration serviceConfiguration) {
+        public CompletableFuture<Boolean> isSuperUser(String role,
+                                                      ServiceConfiguration 
serviceConfiguration) {
             Set<String> superUserRoles = 
serviceConfiguration.getSuperUserRoles();
             return CompletableFuture.completedFuture(role != null && 
superUserRoles.contains(role) ? true : false);
         }
@@ -509,32 +504,38 @@ public class AuthorizationProducerConsumerTest extends 
ProducerConsumerBase {
         }
 
         @Override
-        public CompletableFuture<Boolean> allowTenantOperationAsync(String 
tenantName, String originalRole, String role, TenantOperation operation, 
AuthenticationDataSource authData) {
+        public CompletableFuture<Boolean> allowTenantOperationAsync(
+            String tenantName, String role, TenantOperation operation, 
AuthenticationDataSource authData) {
             return CompletableFuture.completedFuture(true);
         }
 
         @Override
-        public Boolean allowTenantOperation(String tenantName, String 
originalRole, String role, TenantOperation operation, AuthenticationDataSource 
authData) {
+        public Boolean allowTenantOperation(
+            String tenantName, String role, TenantOperation operation, 
AuthenticationDataSource authData) {
             return true;
         }
 
         @Override
-        public CompletableFuture<Boolean> 
allowNamespaceOperationAsync(NamespaceName namespaceName, String originalRole, 
String role, NamespaceOperation operation, AuthenticationDataSource authData) {
+        public CompletableFuture<Boolean> allowNamespaceOperationAsync(
+            NamespaceName namespaceName, String role, NamespaceOperation 
operation, AuthenticationDataSource authData) {
             return CompletableFuture.completedFuture(true);
         }
 
         @Override
-        public Boolean allowNamespaceOperation(NamespaceName namespaceName, 
String originalRole, String role, NamespaceOperation operation, 
AuthenticationDataSource authData) {
+        public Boolean allowNamespaceOperation(
+            NamespaceName namespaceName, String role, NamespaceOperation 
operation, AuthenticationDataSource authData) {
             return null;
         }
 
         @Override
-        public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName 
topic, String originalRole, String role, TopicOperation operation, 
AuthenticationDataSource authData) {
+        public CompletableFuture<Boolean> allowTopicOperationAsync(
+            TopicName topic, String role, TopicOperation operation, 
AuthenticationDataSource authData) {
             return CompletableFuture.completedFuture(true);
         }
 
         @Override
-        public Boolean allowTopicOperation(TopicName topicName, String 
originalRole, String role, TopicOperation operation, AuthenticationDataSource 
authData) {
+        public Boolean allowTopicOperation(
+            TopicName topicName, String role, TopicOperation operation, 
AuthenticationDataSource authData) {
             return true;
         }
     }
@@ -566,18 +567,10 @@ public class AuthorizationProducerConsumerTest extends 
ProducerConsumerBase {
 
     public static class TestAuthorizationProviderWithSubscriptionPrefix 
extends TestAuthorizationProvider {
         @Override
-        public Boolean allowTopicOperation(TopicName topicName, String 
originalRole, String role, TopicOperation operation, AuthenticationDataSource 
authData) {
-            try {
-                return allowTopicOperationAsync(topicName, originalRole, role, 
operation, authData).get();
-            } catch (InterruptedException e) {
-                throw new RestException(e);
-            } catch (ExecutionException e) {
-                throw new RestException(e.getCause());
-            }
-        }
-
-        @Override
-        public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName 
topic, String originalRole, String role, TopicOperation operation, 
AuthenticationDataSource authData) {
+        public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName 
topic,
+                                                                   String role,
+                                                                   
TopicOperation operation,
+                                                                   
AuthenticationDataSource authData) {
             CompletableFuture<Boolean> future = new CompletableFuture<>();
             if (authData.hasSubscription()) {
                 String subscription = authData.getSubscription();
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
index 56a933b..697ddf9 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
@@ -62,7 +62,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 class AdminProxyHandler extends ProxyServlet {
+
     private static final Logger LOG = 
LoggerFactory.getLogger(AdminProxyHandler.class);
+
+    private static final String ORIGINAL_PRINCIPAL_HEADER = 
"X-Original-Principal";
+
     private static final Set<String> functionRoutes = new 
HashSet<>(Arrays.asList(
         "/admin/v3/function",
         "/admin/v2/function",
@@ -334,7 +338,7 @@ class AdminProxyHandler extends ProxyServlet {
         super.addProxyHeaders(clientRequest, proxyRequest);
         String user = (String) 
clientRequest.getAttribute(AuthenticationFilter.AuthenticatedRoleAttributeName);
         if (user != null) {
-            proxyRequest.header("X-Original-Principal", user);
+            proxyRequest.header(ORIGINAL_PRINCIPAL_HEADER, user);
         }
     }
 }

Reply via email to