This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1344b33e411 [Broker] make grantPermissionsOnTopic method async (#14152)
1344b33e411 is described below

commit 1344b33e411e234b55595307dc6c1e3be14b8a55
Author: Dezhi LIiu <[email protected]>
AuthorDate: Sun Apr 24 08:39:09 2022 +0800

    [Broker] make grantPermissionsOnTopic method async (#14152)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 95 ++++++++++++----------
 .../pulsar/broker/admin/v1/PersistentTopics.java   | 18 ++--
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 11 ++-
 .../org/apache/pulsar/broker/admin/AdminTest.java  |  6 +-
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 26 +++++-
 5 files changed, 101 insertions(+), 55 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index addf02f8005..909c7de8d74 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -256,53 +256,62 @@ public class PersistentTopicsBase extends AdminResource {
         validateTopicOwnership(topicName, authoritative);
     }
 
-    private void grantPermissions(TopicName topicUri, String role, 
Set<AuthAction> actions) {
-        try {
-            AuthorizationService authService = 
pulsar().getBrokerService().getAuthorizationService();
-            if (null != authService) {
-                authService.grantPermissionAsync(topicUri, actions, role, 
null/*additional auth-data json*/).get();
-            } else {
-                throw new RestException(Status.NOT_IMPLEMENTED, "Authorization 
is not enabled");
-            }
-            log.info("[{}] Successfully granted access for role {}: {} - topic 
{}", clientAppId(), role, actions,
-                    topicUri);
-        } catch (InterruptedException e) {
-            log.error("[{}] Failed to get permissions for topic {}", 
clientAppId(), topicUri, e);
-            throw new RestException(e);
-        } catch (ExecutionException e) {
-            // The IllegalArgumentException and the IllegalStateException were 
historically thrown by the
-            // grantPermissionAsync method, so we catch them here to ensure 
backwards compatibility.
-            if (e.getCause() instanceof 
MetadataStoreException.NotFoundException
-                    || e.getCause() instanceof IllegalArgumentException) {
-                log.warn("[{}] Failed to set permissions for topic {}: 
Namespace does not exist", clientAppId(),
-                        topicUri, e);
-                throw new RestException(Status.NOT_FOUND, "Topic's namespace 
does not exist");
-            } else if (e.getCause() instanceof 
MetadataStoreException.BadVersionException
-                    || e.getCause() instanceof IllegalStateException) {
-                log.warn("[{}] Failed to set permissions for topic {}: {}",
-                        clientAppId(), topicUri, e.getCause().getMessage(), e);
-                throw new RestException(Status.CONFLICT, "Concurrent 
modification");
-            } else {
-                log.error("[{}] Failed to get permissions for topic {}", 
clientAppId(), topicUri, e);
-                throw new RestException(e);
-            }
+    private CompletableFuture<Void> grantPermissionsAsync(TopicName topicUri, 
String role, Set<AuthAction> actions) {
+        AuthorizationService authService = 
pulsar().getBrokerService().getAuthorizationService();
+        if (null != authService) {
+            return authService.grantPermissionAsync(topicUri, actions, role, 
null/*additional auth-data json*/)
+                    .thenAccept(__ -> log.info("[{}] Successfully granted 
access for role {}: {} - topic {}",
+                            clientAppId(), role, actions, topicUri))
+                    .exceptionally(ex -> {
+                        Throwable realCause = 
FutureUtil.unwrapCompletionException(ex);
+                        //The IllegalArgumentException and the 
IllegalStateException were historically thrown by the
+                        // grantPermissionAsync method, so we catch them here 
to ensure backwards compatibility.
+                        if (realCause instanceof 
MetadataStoreException.NotFoundException
+                                || realCause instanceof 
IllegalArgumentException) {
+                            log.warn("[{}] Failed to set permissions for topic 
{}: Namespace does not exist",
+                                    clientAppId(), topicUri, realCause);
+                            throw new RestException(Status.NOT_FOUND, "Topic's 
namespace does not exist");
+                        } else if (realCause instanceof 
MetadataStoreException.BadVersionException
+                                || realCause instanceof IllegalStateException) 
{
+                            log.warn("[{}] Failed to set permissions for topic 
{}: {}", clientAppId(), topicUri,
+                                    realCause.getMessage(), realCause);
+                            throw new RestException(Status.CONFLICT, 
"Concurrent modification");
+                        } else {
+                            log.error("[{}] Failed to get permissions for 
topic {}", clientAppId(), topicUri,
+                                    realCause);
+                            throw new RestException(realCause);
+                        }
+                    });
+        } else {
+            String msg = "Authorization is not enabled";
+            return FutureUtil.failedFuture(new 
RestException(Status.NOT_IMPLEMENTED, msg));
         }
     }
 
-    protected void internalGrantPermissionsOnTopic(String role, 
Set<AuthAction> actions) {
+    protected void internalGrantPermissionsOnTopic(final AsyncResponse 
asyncResponse, String role,
+                                                   Set<AuthAction> actions) {
         // This operation should be reading from zookeeper and it should be 
allowed without having admin privileges
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-
-        PartitionedTopicMetadata meta = getPartitionedTopicMetadata(topicName, 
true, false);
-        int numPartitions = meta.partitions;
-        if (numPartitions > 0) {
-            for (int i = 0; i < numPartitions; i++) {
-                TopicName topicNamePartition = topicName.getPartition(i);
-                grantPermissions(topicNamePartition, role, actions);
-            }
-        }
-        grantPermissions(topicName, role, actions);
+        validateAdminAccessForTenantAsync(namespaceName.getTenant())
+                .thenCompose(__ -> 
validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 ->
+             getPartitionedTopicMetadataAsync(topicName, true, false)
+                  .thenCompose(metadata -> {
+                      int numPartitions = metadata.partitions;
+                      CompletableFuture<Void> future = 
CompletableFuture.completedFuture(null);
+                      if (numPartitions > 0) {
+                          for (int i = 0; i < numPartitions; i++) {
+                              TopicName topicNamePartition = 
topicName.getPartition(i);
+                              future = future.thenCompose(unused -> 
grantPermissionsAsync(topicNamePartition, role,
+                                      actions));
+                          }
+                      }
+                      return future.thenCompose(unused -> 
grantPermissionsAsync(topicName, role, actions))
+                              .thenAccept(unused -> 
asyncResponse.resume(Response.noContent().build()));
+                  }))).exceptionally(ex -> {
+                    Throwable realCause = 
FutureUtil.unwrapCompletionException(ex);
+                    log.error("[{}] Failed to get permissions for topic {}", 
clientAppId(), topicName, realCause);
+                    resumeAsyncResponseExceptionally(asyncResponse, realCause);
+                    return null;
+                });
     }
 
     protected void internalDeleteTopicForcefully(boolean authoritative, 
boolean deleteSchema) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 8353756dce0..c24e0c34b40 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -122,12 +122,20 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace doesn't exist"),
             @ApiResponse(code = 409, message = "Concurrent modification") })
-    public void grantPermissionsOnTopic(@PathParam("property") String property,
-            @PathParam("cluster") String cluster, @PathParam("namespace") 
String namespace,
-            @PathParam("topic") @Encoded String encodedTopic, 
@PathParam("role") String role,
+    public void grantPermissionsOnTopic(@Suspended final AsyncResponse 
asyncResponse,
+            @PathParam("property") String property, @PathParam("cluster") 
String cluster,
+            @PathParam("namespace") String namespace, @PathParam("topic") 
@Encoded String encodedTopic,
+            @PathParam("role") String role,
             Set<AuthAction> actions) {
-        validateTopicName(property, cluster, namespace, encodedTopic);
-        internalGrantPermissionsOnTopic(role, actions);
+
+        try {
+            validateTopicName(property, cluster, namespace, encodedTopic);
+            internalGrantPermissionsOnTopic(asyncResponse, role, actions);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
     }
 
     @DELETE
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index f640b1ca6de..3f12df47f4e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -163,6 +163,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 412, message = "Topic name is not valid"),
             @ApiResponse(code = 500, message = "Internal server error") })
     public void grantPermissionsOnTopic(
+            @Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "Specify the tenant", required = true)
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
@@ -174,8 +175,14 @@ public class PersistentTopics extends PersistentTopicsBase 
{
             @ApiParam(value = "Actions to be granted 
(produce,functions,consume)",
                     allowableValues = "produce,functions,consume")
                     Set<AuthAction> actions) {
-        validateTopicName(tenant, namespace, encodedTopic);
-        internalGrantPermissionsOnTopic(role, actions);
+        try {
+            validateTopicName(tenant, namespace, encodedTopic);
+            internalGrantPermissionsOnTopic(asyncResponse, role, actions);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
     }
 
     @DELETE
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 91ec9adecc7..11bed0583b7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -780,7 +780,11 @@ public class AdminTest extends MockedPulsarServiceBaseTest 
{
         // grant permission
         final Set<AuthAction> actions = Sets.newHashSet(AuthAction.produce);
         final String role = "test-role";
-        persistentTopics.grantPermissionsOnTopic(property, cluster, namespace, 
topic, role, actions);
+        response = mock(AsyncResponse.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.grantPermissionsOnTopic(response, property, cluster, 
namespace, topic, role, actions);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
         // verify permission
         Map<String, Set<AuthAction>> permission = 
persistentTopics.getPermissionsOnTopic(property, cluster,
                 namespace, topic);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 22014fc37e4..6c72e19437f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -623,7 +623,11 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
         String role = "role";
         Set<AuthAction> expectActions = new HashSet<>();
         expectActions.add(AuthAction.produce);
-        persistentTopics.grantPermissionsOnTopic(testTenant, testNamespace, 
topicName, role, expectActions);
+        AsyncResponse response = mock(AsyncResponse.class);
+        ArgumentCaptor<Response> responseCaptor = 
ArgumentCaptor.forClass(Response.class);
+        persistentTopics.grantPermissionsOnTopic(response, testTenant, 
testNamespace, topicName, role, expectActions);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
         Map<String, Set<AuthAction>> permissions = 
persistentTopics.getPermissionsOnTopic(testTenant, testNamespace, topicName);
         Assert.assertEquals(permissions.get(role), expectActions);
     }
@@ -659,7 +663,12 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
         String role = "role";
         Set<AuthAction> expectActions = new HashSet<>();
         expectActions.add(AuthAction.produce);
-        persistentTopics.grantPermissionsOnTopic(testTenant, testNamespace, 
partitionedTopicName, role, expectActions);
+        response = mock(AsyncResponse.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.grantPermissionsOnTopic(response, testTenant, 
testNamespace, partitionedTopicName, role,
+                expectActions);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
         Map<String, Set<AuthAction>> permissions = 
persistentTopics.getPermissionsOnTopic(testTenant, testNamespace,
                 partitionedTopicName);
         Assert.assertEquals(permissions.get(role), expectActions);
@@ -680,9 +689,13 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
         String role = "role";
         Set<AuthAction> expectActions = new HashSet<>();
         expectActions.add(AuthAction.produce);
-        persistentTopics.grantPermissionsOnTopic(testTenant, testNamespace, 
topicName, role, expectActions);
         AsyncResponse response = mock(AsyncResponse.class);
         ArgumentCaptor<Response> responseCaptor = 
ArgumentCaptor.forClass(Response.class);
+        persistentTopics.grantPermissionsOnTopic(response, testTenant, 
testNamespace, topicName, role, expectActions);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
+        response = mock(AsyncResponse.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
         persistentTopics.revokePermissionsOnTopic(response, testTenant, 
testNamespace, topicName, role);
         verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
@@ -703,7 +716,12 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
         String role = "role";
         Set<AuthAction> expectActions = new HashSet<>();
         expectActions.add(AuthAction.produce);
-        persistentTopics.grantPermissionsOnTopic(testTenant, testNamespace, 
partitionedTopicName, role, expectActions);
+        response = mock(AsyncResponse.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.grantPermissionsOnTopic(response, testTenant, 
testNamespace, partitionedTopicName, role,
+                expectActions);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
         response = mock(AsyncResponse.class);
         persistentTopics.revokePermissionsOnTopic(response, testTenant, 
testNamespace, partitionedTopicName, role);
         responseCaptor = ArgumentCaptor.forClass(Response.class);

Reply via email to