Technoboy- commented on code in PR #20068:
URL: https://github.com/apache/pulsar/pull/20068#discussion_r1163478487


##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java:
##########
@@ -101,43 +112,57 @@ public CompletableFuture<Boolean> 
canProduceAsync(TopicName topicName, String ro
     @Override
     public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, 
String role,
             AuthenticationDataSource authenticationData, String subscription) {
-        return 
pulsarResources.getNamespaceResources().getPoliciesAsync(topicName.getNamespaceObject())
-                .thenCompose(policies -> {
-                    if (!policies.isPresent()) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("Policies node couldn't be found for 
topic : {}", topicName);
-                        }
-                    } else {
-                        if (isNotBlank(subscription)) {
-                            // validate if role is authorized to access 
subscription. (skip validation if authorization
-                            // list is empty)
-                            Set<String> roles = policies.get().auth_policies
-                                    
.getSubscriptionAuthentication().get(subscription);
-                            if (roles != null && !roles.isEmpty() && 
!roles.contains(role)) {
-                                log.warn("[{}] is not authorized to subscribe 
on {}-{}", role, topicName, subscription);
-                                return 
CompletableFuture.completedFuture(false);
-                            }
-
-                            // validate if subscription-auth mode is configured
-                            if (policies.get().subscription_auth_mode != null) 
{
-                                switch (policies.get().subscription_auth_mode) 
{
-                                    case Prefix:
-                                        if (!subscription.startsWith(role)) {
-                                            PulsarServerException ex = new 
PulsarServerException(String.format(
-                                                 "Failed to create consumer - 
The subscription name needs to be"
-                                                 + " prefixed by the 
authentication role, like %s-xxxx for topic: %s",
-                                                 role, topicName));
-                                            return FutureUtil.failedFuture(ex);
+        return validateTenantAdminAccess(topicName.getTenant(), role, 
authenticationData).exceptionally(ex -> {
+            log.warn("Client with Role - {} failed to check tenant admin for 
topic - {}. {}", role, topicName,
+                    ex.getMessage());
+            return false;
+        }).thenComposeAsync(isSuperUserOrAdmin -> {
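Review bot: The `exceptionally` handler on `validateTenantAdminAccess(...)` maps every failure of the check to `false` and logs only `ex.getMessage()`, so a failed lookup looks the same as "not a tenant admin" and the stack trace is lost. Consider passing `ex` itself as the final logger argument, and add a comment stating that a failed check is deliberately treated as non-admin. That fallback is only safe if the stage that follows still enforces the checks from the removed lines: the `getSubscriptionAuthentication()` role check and the `Prefix` case of `subscription_auth_mode`. The visible part of this hunk does not show where those are re-applied, so that is worth confirming for the `isSuperUserOrAdmin == false` path.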

Review Comment:
   If we use `thenComposeAsync`, it's better to define the `executor`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.