This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new dc14b8bec49 [Broker] Fix call sync method in onPoliciesUpdate method (#15227)
dc14b8bec49 is described below
commit dc14b8bec49eea90860d6b2f0bba4d22f245c68e
Author: Zixuan Liu <[email protected]>
AuthorDate: Sun Apr 24 11:34:50 2022 +0800
[Broker] Fix call sync method in onPoliciesUpdate method (#15227)
---
.../org/apache/pulsar/broker/service/Consumer.java | 29 ++--
.../org/apache/pulsar/broker/service/Producer.java | 31 +++--
.../service/nonpersistent/NonPersistentTopic.java | 37 +++--
.../broker/service/persistent/PersistentTopic.java | 154 ++++++++++++---------
4 files changed, 144 insertions(+), 107 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 591b71c71f6..b8358d06f3a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -765,21 +765,26 @@ public class Consumer {
.add("consumerName", consumerName).add("address",
this.cnx.clientAddress()).toString();
}
- public void checkPermissions() {
+ public CompletableFuture<Void> checkPermissionsAsync() {
TopicName topicName = TopicName.get(subscription.getTopicName());
if (cnx.getBrokerService().getAuthorizationService() != null) {
- try {
- if
(cnx.getBrokerService().getAuthorizationService().canConsume(topicName, appId,
- cnx.getAuthenticationData(), subscription.getName())) {
- return;
- }
- } catch (Exception e) {
- log.warn("[{}] Get unexpected error while autorizing [{}]
{}", appId, subscription.getTopicName(),
- e.getMessage(), e);
- }
- log.info("[{}] is not allowed to consume from topic [{}] anymore",
appId, subscription.getTopicName());
- disconnect();
+ return
cnx.getBrokerService().getAuthorizationService().canConsumeAsync(topicName,
appId,
+ cnx.getAuthenticationData(),
subscription.getName())
+ .handle((ok, e) -> {
+ if (e != null) {
+ log.warn("[{}] Get unexpected error while
autorizing [{}] {}", appId,
+ subscription.getTopicName(),
e.getMessage(), e);
+ }
+
+ if (ok == null || !ok) {
+ log.info("[{}] is not allowed to consume from
topic [{}] anymore", appId,
+ subscription.getTopicName());
+ disconnect();
+ }
+ return null;
+ });
}
+ return CompletableFuture.completedFuture(null);
}
@Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index d72f904019a..8043695bb99 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -647,21 +647,26 @@ public class Producer {
return pendingPublishAcks;
}
- public void checkPermissions() {
+ public CompletableFuture<Void> checkPermissionsAsync() {
TopicName topicName = TopicName.get(topic.getName());
if (cnx.getBrokerService().getAuthorizationService() != null) {
- try {
- if
(cnx.getBrokerService().getAuthorizationService().canProduce(topicName, appId,
- cnx.getAuthenticationData())) {
- return;
- }
- } catch (Exception e) {
- log.warn("[{}] Get unexpected error while autorizing [{}]
{}", appId, topic.getName(), e.getMessage(),
- e);
- }
- log.info("[{}] is not allowed to produce on topic [{}] anymore",
appId, topic.getName());
- disconnect();
- }
+ return cnx.getBrokerService().getAuthorizationService()
+ .canProduceAsync(topicName, appId,
cnx.getAuthenticationData())
+ .handle((ok, ex) -> {
+ if (ex != null) {
+ log.warn("[{}] Get unexpected error while
autorizing [{}] {}", appId, topic.getName(),
+ ex.getMessage(), ex);
+ }
+
+ if (ok == null || !ok) {
+ log.info("[{}] is not allowed to produce on topic
[{}] anymore", appId, topic.getName());
+ disconnect();
+ }
+
+ return null;
+ });
+ }
+ return CompletableFuture.completedFuture(null);
}
public void checkEncryption() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 886140eb25b..472cf1cc200 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -25,6 +25,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.FastThreadLocal;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -978,21 +979,29 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
isAllowAutoUpdateSchema = data.is_allow_auto_update_schema;
schemaValidationEnforced = data.schema_validation_enforced;
- producers.values().forEach(producer -> {
- producer.checkPermissions();
- producer.checkEncryption();
- });
- subscriptions.forEach((subName, sub) ->
sub.getConsumers().forEach(Consumer::checkPermissions));
+ List<CompletableFuture<Void>> producerCheckFutures = new
ArrayList<>(producers.size());
+ producers.values().forEach(producer -> producerCheckFutures.add(
+
producer.checkPermissionsAsync().thenRun(producer::checkEncryption)));
- if (data.inactive_topic_policies != null) {
- this.inactiveTopicPolicies = data.inactive_topic_policies;
- } else {
- ServiceConfiguration cfg =
brokerService.getPulsar().getConfiguration();
- resetInactiveTopicPolicies(cfg.getBrokerDeleteInactiveTopicsMode()
- ,
cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(),
- cfg.isBrokerDeleteInactiveTopicsEnabled());
- }
- return checkReplicationAndRetryOnFailure();
+ return FutureUtil.waitForAll(producerCheckFutures).thenCompose((__) ->
{
+ List<CompletableFuture<Void>> consumerCheckFutures = new
ArrayList<>();
+ subscriptions.forEach((subName, sub) ->
sub.getConsumers().forEach(consumer -> {
+ consumerCheckFutures.add(consumer.checkPermissionsAsync());
+ }));
+
+ return FutureUtil.waitForAll(consumerCheckFutures)
+ .thenCompose((___) -> {
+ if (data.inactive_topic_policies != null) {
+ this.inactiveTopicPolicies =
data.inactive_topic_policies;
+ } else {
+ ServiceConfiguration cfg =
brokerService.getPulsar().getConfiguration();
+
resetInactiveTopicPolicies(cfg.getBrokerDeleteInactiveTopicsMode()
+ ,
cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(),
+ cfg.isBrokerDeleteInactiveTopicsEnabled());
+ }
+ return checkReplicationAndRetryOnFailure();
+ });
+ });
}
/**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 4f1b8cc31d8..bdfb2b2e2b2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2456,40 +2456,49 @@ public class PersistentTopic extends AbstractTopic
this.updateMaxPublishRate(data);
- producers.values().forEach(producer -> {
- producer.checkPermissions();
- producer.checkEncryption();
- });
- subscriptions.forEach((subName, sub) -> {
- sub.getConsumers().forEach(Consumer::checkPermissions);
- Dispatcher dispatcher = sub.getDispatcher();
- // If the topic-level policy already exists, the namespace-level
policy cannot override
- // the topic-level policy.
- if (dispatcher != null
- && (!topicPolicies.isPresent() ||
!topicPolicies.get().isSubscriptionDispatchRateSet())) {
- dispatcher.getRateLimiter().ifPresent(rateLimiter ->
rateLimiter.onPoliciesUpdate(data));
- }
- });
- replicators.forEach((name, replicator) ->
-
replicator.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate)
- );
- checkMessageExpiry();
- CompletableFuture<Void> replicationFuture =
checkReplicationAndRetryOnFailure();
- CompletableFuture<Void> dedupFuture = checkDeduplicationStatus();
- CompletableFuture<Void> persistentPoliciesFuture =
checkPersistencePolicies();
- // update rate-limiter if policies updated
- if (this.dispatchRateLimiter.isPresent()) {
- if (!topicPolicies.isPresent() ||
!topicPolicies.get().isDispatchRateSet()) {
- dispatchRateLimiter.get().onPoliciesUpdate(data);
- }
- }
- if (this.subscribeRateLimiter.isPresent()) {
- subscribeRateLimiter.get().onPoliciesUpdate(data);
- }
+ List<CompletableFuture<Void>> producerCheckFutures = new
ArrayList<>(producers.size());
+ producers.values().forEach(producer -> producerCheckFutures.add(
+
producer.checkPermissionsAsync().thenRun(producer::checkEncryption)));
+
+ return FutureUtil.waitForAll(producerCheckFutures).thenCompose((__) ->
{
+ List<CompletableFuture<Void>> subscriptionCheckFutures = new
ArrayList<>((int) subscriptions.size());
+ subscriptions.forEach((subName, sub) -> {
+ List<CompletableFuture<Void>> consumerCheckFutures = new
ArrayList<>(sub.getConsumers().size());
+ sub.getConsumers().forEach(consumer ->
consumerCheckFutures.add(consumer.checkPermissionsAsync()));
+
subscriptionCheckFutures.add(FutureUtil.waitForAll(consumerCheckFutures).thenRun(()
-> {
+ Dispatcher dispatcher = sub.getDispatcher();
+ // If the topic-level policy already exists, the
namespace-level policy cannot override
+ // the topic-level policy.
+ if (dispatcher != null && (!topicPolicies.isPresent() ||
!topicPolicies.get()
+ .isSubscriptionDispatchRateSet())) {
+ dispatcher.getRateLimiter()
+ .ifPresent(rateLimiter ->
rateLimiter.onPoliciesUpdate(data));
+ }
+ }));
+ });
+ return
FutureUtil.waitForAll(subscriptionCheckFutures).thenCompose((___) -> {
+ replicators.forEach((name, replicator) ->
+
replicator.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate)
+ );
+ checkMessageExpiry();
+ CompletableFuture<Void> replicationFuture =
checkReplicationAndRetryOnFailure();
+ CompletableFuture<Void> dedupFuture =
checkDeduplicationStatus();
+ CompletableFuture<Void> persistentPoliciesFuture =
checkPersistencePolicies();
+ // update rate-limiter if policies updated
+ if (this.dispatchRateLimiter.isPresent()) {
+ if (!topicPolicies.isPresent() ||
!topicPolicies.get().isDispatchRateSet()) {
+ dispatchRateLimiter.get().onPoliciesUpdate(data);
+ }
+ }
+ if (this.subscribeRateLimiter.isPresent()) {
+ subscribeRateLimiter.get().onPoliciesUpdate(data);
+ }
- return CompletableFuture.allOf(replicationFuture, dedupFuture,
persistentPoliciesFuture,
- preCreateSubscriptionForCompactionIfNeeded());
+ return CompletableFuture.allOf(replicationFuture, dedupFuture,
persistentPoliciesFuture,
+ preCreateSubscriptionForCompactionIfNeeded());
+ });
+ });
}
/**
@@ -3114,48 +3123,57 @@ public class PersistentTopic extends AbstractTopic
}
});
- subscriptions.forEach((subName, sub) -> {
- sub.getConsumers().forEach(Consumer::checkPermissions);
- Dispatcher dispatcher = sub.getDispatcher();
- if (dispatcher != null) {
-
dispatcher.updateRateLimiter(policies.getSubscriptionDispatchRate());
+ List<CompletableFuture<Void>> consumerCheckFutures = new ArrayList<>();
+ subscriptions.forEach((subName, sub) ->
sub.getConsumers().forEach(consumer -> {
+
consumerCheckFutures.add(consumer.checkPermissionsAsync().thenRun(() -> {
+ Dispatcher dispatcher = sub.getDispatcher();
+ if (dispatcher != null) {
+
dispatcher.updateRateLimiter(policies.getSubscriptionDispatchRate());
+ }
+ }));
+ }));
+
+ FutureUtil.waitForAll(consumerCheckFutures).thenRun(() -> {
+ if (policies.getPublishRate() != null) {
+ updatePublishDispatcher(policies.getPublishRate());
+ } else {
+ updateMaxPublishRate(namespacePolicies.orElse(null));
}
- });
- if (policies.getPublishRate() != null) {
- updatePublishDispatcher(policies.getPublishRate());
- } else {
- updateMaxPublishRate(namespacePolicies.orElse(null));
- }
+ if (policies.isInactiveTopicPoliciesSet()) {
+ inactiveTopicPolicies = policies.getInactiveTopicPolicies();
+ } else if (namespacePolicies.isPresent() &&
namespacePolicies.get().inactive_topic_policies != null) {
+ //topic-level policies is null , so use namespace-level
+ inactiveTopicPolicies =
namespacePolicies.get().inactive_topic_policies;
+ } else {
+ //namespace-level policies is null , so use broker level
+ ServiceConfiguration cfg =
brokerService.getPulsar().getConfiguration();
+
resetInactiveTopicPolicies(cfg.getBrokerDeleteInactiveTopicsMode()
+ ,
cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(),
+ cfg.isBrokerDeleteInactiveTopicsEnabled());
+ }
- if (policies.isInactiveTopicPoliciesSet()) {
- inactiveTopicPolicies = policies.getInactiveTopicPolicies();
- } else if (namespacePolicies.isPresent() &&
namespacePolicies.get().inactive_topic_policies != null) {
- //topic-level policies is null , so use namespace-level
- inactiveTopicPolicies =
namespacePolicies.get().inactive_topic_policies;
- } else {
- //namespace-level policies is null , so use broker level
- ServiceConfiguration cfg =
brokerService.getPulsar().getConfiguration();
- resetInactiveTopicPolicies(cfg.getBrokerDeleteInactiveTopicsMode()
- ,
cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(),
- cfg.isBrokerDeleteInactiveTopicsEnabled());
- }
-
updateUnackedMessagesAppliedOnSubscription(namespacePolicies.orElse(null));
-
initializeTopicSubscribeRateLimiterIfNeeded(Optional.ofNullable(policies));
- if (this.subscribeRateLimiter.isPresent()) {
- subscribeRateLimiter.ifPresent(subscribeRateLimiter ->
-
subscribeRateLimiter.onSubscribeRateUpdate(policies.getSubscribeRate()));
- }
- replicators.forEach((name, replicator) -> replicator.getRateLimiter()
- .ifPresent(DispatchRateLimiter::updateDispatchRate));
-
updateUnackedMessagesExceededOnConsumer(namespacePolicies.orElse(null));
+
updateUnackedMessagesAppliedOnSubscription(namespacePolicies.orElse(null));
+
initializeTopicSubscribeRateLimiterIfNeeded(Optional.ofNullable(policies));
+ if (this.subscribeRateLimiter.isPresent()) {
+ subscribeRateLimiter.ifPresent(subscribeRateLimiter ->
+
subscribeRateLimiter.onSubscribeRateUpdate(policies.getSubscribeRate()));
+ }
+ replicators.forEach((name, replicator) ->
replicator.getRateLimiter()
+ .ifPresent(DispatchRateLimiter::updateDispatchRate));
+
updateUnackedMessagesExceededOnConsumer(namespacePolicies.orElse(null));
- checkDeduplicationStatus();
+ checkDeduplicationStatus();
- preCreateSubscriptionForCompactionIfNeeded();
+ preCreateSubscriptionForCompactionIfNeeded();
- // update managed ledger config
- checkPersistencePolicies();
+ // update managed ledger config
+ checkPersistencePolicies();
+ }).exceptionally(e -> {
+ Throwable t = e instanceof CompletionException ? e.getCause() : e;
+ log.error("[{}] update topic policy error: {}", topic,
t.getMessage(), t);
+ return null;
+ });
}
private Optional<Policies> getNamespacePolicies() {