This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d7ddda81143 [Broker]make revokePermissionsOnTopic method async (#14149)
d7ddda81143 is described below

commit d7ddda811437096b857bffff7d080a1c555f54d8
Author: Dezhi LIiu <[email protected]>
AuthorDate: Wed Apr 20 11:24:14 2022 +0800

    [Broker]make revokePermissionsOnTopic method async (#14149)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  89 ++++++------
 .../pulsar/broker/admin/v1/PersistentTopics.java   |  17 ++-
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  11 +-
 .../pulsar/broker/web/PulsarWebResource.java       | 154 +++++++++++++--------
 .../org/apache/pulsar/broker/admin/AdminTest.java  |   8 +-
 .../pulsar/broker/admin/PersistentTopicsTest.java  |  13 +-
 6 files changed, 185 insertions(+), 107 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 3c3f622266c..eb8d9bcc513 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.admin.impl;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.pulsar.broker.PulsarService.isTransactionInternalName;
 import static org.apache.pulsar.broker.resources.PulsarResources.DEFAULT_OPERATION_TIMEOUT_SEC;
 import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsTransactionCoordinatorAssign;
@@ -320,49 +321,54 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
-    private void revokePermissions(String topicUri, String role) {
-        Policies policies;
-        try {
-            policies = namespaceResources().getPolicies(namespaceName)
-                    .orElseThrow(() -> new RestException(Status.NOT_FOUND, 
"Namespace does not exist"));
-        } catch (Exception e) {
-            log.error("[{}] Failed to revoke permissions for topic {}", 
clientAppId(), topicUri, e);
-            throw new RestException(e);
-        }
-        if 
(!policies.auth_policies.getTopicAuthentication().containsKey(topicUri)
-                || 
!policies.auth_policies.getTopicAuthentication().get(topicUri).containsKey(role))
 {
-            log.warn("[{}] Failed to revoke permission from role {} on topic: 
Not set at topic level {}", clientAppId(),
-                    role, topicUri);
-            throw new RestException(Status.PRECONDITION_FAILED, "Permissions 
are not set at the topic level");
-        }
-        try {
-            // Write the new policies to metadata store
-            namespaceResources().setPolicies(namespaceName, p -> {
-                
p.auth_policies.getTopicAuthentication().get(topicUri).remove(role);
-                return p;
-            });
-            log.info("[{}] Successfully revoke access for role {} - topic {}", 
clientAppId(), role, topicUri);
-        } catch (Exception e) {
-            log.error("[{}] Failed to revoke permissions for topic {}", 
clientAppId(), topicUri, e);
-            throw new RestException(e);
-        }
-
+    private CompletableFuture<Void> revokePermissionsAsync(String topicUri, 
String role) {
+        return 
namespaceResources().getPoliciesAsync(namespaceName).thenCompose(
+                policiesOptional -> {
+                    Policies policies = policiesOptional.orElseThrow(() ->
+                            new RestException(Status.NOT_FOUND, "Namespace 
does not exist"));
+                    if 
(!policies.auth_policies.getTopicAuthentication().containsKey(topicUri)
+                            || 
!policies.auth_policies.getTopicAuthentication().get(topicUri).containsKey(role))
 {
+                        log.warn("[{}] Failed to revoke permission from role 
{} on topic: Not set at topic level {}",
+                                clientAppId(), role, topicUri);
+                        return FutureUtil.failedFuture(new 
RestException(Status.PRECONDITION_FAILED,
+                                "Permissions are not set at the topic level"));
+                    }
+                    // Write the new policies to metadata store
+                    return 
namespaceResources().setPoliciesAsync(namespaceName, p -> {
+                        
p.auth_policies.getTopicAuthentication().get(topicUri).remove(role);
+                        return p;
+                    }).thenAccept(__ ->
+                            log.info("[{}] Successfully revoke access for role 
{} - topic {}", clientAppId(), role,
+                            topicUri)
+                    );
+                }
+        );
     }
 
-    protected void internalRevokePermissionsOnTopic(String role) {
+    protected void internalRevokePermissionsOnTopic(AsyncResponse 
asyncResponse, String role) {
         // This operation should be reading from zookeeper and it should be 
allowed without having admin privileges
-        validateAdminAccessForTenant(namespaceName.getTenant());
-        validatePoliciesReadOnlyAccess();
-
-        PartitionedTopicMetadata meta = getPartitionedTopicMetadata(topicName, 
true, false);
-        int numPartitions = meta.partitions;
-        if (numPartitions > 0) {
-            for (int i = 0; i < numPartitions; i++) {
-                TopicName topicNamePartition = topicName.getPartition(i);
-                revokePermissions(topicNamePartition.toString(), role);
-            }
-        }
-        revokePermissions(topicName.toString(), role);
+        validateAdminAccessForTenantAsync(namespaceName.getTenant())
+                .thenCompose(__ -> 
validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 ->
+            getPartitionedTopicMetadataAsync(topicName, true, false)
+                .thenCompose(metadata -> {
+                    int numPartitions = metadata.partitions;
+                    CompletableFuture<Void> future = 
CompletableFuture.completedFuture(null);
+                    if (numPartitions > 0) {
+                        for (int i = 0; i < numPartitions; i++) {
+                            TopicName topicNamePartition = 
topicName.getPartition(i);
+                            future = future.thenComposeAsync(unused ->
+                                    
revokePermissionsAsync(topicNamePartition.toString(), role));
+                        }
+                    }
+                    return future.thenComposeAsync(unused -> 
revokePermissionsAsync(topicName.toString(), role))
+                            .thenAccept(unused -> 
asyncResponse.resume(Response.noContent().build()));
+                }))
+            ).exceptionally(ex -> {
+                    Throwable realCause = 
FutureUtil.unwrapCompletionException(ex);
+                    log.error("[{}] Failed to revoke permissions for topic 
{}", clientAppId(), topicName, realCause);
+                    resumeAsyncResponseExceptionally(asyncResponse, realCause);
+                    return null;
+                });
     }
 
     protected void internalCreateNonPartitionedTopic(boolean authoritative, 
Map<String, String> properties) {
@@ -3866,7 +3872,8 @@ public class PersistentTopicsBase extends AdminResource {
             } catch (RestException e) {
                 try {
                     validateAdminAccessForTenant(pulsar,
-                            clientAppId, originalPrincipal, 
topicName.getTenant(), authenticationData);
+                            clientAppId, originalPrincipal, 
topicName.getTenant(), authenticationData,
+                            
pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
                 } catch (RestException authException) {
                     log.warn("Failed to authorize {} on topic {}", 
clientAppId, topicName);
                     throw new 
PulsarClientException(String.format("Authorization failed %s on topic %s with 
error %s",
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index fcb71566f6b..8353756dce0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -141,11 +141,18 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace doesn't exist"),
             @ApiResponse(code = 412, message = "Permissions are not set at the 
topic level")})
-    public void revokePermissionsOnTopic(@PathParam("property") String 
property,
-            @PathParam("cluster") String cluster, @PathParam("namespace") 
String namespace,
-            @PathParam("topic") @Encoded String encodedTopic, 
@PathParam("role") String role) {
-        validateTopicName(property, cluster, namespace, encodedTopic);
-        internalRevokePermissionsOnTopic(role);
+    public void revokePermissionsOnTopic(@Suspended final AsyncResponse 
asyncResponse,
+            @PathParam("property") String property, @PathParam("cluster") 
String cluster,
+            @PathParam("namespace") String namespace, @PathParam("topic") 
@Encoded String encodedTopic,
+            @PathParam("role") String role) {
+        try {
+            validateTopicName(property, cluster, namespace, encodedTopic);
+            internalRevokePermissionsOnTopic(asyncResponse, role);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
     }
 
     @PUT
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 30836b9b3d5..f640b1ca6de 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -192,6 +192,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 412, message = "Permissions are not set at the 
topic level"),
             @ApiResponse(code = 500, message = "Internal server error")})
     public void revokePermissionsOnTopic(
+            @Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "Specify the tenant", required = true)
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
@@ -200,8 +201,14 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "Client role to which grant permissions", 
required = true)
             @PathParam("role") String role) {
-        validateTopicName(tenant, namespace, encodedTopic);
-        internalRevokePermissionsOnTopic(role);
+        try {
+            validateTopicName(tenant, namespace, encodedTopic);
+            internalRevokePermissionsOnTopic(asyncResponse, role);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
     }
 
     @PUT
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index abfd4b9cf97..baa5d3fe332 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -34,6 +34,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
 import javax.servlet.ServletContext;
@@ -247,7 +248,8 @@ public abstract class PulsarWebResource {
      */
     protected void validateAdminAccessForTenant(String tenant) {
         try {
-            validateAdminAccessForTenant(pulsar(), clientAppId(), 
originalPrincipal(), tenant, clientAuthData());
+            validateAdminAccessForTenant(pulsar(), clientAppId(), 
originalPrincipal(), tenant, clientAuthData(),
+                    config().getMetadataStoreOperationTimeoutSeconds(), 
SECONDS);
         } catch (RestException e) {
             throw e;
         } catch (Exception e) {
@@ -257,65 +259,109 @@ public abstract class PulsarWebResource {
     }
 
     protected static void validateAdminAccessForTenant(PulsarService pulsar, 
String clientAppId,
-                                                       String 
originalPrincipal, String tenant,
-                                                       
AuthenticationDataSource authenticationData)
-            throws Exception {
+                                                String originalPrincipal, 
String tenant,
+                                                AuthenticationDataSource 
authenticationData,
+                                                long timeout, TimeUnit unit) {
+        try {
+            validateAdminAccessForTenantAsync(pulsar, clientAppId, 
originalPrincipal, tenant, authenticationData)
+                    .get(timeout, unit);
+        } catch (InterruptedException | ExecutionException | TimeoutException 
e) {
+            Throwable realCause = FutureUtil.unwrapCompletionException(e);
+            if (realCause instanceof WebApplicationException) {
+                throw (WebApplicationException) realCause;
+            } else {
+                throw new RestException(realCause);
+            }
+        }
+    }
+
+    /**
+     * Checks that the http client role has admin access to the specified 
tenant async.
+     *
+     * @param tenant the tenant id
+     */
+    protected CompletableFuture<Void> validateAdminAccessForTenantAsync(String 
tenant) {
+        return validateAdminAccessForTenantAsync(pulsar(), clientAppId(), 
originalPrincipal(), tenant,
+                clientAuthData());
+    }
+
+    protected static CompletableFuture<Void> validateAdminAccessForTenantAsync(
+            PulsarService pulsar, String clientAppId,
+            String originalPrincipal, String tenant,
+            AuthenticationDataSource authenticationData) {
         if (log.isDebugEnabled()) {
             log.debug("check admin access on tenant: {} - Authenticated: {} -- 
role: {}", tenant,
                     (isClientAuthenticated(clientAppId)), clientAppId);
         }
-
-        TenantInfo tenantInfo = 
pulsar.getPulsarResources().getTenantResources().getTenant(tenant)
-                .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Tenant 
does not exist"));
-
-        if (pulsar.getConfiguration().isAuthenticationEnabled() && 
pulsar.getConfiguration().isAuthorizationEnabled()) {
-            if (!isClientAuthenticated(clientAppId)) {
-                throw new RestException(Status.FORBIDDEN, "Need to 
authenticate to perform the request");
-            }
-
-            
validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(), 
clientAppId, originalPrincipal);
-
-            if 
(pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
-                CompletableFuture<Boolean> isProxySuperUserFuture;
-                CompletableFuture<Boolean> isOriginalPrincipalSuperUserFuture;
-                try {
-                    AuthorizationService authorizationService = 
pulsar.getBrokerService().getAuthorizationService();
-                    isProxySuperUserFuture = 
authorizationService.isSuperUser(clientAppId, authenticationData);
-
-                    isOriginalPrincipalSuperUserFuture =
-                            
authorizationService.isSuperUser(originalPrincipal, authenticationData);
-
-                    boolean proxyAuthorized = isProxySuperUserFuture.get()
-                            || authorizationService.isTenantAdmin(tenant, 
clientAppId,
-                            tenantInfo, authenticationData).get();
-                    boolean originalPrincipalAuthorized =
-                            isOriginalPrincipalSuperUserFuture.get() || 
authorizationService.isTenantAdmin(tenant,
-                                    originalPrincipal, tenantInfo, 
authenticationData).get();
-                    if (!proxyAuthorized || !originalPrincipalAuthorized) {
-                        throw new RestException(Status.UNAUTHORIZED,
-                                String.format("Proxy not authorized to access 
resource (proxy:%s,original:%s)",
-                                        clientAppId, originalPrincipal));
+        return 
pulsar.getPulsarResources().getTenantResources().getTenantAsync(tenant)
+                .thenCompose(tenantInfoOptional -> {
+                    if (!tenantInfoOptional.isPresent()) {
+                        throw new RestException(Status.NOT_FOUND, "Tenant does 
not exist");
                     }
-                } catch (InterruptedException | ExecutionException e) {
-                    throw new RestException(Status.INTERNAL_SERVER_ERROR, 
e.getMessage());
-                }
-                log.debug("Successfully authorized {} (proxied by {}) on 
tenant {}",
-                          originalPrincipal, clientAppId, tenant);
-            } else {
-                if (!pulsar.getBrokerService()
-                        .getAuthorizationService()
-                        .isSuperUser(clientAppId, authenticationData)
-                        .join()) {
-                    if (!pulsar.getBrokerService().getAuthorizationService()
-                            .isTenantAdmin(tenant, clientAppId, tenantInfo, 
authenticationData).get()) {
-                        throw new RestException(Status.UNAUTHORIZED,
-                                "Don't have permission to administrate 
resources on this tenant");
+                    TenantInfo tenantInfo = tenantInfoOptional.get();
+                    if (pulsar.getConfiguration().isAuthenticationEnabled() && 
pulsar.getConfiguration()
+                            .isAuthorizationEnabled()) {
+                        if (!isClientAuthenticated(clientAppId)) {
+                            throw new RestException(Status.FORBIDDEN, "Need to 
authenticate to perform the request");
+                        }
+                        
validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(), 
clientAppId,
+                                originalPrincipal);
+                        if 
(pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
+                            AuthorizationService authorizationService =
+                                    
pulsar.getBrokerService().getAuthorizationService();
+                            return authorizationService.isTenantAdmin(tenant, 
clientAppId, tenantInfo,
+                                            authenticationData)
+                                .thenCompose(isTenantAdmin -> {
+                                    String debugMsg = "Successfully authorized 
{} (proxied by {}) on tenant {}";
+                                    if (!isTenantAdmin) {
+                                            return 
authorizationService.isSuperUser(clientAppId, authenticationData)
+                                                
.thenCombine(authorizationService.isSuperUser(originalPrincipal,
+                                                             
authenticationData),
+                                                     (proxyAuthorized, 
originalPrincipalAuthorized) -> {
+                                                         if (!proxyAuthorized 
|| !originalPrincipalAuthorized) {
+                                                             throw new 
RestException(Status.UNAUTHORIZED,
+                                                                     
String.format("Proxy not authorized to access "
+                                                                               
      + "resource (proxy:%s,original:%s)"
+                                                                             , 
clientAppId, originalPrincipal));
+                                                         } else {
+                                                             if 
(log.isDebugEnabled()) {
+                                                                 
log.debug(debugMsg, originalPrincipal, clientAppId,
+                                                                         
tenant);
+                                                             }
+                                                             return null;
+                                                         }
+                                                     });
+                                    } else {
+                                        if (log.isDebugEnabled()) {
+                                            log.debug(debugMsg, 
originalPrincipal, clientAppId, tenant);
+                                        }
+                                        return 
CompletableFuture.completedFuture(null);
+                                    }
+                                });
+                        } else {
+                            return pulsar.getBrokerService()
+                                    .getAuthorizationService()
+                                    .isSuperUser(clientAppId, 
authenticationData)
+                                    .thenCompose(isSuperUser -> {
+                                        if (!isSuperUser) {
+                                            return 
pulsar.getBrokerService().getAuthorizationService()
+                                                    .isTenantAdmin(tenant, 
clientAppId, tenantInfo, authenticationData);
+                                        } else {
+                                            return 
CompletableFuture.completedFuture(true);
+                                        }
+                                    }).thenAccept(authorized -> {
+                                        if (!authorized) {
+                                            throw new 
RestException(Status.UNAUTHORIZED,
+                                                    "Don't have permission to 
administrate resources on this tenant");
+                                        } else {
+                                            log.debug("Successfully authorized 
{} on tenant {}", clientAppId, tenant);
+                                        }
+                                    });
+                        }
+                    } else {
+                        return CompletableFuture.completedFuture(null);
                     }
-                }
-
-                log.debug("Successfully authorized {} on tenant {}", 
clientAppId, tenant);
-            }
-        }
+                });
     }
 
     /**
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index f506feedd33..91ec9adecc7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -786,14 +786,16 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
                 namespace, topic);
         assertEquals(permission.get(role), actions);
         // remove permission
-        persistentTopics.revokePermissionsOnTopic(property, cluster, 
namespace, topic, role);
-
+        response = mock(AsyncResponse.class);
+        persistentTopics.revokePermissionsOnTopic(response, property, cluster, 
namespace, topic, role);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
         // verify removed permission
         Awaitility.await().untilAsserted(() -> {
             Map<String, Set<AuthAction>> p = 
persistentTopics.getPermissionsOnTopic(property, cluster, namespace, topic);
             assertTrue(p.isEmpty());
         });
-
     }
 
     @Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index d6dd9d91050..22014fc37e4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -681,7 +681,11 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         Set<AuthAction> expectActions = new HashSet<>();
         expectActions.add(AuthAction.produce);
         persistentTopics.grantPermissionsOnTopic(testTenant, testNamespace, 
topicName, role, expectActions);
-        persistentTopics.revokePermissionsOnTopic(testTenant, testNamespace, 
topicName, role);
+        AsyncResponse response = mock(AsyncResponse.class);
+        ArgumentCaptor<Response> responseCaptor = 
ArgumentCaptor.forClass(Response.class);
+        persistentTopics.revokePermissionsOnTopic(response, testTenant, 
testNamespace, topicName, role);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
         Map<String, Set<AuthAction>> permissions = 
persistentTopics.getPermissionsOnTopic(testTenant, testNamespace, topicName);
         Assert.assertEquals(permissions.get(role), null);
     }
@@ -700,7 +704,12 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         Set<AuthAction> expectActions = new HashSet<>();
         expectActions.add(AuthAction.produce);
         persistentTopics.grantPermissionsOnTopic(testTenant, testNamespace, 
partitionedTopicName, role, expectActions);
-        persistentTopics.revokePermissionsOnTopic(testTenant, testNamespace, 
partitionedTopicName, role);
+        response = mock(AsyncResponse.class);
+        persistentTopics.revokePermissionsOnTopic(response, testTenant, 
testNamespace, partitionedTopicName, role);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
+
         Map<String, Set<AuthAction>> permissions = 
persistentTopics.getPermissionsOnTopic(testTenant, testNamespace,
                 partitionedTopicName);
         Assert.assertEquals(permissions.get(role), null);

Reply via email to