This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b334c4f637b [improve][broker] PIP-383: Support granting/revoking 
permissions for multiple topics (#23372)
b334c4f637b is described below

commit b334c4f637bdd32787494c16e9d34169f1a25812
Author: Jiwei Guo <[email protected]>
AuthorDate: Thu Oct 10 18:37:27 2024 +0800

    [improve][broker] PIP-383: Support granting/revoking permissions for 
multiple topics (#23372)
---
 .../authorization/AuthorizationProvider.java       | 13 ++++
 .../broker/authorization/AuthorizationService.java | 11 +++
 .../authorization/PulsarAuthorizationProvider.java | 78 ++++++++++++++++++++++
 .../apache/pulsar/broker/admin/AdminResource.java  | 11 +++
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 75 +++++++++++++++++++++
 .../apache/pulsar/broker/admin/v2/Namespaces.java  | 44 ++++++++++++
 .../apache/pulsar/broker/admin/AdminApi2Test.java  | 59 ++++++++++++++++
 .../client/admin/GrantTopicPermissionOptions.java  | 36 ++++++++++
 .../org/apache/pulsar/client/admin/Namespaces.java | 28 ++++++++
 .../client/admin/RevokeTopicPermissionOptions.java | 32 +++++++++
 .../client/admin/internal/NamespacesImpl.java      | 26 ++++++++
 11 files changed, 413 insertions(+)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
index 7d25580ff92..ffb38f770a9 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
@@ -20,12 +20,15 @@ package org.apache.pulsar.broker.authorization;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.client.admin.GrantTopicPermissionOptions;
+import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
@@ -223,6 +226,16 @@ public interface AuthorizationProvider extends Closeable {
     CompletableFuture<Void> grantPermissionAsync(TopicName topicName, 
Set<AuthAction> actions, String role,
             String authDataJson);
 
+    default CompletableFuture<Void> 
grantPermissionAsync(List<GrantTopicPermissionOptions> options) {
+        return FutureUtil.failedFuture(new IllegalStateException(
+                String.format("grantPermissionAsync is not supported by the 
Authorization")));
+    }
+
+    default CompletableFuture<Void> 
revokePermissionAsync(List<RevokeTopicPermissionOptions> options) {
+        return FutureUtil.failedFuture(new IllegalStateException(
+                String.format("revokePermissionAsync is not supported by the 
Authorization")));
+    }
+
 
     /**
      * Revoke authorization-action permission on a topic to the given client.
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index c121d93b9b7..2951eb1f297 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.authorization;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
 import java.net.SocketAddress;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -32,6 +33,8 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationParameters;
 import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.client.admin.GrantTopicPermissionOptions;
+import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
@@ -181,6 +184,14 @@ public class AuthorizationService {
         return provider.grantPermissionAsync(topicName, actions, role, 
authDataJson);
     }
 
+    public CompletableFuture<Void> 
grantPermissionAsync(List<GrantTopicPermissionOptions> options) {
+        return provider.grantPermissionAsync(options);
+    }
+
+    public CompletableFuture<Void> 
revokePermissionAsync(List<RevokeTopicPermissionOptions> options) {
+        return provider.revokePermissionAsync(options);
+    }
+
     /**
      * Revoke authorization-action permission on a topic to the given client.
      *
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index a39c3d05607..0af63724cc8 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -24,14 +24,18 @@ import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
 import javax.ws.rs.core.Response;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.client.admin.GrantTopicPermissionOptions;
+import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
@@ -251,6 +255,80 @@ public class PulsarAuthorizationProvider implements 
AuthorizationProvider {
         });
     }
 
+    public CompletableFuture<Void> 
grantPermissionAsync(List<GrantTopicPermissionOptions> options) {
+        return checkNamespace(options.stream().map(o -> 
TopicName.get(o.getTopic()).getNamespace()))
+                .thenCompose(__ -> getPoliciesReadOnlyAsync())
+                .thenCompose(readonly -> {
+                    if (readonly) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Policies are read-only. Broker cannot 
do read-write operations");
+                        }
+                        throw new IllegalStateException("policies are in 
readonly mode");
+                    }
+                    TopicName topicName = 
TopicName.get(options.get(0).getTopic());
+                    return pulsarResources.getNamespaceResources()
+                            .setPoliciesAsync(topicName.getNamespaceObject(), 
policies -> {
+                                options.stream().forEach(o -> {
+                                    final String topicUri = 
TopicName.get(o.getTopic()).toString();
+                                    
policies.auth_policies.getTopicAuthentication()
+                                            .computeIfAbsent(topicUri, __ -> 
new HashMap<>())
+                                            .put(o.getRole(), o.getActions());
+                                });
+                                return policies;
+                            }).whenComplete((__, ex) -> {
+                                if (ex != null) {
+                                    log.error("Failed to grant permissions for 
{}", options);
+                                } else {
+                                    log.info("Successfully granted access for 
{}", options);
+                                }
+                            });
+                });
+    }
+
+    @Override
+    public CompletableFuture<Void> 
revokePermissionAsync(List<RevokeTopicPermissionOptions> options) {
+        return checkNamespace(options.stream().map(o -> 
TopicName.get(o.getTopic()).getNamespace()))
+                .thenCompose(__ -> getPoliciesReadOnlyAsync())
+                .thenCompose(readonly -> {
+                    if (readonly) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("Policies are read-only. Broker cannot 
do read-write operations");
+                        }
+                        throw new IllegalStateException("policies are in 
readonly mode");
+                    }
+                    TopicName topicName = 
TopicName.get(options.get(0).getTopic());
+                    return pulsarResources.getNamespaceResources()
+                            .setPoliciesAsync(topicName.getNamespaceObject(), 
policies -> {
+                                options.stream().forEach(o -> {
+                                    final String topicUri = 
TopicName.get(o.getTopic()).toString();
+                                    
policies.auth_policies.getTopicAuthentication()
+                                            .computeIfPresent(topicUri, 
(topicNameUri, roles) -> {
+                                                roles.remove(o.getRole());
+                                                if (roles.isEmpty()) {
+                                                    return  null;
+                                                }
+                                                return roles;
+                                            });
+                                });
+                                return policies;
+                            }).whenComplete((__, ex) -> {
+                                if (ex != null) {
+                                    log.error("Failed to revoke permissions 
for {}", options, ex);
+                                } else {
+                                    log.info("Successfully revoke permissions 
for {}", options);
+                                }
+                            });
+                 });
+    }
+
+    private CompletableFuture<Void> checkNamespace(Stream<String> namespaces) {
+        boolean sameNamespace = namespaces.distinct().count() == 1;
+        if (!sameNamespace) {
+            throw new IllegalArgumentException("The namespace should be the 
same");
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
     @Override
     public CompletableFuture<Void> revokePermissionAsync(TopicName topicName, 
String role) {
         return getPoliciesReadOnlyAsync().thenCompose(readonly -> {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 3268f07b13d..45772dc279b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -924,4 +924,15 @@ public abstract class AdminResource extends 
PulsarWebResource {
                     "The bucket must be specified for namespace offload.");
         }
     }
+
+    protected CompletableFuture<Void> internalCheckTopicExists(TopicName 
topicName) {
+        return pulsar().getNamespaceService().checkTopicExists(topicName)
+                .thenAccept(info -> {
+                    boolean exists = info.isExists();
+                    info.recycle();
+                    if (!exists) {
+                        throw new RestException(Status.NOT_FOUND, 
getTopicNotFoundErrorMessage(topicName.toString()));
+                    }
+                });
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 18c80d6bef4..d80e2487b4f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -42,6 +42,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import javax.annotation.Nonnull;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.container.AsyncResponse;
@@ -65,8 +66,10 @@ import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.GrantTopicPermissionOptions;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.naming.NamedEntity;
@@ -613,6 +616,78 @@ public abstract class NamespacesBase extends AdminResource 
{
                 });
     }
 
+    protected CompletableFuture<Void> 
internalGrantPermissionOnTopicsAsync(List<GrantTopicPermissionOptions> options) 
{
+        return checkNamespace(options.stream().map(o -> 
TopicName.get(o.getTopic()).getNamespace()))
+                .thenCompose(__ -> validateAdminAccessForTenantAsync(
+                        TopicName.get(options.get(0).getTopic()).getTenant())
+                ).thenCompose(__ -> 
internalCheckTopicExists(options.stream().map(o -> 
TopicName.get(o.getTopic()))))
+                .thenCompose(__ -> 
getAuthorizationService().grantPermissionAsync(options))
+                .thenAccept(unused -> log.info("[{}] Successfully granted 
access for {}", clientAppId(), options))
+                .exceptionally(ex -> {
+                    Throwable realCause = 
FutureUtil.unwrapCompletionException(ex);
+                    //The IllegalArgumentException and the 
IllegalStateException were historically thrown by the
+                    // grantPermissionAsync method, so we catch them here to 
ensure backwards compatibility.
+                    if (realCause instanceof 
MetadataStoreException.NotFoundException
+                            || realCause instanceof IllegalArgumentException) {
+                        log.warn("[{}] Failed to grant permissions for 
namespace {}: does not exist", clientAppId(),
+                                namespaceName, ex);
+                        throw new RestException(Status.NOT_FOUND, "Topic's 
namespace does not exist");
+                    } else if (realCause instanceof 
MetadataStoreException.BadVersionException
+                            || realCause instanceof IllegalStateException) {
+                        log.warn("[{}] Failed to grant permissions for 
namespace {}: {}",
+                                clientAppId(), namespaceName, 
ex.getCause().getMessage(), ex);
+                        throw new RestException(Status.CONFLICT, "Concurrent 
modification");
+                    } else {
+                        log.error("[{}] Failed to grant permissions for 
namespace {}",
+                                clientAppId(), namespaceName, ex);
+                        throw new RestException(realCause);
+                    }
+                });
+    }
+
+    protected CompletableFuture<Void> internalRevokePermissionOnTopicsAsync(
+            List<RevokeTopicPermissionOptions> options) {
+        return checkNamespace(options.stream().map(o -> 
TopicName.get(o.getTopic()).getNamespace()))
+                .thenCompose(__ -> validateAdminAccessForTenantAsync(
+                        TopicName.get(options.get(0).getTopic()).getTenant()))
+                .thenCompose(__ -> 
internalCheckTopicExists(options.stream().map(o -> 
TopicName.get(o.getTopic()))))
+                .thenCompose(__ -> 
getAuthorizationService().revokePermissionAsync(options))
+                .thenAccept(unused -> log.info("[{}] Successfully revoke 
access for {}", clientAppId(), options))
+                .exceptionally(ex -> {
+                    Throwable realCause = 
FutureUtil.unwrapCompletionException(ex);
+                    //The IllegalArgumentException and the 
IllegalStateException were historically thrown by the
+                    // grantPermissionAsync method, so we catch them here to 
ensure backwards compatibility.
+                    if (realCause instanceof 
MetadataStoreException.NotFoundException
+                            || realCause instanceof IllegalArgumentException) {
+                        log.warn("[{}] Failed to revoke permissions for 
namespace {}: does not exist", clientAppId(),
+                                namespaceName, ex);
+                        throw new RestException(Status.NOT_FOUND, "Topic's 
namespace does not exist");
+                    } else if (realCause instanceof 
MetadataStoreException.BadVersionException
+                            || realCause instanceof IllegalStateException) {
+                        log.warn("[{}] Failed to revoke permissions for 
namespace {}: {}",
+                                clientAppId(), namespaceName, 
ex.getCause().getMessage(), ex);
+                        throw new RestException(Status.CONFLICT, "Concurrent 
modification");
+                    } else {
+                        log.error("[{}] Failed to revoke permissions for 
namespace {}",
+                                clientAppId(), namespaceName, ex);
+                        throw new RestException(realCause);
+                    }
+                });
+    }
+
+    private CompletableFuture<Void> checkNamespace(Stream<String> namespaces) {
+        boolean sameNamespace = namespaces.distinct().count() == 1;
+        if (!sameNamespace) {
+            throw new RestException(Status.BAD_REQUEST, "The namespace should 
be the same");
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
+    private CompletableFuture<Void> internalCheckTopicExists(Stream<TopicName> 
topicNameStream) {
+        List<TopicName> topicNames = 
topicNameStream.collect(Collectors.toList());
+        return CompletableFuture.allOf(topicNames.stream().map(topic -> 
internalCheckTopicExists(topic))
+                .toArray(CompletableFuture[]::new));
+    }
 
     protected CompletableFuture<Void> 
internalGrantPermissionOnSubscriptionAsync(String subscription,
                                                                                
 Set<String> roles) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 54cceaf09e9..36150ee21b3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -50,6 +50,8 @@ import javax.ws.rs.core.StreamingOutput;
 import org.apache.pulsar.broker.admin.impl.NamespacesBase;
 import org.apache.pulsar.broker.admin.impl.OffloaderObjectsScannerUtils;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.GrantTopicPermissionOptions;
+import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
 import org.apache.pulsar.common.naming.NamespaceName;
@@ -314,6 +316,48 @@ public class Namespaces extends NamespacesBase {
                 });
     }
 
+    @POST
+    @Path("/grantPermissionsOnTopics")
+    @ApiOperation(value = "Grant new permissions to a role on multi-topics.")
+    @ApiResponses(value = {
+            @ApiResponse(code = 204, message = "Operation successful"),
+            @ApiResponse(code = 401, message = "Don't have permission to 
administrate resources on this tenant"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't 
exit"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    public void grantPermissionsOnTopics(@Suspended final AsyncResponse 
asyncResponse,
+                                 List<GrantTopicPermissionOptions> options) {
+        internalGrantPermissionOnTopicsAsync(options)
+                .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to grant permissions {}",
+                            clientAppId(), options, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
+    }
+
+    @POST
+    @Path("/revokePermissionsOnTopics")
+    @ApiOperation(value = "Revoke new permissions to a role on multi-topics.")
+    @ApiResponses(value = {
+            @ApiResponse(code = 204, message = "Operation successful"),
+            @ApiResponse(code = 401, message = "Don't have permission to 
administrate resources on this tenant"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't 
exit"),
+            @ApiResponse(code = 500, message = "Internal server error") })
+    public void revokePermissionsOnTopics(@Suspended final AsyncResponse 
asyncResponse,
+                                         List<RevokeTopicPermissionOptions> 
options) {
+        internalRevokePermissionOnTopicsAsync(options)
+                .thenAccept(__ -> 
asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to revoke permissions {}",
+                            clientAppId(), options, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
+    }
+
     @POST
     @Path("/{property}/{namespace}/permissions/subscription/{subscription}")
     @ApiOperation(hidden = true, value = "Grant a new permission to roles for 
a subscription."
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 900babbecf4..3f5ee721a7e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -87,11 +87,13 @@ import 
org.apache.pulsar.broker.service.plugin.EntryFilterTest;
 import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
 import org.apache.pulsar.broker.testcontext.MockEntryFilterProvider;
 import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.apache.pulsar.client.admin.GrantTopicPermissionOptions;
 import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions;
 import org.apache.pulsar.client.admin.Mode;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import 
org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
+import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions;
 import org.apache.pulsar.client.admin.Topics.QueryParam;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -3685,4 +3687,61 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
                 List.of(".*", "broker.*")
         );
     }
+
+    @Test
+    public void testGrantAndRevokePermissions() throws Exception {
+
+        String namespace = newUniqueName(defaultTenant + "/") + 
"-unload-test-";
+        String namespace2 = newUniqueName(defaultTenant + "/") + 
"-unload-test-";
+        admin.namespaces().createNamespace(namespace, Set.of("test"));
+        admin.namespaces().createNamespace(namespace2, Set.of("test"));
+        //
+        final String topic1 = "persistent://" + namespace + "/test1";
+        final String topic2 = "persistent://" + namespace + "/test2";
+        final String topic3 = "non-persistent://" + namespace + "/test3";
+        final String topic4 = "persistent://" + namespace2 + "/test4";;
+
+        admin.topics().createPartitionedTopic(topic1, 3);
+        admin.topics().createPartitionedTopic(topic2, 3);
+        admin.topics().createPartitionedTopic(topic3, 3);
+        admin.topics().createPartitionedTopic(topic4, 3);
+        pulsarClient.newProducer().topic(topic1).create().close();
+        pulsarClient.newProducer().topic(topic2).create().close();
+        pulsarClient.newProducer().topic(topic3).create().close();
+        pulsarClient.newProducer().topic(topic4).create().close();
+
+        List<GrantTopicPermissionOptions> grantPermissionOptions = new 
ArrayList<>();
+        
grantPermissionOptions.add(GrantTopicPermissionOptions.builder().topic(topic1).role("role1").actions(Set.of(AuthAction.produce)).build());
+        
grantPermissionOptions.add(GrantTopicPermissionOptions.builder().topic(topic4).role("role4").actions(Set.of(AuthAction.produce)).build());
+        try {
+            admin.namespaces().grantPermissionOnTopics(grantPermissionOptions);
+            fail("Should go here, because there are two namespaces");
+        } catch (Exception ex) {
+            Assert.assertTrue(ex != null);
+        }
+        grantPermissionOptions.clear();
+        
grantPermissionOptions.add(GrantTopicPermissionOptions.builder().topic(topic1).role("role1").actions(Set.of(AuthAction.produce)).build());
+        
grantPermissionOptions.add(GrantTopicPermissionOptions.builder().topic(topic2).role("role2").actions(Set.of(AuthAction.consume)).build());
+        
grantPermissionOptions.add(GrantTopicPermissionOptions.builder().topic(topic3).role("role3").actions(Set.of(AuthAction.produce,
 AuthAction.consume)).build());
+        admin.namespaces().grantPermissionOnTopics(grantPermissionOptions);
+
+        final Map<String, Set<AuthAction>> permissions1 = 
admin.topics().getPermissions(topic1);
+        final Map<String, Set<AuthAction>> permissions2 = 
admin.topics().getPermissions(topic2);
+        final Map<String, Set<AuthAction>> permissions3 = 
admin.topics().getPermissions(topic3);
+
+        Assert.assertEquals(permissions1.get("role1"), 
Set.of(AuthAction.produce));
+        Assert.assertEquals(permissions2.get("role2"), 
Set.of(AuthAction.consume));
+        Assert.assertEquals(permissions3.get("role3"), 
Set.of(AuthAction.produce, AuthAction.consume));
+        //
+        List<RevokeTopicPermissionOptions> revokePermissionOptions = new 
ArrayList<>();
+        
revokePermissionOptions.add(RevokeTopicPermissionOptions.builder().topic(topic1).role("role1").build());
+        
revokePermissionOptions.add(RevokeTopicPermissionOptions.builder().topic(topic2).role("role2").build());
+        admin.namespaces().revokePermissionOnTopics(revokePermissionOptions);
+
+        final Map<String, Set<AuthAction>> permissions11 = 
admin.topics().getPermissions(topic1);
+        final Map<String, Set<AuthAction>> permissions22 = 
admin.topics().getPermissions(topic2);
+
+        Assert.assertTrue(permissions11.isEmpty());
+        Assert.assertTrue(permissions22.isEmpty());
+    }
 }
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/GrantTopicPermissionOptions.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/GrantTopicPermissionOptions.java
new file mode 100644
index 00000000000..e365a086a77
--- /dev/null
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/GrantTopicPermissionOptions.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin;
+
+import java.util.Set;
+import lombok.Builder;
+import lombok.Data;
+import org.apache.pulsar.common.policies.data.AuthAction;
+
+@Data
+@Builder
+public class GrantTopicPermissionOptions {
+
+    private final String topic;
+
+    private final String role;
+
+    private final Set<AuthAction> actions;
+
+}
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 65124a6a76a..28ad852064b 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -703,6 +703,34 @@ public interface Namespaces {
      */
     CompletableFuture<Void> grantPermissionOnNamespaceAsync(String namespace, 
String role, Set<AuthAction> actions);
 
+    /**
+     * Grant permissions on topics asynchronously.
+     * @param options
+     * @return
+     */
+    CompletableFuture<Void> 
grantPermissionOnTopicsAsync(List<GrantTopicPermissionOptions> options);
+
+    /**
+     * Grant permissions on topics.
+     * @param options
+     * @throws PulsarAdminException
+     */
+    void grantPermissionOnTopics(List<GrantTopicPermissionOptions> options) 
throws PulsarAdminException;
+
+    /**
+     * Revoke permissions on topics asynchronously.
+     * @param options
+     * @return
+     */
+    CompletableFuture<Void> 
revokePermissionOnTopicsAsync(List<RevokeTopicPermissionOptions> options);
+
+    /**
+     * Revoke permissions on topics.
+     * @param options
+     * @throws PulsarAdminException
+     */
+    void revokePermissionOnTopics(List<RevokeTopicPermissionOptions> options) 
throws PulsarAdminException;
+
     /**
      * Revoke permissions on a namespace.
      * <p/>
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/RevokeTopicPermissionOptions.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/RevokeTopicPermissionOptions.java
new file mode 100644
index 00000000000..38e33c966b2
--- /dev/null
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/RevokeTopicPermissionOptions.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin;
+
+import lombok.Builder;
+import lombok.Data;
+
+@Data
+@Builder
+public class RevokeTopicPermissionOptions {
+
+    private final String topic;
+
+    private final String role;
+
+}
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 7d41c7203d2..7695abdd480 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -28,9 +28,11 @@ import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.MediaType;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.admin.GrantTopicPermissionOptions;
 import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions;
 import org.apache.pulsar.client.admin.Namespaces;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.NamespaceName;
@@ -288,6 +290,30 @@ public class NamespacesImpl extends BaseResource 
implements Namespaces {
         return asyncPostRequest(path, Entity.entity(actions, 
MediaType.APPLICATION_JSON));
     }
 
+    @Override
+    public CompletableFuture<Void> 
grantPermissionOnTopicsAsync(List<GrantTopicPermissionOptions> options) {
+        final WebTarget base = adminV2Namespaces;
+        WebTarget path = base.path("/grantPermissionsOnTopics");
+        return asyncPostRequest(path, Entity.entity(options, 
MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public void grantPermissionOnTopics(List<GrantTopicPermissionOptions> 
options) throws PulsarAdminException {
+        sync(() -> grantPermissionOnTopicsAsync(options));
+    }
+
+    @Override
+    public CompletableFuture<Void> 
revokePermissionOnTopicsAsync(List<RevokeTopicPermissionOptions> options) {
+        final WebTarget base = adminV2Namespaces;
+        WebTarget path = base.path("/revokePermissionsOnTopics");
+        return asyncPostRequest(path, Entity.entity(options, 
MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public void revokePermissionOnTopics(List<RevokeTopicPermissionOptions> 
options) throws PulsarAdminException {
+        sync(() -> revokePermissionOnTopicsAsync(options));
+    }
+
     @Override
     public void revokePermissionsOnNamespace(String namespace, String role) 
throws PulsarAdminException {
         sync(() -> revokePermissionsOnNamespaceAsync(namespace, role));


Reply via email to