This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 02df3d2  Allow subscribers to access subscription admin-api (#2981)
02df3d2 is described below

commit 02df3d2bbf6c4dc893aa1ea0f04221e8d50c72a4
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Mon Dec 3 14:17:50 2018 -0800

    Allow subscribers to access subscription admin-api (#2981)
    
    * Allow subscribers to access subscription admin-api
    
    * add sub-authorization check on canConsume
    
    * make per-sub optional
    
    * fix: allow super-admin to consume
---
 .../authorization/AuthorizationProvider.java       |  23 ++++
 .../broker/authorization/AuthorizationService.java |  35 ++++++
 .../authorization/PulsarAuthorizationProvider.java |  81 ++++++++++++++
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  45 ++++++++
 .../broker/admin/impl/PersistentTopicsBase.java    |  46 ++++++--
 .../apache/pulsar/broker/admin/v1/Namespaces.java  |  25 +++++
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  26 +++++
 .../apache/pulsar/broker/admin/AdminApiTest.java   |   2 +-
 .../api/AuthorizationProducerConsumerTest.java     | 120 ++++++++++++++++++++-
 .../org/apache/pulsar/client/admin/Namespaces.java |  18 ++++
 .../client/admin/internal/NamespacesImpl.java      |  24 +++++
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java |  42 +++++++-
 .../pulsar/common/policies/data/AuthPolicies.java  |   5 +-
 13 files changed, 476 insertions(+), 16 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
index cae2415..5d8b930 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
@@ -102,6 +102,29 @@ public interface AuthorizationProvider extends Closeable {
             String authDataJson);
 
     /**
+     * Grant permission to roles that can access subscription-admin api
+     * 
+     * @param namespace
+     * @param subscriptionName
+     * @param roles
+     * @param authDataJson
+     *            additional authdata in json format
+     * @return
+     */
+    CompletableFuture<Void> grantSubscriptionPermissionAsync(NamespaceName 
namespace, String subscriptionName, Set<String> roles,
+            String authDataJson);
+    
+    /**
+     * Revoke subscription admin-api access for a role
+     * @param namespace
+     * @param subscriptionName
+     * @param role
+     * @return
+     */
+    CompletableFuture<Void> revokeSubscriptionPermissionAsync(NamespaceName 
namespace, String subscriptionName,
+            String role, String authDataJson);
+    
+    /**
      * Grant authorization-action permission on a topic to the given client
      *
      * @param topicName
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 76e98cc..95d012f 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -95,6 +95,41 @@ public class AuthorizationService {
     }
 
     /**
+     * Grant permission to roles that can access subscription-admin api
+     * 
+     * @param namespace
+     * @param subscriptionName
+     * @param roles
+     * @param authDataJson
+     *            additional authdata in json for targeted authorization 
provider
+     * @return
+     */
+    public CompletableFuture<Void> 
grantSubscriptionPermissionAsync(NamespaceName namespace, String 
subscriptionName,
+            Set<String> roles, String authDataJson) {
+
+        if (provider != null) {
+            return provider.grantSubscriptionPermissionAsync(namespace, 
subscriptionName, roles, authDataJson);
+        }
+        return FutureUtil.failedFuture(new IllegalStateException("No 
authorization provider configured"));
+    }
+
+    /**
+     * Revoke subscription admin-api access for a role
+     * 
+     * @param namespace
+     * @param subscriptionName
+     * @param role
+     * @return
+     */
+    public CompletableFuture<Void> 
revokeSubscriptionPermissionAsync(NamespaceName namespace, String 
subscriptionName,
+            String role, String authDataJson) {
+        if (provider != null) {
+            return provider.revokeSubscriptionPermissionAsync(namespace, 
subscriptionName, role, authDataJson);
+        }
+        return FutureUtil.failedFuture(new IllegalStateException("No 
authorization provider configured"));
+    }
+    
+    /**
      * Grant authorization-action permission on a topic to the given client
      *
      * @param topicname
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index 4af6a0a..229efec 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -23,6 +23,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -44,6 +45,8 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Sets;
+
 /**
  * Default authorization provider that stores authorization policies under 
local-zookeeper.
  *
@@ -110,6 +113,19 @@ public class PulsarAuthorizationProvider implements 
AuthorizationProvider {
                     }
                 } else {
                     if (isNotBlank(subscription) && !isSuperUser(role)) {
+                        // validate if role is authorized to access 
subscription. (skip validation if authorization
+                        // list is empty)
+                        Set<String> roles = 
policies.get().auth_policies.subscription_auth_roles.get(subscription);
+                        if (roles != null && !roles.isEmpty() && 
!roles.contains(role)) {
+                            log.warn("[{}] is not authorized to subscribe on 
{}-{}", role, topicName, subscription);
+                            PulsarServerException ex = new 
PulsarServerException(
+                                    String.format("%s is not authorized to 
access subscription %s on topic %s", role,
+                                            subscription, topicName));
+                            permissionFuture.complete(false);
+                            return;
+                        }
+
+                        // validate if subscription-auth mode is configured
                         switch (policies.get().subscription_auth_mode) {
                         case Prefix:
                             if (!subscription.startsWith(role)) {
@@ -125,6 +141,7 @@ public class PulsarAuthorizationProvider implements 
AuthorizationProvider {
                         }
                     }
                 }
+                // check namespace and topic level consume-permissions
                 checkAuthorization(topicName, role, 
AuthAction.consume).thenAccept(isAuthorized -> {
                     permissionFuture.complete(isAuthorized);
                 });
@@ -241,6 +258,70 @@ public class PulsarAuthorizationProvider implements 
AuthorizationProvider {
         return result;
     }
 
+    @Override
+    public CompletableFuture<Void> 
grantSubscriptionPermissionAsync(NamespaceName namespace, String 
subscriptionName,
+            Set<String> roles, String authDataJson) {
+        return updateSubscriptionPermissionAsync(namespace, subscriptionName, 
roles, false);
+    }
+
+    @Override
+    public CompletableFuture<Void> 
revokeSubscriptionPermissionAsync(NamespaceName namespace, String 
subscriptionName,
+            String role, String authDataJson) {
+        return updateSubscriptionPermissionAsync(namespace, subscriptionName, 
Collections.singleton(role), true);
+    }
+    
+    private CompletableFuture<Void> 
updateSubscriptionPermissionAsync(NamespaceName namespace, String 
subscriptionName, Set<String> roles,
+            boolean remove) {
+        CompletableFuture<Void> result = new CompletableFuture<>();
+
+        try {
+            validatePoliciesReadOnlyAccess();
+        } catch (Exception e) {
+            result.completeExceptionally(e);
+        }
+
+        ZooKeeper globalZk = configCache.getZooKeeper();
+        final String policiesPath = String.format("/%s/%s/%s", "admin", 
POLICIES, namespace.toString());
+
+        try {
+            Stat nodeStat = new Stat();
+            byte[] content = globalZk.getData(policiesPath, null, nodeStat);
+            Policies policies = getThreadLocal().readValue(content, 
Policies.class);
+            if (remove) {
+                if 
(policies.auth_policies.subscription_auth_roles.get(subscriptionName) != null) {
+                    
policies.auth_policies.subscription_auth_roles.get(subscriptionName).removeAll(roles);
+                }else {
+                    log.info("[{}] Couldn't find role {} while revoking for 
sub = {}", namespace, subscriptionName, roles);
+                    result.completeExceptionally(new 
IllegalArgumentException("couldn't find subscription"));
+                    return result;
+                }
+            } else {
+                
policies.auth_policies.subscription_auth_roles.put(subscriptionName, roles);
+            }
+
+            // Write back the new policies into zookeeper
+            globalZk.setData(policiesPath, 
getThreadLocal().writeValueAsBytes(policies), nodeStat.getVersion());
+
+            configCache.policiesCache().invalidate(policiesPath);
+
+            log.info("[{}] Successfully granted access for role {} for sub = 
{}", namespace, subscriptionName, roles);
+            result.complete(null);
+        } catch (KeeperException.NoNodeException e) {
+            log.warn("[{}] Failed to set permissions for namespace {}: does 
not exist", subscriptionName, namespace);
+            result.completeExceptionally(new 
IllegalArgumentException("Namespace does not exist" + namespace));
+        } catch (KeeperException.BadVersionException e) {
+            log.warn("[{}] Failed to set permissions for {} on namespace {}: 
concurrent modification", subscriptionName, roles, namespace);
+            result.completeExceptionally(new IllegalStateException(
+                    "Concurrent modification on zk path: " + policiesPath + ", 
" + e.getMessage()));
+        } catch (Exception e) {
+            log.error("[{}] Failed to get permissions for role {} on namespace 
{}", subscriptionName, roles, namespace, e);
+            result.completeExceptionally(
+                    new IllegalStateException("Failed to get permissions for 
namespace " + namespace));
+        }
+
+        return result;
+    }
+
     private CompletableFuture<Boolean> checkAuthorization(TopicName topicName, 
String role, AuthAction action) {
         if (isSuperUser(role)) {
             return CompletableFuture.completedFuture(true);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 105ca25..8a39f1a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -330,6 +330,38 @@ public abstract class NamespacesBase extends AdminResource 
{
         }
     }
 
+
+    protected void internalGrantPermissionOnSubscription(String subscription, 
Set<String> roles) {
+        /** restricted to system-admin (super-user) to limit metadata 
footprint size */
+        validateSuperUserAccess();
+
+        try {
+            AuthorizationService authService = 
pulsar().getBrokerService().getAuthorizationService();
+            if (null != authService) {
+                authService.grantSubscriptionPermissionAsync(namespaceName, 
subscription, roles,
+                        null/* additional auth-data json */).get();
+            } else {
+                throw new RestException(Status.NOT_IMPLEMENTED, "Authorization 
is not enabled");
+            }
+        } catch (InterruptedException e) {
+            log.error("[{}] Failed to get permissions for namespace {}", 
clientAppId(), namespaceName, e);
+            throw new RestException(e);
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof IllegalArgumentException) {
+                log.warn("[{}] Failed to set permissions for namespace {}: 
does not exist", clientAppId(),
+                        namespaceName);
+                throw new RestException(Status.NOT_FOUND, "Namespace does not 
exist");
+            } else if (e.getCause() instanceof IllegalStateException) {
+                log.warn("[{}] Failed to set permissions for namespace {}: 
concurrent modification", clientAppId(),
+                        namespaceName);
+                throw new RestException(Status.CONFLICT, "Concurrent 
modification");
+            } else {
+                log.error("[{}] Failed to get permissions for namespace {}", 
clientAppId(), namespaceName, e);
+                throw new RestException(e);
+            }
+        }
+    }
+
     protected void internalRevokePermissionsOnNamespace(String role) {
         validateAdminAccessForTenant(namespaceName.getTenant());
         validatePoliciesReadOnlyAccess();
@@ -360,6 +392,19 @@ public abstract class NamespacesBase extends AdminResource 
{
         }
     }
 
+    protected void internalRevokePermissionsOnSubscription(String 
subscriptionName, String role) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+
+        AuthorizationService authService = 
pulsar().getBrokerService().getAuthorizationService();
+        if (null != authService) {
+            authService.revokeSubscriptionPermissionAsync(namespaceName, 
subscriptionName, role,
+                    null/* additional auth-data json */);
+        } else {
+            throw new RestException(Status.NOT_IMPLEMENTED, "Authorization is 
not enabled");
+        }
+    }
+    
     protected Set<String> internalGetNamespaceReplicationClusters() {
         if (!namespaceName.isGlobal()) {
             throw new RestException(Status.PRECONDITION_FAILED,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 34c324a..24c30a7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -235,6 +235,36 @@ public class PersistentTopicsBase extends AdminResource {
         validateTopicOwnership(topicName, authoritative);
     }
 
+    protected void validateAdminAccessForSubscriber(String subscriptionName, 
boolean authoritative) {
+        validateTopicOwnership(topicName, authoritative);
+        try {
+            validateAdminAccessForTenant(topicName.getTenant());
+        } catch (Exception e) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] failed to validate admin access for {}", 
topicName, clientAppId());
+            }
+            validateAdminAccessForSubscriber(subscriptionName);
+        }
+    }
+
+    private void validateAdminAccessForSubscriber(String subscriptionName) {
+        try {
+            if 
(!pulsar().getBrokerService().getAuthorizationService().canConsume(topicName, 
clientAppId(),
+                    clientAuthData(), subscriptionName)) {
+                log.warn("[{}} Subscriber {} is not authorized to access api", 
topicName, clientAppId());
+                throw new RestException(Status.UNAUTHORIZED,
+                        String.format("Subscriber %s is not authorized to 
access this operation", clientAppId()));
+            }
+        } catch (RestException re) {
+            throw re;
+        } catch (Exception e) {
+            // unknown error marked as internal server error
+            log.warn("Unexpected error while authorizing request. topic={}, 
role={}. Error: {}", topicName,
+                    clientAppId(), e.getMessage(), e);
+            throw new RestException(e);
+        }
+    }
+
     protected void internalGrantPermissionsOnTopic(String role, 
Set<AuthAction> actions) {
         // This operation should be reading from zookeeper and it should be 
allowed without having admin privileges
         validateAdminAccessForTenant(namespaceName.getTenant());
@@ -654,7 +684,7 @@ public class PersistentTopicsBase extends AdminResource {
                 }
             }
         } else {
-            validateAdminOperationOnTopic(authoritative);
+            validateAdminAccessForSubscriber(subName, authoritative);
             Topic topic = getTopicReference(topicName);
             try {
                 Subscription sub = topic.getSubscription(subName);
@@ -690,7 +720,7 @@ public class PersistentTopicsBase extends AdminResource {
                 throw new RestException(e);
             }
         } else {
-            validateAdminOperationOnTopic(authoritative);
+            validateAdminAccessForSubscriber(subName, authoritative);
             PersistentTopic topic = (PersistentTopic) 
getTopicReference(topicName);
             try {
                 if (subName.startsWith(topic.replicatorPrefix)) {
@@ -721,7 +751,7 @@ public class PersistentTopicsBase extends AdminResource {
         if (partitionMetadata.partitions > 0) {
             throw new RestException(Status.METHOD_NOT_ALLOWED, "Skip messages 
on a partitioned topic is not allowed");
         }
-        validateAdminOperationOnTopic(authoritative);
+        validateAdminAccessForSubscriber(subName, authoritative);
         PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
         try {
             if (subName.startsWith(topic.replicatorPrefix)) {
@@ -810,7 +840,7 @@ public class PersistentTopicsBase extends AdminResource {
             }
 
         } else {
-            validateAdminOperationOnTopic(authoritative);
+            validateAdminAccessForSubscriber(subName, authoritative);
             log.info("[{}][{}] received reset cursor on subscription {} to 
time {}", clientAppId(), topicName,
                     subName, timestamp);
             PersistentTopic topic = (PersistentTopic) 
getTopicReference(topicName);
@@ -883,7 +913,7 @@ public class PersistentTopicsBase extends AdminResource {
                     throw exception.get();
                 }
             } else {
-                validateAdminOperationOnTopic(authoritative);
+                validateAdminAccessForSubscriber(subscriptionName, 
authoritative);
 
                 PersistentTopic topic = (PersistentTopic) 
getOrCreateTopic(topicName);
 
@@ -925,7 +955,7 @@ public class PersistentTopicsBase extends AdminResource {
             throw new RestException(Status.METHOD_NOT_ALLOWED,
                     "Reset-cursor at position is not allowed for 
partitioned-topic");
         } else {
-            validateAdminOperationOnTopic(authoritative);
+            validateAdminAccessForSubscriber(subName, authoritative);
             PersistentTopic topic = (PersistentTopic) 
getTopicReference(topicName);
             if (topic == null) {
                 throw new RestException(Status.NOT_FOUND, "Topic not found");
@@ -960,7 +990,7 @@ public class PersistentTopicsBase extends AdminResource {
         if (partitionMetadata.partitions > 0) {
             throw new RestException(Status.METHOD_NOT_ALLOWED, "Peek messages 
on a partitioned topic is not allowed");
         }
-        validateAdminOperationOnTopic(authoritative);
+        validateAdminAccessForSubscriber(subName, authoritative);
         if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
             log.error("[{}] Not supported operation of non-persistent topic {} 
{}", clientAppId(), topicName,
                     subName);
@@ -1112,7 +1142,7 @@ public class PersistentTopicsBase extends AdminResource {
             }
         } else {
             // validate ownership and redirect if current broker is not owner
-            validateAdminOperationOnTopic(authoritative);
+            validateAdminAccessForSubscriber(subName, authoritative);
             if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
                 log.error("[{}] Not supported operation of non-persistent 
topic {} {}", clientAppId(), topicName,
                         subName);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 561351d..1f2c8c9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -226,6 +226,19 @@ public class Namespaces extends NamespacesBase {
         internalGrantPermissionOnNamespace(role, actions);
     }
 
+    @POST
+    
@Path("/{property}/{cluster}/{namespace}/permissions/subscription/{subscription}")
+    @ApiOperation(hidden = true, value = "Grant a new permission to roles for 
a subscription.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Property or cluster or 
namespace doesn't exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 501, message = "Authorization is not 
enabled")})
+    public void grantPermissionOnSubscription(@PathParam("property") String 
property, @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace, 
@PathParam("subscription") String subscription, Set<String> roles) {
+        validateNamespaceName(property, cluster, namespace);
+        internalGrantPermissionOnSubscription(subscription, roles);
+    }
+
     @DELETE
     @Path("/{property}/{cluster}/{namespace}/permissions/{role}")
     @ApiOperation(hidden = true, value = "Revoke all permissions to a role on 
a namespace.")
@@ -238,6 +251,18 @@ public class Namespaces extends NamespacesBase {
         internalRevokePermissionsOnNamespace(role);
     }
 
+    @DELETE
+    
@Path("/{property}/{cluster}/{namespace}/permissions/{subscription}/{role}")
+    @ApiOperation(hidden = true, value = "Revoke subscription admin-api access 
permission for a role.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Property or cluster or 
namespace doesn't exist") })
+    public void revokePermissionOnSubscription(@PathParam("property") String 
property,
+            @PathParam("cluster") String cluster, @PathParam("namespace") 
String namespace,
+            @PathParam("subscription") String subscription, @PathParam("role") 
String role) {
+        validateNamespaceName(property, cluster, namespace);
+        internalRevokePermissionsOnSubscription(subscription, role);
+    }
+
     @GET
     @Path("/{property}/{cluster}/{namespace}/replication")
     @ApiOperation(hidden = true, value = "Get the replication clusters for a 
namespace.", response = String.class, responseContainer = "List")
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 58686d8..b116003 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -172,6 +172,20 @@ public class Namespaces extends NamespacesBase {
         internalGrantPermissionOnNamespace(role, actions);
     }
 
+    @POST
+    @Path("/{property}/{namespace}/permissions/subscription/{subscription}")
+    @ApiOperation(hidden = true, value = "Grant a new permission to roles for 
a subscription.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Property or cluster or 
namespace doesn't exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 501, message = "Authorization is not enabled") 
})
+    public void grantPermissionOnSubscription(@PathParam("property") String 
property,
+            @PathParam("namespace") String namespace, 
@PathParam("subscription") String subscription,
+            Set<String> roles) {
+        validateNamespaceName(property, namespace);
+        internalGrantPermissionOnSubscription(subscription, roles);
+    }
+    
     @DELETE
     @Path("/{tenant}/{namespace}/permissions/{role}")
     @ApiOperation(value = "Revoke all permissions to a role on a namespace.")
@@ -183,6 +197,18 @@ public class Namespaces extends NamespacesBase {
         internalRevokePermissionsOnNamespace(role);
     }
 
+    @DELETE
+    @Path("/{property}/{namespace}/permissions/{subscription}/{role}")
+    @ApiOperation(hidden = true, value = "Revoke subscription admin-api access 
permission for a role.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Property or cluster or 
namespace doesn't exist") })
+    public void revokePermissionOnSubscription(@PathParam("property") String 
property,
+            @PathParam("namespace") String namespace, 
@PathParam("subscription") String subscription,
+            @PathParam("role") String role) {
+        validateNamespaceName(property, namespace);
+        internalRevokePermissionsOnSubscription(subscription, role);
+    }
+    
     @GET
     @Path("/{tenant}/{namespace}/replication")
     @ApiOperation(value = "Get the replication clusters for a namespace.", 
response = String.class, responseContainer = "List")
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 6b17a73..01160fe 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -587,7 +587,7 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
         }
     }
 
-    @Test(invocationCount = 1)
+    @Test
     public void namespaces() throws PulsarAdminException, 
PulsarServerException, Exception {
         admin.clusters().createCluster("usw", new ClusterData());
         TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", 
"role2"),
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index e31e163..033c638 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -20,9 +20,11 @@ package org.apache.pulsar.client.api;
 
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.mockito.Mockito.spy;
+import static org.testng.Assert.fail;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -37,10 +39,11 @@ import 
org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationProvider;
 import org.apache.pulsar.broker.authorization.AuthorizationProvider;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.slf4j.Logger;
@@ -49,7 +52,6 @@ import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
@@ -57,6 +59,7 @@ public class AuthorizationProducerConsumerTest extends 
ProducerConsumerBase {
     private static final Logger log = 
LoggerFactory.getLogger(AuthorizationProducerConsumerTest.class);
 
     private final static String clientRole = "plugbleRole";
+    private final static Set<String> clientAuthProviderSupportedRoles = 
Sets.newHashSet(clientRole);
 
     protected void setup() throws Exception {
 
@@ -146,6 +149,101 @@ public class AuthorizationProducerConsumerTest extends 
ProducerConsumerBase {
     }
 
     @Test
+    public void testSubscriberPermission() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        
conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
+        setup();
+
+        final String tenantRole = "tenant-role";
+        final String subscriptionRole = "sub1-role";
+        final String subscriptionName = "sub1";
+        final String namespace = "my-property/my-ns-sub-auth";
+        final String topicName = "persistent://" + namespace + "/my-topic";
+        Authentication adminAuthentication = new 
ClientAuthentication("superUser");
+
+        clientAuthProviderSupportedRoles.add(subscriptionRole);
+
+        PulsarAdmin superAdmin = spy(
+                
PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()).authentication(adminAuthentication).build());
+
+        Authentication tenantAdminAuthentication = new 
ClientAuthentication(tenantRole);
+        PulsarAdmin tenantAdmin = 
spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
+                .authentication(tenantAdminAuthentication).build());
+
+        Authentication subAdminAuthentication = new 
ClientAuthentication(subscriptionRole);
+        PulsarAdmin sub1Admin = 
spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
+                .authentication(subAdminAuthentication).build());
+
+        String lookupUrl;
+        lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
+
+        Authentication authentication = new 
ClientAuthentication(subscriptionRole);
+
+        superAdmin.tenants().createTenant("my-property",
+                new TenantInfo(Sets.newHashSet(tenantRole), 
Sets.newHashSet("test")));
+        superAdmin.namespaces().createNamespace(namespace, 
Sets.newHashSet("test"));
+        tenantAdmin.namespaces().grantPermissionOnNamespace(namespace, 
subscriptionRole,
+                Collections.singleton(AuthAction.consume));
+
+        pulsarClient = 
PulsarClient.builder().serviceUrl(lookupUrl).authentication(authentication).build();
+        // (1) Create subscription name
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
+                .subscribe();
+        consumer.close();
+
+        // verify tenant is able to perform all subscription-admin api
+        tenantAdmin.topics().skipAllMessages(topicName, subscriptionName);
+        tenantAdmin.topics().skipMessages(topicName, subscriptionName, 1);
+        tenantAdmin.topics().expireMessages(topicName, subscriptionName, 10);
+        tenantAdmin.topics().peekMessages(topicName, subscriptionName, 1);
+        tenantAdmin.topics().resetCursor(topicName, subscriptionName, 10);
+        tenantAdmin.topics().resetCursor(topicName, subscriptionName, 
MessageId.earliest);
+
+        // grant namespace-level authorization to the subscriptionRole
+        tenantAdmin.namespaces().grantPermissionOnNamespace(namespace, 
subscriptionRole,
+                Collections.singleton(AuthAction.consume));
+
+        // subscriptionRole has namespace-level authorization
+        sub1Admin.topics().resetCursor(topicName, subscriptionName, 10);
+        
+        // grant subscription access to a specific, different role; only that 
role can then access the subscription
+        String otherPrincipal = "Principal-1-to-access-sub";
+        superAdmin.namespaces().grantPermissionOnSubscription(namespace, 
subscriptionName,
+                Collections.singleton(otherPrincipal));
+        
+        // now subscriptionRole doesn't have subscription-level access, so it 
will fail to access the subscription
+        try {
+            sub1Admin.topics().resetCursor(topicName, subscriptionName, 10);
+            fail("should have failed with authorization exception");
+        } catch 
(org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException e) {
+            // Ok
+        }
+
+        // now, grant subscription-access to subscriptionRole as well
+        superAdmin.namespaces().grantPermissionOnSubscription(namespace, 
subscriptionName,
+                Sets.newHashSet(otherPrincipal, subscriptionRole));
+
+        sub1Admin.topics().skipAllMessages(topicName, subscriptionName);
+        sub1Admin.topics().skipMessages(topicName, subscriptionName, 1);
+        sub1Admin.topics().expireMessages(topicName, subscriptionName, 10);
+        sub1Admin.topics().peekMessages(topicName, subscriptionName, 1);
+        sub1Admin.topics().resetCursor(topicName, subscriptionName, 10);
+        sub1Admin.topics().resetCursor(topicName, subscriptionName, 
MessageId.earliest);
+
+        superAdmin.namespaces().revokePermissionOnSubscription(namespace, 
subscriptionName, subscriptionRole);
+        
+        try {
+            sub1Admin.topics().resetCursor(topicName, subscriptionName, 10);
+            fail("should have failed with authorization exception");
+        } catch 
(org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException e) {
+            // Ok
+        }
+        
+        log.info("-- Exiting {} test --", methodName);
+    }
+    
+    @Test
     public void testSubscriptionPrefixAuthorization() throws Exception {
         log.info("-- Starting {} test --", methodName);
 
@@ -317,19 +415,19 @@ public class AuthorizationProducerConsumerTest extends 
ProducerConsumerBase {
         @Override
         public CompletableFuture<Boolean> canProduceAsync(TopicName topicName, 
String role,
                 AuthenticationDataSource authenticationData) {
-            return CompletableFuture.completedFuture(clientRole.equals(role));
+            return 
CompletableFuture.completedFuture(clientAuthProviderSupportedRoles.contains(role));
         }
 
         @Override
         public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, 
String role,
                 AuthenticationDataSource authenticationData, String 
subscription) {
-            return CompletableFuture.completedFuture(clientRole.equals(role));
+            return 
CompletableFuture.completedFuture(clientAuthProviderSupportedRoles.contains(role));
         }
 
         @Override
         public CompletableFuture<Boolean> canLookupAsync(TopicName topicName, 
String role,
                 AuthenticationDataSource authenticationData) {
-            return CompletableFuture.completedFuture(clientRole.equals(role));
+            return 
CompletableFuture.completedFuture(clientAuthProviderSupportedRoles.contains(role));
         }
 
         @Override
@@ -343,6 +441,18 @@ public class AuthorizationProducerConsumerTest extends 
ProducerConsumerBase {
                 String authenticationData) {
             return CompletableFuture.completedFuture(null);
         }
+
+        @Override
+        public CompletableFuture<Void> 
grantSubscriptionPermissionAsync(NamespaceName namespace,
+                String subscriptionName, Set<String> roles, String 
authDataJson) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        @Override
+        public CompletableFuture<Void> 
revokeSubscriptionPermissionAsync(NamespaceName namespace,
+                String subscriptionName, String role, String authDataJson) {
+            return CompletableFuture.completedFuture(null);
+        }
     }
 
     /**
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index cbd5506..db5a775 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -377,6 +377,24 @@ public interface Namespaces {
     void revokePermissionsOnNamespace(String namespace, String role) throws 
PulsarAdminException;
 
     /**
+     * Grant permission to the given roles to access a subscription's admin-api.
+     * @param namespace
+     * @param subscription
+     * @param roles
+     * @throws PulsarAdminException
+     */
+    void grantPermissionOnSubscription(String namespace, String subscription, 
Set<String> roles) throws PulsarAdminException;
+    
+    /**
+     * Revoke a role's permission to access a subscription's admin-api.
+     * @param namespace
+     * @param subscription
+     * @param role
+     * @throws PulsarAdminException
+     */
+    void revokePermissionOnSubscription(String namespace, String subscription, 
String role) throws PulsarAdminException;
+    
+    /**
      * Get the replication clusters for a namespace.
      * <p>
      * Response example:
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 833c2c3..6146e11 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -233,6 +233,30 @@ public class NamespacesImpl extends BaseResource 
implements Namespaces {
         }
     }
 
+    
+    @Override
+    public void grantPermissionOnSubscription(String namespace, String 
subscription, Set<String> roles)
+            throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            WebTarget path = namespacePath(ns, "permissions", "subscription", 
subscription);
+            request(path).post(Entity.entity(roles, 
MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void revokePermissionOnSubscription(String namespace, String 
subscription, String role) throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            WebTarget path = namespacePath(ns, "permissions", subscription, 
role);
+            request(path).delete(ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+    
     @Override
     public List<String> getNamespaceReplicationClusters(String namespace) 
throws PulsarAdminException {
         try {
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 6650618..faf04e5 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -29,6 +29,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.admin.cli.utils.IOUtils;
@@ -198,6 +199,42 @@ public class CmdNamespaces extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Grant permissions to access subscription 
admin-api")
+    private class GrantSubscriptionPermissions extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = "--subscription", description = "Subscription name 
for which permission will be granted to roles", required = true)
+        private String subscription;
+
+        @Parameter(names = "--roles", description = "Client roles to which 
permissions are granted (comma separated roles)", required = true, splitter = 
CommaParameterSplitter.class)
+        private List<String> roles;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            admin.namespaces().grantPermissionOnSubscription(namespace, 
subscription, Sets.newHashSet(roles));
+        }
+    }
+
+    @Parameters(commandDescription = "Revoke permissions to access subscription admin-api")
+    private class RevokeSubscriptionPermissions extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = "--subscription", description = "Subscription name 
for which permission will be revoked from the role", required = true)
+        private String subscription;
+        
+        @Parameter(names = "--role", description = "Client role from which 
permissions are revoked", required = true)
+        private String role;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            admin.namespaces().revokePermissionOnSubscription(namespace, 
subscription, role);
+        }
+    }
+    
     @Parameters(commandDescription = "Get the permissions on a namespace")
     private class Permissions extends CliCommand {
         @Parameter(description = "tenant/namespace\n", required = true)
@@ -987,7 +1024,10 @@ public class CmdNamespaces extends CmdBase {
         jcommander.addCommand("permissions", new Permissions());
         jcommander.addCommand("grant-permission", new GrantPermissions());
         jcommander.addCommand("revoke-permission", new RevokePermissions());
-
+        
+        jcommander.addCommand("grant-subscription-permission", new 
GrantSubscriptionPermissions());
+        jcommander.addCommand("revoke-subscription-permission", new 
RevokeSubscriptionPermissions());
+        
         jcommander.addCommand("set-clusters", new SetReplicationClusters());
         jcommander.addCommand("get-clusters", new GetReplicationClusters());
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthPolicies.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthPolicies.java
index 553b576..bd0c61b 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthPolicies.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthPolicies.java
@@ -27,10 +27,12 @@ import com.google.common.collect.Maps;
 public class AuthPolicies {
     public final Map<String, Set<AuthAction>> namespace_auth;
     public final Map<String, Map<String, Set<AuthAction>>> destination_auth;
+    public final Map<String, Set<String>> subscription_auth_roles;
 
     public AuthPolicies() {
         namespace_auth = Maps.newTreeMap();
         destination_auth = Maps.newTreeMap();
+        subscription_auth_roles = Maps.newTreeMap();
     }
 
     @Override
@@ -38,7 +40,8 @@ public class AuthPolicies {
         if (obj instanceof AuthPolicies) {
             AuthPolicies other = (AuthPolicies) obj;
             return Objects.equals(namespace_auth, other.namespace_auth)
-                    && Objects.equals(destination_auth, 
other.destination_auth);
+                    && Objects.equals(destination_auth, other.destination_auth)
+                    && Objects.equals(subscription_auth_roles, 
other.subscription_auth_roles);
         }
 
         return false;

Reply via email to