Technoboy- commented on code in PR #14152:
URL: https://github.com/apache/pulsar/pull/14152#discussion_r853800007
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -256,53 +256,63 @@ public void validateAdminOperationOnTopic(boolean
authoritative) {
validateTopicOwnership(topicName, authoritative);
}
- private void grantPermissions(TopicName topicUri, String role,
Set<AuthAction> actions) {
- try {
- AuthorizationService authService =
pulsar().getBrokerService().getAuthorizationService();
- if (null != authService) {
- authService.grantPermissionAsync(topicUri, actions, role,
null/*additional auth-data json*/).get();
- } else {
- throw new RestException(Status.NOT_IMPLEMENTED, "Authorization
is not enabled");
- }
- log.info("[{}] Successfully granted access for role {}: {} - topic
{}", clientAppId(), role, actions,
- topicUri);
- } catch (InterruptedException e) {
- log.error("[{}] Failed to get permissions for topic {}",
clientAppId(), topicUri, e);
- throw new RestException(e);
- } catch (ExecutionException e) {
- // The IllegalArgumentException and the IllegalStateException were
historically thrown by the
- // grantPermissionAsync method, so we catch them here to ensure
backwards compatibility.
- if (e.getCause() instanceof
MetadataStoreException.NotFoundException
- || e.getCause() instanceof IllegalArgumentException) {
- log.warn("[{}] Failed to set permissions for topic {}:
Namespace does not exist", clientAppId(),
- topicUri, e);
- throw new RestException(Status.NOT_FOUND, "Topic's namespace
does not exist");
- } else if (e.getCause() instanceof
MetadataStoreException.BadVersionException
- || e.getCause() instanceof IllegalStateException) {
- log.warn("[{}] Failed to set permissions for topic {}: {}",
- clientAppId(), topicUri, e.getCause().getMessage(), e);
- throw new RestException(Status.CONFLICT, "Concurrent
modification");
- } else {
- log.error("[{}] Failed to get permissions for topic {}",
clientAppId(), topicUri, e);
- throw new RestException(e);
- }
+ private CompletableFuture<Void> grantPermissionsAsync(TopicName topicUri,
String role, Set<AuthAction> actions) {
+ AuthorizationService authService =
pulsar().getBrokerService().getAuthorizationService();
+ if (null != authService) {
+ return authService.grantPermissionAsync(topicUri, actions, role,
null/*additional auth-data json*/)
+ .thenAccept(__ -> log.info("[{}] Successfully granted
access for role {}: {} - topic {}",
+ clientAppId(), role, actions, topicUri))
+ .exceptionally(ex -> {
+ Throwable realCause =
FutureUtil.unwrapCompletionException(ex);
+ //The IllegalArgumentException and the
IllegalStateException were historically thrown by the
+ // grantPermissionAsync method, so we catch them here
to ensure backwards compatibility.
+ if (realCause instanceof
MetadataStoreException.NotFoundException
+ || realCause instanceof
IllegalArgumentException) {
+ log.warn("[{}] Failed to set permissions for topic
{}: Namespace does not exist",
+ clientAppId(), topicUri, realCause);
+ throw new RestException(Status.NOT_FOUND, "Topic's
namespace does not exist");
+ } else if (realCause instanceof
MetadataStoreException.BadVersionException
+ || realCause instanceof IllegalStateException)
{
+ log.warn("[{}] Failed to set permissions for topic
{}: {}", clientAppId(), topicUri,
+ realCause.getMessage(), realCause);
+ throw new RestException(Status.CONFLICT,
"Concurrent modification");
+ } else {
+ log.error("[{}] Failed to get permissions for
topic {}", clientAppId(), topicUri,
+ realCause);
+ throw new RestException(realCause);
+ }
+ });
+ } else {
+ String msg = "Authorization is not enabled";
+ log.error("[{}] Failed to get permissions for topic {}, because
{}", clientAppId(), topicUri, msg);
+ return FutureUtil.failedFuture(new
RestException(Status.NOT_IMPLEMENTED, msg));
}
}
- protected void internalGrantPermissionsOnTopic(String role,
Set<AuthAction> actions) {
+ protected void internalGrantPermissionsOnTopic(final AsyncResponse
asyncResponse, String role,
+ Set<AuthAction> actions) {
// This operation should be reading from zookeeper and it should be
allowed without having admin privileges
- validateAdminAccessForTenant(namespaceName.getTenant());
- validatePoliciesReadOnlyAccess();
-
- PartitionedTopicMetadata meta = getPartitionedTopicMetadata(topicName,
true, false);
- int numPartitions = meta.partitions;
- if (numPartitions > 0) {
- for (int i = 0; i < numPartitions; i++) {
- TopicName topicNamePartition = topicName.getPartition(i);
- grantPermissions(topicNamePartition, role, actions);
- }
- }
- grantPermissions(topicName, role, actions);
+ validateAdminAccessForTenantAsync(namespaceName.getTenant())
+ .thenCompose(__ ->
validatePoliciesReadOnlyAccessAsync().thenCompose(unused1 ->
+ getPartitionedTopicMetadataAsync(topicName, true, false)
+ .thenCompose(metadata -> {
+ int numPartitions = metadata.partitions;
+ CompletableFuture<Void> future =
CompletableFuture.completedFuture(null);
+ if (numPartitions > 0) {
+ for (int i = 0; i < numPartitions; i++) {
+ TopicName topicNamePartition =
topicName.getPartition(i);
+ future = future.thenComposeAsync(unused ->
grantPermissionsAsync(topicNamePartition, role,
Review Comment:
We can't use common-pool resources...
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]