This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d7ddda81143 [Broker]make revokePermissionsOnTopic method async (#14149)
d7ddda81143 is described below
commit d7ddda811437096b857bffff7d080a1c555f54d8
Author: Dezhi LIiu <[email protected]>
AuthorDate: Wed Apr 20 11:24:14 2022 +0800
[Broker]make revokePermissionsOnTopic method async (#14149)
---
.../broker/admin/impl/PersistentTopicsBase.java | 89 ++++++------
.../pulsar/broker/admin/v1/PersistentTopics.java | 17 ++-
.../pulsar/broker/admin/v2/PersistentTopics.java | 11 +-
.../pulsar/broker/web/PulsarWebResource.java | 154 +++++++++++++--------
.../org/apache/pulsar/broker/admin/AdminTest.java | 8 +-
.../pulsar/broker/admin/PersistentTopicsTest.java | 13 +-
6 files changed, 185 insertions(+), 107 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 3c3f622266c..eb8d9bcc513 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin.impl;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.pulsar.broker.PulsarService.isTransactionInternalName;
import static
org.apache.pulsar.broker.resources.PulsarResources.DEFAULT_OPERATION_TIMEOUT_SEC;
import static
org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsTransactionCoordinatorAssign;
@@ -320,49 +321,54 @@ public class PersistentTopicsBase extends AdminResource {
}
}
- private void revokePermissions(String topicUri, String role) {
- Policies policies;
- try {
- policies = namespaceResources().getPolicies(namespaceName)
- .orElseThrow(() -> new RestException(Status.NOT_FOUND,
"Namespace does not exist"));
- } catch (Exception e) {
- log.error("[{}] Failed to revoke permissions for topic {}",
clientAppId(), topicUri, e);
- throw new RestException(e);
- }
- if
(!policies.auth_policies.getTopicAuthentication().containsKey(topicUri)
- ||
!policies.auth_policies.getTopicAuthentication().get(topicUri).containsKey(role))
{
- log.warn("[{}] Failed to revoke permission from role {} on topic:
Not set at topic level {}", clientAppId(),
- role, topicUri);
- throw new RestException(Status.PRECONDITION_FAILED, "Permissions
are not set at the topic level");
- }
- try {
- // Write the new policies to metadata store
- namespaceResources().setPolicies(namespaceName, p -> {
-
p.auth_policies.getTopicAuthentication().get(topicUri).remove(role);
- return p;
- });
- log.info("[{}] Successfully revoke access for role {} - topic {}",
clientAppId(), role, topicUri);
- } catch (Exception e) {
- log.error("[{}] Failed to revoke permissions for topic {}",
clientAppId(), topicUri, e);
- throw new RestException(e);
- }
-
+ private CompletableFuture<Void> revokePermissionsAsync(String topicUri,
String role) {
+ return
namespaceResources().getPoliciesAsync(namespaceName).thenCompose(
+ policiesOptional -> {
+ Policies policies = policiesOptional.orElseThrow(() ->
+ new RestException(Status.NOT_FOUND, "Namespace
does not exist"));
+ if
(!policies.auth_policies.getTopicAuthentication().containsKey(topicUri)
+ ||
!policies.auth_policies.getTopicAuthentication().get(topicUri).containsKey(role))
{
+ log.warn("[{}] Failed to revoke permission from role
{} on topic: Not set at topic level {}",
+ clientAppId(), role, topicUri);
+ return FutureUtil.failedFuture(new
RestException(Status.PRECONDITION_FAILED,
+ "Permissions are not set at the topic level"));
+ }
+ // Write the new policies to metadata store
+ return
namespaceResources().setPoliciesAsync(namespaceName, p -> {
+
p.auth_policies.getTopicAuthentication().get(topicUri).remove(role);
+ return p;
+ }).thenAccept(__ ->
+ log.info("[{}] Successfully revoke access for role
{} - topic {}", clientAppId(), role,
+ topicUri)
+ );
+ }
+ );
}
- protected void internalRevokePermissionsOnTopic(String role) {
+ protected void internalRevokePermissionsOnTopic(AsyncResponse
asyncResponse, String role) {
// This operation should be reading from zookeeper and it should be
allowed without having admin privileges
- validateAdminAccessForTenant(namespaceName.getTenant());
- validatePoliciesReadOnlyAccess();
-
- PartitionedTopicMetadata meta = getPartitionedTopicMetadata(topicName,
true, false);
- int numPartitions = meta.partitions;
- if (numPartitions > 0) {
- for (int i = 0; i < numPartitions; i++) {
- TopicName topicNamePartition = topicName.getPartition(i);
- revokePermissions(topicNamePartition.toString(), role);
- }
- }
- revokePermissions(topicName.toString(), role);
+ validateAdminAccessForTenantAsync(namespaceName.getTenant())
+ .thenCompose(__ ->
validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 ->
+ getPartitionedTopicMetadataAsync(topicName, true, false)
+ .thenCompose(metadata -> {
+ int numPartitions = metadata.partitions;
+ CompletableFuture<Void> future =
CompletableFuture.completedFuture(null);
+ if (numPartitions > 0) {
+ for (int i = 0; i < numPartitions; i++) {
+ TopicName topicNamePartition =
topicName.getPartition(i);
+ future = future.thenComposeAsync(unused ->
+
revokePermissionsAsync(topicNamePartition.toString(), role));
+ }
+ }
+ return future.thenComposeAsync(unused ->
revokePermissionsAsync(topicName.toString(), role))
+ .thenAccept(unused ->
asyncResponse.resume(Response.noContent().build()));
+ }))
+ ).exceptionally(ex -> {
+ Throwable realCause =
FutureUtil.unwrapCompletionException(ex);
+ log.error("[{}] Failed to revoke permissions for topic
{}", clientAppId(), topicName, realCause);
+ resumeAsyncResponseExceptionally(asyncResponse, realCause);
+ return null;
+ });
}
protected void internalCreateNonPartitionedTopic(boolean authoritative,
Map<String, String> properties) {
@@ -3866,7 +3872,8 @@ public class PersistentTopicsBase extends AdminResource {
} catch (RestException e) {
try {
validateAdminAccessForTenant(pulsar,
- clientAppId, originalPrincipal,
topicName.getTenant(), authenticationData);
+ clientAppId, originalPrincipal,
topicName.getTenant(), authenticationData,
+
pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
} catch (RestException authException) {
log.warn("Failed to authorize {} on topic {}",
clientAppId, topicName);
throw new
PulsarClientException(String.format("Authorization failed %s on topic %s with
error %s",
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index fcb71566f6b..8353756dce0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -141,11 +141,18 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace doesn't exist"),
@ApiResponse(code = 412, message = "Permissions are not set at the
topic level")})
- public void revokePermissionsOnTopic(@PathParam("property") String
property,
- @PathParam("cluster") String cluster, @PathParam("namespace")
String namespace,
- @PathParam("topic") @Encoded String encodedTopic,
@PathParam("role") String role) {
- validateTopicName(property, cluster, namespace, encodedTopic);
- internalRevokePermissionsOnTopic(role);
+ public void revokePermissionsOnTopic(@Suspended final AsyncResponse
asyncResponse,
+ @PathParam("property") String property, @PathParam("cluster")
String cluster,
+ @PathParam("namespace") String namespace, @PathParam("topic")
@Encoded String encodedTopic,
+ @PathParam("role") String role) {
+ try {
+ validateTopicName(property, cluster, namespace, encodedTopic);
+ internalRevokePermissionsOnTopic(asyncResponse, role);
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ }
}
@PUT
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 30836b9b3d5..f640b1ca6de 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -192,6 +192,7 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 412, message = "Permissions are not set at the
topic level"),
@ApiResponse(code = 500, message = "Internal server error")})
public void revokePermissionsOnTopic(
+ @Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@@ -200,8 +201,14 @@ public class PersistentTopics extends PersistentTopicsBase
{
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Client role to which grant permissions",
required = true)
@PathParam("role") String role) {
- validateTopicName(tenant, namespace, encodedTopic);
- internalRevokePermissionsOnTopic(role);
+ try {
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalRevokePermissionsOnTopic(asyncResponse, role);
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ }
}
@PUT
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index abfd4b9cf97..baa5d3fe332 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -34,6 +34,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import javax.servlet.ServletContext;
@@ -247,7 +248,8 @@ public abstract class PulsarWebResource {
*/
protected void validateAdminAccessForTenant(String tenant) {
try {
- validateAdminAccessForTenant(pulsar(), clientAppId(),
originalPrincipal(), tenant, clientAuthData());
+ validateAdminAccessForTenant(pulsar(), clientAppId(),
originalPrincipal(), tenant, clientAuthData(),
+ config().getMetadataStoreOperationTimeoutSeconds(),
SECONDS);
} catch (RestException e) {
throw e;
} catch (Exception e) {
@@ -257,65 +259,109 @@ public abstract class PulsarWebResource {
}
protected static void validateAdminAccessForTenant(PulsarService pulsar,
String clientAppId,
- String
originalPrincipal, String tenant,
-
AuthenticationDataSource authenticationData)
- throws Exception {
+ String originalPrincipal,
String tenant,
+ AuthenticationDataSource
authenticationData,
+ long timeout, TimeUnit unit) {
+ try {
+ validateAdminAccessForTenantAsync(pulsar, clientAppId,
originalPrincipal, tenant, authenticationData)
+ .get(timeout, unit);
+ } catch (InterruptedException | ExecutionException | TimeoutException
e) {
+ Throwable realCause = FutureUtil.unwrapCompletionException(e);
+ if (realCause instanceof WebApplicationException) {
+ throw (WebApplicationException) realCause;
+ } else {
+ throw new RestException(realCause);
+ }
+ }
+ }
+
+ /**
+ * Checks that the http client role has admin access to the specified
tenant async.
+ *
+ * @param tenant the tenant id
+ */
+ protected CompletableFuture<Void> validateAdminAccessForTenantAsync(String
tenant) {
+ return validateAdminAccessForTenantAsync(pulsar(), clientAppId(),
originalPrincipal(), tenant,
+ clientAuthData());
+ }
+
+ protected static CompletableFuture<Void> validateAdminAccessForTenantAsync(
+ PulsarService pulsar, String clientAppId,
+ String originalPrincipal, String tenant,
+ AuthenticationDataSource authenticationData) {
if (log.isDebugEnabled()) {
log.debug("check admin access on tenant: {} - Authenticated: {} --
role: {}", tenant,
(isClientAuthenticated(clientAppId)), clientAppId);
}
-
- TenantInfo tenantInfo =
pulsar.getPulsarResources().getTenantResources().getTenant(tenant)
- .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Tenant
does not exist"));
-
- if (pulsar.getConfiguration().isAuthenticationEnabled() &&
pulsar.getConfiguration().isAuthorizationEnabled()) {
- if (!isClientAuthenticated(clientAppId)) {
- throw new RestException(Status.FORBIDDEN, "Need to
authenticate to perform the request");
- }
-
-
validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(),
clientAppId, originalPrincipal);
-
- if
(pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
- CompletableFuture<Boolean> isProxySuperUserFuture;
- CompletableFuture<Boolean> isOriginalPrincipalSuperUserFuture;
- try {
- AuthorizationService authorizationService =
pulsar.getBrokerService().getAuthorizationService();
- isProxySuperUserFuture =
authorizationService.isSuperUser(clientAppId, authenticationData);
-
- isOriginalPrincipalSuperUserFuture =
-
authorizationService.isSuperUser(originalPrincipal, authenticationData);
-
- boolean proxyAuthorized = isProxySuperUserFuture.get()
- || authorizationService.isTenantAdmin(tenant,
clientAppId,
- tenantInfo, authenticationData).get();
- boolean originalPrincipalAuthorized =
- isOriginalPrincipalSuperUserFuture.get() ||
authorizationService.isTenantAdmin(tenant,
- originalPrincipal, tenantInfo,
authenticationData).get();
- if (!proxyAuthorized || !originalPrincipalAuthorized) {
- throw new RestException(Status.UNAUTHORIZED,
- String.format("Proxy not authorized to access
resource (proxy:%s,original:%s)",
- clientAppId, originalPrincipal));
+ return
pulsar.getPulsarResources().getTenantResources().getTenantAsync(tenant)
+ .thenCompose(tenantInfoOptional -> {
+ if (!tenantInfoOptional.isPresent()) {
+ throw new RestException(Status.NOT_FOUND, "Tenant does
not exist");
}
- } catch (InterruptedException | ExecutionException e) {
- throw new RestException(Status.INTERNAL_SERVER_ERROR,
e.getMessage());
- }
- log.debug("Successfully authorized {} (proxied by {}) on
tenant {}",
- originalPrincipal, clientAppId, tenant);
- } else {
- if (!pulsar.getBrokerService()
- .getAuthorizationService()
- .isSuperUser(clientAppId, authenticationData)
- .join()) {
- if (!pulsar.getBrokerService().getAuthorizationService()
- .isTenantAdmin(tenant, clientAppId, tenantInfo,
authenticationData).get()) {
- throw new RestException(Status.UNAUTHORIZED,
- "Don't have permission to administrate
resources on this tenant");
+ TenantInfo tenantInfo = tenantInfoOptional.get();
+ if (pulsar.getConfiguration().isAuthenticationEnabled() &&
pulsar.getConfiguration()
+ .isAuthorizationEnabled()) {
+ if (!isClientAuthenticated(clientAppId)) {
+ throw new RestException(Status.FORBIDDEN, "Need to
authenticate to perform the request");
+ }
+
validateOriginalPrincipal(pulsar.getConfiguration().getProxyRoles(),
clientAppId,
+ originalPrincipal);
+ if
(pulsar.getConfiguration().getProxyRoles().contains(clientAppId)) {
+ AuthorizationService authorizationService =
+
pulsar.getBrokerService().getAuthorizationService();
+ return authorizationService.isTenantAdmin(tenant,
clientAppId, tenantInfo,
+ authenticationData)
+ .thenCompose(isTenantAdmin -> {
+ String debugMsg = "Successfully authorized
{} (proxied by {}) on tenant {}";
+ if (!isTenantAdmin) {
+ return
authorizationService.isSuperUser(clientAppId, authenticationData)
+
.thenCombine(authorizationService.isSuperUser(originalPrincipal,
+
authenticationData),
+ (proxyAuthorized,
originalPrincipalAuthorized) -> {
+ if (!proxyAuthorized
|| !originalPrincipalAuthorized) {
+ throw new
RestException(Status.UNAUTHORIZED,
+
String.format("Proxy not authorized to access "
+
+ "resource (proxy:%s,original:%s)"
+ ,
clientAppId, originalPrincipal));
+ } else {
+ if
(log.isDebugEnabled()) {
+
log.debug(debugMsg, originalPrincipal, clientAppId,
+
tenant);
+ }
+ return null;
+ }
+ });
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(debugMsg,
originalPrincipal, clientAppId, tenant);
+ }
+ return
CompletableFuture.completedFuture(null);
+ }
+ });
+ } else {
+ return pulsar.getBrokerService()
+ .getAuthorizationService()
+ .isSuperUser(clientAppId,
authenticationData)
+ .thenCompose(isSuperUser -> {
+ if (!isSuperUser) {
+ return
pulsar.getBrokerService().getAuthorizationService()
+ .isTenantAdmin(tenant,
clientAppId, tenantInfo, authenticationData);
+ } else {
+ return
CompletableFuture.completedFuture(true);
+ }
+ }).thenAccept(authorized -> {
+ if (!authorized) {
+ throw new
RestException(Status.UNAUTHORIZED,
+ "Don't have permission to
administrate resources on this tenant");
+ } else {
+ log.debug("Successfully authorized
{} on tenant {}", clientAppId, tenant);
+ }
+ });
+ }
+ } else {
+ return CompletableFuture.completedFuture(null);
}
- }
-
- log.debug("Successfully authorized {} on tenant {}",
clientAppId, tenant);
- }
- }
+ });
}
/**
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index f506feedd33..91ec9adecc7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -786,14 +786,16 @@ public class AdminTest extends
MockedPulsarServiceBaseTest {
namespace, topic);
assertEquals(permission.get(role), actions);
// remove permission
- persistentTopics.revokePermissionsOnTopic(property, cluster,
namespace, topic, role);
-
+ response = mock(AsyncResponse.class);
+ persistentTopics.revokePermissionsOnTopic(response, property, cluster,
namespace, topic, role);
+ responseCaptor = ArgumentCaptor.forClass(Response.class);
+ verify(response,
timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getStatus(),
Response.Status.NO_CONTENT.getStatusCode());
// verify removed permission
Awaitility.await().untilAsserted(() -> {
Map<String, Set<AuthAction>> p =
persistentTopics.getPermissionsOnTopic(property, cluster, namespace, topic);
assertTrue(p.isEmpty());
});
-
}
@Test
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index d6dd9d91050..22014fc37e4 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -681,7 +681,11 @@ public class PersistentTopicsTest extends
MockedPulsarServiceBaseTest {
Set<AuthAction> expectActions = new HashSet<>();
expectActions.add(AuthAction.produce);
persistentTopics.grantPermissionsOnTopic(testTenant, testNamespace,
topicName, role, expectActions);
- persistentTopics.revokePermissionsOnTopic(testTenant, testNamespace,
topicName, role);
+ AsyncResponse response = mock(AsyncResponse.class);
+ ArgumentCaptor<Response> responseCaptor =
ArgumentCaptor.forClass(Response.class);
+ persistentTopics.revokePermissionsOnTopic(response, testTenant,
testNamespace, topicName, role);
+ verify(response,
timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getStatus(),
Response.Status.NO_CONTENT.getStatusCode());
Map<String, Set<AuthAction>> permissions =
persistentTopics.getPermissionsOnTopic(testTenant, testNamespace, topicName);
Assert.assertEquals(permissions.get(role), null);
}
@@ -700,7 +704,12 @@ public class PersistentTopicsTest extends
MockedPulsarServiceBaseTest {
Set<AuthAction> expectActions = new HashSet<>();
expectActions.add(AuthAction.produce);
persistentTopics.grantPermissionsOnTopic(testTenant, testNamespace,
partitionedTopicName, role, expectActions);
- persistentTopics.revokePermissionsOnTopic(testTenant, testNamespace,
partitionedTopicName, role);
+ response = mock(AsyncResponse.class);
+ persistentTopics.revokePermissionsOnTopic(response, testTenant,
testNamespace, partitionedTopicName, role);
+ responseCaptor = ArgumentCaptor.forClass(Response.class);
+ verify(response,
timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getStatus(),
Response.Status.NO_CONTENT.getStatusCode());
+
Map<String, Set<AuthAction>> permissions =
persistentTopics.getPermissionsOnTopic(testTenant, testNamespace,
partitionedTopicName);
Assert.assertEquals(permissions.get(role), null);