This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f33567e  Enhance Authorization by adding TenantAdmin interface (#6487)
f33567e is described below

commit f33567ec6b805a7ae704520ab93ff87b0abb02b6
Author: Sanjeev Kulkarni <[email protected]>
AuthorDate: Thu Mar 5 21:19:26 2020 -0800

    Enhance Authorization by adding TenantAdmin interface (#6487)
    
    * Enhance Authorization by adding TenantAdmin interface
    
    * Remove debugging comment
    
    Co-authored-by: Sanjeev Kulkarni <[email protected]>
---
 .../authorization/AuthorizationProvider.java       | 13 +++++++++++
 .../broker/authorization/AuthorizationService.java |  9 ++++++++
 .../broker/admin/impl/PersistentTopicsBase.java    |  2 +-
 .../pulsar/broker/web/PulsarWebResource.java       | 26 ++++++++++------------
 .../apache/pulsar/broker/admin/NamespacesTest.java |  2 ++
 .../apache/pulsar/io/PulsarFunctionTlsTest.java    |  5 +++++
 .../discovery/service/BrokerDiscoveryProvider.java |  2 +-
 .../functions/worker/rest/api/ComponentImpl.java   |  4 ++--
 .../worker/rest/api/FunctionsImplTest.java         | 19 +++++++++++-----
 .../proxy/server/BrokerDiscoveryProvider.java      |  2 +-
 10 files changed, 59 insertions(+), 25 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
index 9787eae..cb65416 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
@@ -29,6 +29,7 @@ import 
org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.TenantInfo;
 
 /**
  * Provider of authorization mechanism
@@ -47,6 +48,18 @@ public interface AuthorizationProvider extends Closeable {
     }
 
     /**
+     * Check if specified role is an admin of the tenant
+     * @param tenant the tenant to check
+     * @param role the role to check
+     * @return a CompletableFuture containing a boolean in which true means 
the role is an admin user
+     * and false if it is not
+     */
+    default CompletableFuture<Boolean> isTenantAdmin(String tenant, String 
role, TenantInfo tenantInfo,
+                                                     AuthenticationDataSource 
authenticationData) {
+        return CompletableFuture.completedFuture(role != null && 
tenantInfo.getAdminRoles() != null && tenantInfo.getAdminRoles().contains(role) 
? true : false);
+    }
+
+    /**
      * Perform initialization for the authorization provider
      *
      * @param conf
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 95ff764..381e8cf 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -26,6 +26,7 @@ import 
org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,6 +77,14 @@ public class AuthorizationService {
         return FutureUtil.failedFuture(new IllegalStateException("No 
authorization provider configured"));
     }
 
+    public CompletableFuture<Boolean> isTenantAdmin(String tenant, String 
role, TenantInfo tenantInfo,
+                                                    AuthenticationDataSource 
authenticationData) {
+        if (provider != null) {
+            return provider.isTenantAdmin(tenant, role, tenantInfo, 
authenticationData);
+        }
+        return FutureUtil.failedFuture(new IllegalStateException("No 
authorization provider configured"));
+    }
+
     /**
      *
      * Grant authorization-action permission on a namespace to the given client
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 503b21b..e37f09d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1797,7 +1797,7 @@ public class PersistentTopicsBase extends AdminResource {
                 checkAuthorization(pulsar, topicName, clientAppId, 
authenticationData);
             } catch (RestException e) {
                 try {
-                    validateAdminAccessForTenant(pulsar, clientAppId, 
originalPrincipal, topicName.getTenant());
+                    validateAdminAccessForTenant(pulsar, clientAppId, 
originalPrincipal, topicName.getTenant(), authenticationData);
                 } catch (RestException authException) {
                     log.warn("Failed to authorize {} on cluster {}", 
clientAppId, topicName.toString());
                     throw new 
PulsarClientException(String.format("Authorization failed %s on topic %s with 
error %s",
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index ca9ec85..497883d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -52,6 +52,8 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authorization.AuthorizationProvider;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.common.naming.Constants;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -224,7 +226,7 @@ public abstract class PulsarWebResource {
      */
     protected void validateAdminAccessForTenant(String tenant) {
         try {
-            validateAdminAccessForTenant(pulsar(), clientAppId(), 
originalPrincipal(), tenant);
+            validateAdminAccessForTenant(pulsar(), clientAppId(), 
originalPrincipal(), tenant, clientAuthData());
         } catch (RestException e) {
             throw e;
         } catch (Exception e) {
@@ -234,7 +236,8 @@ public abstract class PulsarWebResource {
     }
 
     protected static void validateAdminAccessForTenant(PulsarService pulsar, 
String clientAppId,
-                                                       String 
originalPrincipal, String tenant)
+                                                       String 
originalPrincipal, String tenant,
+                                                       
AuthenticationDataSource authenticationData)
             throws RestException, Exception {
         if (log.isDebugEnabled()) {
             log.debug("check admin access on tenant: {} - Authenticated: {} -- 
role: {}", tenant,
@@ -259,22 +262,17 @@ public abstract class PulsarWebResource {
             
validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(), 
clientAppId, originalPrincipal);
 
             if 
(pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
-
                 CompletableFuture<Boolean> isProxySuperUserFuture;
                 CompletableFuture<Boolean> isOriginalPrincipalSuperUserFuture;
                 try {
-                    isProxySuperUserFuture = pulsar.getBrokerService()
-                            .getAuthorizationService()
-                            .isSuperUser(clientAppId);
+                    AuthorizationService authorizationService = 
pulsar.getBrokerService().getAuthorizationService();
+                    isProxySuperUserFuture = 
authorizationService.isSuperUser(clientAppId);
 
-                    isOriginalPrincipalSuperUserFuture = 
pulsar.getBrokerService()
-                            .getAuthorizationService()
-                            .isSuperUser(originalPrincipal);
+                    isOriginalPrincipalSuperUserFuture = 
authorizationService.isSuperUser(originalPrincipal);
 
-                Set<String> adminRoles = tenantInfo.getAdminRoles();
-                boolean proxyAuthorized = isProxySuperUserFuture.get() || 
adminRoles.contains(clientAppId);
-                boolean originalPrincipalAuthorized
-                    = isOriginalPrincipalSuperUserFuture.get() || 
adminRoles.contains(originalPrincipal);
+                    boolean proxyAuthorized = isProxySuperUserFuture.get() || 
authorizationService.isTenantAdmin(tenant, clientAppId, tenantInfo, 
authenticationData).get();
+                    boolean originalPrincipalAuthorized
+                    = isOriginalPrincipalSuperUserFuture.get() || 
authorizationService.isTenantAdmin(tenant, originalPrincipal, tenantInfo, 
authenticationData).get();
                     if (!proxyAuthorized || !originalPrincipalAuthorized) {
                         throw new RestException(Status.UNAUTHORIZED,
                                 String.format("Proxy not authorized to access 
resource (proxy:%s,original:%s)",
@@ -290,7 +288,7 @@ public abstract class PulsarWebResource {
                         .getAuthorizationService()
                         .isSuperUser(clientAppId)
                         .join()) {
-                    if (!tenantInfo.getAdminRoles().contains(clientAppId)) {
+                    if 
(!pulsar.getBrokerService().getAuthorizationService().isTenantAdmin(tenant, 
clientAppId, tenantInfo, authenticationData).get()) {
                         throw new RestException(Status.UNAUTHORIZED,
                                 "Don't have permission to administrate 
resources on this tenant");
                     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index a607921..0f7d100 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -145,6 +145,7 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         doReturn(false).when(namespaces).isRequestHttps();
         doReturn("test").when(namespaces).clientAppId();
         doReturn(null).when(namespaces).originalPrincipal();
+        doReturn(null).when(namespaces).clientAuthData();
         doReturn(Sets.newTreeSet(Lists.newArrayList("use", "usw", "usc", 
"global"))).when(namespaces).clusters();
         
doNothing().when(namespaces).validateAdminAccessForTenant(this.testTenant);
         
doNothing().when(namespaces).validateAdminAccessForTenant("non-existing-tenant");
@@ -987,6 +988,7 @@ public class NamespacesTest extends 
MockedPulsarServiceBaseTest {
         doReturn(false).when(topics).isRequestHttps();
         doReturn("test").when(topics).clientAppId();
         doReturn(null).when(topics).originalPrincipal();
+        doReturn(null).when(topics).clientAuthData();
         mockWebUrl(localWebServiceUrl, testNs);
         doReturn("persistent").when(topics).domain();
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index 8b7e92e..ed7a0dd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -42,6 +42,8 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.client.admin.Namespaces;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -109,13 +111,16 @@ public class PulsarFunctionTlsTest {
         Set<String> providers = new HashSet<>();
         providers.add(AuthenticationProviderTls.class.getName());
         config.setAuthenticationEnabled(true);
+        config.setAuthorizationEnabled(true);
         config.setAuthenticationProviders(providers);
         config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
         config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
         config.setTlsAllowInsecureConnection(true);
         functionsWorkerService = spy(createPulsarFunctionWorker(config));
         AuthenticationService authenticationService = new 
AuthenticationService(config);
+        AuthorizationService authorizationService = new 
AuthorizationService(config, mock(ConfigurationCacheService.class));
         
when(functionsWorkerService.getAuthenticationService()).thenReturn(authenticationService);
+        
when(functionsWorkerService.getAuthorizationService()).thenReturn(authorizationService);
         when(functionsWorkerService.isInitialized()).thenReturn(true);
 
         PulsarAdmin admin = mock(PulsarAdmin.class);
diff --git 
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
 
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
index 8b3a012..de3fef1 100644
--- 
a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
+++ 
b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
@@ -155,7 +155,7 @@ public class BrokerDiscoveryProvider implements Closeable {
                 throw new IllegalAccessException(String.format("Failed to get 
property %s admin data due to %s",
                         topicName.getTenant(), e.getMessage()));
             }
-            if (!tenantInfo.getAdminRoles().contains(role)) {
+            if 
(!service.getAuthorizationService().isTenantAdmin(topicName.getTenant(), role, 
tenantInfo, authenticationData).get()) {
                 throw new IllegalAccessException("Don't have permission to 
administrate resources on this property");
             }
         }
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 98cc3a5..cab2b07 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -1449,10 +1449,10 @@ public abstract class ComponentImpl {
             if (clientRole != null) {
                 try {
                     TenantInfo tenantInfo = 
worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
-                    if (tenantInfo != null && tenantInfo.getAdminRoles() != 
null && tenantInfo.getAdminRoles().contains(clientRole)) {
+                    if (tenantInfo != null && 
worker().getAuthorizationService().isTenantAdmin(tenant, clientRole, 
tenantInfo, authenticationData).get()) {
                         return true;
                     }
-                } catch (PulsarAdminException.NotFoundException e) {
+                } catch (PulsarAdminException.NotFoundException | 
InterruptedException | ExecutionException e) {
 
                 }
             }
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
index 258c1237..cd55b2a 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.functions.worker.rest.api;
 
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.client.admin.Namespaces;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -60,6 +61,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyInt;
@@ -230,51 +232,56 @@ public class FunctionsImplTest {
     }
 
     @Test
-    public void testIsAuthorizedRole() throws PulsarAdminException {
-
+    public void testIsAuthorizedRole() throws PulsarAdminException, 
InterruptedException, ExecutionException {
 
+        TenantInfo tenantInfo = new TenantInfo();
+        AuthenticationDataSource authenticationDataSource = 
mock(AuthenticationDataSource.class);
         FunctionsImpl functionImpl = spy(new FunctionsImpl(() -> 
mockedWorkerService));
+        AuthorizationService authorizationService = 
mock(AuthorizationService.class);
+        
doReturn(authorizationService).when(mockedWorkerService).getAuthorizationService();
         WorkerConfig workerConfig = new WorkerConfig();
         workerConfig.setAuthorizationEnabled(true);
         workerConfig.setSuperUserRoles(Collections.singleton(superUser));
         doReturn(workerConfig).when(mockedWorkerService).getWorkerConfig();
 
         // test super user
-        AuthenticationDataSource authenticationDataSource = 
mock(AuthenticationDataSource.class);
         assertTrue(functionImpl.isAuthorizedRole("test-tenant", "test-ns", 
superUser, authenticationDataSource));
 
         // test normal user
         functionImpl = spy(new FunctionsImpl(() -> mockedWorkerService));
         doReturn(false).when(functionImpl).allowFunctionOps(any(), any(), 
any());
         Tenants tenants = mock(Tenants.class);
-        when(tenants.getTenantInfo(any())).thenReturn(new TenantInfo());
+        when(tenants.getTenantInfo(any())).thenReturn(tenantInfo);
         PulsarAdmin admin = mock(PulsarAdmin.class);
         when(admin.tenants()).thenReturn(tenants);
         when(this.mockedWorkerService.getBrokerAdmin()).thenReturn(admin);
+        when(authorizationService.isTenantAdmin("test-tenant", "test-user", 
tenantInfo, 
authenticationDataSource)).thenReturn(CompletableFuture.completedFuture(false));
         assertFalse(functionImpl.isAuthorizedRole("test-tenant", "test-ns", 
"test-user", authenticationDataSource));
 
         // if user is tenant admin
         functionImpl = spy(new FunctionsImpl(() -> mockedWorkerService));
         doReturn(false).when(functionImpl).allowFunctionOps(any(), any(), 
any());
         tenants = mock(Tenants.class);
-        TenantInfo tenantInfo = new TenantInfo();
         tenantInfo.setAdminRoles(Collections.singleton("test-user"));
         when(tenants.getTenantInfo(any())).thenReturn(tenantInfo);
 
         admin = mock(PulsarAdmin.class);
         when(admin.tenants()).thenReturn(tenants);
         when(this.mockedWorkerService.getBrokerAdmin()).thenReturn(admin);
+        when(authorizationService.isTenantAdmin("test-tenant", "test-user", 
tenantInfo, 
authenticationDataSource)).thenReturn(CompletableFuture.completedFuture(true));
         assertTrue(functionImpl.isAuthorizedRole("test-tenant", "test-ns", 
"test-user", authenticationDataSource));
 
         // test user allow function action
         functionImpl = spy(new FunctionsImpl(() -> mockedWorkerService));
         doReturn(true).when(functionImpl).allowFunctionOps(any(), any(), 
any());
         tenants = mock(Tenants.class);
-        when(tenants.getTenantInfo(any())).thenReturn(new TenantInfo());
+        tenantInfo.setAdminRoles(Collections.emptySet());
+        when(tenants.getTenantInfo(any())).thenReturn(tenantInfo);
 
         admin = mock(PulsarAdmin.class);
         when(admin.tenants()).thenReturn(tenants);
         when(this.mockedWorkerService.getBrokerAdmin()).thenReturn(admin);
+        when(authorizationService.isTenantAdmin("test-tenant", "test-user", 
tenantInfo, 
authenticationDataSource)).thenReturn(CompletableFuture.completedFuture(true));
         assertTrue(functionImpl.isAuthorizedRole("test-tenant", "test-ns", 
"test-user", authenticationDataSource));
 
         // test role is null
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
index 28f28cd..e055872 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/BrokerDiscoveryProvider.java
@@ -153,7 +153,7 @@ public class BrokerDiscoveryProvider implements Closeable {
                 throw new IllegalAccessException(String.format("Failed to get 
property %s admin data due to %s",
                         topicName.getTenant(), e.getMessage()));
             }
-            if (!tenantInfo.getAdminRoles().contains(role)) {
+            if 
(!service.getAuthorizationService().isTenantAdmin(topicName.getTenant(), role, 
tenantInfo, authenticationData).get()) {
                 throw new IllegalAccessException("Don't have permission to 
administrate resources on this tenant");
             }
         }

Reply via email to