This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 37aca83 Avoid potentially blocking calls to metadata on critical
threads (#12339)
37aca83 is described below
commit 37aca83588ae90c93694fc2afc7349f24bcb8c88
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Oct 14 12:33:17 2021 -0700
Avoid potentially blocking calls to metadata on critical threads (#12339)
* Avoid potentially blocking calls to metadata on critical threads
* Fixed log arguments order
* Addressed comments
* Fixed mock in PersistentSubscriptionTest
* Fixed issue in mocked tests
* Fixed test that was force policies modification under the hood
---
.../authorization/PulsarAuthorizationProvider.java | 67 +++++++++++-----------
.../pulsar/broker/service/AbstractTopic.java | 60 +++++++++----------
.../pulsar/broker/service/BrokerService.java | 35 ++++-------
.../service/persistent/DispatchRateLimiter.java | 7 +--
.../persistent/PersistentSubscriptionTest.java | 9 ++-
5 files changed, 78 insertions(+), 100 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index 005707f..e355b12 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.authorization;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import com.google.common.base.Function;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
@@ -46,6 +47,7 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RestException;
import
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
+import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -322,48 +324,43 @@ public class PulsarAuthorizationProvider implements
AuthorizationProvider {
return updateSubscriptionPermissionAsync(namespace, subscriptionName,
Collections.singleton(role), true);
}
- private CompletableFuture<Void>
updateSubscriptionPermissionAsync(NamespaceName namespace, String
subscriptionName, Set<String> roles,
- boolean remove) {
- CompletableFuture<Void> result = new CompletableFuture<>();
-
+ private CompletableFuture<Void>
updateSubscriptionPermissionAsync(NamespaceName namespace, String
subscriptionName,
+
Set<String> roles,
+ boolean
remove) {
try {
validatePoliciesReadOnlyAccess();
} catch (Exception e) {
- result.completeExceptionally(e);
+ return FutureUtil.failedFuture(e);
}
- try {
- Policies policies =
pulsarResources.getNamespaceResources().getPolicies(namespace)
- .orElseThrow(() -> new NotFoundException(namespace + " not
found"));
- if (remove) {
- if
(policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName)
!= null) {
-
policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName).removeAll(roles);
- }else {
- log.info("[{}] Couldn't find role {} while revoking for
sub = {}", namespace, subscriptionName, roles);
- result.completeExceptionally(new
IllegalArgumentException("couldn't find subscription"));
- return result;
- }
- } else {
-
policies.auth_policies.getSubscriptionAuthentication().put(subscriptionName,
roles);
- }
- pulsarResources.getNamespaceResources().setPolicies(namespace,
(data)->policies);
+ CompletableFuture<Void> future =
+
pulsarResources.getNamespaceResources().setPoliciesAsync(namespace, policies ->
{
+ if (remove) {
+ Set<String> subscriptionAuth =
+
policies.auth_policies.getSubscriptionAuthentication().get(subscriptionName);
+ if (subscriptionAuth != null) {
+ subscriptionAuth.removeAll(roles);
+ } else {
+ log.info("[{}] Couldn't find role {} while
revoking for sub = {}", namespace,
+ roles, subscriptionName);
+ throw new IllegalArgumentException("couldn't find
subscription");
+ }
+ } else {
+
policies.auth_policies.getSubscriptionAuthentication().put(subscriptionName,
roles);
+ }
+ return policies;
+ }).thenRun(() -> {
+ log.info("[{}] Successfully granted access for role {} for
sub = {}", namespace, subscriptionName,
+ roles);
+ });
- log.info("[{}] Successfully granted access for role {} for sub =
{}", namespace, subscriptionName, roles);
- result.complete(null);
- } catch (NotFoundException e) {
- log.warn("[{}] Failed to set permissions for namespace {}: does
not exist", subscriptionName, namespace);
- result.completeExceptionally(new
IllegalArgumentException("Namespace does not exist" + namespace));
- } catch (BadVersionException e) {
- log.warn("[{}] Failed to set permissions for {} on namespace {}:
concurrent modification", subscriptionName, roles, namespace);
- result.completeExceptionally(new IllegalStateException(
- "Concurrent modification on metadata path: " + namespace +
", " + e.getMessage()));
- } catch (Exception e) {
- log.error("[{}] Failed to get permissions for role {} on namespace
{}", subscriptionName, roles, namespace, e);
- result.completeExceptionally(
- new IllegalStateException("Failed to get permissions for
namespace " + namespace));
- }
+ future.exceptionally(ex -> {
+ log.error("[{}] Failed to get permissions for role {} on namespace
{}", subscriptionName, roles, namespace,
+ ex);
+ return null;
+ });
- return result;
+ return future;
}
private CompletableFuture<Boolean> checkAuthorization(TopicName topicName,
String role, AuthAction action) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 1e5710b..951d602 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -146,31 +146,18 @@ public abstract class AbstractTopic implements Topic {
this.topicMaxMessageSizeCheckIntervalMs =
TimeUnit.SECONDS.toMillis(brokerService.pulsar().getConfiguration()
.getMaxMessageSizeCheckIntervalInSeconds());
this.lastActive = System.nanoTime();
- Policies policies = null;
- try {
- policies =
brokerService.pulsar().getPulsarResources().getNamespaceResources().getPolicies(
- TopicName.get(topic).getNamespaceObject())
- .orElseGet(() -> new Policies());
- } catch (Exception e) {
- log.warn("[{}] Error getting policies {} and publish throttling
will be disabled", topic, e.getMessage());
- }
this.preciseTopicPublishRateLimitingEnable =
brokerService.pulsar().getConfiguration().isPreciseTopicPublishRateLimiterEnable();
- updatePublishDispatcher(policies);
+ updatePublishDispatcher(Optional.empty());
}
protected boolean isProducersExceeded() {
Integer maxProducers =
getTopicPolicies().map(TopicPolicies::getMaxProducerPerTopic).orElse(null);
if (maxProducers == null) {
- Policies policies;
- try {
- policies =
brokerService.pulsar().getPulsarResources().getNamespaceResources().getPolicies(
- TopicName.get(topic).getNamespaceObject())
- .orElseGet(() -> new Policies());
- } catch (Exception e) {
- policies = new Policies();
- }
+ Policies policies =
brokerService.pulsar().getPulsarResources().getNamespaceResources()
+
.getPoliciesIfCached(TopicName.get(topic).getNamespaceObject())
+ .orElseGet(() -> new Policies());
maxProducers = policies.max_producers_per_topic;
}
maxProducers = maxProducers != null ? maxProducers :
brokerService.pulsar()
@@ -208,21 +195,12 @@ public abstract class AbstractTopic implements Topic {
protected boolean isConsumersExceededOnTopic() {
Integer maxConsumers =
getTopicPolicies().map(TopicPolicies::getMaxConsumerPerTopic).orElse(null);
if (maxConsumers == null) {
- Policies policies;
- try {
- // Use getDataIfPresent from zk cache to make the call
non-blocking and prevent deadlocks
- policies =
brokerService.pulsar().getPulsarResources().getNamespaceResources().getPolicies(
- TopicName.get(topic).getNamespaceObject())
- .orElseGet(() -> new Policies());
- if (policies == null) {
- policies = new Policies();
- }
- } catch (Exception e) {
- log.warn("[{}] Failed to get namespace policies that include
max number of consumers: {}", topic,
- e.getMessage());
- policies = new Policies();
- }
+ // Use getDataIfPresent from zk cache to make the call
non-blocking and prevent deadlocks
+ Policies policies =
brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(
+ TopicName.get(topic).getNamespaceObject())
+ .orElseGet(() -> new Policies());
+
maxConsumers = policies.max_consumers_per_topic;
}
final int maxConsumersPerTopic = maxConsumers != null ? maxConsumers
@@ -789,10 +767,10 @@ public abstract class AbstractTopic implements Topic {
}
public void updateMaxPublishRate(Policies policies) {
- updatePublishDispatcher(policies);
+ updatePublishDispatcher(Optional.of(policies));
}
- private void updatePublishDispatcher(Policies policies) {
+ private void updatePublishDispatcher(Optional<Policies> optPolicies) {
//if topic-level policy exists, try to use topic-level publish rate
policy
Optional<PublishRate> topicPublishRate =
getTopicPolicies().map(TopicPolicies::getPublishRate);
if (topicPublishRate.isPresent()) {
@@ -802,9 +780,23 @@ public abstract class AbstractTopic implements Topic {
return;
}
+ Policies policies;
+ try {
+ if (optPolicies.isPresent()) {
+ policies = optPolicies.get();
+ } else {
+ policies =
brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(
+ TopicName.get(topic).getNamespaceObject())
+ .orElseGet(() -> new Policies());
+ }
+ } catch (Exception e) {
+ log.warn("[{}] Error getting policies {} and publish throttling
will be disabled", topic, e.getMessage());
+ policies = new Policies();
+ }
+
//topic-level policy is not set, try to use namespace-level rate policy
final String clusterName =
brokerService.pulsar().getConfiguration().getClusterName();
- final PublishRate publishRate = policies != null &&
policies.publishMaxMessageRate != null
+ final PublishRate publishRate = policies.publishMaxMessageRate != null
? policies.publishMaxMessageRate.get(clusterName)
: null;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index bd3c1e9..71c64c4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2535,18 +2535,12 @@ public class BrokerService implements Closeable {
}
private AutoTopicCreationOverride getAutoTopicCreationOverride(final
TopicName topicName) {
- try {
- Optional<Policies> policies =
-
pulsar.getPulsarResources().getNamespaceResources().getPolicies(topicName.getNamespaceObject());
- // If namespace policies have the field set, it will override the
broker-level setting
- if (policies.isPresent() &&
policies.get().autoTopicCreationOverride != null) {
- return policies.get().autoTopicCreationOverride;
- }
- } catch (Throwable t) {
- // Ignoring since if we don't have policies, we fallback on the
default
- log.warn("Got exception when reading autoTopicCreateOverride
policy for {}: {};",
- topicName, t.getMessage(), t);
- return null;
+ Optional<Policies> policies =
+ pulsar.getPulsarResources().getNamespaceResources()
+ .getPoliciesIfCached(topicName.getNamespaceObject());
+ // If namespace policies have the field set, it will override the
broker-level setting
+ if (policies.isPresent() && policies.get().autoTopicCreationOverride
!= null) {
+ return policies.get().autoTopicCreationOverride;
}
log.debug("No autoTopicCreateOverride policy found for {}", topicName);
return null;
@@ -2568,18 +2562,11 @@ public class BrokerService implements Closeable {
}
private AutoSubscriptionCreationOverride
getAutoSubscriptionCreationOverride(final TopicName topicName) {
- try {
- Optional<Policies> policies =
-
pulsar.getPulsarResources().getNamespaceResources().getPolicies(topicName.getNamespaceObject());
- // If namespace policies have the field set, it will override the
broker-level setting
- if (policies.isPresent() &&
policies.get().autoSubscriptionCreationOverride != null) {
- return policies.get().autoSubscriptionCreationOverride;
- }
- } catch (Throwable t) {
- // Ignoring since if we don't have policies, we fallback on the
default
- log.warn("Got exception when reading
autoSubscriptionCreateOverride policy for {}: {};",
- topicName, t.getMessage(), t);
- return null;
+ Optional<Policies> policies =
+
pulsar.getPulsarResources().getNamespaceResources().getPoliciesIfCached(topicName.getNamespaceObject());
+ // If namespace policies have the field set, it will override the
broker-level setting
+ if (policies.isPresent() &&
policies.get().autoSubscriptionCreationOverride != null) {
+ return policies.get().autoSubscriptionCreationOverride;
}
log.debug("No autoSubscriptionCreateOverride policy found for {}",
topicName);
return null;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index 696995d..47bb638 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -320,12 +320,7 @@ public class DispatchRateLimiter {
public static Optional<Policies> getPolicies(BrokerService brokerService,
String topicName) {
final NamespaceName namespace =
TopicName.get(topicName).getNamespaceObject();
- try {
- return
brokerService.pulsar().getPulsarResources().getNamespaceResources().getPolicies(namespace);
- } catch (Exception e) {
- log.warn("Failed to get message-rate for {} ", topicName, e);
- return Optional.empty();
- }
+ return
brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(namespace);
}
/**
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
index 8be9b81..76f485e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
@@ -54,6 +54,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -120,7 +121,13 @@ public class PersistentSubscriptionTest {
svcConfig.setBrokerShutdownTimeoutMs(0L);
svcConfig.setTransactionCoordinatorEnabled(true);
pulsarMock = spy(new PulsarService(svcConfig));
-
doReturn(mock(PulsarResources.class)).when(pulsarMock).getPulsarResources();
+ PulsarResources pulsarResources = mock(PulsarResources.class);
+ doReturn(pulsarResources).when(pulsarMock).getPulsarResources();
+ NamespaceResources namespaceResources = mock(NamespaceResources.class);
+
doReturn(namespaceResources).when(pulsarResources).getNamespaceResources();
+
+ doReturn(Optional.of(new
Policies())).when(namespaceResources).getPoliciesIfCached(any());
+
doReturn(new
InMemTransactionBufferProvider()).when(pulsarMock).getTransactionBufferProvider();
doReturn(new TransactionPendingAckStoreProvider() {
@Override