Technoboy- commented on code in PR #14152:
URL: https://github.com/apache/pulsar/pull/14152#discussion_r853799748


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -256,53 +256,63 @@ public void validateAdminOperationOnTopic(boolean 
authoritative) {
         validateTopicOwnership(topicName, authoritative);
     }
 
-    private void grantPermissions(TopicName topicUri, String role, 
Set<AuthAction> actions) {
-        try {
-            AuthorizationService authService = 
pulsar().getBrokerService().getAuthorizationService();
-            if (null != authService) {
-                authService.grantPermissionAsync(topicUri, actions, role, 
null/*additional auth-data json*/).get();
-            } else {
-                throw new RestException(Status.NOT_IMPLEMENTED, "Authorization 
is not enabled");
-            }
-            log.info("[{}] Successfully granted access for role {}: {} - topic 
{}", clientAppId(), role, actions,
-                    topicUri);
-        } catch (InterruptedException e) {
-            log.error("[{}] Failed to get permissions for topic {}", 
clientAppId(), topicUri, e);
-            throw new RestException(e);
-        } catch (ExecutionException e) {
-            // The IllegalArgumentException and the IllegalStateException were 
historically thrown by the
-            // grantPermissionAsync method, so we catch them here to ensure 
backwards compatibility.
-            if (e.getCause() instanceof 
MetadataStoreException.NotFoundException
-                    || e.getCause() instanceof IllegalArgumentException) {
-                log.warn("[{}] Failed to set permissions for topic {}: 
Namespace does not exist", clientAppId(),
-                        topicUri, e);
-                throw new RestException(Status.NOT_FOUND, "Topic's namespace 
does not exist");
-            } else if (e.getCause() instanceof 
MetadataStoreException.BadVersionException
-                    || e.getCause() instanceof IllegalStateException) {
-                log.warn("[{}] Failed to set permissions for topic {}: {}",
-                        clientAppId(), topicUri, e.getCause().getMessage(), e);
-                throw new RestException(Status.CONFLICT, "Concurrent 
modification");
-            } else {
-                log.error("[{}] Failed to get permissions for topic {}", 
clientAppId(), topicUri, e);
-                throw new RestException(e);
-            }
+    private CompletableFuture<Void> grantPermissionsAsync(TopicName topicUri, 
String role, Set<AuthAction> actions) {
+        AuthorizationService authService = 
pulsar().getBrokerService().getAuthorizationService();
+        if (null != authService) {
+            return authService.grantPermissionAsync(topicUri, actions, role, 
null/*additional auth-data json*/)
+                    .thenAccept(__ -> log.info("[{}] Successfully granted 
access for role {}: {} - topic {}",
+                            clientAppId(), role, actions, topicUri))
+                    .exceptionally(ex -> {
+                        Throwable realCause = 
FutureUtil.unwrapCompletionException(ex);
+                        //The IllegalArgumentException and the 
IllegalStateException were historically thrown by the
+                        // grantPermissionAsync method, so we catch them here 
to ensure backwards compatibility.
+                        if (realCause instanceof 
MetadataStoreException.NotFoundException
+                                || realCause instanceof 
IllegalArgumentException) {
+                            log.warn("[{}] Failed to set permissions for topic 
{}: Namespace does not exist",
+                                    clientAppId(), topicUri, realCause);
+                            throw new RestException(Status.NOT_FOUND, "Topic's 
namespace does not exist");
+                        } else if (realCause instanceof 
MetadataStoreException.BadVersionException
+                                || realCause instanceof IllegalStateException) 
{
+                            log.warn("[{}] Failed to set permissions for topic 
{}: {}", clientAppId(), topicUri,
+                                    realCause.getMessage(), realCause);
+                            throw new RestException(Status.CONFLICT, 
"Concurrent modification");
+                        } else {
+                            log.error("[{}] Failed to get permissions for 
topic {}", clientAppId(), topicUri,
+                                    realCause);
+                            throw new RestException(realCause);
+                        }
+                    });
+        } else {
+            String msg = "Authorization is not enabled";
+            log.error("[{}] Failed to get permissions for topic {}, because 
{}", clientAppId(), topicUri, msg);
+            return FutureUtil.failedFuture(new 
RestException(Status.NOT_IMPLEMENTED, msg));
         }
     }
 
-    protected void internalGrantPermissionsOnTopic(String role, 
Set<AuthAction> actions) {
+    protected void internalGrantPermissionsOnTopic(final AsyncResponse 
asyncResponse, String role,
+                                                   Set<AuthAction> actions) {
         // This operation should be reading from zookeeper and it should be 
allowed without having admin privileges
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-
-        PartitionedTopicMetadata meta = getPartitionedTopicMetadata(topicName, 
true, false);
-        int numPartitions = meta.partitions;
-        if (numPartitions > 0) {
-            for (int i = 0; i < numPartitions; i++) {
-                TopicName topicNamePartition = topicName.getPartition(i);
-                grantPermissions(topicNamePartition, role, actions);
-            }
-        }
-        grantPermissions(topicName, role, actions);
+        validateAdminAccessForTenantAsync(namespaceName.getTenant())
+                .thenCompose(__ -> 
validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 ->
+             getPartitionedTopicMetadataAsync(topicName, true, false)
+                  .thenCompose(metadata -> {
+                      int numPartitions = metadata.partitions;
+                      CompletableFuture<Void> future = 
CompletableFuture.completedFuture(null);

Review Comment:
   Use a List to collect these futures.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to