codelipenghui commented on a change in pull request #13885:
URL: https://github.com/apache/pulsar/pull/13885#discussion_r789607704



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -2436,39 +2436,49 @@ private boolean shouldTopicBeRetained() {
 
         this.updateMaxPublishRate(data);
 
-        producers.values().forEach(producer -> {
-            producer.checkPermissions();
-            producer.checkEncryption();
-        });
-        subscriptions.forEach((subName, sub) -> {
-            sub.getConsumers().forEach(Consumer::checkPermissions);
-            Dispatcher dispatcher = sub.getDispatcher();
-            // If the topic-level policy already exists, the namespace-level 
policy cannot override
-            // the topic-level policy.
-            if (dispatcher != null
-                    && (!topicPolicies.isPresent() || 
!topicPolicies.get().isSubscriptionDispatchRateSet())) {
-                dispatcher.getRateLimiter().ifPresent(rateLimiter -> 
rateLimiter.onPoliciesUpdate(data));
-            }
-        });
-        replicators.forEach((name, replicator) ->
-                
replicator.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate)
-        );
-        checkMessageExpiry();
-        CompletableFuture<Void> replicationFuture = 
checkReplicationAndRetryOnFailure();
-        CompletableFuture<Void> dedupFuture = checkDeduplicationStatus();
-        CompletableFuture<Void> persistentPoliciesFuture = 
checkPersistencePolicies();
-        // update rate-limiter if policies updated
-        if (this.dispatchRateLimiter.isPresent()) {
-            if (!topicPolicies.isPresent() || 
!topicPolicies.get().isDispatchRateSet()) {
-                dispatchRateLimiter.get().onPoliciesUpdate(data);
-            }
-        }
-        if (this.subscribeRateLimiter.isPresent()) {
-            subscribeRateLimiter.get().onPoliciesUpdate(data);
-        }
+        List<CompletableFuture<Void>> producerCheckFutures = new ArrayList<>();

