This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new dc14b8bec49 [Broker] Fix call sync method in onPoliciesUpdate method 
(#15227)
dc14b8bec49 is described below

commit dc14b8bec49eea90860d6b2f0bba4d22f245c68e
Author: Zixuan Liu <[email protected]>
AuthorDate: Sun Apr 24 11:34:50 2022 +0800

    [Broker] Fix call sync method in onPoliciesUpdate method (#15227)
---
 .../org/apache/pulsar/broker/service/Consumer.java |  29 ++--
 .../org/apache/pulsar/broker/service/Producer.java |  31 +++--
 .../service/nonpersistent/NonPersistentTopic.java  |  37 +++--
 .../broker/service/persistent/PersistentTopic.java | 154 ++++++++++++---------
 4 files changed, 144 insertions(+), 107 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 591b71c71f6..b8358d06f3a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -765,21 +765,26 @@ public class Consumer {
                 .add("consumerName", consumerName).add("address", 
this.cnx.clientAddress()).toString();
     }
 
-    public void checkPermissions() {
+    public CompletableFuture<Void> checkPermissionsAsync() {
         TopicName topicName = TopicName.get(subscription.getTopicName());
         if (cnx.getBrokerService().getAuthorizationService() != null) {
-            try {
-                if 
(cnx.getBrokerService().getAuthorizationService().canConsume(topicName, appId,
-                        cnx.getAuthenticationData(), subscription.getName())) {
-                    return;
-                }
-            } catch (Exception e) {
-                log.warn("[{}] Get unexpected error while autorizing [{}]  
{}", appId, subscription.getTopicName(),
-                        e.getMessage(), e);
-            }
-            log.info("[{}] is not allowed to consume from topic [{}] anymore", 
appId, subscription.getTopicName());
-            disconnect();
+            return 
cnx.getBrokerService().getAuthorizationService().canConsumeAsync(topicName, 
appId,
+                            cnx.getAuthenticationData(), 
subscription.getName())
+                    .handle((ok, e) -> {
+                        if (e != null) {
+                            log.warn("[{}] Get unexpected error while 
autorizing [{}]  {}", appId,
+                                    subscription.getTopicName(), 
e.getMessage(), e);
+                        }
+
+                        if (ok == null || !ok) {
+                            log.info("[{}] is not allowed to consume from 
topic [{}] anymore", appId,
+                                    subscription.getTopicName());
+                            disconnect();
+                        }
+                        return null;
+                    });
         }
+        return CompletableFuture.completedFuture(null);
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index d72f904019a..8043695bb99 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -647,21 +647,26 @@ public class Producer {
         return pendingPublishAcks;
     }
 
-    public void checkPermissions() {
+    public CompletableFuture<Void> checkPermissionsAsync() {
         TopicName topicName = TopicName.get(topic.getName());
         if (cnx.getBrokerService().getAuthorizationService() != null) {
-            try {
-                if 
(cnx.getBrokerService().getAuthorizationService().canProduce(topicName, appId,
-                        cnx.getAuthenticationData())) {
-                    return;
-                }
-            } catch (Exception e) {
-                log.warn("[{}] Get unexpected error while autorizing [{}]  
{}", appId, topic.getName(), e.getMessage(),
-                        e);
-            }
-            log.info("[{}] is not allowed to produce on topic [{}] anymore", 
appId, topic.getName());
-            disconnect();
-        }
+            return cnx.getBrokerService().getAuthorizationService()
+                    .canProduceAsync(topicName, appId, 
cnx.getAuthenticationData())
+                    .handle((ok, ex) -> {
+                        if (ex != null) {
+                            log.warn("[{}] Get unexpected error while 
autorizing [{}]  {}", appId, topic.getName(),
+                                    ex.getMessage(), ex);
+                        }
+
+                        if (ok == null || !ok) {
+                            log.info("[{}] is not allowed to produce on topic 
[{}] anymore", appId, topic.getName());
+                            disconnect();
+                        }
+
+                        return null;
+                    });
+        }
+        return CompletableFuture.completedFuture(null);
     }
 
     public void checkEncryption() {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 886140eb25b..472cf1cc200 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -25,6 +25,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import io.netty.buffer.ByteBuf;
 import io.netty.util.concurrent.FastThreadLocal;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -978,21 +979,29 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic {
         isAllowAutoUpdateSchema = data.is_allow_auto_update_schema;
         schemaValidationEnforced = data.schema_validation_enforced;
 
-        producers.values().forEach(producer -> {
-            producer.checkPermissions();
-            producer.checkEncryption();
-        });
-        subscriptions.forEach((subName, sub) -> 
sub.getConsumers().forEach(Consumer::checkPermissions));
+        List<CompletableFuture<Void>> producerCheckFutures = new 
ArrayList<>(producers.size());
+        producers.values().forEach(producer -> producerCheckFutures.add(
+                
producer.checkPermissionsAsync().thenRun(producer::checkEncryption)));
 
-        if (data.inactive_topic_policies != null) {
-            this.inactiveTopicPolicies = data.inactive_topic_policies;
-        } else {
-            ServiceConfiguration cfg = 
brokerService.getPulsar().getConfiguration();
-            resetInactiveTopicPolicies(cfg.getBrokerDeleteInactiveTopicsMode()
-                    , 
cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(),
-                    cfg.isBrokerDeleteInactiveTopicsEnabled());
-        }
-        return checkReplicationAndRetryOnFailure();
+        return FutureUtil.waitForAll(producerCheckFutures).thenCompose((__) -> 
{
+            List<CompletableFuture<Void>> consumerCheckFutures = new 
ArrayList<>();
+            subscriptions.forEach((subName, sub) -> 
sub.getConsumers().forEach(consumer -> {
+                consumerCheckFutures.add(consumer.checkPermissionsAsync());
+            }));
+
+            return FutureUtil.waitForAll(consumerCheckFutures)
+                    .thenCompose((___) -> {
+                        if (data.inactive_topic_policies != null) {
+                            this.inactiveTopicPolicies = 
data.inactive_topic_policies;
+                        } else {
+                            ServiceConfiguration cfg = 
brokerService.getPulsar().getConfiguration();
+                            
resetInactiveTopicPolicies(cfg.getBrokerDeleteInactiveTopicsMode()
+                                    , 
cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(),
+                                    cfg.isBrokerDeleteInactiveTopicsEnabled());
+                        }
+                        return checkReplicationAndRetryOnFailure();
+                    });
+        });
     }
 
     /**
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 4f1b8cc31d8..bdfb2b2e2b2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2456,40 +2456,49 @@ public class PersistentTopic extends AbstractTopic
 
         this.updateMaxPublishRate(data);
 
-        producers.values().forEach(producer -> {
-            producer.checkPermissions();
-            producer.checkEncryption();
-        });
-        subscriptions.forEach((subName, sub) -> {
-            sub.getConsumers().forEach(Consumer::checkPermissions);
-            Dispatcher dispatcher = sub.getDispatcher();
-            // If the topic-level policy already exists, the namespace-level 
policy cannot override
-            // the topic-level policy.
-            if (dispatcher != null
-                    && (!topicPolicies.isPresent() || 
!topicPolicies.get().isSubscriptionDispatchRateSet())) {
-                dispatcher.getRateLimiter().ifPresent(rateLimiter -> 
rateLimiter.onPoliciesUpdate(data));
-            }
-        });
-        replicators.forEach((name, replicator) ->
-                
replicator.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate)
-        );
-        checkMessageExpiry();
-        CompletableFuture<Void> replicationFuture = 
checkReplicationAndRetryOnFailure();
-        CompletableFuture<Void> dedupFuture = checkDeduplicationStatus();
-        CompletableFuture<Void> persistentPoliciesFuture = 
checkPersistencePolicies();
-        // update rate-limiter if policies updated
-        if (this.dispatchRateLimiter.isPresent()) {
-            if (!topicPolicies.isPresent() || 
!topicPolicies.get().isDispatchRateSet()) {
-                dispatchRateLimiter.get().onPoliciesUpdate(data);
-            }
-        }
-        if (this.subscribeRateLimiter.isPresent()) {
-            subscribeRateLimiter.get().onPoliciesUpdate(data);
-        }
+        List<CompletableFuture<Void>> producerCheckFutures = new 
ArrayList<>(producers.size());
+        producers.values().forEach(producer -> producerCheckFutures.add(
+                
producer.checkPermissionsAsync().thenRun(producer::checkEncryption)));
+
+        return FutureUtil.waitForAll(producerCheckFutures).thenCompose((__) -> 
{
+            List<CompletableFuture<Void>> subscriptionCheckFutures = new 
ArrayList<>((int) subscriptions.size());
+            subscriptions.forEach((subName, sub) -> {
+                List<CompletableFuture<Void>> consumerCheckFutures = new 
ArrayList<>(sub.getConsumers().size());
+                sub.getConsumers().forEach(consumer -> 
consumerCheckFutures.add(consumer.checkPermissionsAsync()));
+                
subscriptionCheckFutures.add(FutureUtil.waitForAll(consumerCheckFutures).thenRun(()
 -> {
+                    Dispatcher dispatcher = sub.getDispatcher();
+                    // If the topic-level policy already exists, the 
namespace-level policy cannot override
+                    // the topic-level policy.
+                    if (dispatcher != null && (!topicPolicies.isPresent() || 
!topicPolicies.get()
+                            .isSubscriptionDispatchRateSet())) {
+                        dispatcher.getRateLimiter()
+                                .ifPresent(rateLimiter -> 
rateLimiter.onPoliciesUpdate(data));
+                    }
+                }));
+            });
 
+            return 
FutureUtil.waitForAll(subscriptionCheckFutures).thenCompose((___) -> {
+                replicators.forEach((name, replicator) ->
+                        
replicator.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate)
+                );
+                checkMessageExpiry();
+                CompletableFuture<Void> replicationFuture = 
checkReplicationAndRetryOnFailure();
+                CompletableFuture<Void> dedupFuture = 
checkDeduplicationStatus();
+                CompletableFuture<Void> persistentPoliciesFuture = 
checkPersistencePolicies();
+                // update rate-limiter if policies updated
+                if (this.dispatchRateLimiter.isPresent()) {
+                    if (!topicPolicies.isPresent() || 
!topicPolicies.get().isDispatchRateSet()) {
+                        dispatchRateLimiter.get().onPoliciesUpdate(data);
+                    }
+                }
+                if (this.subscribeRateLimiter.isPresent()) {
+                    subscribeRateLimiter.get().onPoliciesUpdate(data);
+                }
 
-        return CompletableFuture.allOf(replicationFuture, dedupFuture, 
persistentPoliciesFuture,
-                preCreateSubscriptionForCompactionIfNeeded());
+                return CompletableFuture.allOf(replicationFuture, dedupFuture, 
persistentPoliciesFuture,
+                        preCreateSubscriptionForCompactionIfNeeded());
+            });
+        });
     }
 
     /**
@@ -3114,48 +3123,57 @@ public class PersistentTopic extends AbstractTopic
             }
         });
 
-        subscriptions.forEach((subName, sub) -> {
-            sub.getConsumers().forEach(Consumer::checkPermissions);
-            Dispatcher dispatcher = sub.getDispatcher();
-            if (dispatcher != null) {
-                
dispatcher.updateRateLimiter(policies.getSubscriptionDispatchRate());
+        List<CompletableFuture<Void>> consumerCheckFutures = new ArrayList<>();
+        subscriptions.forEach((subName, sub) -> 
sub.getConsumers().forEach(consumer -> {
+            
consumerCheckFutures.add(consumer.checkPermissionsAsync().thenRun(() -> {
+                Dispatcher dispatcher = sub.getDispatcher();
+                if (dispatcher != null) {
+                    
dispatcher.updateRateLimiter(policies.getSubscriptionDispatchRate());
+                }
+            }));
+        }));
+
+        FutureUtil.waitForAll(consumerCheckFutures).thenRun(() -> {
+            if (policies.getPublishRate() != null) {
+                updatePublishDispatcher(policies.getPublishRate());
+            } else {
+                updateMaxPublishRate(namespacePolicies.orElse(null));
             }
-        });
 
-        if (policies.getPublishRate() != null) {
-            updatePublishDispatcher(policies.getPublishRate());
-        } else {
-            updateMaxPublishRate(namespacePolicies.orElse(null));
-        }
+            if (policies.isInactiveTopicPoliciesSet()) {
+                inactiveTopicPolicies = policies.getInactiveTopicPolicies();
+            } else if (namespacePolicies.isPresent() && 
namespacePolicies.get().inactive_topic_policies != null) {
+                //topic-level policies is null , so use namespace-level
+                inactiveTopicPolicies = 
namespacePolicies.get().inactive_topic_policies;
+            } else {
+                //namespace-level policies is null , so use broker level
+                ServiceConfiguration cfg = 
brokerService.getPulsar().getConfiguration();
+                
resetInactiveTopicPolicies(cfg.getBrokerDeleteInactiveTopicsMode()
+                        , 
cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(),
+                        cfg.isBrokerDeleteInactiveTopicsEnabled());
+            }
 
-        if (policies.isInactiveTopicPoliciesSet()) {
-            inactiveTopicPolicies = policies.getInactiveTopicPolicies();
-        } else if (namespacePolicies.isPresent() && 
namespacePolicies.get().inactive_topic_policies != null) {
-            //topic-level policies is null , so use namespace-level
-            inactiveTopicPolicies = 
namespacePolicies.get().inactive_topic_policies;
-        } else {
-            //namespace-level policies is null , so use broker level
-            ServiceConfiguration cfg = 
brokerService.getPulsar().getConfiguration();
-            resetInactiveTopicPolicies(cfg.getBrokerDeleteInactiveTopicsMode()
-                    , 
cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(),
-                    cfg.isBrokerDeleteInactiveTopicsEnabled());
-        }
-        
updateUnackedMessagesAppliedOnSubscription(namespacePolicies.orElse(null));
-        
initializeTopicSubscribeRateLimiterIfNeeded(Optional.ofNullable(policies));
-        if (this.subscribeRateLimiter.isPresent()) {
-            subscribeRateLimiter.ifPresent(subscribeRateLimiter ->
-                
subscribeRateLimiter.onSubscribeRateUpdate(policies.getSubscribeRate()));
-        }
-        replicators.forEach((name, replicator) -> replicator.getRateLimiter()
-                .ifPresent(DispatchRateLimiter::updateDispatchRate));
-        
updateUnackedMessagesExceededOnConsumer(namespacePolicies.orElse(null));
+            
updateUnackedMessagesAppliedOnSubscription(namespacePolicies.orElse(null));
+            
initializeTopicSubscribeRateLimiterIfNeeded(Optional.ofNullable(policies));
+            if (this.subscribeRateLimiter.isPresent()) {
+                subscribeRateLimiter.ifPresent(subscribeRateLimiter ->
+                        
subscribeRateLimiter.onSubscribeRateUpdate(policies.getSubscribeRate()));
+            }
+            replicators.forEach((name, replicator) -> 
replicator.getRateLimiter()
+                    .ifPresent(DispatchRateLimiter::updateDispatchRate));
+            
updateUnackedMessagesExceededOnConsumer(namespacePolicies.orElse(null));
 
-        checkDeduplicationStatus();
+            checkDeduplicationStatus();
 
-        preCreateSubscriptionForCompactionIfNeeded();
+            preCreateSubscriptionForCompactionIfNeeded();
 
-        // update managed ledger config
-        checkPersistencePolicies();
+            // update managed ledger config
+            checkPersistencePolicies();
+        }).exceptionally(e -> {
+            Throwable t = e instanceof CompletionException ? e.getCause() : e;
+            log.error("[{}] update topic policy error: {}", topic, 
t.getMessage(), t);
+            return null;
+        });
     }
 
     private Optional<Policies> getNamespacePolicies() {

Reply via email to