nodece commented on code in PR #20068:
URL: https://github.com/apache/pulsar/pull/20068#discussion_r1163070081


##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java:
##########
@@ -101,43 +112,57 @@ public CompletableFuture<Boolean> canProduceAsync(TopicName topicName, String ro
     @Override
     public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String role,
             AuthenticationDataSource authenticationData, String subscription) {
-        return pulsarResources.getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject())
-                .thenCompose(policies -> {
-                    if (!policies.isPresent()) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("Policies node couldn't be found for topic : {}", topicName);
-                        }
-                    } else {
-                        if (isNotBlank(subscription)) {
-                            // validate if role is authorized to access subscription. (skip validation if authorization
-                            // list is empty)
-                            Set<String> roles = policies.get().auth_policies
-                                    .getSubscriptionAuthentication().get(subscription);
-                            if (roles != null && !roles.isEmpty() && !roles.contains(role)) {
-                                log.warn("[{}] is not authorized to subscribe on {}-{}", role, topicName, subscription);
-                                return CompletableFuture.completedFuture(false);
-                            }
-
-                            // validate if subscription-auth mode is configured
-                            if (policies.get().subscription_auth_mode != null) {
-                                switch (policies.get().subscription_auth_mode) {
-                                    case Prefix:
-                                        if (!subscription.startsWith(role)) {
-                                            PulsarServerException ex = new PulsarServerException(String.format(
-                                                 "Failed to create consumer - The subscription name needs to be"
-                                                 + " prefixed by the authentication role, like %s-xxxx for topic: %s",
-                                                 role, topicName));
-                                            return FutureUtil.failedFuture(ex);
+        return validateTenantAdminAccess(topicName.getTenant(), role, authenticationData).exceptionally(ex -> {

Review Comment:
   I think you can just throw this exception instead of handling it with `exceptionally`.
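
For illustration, here is a minimal, self-contained sketch of the difference this comment seems to point at. It is not the PR's code: `validateAccess` is a hypothetical stand-in for `validateTenantAdminAccess`, and the failure it simulates is an assumption. With `exceptionally`, a failed check is turned into an ordinary result; without it, the failure stays on the returned future and reaches the caller.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class ExceptionallySketch {

    // Hypothetical stand-in for validateTenantAdminAccess: the future fails
    // when the check itself cannot be carried out.
    static CompletableFuture<Boolean> validateAccess(boolean metadataAvailable) {
        if (!metadataAvailable) {
            CompletableFuture<Boolean> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException("metadata store unavailable"));
            return failed;
        }
        return CompletableFuture.completedFuture(false);
    }

    // Variant A: exceptionally replaces the failure with a plain "false",
    // so the caller can no longer tell "denied" from "check failed".
    static CompletableFuture<Boolean> withExceptionally(boolean metadataAvailable) {
        return validateAccess(metadataAvailable).exceptionally(ex -> false);
    }

    // Variant B: no exceptionally; an exceptional completion propagates
    // through the chain to whoever consumes the returned future.
    static CompletableFuture<Boolean> propagating(boolean metadataAvailable) {
        return validateAccess(metadataAvailable).thenApply(isAdmin -> isAdmin);
    }

    // Helper that renders the outcome of a future as text.
    static String describe(CompletableFuture<Boolean> future) {
        try {
            return "result=" + future.join();
        } catch (CompletionException e) {
            return "failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(withExceptionally(false))); // result=false
        System.out.println(describe(propagating(false)));       // failed: metadata store unavailable
    }
}
```

Whether Variant B is the right choice here depends on how callers of `canConsumeAsync` treat a failed future, which this sketch does not model.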
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