Review comment:
       ```suggestion
           List<CompletableFuture<Void>> producerCheckFutures = new ArrayList<>(producers.size());
   ```

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -2436,39 +2436,49 @@ private boolean shouldTopicBeRetained() {
 
         this.updateMaxPublishRate(data);
 
-        producers.values().forEach(producer -> {
-            producer.checkPermissions();
-            producer.checkEncryption();
-        });
-        subscriptions.forEach((subName, sub) -> {
-            sub.getConsumers().forEach(Consumer::checkPermissions);
-            Dispatcher dispatcher = sub.getDispatcher();
-            // If the topic-level policy already exists, the namespace-level 
policy cannot override
-            // the topic-level policy.
-            if (dispatcher != null
-                    && (!topicPolicies.isPresent() || 
!topicPolicies.get().isSubscriptionDispatchRateSet())) {
-                dispatcher.getRateLimiter().ifPresent(rateLimiter -> 
rateLimiter.onPoliciesUpdate(data));
-            }
-        });
-        replicators.forEach((name, replicator) ->
-                
replicator.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate)
-        );
-        checkMessageExpiry();
-        CompletableFuture<Void> replicationFuture = 
checkReplicationAndRetryOnFailure();
-        CompletableFuture<Void> dedupFuture = checkDeduplicationStatus();
-        CompletableFuture<Void> persistentPoliciesFuture = 
checkPersistencePolicies();
-        // update rate-limiter if policies updated
-        if (this.dispatchRateLimiter.isPresent()) {
-            if (!topicPolicies.isPresent() || 
!topicPolicies.get().isDispatchRateSet()) {
-                dispatchRateLimiter.get().onPoliciesUpdate(data);
-            }
-        }
-        if (this.subscribeRateLimiter.isPresent()) {
-            subscribeRateLimiter.get().onPoliciesUpdate(data);
-        }
+        List<CompletableFuture<Void>> producerCheckFutures = new ArrayList<>();
+        producers.values().forEach(producer -> producerCheckFutures.add(
+                
producer.checkPermissionsAsync().thenRun(producer::checkEncryption)));
+
+        return FutureUtil.waitForAll(producerCheckFutures).thenCompose((__)->{
+            List<CompletableFuture<Void>> subscriptionCheckFutures = new 
ArrayList<>();

Review comment:
       ```suggestion
               List<CompletableFuture<Void>> subscriptionCheckFutures = new ArrayList<>(subscriptions.size());
   ```

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -2436,39 +2436,49 @@ private boolean shouldTopicBeRetained() {
 
         this.updateMaxPublishRate(data);
 
-        producers.values().forEach(producer -> {
-            producer.checkPermissions();
-            producer.checkEncryption();
-        });
-        subscriptions.forEach((subName, sub) -> {
-            sub.getConsumers().forEach(Consumer::checkPermissions);
-            Dispatcher dispatcher = sub.getDispatcher();
-            // If the topic-level policy already exists, the namespace-level 
policy cannot override
-            // the topic-level policy.
-            if (dispatcher != null
-                    && (!topicPolicies.isPresent() || 
!topicPolicies.get().isSubscriptionDispatchRateSet())) {
-                dispatcher.getRateLimiter().ifPresent(rateLimiter -> 
rateLimiter.onPoliciesUpdate(data));
-            }
-        });
-        replicators.forEach((name, replicator) ->
-                
replicator.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate)
-        );
-        checkMessageExpiry();
-        CompletableFuture<Void> replicationFuture = 
checkReplicationAndRetryOnFailure();
-        CompletableFuture<Void> dedupFuture = checkDeduplicationStatus();
-        CompletableFuture<Void> persistentPoliciesFuture = 
checkPersistencePolicies();
-        // update rate-limiter if policies updated
-        if (this.dispatchRateLimiter.isPresent()) {
-            if (!topicPolicies.isPresent() || 
!topicPolicies.get().isDispatchRateSet()) {
-                dispatchRateLimiter.get().onPoliciesUpdate(data);
-            }
-        }
-        if (this.subscribeRateLimiter.isPresent()) {
-            subscribeRateLimiter.get().onPoliciesUpdate(data);
-        }
+        List<CompletableFuture<Void>> producerCheckFutures = new ArrayList<>();
+        producers.values().forEach(producer -> producerCheckFutures.add(
+                
producer.checkPermissionsAsync().thenRun(producer::checkEncryption)));
+
+        return FutureUtil.waitForAll(producerCheckFutures).thenCompose((__)->{

Review comment:
       ```suggestion
           return FutureUtil.waitForAll(producerCheckFutures).thenCompose((__) -> {
   ```

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
##########
@@ -1003,13 +1004,19 @@ public void checkDeduplicationSnapshot() {
         isAllowAutoUpdateSchema = data.is_allow_auto_update_schema;
         schemaValidationEnforced = data.schema_validation_enforced;
 
-        producers.values().forEach(producer -> {
-            producer.checkPermissions();
-            producer.checkEncryption();
-        });
-        subscriptions.forEach((subName, sub) -> 
sub.getConsumers().forEach(Consumer::checkPermissions));
+        List<CompletableFuture<Void>> producerCheckFutures = new ArrayList<>();

Review comment:
       ```suggestion
           List<CompletableFuture<Void>> producerCheckFutures = new ArrayList<>(producers.size());
   ```

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -2436,39 +2436,49 @@ private boolean shouldTopicBeRetained() {
 
         this.updateMaxPublishRate(data);
 
-        producers.values().forEach(producer -> {
-            producer.checkPermissions();
-            producer.checkEncryption();
-        });
-        subscriptions.forEach((subName, sub) -> {
-            sub.getConsumers().forEach(Consumer::checkPermissions);
-            Dispatcher dispatcher = sub.getDispatcher();
-            // If the topic-level policy already exists, the namespace-level 
policy cannot override
-            // the topic-level policy.
-            if (dispatcher != null
-                    && (!topicPolicies.isPresent() || 
!topicPolicies.get().isSubscriptionDispatchRateSet())) {
-                dispatcher.getRateLimiter().ifPresent(rateLimiter -> 
rateLimiter.onPoliciesUpdate(data));
-            }
-        });
-        replicators.forEach((name, replicator) ->
-                
replicator.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate)
-        );
-        checkMessageExpiry();
-        CompletableFuture<Void> replicationFuture = 
checkReplicationAndRetryOnFailure();
-        CompletableFuture<Void> dedupFuture = checkDeduplicationStatus();
-        CompletableFuture<Void> persistentPoliciesFuture = 
checkPersistencePolicies();
-        // update rate-limiter if policies updated
-        if (this.dispatchRateLimiter.isPresent()) {
-            if (!topicPolicies.isPresent() || 
!topicPolicies.get().isDispatchRateSet()) {
-                dispatchRateLimiter.get().onPoliciesUpdate(data);
-            }
-        }
-        if (this.subscribeRateLimiter.isPresent()) {
-            subscribeRateLimiter.get().onPoliciesUpdate(data);
-        }
+        List<CompletableFuture<Void>> producerCheckFutures = new ArrayList<>();
+        producers.values().forEach(producer -> producerCheckFutures.add(
+                
producer.checkPermissionsAsync().thenRun(producer::checkEncryption)));
+
+        return FutureUtil.waitForAll(producerCheckFutures).thenCompose((__)->{
+            List<CompletableFuture<Void>> subscriptionCheckFutures = new 
ArrayList<>();
+            subscriptions.forEach((subName, sub) -> {
+                List<CompletableFuture<Void>> consumerCheckFutures = new 
ArrayList<>();

Review comment:
       ```suggestion
                   List<CompletableFuture<Void>> consumerCheckFutures = new ArrayList<>(sub.getConsumers().size());
   ```

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -2436,39 +2436,49 @@ private boolean shouldTopicBeRetained() {
 
         this.updateMaxPublishRate(data);
 
-        producers.values().forEach(producer -> {
-            producer.checkPermissions();
-            producer.checkEncryption();
-        });
-        subscriptions.forEach((subName, sub) -> {
-            sub.getConsumers().forEach(Consumer::checkPermissions);
-            Dispatcher dispatcher = sub.getDispatcher();
-            // If the topic-level policy already exists, the namespace-level 
policy cannot override
-            // the topic-level policy.
-            if (dispatcher != null
-                    && (!topicPolicies.isPresent() || 
!topicPolicies.get().isSubscriptionDispatchRateSet())) {
-                dispatcher.getRateLimiter().ifPresent(rateLimiter -> 
rateLimiter.onPoliciesUpdate(data));
-            }
-        });
-        replicators.forEach((name, replicator) ->
-                
replicator.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate)
-        );
-        checkMessageExpiry();
-        CompletableFuture<Void> replicationFuture = 
checkReplicationAndRetryOnFailure();
-        CompletableFuture<Void> dedupFuture = checkDeduplicationStatus();
-        CompletableFuture<Void> persistentPoliciesFuture = 
checkPersistencePolicies();
-        // update rate-limiter if policies updated
-        if (this.dispatchRateLimiter.isPresent()) {
-            if (!topicPolicies.isPresent() || 
!topicPolicies.get().isDispatchRateSet()) {
-                dispatchRateLimiter.get().onPoliciesUpdate(data);
-            }
-        }
-        if (this.subscribeRateLimiter.isPresent()) {
-            subscribeRateLimiter.get().onPoliciesUpdate(data);
-        }
+        List<CompletableFuture<Void>> producerCheckFutures = new ArrayList<>();
+        producers.values().forEach(producer -> producerCheckFutures.add(
+                
producer.checkPermissionsAsync().thenRun(producer::checkEncryption)));
+
+        return FutureUtil.waitForAll(producerCheckFutures).thenCompose((__)->{
+            List<CompletableFuture<Void>> subscriptionCheckFutures = new 
ArrayList<>();
+            subscriptions.forEach((subName, sub) -> {
+                List<CompletableFuture<Void>> consumerCheckFutures = new 
ArrayList<>();
+                sub.getConsumers().forEach(consumer -> 
consumerCheckFutures.add(consumer.checkPermissionsAsync()));
+                
subscriptionCheckFutures.add(FutureUtil.waitForAll(consumerCheckFutures).thenRun(()
 -> {
+                    Dispatcher dispatcher = sub.getDispatcher();
+                    // If the topic-level policy already exists, the 
namespace-level policy cannot override
+                    // the topic-level policy.
+                    if (dispatcher != null && (!topicPolicies.isPresent() || 
!topicPolicies.get()
+                            .isSubscriptionDispatchRateSet())) {
+                        dispatcher.getRateLimiter()
+                                .ifPresent(rateLimiter -> 
rateLimiter.onPoliciesUpdate(data));
+                    }
+                }));
+            });
+
+            return 
FutureUtil.waitForAll(subscriptionCheckFutures).thenCompose((___)->{

Review comment:
       ```suggestion
               return FutureUtil.waitForAll(subscriptionCheckFutures).thenCompose((___) -> {
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to