This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 02df3d2 Allow subscribers to access subscription admin-api (#2981)
02df3d2 is described below
commit 02df3d2bbf6c4dc893aa1ea0f04221e8d50c72a4
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Mon Dec 3 14:17:50 2018 -0800
Allow subscribers to access subscription admin-api (#2981)
* Allow subscribers to access subscription admin-api
* add sub-authorization check on canConsume
* make per-sub optional
* fix: allow super-admin to consume
---
.../authorization/AuthorizationProvider.java | 23 ++++
.../broker/authorization/AuthorizationService.java | 35 ++++++
.../authorization/PulsarAuthorizationProvider.java | 81 ++++++++++++++
.../pulsar/broker/admin/impl/NamespacesBase.java | 45 ++++++++
.../broker/admin/impl/PersistentTopicsBase.java | 46 ++++++--
.../apache/pulsar/broker/admin/v1/Namespaces.java | 25 +++++
.../apache/pulsar/broker/admin/v2/Namespaces.java | 26 +++++
.../apache/pulsar/broker/admin/AdminApiTest.java | 2 +-
.../api/AuthorizationProducerConsumerTest.java | 120 ++++++++++++++++++++-
.../org/apache/pulsar/client/admin/Namespaces.java | 18 ++++
.../client/admin/internal/NamespacesImpl.java | 24 +++++
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 42 +++++++-
.../pulsar/common/policies/data/AuthPolicies.java | 5 +-
13 files changed, 476 insertions(+), 16 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
index cae2415..5d8b930 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
@@ -102,6 +102,29 @@ public interface AuthorizationProvider extends Closeable {
String authDataJson);
/**
+ * Grant permission to roles that can access subscription-admin api
+ *
+ * @param namespace
+ * @param subscriptionName
+ * @param roles
+ * @param authDataJson
+ * additional authdata in json format
+ * @return
+ */
+ CompletableFuture<Void> grantSubscriptionPermissionAsync(NamespaceName
namespace, String subscriptionName, Set<String> roles,
+ String authDataJson);
+
+ /**
+ * Revoke subscription admin-api access for a role
+ * @param namespace
+ * @param subscriptionName
+ * @param role
+ * @return
+ */
+ CompletableFuture<Void> revokeSubscriptionPermissionAsync(NamespaceName
namespace, String subscriptionName,
+ String role, String authDataJson);
+
+ /**
* Grant authorization-action permission on a topic to the given client
*
* @param topicName
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 76e98cc..95d012f 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -95,6 +95,41 @@ public class AuthorizationService {
}
/**
+ * Grant permission to roles that can access subscription-admin api
+ *
+ * @param namespace
+ * @param subscriptionName
+ * @param roles
+ * @param authDataJson
+ * additional authdata in json for targeted authorization
provider
+ * @return
+ */
+ public CompletableFuture<Void>
grantSubscriptionPermissionAsync(NamespaceName namespace, String
subscriptionName,
+ Set<String> roles, String authDataJson) {
+
+ if (provider != null) {
+ return provider.grantSubscriptionPermissionAsync(namespace,
subscriptionName, roles, authDataJson);
+ }
+ return FutureUtil.failedFuture(new IllegalStateException("No
authorization provider configured"));
+ }
+
+ /**
+ * Revoke subscription admin-api access for a role
+ *
+ * @param namespace
+ * @param subscriptionName
+ * @param role
+ * @return
+ */
+ public CompletableFuture<Void>
revokeSubscriptionPermissionAsync(NamespaceName namespace, String
subscriptionName,
+ String role, String authDataJson) {
+ if (provider != null) {
+ return provider.revokeSubscriptionPermissionAsync(namespace,
subscriptionName, role, authDataJson);
+ }
+ return FutureUtil.failedFuture(new IllegalStateException("No
authorization provider configured"));
+ }
+
+ /**
* Grant authorization-action permission on a topic to the given client
*
* @param topicname
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index 4af6a0a..229efec 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -23,6 +23,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import java.io.IOException;
+import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -44,6 +45,8 @@ import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Sets;
+
/**
* Default authorization provider that stores authorization policies under
local-zookeeper.
*
@@ -110,6 +113,19 @@ public class PulsarAuthorizationProvider implements
AuthorizationProvider {
}
} else {
if (isNotBlank(subscription) && !isSuperUser(role)) {
+ // validate if role is authorize to access
subscription. (skip validatation if authorization
+ // list is empty)
+ Set<String> roles =
policies.get().auth_policies.subscription_auth_roles.get(subscription);
+ if (roles != null && !roles.isEmpty() &&
!roles.contains(role)) {
+ log.warn("[{}] is not authorized to subscribe on
{}-{}", role, topicName, subscription);
+ PulsarServerException ex = new
PulsarServerException(
+ String.format("%s is not authorized to
access subscription %s on topic %s", role,
+ subscription, topicName));
+ permissionFuture.complete(false);
+ return;
+ }
+
+ // validate if subscription-auth mode is configured
switch (policies.get().subscription_auth_mode) {
case Prefix:
if (!subscription.startsWith(role)) {
@@ -125,6 +141,7 @@ public class PulsarAuthorizationProvider implements
AuthorizationProvider {
}
}
}
+ // check namespace and topic level consume-permissions
checkAuthorization(topicName, role,
AuthAction.consume).thenAccept(isAuthorized -> {
permissionFuture.complete(isAuthorized);
});
@@ -241,6 +258,70 @@ public class PulsarAuthorizationProvider implements
AuthorizationProvider {
return result;
}
+ @Override
+ public CompletableFuture<Void>
grantSubscriptionPermissionAsync(NamespaceName namespace, String
subscriptionName,
+ Set<String> roles, String authDataJson) {
+ return updateSubscriptionPermissionAsync(namespace, subscriptionName,
roles, false);
+ }
+
+ @Override
+ public CompletableFuture<Void>
revokeSubscriptionPermissionAsync(NamespaceName namespace, String
subscriptionName,
+ String role, String authDataJson) {
+ return updateSubscriptionPermissionAsync(namespace, subscriptionName,
Collections.singleton(role), true);
+ }
+
+ private CompletableFuture<Void>
updateSubscriptionPermissionAsync(NamespaceName namespace, String
subscriptionName, Set<String> roles,
+ boolean remove) {
+ CompletableFuture<Void> result = new CompletableFuture<>();
+
+ try {
+ validatePoliciesReadOnlyAccess();
+ } catch (Exception e) {
+ result.completeExceptionally(e);
+ }
+
+ ZooKeeper globalZk = configCache.getZooKeeper();
+ final String policiesPath = String.format("/%s/%s/%s", "admin",
POLICIES, namespace.toString());
+
+ try {
+ Stat nodeStat = new Stat();
+ byte[] content = globalZk.getData(policiesPath, null, nodeStat);
+ Policies policies = getThreadLocal().readValue(content,
Policies.class);
+ if (remove) {
+ if
(policies.auth_policies.subscription_auth_roles.get(subscriptionName) != null) {
+
policies.auth_policies.subscription_auth_roles.get(subscriptionName).removeAll(roles);
+ }else {
+ log.info("[{}] Couldn't find role {} while revoking for
sub = {}", namespace, subscriptionName, roles);
+ result.completeExceptionally(new
IllegalArgumentException("couldn't find subscription"));
+ return result;
+ }
+ } else {
+
policies.auth_policies.subscription_auth_roles.put(subscriptionName, roles);
+ }
+
+ // Write back the new policies into zookeeper
+ globalZk.setData(policiesPath,
getThreadLocal().writeValueAsBytes(policies), nodeStat.getVersion());
+
+ configCache.policiesCache().invalidate(policiesPath);
+
+ log.info("[{}] Successfully granted access for role {} for sub =
{}", namespace, subscriptionName, roles);
+ result.complete(null);
+ } catch (KeeperException.NoNodeException e) {
+ log.warn("[{}] Failed to set permissions for namespace {}: does
not exist", subscriptionName, namespace);
+ result.completeExceptionally(new
IllegalArgumentException("Namespace does not exist" + namespace));
+ } catch (KeeperException.BadVersionException e) {
+ log.warn("[{}] Failed to set permissions for {} on namespace {}:
concurrent modification", subscriptionName, roles, namespace);
+ result.completeExceptionally(new IllegalStateException(
+ "Concurrent modification on zk path: " + policiesPath + ",
" + e.getMessage()));
+ } catch (Exception e) {
+ log.error("[{}] Failed to get permissions for role {} on namespace
{}", subscriptionName, roles, namespace, e);
+ result.completeExceptionally(
+ new IllegalStateException("Failed to get permissions for
namespace " + namespace));
+ }
+
+ return result;
+ }
+
private CompletableFuture<Boolean> checkAuthorization(TopicName topicName,
String role, AuthAction action) {
if (isSuperUser(role)) {
return CompletableFuture.completedFuture(true);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 105ca25..8a39f1a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -330,6 +330,38 @@ public abstract class NamespacesBase extends AdminResource
{
}
}
+
+ protected void internalGrantPermissionOnSubscription(String subscription,
Set<String> roles) {
+ /** controlled by system-admin(super-user) to prevent metadata
footprint size */
+ validateSuperUserAccess();
+
+ try {
+ AuthorizationService authService =
pulsar().getBrokerService().getAuthorizationService();
+ if (null != authService) {
+ authService.grantSubscriptionPermissionAsync(namespaceName,
subscription, roles,
+ null/* additional auth-data json */).get();
+ } else {
+ throw new RestException(Status.NOT_IMPLEMENTED, "Authorization
is not enabled");
+ }
+ } catch (InterruptedException e) {
+ log.error("[{}] Failed to get permissions for namespace {}",
clientAppId(), namespaceName, e);
+ throw new RestException(e);
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof IllegalArgumentException) {
+ log.warn("[{}] Failed to set permissions for namespace {}:
does not exist", clientAppId(),
+ namespaceName);
+ throw new RestException(Status.NOT_FOUND, "Namespace does not
exist");
+ } else if (e.getCause() instanceof IllegalStateException) {
+ log.warn("[{}] Failed to set permissions for namespace {}:
concurrent modification", clientAppId(),
+ namespaceName);
+ throw new RestException(Status.CONFLICT, "Concurrent
modification");
+ } else {
+ log.error("[{}] Failed to get permissions for namespace {}",
clientAppId(), namespaceName, e);
+ throw new RestException(e);
+ }
+ }
+ }
+
protected void internalRevokePermissionsOnNamespace(String role) {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
@@ -360,6 +392,19 @@ public abstract class NamespacesBase extends AdminResource
{
}
}
+ protected void internalRevokePermissionsOnSubscription(String
subscriptionName, String role) {
+ validateAdminAccessForTenant(namespaceName.getTenant());
+ validatePoliciesReadOnlyAccess();
+
+ AuthorizationService authService =
pulsar().getBrokerService().getAuthorizationService();
+ if (null != authService) {
+ authService.revokeSubscriptionPermissionAsync(namespaceName,
subscriptionName, role,
+ null/* additional auth-data json */);
+ } else {
+ throw new RestException(Status.NOT_IMPLEMENTED, "Authorization is
not enabled");
+ }
+ }
+
protected Set<String> internalGetNamespaceReplicationClusters() {
if (!namespaceName.isGlobal()) {
throw new RestException(Status.PRECONDITION_FAILED,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 34c324a..24c30a7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -235,6 +235,36 @@ public class PersistentTopicsBase extends AdminResource {
validateTopicOwnership(topicName, authoritative);
}
+ protected void validateAdminAccessForSubscriber(String subscriptionName,
boolean authoritative) {
+ validateTopicOwnership(topicName, authoritative);
+ try {
+ validateAdminAccessForTenant(topicName.getTenant());
+ } catch (Exception e) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] failed to validate admin access for {}",
topicName, clientAppId());
+ }
+ validateAdminAccessForSubscriber(subscriptionName);
+ }
+ }
+
+ private void validateAdminAccessForSubscriber(String subscriptionName) {
+ try {
+ if
(!pulsar().getBrokerService().getAuthorizationService().canConsume(topicName,
clientAppId(),
+ clientAuthData(), subscriptionName)) {
+ log.warn("[{}} Subscriber {} is not authorized to access api",
topicName, clientAppId());
+ throw new RestException(Status.UNAUTHORIZED,
+ String.format("Subscriber %s is not authorized to
access this operation", clientAppId()));
+ }
+ } catch (RestException re) {
+ throw re;
+ } catch (Exception e) {
+ // unknown error marked as internal server error
+ log.warn("Unexpected error while authorizing request. topic={},
role={}. Error: {}", topicName,
+ clientAppId(), e.getMessage(), e);
+ throw new RestException(e);
+ }
+ }
+
protected void internalGrantPermissionsOnTopic(String role,
Set<AuthAction> actions) {
// This operation should be reading from zookeeper and it should be
allowed without having admin privileges
validateAdminAccessForTenant(namespaceName.getTenant());
@@ -654,7 +684,7 @@ public class PersistentTopicsBase extends AdminResource {
}
}
} else {
- validateAdminOperationOnTopic(authoritative);
+ validateAdminAccessForSubscriber(subName, authoritative);
Topic topic = getTopicReference(topicName);
try {
Subscription sub = topic.getSubscription(subName);
@@ -690,7 +720,7 @@ public class PersistentTopicsBase extends AdminResource {
throw new RestException(e);
}
} else {
- validateAdminOperationOnTopic(authoritative);
+ validateAdminAccessForSubscriber(subName, authoritative);
PersistentTopic topic = (PersistentTopic)
getTopicReference(topicName);
try {
if (subName.startsWith(topic.replicatorPrefix)) {
@@ -721,7 +751,7 @@ public class PersistentTopicsBase extends AdminResource {
if (partitionMetadata.partitions > 0) {
throw new RestException(Status.METHOD_NOT_ALLOWED, "Skip messages
on a partitioned topic is not allowed");
}
- validateAdminOperationOnTopic(authoritative);
+ validateAdminAccessForSubscriber(subName, authoritative);
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
try {
if (subName.startsWith(topic.replicatorPrefix)) {
@@ -810,7 +840,7 @@ public class PersistentTopicsBase extends AdminResource {
}
} else {
- validateAdminOperationOnTopic(authoritative);
+ validateAdminAccessForSubscriber(subName, authoritative);
log.info("[{}][{}] received reset cursor on subscription {} to
time {}", clientAppId(), topicName,
subName, timestamp);
PersistentTopic topic = (PersistentTopic)
getTopicReference(topicName);
@@ -883,7 +913,7 @@ public class PersistentTopicsBase extends AdminResource {
throw exception.get();
}
} else {
- validateAdminOperationOnTopic(authoritative);
+ validateAdminAccessForSubscriber(subscriptionName,
authoritative);
PersistentTopic topic = (PersistentTopic)
getOrCreateTopic(topicName);
@@ -925,7 +955,7 @@ public class PersistentTopicsBase extends AdminResource {
throw new RestException(Status.METHOD_NOT_ALLOWED,
"Reset-cursor at position is not allowed for
partitioned-topic");
} else {
- validateAdminOperationOnTopic(authoritative);
+ validateAdminAccessForSubscriber(subName, authoritative);
PersistentTopic topic = (PersistentTopic)
getTopicReference(topicName);
if (topic == null) {
throw new RestException(Status.NOT_FOUND, "Topic not found");
@@ -960,7 +990,7 @@ public class PersistentTopicsBase extends AdminResource {
if (partitionMetadata.partitions > 0) {
throw new RestException(Status.METHOD_NOT_ALLOWED, "Peek messages
on a partitioned topic is not allowed");
}
- validateAdminOperationOnTopic(authoritative);
+ validateAdminAccessForSubscriber(subName, authoritative);
if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
log.error("[{}] Not supported operation of non-persistent topic {}
{}", clientAppId(), topicName,
subName);
@@ -1112,7 +1142,7 @@ public class PersistentTopicsBase extends AdminResource {
}
} else {
// validate ownership and redirect if current broker is not owner
- validateAdminOperationOnTopic(authoritative);
+ validateAdminAccessForSubscriber(subName, authoritative);
if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
log.error("[{}] Not supported operation of non-persistent
topic {} {}", clientAppId(), topicName,
subName);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 561351d..1f2c8c9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -226,6 +226,19 @@ public class Namespaces extends NamespacesBase {
internalGrantPermissionOnNamespace(role, actions);
}
+ @POST
+
@Path("/{property}/{cluster}/{namespace}/permissions/subscription/{subscription}")
+ @ApiOperation(hidden = true, value = "Grant a new permission to roles for
a subscription.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Property or cluster or
namespace doesn't exist"),
+ @ApiResponse(code = 409, message = "Concurrent modification"),
+ @ApiResponse(code = 501, message = "Authorization is not
enabled")})
+ public void grantPermissionOnSubscription(@PathParam("property") String
property, @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace,
@PathParam("subscription") String subscription, Set<String> roles) {
+ validateNamespaceName(property, cluster, namespace);
+ internalGrantPermissionOnSubscription(subscription, roles);
+ }
+
@DELETE
@Path("/{property}/{cluster}/{namespace}/permissions/{role}")
@ApiOperation(hidden = true, value = "Revoke all permissions to a role on
a namespace.")
@@ -238,6 +251,18 @@ public class Namespaces extends NamespacesBase {
internalRevokePermissionsOnNamespace(role);
}
+ @DELETE
+
@Path("/{property}/{cluster}/{namespace}/permissions/{subscription}/{role}")
+ @ApiOperation(hidden = true, value = "Revoke subscription admin-api access
permission for a role.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Property or cluster or
namespace doesn't exist") })
+ public void revokePermissionOnSubscription(@PathParam("property") String
property,
+ @PathParam("cluster") String cluster, @PathParam("namespace")
String namespace,
+ @PathParam("subscription") String subscription, @PathParam("role")
String role) {
+ validateNamespaceName(property, cluster, namespace);
+ internalRevokePermissionsOnSubscription(subscription, role);
+ }
+
@GET
@Path("/{property}/{cluster}/{namespace}/replication")
@ApiOperation(hidden = true, value = "Get the replication clusters for a
namespace.", response = String.class, responseContainer = "List")
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 58686d8..b116003 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -172,6 +172,20 @@ public class Namespaces extends NamespacesBase {
internalGrantPermissionOnNamespace(role, actions);
}
+ @POST
+ @Path("/{property}/{namespace}/permissions/subscription/{subscription}")
+ @ApiOperation(hidden = true, value = "Grant a new permission to roles for
a subscription.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Property or cluster or
namespace doesn't exist"),
+ @ApiResponse(code = 409, message = "Concurrent modification"),
+ @ApiResponse(code = 501, message = "Authorization is not enabled")
})
+ public void grantPermissionOnSubscription(@PathParam("property") String
property,
+ @PathParam("namespace") String namespace,
@PathParam("subscription") String subscription,
+ Set<String> roles) {
+ validateNamespaceName(property, namespace);
+ internalGrantPermissionOnSubscription(subscription, roles);
+ }
+
@DELETE
@Path("/{tenant}/{namespace}/permissions/{role}")
@ApiOperation(value = "Revoke all permissions to a role on a namespace.")
@@ -183,6 +197,18 @@ public class Namespaces extends NamespacesBase {
internalRevokePermissionsOnNamespace(role);
}
+ @DELETE
+ @Path("/{property}/{namespace}/permissions/{subscription}/{role}")
+ @ApiOperation(hidden = true, value = "Revoke subscription admin-api access
permission for a role.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Property or cluster or
namespace doesn't exist") })
+ public void revokePermissionOnSubscription(@PathParam("property") String
property,
+ @PathParam("namespace") String namespace,
@PathParam("subscription") String subscription,
+ @PathParam("role") String role) {
+ validateNamespaceName(property, namespace);
+ internalRevokePermissionsOnSubscription(subscription, role);
+ }
+
@GET
@Path("/{tenant}/{namespace}/replication")
@ApiOperation(value = "Get the replication clusters for a namespace.",
response = String.class, responseContainer = "List")
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 6b17a73..01160fe 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -587,7 +587,7 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
}
}
- @Test(invocationCount = 1)
+ @Test
public void namespaces() throws PulsarAdminException,
PulsarServerException, Exception {
admin.clusters().createCluster("usw", new ClusterData());
TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1",
"role2"),
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index e31e163..033c638 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -20,9 +20,11 @@ package org.apache.pulsar.client.api;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.mockito.Mockito.spy;
+import static org.testng.Assert.fail;
import java.io.IOException;
import java.net.URI;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -37,10 +39,11 @@ import
org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authorization.AuthorizationProvider;
import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.slf4j.Logger;
@@ -49,7 +52,6 @@ import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -57,6 +59,7 @@ public class AuthorizationProducerConsumerTest extends
ProducerConsumerBase {
private static final Logger log =
LoggerFactory.getLogger(AuthorizationProducerConsumerTest.class);
private final static String clientRole = "plugbleRole";
+ private final static Set<String> clientAuthProviderSupportedRoles =
Sets.newHashSet(clientRole);
protected void setup() throws Exception {
@@ -146,6 +149,101 @@ public class AuthorizationProducerConsumerTest extends
ProducerConsumerBase {
}
@Test
+ public void testSubscriberPermission() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+
conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
+ setup();
+
+ final String tenantRole = "tenant-role";
+ final String subscriptionRole = "sub1-role";
+ final String subscriptionName = "sub1";
+ final String namespace = "my-property/my-ns-sub-auth";
+ final String topicName = "persistent://" + namespace + "/my-topic";
+ Authentication adminAuthentication = new
ClientAuthentication("superUser");
+
+ clientAuthProviderSupportedRoles.add(subscriptionRole);
+
+ PulsarAdmin superAdmin = spy(
+
PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()).authentication(adminAuthentication).build());
+
+ Authentication tenantAdminAuthentication = new
ClientAuthentication(tenantRole);
+ PulsarAdmin tenantAdmin =
spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
+ .authentication(tenantAdminAuthentication).build());
+
+ Authentication subAdminAuthentication = new
ClientAuthentication(subscriptionRole);
+ PulsarAdmin sub1Admin =
spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
+ .authentication(subAdminAuthentication).build());
+
+ String lookupUrl;
+ lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
+
+ Authentication authentication = new
ClientAuthentication(subscriptionRole);
+
+ superAdmin.tenants().createTenant("my-property",
+ new TenantInfo(Sets.newHashSet(tenantRole),
Sets.newHashSet("test")));
+ superAdmin.namespaces().createNamespace(namespace,
Sets.newHashSet("test"));
+ tenantAdmin.namespaces().grantPermissionOnNamespace(namespace,
subscriptionRole,
+ Collections.singleton(AuthAction.consume));
+
+ pulsarClient =
PulsarClient.builder().serviceUrl(lookupUrl).authentication(authentication).build();
+ // (1) Create subscription name
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
+ .subscribe();
+ consumer.close();
+
+ // verify tenant is able to perform all subscription-admin api
+ tenantAdmin.topics().skipAllMessages(topicName, subscriptionName);
+ tenantAdmin.topics().skipMessages(topicName, subscriptionName, 1);
+ tenantAdmin.topics().expireMessages(topicName, subscriptionName, 10);
+ tenantAdmin.topics().peekMessages(topicName, subscriptionName, 1);
+ tenantAdmin.topics().resetCursor(topicName, subscriptionName, 10);
+ tenantAdmin.topics().resetCursor(topicName, subscriptionName,
MessageId.earliest);
+
+ // grant namespace-level authorization to the subscriptionRole
+ tenantAdmin.namespaces().grantPermissionOnNamespace(namespace,
subscriptionRole,
+ Collections.singleton(AuthAction.consume));
+
+ // subscriptionRole has namespace-level authorization
+ sub1Admin.topics().resetCursor(topicName, subscriptionName, 10);
+
+ // grant subscription access to specific different role and only that
role can access the subscription
+ String otherPrincipal = "Principal-1-to-access-sub";
+ superAdmin.namespaces().grantPermissionOnSubscription(namespace,
subscriptionName,
+ Collections.singleton(otherPrincipal));
+
+ // now, subscriptionRole doesn't have subscription level access so, it
will fail to access subscription
+ try {
+ sub1Admin.topics().resetCursor(topicName, subscriptionName, 10);
+ fail("should have fail with authorization exception");
+ } catch
(org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException e) {
+ // Ok
+ }
+
+ // now, grant subscription-access to subscriptionRole as well
+ superAdmin.namespaces().grantPermissionOnSubscription(namespace,
subscriptionName,
+ Sets.newHashSet(otherPrincipal, subscriptionRole));
+
+ sub1Admin.topics().skipAllMessages(topicName, subscriptionName);
+ sub1Admin.topics().skipMessages(topicName, subscriptionName, 1);
+ sub1Admin.topics().expireMessages(topicName, subscriptionName, 10);
+ sub1Admin.topics().peekMessages(topicName, subscriptionName, 1);
+ sub1Admin.topics().resetCursor(topicName, subscriptionName, 10);
+ sub1Admin.topics().resetCursor(topicName, subscriptionName,
MessageId.earliest);
+
+ superAdmin.namespaces().revokePermissionOnSubscription(namespace,
subscriptionName, subscriptionRole);
+
+ try {
+ sub1Admin.topics().resetCursor(topicName, subscriptionName, 10);
+ fail("should have fail with authorization exception");
+ } catch
(org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException e) {
+ // Ok
+ }
+
+ log.info("-- Exiting {} test --", methodName);
+ }
+
+ @Test
public void testSubscriptionPrefixAuthorization() throws Exception {
log.info("-- Starting {} test --", methodName);
@@ -317,19 +415,19 @@ public class AuthorizationProducerConsumerTest extends
ProducerConsumerBase {
@Override
public CompletableFuture<Boolean> canProduceAsync(TopicName topicName,
String role,
AuthenticationDataSource authenticationData) {
- return CompletableFuture.completedFuture(clientRole.equals(role));
+ return
CompletableFuture.completedFuture(clientAuthProviderSupportedRoles.contains(role));
}
@Override
public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName,
String role,
AuthenticationDataSource authenticationData, String
subscription) {
- return CompletableFuture.completedFuture(clientRole.equals(role));
+ return
CompletableFuture.completedFuture(clientAuthProviderSupportedRoles.contains(role));
}
@Override
public CompletableFuture<Boolean> canLookupAsync(TopicName topicName,
String role,
AuthenticationDataSource authenticationData) {
- return CompletableFuture.completedFuture(clientRole.equals(role));
+ return
CompletableFuture.completedFuture(clientAuthProviderSupportedRoles.contains(role));
}
@Override
@@ -343,6 +441,18 @@ public class AuthorizationProducerConsumerTest extends
ProducerConsumerBase {
String authenticationData) {
return CompletableFuture.completedFuture(null);
}
+
+ @Override
+ public CompletableFuture<Void>
grantSubscriptionPermissionAsync(NamespaceName namespace,
+ String subscriptionName, Set<String> roles, String
authDataJson) {
+ return CompletableFuture.completedFuture(null); // no-op stub: ignores all arguments and returns an already-completed future
+ }
+
+ @Override
+ public CompletableFuture<Void>
revokeSubscriptionPermissionAsync(NamespaceName namespace,
+ String subscriptionName, String role, String authDataJson) {
+ return CompletableFuture.completedFuture(null); // no-op stub: ignores all arguments and returns an already-completed future
+ }
}
/**
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index cbd5506..db5a775 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -377,6 +377,24 @@ public interface Namespaces {
void revokePermissionsOnNamespace(String namespace, String role) throws
PulsarAdminException;
/**
+ * Grant a set of roles permission to access a subscription's admin-api.
+ * @param namespace name of the namespace in which the permission is granted
+ * @param subscription name of the subscription whose admin-api access is granted
+ * @param roles roles that are granted access
+ * @throws PulsarAdminException if the request fails
+ */
+ void grantPermissionOnSubscription(String namespace, String subscription,
Set<String> roles) throws PulsarAdminException;
+
+ /**
+ * Revoke a role's permission to access a subscription's admin-api.
+ * @param namespace name of the namespace in which the permission is revoked
+ * @param subscription name of the subscription whose admin-api access is revoked
+ * @param role role whose access is revoked
+ * @throws PulsarAdminException if the request fails
+ */
+ void revokePermissionOnSubscription(String namespace, String subscription,
String role) throws PulsarAdminException;
+
+ /**
* Get the replication clusters for a namespace.
* <p>
* Response example:
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 833c2c3..6146e11 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -233,6 +233,30 @@ public class NamespacesImpl extends BaseResource
implements Namespaces {
}
}
+
+ @Override
+ public void grantPermissionOnSubscription(String namespace, String
subscription, Set<String> roles)
+ throws PulsarAdminException { // issues a POST that grants the roles access to the subscription's admin-api
+ try {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "permissions", "subscription",
subscription); // target built from the segments permissions/subscription/<subscription>
+ request(path).post(Entity.entity(roles,
MediaType.APPLICATION_JSON), ErrorData.class); // the role set is sent as the JSON request body
+ } catch (Exception e) {
+ throw getApiException(e); // any failure is rethrown via getApiException (presumably as a PulsarAdminException)
+ }
+ }
+
+ @Override
+ public void revokePermissionOnSubscription(String namespace, String
subscription, String role) throws PulsarAdminException { // issues a DELETE that revokes the role's access to the subscription's admin-api
+ try {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "permissions", subscription,
role); // NOTE(review): grantPermissionOnSubscription uses permissions/subscription/<subscription>, but this path has no literal "subscription" segment - confirm it matches the broker's DELETE route
+ request(path).delete(ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e); // any failure is rethrown via getApiException (presumably as a PulsarAdminException)
+ }
+ }
+
@Override
public List<String> getNamespaceReplicationClusters(String namespace)
throws PulsarAdminException {
try {
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 6650618..faf04e5 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -29,6 +29,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.admin.cli.utils.IOUtils;
@@ -198,6 +199,42 @@ public class CmdNamespaces extends CmdBase {
}
}
+ @Parameters(commandDescription = "Grant permissions to access subscription
admin-api")
+ private class GrantSubscriptionPermissions extends CliCommand { // CLI command that forwards to Namespaces#grantPermissionOnSubscription
+ @Parameter(description = "tenant/namespace", required = true)
+ private java.util.List<String> params; // unnamed (main) parameter holding the namespace
+
+ @Parameter(names = "--subscription", description = "Subscription name
for which permission will be granted to roles", required = true)
+ private String subscription;
+
+ @Parameter(names = "--roles", description = "Client roles to which
grant permissions (comma separated roles)", required = true, splitter =
CommaParameterSplitter.class)
+ private List<String> roles; // split on commas by CommaParameterSplitter
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ admin.namespaces().grantPermissionOnSubscription(namespace,
subscription, Sets.newHashSet(roles)); // the role list is converted to a set because the admin API takes Set<String>
+ }
+ }
+
+ @Parameters(commandDescription = "Revoke permissions to access subscription admin-api")
+ private class RevokeSubscriptionPermissions extends CliCommand { // CLI command that forwards to Namespaces#revokePermissionOnSubscription
+ @Parameter(description = "tenant/namespace", required = true)
+ private java.util.List<String> params; // unnamed (main) parameter holding the namespace
+
+ @Parameter(names = "--subscription", description = "Subscription name
for which permission will be revoked from the role", required = true)
+ private String subscription;
+
+ @Parameter(names = "--role", description = "Client role to which
revoke permissions", required = true)
+ private String role; // a single role; revoke takes one role where grant takes a set
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ admin.namespaces().revokePermissionOnSubscription(namespace,
subscription, role);
+ }
+ }
+
@Parameters(commandDescription = "Get the permissions on a namespace")
private class Permissions extends CliCommand {
@Parameter(description = "tenant/namespace\n", required = true)
@@ -987,7 +1024,10 @@ public class CmdNamespaces extends CmdBase {
jcommander.addCommand("permissions", new Permissions());
jcommander.addCommand("grant-permission", new GrantPermissions());
jcommander.addCommand("revoke-permission", new RevokePermissions());
-
+
+ jcommander.addCommand("grant-subscription-permission", new
GrantSubscriptionPermissions());
+ jcommander.addCommand("revoke-subscription-permission", new
RevokeSubscriptionPermissions());
+
jcommander.addCommand("set-clusters", new SetReplicationClusters());
jcommander.addCommand("get-clusters", new GetReplicationClusters());
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthPolicies.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthPolicies.java
index 553b576..bd0c61b 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthPolicies.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthPolicies.java
@@ -27,10 +27,12 @@ import com.google.common.collect.Maps;
public class AuthPolicies {
public final Map<String, Set<AuthAction>> namespace_auth;
public final Map<String, Map<String, Set<AuthAction>>> destination_auth;
+ public final Map<String, Set<String>> subscription_auth_roles;
public AuthPolicies() {
namespace_auth = Maps.newTreeMap();
destination_auth = Maps.newTreeMap();
+ subscription_auth_roles = Maps.newTreeMap();
}
@Override
@@ -38,7 +40,8 @@ public class AuthPolicies {
if (obj instanceof AuthPolicies) {
AuthPolicies other = (AuthPolicies) obj;
return Objects.equals(namespace_auth, other.namespace_auth)
- && Objects.equals(destination_auth,
other.destination_auth);
+ && Objects.equals(destination_auth, other.destination_auth)
+ && Objects.equals(subscription_auth_roles,
other.subscription_auth_roles);
}
return false;