Technoboy- commented on code in PR #16324:
URL: https://github.com/apache/pulsar/pull/16324#discussion_r912787982
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java:
##########
@@ -701,97 +700,113 @@ protected void
internalDeleteNamespaceBundleForcefully(String bundleRange, boole
}
}
- protected void internalGrantPermissionOnNamespace(String role,
Set<AuthAction> actions) {
- validateNamespaceOperation(namespaceName,
NamespaceOperation.GRANT_PERMISSION);
- checkNotNull(role, "Role should not be null");
- checkNotNull(actions, "Actions should not be null");
-
- try {
- AuthorizationService authService =
pulsar().getBrokerService().getAuthorizationService();
- if (null != authService) {
- authService.grantPermissionAsync(namespaceName, actions, role,
null/*additional auth-data json*/)
- .get();
- } else {
- throw new RestException(Status.NOT_IMPLEMENTED, "Authorization
is not enabled");
- }
- } catch (InterruptedException e) {
- log.error("[{}] Failed to get permissions for namespace {}",
clientAppId(), namespaceName, e);
- throw new RestException(e);
- } catch (ExecutionException e) {
- // The IllegalArgumentException and the IllegalStateException were
historically thrown by the
- // grantPermissionAsync method, so we catch them here to ensure
backwards compatibility.
- if (e.getCause() instanceof
MetadataStoreException.NotFoundException
- || e.getCause() instanceof IllegalArgumentException) {
- log.warn("[{}] Failed to set permissions for namespace {}:
does not exist", clientAppId(),
- namespaceName, e);
- throw new RestException(Status.NOT_FOUND, "Namespace does not
exist");
- } else if (e.getCause() instanceof
MetadataStoreException.BadVersionException
- || e.getCause() instanceof IllegalStateException) {
- log.warn("[{}] Failed to set permissions for namespace {}: {}",
- clientAppId(), namespaceName,
e.getCause().getMessage(), e);
- throw new RestException(Status.CONFLICT, "Concurrent
modification");
- } else {
- log.error("[{}] Failed to get permissions for namespace {}",
clientAppId(), namespaceName, e);
- throw new RestException(e);
- }
+ protected CompletableFuture<Void>
internalGrantPermissionOnNamespaceAsync(String role, Set<AuthAction> actions) {
+ AuthorizationService authService =
pulsar().getBrokerService().getAuthorizationService();
+ if (null != authService) {
+ return validateNamespaceOperationAsync(namespaceName,
NamespaceOperation.GRANT_PERMISSION)
+ .thenAccept(__ -> {
+ checkNotNull(role, "Role should not be null");
+ checkNotNull(actions, "Actions should not be null");
+ }).thenCompose(__ ->
+ authService.grantPermissionAsync(namespaceName,
actions, role, null))
+ .thenAccept(unused -> {
+ log.info("[{}] Successfully granted access for role
{}: {} - namespaceName {}",
+ clientAppId(), role, actions, namespaceName);})
+ .exceptionally(ex -> {
+ Throwable realCause =
FutureUtil.unwrapCompletionException(ex);
+ //The IllegalArgumentException and the
IllegalStateException were historically thrown by the
+ // grantPermissionAsync method, so we catch them here
to ensure backwards compatibility.
+ if (realCause instanceof
MetadataStoreException.NotFoundException
+ || realCause instanceof
IllegalArgumentException) {
+ log.warn("[{}] Failed to set permissions for
namespace {}: does not exist", clientAppId(),
+ namespaceName, ex);
+ throw new RestException(Status.NOT_FOUND, "Topic's
namespace does not exist");
+ } else if (realCause instanceof
MetadataStoreException.BadVersionException
+ || realCause instanceof IllegalStateException)
{
+ log.warn("[{}] Failed to set permissions for
namespace {}: {}",
+ clientAppId(), namespaceName,
ex.getCause().getMessage(), ex);
+ throw new RestException(Status.CONFLICT,
"Concurrent modification");
+ } else {
+ log.error("[{}] Failed to get permissions for
namespace {}", clientAppId(), namespaceName, ex);
+ throw new RestException(realCause);
+ }
+ });
+ } else {
+ String msg = "Authorization is not enabled";
+ return FutureUtil.failedFuture(new
RestException(Status.NOT_IMPLEMENTED, msg));
}
}
- protected void internalGrantPermissionOnSubscription(String subscription,
Set<String> roles) {
- validateNamespaceOperation(namespaceName,
NamespaceOperation.GRANT_PERMISSION);
- checkNotNull(subscription, "Subscription should not be null");
- checkNotNull(roles, "Roles should not be null");
-
- try {
- AuthorizationService authService =
pulsar().getBrokerService().getAuthorizationService();
- if (null != authService) {
- authService.grantSubscriptionPermissionAsync(namespaceName,
subscription, roles,
- null/* additional auth-data json */).get();
- } else {
- throw new RestException(Status.NOT_IMPLEMENTED, "Authorization
is not enabled");
- }
- } catch (InterruptedException e) {
- log.error("[{}] Failed to get permissions for namespace {}",
clientAppId(), namespaceName, e);
- throw new RestException(e);
- } catch (ExecutionException e) {
- if (e.getCause() instanceof IllegalArgumentException) {
- log.warn("[{}] Failed to set permissions for namespace {}:
does not exist", clientAppId(),
- namespaceName);
- throw new RestException(Status.NOT_FOUND, "Namespace does not
exist");
- } else if (e.getCause() instanceof IllegalStateException) {
- log.warn("[{}] Failed to set permissions for namespace {}:
concurrent modification", clientAppId(),
- namespaceName);
- throw new RestException(Status.CONFLICT, "Concurrent
modification");
- } else {
- log.error("[{}] Failed to get permissions for namespace {}",
clientAppId(), namespaceName, e);
- throw new RestException(e);
- }
+ protected CompletableFuture<Void>
internalGrantPermissionOnSubscriptionAsync(String subscription,
+
Set<String> roles) {
+ AuthorizationService authService =
pulsar().getBrokerService().getAuthorizationService();
+ if (null != authService) {
+ return validateNamespaceOperationAsync(namespaceName,
NamespaceOperation.GRANT_PERMISSION)
+ .thenAccept(__ -> {
+ checkNotNull(subscription, "Subscription should not be
null");
+ checkNotNull(roles, "Roles should not be null");
+ })
+ .thenCompose(__ ->
authService.grantSubscriptionPermissionAsync(namespaceName, subscription,
+ roles, null))
+ .thenAccept(unused -> {
+ log.info("[{}] Successfully granted permssion on
subscription for role {}:{} - "
+ + "namespaceName {}", clientAppId(), roles,
subscription, namespaceName);
+ })
+ .exceptionally(ex -> {
+ Throwable realCause =
FutureUtil.unwrapCompletionException(ex);
+ //The IllegalArgumentException and the
IllegalStateException were historically thrown by the
+ // grantPermissionAsync method, so we catch them here
to ensure backwards compatibility.
+ if (realCause.getCause() instanceof
IllegalArgumentException) {
+ log.warn("[{}] Failed to set permissions for
namespace {}: does not exist", clientAppId(),
+ namespaceName);
+ throw new RestException(Status.NOT_FOUND,
"Namespace does not exist");
+ } else if (realCause.getCause() instanceof
IllegalStateException) {
+ log.warn("[{}] Failed to set permissions for
namespace {}: concurrent modification", clientAppId(),
+ namespaceName);
+ throw new RestException(Status.CONFLICT,
"Concurrent modification");
+ } else {
+ log.error("[{}] Failed to get permissions for
namespace {}",
+ clientAppId(), namespaceName, realCause);
+ throw new RestException(realCause);
+ }
+ });
+ } else {
+ String msg = "Authorization is not enabled";
+ return FutureUtil.failedFuture(new
RestException(Status.NOT_IMPLEMENTED, msg));
}
}
- protected void internalRevokePermissionsOnNamespace(String role) {
- validateNamespaceOperation(namespaceName,
NamespaceOperation.REVOKE_PERMISSION);
- validatePoliciesReadOnlyAccess();
- checkNotNull(role, "Role should not be null");
- updatePolicies(namespaceName, policies ->{
- policies.auth_policies.getNamespaceAuthentication().remove(role);
- return policies;
- });
+ protected CompletableFuture<Void>
internalRevokePermissionsOnNamespaceAsync(String role) {
+ return validateNamespaceOperationAsync(namespaceName,
NamespaceOperation.REVOKE_PERMISSION)
+ .thenAccept(__ -> checkNotNull(role, "Role should not be
null"))
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies
-> {
+
policies.auth_policies.getNamespaceAuthentication().remove(role);
+ return policies;
+ }));
}
- protected void internalRevokePermissionsOnSubscription(String
subscriptionName, String role) {
- validateNamespaceOperation(namespaceName,
NamespaceOperation.REVOKE_PERMISSION);
- validatePoliciesReadOnlyAccess();
- checkNotNull(subscriptionName, "SubscriptionName should not be null");
- checkNotNull(role, "Role should not be null");
-
+ protected CompletableFuture<Void>
internalRevokePermissionsOnSubscriptionAsync(String subscriptionName,
+
String role) {
AuthorizationService authService =
pulsar().getBrokerService().getAuthorizationService();
if (null != authService) {
- authService.revokeSubscriptionPermissionAsync(namespaceName,
subscriptionName, role,
- null/* additional auth-data json */);
+ return validateNamespaceOperationAsync(namespaceName,
NamespaceOperation.REVOKE_PERMISSION)
+ .thenAccept(__ -> {
+ checkNotNull(subscriptionName, "SubscriptionName
should not be null");
+ checkNotNull(role, "Role should not be null");
+ })
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenCompose(__ ->
authService.revokeSubscriptionPermissionAsync(namespaceName,
+ subscriptionName, role, null/* additional
auth-data json */))
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to revoke permission on
subscription for role {}:{} - namespace: {}",
+ clientAppId(), role, subscriptionName,
namespaceName);
+ return null;
Review Comment:
Seems we can't add `exceptionally` here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]