merlimat closed pull request #2981: Allow subscribers to access subscription
admin-api
URL: https://github.com/apache/pulsar/pull/2981
This pull request was merged from a forked repository.
Because GitHub hides the original diff of a fork's pull request once it has
been merged, the full diff is reproduced below for the sake of provenance:
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
index cae2415dd1..5d8b930e85 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
@@ -101,6 +101,29 @@
CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace,
Set<AuthAction> actions, String role,
String authDataJson);
+ /**
+ * Grant the given roles access to the subscription admin-api of a
+ * subscription in a namespace.
+ *
+ * @param namespace
+ *            namespace the subscription belongs to
+ * @param subscriptionName
+ *            name of the subscription the roles are granted access to
+ * @param roles
+ *            roles that may access the subscription admin-api
+ * @param authDataJson
+ * additional authdata in json format
+ * @return a future that signals completion (or failure) of the grant
+ */
+ CompletableFuture<Void> grantSubscriptionPermissionAsync(NamespaceName
namespace, String subscriptionName, Set<String> roles,
+ String authDataJson);
+
+ /**
+ * Revoke subscription admin-api access for a role.
+ *
+ * @param namespace
+ *            namespace the subscription belongs to
+ * @param subscriptionName
+ *            name of the subscription the role loses access to
+ * @param role
+ *            role whose access is revoked
+ * @param authDataJson
+ *            additional authdata in json format
+ * @return a future that signals completion (or failure) of the revocation
+ */
+ CompletableFuture<Void> revokeSubscriptionPermissionAsync(NamespaceName
namespace, String subscriptionName,
+ String role, String authDataJson);
+
/**
* Grant authorization-action permission on a topic to the given client
*
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 76e98ccfd9..95d012f502 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -94,6 +94,41 @@ public AuthorizationService(ServiceConfiguration conf,
ConfigurationCacheService
return FutureUtil.failedFuture(new IllegalStateException("No
authorization provider configured"));
}
+    /**
+     * Grants the given roles access to the subscription admin-api by delegating to the configured authorization
+     * provider.
+     *
+     * @param namespace
+     *            namespace the subscription belongs to
+     * @param subscriptionName
+     *            subscription the roles are granted access to
+     * @param roles
+     *            roles that may access the subscription admin-api
+     * @param authDataJson
+     *            additional authdata in json for targeted authorization provider
+     * @return the provider's future, or a future failed with {@link IllegalStateException} when no provider is
+     *         configured
+     */
+    public CompletableFuture<Void> grantSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName,
+            Set<String> roles, String authDataJson) {
+        // Guard first: without a provider there is nothing to delegate to
+        if (provider == null) {
+            return FutureUtil.failedFuture(new IllegalStateException("No authorization provider configured"));
+        }
+        return provider.grantSubscriptionPermissionAsync(namespace, subscriptionName, roles, authDataJson);
+    }
+
+ /**
+ * Revoke subscription admin-api access for a role
+ *
+ * @param namespace
+ * @param subscriptionName
+ * @param role
+ * @return
+ */
+ public CompletableFuture<Void>
revokeSubscriptionPermissionAsync(NamespaceName namespace, String
subscriptionName,
+ String role, String authDataJson) {
+ if (provider != null) {
+ return provider.revokeSubscriptionPermissionAsync(namespace,
subscriptionName, role, authDataJson);
+ }
+ return FutureUtil.failedFuture(new IllegalStateException("No
authorization provider configured"));
+ }
+
/**
* Grant authorization-action permission on a topic to the given client
*
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index fcd89b49b9..0f2df0be54 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -23,6 +23,7 @@
import static
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import java.io.IOException;
+import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -44,6 +45,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Sets;
+
/**
* Default authorization provider that stores authorization policies under
local-zookeeper.
*
@@ -110,6 +113,19 @@ public void initialize(ServiceConfiguration conf,
ConfigurationCacheService conf
}
} else {
if (isNotBlank(subscription) && !isSuperUser(role)) {
+ // validate if role is authorize to access
subscription. (skip validatation if authorization
+ // list is empty)
+ Set<String> roles =
policies.get().auth_policies.subscription_auth_roles.get(subscription);
+ if (roles != null && !roles.isEmpty() &&
!roles.contains(role)) {
+ log.warn("[{}] is not authorized to subscribe on
{}-{}", role, topicName, subscription);
+ PulsarServerException ex = new
PulsarServerException(
+ String.format("%s is not authorized to
access subscription %s on topic %s", role,
+ subscription, topicName));
+ permissionFuture.complete(false);
+ return;
+ }
+
+ // validate if subscription-auth mode is configured
switch (policies.get().subscription_auth_mode) {
case Prefix:
if (!subscription.startsWith(role)) {
@@ -125,6 +141,7 @@ public void initialize(ServiceConfiguration conf,
ConfigurationCacheService conf
}
}
}
+ // check namespace and topic level consume-permissions
checkAuthorization(topicName, role,
AuthAction.consume).thenAccept(isAuthorized -> {
permissionFuture.complete(isAuthorized);
});
@@ -241,6 +258,70 @@ public void initialize(ServiceConfiguration conf,
ConfigurationCacheService conf
return result;
}
+    /**
+     * Records {@code roles} as the roles allowed to access the admin-api of the given subscription.
+     * {@code authDataJson} is not used by this provider.
+     */
+    @Override
+    public CompletableFuture<Void> grantSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName,
+            Set<String> roles, String authDataJson) {
+        final boolean remove = false;
+        return updateSubscriptionPermissionAsync(namespace, subscriptionName, roles, remove);
+    }
+
+    /**
+     * Removes {@code role} from the roles allowed to access the admin-api of the given subscription.
+     * {@code authDataJson} is not used by this provider.
+     */
+    @Override
+    public CompletableFuture<Void> revokeSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName,
+            String role, String authDataJson) {
+        final Set<String> rolesToRemove = Collections.singleton(role);
+        final boolean remove = true;
+        return updateSubscriptionPermissionAsync(namespace, subscriptionName, rolesToRemove, remove);
+    }
+
+    /**
+     * Grants or revokes subscription admin-api access by rewriting the namespace policies stored in zookeeper.
+     * <p>
+     * On grant, the roles recorded for {@code subscriptionName} are replaced by {@code roles}. On revoke,
+     * {@code roles} are removed from the recorded set; the call fails if nothing is recorded for the subscription.
+     * The policies are written back with the version that was read, and the cached policies entry is invalidated.
+     *
+     * @param namespace
+     *            namespace whose policies are updated
+     * @param subscriptionName
+     *            subscription the roles apply to
+     * @param roles
+     *            roles to record (grant) or to remove (revoke)
+     * @param remove
+     *            {@code true} to revoke, {@code false} to grant
+     * @return a future completed with {@code null} on success; completed exceptionally with the failure of the
+     *         read-only check, an {@link IllegalArgumentException} (namespace policies or subscription entry not
+     *         found) or an {@link IllegalStateException} (concurrent modification or any other error)
+     */
+    private CompletableFuture<Void> updateSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName,
+            Set<String> roles, boolean remove) {
+        CompletableFuture<Void> result = new CompletableFuture<>();
+
+        try {
+            validatePoliciesReadOnlyAccess();
+        } catch (Exception e) {
+            // The check failed: report it and leave the stored policies untouched
+            result.completeExceptionally(e);
+            return result;
+        }
+
+        ZooKeeper globalZk = configCache.getZooKeeper();
+        final String policiesPath = String.format("/%s/%s/%s", "admin", POLICIES, namespace.toString());
+
+        try {
+            Stat nodeStat = new Stat();
+            byte[] content = globalZk.getData(policiesPath, null, nodeStat);
+            Policies policies = getThreadLocal().readValue(content, Policies.class);
+            if (remove) {
+                Set<String> authorizedRoles = policies.auth_policies.subscription_auth_roles.get(subscriptionName);
+                if (authorizedRoles == null) {
+                    log.info("[{}] Couldn't find role {} while revoking for sub = {}", namespace, roles,
+                            subscriptionName);
+                    result.completeExceptionally(new IllegalArgumentException("couldn't find subscription"));
+                    return result;
+                }
+                authorizedRoles.removeAll(roles);
+            } else {
+                policies.auth_policies.subscription_auth_roles.put(subscriptionName, roles);
+            }
+
+            // Write back the new policies into zookeeper; the version makes a concurrent update fail
+            globalZk.setData(policiesPath, getThreadLocal().writeValueAsBytes(policies), nodeStat.getVersion());
+
+            configCache.policiesCache().invalidate(policiesPath);
+
+            log.info("[{}] Successfully {} access for role {} for sub = {}", namespace,
+                    remove ? "revoked" : "granted", roles, subscriptionName);
+            result.complete(null);
+        } catch (KeeperException.NoNodeException e) {
+            log.warn("[{}] Failed to set permissions for namespace {}: does not exist", subscriptionName, namespace);
+            result.completeExceptionally(new IllegalArgumentException("Namespace does not exist: " + namespace));
+        } catch (KeeperException.BadVersionException e) {
+            log.warn("[{}] Failed to set permissions for {} on namespace {}: concurrent modification",
+                    subscriptionName, roles, namespace);
+            result.completeExceptionally(new IllegalStateException(
+                    "Concurrent modification on zk path: " + policiesPath + ", " + e.getMessage()));
+        } catch (Exception e) {
+            log.error("[{}] Failed to get permissions for role {} on namespace {}", subscriptionName, roles,
+                    namespace, e);
+            result.completeExceptionally(
+                    new IllegalStateException("Failed to get permissions for namespace " + namespace));
+        }
+
+        return result;
+    }
+
private CompletableFuture<Boolean> checkAuthorization(TopicName topicName,
String role, AuthAction action) {
if (isSuperUser(role)) {
return CompletableFuture.completedFuture(true);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index a00535430a..113a910fd6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -329,6 +329,38 @@ protected void internalGrantPermissionOnNamespace(String
role, Set<AuthAction> a
}
}
+
+ protected void internalGrantPermissionOnSubscription(String subscription,
Set<String> roles) {
+ /** controlled by system-admin(super-user) to prevent metadata
footprint size */
+ validateSuperUserAccess();
+
+ try {
+ AuthorizationService authService =
pulsar().getBrokerService().getAuthorizationService();
+ if (null != authService) {
+ authService.grantSubscriptionPermissionAsync(namespaceName,
subscription, roles,
+ null/* additional auth-data json */).get();
+ } else {
+ throw new RestException(Status.NOT_IMPLEMENTED, "Authorization
is not enabled");
+ }
+ } catch (InterruptedException e) {
+ log.error("[{}] Failed to get permissions for namespace {}",
clientAppId(), namespaceName, e);
+ throw new RestException(e);
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof IllegalArgumentException) {
+ log.warn("[{}] Failed to set permissions for namespace {}:
does not exist", clientAppId(),
+ namespaceName);
+ throw new RestException(Status.NOT_FOUND, "Namespace does not
exist");
+ } else if (e.getCause() instanceof IllegalStateException) {
+ log.warn("[{}] Failed to set permissions for namespace {}:
concurrent modification", clientAppId(),
+ namespaceName);
+ throw new RestException(Status.CONFLICT, "Concurrent
modification");
+ } else {
+ log.error("[{}] Failed to get permissions for namespace {}",
clientAppId(), namespaceName, e);
+ throw new RestException(e);
+ }
+ }
+ }
+
protected void internalRevokePermissionsOnNamespace(String role) {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
@@ -359,6 +391,19 @@ protected void internalRevokePermissionsOnNamespace(String
role) {
}
}
+ protected void internalRevokePermissionsOnSubscription(String
subscriptionName, String role) {
+ validateAdminAccessForTenant(namespaceName.getTenant());
+ validatePoliciesReadOnlyAccess();
+
+ AuthorizationService authService =
pulsar().getBrokerService().getAuthorizationService();
+ if (null != authService) {
+ authService.revokeSubscriptionPermissionAsync(namespaceName,
subscriptionName, role,
+ null/* additional auth-data json */);
+ } else {
+ throw new RestException(Status.NOT_IMPLEMENTED, "Authorization is
not enabled");
+ }
+ }
+
protected Set<String> internalGetNamespaceReplicationClusters() {
if (!namespaceName.isGlobal()) {
throw new RestException(Status.PRECONDITION_FAILED,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 34c324a522..24c30a7be7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -235,6 +235,36 @@ public void validateAdminOperationOnTopic(boolean
authoritative) {
validateTopicOwnership(topicName, authoritative);
}
+    /**
+     * Validates topic ownership, then accepts the caller either as an admin of the topic's tenant or, when that
+     * check throws, as a client that passes the subscription-level check.
+     *
+     * @param subscriptionName
+     *            subscription the admin operation targets
+     * @param authoritative
+     *            forwarded to the topic-ownership validation
+     */
+    protected void validateAdminAccessForSubscriber(String subscriptionName, boolean authoritative) {
+        validateTopicOwnership(topicName, authoritative);
+
+        boolean isTenantAdmin = true;
+        try {
+            validateAdminAccessForTenant(topicName.getTenant());
+        } catch (Exception e) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] failed to validate admin access for {}", topicName, clientAppId());
+            }
+            isTenantAdmin = false;
+        }
+
+        // Tenant-admin validation failed: fall back to the subscription-level check
+        if (!isTenantAdmin) {
+            validateAdminAccessForSubscriber(subscriptionName);
+        }
+    }
+
+    /**
+     * Verifies through the authorization service that the calling client may consume from the topic with the
+     * given subscription.
+     *
+     * @param subscriptionName
+     *            subscription the admin operation targets
+     * @throws RestException
+     *             UNAUTHORIZED when the authorization service denies the client; a wrapped error when the check
+     *             itself fails unexpectedly
+     */
+    private void validateAdminAccessForSubscriber(String subscriptionName) {
+        try {
+            if (!pulsar().getBrokerService().getAuthorizationService().canConsume(topicName, clientAppId(),
+                    clientAuthData(), subscriptionName)) {
+                log.warn("[{}] Subscriber {} is not authorized to access api", topicName, clientAppId());
+                throw new RestException(Status.UNAUTHORIZED,
+                        String.format("Subscriber %s is not authorized to access this operation", clientAppId()));
+            }
+        } catch (RestException re) {
+            // The denial raised above must reach the client unchanged
+            throw re;
+        } catch (Exception e) {
+            // unknown error marked as internal server error
+            log.warn("Unexpected error while authorizing request. topic={}, role={}. Error: {}", topicName,
+                    clientAppId(), e.getMessage(), e);
+            throw new RestException(e);
+        }
+    }
+
protected void internalGrantPermissionsOnTopic(String role,
Set<AuthAction> actions) {
// This operation should be reading from zookeeper and it should be
allowed without having admin privileges
validateAdminAccessForTenant(namespaceName.getTenant());
@@ -654,7 +684,7 @@ protected void internalDeleteSubscription(String subName,
boolean authoritative)
}
}
} else {
- validateAdminOperationOnTopic(authoritative);
+ validateAdminAccessForSubscriber(subName, authoritative);
Topic topic = getTopicReference(topicName);
try {
Subscription sub = topic.getSubscription(subName);
@@ -690,7 +720,7 @@ protected void internalSkipAllMessages(String subName,
boolean authoritative) {
throw new RestException(e);
}
} else {
- validateAdminOperationOnTopic(authoritative);
+ validateAdminAccessForSubscriber(subName, authoritative);
PersistentTopic topic = (PersistentTopic)
getTopicReference(topicName);
try {
if (subName.startsWith(topic.replicatorPrefix)) {
@@ -721,7 +751,7 @@ protected void internalSkipMessages(String subName, int
numMessages, boolean aut
if (partitionMetadata.partitions > 0) {
throw new RestException(Status.METHOD_NOT_ALLOWED, "Skip messages
on a partitioned topic is not allowed");
}
- validateAdminOperationOnTopic(authoritative);
+ validateAdminAccessForSubscriber(subName, authoritative);
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
try {
if (subName.startsWith(topic.replicatorPrefix)) {
@@ -810,7 +840,7 @@ protected void internalResetCursor(String subName, long
timestamp, boolean autho
}
} else {
- validateAdminOperationOnTopic(authoritative);
+ validateAdminAccessForSubscriber(subName, authoritative);
log.info("[{}][{}] received reset cursor on subscription {} to
time {}", clientAppId(), topicName,
subName, timestamp);
PersistentTopic topic = (PersistentTopic)
getTopicReference(topicName);
@@ -883,7 +913,7 @@ protected void internalCreateSubscription(String
subscriptionName, MessageIdImpl
throw exception.get();
}
} else {
- validateAdminOperationOnTopic(authoritative);
+ validateAdminAccessForSubscriber(subscriptionName,
authoritative);
PersistentTopic topic = (PersistentTopic)
getOrCreateTopic(topicName);
@@ -925,7 +955,7 @@ protected void internalResetCursorOnPosition(String
subName, boolean authoritati
throw new RestException(Status.METHOD_NOT_ALLOWED,
"Reset-cursor at position is not allowed for
partitioned-topic");
} else {
- validateAdminOperationOnTopic(authoritative);
+ validateAdminAccessForSubscriber(subName, authoritative);
PersistentTopic topic = (PersistentTopic)
getTopicReference(topicName);
if (topic == null) {
throw new RestException(Status.NOT_FOUND, "Topic not found");
@@ -960,7 +990,7 @@ protected Response internalPeekNthMessage(String subName,
int messagePosition, b
if (partitionMetadata.partitions > 0) {
throw new RestException(Status.METHOD_NOT_ALLOWED, "Peek messages
on a partitioned topic is not allowed");
}
- validateAdminOperationOnTopic(authoritative);
+ validateAdminAccessForSubscriber(subName, authoritative);
if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
log.error("[{}] Not supported operation of non-persistent topic {}
{}", clientAppId(), topicName,
subName);
@@ -1112,7 +1142,7 @@ protected void internalExpireMessages(String subName, int
expireTimeInSeconds, b
}
} else {
// validate ownership and redirect if current broker is not owner
- validateAdminOperationOnTopic(authoritative);
+ validateAdminAccessForSubscriber(subName, authoritative);
if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
log.error("[{}] Not supported operation of non-persistent
topic {} {}", clientAppId(), topicName,
subName);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 561351da91..1f2c8c93c1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -226,6 +226,19 @@ public void
grantPermissionOnNamespace(@PathParam("property") String property, @
internalGrantPermissionOnNamespace(role, actions);
}
+ /**
+ * REST endpoint (v1) that grants the roles supplied in the request body
+ * access to the given subscription. Validates the namespace name and then
+ * delegates to internalGrantPermissionOnSubscription.
+ */
+ @POST
+
@Path("/{property}/{cluster}/{namespace}/permissions/subscription/{subscription}")
+ @ApiOperation(hidden = true, value = "Grant a new permission to roles for
a subscription.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Property or cluster or
namespace doesn't exist"),
+ @ApiResponse(code = 409, message = "Concurrent modification"),
+ @ApiResponse(code = 501, message = "Authorization is not
enabled")})
+ public void grantPermissionOnSubscription(@PathParam("property") String
property, @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace,
@PathParam("subscription") String subscription, Set<String> roles) {
+ validateNamespaceName(property, cluster, namespace);
+ internalGrantPermissionOnSubscription(subscription, roles);
+ }
+
@DELETE
@Path("/{property}/{cluster}/{namespace}/permissions/{role}")
@ApiOperation(hidden = true, value = "Revoke all permissions to a role on
a namespace.")
@@ -238,6 +251,18 @@ public void
revokePermissionsOnNamespace(@PathParam("property") String property,
internalRevokePermissionsOnNamespace(role);
}
+ /**
+ * REST endpoint (v1) that revokes a role's access to the given
+ * subscription. Validates the namespace name and then delegates to
+ * internalRevokePermissionsOnSubscription.
+ */
+ @DELETE
+
@Path("/{property}/{cluster}/{namespace}/permissions/{subscription}/{role}")
+ @ApiOperation(hidden = true, value = "Revoke subscription admin-api access
permission for a role.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Property or cluster or
namespace doesn't exist") })
+ public void revokePermissionOnSubscription(@PathParam("property") String
property,
+ @PathParam("cluster") String cluster, @PathParam("namespace")
String namespace,
+ @PathParam("subscription") String subscription, @PathParam("role")
String role) {
+ validateNamespaceName(property, cluster, namespace);
+ internalRevokePermissionsOnSubscription(subscription, role);
+ }
+
@GET
@Path("/{property}/{cluster}/{namespace}/replication")
@ApiOperation(hidden = true, value = "Get the replication clusters for a
namespace.", response = String.class, responseContainer = "List")
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index e335521053..0e7953146d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -171,6 +171,20 @@ public void
grantPermissionOnNamespace(@PathParam("tenant") String tenant,
internalGrantPermissionOnNamespace(role, actions);
}
+ /**
+ * REST endpoint (v2) that grants the roles supplied in the request body
+ * access to the given subscription. Validates the namespace name and then
+ * delegates to internalGrantPermissionOnSubscription.
+ *
+ * NOTE(review): the neighbouring v2 endpoints name the first path segment
+ * {tenant}; this one uses {property} - confirm whether that is intended.
+ */
+ @POST
+ @Path("/{property}/{namespace}/permissions/subscription/{subscription}")
+ @ApiOperation(hidden = true, value = "Grant a new permission to roles for
a subscription.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Property or cluster or
namespace doesn't exist"),
+ @ApiResponse(code = 409, message = "Concurrent modification"),
+ @ApiResponse(code = 501, message = "Authorization is not enabled")
})
+ public void grantPermissionOnSubscription(@PathParam("property") String
property,
+ @PathParam("namespace") String namespace,
@PathParam("subscription") String subscription,
+ Set<String> roles) {
+ validateNamespaceName(property, namespace);
+ internalGrantPermissionOnSubscription(subscription, roles);
+ }
+
@DELETE
@Path("/{tenant}/{namespace}/permissions/{role}")
@ApiOperation(value = "Revoke all permissions to a role on a namespace.")
@@ -182,6 +196,18 @@ public void
revokePermissionsOnNamespace(@PathParam("tenant") String tenant,
internalRevokePermissionsOnNamespace(role);
}
+ /**
+ * REST endpoint (v2) that revokes a role's access to the given
+ * subscription. Validates the namespace name and then delegates to
+ * internalRevokePermissionsOnSubscription.
+ *
+ * NOTE(review): the neighbouring v2 endpoints name the first path segment
+ * {tenant}; this one uses {property} - confirm whether that is intended.
+ */
+ @DELETE
+ @Path("/{property}/{namespace}/permissions/{subscription}/{role}")
+ @ApiOperation(hidden = true, value = "Revoke subscription admin-api access
permission for a role.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have
admin permission"),
+ @ApiResponse(code = 404, message = "Property or cluster or
namespace doesn't exist") })
+ public void revokePermissionOnSubscription(@PathParam("property") String
property,
+ @PathParam("namespace") String namespace,
@PathParam("subscription") String subscription,
+ @PathParam("role") String role) {
+ validateNamespaceName(property, namespace);
+ internalRevokePermissionsOnSubscription(subscription, role);
+ }
+
@GET
@Path("/{tenant}/{namespace}/replication")
@ApiOperation(value = "Get the replication clusters for a namespace.",
response = String.class, responseContainer = "List")
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 6b17a7320f..01160fec8d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -587,7 +587,7 @@ public void properties() throws PulsarAdminException {
}
}
- @Test(invocationCount = 1)
+ @Test
public void namespaces() throws PulsarAdminException,
PulsarServerException, Exception {
admin.clusters().createCluster("usw", new ClusterData());
TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1",
"role2"),
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index e31e1630d7..033c638fdd 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -20,9 +20,11 @@
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.mockito.Mockito.spy;
+import static org.testng.Assert.fail;
import java.io.IOException;
import java.net.URI;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -37,10 +39,11 @@
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authorization.AuthorizationProvider;
import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.slf4j.Logger;
@@ -49,7 +52,6 @@
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -57,6 +59,7 @@
private static final Logger log =
LoggerFactory.getLogger(AuthorizationProducerConsumerTest.class);
private final static String clientRole = "plugbleRole";
+ private final static Set<String> clientAuthProviderSupportedRoles =
Sets.newHashSet(clientRole);
protected void setup() throws Exception {
@@ -145,6 +148,101 @@ public void testProducerAndConsumerAuthorization() throws
Exception {
log.info("-- Exiting {} test --", methodName);
}
+ /**
+ * Exercises subscription-level authorization of the subscription admin-api
+ * with three admin clients (super-user, tenant admin and a subscription
+ * role): the subscription role can use the admin-api while no role list is
+ * set for the subscription, is rejected once the list names only another
+ * principal, is accepted again after being added to the list, and is
+ * rejected again after its access has been revoked.
+ */
+ @Test
+ public void testSubscriberPermission() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+
conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
+ setup();
+
+ final String tenantRole = "tenant-role";
+ final String subscriptionRole = "sub1-role";
+ final String subscriptionName = "sub1";
+ final String namespace = "my-property/my-ns-sub-auth";
+ final String topicName = "persistent://" + namespace + "/my-topic";
+ Authentication adminAuthentication = new
ClientAuthentication("superUser");
+
+ // NOTE(review): this adds to a static set shared by the whole test class
+ // and the entry is never removed here - confirm other tests do not
+ // depend on its contents.
+ clientAuthProviderSupportedRoles.add(subscriptionRole);
+
+ // one admin client per identity: super-user, tenant admin, subscription role
+ PulsarAdmin superAdmin = spy(
+
PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()).authentication(adminAuthentication).build());
+
+ Authentication tenantAdminAuthentication = new
ClientAuthentication(tenantRole);
+ PulsarAdmin tenantAdmin =
spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
+ .authentication(tenantAdminAuthentication).build());
+
+ Authentication subAdminAuthentication = new
ClientAuthentication(subscriptionRole);
+ PulsarAdmin sub1Admin =
spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
+ .authentication(subAdminAuthentication).build());
+
+ String lookupUrl;
+ lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
+
+ Authentication authentication = new
ClientAuthentication(subscriptionRole);
+
+ superAdmin.tenants().createTenant("my-property",
+ new TenantInfo(Sets.newHashSet(tenantRole),
Sets.newHashSet("test")));
+ superAdmin.namespaces().createNamespace(namespace,
Sets.newHashSet("test"));
+ tenantAdmin.namespaces().grantPermissionOnNamespace(namespace,
subscriptionRole,
+ Collections.singleton(AuthAction.consume));
+
+ pulsarClient =
PulsarClient.builder().serviceUrl(lookupUrl).authentication(authentication).build();
+ // (1) Create subscription name
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
+ .subscribe();
+ consumer.close();
+
+ // verify tenant is able to perform all subscription-admin api
+ tenantAdmin.topics().skipAllMessages(topicName, subscriptionName);
+ tenantAdmin.topics().skipMessages(topicName, subscriptionName, 1);
+ tenantAdmin.topics().expireMessages(topicName, subscriptionName, 10);
+ tenantAdmin.topics().peekMessages(topicName, subscriptionName, 1);
+ tenantAdmin.topics().resetCursor(topicName, subscriptionName, 10);
+ tenantAdmin.topics().resetCursor(topicName, subscriptionName,
MessageId.earliest);
+
+ // grant namespace-level authorization to the subscriptionRole
+ // NOTE(review): identical to the grant issued before the consumer was
+ // created above.
+ tenantAdmin.namespaces().grantPermissionOnNamespace(namespace,
subscriptionRole,
+ Collections.singleton(AuthAction.consume));
+
+ // subscriptionRole has namespace-level authorization
+ sub1Admin.topics().resetCursor(topicName, subscriptionName, 10);
+
+ // grant subscription access to specific different role and only that
role can access the subscription
+ String otherPrincipal = "Principal-1-to-access-sub";
+ superAdmin.namespaces().grantPermissionOnSubscription(namespace,
subscriptionName,
+ Collections.singleton(otherPrincipal));
+
+ // now, subscriptionRole doesn't have subscription level access so, it
will fail to access subscription
+ try {
+ sub1Admin.topics().resetCursor(topicName, subscriptionName, 10);
+ fail("should have fail with authorization exception");
+ } catch
(org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException e) {
+ // Ok
+ }
+
+ // now, grant subscription-access to subscriptionRole as well
+ superAdmin.namespaces().grantPermissionOnSubscription(namespace,
subscriptionName,
+ Sets.newHashSet(otherPrincipal, subscriptionRole));
+
+ sub1Admin.topics().skipAllMessages(topicName, subscriptionName);
+ sub1Admin.topics().skipMessages(topicName, subscriptionName, 1);
+ sub1Admin.topics().expireMessages(topicName, subscriptionName, 10);
+ sub1Admin.topics().peekMessages(topicName, subscriptionName, 1);
+ sub1Admin.topics().resetCursor(topicName, subscriptionName, 10);
+ sub1Admin.topics().resetCursor(topicName, subscriptionName,
MessageId.earliest);
+
+ // revoke subscriptionRole again; otherPrincipal stays in the role list,
+ // so the subscription remains restricted
+ superAdmin.namespaces().revokePermissionOnSubscription(namespace,
subscriptionName, subscriptionRole);
+
+ try {
+ sub1Admin.topics().resetCursor(topicName, subscriptionName, 10);
+ fail("should have fail with authorization exception");
+ } catch
(org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException e) {
+ // Ok
+ }
+
+ log.info("-- Exiting {} test --", methodName);
+ }
+
@Test
public void testSubscriptionPrefixAuthorization() throws Exception {
log.info("-- Starting {} test --", methodName);
@@ -317,19 +415,19 @@ public void initialize(ServiceConfiguration conf,
ConfigurationCacheService conf
@Override
public CompletableFuture<Boolean> canProduceAsync(TopicName topicName,
String role,
AuthenticationDataSource authenticationData) {
- return CompletableFuture.completedFuture(clientRole.equals(role));
+ return
CompletableFuture.completedFuture(clientAuthProviderSupportedRoles.contains(role));
}
@Override
public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName,
String role,
AuthenticationDataSource authenticationData, String
subscription) {
- return CompletableFuture.completedFuture(clientRole.equals(role));
+ return
CompletableFuture.completedFuture(clientAuthProviderSupportedRoles.contains(role));
}
@Override
public CompletableFuture<Boolean> canLookupAsync(TopicName topicName,
String role,
AuthenticationDataSource authenticationData) {
- return CompletableFuture.completedFuture(clientRole.equals(role));
+ return
CompletableFuture.completedFuture(clientAuthProviderSupportedRoles.contains(role));
}
@Override
@@ -343,6 +441,18 @@ public void initialize(ServiceConfiguration conf,
ConfigurationCacheService conf
String authenticationData) {
return CompletableFuture.completedFuture(null);
}
+
+ // Test stub: reports success immediately without recording the grant.
+ @Override
+ public CompletableFuture<Void>
grantSubscriptionPermissionAsync(NamespaceName namespace,
+ String subscriptionName, Set<String> roles, String
authDataJson) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ // Test stub: reports success immediately without changing any state.
+ @Override
+ public CompletableFuture<Void>
revokeSubscriptionPermissionAsync(NamespaceName namespace,
+ String subscriptionName, String role, String authDataJson) {
+ return CompletableFuture.completedFuture(null);
+ }
}
/**
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 6460c1ff5c..f9b57e3710 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -375,6 +375,24 @@
*/
void revokePermissionsOnNamespace(String namespace, String role) throws
PulsarAdminException;
+ /**
+ * Grant permission to role to access subscription's admin-api.
+ * @param namespace
+ * @param subscription
+ * @param roles
+ * @throws PulsarAdminException
+ */
+ void grantPermissionOnSubscription(String namespace, String subscription,
Set<String> roles) throws PulsarAdminException;
+
+ /**
+ * Revoke permissions on a subscription's admin-api access.
+ * @param namespace
+ * @param subscription
+ * @param role
+ * @throws PulsarAdminException
+ */
+ void revokePermissionOnSubscription(String namespace, String subscription,
String role) throws PulsarAdminException;
+
/**
* Get the replication clusters for a namespace.
* <p>
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index c9845a918c..be3f548522 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -232,6 +232,30 @@ public void revokePermissionsOnNamespace(String namespace,
String role) throws P
}
}
+
+ @Override
+ public void grantPermissionOnSubscription(String namespace, String
subscription, Set<String> roles)
+ throws PulsarAdminException {
+ try {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "permissions", "subscription",
subscription);
+ request(path).post(Entity.entity(roles,
MediaType.APPLICATION_JSON), ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
+ @Override
+ public void revokePermissionOnSubscription(String namespace, String
subscription, String role) throws PulsarAdminException {
+ try {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "permissions", subscription,
role);
+ request(path).delete(ErrorData.class);
+ } catch (Exception e) {
+ throw getApiException(e);
+ }
+ }
+
@Override
public List<String> getNamespaceReplicationClusters(String namespace)
throws PulsarAdminException {
try {
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index cc1654bc8c..a29908aaa4 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -29,6 +29,7 @@
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.admin.cli.utils.IOUtils;
@@ -196,6 +197,42 @@ void run() throws PulsarAdminException {
}
}
+ @Parameters(commandDescription = "Grant permissions to access subscription
admin-api")
+ private class GrantSubscriptionPermissions extends CliCommand {
+ @Parameter(description = "tenant/namespace", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = "--subscription", description = "Subscription name
for which permission will be granted to roles", required = true)
+ private String subscription;
+
+ @Parameter(names = "--roles", description = "Client roles to which
grant permissions (comma separated roles)", required = true, splitter =
CommaParameterSplitter.class)
+ private List<String> roles;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ admin.namespaces().grantPermissionOnSubscription(namespace,
subscription, Sets.newHashSet(roles));
+ }
+ }
+
+ @Parameters(commandDescription = "Revoke permissions on a namespace")
+ private class RevokeSubscriptionPermissions extends CliCommand {
+ @Parameter(description = "tenant/namespace", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = "--subscription", description = "Subscription name
for which permission will be granted to roles", required = true)
+ private String subscription;
+
+ @Parameter(names = "--role", description = "Client role to which
revoke permissions", required = true)
+ private String role;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ admin.namespaces().revokePermissionOnSubscription(namespace,
subscription, role);
+ }
+ }
+
@Parameters(commandDescription = "Get the permissions on a namespace")
private class Permissions extends CliCommand {
@Parameter(description = "tenant/namespace\n", required = true)
@@ -942,7 +979,10 @@ public CmdNamespaces(PulsarAdmin admin) {
jcommander.addCommand("permissions", new Permissions());
jcommander.addCommand("grant-permission", new GrantPermissions());
jcommander.addCommand("revoke-permission", new RevokePermissions());
-
+
+ jcommander.addCommand("grant-subscription-permission", new
GrantSubscriptionPermissions());
+ jcommander.addCommand("revoke-subscription-permission", new
RevokeSubscriptionPermissions());
+
jcommander.addCommand("set-clusters", new SetReplicationClusters());
jcommander.addCommand("get-clusters", new GetReplicationClusters());
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthPolicies.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthPolicies.java
index 553b576543..bd0c61b008 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthPolicies.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthPolicies.java
@@ -27,10 +27,12 @@
public class AuthPolicies {
public final Map<String, Set<AuthAction>> namespace_auth;
public final Map<String, Map<String, Set<AuthAction>>> destination_auth;
+ public final Map<String, Set<String>> subscription_auth_roles;
public AuthPolicies() {
namespace_auth = Maps.newTreeMap();
destination_auth = Maps.newTreeMap();
+ subscription_auth_roles = Maps.newTreeMap();
}
@Override
@@ -38,7 +40,8 @@ public boolean equals(Object obj) {
if (obj instanceof AuthPolicies) {
AuthPolicies other = (AuthPolicies) obj;
return Objects.equals(namespace_auth, other.namespace_auth)
- && Objects.equals(destination_auth,
other.destination_auth);
+ && Objects.equals(destination_auth, other.destination_auth)
+ && Objects.equals(subscription_auth_roles,
other.subscription_auth_roles);
}
return false;
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services