michaeljmarshall commented on a change in pull request #12339:
URL: https://github.com/apache/pulsar/pull/12339#discussion_r727528015



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
##########
@@ -209,18 +196,12 @@ protected boolean isConsumersExceededOnTopic() {
         Integer maxConsumers = getTopicPolicies().map(TopicPolicies::getMaxConsumerPerTopic).orElse(null);
         if (maxConsumers == null) {
             Policies policies;
-            try {
-                // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks
-                policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPolicies(
-                                TopicName.get(topic).getNamespaceObject())
-                        .orElseGet(() -> new Policies());
+            // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks
+            policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(
+                            TopicName.get(topic).getNamespaceObject())
+                    .orElseGet(() -> new Policies());
 
-                if (policies == null) {
-                    policies = new Policies();
-                }
-            } catch (Exception e) {
-                log.warn("[{}] Failed to get namespace policies that include max number of consumers: {}", topic,
-                        e.getMessage());
+            if (policies == null) {

Review comment:
       Can we drop this null check? I recognize that an `Optional` reference can 
itself be null, but it probably _shouldn't_ be.
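
       A minimal sketch of why the check looks redundant, assuming 
`getPoliciesIfCached` returns a non-null `Optional`. The nested `Policies` class 
and the `resolve` helper are hypothetical stand-ins for illustration, not the 
Pulsar types:

```java
import java.util.Optional;

class OptionalNullCheckSketch {
    // Hypothetical stand-in for the real Policies class.
    static class Policies {
    }

    // Mirrors the shape of getPoliciesIfCached(...).orElseGet(() -> new Policies()).
    static Policies resolve(Optional<Policies> cached) {
        // orElseGet returns either the contained value, which Optional guarantees
        // is non-null, or the supplier's result, which here is always a new object.
        return cached.orElseGet(() -> new Policies());
    }

    public static void main(String[] args) {
        Policies existing = new Policies();
        // Empty Optional: the supplier provides a fresh, non-null instance.
        System.out.println(resolve(Optional.empty()) != null);
        // Present Optional: the contained instance is returned unchanged.
        System.out.println(resolve(Optional.of(existing)) == existing);
    }
}
```

       If the `Optional` reference itself were null, the call would fail on 
`orElseGet` before the null check is ever reached, so the check would not help 
in that case either.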

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
##########
@@ -630,7 +611,7 @@ protected void checkTopicFenced() throws BrokerServiceException {
         }
     }
 
-    protected void internalAddProducer(Producer producer) throws BrokerServiceException {
+    protected void internalAddProducer(Producer producer) throws BrokerServiceException{

Review comment:
       Nit: I think we can leave the space here.
   ```suggestion
       protected void internalAddProducer(Producer producer) throws BrokerServiceException {
   ```

##########
File path: pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
##########
@@ -322,48 +324,42 @@ public void initialize(ServiceConfiguration conf, PulsarResources pulsarResource
         return updateSubscriptionPermissionAsync(namespace, subscriptionName, Collections.singleton(role), true);
     }
 
-    private CompletableFuture<Void> updateSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName, Set<String> roles,
-            boolean remove) {
-        CompletableFuture<Void> result = new CompletableFuture<>();
-
+    private CompletableFuture<Void> updateSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName,
+                                                                      Set<String> roles,
+                                                                      boolean remove) {
         try {
             validatePoliciesReadOnlyAccess();
         } catch (Exception e) {
-            result.completeExceptionally(e);
+            return FutureUtil.failedFuture(e);
         }
 
-        try {
-            Policies policies = pulsarResources.getNamespaceResources().getPolicies(namespace)
-                    .orElseThrow(() -> new NotFoundException(namespace + " not found"));
-            if (remove) {
-                if (policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName) != null) {
-                    policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName).removeAll(roles);
-                }else {
-                    log.info("[{}] Couldn't find role {} while revoking for sub = {}", namespace, subscriptionName, roles);
-                    result.completeExceptionally(new IllegalArgumentException("couldn't find subscription"));
-                    return result;
-                }
-            } else {
-                policies.auth_policies.getSubscriptionAuthentication().put(subscriptionName, roles);
-            }
-            pulsarResources.getNamespaceResources().setPolicies(namespace, (data)->policies);
+        CompletableFuture<Void> future =
+                pulsarResources.getNamespaceResources().setPoliciesAsync(namespace, policies -> {
+                    if (remove) {
+                        if (policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName) != null) {
+                            policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName)
+                                    .removeAll(roles);

Review comment:
       Nit: I think we could simplify this with a local variable, so that we 
only look up the set in the map once. What do you think?
   ```suggestion
                           Set<String> currentRoles = policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName);
                           if (currentRoles != null) {
                               currentRoles.removeAll(roles);
   ```
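
       For illustration, the same single-lookup pattern as a self-contained 
sketch. The `revoke` helper and the plain `Map<String, Set<String>>` are 
assumptions standing in for the subscription authentication map, not code from 
this PR:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class RevokeRolesSketch {
    // Removes the given roles from a subscription's role set, looking the set
    // up in the map only once. Returns false when the subscription has no entry.
    static boolean revoke(Map<String, Set<String>> subscriptionAuth,
                          String subscriptionName, Set<String> roles) {
        Set<String> currentRoles = subscriptionAuth.get(subscriptionName);
        if (currentRoles == null) {
            return false;
        }
        currentRoles.removeAll(roles);
        return true;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> auth = new HashMap<>();
        auth.put("sub-a", new HashSet<>(Set.of("reader", "writer")));
        // Known subscription: the "writer" role is removed in place.
        System.out.println(revoke(auth, "sub-a", Set.of("writer")));
        System.out.println(auth.get("sub-a"));
        // Unknown subscription: nothing to remove.
        System.out.println(revoke(auth, "missing", Set.of("writer")));
    }
}
```

       Besides saving a lookup, the local variable makes it clear that the 
null check and the `removeAll` call operate on the same reference.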

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
##########
@@ -180,7 +180,7 @@ public static boolean isDispatchRateNeeded(BrokerService brokerService, Optional
             return true;
         }
 
-        policies = policies.isPresent() ? policies : getPolicies(brokerService, topicName);
+        policies = policies.isPresent() ? policies :  getPolicies(brokerService, topicName);

Review comment:
       Nit: we can remove the extra space here.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
##########
@@ -802,9 +780,19 @@ private void updatePublishDispatcher(Policies policies) {
             return;
         }
 
+        Policies policies;
+        try {
+            policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(
+                            TopicName.get(topic).getNamespaceObject())
+                    .orElseGet(() -> new Policies());

Review comment:
       I see this was added in the most recent commit to help with a mocked 
test. This block doesn't throw any checked exceptions. Do we expect a 
`RuntimeException` from it? If we do, should we wrap the other similar code 
blocks in this PR in try/catch as well? If we don't, perhaps we should update 
the test instead.
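
       To make the two options concrete, here is a rough sketch. It assumes 
the mocked test fails because some link in the call chain returns null; the 
actual failure is not shown in this thread, and `PolicyLookup` and `resolve` 
are hypothetical names:

```java
import java.util.Optional;

class UncheckedFailureSketch {
    // Hypothetical stand-in for the namespace resources lookup.
    interface PolicyLookup {
        Optional<String> getPoliciesIfCached(String namespace);
    }

    // Declares no checked exceptions, but still fails at runtime with a
    // NullPointerException if the lookup returns null instead of an Optional.
    static String resolve(PolicyLookup lookup, String namespace) {
        return lookup.getPoliciesIfCached(namespace).orElseGet(() -> "default");
    }

    public static void main(String[] args) {
        // Option 1: leave the stub returning null and catch the unchecked exception.
        PolicyLookup returnsNull = ns -> null;
        try {
            resolve(returnsNull, "tenant/ns");
        } catch (NullPointerException e) {
            System.out.println("caught NullPointerException");
        }

        // Option 2: fix the test double so that it returns an empty Optional.
        PolicyLookup returnsEmpty = ns -> Optional.empty();
        System.out.println(resolve(returnsEmpty, "tenant/ns"));
    }
}
```

       With the second option the production code needs no try/catch, which 
is why updating the test may be the smaller change.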




