This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new c0e87c0  Avoid potentially blocking calls to metadata on critical threads (#12339)
c0e87c0 is described below

commit c0e87c04c95ba7226b70ed0ee17fd967e67b6194
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Oct 14 12:33:17 2021 -0700

    Avoid potentially blocking calls to metadata on critical threads (#12339)
    
    * Avoid potentially blocking calls to metadata on critical threads
    
    * Fixed log arguments order
    
    * Addressed comments
    
    * Fixed mock in PersistentSubscriptionTest
    
    * Fixed issue in mocked tests
    
    * Fixed test that was forcing policies modification under the hood
---
 .../authorization/PulsarAuthorizationProvider.java | 12 +++---
 .../pulsar/broker/service/AbstractTopic.java       | 43 ++++++++++++----------
 .../pulsar/broker/service/BrokerService.java       | 16 ++++----
 .../service/persistent/DispatchRateLimiter.java    |  8 ++--
 4 files changed, 41 insertions(+), 38 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index df95460..60e02fc 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.authorization;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import com.google.common.base.Function;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
@@ -47,6 +48,7 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.RestException;
 import 
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -331,16 +333,16 @@ public class PulsarAuthorizationProvider implements 
AuthorizationProvider {
         return updateSubscriptionPermissionAsync(namespace, subscriptionName, 
Collections.singleton(role), true);
     }
 
-    private CompletableFuture<Void> 
updateSubscriptionPermissionAsync(NamespaceName namespace, String 
subscriptionName, Set<String> roles,
-            boolean remove) {
-        CompletableFuture<Void> result = new CompletableFuture<>();
-
+    private CompletableFuture<Void> 
updateSubscriptionPermissionAsync(NamespaceName namespace, String 
subscriptionName,
+                                                                      
Set<String> roles,
+                                                                      boolean 
remove) {
         try {
             validatePoliciesReadOnlyAccess();
         } catch (Exception e) {
-            result.completeExceptionally(e);
+            return FutureUtil.failedFuture(e);
         }
 
+        CompletableFuture<Void> result = new CompletableFuture<>();
         final String policiesPath = String.format("/%s/%s/%s", "admin", 
POLICIES, namespace.toString());
 
         try {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index c8b1079..62f8fe1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -134,17 +134,9 @@ public abstract class AbstractTopic implements Topic {
         
this.inactiveTopicPolicies.setInactiveTopicDeleteMode(brokerService.pulsar().getConfiguration()
                 .getBrokerDeleteInactiveTopicsMode());
         this.lastActive = System.nanoTime();
-        Policies policies = null;
-        try {
-            policies = 
brokerService.pulsar().getConfigurationCache().policiesCache()
-                    .get(AdminResource.path(POLICIES, 
TopicName.get(topic).getNamespace()))
-                    .orElseGet(() -> new Policies());
-        } catch (Exception e) {
-            log.warn("[{}] Error getting policies {} and publish throttling 
will be disabled", topic, e.getMessage());
-        }
         this.preciseTopicPublishRateLimitingEnable =
                 
brokerService.pulsar().getConfiguration().isPreciseTopicPublishRateLimiterEnable();
-        updatePublishDispatcher(policies);
+        updatePublishDispatcher(Optional.empty());
     }
 
     protected boolean isProducersExceeded() {
@@ -154,11 +146,11 @@ public abstract class AbstractTopic implements Topic {
             Policies policies;
             try {
                 policies = 
brokerService.pulsar().getConfigurationCache().policiesCache()
-                        .get(AdminResource.path(POLICIES, 
TopicName.get(topic).getNamespace()))
-                        .orElseGet(() -> new Policies());
+                        .getDataIfPresent(AdminResource.path(POLICIES, 
TopicName.get(topic).getNamespace()));
             } catch (Exception e) {
                 policies = new Policies();
             }
+
             maxProducers = policies.max_producers_per_topic;
         }
         maxProducers = maxProducers != null ? maxProducers : 
brokerService.pulsar()
@@ -201,15 +193,10 @@ public abstract class AbstractTopic implements Topic {
                 // Use getDataIfPresent from zk cache to make the call 
non-blocking and prevent deadlocks
                 policies = 
brokerService.pulsar().getConfigurationCache().policiesCache()
                         .getDataIfPresent(AdminResource.path(POLICIES, 
TopicName.get(topic).getNamespace()));
-
-                if (policies == null) {
-                    policies = new Policies();
-                }
             } catch (Exception e) {
-                log.warn("[{}] Failed to get namespace policies that include 
max number of consumers: {}", topic,
-                        e.getMessage());
                 policies = new Policies();
             }
+
             maxConsumers = policies.max_consumers_per_topic;
         }
         final int maxConsumersPerTopic = maxConsumers != null ? maxConsumers
@@ -767,10 +754,10 @@ public abstract class AbstractTopic implements Topic {
     }
 
     public void updateMaxPublishRate(Policies policies) {
-        updatePublishDispatcher(policies);
+        updatePublishDispatcher(Optional.of(policies));
     }
 
-    private void updatePublishDispatcher(Policies policies) {
+    private void updatePublishDispatcher(Optional<Policies> optPolicies) {
         //if topic-level policy exists, try to use topic-level publish rate 
policy
         Optional<PublishRate> topicPublishRate = 
getTopicPolicies().map(TopicPolicies::getPublishRate);
         if (topicPublishRate.isPresent()) {
@@ -780,9 +767,25 @@ public abstract class AbstractTopic implements Topic {
             return;
         }
 
+        Policies policies;
+        try {
+            if (optPolicies.isPresent()) {
+                policies = optPolicies.get();
+            } else {
+                policies = 
brokerService.pulsar().getConfigurationCache().policiesCache()
+                    .getDataIfPresent(AdminResource.path(POLICIES, 
TopicName.get(topic).getNamespace()));
+                if (policies == null) {
+                    policies = new Policies();
+                }
+            }
+        } catch (Exception e) {
+            log.warn("[{}] Error getting policies {} and publish throttling 
will be disabled", topic, e.getMessage());
+            policies = new Policies();
+        }
+
         //topic-level policy is not set, try to use namespace-level rate policy
         final String clusterName = 
brokerService.pulsar().getConfiguration().getClusterName();
-        final PublishRate publishRate = policies != null && 
policies.publishMaxMessageRate != null
+        final PublishRate publishRate = policies.publishMaxMessageRate != null
                 ? policies.publishMaxMessageRate.get(clusterName)
                 : null;
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 56dbc9c..98b2f8f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2568,11 +2568,11 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
 
     private AutoTopicCreationOverride getAutoTopicCreationOverride(final 
TopicName topicName) {
         try {
-            Optional<Policies> policies = 
pulsar.getConfigurationCache().policiesCache()
-                            .get(AdminResource.path(POLICIES, 
topicName.getNamespace()));
+            Policies policies = pulsar.getConfigurationCache().policiesCache()
+                            .getDataIfPresent(AdminResource.path(POLICIES, 
topicName.getNamespace()));
             // If namespace policies have the field set, it will override the 
broker-level setting
-            if (policies.isPresent() && 
policies.get().autoTopicCreationOverride != null) {
-                return policies.get().autoTopicCreationOverride;
+            if (policies != null && policies.autoTopicCreationOverride != 
null) {
+                return policies.autoTopicCreationOverride;
             }
         } catch (Throwable t) {
             // Ignoring since if we don't have policies, we fallback on the 
default
@@ -2601,11 +2601,11 @@ public class BrokerService implements Closeable, 
ZooKeeperCacheListener<Policies
 
     private AutoSubscriptionCreationOverride 
getAutoSubscriptionCreationOverride(final TopicName topicName) {
         try {
-            Optional<Policies> policies = 
pulsar.getConfigurationCache().policiesCache()
-                    .get(AdminResource.path(POLICIES, 
topicName.getNamespace()));
+            Policies policies = pulsar.getConfigurationCache().policiesCache()
+                    .getDataIfPresent(AdminResource.path(POLICIES, 
topicName.getNamespace()));
             // If namespace policies have the field set, it will override the 
broker-level setting
-            if (policies.isPresent() && 
policies.get().autoSubscriptionCreationOverride != null) {
-                return policies.get().autoSubscriptionCreationOverride;
+            if (policies != null && policies.autoSubscriptionCreationOverride 
!= null) {
+                return policies.autoSubscriptionCreationOverride;
             }
         } catch (Throwable t) {
             // Ignoring since if we don't have policies, we fallback on the 
default
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index 994d274..91eb074 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
-import static java.util.concurrent.TimeUnit.SECONDS;
 import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.apache.pulsar.broker.web.PulsarWebResource.path;
 import java.util.Optional;
@@ -325,17 +324,16 @@ public class DispatchRateLimiter {
     public static Optional<Policies> getPolicies(BrokerService brokerService, 
String topicName) {
         final NamespaceName namespace = 
TopicName.get(topicName).getNamespaceObject();
         final String path = path(POLICIES, namespace.toString());
-        Optional<Policies> policies = Optional.empty();
+        Policies policies = null;
         try {
             ConfigurationCacheService configurationCacheService = 
brokerService.pulsar().getConfigurationCache();
             if (configurationCacheService != null) {
-                policies = 
configurationCacheService.policiesCache().getAsync(path)
-                        
.get(brokerService.pulsar().getConfiguration().getZooKeeperOperationTimeoutSeconds(),
 SECONDS);
+                policies = 
configurationCacheService.policiesCache().getDataIfPresent(path);
             }
         } catch (Exception e) {
             log.warn("Failed to get message-rate for {} ", topicName, e);
         }
-        return policies;
+        return Optional.ofNullable(policies);
     }
 
     /**

Reply via email to