Technoboy- commented on code in PR #23372:
URL: https://github.com/apache/pulsar/pull/23372#discussion_r1794452230


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java:
##########
@@ -613,6 +616,83 @@ protected CompletableFuture<Void> 
internalGrantPermissionOnNamespaceAsync(String
                 });
     }
 
+    protected CompletableFuture<Void> 
internalGrantPermissionOnTopicsAsync(List<GrantTopicPermissionOptions> options) 
{
+        return checkNamespace(options.stream().map(o -> 
TopicName.get(o.getTopic()).getNamespace()))
+                .thenCompose(theSameNamespace -> {
+                    if (!theSameNamespace) {
+                        throw new RestException(Status.BAD_REQUEST, "The 
namespace should be the same");
+                    }
+
+                    return 
validateAdminAccessForTenantAsync(TopicName.get(options.get(0).getTopic()).getTenant());
+                }).thenCompose(__ -> 
internalCheckTopicExists(options.stream().map(o -> 
TopicName.get(o.getTopic()))))
+                .thenCompose(__ -> 
getAuthorizationService().grantPermissionAsync(options))
+                .thenAccept(unused -> log.info("[{}] Successfully granted 
access for {}", clientAppId(), options))
+                .exceptionally(ex -> {
+                    Throwable realCause = 
FutureUtil.unwrapCompletionException(ex);
+                    //The IllegalArgumentException and the 
IllegalStateException were historically thrown by the
+                    // grantPermissionAsync method, so we catch them here to 
ensure backwards compatibility.
+                    if (realCause instanceof 
MetadataStoreException.NotFoundException
+                            || realCause instanceof IllegalArgumentException) {
+                        log.warn("[{}] Failed to grant permissions for 
namespace {}: does not exist", clientAppId(),
+                                namespaceName, ex);
+                        throw new RestException(Status.NOT_FOUND, "Topic's 
namespace does not exist");
+                    } else if (realCause instanceof 
MetadataStoreException.BadVersionException
+                            || realCause instanceof IllegalStateException) {
+                        log.warn("[{}] Failed to grant permissions for 
namespace {}: {}",
+                                clientAppId(), namespaceName, 
ex.getCause().getMessage(), ex);
+                        throw new RestException(Status.CONFLICT, "Concurrent 
modification");
+                    } else {
+                        log.error("[{}] Failed to grant permissions for 
namespace {}",
+                                clientAppId(), namespaceName, ex);
+                        throw new RestException(realCause);
+                    }
+                });
+    }
+
+    protected CompletableFuture<Void> internalRevokePermissionOnTopicsAsync(
+            List<RevokeTopicPermissionOptions> options) {
+        return checkNamespace(options.stream().map(o -> 
TopicName.get(o.getTopic()).getNamespace()))
+                .thenCompose(theSameNamespace -> {
+                    if (!theSameNamespace) {
+                        throw new RestException(Status.BAD_REQUEST, "The 
namespace should be the same");
+                    }
+
+                    return 
validateAdminAccessForTenantAsync(TopicName.get(options.get(0).getTopic()).getTenant());
+                }).thenCompose(__ -> 
internalCheckTopicExists(options.stream().map(o -> 
TopicName.get(o.getTopic()))))
+                .thenCompose(__ -> 
getAuthorizationService().revokePermissionAsync(options))
+                .thenAccept(unused -> log.info("[{}] Successfully revoke 
access for {}", clientAppId(), options))
+                .exceptionally(ex -> {
+                    Throwable realCause = 
FutureUtil.unwrapCompletionException(ex);
+                    //The IllegalArgumentException and the 
IllegalStateException were historically thrown by the
+                    // grantPermissionAsync method, so we catch them here to 
ensure backwards compatibility.
+                    if (realCause instanceof 
MetadataStoreException.NotFoundException
+                            || realCause instanceof IllegalArgumentException) {
+                        log.warn("[{}] Failed to revoke permissions for 
namespace {}: does not exist", clientAppId(),
+                                namespaceName, ex);
+                        throw new RestException(Status.NOT_FOUND, "Topic's 
namespace does not exist");
+                    } else if (realCause instanceof 
MetadataStoreException.BadVersionException
+                            || realCause instanceof IllegalStateException) {
+                        log.warn("[{}] Failed to revoke permissions for 
namespace {}: {}",
+                                clientAppId(), namespaceName, 
ex.getCause().getMessage(), ex);
+                        throw new RestException(Status.CONFLICT, "Concurrent 
modification");
+                    } else {
+                        log.error("[{}] Failed to revoke permissions for 
namespace {}",
+                                clientAppId(), namespaceName, ex);
+                        throw new RestException(realCause);
+                    }
+                });
+    }
+
+    private CompletableFuture<Boolean> checkNamespace(Stream<String> 
namespaces) {

Review Comment:
   I have checked it here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to