This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b334c4f637b [improve][broker] PIP-383: Support granting/revoking
permissions for multiple topics (#23372)
b334c4f637b is described below
commit b334c4f637bdd32787494c16e9d34169f1a25812
Author: Jiwei Guo <[email protected]>
AuthorDate: Thu Oct 10 18:37:27 2024 +0800
[improve][broker] PIP-383: Support granting/revoking permissions for
multiple topics (#23372)
---
.../authorization/AuthorizationProvider.java | 13 ++++
.../broker/authorization/AuthorizationService.java | 11 +++
.../authorization/PulsarAuthorizationProvider.java | 78 ++++++++++++++++++++++
.../apache/pulsar/broker/admin/AdminResource.java | 11 +++
.../pulsar/broker/admin/impl/NamespacesBase.java | 75 +++++++++++++++++++++
.../apache/pulsar/broker/admin/v2/Namespaces.java | 44 ++++++++++++
.../apache/pulsar/broker/admin/AdminApi2Test.java | 59 ++++++++++++++++
.../client/admin/GrantTopicPermissionOptions.java | 36 ++++++++++
.../org/apache/pulsar/client/admin/Namespaces.java | 28 ++++++++
.../client/admin/RevokeTopicPermissionOptions.java | 32 +++++++++
.../client/admin/internal/NamespacesImpl.java | 26 ++++++++
11 files changed, 413 insertions(+)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
index 7d25580ff92..ffb38f770a9 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
@@ -20,12 +20,15 @@ package org.apache.pulsar.broker.authorization;
import java.io.Closeable;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.client.admin.GrantTopicPermissionOptions;
+import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
@@ -223,6 +226,16 @@ public interface AuthorizationProvider extends Closeable {
CompletableFuture<Void> grantPermissionAsync(TopicName topicName,
Set<AuthAction> actions, String role,
String authDataJson);
+ default CompletableFuture<Void>
grantPermissionAsync(List<GrantTopicPermissionOptions> options) {
+ return FutureUtil.failedFuture(new IllegalStateException(
+ String.format("grantPermissionAsync is not supported by the
Authorization")));
+ }
+
+ default CompletableFuture<Void>
revokePermissionAsync(List<RevokeTopicPermissionOptions> options) {
+ return FutureUtil.failedFuture(new IllegalStateException(
+ String.format("revokePermissionAsync is not supported by the
Authorization")));
+ }
+
/**
* Revoke authorization-action permission on a topic to the given client.
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index c121d93b9b7..2951eb1f297 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.authorization;
import static java.util.concurrent.TimeUnit.SECONDS;
import java.net.SocketAddress;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -32,6 +33,8 @@ import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationParameters;
import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.client.admin.GrantTopicPermissionOptions;
+import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
@@ -181,6 +184,14 @@ public class AuthorizationService {
return provider.grantPermissionAsync(topicName, actions, role,
authDataJson);
}
+ public CompletableFuture<Void>
grantPermissionAsync(List<GrantTopicPermissionOptions> options) {
+ return provider.grantPermissionAsync(options);
+ }
+
+ public CompletableFuture<Void>
revokePermissionAsync(List<RevokeTopicPermissionOptions> options) {
+ return provider.revokePermissionAsync(options);
+ }
+
/**
* Revoke authorization-action permission on a topic to the given client.
*
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index a39c3d05607..0af63724cc8 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -24,14 +24,18 @@ import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.client.admin.GrantTopicPermissionOptions;
+import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
@@ -251,6 +255,80 @@ public class PulsarAuthorizationProvider implements
AuthorizationProvider {
});
}
+ public CompletableFuture<Void>
grantPermissionAsync(List<GrantTopicPermissionOptions> options) {
+ return checkNamespace(options.stream().map(o ->
TopicName.get(o.getTopic()).getNamespace()))
+ .thenCompose(__ -> getPoliciesReadOnlyAsync())
+ .thenCompose(readonly -> {
+ if (readonly) {
+ if (log.isDebugEnabled()) {
+ log.debug("Policies are read-only. Broker cannot
do read-write operations");
+ }
+ throw new IllegalStateException("policies are in
readonly mode");
+ }
+ TopicName topicName =
TopicName.get(options.get(0).getTopic());
+ return pulsarResources.getNamespaceResources()
+ .setPoliciesAsync(topicName.getNamespaceObject(),
policies -> {
+ options.stream().forEach(o -> {
+ final String topicUri =
TopicName.get(o.getTopic()).toString();
+
policies.auth_policies.getTopicAuthentication()
+ .computeIfAbsent(topicUri, __ ->
new HashMap<>())
+ .put(o.getRole(), o.getActions());
+ });
+ return policies;
+ }).whenComplete((__, ex) -> {
+ if (ex != null) {
+ log.error("Failed to grant permissions for
{}", options);
+ } else {
+ log.info("Successfully granted access for
{}", options);
+ }
+ });
+ });
+ }
+
+ @Override
+ public CompletableFuture<Void>
revokePermissionAsync(List<RevokeTopicPermissionOptions> options) {
+ return checkNamespace(options.stream().map(o ->
TopicName.get(o.getTopic()).getNamespace()))
+ .thenCompose(__ -> getPoliciesReadOnlyAsync())
+ .thenCompose(readonly -> {
+ if (readonly) {
+ if (log.isDebugEnabled()) {
+ log.debug("Policies are read-only. Broker cannot
do read-write operations");
+ }
+ throw new IllegalStateException("policies are in
readonly mode");
+ }
+ TopicName topicName =
TopicName.get(options.get(0).getTopic());
+ return pulsarResources.getNamespaceResources()
+ .setPoliciesAsync(topicName.getNamespaceObject(),
policies -> {
+ options.stream().forEach(o -> {
+ final String topicUri =
TopicName.get(o.getTopic()).toString();
+
policies.auth_policies.getTopicAuthentication()
+ .computeIfPresent(topicUri,
(topicNameUri, roles) -> {
+ roles.remove(o.getRole());
+ if (roles.isEmpty()) {
+ return null;
+ }
+ return roles;
+ });
+ });
+ return policies;
+ }).whenComplete((__, ex) -> {
+ if (ex != null) {
+ log.error("Failed to revoke permissions
for {}", options, ex);
+ } else {
+ log.info("Successfully revoke permissions
for {}", options);
+ }
+ });
+ });
+ }
+
+ private CompletableFuture<Void> checkNamespace(Stream<String> namespaces) {
+ boolean sameNamespace = namespaces.distinct().count() == 1;
+ if (!sameNamespace) {
+ throw new IllegalArgumentException("The namespace should be the
same");
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+
@Override
public CompletableFuture<Void> revokePermissionAsync(TopicName topicName,
String role) {
return getPoliciesReadOnlyAsync().thenCompose(readonly -> {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 3268f07b13d..45772dc279b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -924,4 +924,15 @@ public abstract class AdminResource extends
PulsarWebResource {
"The bucket must be specified for namespace offload.");
}
}
+
+ protected CompletableFuture<Void> internalCheckTopicExists(TopicName
topicName) {
+ return pulsar().getNamespaceService().checkTopicExists(topicName)
+ .thenAccept(info -> {
+ boolean exists = info.isExists();
+ info.recycle();
+ if (!exists) {
+ throw new RestException(Status.NOT_FOUND,
getTopicNotFoundErrorMessage(topicName.toString()));
+ }
+ });
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 18c80d6bef4..d80e2487b4f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -42,6 +42,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
@@ -65,8 +66,10 @@ import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.GrantTopicPermissionOptions;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.naming.NamedEntity;
@@ -613,6 +616,78 @@ public abstract class NamespacesBase extends AdminResource
{
});
}
+ protected CompletableFuture<Void>
internalGrantPermissionOnTopicsAsync(List<GrantTopicPermissionOptions> options)
{
+ return checkNamespace(options.stream().map(o ->
TopicName.get(o.getTopic()).getNamespace()))
+ .thenCompose(__ -> validateAdminAccessForTenantAsync(
+ TopicName.get(options.get(0).getTopic()).getTenant())
+ ).thenCompose(__ ->
internalCheckTopicExists(options.stream().map(o ->
TopicName.get(o.getTopic()))))
+ .thenCompose(__ ->
getAuthorizationService().grantPermissionAsync(options))
+ .thenAccept(unused -> log.info("[{}] Successfully granted
access for {}", clientAppId(), options))
+ .exceptionally(ex -> {
+ Throwable realCause =
FutureUtil.unwrapCompletionException(ex);
+ //The IllegalArgumentException and the
IllegalStateException were historically thrown by the
+ // grantPermissionAsync method, so we catch them here to
ensure backwards compatibility.
+ if (realCause instanceof
MetadataStoreException.NotFoundException
+ || realCause instanceof IllegalArgumentException) {
+ log.warn("[{}] Failed to grant permissions for
namespace {}: does not exist", clientAppId(),
+ namespaceName, ex);
+ throw new RestException(Status.NOT_FOUND, "Topic's
namespace does not exist");
+ } else if (realCause instanceof
MetadataStoreException.BadVersionException
+ || realCause instanceof IllegalStateException) {
+ log.warn("[{}] Failed to grant permissions for
namespace {}: {}",
+ clientAppId(), namespaceName,
ex.getCause().getMessage(), ex);
+ throw new RestException(Status.CONFLICT, "Concurrent
modification");
+ } else {
+ log.error("[{}] Failed to grant permissions for
namespace {}",
+ clientAppId(), namespaceName, ex);
+ throw new RestException(realCause);
+ }
+ });
+ }
+
+ protected CompletableFuture<Void> internalRevokePermissionOnTopicsAsync(
+ List<RevokeTopicPermissionOptions> options) {
+ return checkNamespace(options.stream().map(o ->
TopicName.get(o.getTopic()).getNamespace()))
+ .thenCompose(__ -> validateAdminAccessForTenantAsync(
+ TopicName.get(options.get(0).getTopic()).getTenant()))
+ .thenCompose(__ ->
internalCheckTopicExists(options.stream().map(o ->
TopicName.get(o.getTopic()))))
+ .thenCompose(__ ->
getAuthorizationService().revokePermissionAsync(options))
+ .thenAccept(unused -> log.info("[{}] Successfully revoke
access for {}", clientAppId(), options))
+ .exceptionally(ex -> {
+ Throwable realCause =
FutureUtil.unwrapCompletionException(ex);
+ //The IllegalArgumentException and the
IllegalStateException were historically thrown by the
+ // grantPermissionAsync method, so we catch them here to
ensure backwards compatibility.
+ if (realCause instanceof
MetadataStoreException.NotFoundException
+ || realCause instanceof IllegalArgumentException) {
+ log.warn("[{}] Failed to revoke permissions for
namespace {}: does not exist", clientAppId(),
+ namespaceName, ex);
+ throw new RestException(Status.NOT_FOUND, "Topic's
namespace does not exist");
+ } else if (realCause instanceof
MetadataStoreException.BadVersionException
+ || realCause instanceof IllegalStateException) {
+ log.warn("[{}] Failed to revoke permissions for
namespace {}: {}",
+ clientAppId(), namespaceName,
ex.getCause().getMessage(), ex);
+ throw new RestException(Status.CONFLICT, "Concurrent
modification");
+ } else {
+ log.error("[{}] Failed to revoke permissions for
namespace {}",
+ clientAppId(), namespaceName, ex);
+ throw new RestException(realCause);
+ }
+ });
+ }
+
+ private CompletableFuture<Void> checkNamespace(Stream<String> namespaces) {
+ boolean sameNamespace = namespaces.distinct().count() == 1;
+ if (!sameNamespace) {
+ throw new RestException(Status.BAD_REQUEST, "The namespace should
be the same");
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+
+ private CompletableFuture<Void> internalCheckTopicExists(Stream<TopicName>
topicNameStream) {
+ List<TopicName> topicNames =
topicNameStream.collect(Collectors.toList());
+ return CompletableFuture.allOf(topicNames.stream().map(topic ->
internalCheckTopicExists(topic))
+ .toArray(CompletableFuture[]::new));
+ }
protected CompletableFuture<Void>
internalGrantPermissionOnSubscriptionAsync(String subscription,
Set<String> roles) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 54cceaf09e9..36150ee21b3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -50,6 +50,8 @@ import javax.ws.rs.core.StreamingOutput;
import org.apache.pulsar.broker.admin.impl.NamespacesBase;
import org.apache.pulsar.broker.admin.impl.OffloaderObjectsScannerUtils;
import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.GrantTopicPermissionOptions;
+import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.naming.NamespaceName;
@@ -314,6 +316,48 @@ public class Namespaces extends NamespacesBase {
});
}
+ @POST
+ @Path("/grantPermissionsOnTopics")
+ @ApiOperation(value = "Grant new permissions to a role on multi-topics.")
+ @ApiResponses(value = {
+ @ApiResponse(code = 204, message = "Operation successful"),
+ @ApiResponse(code = 401, message = "Don't have permission to
administrate resources on this tenant"),
+ @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't
exit"),
+ @ApiResponse(code = 500, message = "Internal server error") })
+ public void grantPermissionsOnTopics(@Suspended final AsyncResponse
asyncResponse,
+ List<GrantTopicPermissionOptions> options) {
+ internalGrantPermissionOnTopicsAsync(options)
+ .thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to grant permissions {}",
+ clientAppId(), options, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ }
+
+ @POST
+ @Path("/revokePermissionsOnTopics")
+ @ApiOperation(value = "Revoke new permissions to a role on multi-topics.")
+ @ApiResponses(value = {
+ @ApiResponse(code = 204, message = "Operation successful"),
+ @ApiResponse(code = 401, message = "Don't have permission to
administrate resources on this tenant"),
+ @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "tenant/namespace/topic doesn't
exit"),
+ @ApiResponse(code = 500, message = "Internal server error") })
+ public void revokePermissionsOnTopics(@Suspended final AsyncResponse
asyncResponse,
+ List<RevokeTopicPermissionOptions>
options) {
+ internalRevokePermissionOnTopicsAsync(options)
+ .thenAccept(__ ->
asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to revoke permissions {}",
+ clientAppId(), options, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ }
+
@POST
@Path("/{property}/{namespace}/permissions/subscription/{subscription}")
@ApiOperation(hidden = true, value = "Grant a new permission to roles for
a subscription."
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 900babbecf4..3f5ee721a7e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -87,11 +87,13 @@ import
org.apache.pulsar.broker.service.plugin.EntryFilterTest;
import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
import org.apache.pulsar.broker.testcontext.MockEntryFilterProvider;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.apache.pulsar.client.admin.GrantTopicPermissionOptions;
import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions;
import org.apache.pulsar.client.admin.Mode;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import
org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
+import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions;
import org.apache.pulsar.client.admin.Topics.QueryParam;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -3685,4 +3687,61 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
List.of(".*", "broker.*")
);
}
+
+ @Test
+ public void testGrantAndRevokePermissions() throws Exception {
+
+ String namespace = newUniqueName(defaultTenant + "/") +
"-unload-test-";
+ String namespace2 = newUniqueName(defaultTenant + "/") +
"-unload-test-";
+ admin.namespaces().createNamespace(namespace, Set.of("test"));
+ admin.namespaces().createNamespace(namespace2, Set.of("test"));
+ //
+ final String topic1 = "persistent://" + namespace + "/test1";
+ final String topic2 = "persistent://" + namespace + "/test2";
+ final String topic3 = "non-persistent://" + namespace + "/test3";
+ final String topic4 = "persistent://" + namespace2 + "/test4";;
+
+ admin.topics().createPartitionedTopic(topic1, 3);
+ admin.topics().createPartitionedTopic(topic2, 3);
+ admin.topics().createPartitionedTopic(topic3, 3);
+ admin.topics().createPartitionedTopic(topic4, 3);
+ pulsarClient.newProducer().topic(topic1).create().close();
+ pulsarClient.newProducer().topic(topic2).create().close();
+ pulsarClient.newProducer().topic(topic3).create().close();
+ pulsarClient.newProducer().topic(topic4).create().close();
+
+ List<GrantTopicPermissionOptions> grantPermissionOptions = new
ArrayList<>();
+
grantPermissionOptions.add(GrantTopicPermissionOptions.builder().topic(topic1).role("role1").actions(Set.of(AuthAction.produce)).build());
+
grantPermissionOptions.add(GrantTopicPermissionOptions.builder().topic(topic4).role("role4").actions(Set.of(AuthAction.produce)).build());
+ try {
+ admin.namespaces().grantPermissionOnTopics(grantPermissionOptions);
+ fail("Should go here, because there are two namespaces");
+ } catch (Exception ex) {
+ Assert.assertTrue(ex != null);
+ }
+ grantPermissionOptions.clear();
+
grantPermissionOptions.add(GrantTopicPermissionOptions.builder().topic(topic1).role("role1").actions(Set.of(AuthAction.produce)).build());
+
grantPermissionOptions.add(GrantTopicPermissionOptions.builder().topic(topic2).role("role2").actions(Set.of(AuthAction.consume)).build());
+
grantPermissionOptions.add(GrantTopicPermissionOptions.builder().topic(topic3).role("role3").actions(Set.of(AuthAction.produce,
AuthAction.consume)).build());
+ admin.namespaces().grantPermissionOnTopics(grantPermissionOptions);
+
+ final Map<String, Set<AuthAction>> permissions1 =
admin.topics().getPermissions(topic1);
+ final Map<String, Set<AuthAction>> permissions2 =
admin.topics().getPermissions(topic2);
+ final Map<String, Set<AuthAction>> permissions3 =
admin.topics().getPermissions(topic3);
+
+ Assert.assertEquals(permissions1.get("role1"),
Set.of(AuthAction.produce));
+ Assert.assertEquals(permissions2.get("role2"),
Set.of(AuthAction.consume));
+ Assert.assertEquals(permissions3.get("role3"),
Set.of(AuthAction.produce, AuthAction.consume));
+ //
+ List<RevokeTopicPermissionOptions> revokePermissionOptions = new
ArrayList<>();
+
revokePermissionOptions.add(RevokeTopicPermissionOptions.builder().topic(topic1).role("role1").build());
+
revokePermissionOptions.add(RevokeTopicPermissionOptions.builder().topic(topic2).role("role2").build());
+ admin.namespaces().revokePermissionOnTopics(revokePermissionOptions);
+
+ final Map<String, Set<AuthAction>> permissions11 =
admin.topics().getPermissions(topic1);
+ final Map<String, Set<AuthAction>> permissions22 =
admin.topics().getPermissions(topic2);
+
+ Assert.assertTrue(permissions11.isEmpty());
+ Assert.assertTrue(permissions22.isEmpty());
+ }
}
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/GrantTopicPermissionOptions.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/GrantTopicPermissionOptions.java
new file mode 100644
index 00000000000..e365a086a77
--- /dev/null
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/GrantTopicPermissionOptions.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin;
+
+import java.util.Set;
+import lombok.Builder;
+import lombok.Data;
+import org.apache.pulsar.common.policies.data.AuthAction;
+
+@Data
+@Builder
+public class GrantTopicPermissionOptions {
+
+ private final String topic;
+
+ private final String role;
+
+ private final Set<AuthAction> actions;
+
+}
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 65124a6a76a..28ad852064b 100644
---
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -703,6 +703,34 @@ public interface Namespaces {
*/
CompletableFuture<Void> grantPermissionOnNamespaceAsync(String namespace,
String role, Set<AuthAction> actions);
+ /**
+ * Grant permissions on topics asynchronously.
+ * @param options
+ * @return
+ */
+ CompletableFuture<Void>
grantPermissionOnTopicsAsync(List<GrantTopicPermissionOptions> options);
+
+ /**
+ * Grant permissions on topics.
+ * @param options
+ * @throws PulsarAdminException
+ */
+ void grantPermissionOnTopics(List<GrantTopicPermissionOptions> options)
throws PulsarAdminException;
+
+ /**
+ * Revoke permissions on topics asynchronously.
+ * @param options
+ * @return
+ */
+ CompletableFuture<Void>
revokePermissionOnTopicsAsync(List<RevokeTopicPermissionOptions> options);
+
+ /**
+ * Revoke permissions on topics.
+ * @param options
+ * @throws PulsarAdminException
+ */
+ void revokePermissionOnTopics(List<RevokeTopicPermissionOptions> options)
throws PulsarAdminException;
+
/**
* Revoke permissions on a namespace.
* <p/>
diff --git
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/RevokeTopicPermissionOptions.java
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/RevokeTopicPermissionOptions.java
new file mode 100644
index 00000000000..38e33c966b2
--- /dev/null
+++
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/RevokeTopicPermissionOptions.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin;
+
+import lombok.Builder;
+import lombok.Data;
+
+@Data
+@Builder
+public class RevokeTopicPermissionOptions {
+
+ private final String topic;
+
+ private final String role;
+
+}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 7d41c7203d2..7695abdd480 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -28,9 +28,11 @@ import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.admin.GrantTopicPermissionOptions;
import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.NamespaceName;
@@ -288,6 +290,30 @@ public class NamespacesImpl extends BaseResource
implements Namespaces {
return asyncPostRequest(path, Entity.entity(actions,
MediaType.APPLICATION_JSON));
}
+ @Override
+ public CompletableFuture<Void>
grantPermissionOnTopicsAsync(List<GrantTopicPermissionOptions> options) {
+ final WebTarget base = adminV2Namespaces;
+ WebTarget path = base.path("/grantPermissionsOnTopics");
+ return asyncPostRequest(path, Entity.entity(options,
MediaType.APPLICATION_JSON));
+ }
+
+ @Override
+ public void grantPermissionOnTopics(List<GrantTopicPermissionOptions>
options) throws PulsarAdminException {
+ sync(() -> grantPermissionOnTopicsAsync(options));
+ }
+
+ @Override
+ public CompletableFuture<Void>
revokePermissionOnTopicsAsync(List<RevokeTopicPermissionOptions> options) {
+ final WebTarget base = adminV2Namespaces;
+ WebTarget path = base.path("/revokePermissionsOnTopics");
+ return asyncPostRequest(path, Entity.entity(options,
MediaType.APPLICATION_JSON));
+ }
+
+ @Override
+ public void revokePermissionOnTopics(List<RevokeTopicPermissionOptions>
options) throws PulsarAdminException {
+ sync(() -> revokePermissionOnTopicsAsync(options));
+ }
+
@Override
public void revokePermissionsOnNamespace(String namespace, String role)
throws PulsarAdminException {
sync(() -> revokePermissionsOnNamespaceAsync(namespace, role));