This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new c0e87c0 Avoid potentially blocking calls to metadata on critical
threads (#12339)
c0e87c0 is described below
commit c0e87c04c95ba7226b70ed0ee17fd967e67b6194
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Oct 14 12:33:17 2021 -0700
Avoid potentially blocking calls to metadata on critical threads (#12339)
* Avoid potentially blocking calls to metadata on critical threads
* Fixed log arguments order
* Addressed comments
* Fixed mock in PersistentSubscriptionTest
* Fixed issue in mocked tests
* Fixed test that was forcing policies modification under the hood
---
.../authorization/PulsarAuthorizationProvider.java | 12 +++---
.../pulsar/broker/service/AbstractTopic.java | 43 ++++++++++++----------
.../pulsar/broker/service/BrokerService.java | 16 ++++----
.../service/persistent/DispatchRateLimiter.java | 8 ++--
4 files changed, 41 insertions(+), 38 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index df95460..60e02fc 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.authorization;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import com.google.common.base.Function;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
@@ -47,6 +48,7 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RestException;
import
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
+import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -331,16 +333,16 @@ public class PulsarAuthorizationProvider implements
AuthorizationProvider {
return updateSubscriptionPermissionAsync(namespace, subscriptionName,
Collections.singleton(role), true);
}
- private CompletableFuture<Void>
updateSubscriptionPermissionAsync(NamespaceName namespace, String
subscriptionName, Set<String> roles,
- boolean remove) {
- CompletableFuture<Void> result = new CompletableFuture<>();
-
+ private CompletableFuture<Void>
updateSubscriptionPermissionAsync(NamespaceName namespace, String
subscriptionName,
+
Set<String> roles,
+ boolean
remove) {
try {
validatePoliciesReadOnlyAccess();
} catch (Exception e) {
- result.completeExceptionally(e);
+ return FutureUtil.failedFuture(e);
}
+ CompletableFuture<Void> result = new CompletableFuture<>();
final String policiesPath = String.format("/%s/%s/%s", "admin",
POLICIES, namespace.toString());
try {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index c8b1079..62f8fe1 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -134,17 +134,9 @@ public abstract class AbstractTopic implements Topic {
this.inactiveTopicPolicies.setInactiveTopicDeleteMode(brokerService.pulsar().getConfiguration()
.getBrokerDeleteInactiveTopicsMode());
this.lastActive = System.nanoTime();
- Policies policies = null;
- try {
- policies =
brokerService.pulsar().getConfigurationCache().policiesCache()
- .get(AdminResource.path(POLICIES,
TopicName.get(topic).getNamespace()))
- .orElseGet(() -> new Policies());
- } catch (Exception e) {
- log.warn("[{}] Error getting policies {} and publish throttling
will be disabled", topic, e.getMessage());
- }
this.preciseTopicPublishRateLimitingEnable =
brokerService.pulsar().getConfiguration().isPreciseTopicPublishRateLimiterEnable();
- updatePublishDispatcher(policies);
+ updatePublishDispatcher(Optional.empty());
}
protected boolean isProducersExceeded() {
@@ -154,11 +146,11 @@ public abstract class AbstractTopic implements Topic {
Policies policies;
try {
policies =
brokerService.pulsar().getConfigurationCache().policiesCache()
- .get(AdminResource.path(POLICIES,
TopicName.get(topic).getNamespace()))
- .orElseGet(() -> new Policies());
+ .getDataIfPresent(AdminResource.path(POLICIES,
TopicName.get(topic).getNamespace()));
} catch (Exception e) {
policies = new Policies();
}
+
maxProducers = policies.max_producers_per_topic;
}
maxProducers = maxProducers != null ? maxProducers :
brokerService.pulsar()
@@ -201,15 +193,10 @@ public abstract class AbstractTopic implements Topic {
// Use getDataIfPresent from zk cache to make the call
non-blocking and prevent deadlocks
policies =
brokerService.pulsar().getConfigurationCache().policiesCache()
.getDataIfPresent(AdminResource.path(POLICIES,
TopicName.get(topic).getNamespace()));
-
- if (policies == null) {
- policies = new Policies();
- }
} catch (Exception e) {
- log.warn("[{}] Failed to get namespace policies that include
max number of consumers: {}", topic,
- e.getMessage());
policies = new Policies();
}
+
maxConsumers = policies.max_consumers_per_topic;
}
final int maxConsumersPerTopic = maxConsumers != null ? maxConsumers
@@ -767,10 +754,10 @@ public abstract class AbstractTopic implements Topic {
}
public void updateMaxPublishRate(Policies policies) {
- updatePublishDispatcher(policies);
+ updatePublishDispatcher(Optional.of(policies));
}
- private void updatePublishDispatcher(Policies policies) {
+ private void updatePublishDispatcher(Optional<Policies> optPolicies) {
//if topic-level policy exists, try to use topic-level publish rate
policy
Optional<PublishRate> topicPublishRate =
getTopicPolicies().map(TopicPolicies::getPublishRate);
if (topicPublishRate.isPresent()) {
@@ -780,9 +767,25 @@ public abstract class AbstractTopic implements Topic {
return;
}
+ Policies policies;
+ try {
+ if (optPolicies.isPresent()) {
+ policies = optPolicies.get();
+ } else {
+ policies =
brokerService.pulsar().getConfigurationCache().policiesCache()
+ .getDataIfPresent(AdminResource.path(POLICIES,
TopicName.get(topic).getNamespace()));
+ if (policies == null) {
+ policies = new Policies();
+ }
+ }
+ } catch (Exception e) {
+ log.warn("[{}] Error getting policies {} and publish throttling
will be disabled", topic, e.getMessage());
+ policies = new Policies();
+ }
+
//topic-level policy is not set, try to use namespace-level rate policy
final String clusterName =
brokerService.pulsar().getConfiguration().getClusterName();
- final PublishRate publishRate = policies != null &&
policies.publishMaxMessageRate != null
+ final PublishRate publishRate = policies.publishMaxMessageRate != null
? policies.publishMaxMessageRate.get(clusterName)
: null;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 56dbc9c..98b2f8f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2568,11 +2568,11 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
private AutoTopicCreationOverride getAutoTopicCreationOverride(final
TopicName topicName) {
try {
- Optional<Policies> policies =
pulsar.getConfigurationCache().policiesCache()
- .get(AdminResource.path(POLICIES,
topicName.getNamespace()));
+ Policies policies = pulsar.getConfigurationCache().policiesCache()
+ .getDataIfPresent(AdminResource.path(POLICIES,
topicName.getNamespace()));
// If namespace policies have the field set, it will override the
broker-level setting
- if (policies.isPresent() &&
policies.get().autoTopicCreationOverride != null) {
- return policies.get().autoTopicCreationOverride;
+ if (policies != null && policies.autoTopicCreationOverride !=
null) {
+ return policies.autoTopicCreationOverride;
}
} catch (Throwable t) {
// Ignoring since if we don't have policies, we fallback on the
default
@@ -2601,11 +2601,11 @@ public class BrokerService implements Closeable,
ZooKeeperCacheListener<Policies
private AutoSubscriptionCreationOverride
getAutoSubscriptionCreationOverride(final TopicName topicName) {
try {
- Optional<Policies> policies =
pulsar.getConfigurationCache().policiesCache()
- .get(AdminResource.path(POLICIES,
topicName.getNamespace()));
+ Policies policies = pulsar.getConfigurationCache().policiesCache()
+ .getDataIfPresent(AdminResource.path(POLICIES,
topicName.getNamespace()));
// If namespace policies have the field set, it will override the
broker-level setting
- if (policies.isPresent() &&
policies.get().autoSubscriptionCreationOverride != null) {
- return policies.get().autoSubscriptionCreationOverride;
+ if (policies != null && policies.autoSubscriptionCreationOverride
!= null) {
+ return policies.autoSubscriptionCreationOverride;
}
} catch (Throwable t) {
// Ignoring since if we don't have policies, we fallback on the
default
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index 994d274..91eb074 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.service.persistent;
-import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.web.PulsarWebResource.path;
import java.util.Optional;
@@ -325,17 +324,16 @@ public class DispatchRateLimiter {
public static Optional<Policies> getPolicies(BrokerService brokerService,
String topicName) {
final NamespaceName namespace =
TopicName.get(topicName).getNamespaceObject();
final String path = path(POLICIES, namespace.toString());
- Optional<Policies> policies = Optional.empty();
+ Policies policies = null;
try {
ConfigurationCacheService configurationCacheService =
brokerService.pulsar().getConfigurationCache();
if (configurationCacheService != null) {
- policies =
configurationCacheService.policiesCache().getAsync(path)
-
.get(brokerService.pulsar().getConfiguration().getZooKeeperOperationTimeoutSeconds(),
SECONDS);
+ policies =
configurationCacheService.policiesCache().getDataIfPresent(path);
}
} catch (Exception e) {
log.warn("Failed to get message-rate for {} ", topicName, e);
}
- return policies;
+ return Optional.ofNullable(policies);
}
/**