Technoboy- commented on code in PR #23372:
URL: https://github.com/apache/pulsar/pull/23372#discussion_r1794452230
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java:
##########
@@ -613,6 +616,83 @@ protected CompletableFuture<Void>
internalGrantPermissionOnNamespaceAsync(String
});
}
+ protected CompletableFuture<Void>
internalGrantPermissionOnTopicsAsync(List<GrantTopicPermissionOptions> options)
{
+ return checkNamespace(options.stream().map(o ->
TopicName.get(o.getTopic()).getNamespace()))
+ .thenCompose(theSameNamespace -> {
+ if (!theSameNamespace) {
+ throw new RestException(Status.BAD_REQUEST, "The
namespace should be the same");
+ }
+
+ return
validateAdminAccessForTenantAsync(TopicName.get(options.get(0).getTopic()).getTenant());
+ }).thenCompose(__ ->
internalCheckTopicExists(options.stream().map(o ->
TopicName.get(o.getTopic()))))
+ .thenCompose(__ ->
getAuthorizationService().grantPermissionAsync(options))
+ .thenAccept(unused -> log.info("[{}] Successfully granted
access for {}", clientAppId(), options))
+ .exceptionally(ex -> {
+ Throwable realCause =
FutureUtil.unwrapCompletionException(ex);
+ //The IllegalArgumentException and the
IllegalStateException were historically thrown by the
+ // grantPermissionAsync method, so we catch them here to
ensure backwards compatibility.
+ if (realCause instanceof
MetadataStoreException.NotFoundException
+ || realCause instanceof IllegalArgumentException) {
+ log.warn("[{}] Failed to grant permissions for
namespace {}: does not exist", clientAppId(),
+ namespaceName, ex);
+ throw new RestException(Status.NOT_FOUND, "Topic's
namespace does not exist");
+ } else if (realCause instanceof
MetadataStoreException.BadVersionException
+ || realCause instanceof IllegalStateException) {
+ log.warn("[{}] Failed to grant permissions for
namespace {}: {}",
+ clientAppId(), namespaceName,
ex.getCause().getMessage(), ex);
+ throw new RestException(Status.CONFLICT, "Concurrent
modification");
+ } else {
+ log.error("[{}] Failed to grant permissions for
namespace {}",
+ clientAppId(), namespaceName, ex);
+ throw new RestException(realCause);
+ }
+ });
+ }
+
+ protected CompletableFuture<Void> internalRevokePermissionOnTopicsAsync(
+ List<RevokeTopicPermissionOptions> options) {
+ return checkNamespace(options.stream().map(o ->
TopicName.get(o.getTopic()).getNamespace()))
+ .thenCompose(theSameNamespace -> {
+ if (!theSameNamespace) {
+ throw new RestException(Status.BAD_REQUEST, "The
namespace should be the same");
+ }
+
+ return
validateAdminAccessForTenantAsync(TopicName.get(options.get(0).getTopic()).getTenant());
+ }).thenCompose(__ ->
internalCheckTopicExists(options.stream().map(o ->
TopicName.get(o.getTopic()))))
+ .thenCompose(__ ->
getAuthorizationService().revokePermissionAsync(options))
+ .thenAccept(unused -> log.info("[{}] Successfully revoke
access for {}", clientAppId(), options))
+ .exceptionally(ex -> {
+ Throwable realCause =
FutureUtil.unwrapCompletionException(ex);
+ //The IllegalArgumentException and the
IllegalStateException were historically thrown by the
+ // grantPermissionAsync method, so we catch them here to
ensure backwards compatibility.
+ if (realCause instanceof
MetadataStoreException.NotFoundException
+ || realCause instanceof IllegalArgumentException) {
+ log.warn("[{}] Failed to revoke permissions for
namespace {}: does not exist", clientAppId(),
+ namespaceName, ex);
+ throw new RestException(Status.NOT_FOUND, "Topic's
namespace does not exist");
+ } else if (realCause instanceof
MetadataStoreException.BadVersionException
+ || realCause instanceof IllegalStateException) {
+ log.warn("[{}] Failed to revoke permissions for
namespace {}: {}",
+ clientAppId(), namespaceName,
ex.getCause().getMessage(), ex);
+ throw new RestException(Status.CONFLICT, "Concurrent
modification");
+ } else {
+ log.error("[{}] Failed to revoke permissions for
namespace {}",
+ clientAppId(), namespaceName, ex);
+ throw new RestException(realCause);
+ }
+ });
+ }
+
+ private CompletableFuture<Boolean> checkNamespace(Stream<String>
namespaces) {
Review Comment:
I have checked it here
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]