merlimat closed pull request #2981: Allow subscribers to access subscription admin-api
URL: https://github.com/apache/pulsar/pull/2981
 
 
   

This pull request was merged from a forked repository. Because GitHub
hides the original diff of a fork-based pull request once it has been
merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
index cae2415dd1..5d8b930e85 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
@@ -101,6 +101,29 @@
     CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, 
Set<AuthAction> actions, String role,
             String authDataJson);
 
+    /**
+     * Grant the given roles permission to access the subscription admin-api
+     * 
+     * @param namespace
+     * @param subscriptionName
+     * @param roles
+     * @param authDataJson
+     *            additional authdata in json format
+     * @return
+     */
+    CompletableFuture<Void> grantSubscriptionPermissionAsync(NamespaceName 
namespace, String subscriptionName, Set<String> roles,
+            String authDataJson);
+    
+    /**
+     * Revoke subscription admin-api access for a role
+     * @param namespace
+     * @param subscriptionName
+     * @param role
+     * @return
+     */
+    CompletableFuture<Void> revokeSubscriptionPermissionAsync(NamespaceName 
namespace, String subscriptionName,
+            String role, String authDataJson);
+    
     /**
      * Grant authorization-action permission on a topic to the given client
      *
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 76e98ccfd9..95d012f502 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -94,6 +94,41 @@ public AuthorizationService(ServiceConfiguration conf, 
ConfigurationCacheService
         return FutureUtil.failedFuture(new IllegalStateException("No 
authorization provider configured"));
     }
 
+    /**
+     * Grant the given roles permission to access the subscription admin-api
+     * 
+     * @param namespace
+     * @param subscriptionName
+     * @param roles
+     * @param authDataJson
+     *            additional authdata in json for targeted authorization 
provider
+     * @return
+     */
+    public CompletableFuture<Void> 
grantSubscriptionPermissionAsync(NamespaceName namespace, String 
subscriptionName,
+            Set<String> roles, String authDataJson) {
+
+        if (provider != null) {
+            return provider.grantSubscriptionPermissionAsync(namespace, 
subscriptionName, roles, authDataJson);
+        }
+        return FutureUtil.failedFuture(new IllegalStateException("No 
authorization provider configured"));
+    }
+
+    /**
+     * Revoke subscription admin-api access for a role
+     * 
+     * @param namespace
+     * @param subscriptionName
+     * @param role
+     * @return
+     */
+    public CompletableFuture<Void> 
revokeSubscriptionPermissionAsync(NamespaceName namespace, String 
subscriptionName,
+            String role, String authDataJson) {
+        if (provider != null) {
+            return provider.revokeSubscriptionPermissionAsync(namespace, 
subscriptionName, role, authDataJson);
+        }
+        return FutureUtil.failedFuture(new IllegalStateException("No 
authorization provider configured"));
+    }
+    
     /**
      * Grant authorization-action permission on a topic to the given client
      *
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index fcd89b49b9..0f2df0be54 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -23,6 +23,7 @@
 import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -44,6 +45,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Sets;
+
 /**
  * Default authorization provider that stores authorization policies under 
local-zookeeper.
  *
@@ -110,6 +113,19 @@ public void initialize(ServiceConfiguration conf, 
ConfigurationCacheService conf
                     }
                 } else {
                     if (isNotBlank(subscription) && !isSuperUser(role)) {
+                        // validate if the role is authorized to access the subscription (skip validation if the
+                        // authorization list is empty)
+                        Set<String> roles = 
policies.get().auth_policies.subscription_auth_roles.get(subscription);
+                        if (roles != null && !roles.isEmpty() && 
!roles.contains(role)) {
+                            log.warn("[{}] is not authorized to subscribe on 
{}-{}", role, topicName, subscription);
+                            PulsarServerException ex = new 
PulsarServerException(
+                                    String.format("%s is not authorized to 
access subscription %s on topic %s", role,
+                                            subscription, topicName));
+                            permissionFuture.complete(false);
+                            return;
+                        }
+
+                        // validate if subscription-auth mode is configured
                         switch (policies.get().subscription_auth_mode) {
                         case Prefix:
                             if (!subscription.startsWith(role)) {
@@ -125,6 +141,7 @@ public void initialize(ServiceConfiguration conf, 
ConfigurationCacheService conf
                         }
                     }
                 }
+                // check namespace and topic level consume-permissions
                 checkAuthorization(topicName, role, 
AuthAction.consume).thenAccept(isAuthorized -> {
                     permissionFuture.complete(isAuthorized);
                 });
@@ -241,6 +258,70 @@ public void initialize(ServiceConfiguration conf, 
ConfigurationCacheService conf
         return result;
     }
 
+    @Override
+    public CompletableFuture<Void> 
grantSubscriptionPermissionAsync(NamespaceName namespace, String 
subscriptionName,
+            Set<String> roles, String authDataJson) {
+        return updateSubscriptionPermissionAsync(namespace, subscriptionName, 
roles, false);
+    }
+
+    @Override
+    public CompletableFuture<Void> 
revokeSubscriptionPermissionAsync(NamespaceName namespace, String 
subscriptionName,
+            String role, String authDataJson) {
+        return updateSubscriptionPermissionAsync(namespace, subscriptionName, 
Collections.singleton(role), true);
+    }
+    
+    private CompletableFuture<Void> 
updateSubscriptionPermissionAsync(NamespaceName namespace, String 
subscriptionName, Set<String> roles,
+            boolean remove) {
+        CompletableFuture<Void> result = new CompletableFuture<>();
+
+        try {
+            validatePoliciesReadOnlyAccess();
+        } catch (Exception e) {
+            result.completeExceptionally(e);
+        }
+
+        ZooKeeper globalZk = configCache.getZooKeeper();
+        final String policiesPath = String.format("/%s/%s/%s", "admin", 
POLICIES, namespace.toString());
+
+        try {
+            Stat nodeStat = new Stat();
+            byte[] content = globalZk.getData(policiesPath, null, nodeStat);
+            Policies policies = getThreadLocal().readValue(content, 
Policies.class);
+            if (remove) {
+                if 
(policies.auth_policies.subscription_auth_roles.get(subscriptionName) != null) {
+                    
policies.auth_policies.subscription_auth_roles.get(subscriptionName).removeAll(roles);
+                }else {
+                    log.info("[{}] Couldn't find role {} while revoking for 
sub = {}", namespace, subscriptionName, roles);
+                    result.completeExceptionally(new 
IllegalArgumentException("couldn't find subscription"));
+                    return result;
+                }
+            } else {
+                
policies.auth_policies.subscription_auth_roles.put(subscriptionName, roles);
+            }
+
+            // Write back the new policies into zookeeper
+            globalZk.setData(policiesPath, 
getThreadLocal().writeValueAsBytes(policies), nodeStat.getVersion());
+
+            configCache.policiesCache().invalidate(policiesPath);
+
+            log.info("[{}] Successfully granted access for role {} for sub = 
{}", namespace, subscriptionName, roles);
+            result.complete(null);
+        } catch (KeeperException.NoNodeException e) {
+            log.warn("[{}] Failed to set permissions for namespace {}: does 
not exist", subscriptionName, namespace);
+            result.completeExceptionally(new 
IllegalArgumentException("Namespace does not exist" + namespace));
+        } catch (KeeperException.BadVersionException e) {
+            log.warn("[{}] Failed to set permissions for {} on namespace {}: 
concurrent modification", subscriptionName, roles, namespace);
+            result.completeExceptionally(new IllegalStateException(
+                    "Concurrent modification on zk path: " + policiesPath + ", 
" + e.getMessage()));
+        } catch (Exception e) {
+            log.error("[{}] Failed to get permissions for role {} on namespace 
{}", subscriptionName, roles, namespace, e);
+            result.completeExceptionally(
+                    new IllegalStateException("Failed to get permissions for 
namespace " + namespace));
+        }
+
+        return result;
+    }
+
     private CompletableFuture<Boolean> checkAuthorization(TopicName topicName, 
String role, AuthAction action) {
         if (isSuperUser(role)) {
             return CompletableFuture.completedFuture(true);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index a00535430a..113a910fd6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -329,6 +329,38 @@ protected void internalGrantPermissionOnNamespace(String 
role, Set<AuthAction> a
         }
     }
 
+
+    protected void internalGrantPermissionOnSubscription(String subscription, 
Set<String> roles) {
+        /** restricted to system-admin (super-user) to limit the metadata footprint size */
+        validateSuperUserAccess();
+
+        try {
+            AuthorizationService authService = 
pulsar().getBrokerService().getAuthorizationService();
+            if (null != authService) {
+                authService.grantSubscriptionPermissionAsync(namespaceName, 
subscription, roles,
+                        null/* additional auth-data json */).get();
+            } else {
+                throw new RestException(Status.NOT_IMPLEMENTED, "Authorization 
is not enabled");
+            }
+        } catch (InterruptedException e) {
+            log.error("[{}] Failed to get permissions for namespace {}", 
clientAppId(), namespaceName, e);
+            throw new RestException(e);
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof IllegalArgumentException) {
+                log.warn("[{}] Failed to set permissions for namespace {}: 
does not exist", clientAppId(),
+                        namespaceName);
+                throw new RestException(Status.NOT_FOUND, "Namespace does not 
exist");
+            } else if (e.getCause() instanceof IllegalStateException) {
+                log.warn("[{}] Failed to set permissions for namespace {}: 
concurrent modification", clientAppId(),
+                        namespaceName);
+                throw new RestException(Status.CONFLICT, "Concurrent 
modification");
+            } else {
+                log.error("[{}] Failed to get permissions for namespace {}", 
clientAppId(), namespaceName, e);
+                throw new RestException(e);
+            }
+        }
+    }
+
     protected void internalRevokePermissionsOnNamespace(String role) {
         validateAdminAccessForTenant(namespaceName.getTenant());
         validatePoliciesReadOnlyAccess();
@@ -359,6 +391,19 @@ protected void internalRevokePermissionsOnNamespace(String 
role) {
         }
     }
 
+    protected void internalRevokePermissionsOnSubscription(String 
subscriptionName, String role) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+
+        AuthorizationService authService = 
pulsar().getBrokerService().getAuthorizationService();
+        if (null != authService) {
+            authService.revokeSubscriptionPermissionAsync(namespaceName, 
subscriptionName, role,
+                    null/* additional auth-data json */);
+        } else {
+            throw new RestException(Status.NOT_IMPLEMENTED, "Authorization is 
not enabled");
+        }
+    }
+    
     protected Set<String> internalGetNamespaceReplicationClusters() {
         if (!namespaceName.isGlobal()) {
             throw new RestException(Status.PRECONDITION_FAILED,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 34c324a522..24c30a7be7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -235,6 +235,36 @@ public void validateAdminOperationOnTopic(boolean 
authoritative) {
         validateTopicOwnership(topicName, authoritative);
     }
 
+    protected void validateAdminAccessForSubscriber(String subscriptionName, 
boolean authoritative) {
+        validateTopicOwnership(topicName, authoritative);
+        try {
+            validateAdminAccessForTenant(topicName.getTenant());
+        } catch (Exception e) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] failed to validate admin access for {}", 
topicName, clientAppId());
+            }
+            validateAdminAccessForSubscriber(subscriptionName);
+        }
+    }
+
+    private void validateAdminAccessForSubscriber(String subscriptionName) {
+        try {
+            if 
(!pulsar().getBrokerService().getAuthorizationService().canConsume(topicName, 
clientAppId(),
+                    clientAuthData(), subscriptionName)) {
+                log.warn("[{}} Subscriber {} is not authorized to access api", 
topicName, clientAppId());
+                throw new RestException(Status.UNAUTHORIZED,
+                        String.format("Subscriber %s is not authorized to 
access this operation", clientAppId()));
+            }
+        } catch (RestException re) {
+            throw re;
+        } catch (Exception e) {
+            // unknown error marked as internal server error
+            log.warn("Unexpected error while authorizing request. topic={}, 
role={}. Error: {}", topicName,
+                    clientAppId(), e.getMessage(), e);
+            throw new RestException(e);
+        }
+    }
+
     protected void internalGrantPermissionsOnTopic(String role, 
Set<AuthAction> actions) {
         // This operation should be reading from zookeeper and it should be 
allowed without having admin privileges
         validateAdminAccessForTenant(namespaceName.getTenant());
@@ -654,7 +684,7 @@ protected void internalDeleteSubscription(String subName, 
boolean authoritative)
                 }
             }
         } else {
-            validateAdminOperationOnTopic(authoritative);
+            validateAdminAccessForSubscriber(subName, authoritative);
             Topic topic = getTopicReference(topicName);
             try {
                 Subscription sub = topic.getSubscription(subName);
@@ -690,7 +720,7 @@ protected void internalSkipAllMessages(String subName, 
boolean authoritative) {
                 throw new RestException(e);
             }
         } else {
-            validateAdminOperationOnTopic(authoritative);
+            validateAdminAccessForSubscriber(subName, authoritative);
             PersistentTopic topic = (PersistentTopic) 
getTopicReference(topicName);
             try {
                 if (subName.startsWith(topic.replicatorPrefix)) {
@@ -721,7 +751,7 @@ protected void internalSkipMessages(String subName, int 
numMessages, boolean aut
         if (partitionMetadata.partitions > 0) {
             throw new RestException(Status.METHOD_NOT_ALLOWED, "Skip messages 
on a partitioned topic is not allowed");
         }
-        validateAdminOperationOnTopic(authoritative);
+        validateAdminAccessForSubscriber(subName, authoritative);
         PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
         try {
             if (subName.startsWith(topic.replicatorPrefix)) {
@@ -810,7 +840,7 @@ protected void internalResetCursor(String subName, long 
timestamp, boolean autho
             }
 
         } else {
-            validateAdminOperationOnTopic(authoritative);
+            validateAdminAccessForSubscriber(subName, authoritative);
             log.info("[{}][{}] received reset cursor on subscription {} to 
time {}", clientAppId(), topicName,
                     subName, timestamp);
             PersistentTopic topic = (PersistentTopic) 
getTopicReference(topicName);
@@ -883,7 +913,7 @@ protected void internalCreateSubscription(String 
subscriptionName, MessageIdImpl
                     throw exception.get();
                 }
             } else {
-                validateAdminOperationOnTopic(authoritative);
+                validateAdminAccessForSubscriber(subscriptionName, 
authoritative);
 
                 PersistentTopic topic = (PersistentTopic) 
getOrCreateTopic(topicName);
 
@@ -925,7 +955,7 @@ protected void internalResetCursorOnPosition(String 
subName, boolean authoritati
             throw new RestException(Status.METHOD_NOT_ALLOWED,
                     "Reset-cursor at position is not allowed for 
partitioned-topic");
         } else {
-            validateAdminOperationOnTopic(authoritative);
+            validateAdminAccessForSubscriber(subName, authoritative);
             PersistentTopic topic = (PersistentTopic) 
getTopicReference(topicName);
             if (topic == null) {
                 throw new RestException(Status.NOT_FOUND, "Topic not found");
@@ -960,7 +990,7 @@ protected Response internalPeekNthMessage(String subName, 
int messagePosition, b
         if (partitionMetadata.partitions > 0) {
             throw new RestException(Status.METHOD_NOT_ALLOWED, "Peek messages 
on a partitioned topic is not allowed");
         }
-        validateAdminOperationOnTopic(authoritative);
+        validateAdminAccessForSubscriber(subName, authoritative);
         if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
             log.error("[{}] Not supported operation of non-persistent topic {} 
{}", clientAppId(), topicName,
                     subName);
@@ -1112,7 +1142,7 @@ protected void internalExpireMessages(String subName, int 
expireTimeInSeconds, b
             }
         } else {
             // validate ownership and redirect if current broker is not owner
-            validateAdminOperationOnTopic(authoritative);
+            validateAdminAccessForSubscriber(subName, authoritative);
             if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
                 log.error("[{}] Not supported operation of non-persistent 
topic {} {}", clientAppId(), topicName,
                         subName);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 561351da91..1f2c8c93c1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -226,6 +226,19 @@ public void 
grantPermissionOnNamespace(@PathParam("property") String property, @
         internalGrantPermissionOnNamespace(role, actions);
     }
 
+    @POST
+    
@Path("/{property}/{cluster}/{namespace}/permissions/subscription/{subscription}")
+    @ApiOperation(hidden = true, value = "Grant a new permission to roles for 
a subscription.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Property or cluster or 
namespace doesn't exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 501, message = "Authorization is not 
enabled")})
+    public void grantPermissionOnSubscription(@PathParam("property") String 
property, @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace, 
@PathParam("subscription") String subscription, Set<String> roles) {
+        validateNamespaceName(property, cluster, namespace);
+        internalGrantPermissionOnSubscription(subscription, roles);
+    }
+
     @DELETE
     @Path("/{property}/{cluster}/{namespace}/permissions/{role}")
     @ApiOperation(hidden = true, value = "Revoke all permissions to a role on 
a namespace.")
@@ -238,6 +251,18 @@ public void 
revokePermissionsOnNamespace(@PathParam("property") String property,
         internalRevokePermissionsOnNamespace(role);
     }
 
+    @DELETE
+    
@Path("/{property}/{cluster}/{namespace}/permissions/{subscription}/{role}")
+    @ApiOperation(hidden = true, value = "Revoke subscription admin-api access 
permission for a role.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Property or cluster or 
namespace doesn't exist") })
+    public void revokePermissionOnSubscription(@PathParam("property") String 
property,
+            @PathParam("cluster") String cluster, @PathParam("namespace") 
String namespace,
+            @PathParam("subscription") String subscription, @PathParam("role") 
String role) {
+        validateNamespaceName(property, cluster, namespace);
+        internalRevokePermissionsOnSubscription(subscription, role);
+    }
+
     @GET
     @Path("/{property}/{cluster}/{namespace}/replication")
     @ApiOperation(hidden = true, value = "Get the replication clusters for a 
namespace.", response = String.class, responseContainer = "List")
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index e335521053..0e7953146d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -171,6 +171,20 @@ public void 
grantPermissionOnNamespace(@PathParam("tenant") String tenant,
         internalGrantPermissionOnNamespace(role, actions);
     }
 
+    @POST
+    @Path("/{property}/{namespace}/permissions/subscription/{subscription}")
+    @ApiOperation(hidden = true, value = "Grant a new permission to roles for 
a subscription.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Property or cluster or 
namespace doesn't exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 501, message = "Authorization is not enabled") 
})
+    public void grantPermissionOnSubscription(@PathParam("property") String 
property,
+            @PathParam("namespace") String namespace, 
@PathParam("subscription") String subscription,
+            Set<String> roles) {
+        validateNamespaceName(property, namespace);
+        internalGrantPermissionOnSubscription(subscription, roles);
+    }
+    
     @DELETE
     @Path("/{tenant}/{namespace}/permissions/{role}")
     @ApiOperation(value = "Revoke all permissions to a role on a namespace.")
@@ -182,6 +196,18 @@ public void 
revokePermissionsOnNamespace(@PathParam("tenant") String tenant,
         internalRevokePermissionsOnNamespace(role);
     }
 
+    @DELETE
+    @Path("/{property}/{namespace}/permissions/{subscription}/{role}")
+    @ApiOperation(hidden = true, value = "Revoke subscription admin-api access 
permission for a role.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Property or cluster or 
namespace doesn't exist") })
+    public void revokePermissionOnSubscription(@PathParam("property") String 
property,
+            @PathParam("namespace") String namespace, 
@PathParam("subscription") String subscription,
+            @PathParam("role") String role) {
+        validateNamespaceName(property, namespace);
+        internalRevokePermissionsOnSubscription(subscription, role);
+    }
+    
     @GET
     @Path("/{tenant}/{namespace}/replication")
     @ApiOperation(value = "Get the replication clusters for a namespace.", 
response = String.class, responseContainer = "List")
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 6b17a7320f..01160fec8d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -587,7 +587,7 @@ public void properties() throws PulsarAdminException {
         }
     }
 
-    @Test(invocationCount = 1)
+    @Test
     public void namespaces() throws PulsarAdminException, 
PulsarServerException, Exception {
         admin.clusters().createCluster("usw", new ClusterData());
         TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", 
"role2"),
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index e31e1630d7..033c638fdd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -20,9 +20,11 @@
 
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.mockito.Mockito.spy;
+import static org.testng.Assert.fail;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -37,10 +39,11 @@
 import org.apache.pulsar.broker.authentication.AuthenticationProvider;
 import org.apache.pulsar.broker.authorization.AuthorizationProvider;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.slf4j.Logger;
@@ -49,7 +52,6 @@
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
@@ -57,6 +59,7 @@
     private static final Logger log = 
LoggerFactory.getLogger(AuthorizationProducerConsumerTest.class);
 
     private final static String clientRole = "plugbleRole";
+    private final static Set<String> clientAuthProviderSupportedRoles = 
Sets.newHashSet(clientRole);
 
     protected void setup() throws Exception {
 
@@ -145,6 +148,101 @@ public void testProducerAndConsumerAuthorization() throws 
Exception {
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test
+    public void testSubscriberPermission() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        
conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
+        setup();
+
+        final String tenantRole = "tenant-role";
+        final String subscriptionRole = "sub1-role";
+        final String subscriptionName = "sub1";
+        final String namespace = "my-property/my-ns-sub-auth";
+        final String topicName = "persistent://" + namespace + "/my-topic";
+        Authentication adminAuthentication = new 
ClientAuthentication("superUser");
+
+        clientAuthProviderSupportedRoles.add(subscriptionRole);
+
+        PulsarAdmin superAdmin = spy(
+                
PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()).authentication(adminAuthentication).build());
+
+        Authentication tenantAdminAuthentication = new 
ClientAuthentication(tenantRole);
+        PulsarAdmin tenantAdmin = 
spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
+                .authentication(tenantAdminAuthentication).build());
+
+        Authentication subAdminAuthentication = new 
ClientAuthentication(subscriptionRole);
+        PulsarAdmin sub1Admin = 
spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
+                .authentication(subAdminAuthentication).build());
+
+        String lookupUrl;
+        lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
+
+        Authentication authentication = new 
ClientAuthentication(subscriptionRole);
+
+        superAdmin.tenants().createTenant("my-property",
+                new TenantInfo(Sets.newHashSet(tenantRole), 
Sets.newHashSet("test")));
+        superAdmin.namespaces().createNamespace(namespace, 
Sets.newHashSet("test"));
+        tenantAdmin.namespaces().grantPermissionOnNamespace(namespace, 
subscriptionRole,
+                Collections.singleton(AuthAction.consume));
+
+        pulsarClient = 
PulsarClient.builder().serviceUrl(lookupUrl).authentication(authentication).build();
+        // (1) Create subscription name
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
+                .subscribe();
+        consumer.close();
+
+        // verify tenant is able to perform all subscription-admin api
+        tenantAdmin.topics().skipAllMessages(topicName, subscriptionName);
+        tenantAdmin.topics().skipMessages(topicName, subscriptionName, 1);
+        tenantAdmin.topics().expireMessages(topicName, subscriptionName, 10);
+        tenantAdmin.topics().peekMessages(topicName, subscriptionName, 1);
+        tenantAdmin.topics().resetCursor(topicName, subscriptionName, 10);
+        tenantAdmin.topics().resetCursor(topicName, subscriptionName, 
MessageId.earliest);
+
+        // grant namespace-level authorization to the subscriptionRole
+        tenantAdmin.namespaces().grantPermissionOnNamespace(namespace, 
subscriptionRole,
+                Collections.singleton(AuthAction.consume));
+
+        // subscriptionRole has namespace-level authorization
+        sub1Admin.topics().resetCursor(topicName, subscriptionName, 10);
+        
+        // grant subscription access to a different role; from now on only that role can access the subscription
+        String otherPrincipal = "Principal-1-to-access-sub";
+        superAdmin.namespaces().grantPermissionOnSubscription(namespace, 
subscriptionName,
+                Collections.singleton(otherPrincipal));
+        
+        // subscriptionRole no longer has subscription-level access, so its request to the subscription must fail
+        try {
+            sub1Admin.topics().resetCursor(topicName, subscriptionName, 10);
+            fail("should have fail with authorization exception");
+        } catch 
(org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException e) {
+            // Ok
+        }
+
+        // now, grant subscription-access to subscriptionRole as well
+        superAdmin.namespaces().grantPermissionOnSubscription(namespace, 
subscriptionName,
+                Sets.newHashSet(otherPrincipal, subscriptionRole));
+
+        sub1Admin.topics().skipAllMessages(topicName, subscriptionName);
+        sub1Admin.topics().skipMessages(topicName, subscriptionName, 1);
+        sub1Admin.topics().expireMessages(topicName, subscriptionName, 10);
+        sub1Admin.topics().peekMessages(topicName, subscriptionName, 1);
+        sub1Admin.topics().resetCursor(topicName, subscriptionName, 10);
+        sub1Admin.topics().resetCursor(topicName, subscriptionName, 
MessageId.earliest);
+
+        superAdmin.namespaces().revokePermissionOnSubscription(namespace, 
subscriptionName, subscriptionRole);
+        
+        try {
+            sub1Admin.topics().resetCursor(topicName, subscriptionName, 10);
+            fail("should have fail with authorization exception");
+        } catch 
(org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException e) {
+            // Ok
+        }
+        
+        log.info("-- Exiting {} test --", methodName);
+    }
+    
     @Test
     public void testSubscriptionPrefixAuthorization() throws Exception {
         log.info("-- Starting {} test --", methodName);
@@ -317,19 +415,19 @@ public void initialize(ServiceConfiguration conf, 
ConfigurationCacheService conf
         @Override
         public CompletableFuture<Boolean> canProduceAsync(TopicName topicName, 
String role,
                 AuthenticationDataSource authenticationData) {
-            return CompletableFuture.completedFuture(clientRole.equals(role));
+            return 
CompletableFuture.completedFuture(clientAuthProviderSupportedRoles.contains(role));
         }
 
         @Override
         public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, 
String role,
                 AuthenticationDataSource authenticationData, String 
subscription) {
-            return CompletableFuture.completedFuture(clientRole.equals(role));
+            return 
CompletableFuture.completedFuture(clientAuthProviderSupportedRoles.contains(role));
         }
 
         @Override
         public CompletableFuture<Boolean> canLookupAsync(TopicName topicName, 
String role,
                 AuthenticationDataSource authenticationData) {
-            return CompletableFuture.completedFuture(clientRole.equals(role));
+            return 
CompletableFuture.completedFuture(clientAuthProviderSupportedRoles.contains(role));
         }
 
         @Override
@@ -343,6 +441,18 @@ public void initialize(ServiceConfiguration conf, 
ConfigurationCacheService conf
                 String authenticationData) {
             return CompletableFuture.completedFuture(null);
         }
+
+        /**
+         * No-op: hands back an already-completed future and does nothing
+         * with the supplied namespace, subscription or roles.
+         */
+        @Override
+        public CompletableFuture<Void> grantSubscriptionPermissionAsync(
+                NamespaceName namespace, String subscriptionName,
+                Set<String> roles, String authDataJson) {
+            return CompletableFuture.<Void>completedFuture(null);
+        }
+
+        /**
+         * No-op: hands back an already-completed future and does nothing
+         * with the supplied namespace, subscription or role.
+         */
+        @Override
+        public CompletableFuture<Void> revokeSubscriptionPermissionAsync(
+                NamespaceName namespace, String subscriptionName,
+                String role, String authDataJson) {
+            return CompletableFuture.<Void>completedFuture(null);
+        }
     }
 
     /**
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 6460c1ff5c..f9b57e3710 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -375,6 +375,24 @@
      */
     void revokePermissionsOnNamespace(String namespace, String role) throws 
PulsarAdminException;
 
+    /**
+     * Grant the given roles permission to use a subscription's admin-api.
+     * @param namespace name of the namespace that holds the subscription
+     * @param subscription name of the subscription to grant access on
+     * @param roles the roles that are granted access
+     * @throws PulsarAdminException if the admin request fails
+     */
+    void grantPermissionOnSubscription(String namespace, String subscription, 
Set<String> roles) throws PulsarAdminException;
+    
+    /**
+     * Revoke a role's permission to use a subscription's admin-api.
+     * @param namespace name of the namespace that holds the subscription
+     * @param subscription name of the subscription to revoke access on
+     * @param role the role whose access is revoked
+     * @throws PulsarAdminException if the admin request fails
+     */
+    void revokePermissionOnSubscription(String namespace, String subscription, 
String role) throws PulsarAdminException;
+    
     /**
      * Get the replication clusters for a namespace.
      * <p>
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index c9845a918c..be3f548522 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -232,6 +232,30 @@ public void revokePermissionsOnNamespace(String namespace, 
String role) throws P
         }
     }
 
+    
+    @Override
+    public void grantPermissionOnSubscription(String namespace, String 
subscription, Set<String> roles)
+            throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            WebTarget path = namespacePath(ns, "permissions", "subscription", 
subscription);
+            request(path).post(Entity.entity(roles, 
MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void revokePermissionOnSubscription(String namespace, String 
subscription, String role) throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            WebTarget path = namespacePath(ns, "permissions", subscription, 
role);
+            request(path).delete(ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+    
     @Override
     public List<String> getNamespaceReplicationClusters(String namespace) 
throws PulsarAdminException {
         try {
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index cc1654bc8c..a29908aaa4 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -29,6 +29,7 @@
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.admin.cli.utils.IOUtils;
@@ -196,6 +197,42 @@ void run() throws PulsarAdminException {
         }
     }
 
+    /**
+     * CLI command that grants a comma separated list of roles access to
+     * the admin-api of one subscription in the given namespace.
+     */
+    @Parameters(commandDescription = "Grant permissions to access subscription admin-api")
+    private class GrantSubscriptionPermissions extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private List<String> params;
+
+        @Parameter(names = "--subscription", description = "Subscription name for which permission will be granted to roles", required = true)
+        private String subscription;
+
+        @Parameter(names = "--roles", description = "Client roles to which grant permissions (comma separated roles)", required = true, splitter = CommaParameterSplitter.class)
+        private List<String> roles;
+
+        @Override
+        void run() throws PulsarAdminException {
+            final String namespace = validateNamespace(params);
+            // The admin API takes a Set, so duplicate role names collapse.
+            final Set<String> grantedRoles = new HashSet<>(roles);
+            admin.namespaces().grantPermissionOnSubscription(namespace, subscription, grantedRoles);
+        }
+    }
+
+    /**
+     * CLI command that revokes one role's access to the admin-api of a
+     * subscription in the given namespace.
+     */
+    @Parameters(commandDescription = "Revoke permissions to access subscription admin-api")
+    private class RevokeSubscriptionPermissions extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = "--subscription", description = "Subscription name for which permission will be revoked from the role", required = true)
+        private String subscription;
+
+        @Parameter(names = "--role", description = "Client role to which revoke permissions", required = true)
+        private String role;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            admin.namespaces().revokePermissionOnSubscription(namespace, subscription, role);
+        }
+    }
+    
     @Parameters(commandDescription = "Get the permissions on a namespace")
     private class Permissions extends CliCommand {
         @Parameter(description = "tenant/namespace\n", required = true)
@@ -942,7 +979,10 @@ public CmdNamespaces(PulsarAdmin admin) {
         jcommander.addCommand("permissions", new Permissions());
         jcommander.addCommand("grant-permission", new GrantPermissions());
         jcommander.addCommand("revoke-permission", new RevokePermissions());
-
+        
+        jcommander.addCommand("grant-subscription-permission", new 
GrantSubscriptionPermissions());
+        jcommander.addCommand("revoke-subscription-permission", new 
RevokeSubscriptionPermissions());
+        
         jcommander.addCommand("set-clusters", new SetReplicationClusters());
         jcommander.addCommand("get-clusters", new GetReplicationClusters());
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthPolicies.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthPolicies.java
index 553b576543..bd0c61b008 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthPolicies.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthPolicies.java
@@ -27,10 +27,12 @@
 public class AuthPolicies {
     public final Map<String, Set<AuthAction>> namespace_auth;
     public final Map<String, Map<String, Set<AuthAction>>> destination_auth;
+    public final Map<String, Set<String>> subscription_auth_roles;
 
     public AuthPolicies() {
         namespace_auth = Maps.newTreeMap();
         destination_auth = Maps.newTreeMap();
+        subscription_auth_roles = Maps.newTreeMap();
     }
 
     @Override
@@ -38,7 +40,8 @@ public boolean equals(Object obj) {
         if (obj instanceof AuthPolicies) {
             AuthPolicies other = (AuthPolicies) obj;
             return Objects.equals(namespace_auth, other.namespace_auth)
-                    && Objects.equals(destination_auth, 
other.destination_auth);
+                    && Objects.equals(destination_auth, other.destination_auth)
+                    && Objects.equals(subscription_auth_roles, 
other.subscription_auth_roles);
         }
 
         return false;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to