This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e220a5d04ae [improve][broker] Support revoking permission for 
AuthorizationProvider (#20456)
e220a5d04ae is described below

commit e220a5d04ae16d1b8dfd7e35cdddf43f3a43fe86
Author: Jiwei Guo <[email protected]>
AuthorDate: Fri Jun 2 14:16:44 2023 +0800

    [improve][broker] Support revoking permission for AuthorizationProvider 
(#20456)
---
 .../authorization/AuthorizationProvider.java       | 29 ++++++++++++-
 .../broker/authorization/AuthorizationService.java | 29 +++++++++++--
 .../authorization/PulsarAuthorizationProvider.java | 50 ++++++++++++++++++++++
 .../api/AuthorizationProducerConsumerTest.java     | 48 +++++++++++++++++++++
 4 files changed, 150 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
index 67e096bee63..b54b2089e1e 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
@@ -185,6 +185,18 @@ public interface AuthorizationProvider extends Closeable {
     CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, 
Set<AuthAction> actions, String role,
             String authDataJson);
 
+    /**
+     * Revoke authorization-action permission on a namespace to the given 
client.
+     * @param namespace
+     * @param role
+     * @return CompletableFuture<Void>
+     */
+    default CompletableFuture<Void> revokePermissionAsync(NamespaceName 
namespace, String role) {
+        return FutureUtil.failedFuture(new IllegalStateException(
+                String.format("revokePermissionAsync on namespace %s is not 
supported by the Authorization",
+                        namespace)));
+    }
+
     /**
      * Grant permission to roles that can access subscription-admin api.
      *
@@ -193,7 +205,7 @@ public interface AuthorizationProvider extends Closeable {
      * @param roles
      * @param authDataJson
      *            additional authdata in json format
-     * @return
+     * @return CompletableFuture<Void>
      */
     CompletableFuture<Void> grantSubscriptionPermissionAsync(NamespaceName 
namespace, String subscriptionName,
                                                              Set<String> 
roles, String authDataJson);
@@ -203,7 +215,7 @@ public interface AuthorizationProvider extends Closeable {
      * @param namespace
      * @param subscriptionName
      * @param role
-     * @return
+     * @return CompletableFuture<Void>
      */
     CompletableFuture<Void> revokeSubscriptionPermissionAsync(NamespaceName 
namespace, String subscriptionName,
             String role, String authDataJson);
@@ -226,6 +238,19 @@ public interface AuthorizationProvider extends Closeable {
     CompletableFuture<Void> grantPermissionAsync(TopicName topicName, 
Set<AuthAction> actions, String role,
             String authDataJson);
 
+
+    /**
+     * Revoke authorization-action permission on a topic to the given client.
+     * @param topicName
+     * @param role
+     * @return CompletableFuture<Void>
+     */
+    default CompletableFuture<Void> revokePermissionAsync(TopicName topicName, 
String role) {
+        return FutureUtil.failedFuture(new IllegalStateException(
+                String.format("revokePermissionAsync on topicName %s is not 
supported by the Authorization",
+                        topicName)));
+    }
+
     /**
      * Check if a given <tt>role</tt> is allowed to execute a given 
<tt>operation</tt> on the tenant.
      *
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 6f303e2117f..29abcc1eee4 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -123,6 +123,17 @@ public class AuthorizationService {
         return provider.grantPermissionAsync(namespace, actions, role, 
authDataJson);
     }
 
+    /**
+     *
+     * Revoke authorization-action permission on a namespace to the given 
client.
+     *
+     * @param namespace
+     * @param role
+     */
+    public CompletableFuture<Void> revokePermissionAsync(NamespaceName 
namespace, String role) {
+        return provider.revokePermissionAsync(namespace, role);
+    }
+
     /**
      * Grant permission to roles that can access subscription-admin api.
      *
@@ -157,16 +168,26 @@ public class AuthorizationService {
      * NOTE: used to complete with {@link IllegalArgumentException} when 
namespace not found or with
      * {@link IllegalStateException} when failed to grant permission.
      *
-     * @param topicname
+     * @param topicName
      * @param role
      * @param authDataJson
      *            additional authdata in json for targeted authorization 
provider
      * @completesWith null when the permissions are updated successfully.
      * @completesWith {@link MetadataStoreException} when the MetadataStore is 
not updated.
      */
-    public CompletableFuture<Void> grantPermissionAsync(TopicName topicname, 
Set<AuthAction> actions, String role,
+    public CompletableFuture<Void> grantPermissionAsync(TopicName topicName, 
Set<AuthAction> actions, String role,
                                                         String authDataJson) {
-        return provider.grantPermissionAsync(topicname, actions, role, 
authDataJson);
+        return provider.grantPermissionAsync(topicName, actions, role, 
authDataJson);
+    }
+
+    /**
+     * Revoke authorization-action permission on a topic to the given client.
+     *
+     * @param topicName
+     * @param role
+     */
+    public CompletableFuture<Void> revokePermissionAsync(TopicName topicName, 
String role) {
+        return provider.revokePermissionAsync(topicName, role);
     }
 
     /**
@@ -418,7 +439,7 @@ public class AuthorizationService {
     /**
      * Whether the authenticatedPrincipal and the originalPrincipal form a 
valid pair. This method assumes that
      * authenticatedPrincipal and originalPrincipal can be equal, as long as 
they are not a proxy role. This use
-     * case is relvant for the admin server because of the way the proxy 
handles authentication. The binary protocol
+     * case is relevant for the admin server because of the way the proxy 
handles authentication. The binary protocol
      * should not use this method.
      * @return true when roles are a valid combination and false when roles 
are an invalid combination
      */
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index 3f6d3819471..203f7fe4277 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -249,6 +249,33 @@ public class PulsarAuthorizationProvider implements 
AuthorizationProvider {
         });
     }
 
+    @Override
+    public CompletableFuture<Void> revokePermissionAsync(TopicName topicName, 
String role) {
+        return getPoliciesReadOnlyAsync().thenCompose(readonly -> {
+            if (readonly) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Policies are read-only. Broker cannot do 
read-write operations");
+                }
+                throw new IllegalStateException("policies are in readonly 
mode");
+            }
+            return pulsarResources.getNamespaceResources()
+                    .setPoliciesAsync(topicName.getNamespaceObject(), policies 
-> {
+                        policies.auth_policies.getTopicAuthentication()
+                                .computeIfPresent(topicName.toString(), (k, v) 
-> {
+                                        v.remove(role);
+                                        return null;
+                                });
+                        return policies;
+                    }).whenComplete((__, ex) -> {
+                        if (ex != null) {
+                            log.error("Failed to revoke permissions for role 
{} on topic {}", role, topicName, ex);
+                        } else {
+                            log.info("Successfully revoke permissions for role 
{} on topic {}", role, topicName);
+                        }
+                    });
+        });
+    }
+
     @Override
     public CompletableFuture<Void> grantPermissionAsync(NamespaceName 
namespaceName, Set<AuthAction> actions,
                                                         String role, String 
authDataJson) {
@@ -274,6 +301,29 @@ public class PulsarAuthorizationProvider implements 
AuthorizationProvider {
         });
     }
 
+    @Override
+    public CompletableFuture<Void> revokePermissionAsync(NamespaceName 
namespaceName, String role) {
+        return getPoliciesReadOnlyAsync().thenCompose(readonly -> {
+            if (readonly) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Policies are read-only. Broker cannot do 
read-write operations");
+                }
+                throw new IllegalStateException("policies are in readonly 
mode");
+            }
+            return pulsarResources.getNamespaceResources()
+                    .setPoliciesAsync(namespaceName, policies -> {
+                        
policies.auth_policies.getNamespaceAuthentication().remove(role);
+                        return policies;
+                    }).whenComplete((__, ex) -> {
+                        if (ex != null) {
+                            log.error("Failed to revoke permissions for role 
{} namespace {}", role, namespaceName, ex);
+                        } else {
+                            log.info("Successfully revoke permissions for role 
{} namespace {}", role, namespaceName);
+                        }
+                    });
+        });
+    }
+
     @Override
     public CompletableFuture<Void> 
grantSubscriptionPermissionAsync(NamespaceName namespace, String 
subscriptionName,
                                                                     
Set<String> roles, String authDataJson) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index 0ce3b7df07d..01fa64b1bc6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -667,6 +667,54 @@ public class AuthorizationProducerConsumerTest extends 
ProducerConsumerBase {
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test
+    public void testRevokePermission() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+        cleanup();
+        
conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
+        setup();
+
+        Authentication adminAuthentication = new 
ClientAuthentication("superUser");
+
+        @Cleanup
+        PulsarAdmin admin = spy(
+                
PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()).authentication(adminAuthentication).build());
+
+        Authentication authentication = new ClientAuthentication(clientRole);
+
+        replacePulsarClient(PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .authentication(authentication));
+
+        admin.clusters().createCluster("test", 
ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
+
+        admin.tenants().createTenant("public",
+                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), 
Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("public/default", 
Sets.newHashSet("test"));
+
+        AuthorizationService authorizationService = new 
AuthorizationService(conf, pulsar.getPulsarResources());
+        TopicName topicName = TopicName.get("persistent://public/default/t1");
+        NamespaceName namespaceName = NamespaceName.get("public/default");
+        String role = "test-role";
+        Set<AuthAction> actions = Sets.newHashSet(AuthAction.produce, 
AuthAction.consume);
+        Assert.assertFalse(authorizationService.canProduce(topicName, role, 
null));
+        Assert.assertFalse(authorizationService.canConsume(topicName, role, 
null, "sub1"));
+        authorizationService.grantPermissionAsync(topicName, actions, role, 
"auth-json").get();
+        Assert.assertTrue(authorizationService.canProduce(topicName, role, 
null));
+        Assert.assertTrue(authorizationService.canConsume(topicName, role, 
null, "sub1"));
+
+        authorizationService.revokePermissionAsync(topicName, role).get();
+        Assert.assertFalse(authorizationService.canProduce(topicName, role, 
null));
+        Assert.assertFalse(authorizationService.canConsume(topicName, role, 
null, "sub1"));
+
+        authorizationService.grantPermissionAsync(namespaceName, actions, 
role, null).get();
+        
Assert.assertTrue(authorizationService.allowNamespaceOperationAsync(namespaceName,
 NamespaceOperation.GET_TOPIC, role, null).get());
+        authorizationService.revokePermissionAsync(namespaceName, role).get();
+        
Assert.assertFalse(authorizationService.allowNamespaceOperationAsync(namespaceName,
 NamespaceOperation.GET_TOPIC, role, null).get());
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+
     @Test
     public void testAuthData() throws Exception {
         log.info("-- Starting {} test --", methodName);

Reply via email to