This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e220a5d04ae [improve][broker] Support revoking permission for
AuthorizationProvider (#20456)
e220a5d04ae is described below
commit e220a5d04ae16d1b8dfd7e35cdddf43f3a43fe86
Author: Jiwei Guo <[email protected]>
AuthorDate: Fri Jun 2 14:16:44 2023 +0800
[improve][broker] Support revoking permission for AuthorizationProvider
(#20456)
---
.../authorization/AuthorizationProvider.java | 29 ++++++++++++-
.../broker/authorization/AuthorizationService.java | 29 +++++++++++--
.../authorization/PulsarAuthorizationProvider.java | 50 ++++++++++++++++++++++
.../api/AuthorizationProducerConsumerTest.java | 48 +++++++++++++++++++++
4 files changed, 150 insertions(+), 6 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
index 67e096bee63..b54b2089e1e 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
@@ -185,6 +185,18 @@ public interface AuthorizationProvider extends Closeable {
CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace,
Set<AuthAction> actions, String role,
String authDataJson);
+ /**
+ * Revoke authorization-action permission on a namespace from the given role.
+ *
+ * <p>This default implementation does not revoke anything: it returns the result of
+ * {@code FutureUtil.failedFuture} built from an {@link IllegalStateException} whose
+ * message names the namespace. Providers that support revocation are expected to
+ * override this method.
+ *
+ * @param namespace namespace on which the permission is revoked
+ * @param role role whose permission is revoked
+ * @return {@code CompletableFuture<Void>} for the revoke operation
+ */
+ default CompletableFuture<Void> revokePermissionAsync(NamespaceName
namespace, String role) {
+ return FutureUtil.failedFuture(new IllegalStateException(
+ String.format("revokePermissionAsync on namespace %s is not
supported by the Authorization",
+ namespace)));
+ }
+
/**
* Grant permission to roles that can access subscription-admin api.
*
@@ -193,7 +205,7 @@ public interface AuthorizationProvider extends Closeable {
* @param roles
* @param authDataJson
* additional authdata in json format
- * @return
+ * @return CompletableFuture<Void>
*/
CompletableFuture<Void> grantSubscriptionPermissionAsync(NamespaceName
namespace, String subscriptionName,
Set<String>
roles, String authDataJson);
@@ -203,7 +215,7 @@ public interface AuthorizationProvider extends Closeable {
* @param namespace
* @param subscriptionName
* @param role
- * @return
+ * @return CompletableFuture<Void>
*/
CompletableFuture<Void> revokeSubscriptionPermissionAsync(NamespaceName
namespace, String subscriptionName,
String role, String authDataJson);
@@ -226,6 +238,19 @@ public interface AuthorizationProvider extends Closeable {
CompletableFuture<Void> grantPermissionAsync(TopicName topicName,
Set<AuthAction> actions, String role,
String authDataJson);
+
+ /**
+ * Revoke authorization-action permission on a topic from the given role.
+ *
+ * <p>This default implementation does not revoke anything: it returns the result of
+ * {@code FutureUtil.failedFuture} built from an {@link IllegalStateException} whose
+ * message names the topic. Providers that support revocation are expected to
+ * override this method.
+ *
+ * @param topicName topic on which the permission is revoked
+ * @param role role whose permission is revoked
+ * @return {@code CompletableFuture<Void>} for the revoke operation
+ */
+ default CompletableFuture<Void> revokePermissionAsync(TopicName topicName,
String role) {
+ return FutureUtil.failedFuture(new IllegalStateException(
+ String.format("revokePermissionAsync on topicName %s is not
supported by the Authorization",
+ topicName)));
+ }
+
/**
* Check if a given <tt>role</tt> is allowed to execute a given
<tt>operation</tt> on the tenant.
*
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 6f303e2117f..29abcc1eee4 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -123,6 +123,17 @@ public class AuthorizationService {
return provider.grantPermissionAsync(namespace, actions, role,
authDataJson);
}
+ /**
+ * Revoke authorization-action permission on a namespace from the given role.
+ *
+ * <p>Delegates to {@code provider.revokePermissionAsync(namespace, role)} and returns
+ * that future as-is.
+ *
+ * @param namespace namespace on which the permission is revoked
+ * @param role role whose permission is revoked
+ * @return the future returned by the provider
+ */
+ public CompletableFuture<Void> revokePermissionAsync(NamespaceName
namespace, String role) {
+ return provider.revokePermissionAsync(namespace, role);
+ }
+
/**
* Grant permission to roles that can access subscription-admin api.
*
@@ -157,16 +168,26 @@ public class AuthorizationService {
* NOTE: used to complete with {@link IllegalArgumentException} when
namespace not found or with
* {@link IllegalStateException} when failed to grant permission.
*
- * @param topicname
+ * @param topicName
* @param role
* @param authDataJson
* additional authdata in json for targeted authorization
provider
* @completesWith null when the permissions are updated successfully.
* @completesWith {@link MetadataStoreException} when the MetadataStore is
not updated.
*/
- public CompletableFuture<Void> grantPermissionAsync(TopicName topicname,
Set<AuthAction> actions, String role,
+ public CompletableFuture<Void> grantPermissionAsync(TopicName topicName,
Set<AuthAction> actions, String role,
String authDataJson) {
- return provider.grantPermissionAsync(topicname, actions, role,
authDataJson);
+ return provider.grantPermissionAsync(topicName, actions, role,
authDataJson);
+ }
+
+ /**
+ * Revoke authorization-action permission on a topic from the given role.
+ *
+ * <p>Delegates to {@code provider.revokePermissionAsync(topicName, role)} and returns
+ * that future as-is.
+ *
+ * @param topicName topic on which the permission is revoked
+ * @param role role whose permission is revoked
+ * @return the future returned by the provider
+ */
+ public CompletableFuture<Void> revokePermissionAsync(TopicName topicName,
String role) {
+ return provider.revokePermissionAsync(topicName, role);
}
/**
@@ -418,7 +439,7 @@ public class AuthorizationService {
/**
* Whether the authenticatedPrincipal and the originalPrincipal form a
valid pair. This method assumes that
* authenticatedPrincipal and originalPrincipal can be equal, as long as
they are not a proxy role. This use
- * case is relvant for the admin server because of the way the proxy
handles authentication. The binary protocol
+ * case is relevant for the admin server because of the way the proxy
handles authentication. The binary protocol
* should not use this method.
* @return true when roles are a valid combination and false when roles
are an invalid combination
*/
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index 3f6d3819471..203f7fe4277 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -249,6 +249,33 @@ public class PulsarAuthorizationProvider implements
AuthorizationProvider {
});
}
+    /**
+     * Revoke the permissions held by {@code role} on a single topic.
+     *
+     * <p>The policies of the topic's namespace are rewritten through
+     * {@code setPoliciesAsync}: the role's entry is removed from the topic's map in
+     * {@code auth_policies.getTopicAuthentication()}. Entries of other roles on the same
+     * topic are preserved; the topic key itself is dropped only when no role is left.
+     * If the topic has no entry at all, the policies are written back unchanged.
+     *
+     * @param topicName topic on which the role's permissions are revoked
+     * @param role role whose permissions are revoked
+     * @return a future that completes when the policies update finishes; it completes
+     *         exceptionally with an {@link IllegalStateException} as cause when
+     *         {@code getPoliciesReadOnlyAsync()} reports read-only policies, or with the
+     *         failure of the policies update
+     */
+    @Override
+    public CompletableFuture<Void> revokePermissionAsync(TopicName topicName, String role) {
+        return getPoliciesReadOnlyAsync().thenCompose(readonly -> {
+            if (readonly) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Policies are read-only. Broker cannot do read-write operations");
+                }
+                throw new IllegalStateException("policies are in readonly mode");
+            }
+            return pulsarResources.getNamespaceResources()
+                    .setPoliciesAsync(topicName.getNamespaceObject(), policies -> {
+                        policies.auth_policies.getTopicAuthentication()
+                                .computeIfPresent(topicName.toString(), (topic, rolePermissions) -> {
+                                    rolePermissions.remove(role);
+                                    // computeIfPresent removes the key when the function returns null.
+                                    // Only do that once the last role is gone; returning null
+                                    // unconditionally would wipe every other role's permissions
+                                    // on this topic as well.
+                                    return rolePermissions.isEmpty() ? null : rolePermissions;
+                                });
+                        return policies;
+                    }).whenComplete((__, ex) -> {
+                        if (ex != null) {
+                            log.error("Failed to revoke permissions for role {} on topic {}", role, topicName, ex);
+                        } else {
+                            log.info("Successfully revoke permissions for role {} on topic {}", role, topicName);
+                        }
+                    });
+        });
+    }
+
@Override
public CompletableFuture<Void> grantPermissionAsync(NamespaceName
namespaceName, Set<AuthAction> actions,
String role, String
authDataJson) {
@@ -274,6 +301,29 @@ public class PulsarAuthorizationProvider implements
AuthorizationProvider {
});
}
+ /**
+ * Revoke the namespace-level permissions held by {@code role}.
+ *
+ * <p>Rewrites the namespace policies through {@code setPoliciesAsync}, removing the
+ * role's key from {@code auth_policies.getNamespaceAuthentication()}, then logs the
+ * outcome.
+ *
+ * @param namespaceName namespace on which the role's permissions are revoked
+ * @param role role whose permissions are revoked
+ * @return a future that completes when the policies update finishes; it completes
+ *         exceptionally when {@code getPoliciesReadOnlyAsync()} reports read-only
+ *         policies (cause: {@link IllegalStateException}) or when the update fails
+ */
+ @Override
+ public CompletableFuture<Void> revokePermissionAsync(NamespaceName
namespaceName, String role) {
+ return getPoliciesReadOnlyAsync().thenCompose(readonly -> {
+ if (readonly) {
+ if (log.isDebugEnabled()) {
+ log.debug("Policies are read-only. Broker cannot do
read-write operations");
+ }
+ // Thrown inside the thenCompose stage, so it surfaces as a failed future
+ // rather than as a synchronous exception to the caller.
+ throw new IllegalStateException("policies are in readonly
mode");
+ }
+ return pulsarResources.getNamespaceResources()
+ .setPoliciesAsync(namespaceName, policies -> {
+ // Removing a role that has no entry is a no-op on the map.
+
policies.auth_policies.getNamespaceAuthentication().remove(role);
+ return policies;
+ }).whenComplete((__, ex) -> {
+ if (ex != null) {
+ log.error("Failed to revoke permissions for role
{} namespace {}", role, namespaceName, ex);
+ } else {
+ log.info("Successfully revoke permissions for role
{} namespace {}", role, namespaceName);
+ }
+ });
+ });
+ }
+
@Override
public CompletableFuture<Void>
grantSubscriptionPermissionAsync(NamespaceName namespace, String
subscriptionName,
Set<String> roles, String authDataJson) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index 0ce3b7df07d..01fa64b1bc6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -667,6 +667,54 @@ public class AuthorizationProducerConsumerTest extends
ProducerConsumerBase {
log.info("-- Exiting {} test --", methodName);
}
+ /**
+ * Verifies that permissions granted through {@link AuthorizationService} on a topic
+ * and on a namespace are no longer effective after {@code revokePermissionAsync},
+ * using {@link PulsarAuthorizationProvider} as the configured provider.
+ *
+ * <p>NOTE(review): only a single role is granted and revoked here; the test does not
+ * check that permissions of other roles on the same topic/namespace are left untouched.
+ */
+ @Test
+ public void testRevokePermission() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+ // Presumably restarts the test broker so the provider change takes effect
+ // (cleanup/setup are inherited helpers) - confirm against the base class.
+ cleanup();
+
conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
+ setup();
+
+ Authentication adminAuthentication = new
ClientAuthentication("superUser");
+
+ @Cleanup
+ PulsarAdmin admin = spy(
+
PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()).authentication(adminAuthentication).build());
+
+ Authentication authentication = new ClientAuthentication(clientRole);
+
+ replacePulsarClient(PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+ .authentication(authentication));
+
+ // Create the cluster, tenant and namespace that the topic below lives in.
+ admin.clusters().createCluster("test",
ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
+
+ admin.tenants().createTenant("public",
+ new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
Sets.newHashSet("test")));
+ admin.namespaces().createNamespace("public/default",
Sets.newHashSet("test"));
+
+ AuthorizationService authorizationService = new
AuthorizationService(conf, pulsar.getPulsarResources());
+ TopicName topicName = TopicName.get("persistent://public/default/t1");
+ NamespaceName namespaceName = NamespaceName.get("public/default");
+ String role = "test-role";
+ Set<AuthAction> actions = Sets.newHashSet(AuthAction.produce,
AuthAction.consume);
+ // Topic level: no access before the grant, access after it.
+ Assert.assertFalse(authorizationService.canProduce(topicName, role,
null));
+ Assert.assertFalse(authorizationService.canConsume(topicName, role,
null, "sub1"));
+ authorizationService.grantPermissionAsync(topicName, actions, role,
"auth-json").get();
+ Assert.assertTrue(authorizationService.canProduce(topicName, role,
null));
+ Assert.assertTrue(authorizationService.canConsume(topicName, role,
null, "sub1"));
+
+ // Topic level: access is gone again once the permission is revoked.
+ authorizationService.revokePermissionAsync(topicName, role).get();
+ Assert.assertFalse(authorizationService.canProduce(topicName, role,
null));
+ Assert.assertFalse(authorizationService.canConsume(topicName, role,
null, "sub1"));
+
+ // Namespace level: same grant -> allowed -> revoke -> denied cycle.
+ authorizationService.grantPermissionAsync(namespaceName, actions,
role, null).get();
+
Assert.assertTrue(authorizationService.allowNamespaceOperationAsync(namespaceName,
NamespaceOperation.GET_TOPIC, role, null).get());
+ authorizationService.revokePermissionAsync(namespaceName, role).get();
+
Assert.assertFalse(authorizationService.allowNamespaceOperationAsync(namespaceName,
NamespaceOperation.GET_TOPIC, role, null).get());
+
+ log.info("-- Exiting {} test --", methodName);
+ }
+
@Test
public void testAuthData() throws Exception {
log.info("-- Starting {} test --", methodName);