liudezhi2098 commented on code in PR #14149:
URL: https://github.com/apache/pulsar/pull/14149#discussion_r851214292


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -320,49 +321,61 @@ protected void internalDeleteTopicForcefully(boolean 
authoritative, boolean dele
         }
     }
 
-    private void revokePermissions(String topicUri, String role) {
-        Policies policies;
-        try {
-            policies = namespaceResources().getPolicies(namespaceName)
-                    .orElseThrow(() -> new RestException(Status.NOT_FOUND, 
"Namespace does not exist"));
-        } catch (Exception e) {
-            log.error("[{}] Failed to revoke permissions for topic {}", 
clientAppId(), topicUri, e);
-            throw new RestException(e);
-        }
-        if 
(!policies.auth_policies.getTopicAuthentication().containsKey(topicUri)
-                || 
!policies.auth_policies.getTopicAuthentication().get(topicUri).containsKey(role))
 {
-            log.warn("[{}] Failed to revoke permission from role {} on topic: 
Not set at topic level {}", clientAppId(),
-                    role, topicUri);
-            throw new RestException(Status.PRECONDITION_FAILED, "Permissions 
are not set at the topic level");
-        }
-        try {
-            // Write the new policies to metadata store
-            namespaceResources().setPolicies(namespaceName, p -> {
-                
p.auth_policies.getTopicAuthentication().get(topicUri).remove(role);
-                return p;
-            });
-            log.info("[{}] Successfully revoke access for role {} - topic {}", 
clientAppId(), role, topicUri);
-        } catch (Exception e) {
-            log.error("[{}] Failed to revoke permissions for topic {}", 
clientAppId(), topicUri, e);
-            throw new RestException(e);
-        }
-
+    /**
+     * Asynchronously revokes the topic-level permissions granted to {@code role} on {@code topicUri}.
+     *
+     * <p>The returned future fails with a {@code NOT_FOUND} RestException when the namespace policies
+     * are absent, and with {@code PRECONDITION_FAILED} when no permissions are set for the role at the
+     * topic level. Otherwise the role is removed from the topic's entry and the policies are written back.
+     *
+     * @param topicUri topic whose topic-level authentication entry is modified
+     * @param role role whose permissions are revoked
+     * @return a future that completes after the policy update has been applied and logged
+     */
+    private CompletableFuture<Void> revokePermissions(String topicUri, String role) {
+        return namespaceResources().getPoliciesAsync(namespaceName).thenCompose(
+                policiesOptional -> {
+                    Policies policies = policiesOptional.orElseThrow(() ->
+                            new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+                    // Nothing to revoke unless the role has an explicit topic-level grant.
+                    if (!policies.auth_policies.getTopicAuthentication().containsKey(topicUri)
+                            || !policies.auth_policies.getTopicAuthentication().get(topicUri).containsKey(role)) {
+                        log.warn("[{}] Failed to revoke permission from role {} on topic: Not set at topic level {}",
+                                clientAppId(), role, topicUri);
+                        return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
+                                "Permissions are not set at the topic level"));
+                    }
+                    // Write the new policies to metadata store
+                    return namespaceResources().setPoliciesAsync(namespaceName, p -> {
+                        p.auth_policies.getTopicAuthentication().get(topicUri).remove(role);
+                        return p;
+                    }).thenAccept(__ ->
+                            log.info("[{}] Successfully revoke access for role {} - topic {}", clientAppId(), role,
+                            topicUri)
+                    );
+                }
+        );
     }
 
-    protected void internalRevokePermissionsOnTopic(String role) {
+    protected void internalRevokePermissionsOnTopic(AsyncResponse 
asyncResponse, String role) {
         // This operation should be reading from zookeeper and it should be 
allowed without having admin privileges
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-
-        PartitionedTopicMetadata meta = getPartitionedTopicMetadata(topicName, 
true, false);
-        int numPartitions = meta.partitions;
-        if (numPartitions > 0) {
-            for (int i = 0; i < numPartitions; i++) {
-                TopicName topicNamePartition = topicName.getPartition(i);
-                revokePermissions(topicNamePartition.toString(), role);
-            }
-        }
-        revokePermissions(topicName.toString(), role);
+        validateAdminAccessForTenantAsync(namespaceName.getTenant())
+                .thenCompose(__ -> 
validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 ->
+            getPartitionedTopicMetadataAsync(topicName, true, false)
+                .thenCompose(metadata -> {
+                    int numPartitions = metadata.partitions;
+                    CompletableFuture<Void> future = 
CompletableFuture.completedFuture(null);
+                    if (numPartitions > 0) {
+                        for (int i = 0; i < numPartitions; i++) {

Review Comment:
   Doing it this way will cause a KeeperException; the per-partition revocations need to run serially.
   
   ```
   asyncResponse.resume(
       org.apache.pulsar.broker.web.RestException: HTTP 500 
org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = 
BadVersion for /admin/policies/my-tenant/my-namespace
   );
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to