This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 15542f700d7 [fix] [broker] Update topic policies as much as possible 
when some ex was thrown (#21810)
15542f700d7 is described below

commit 15542f700d7f08a5d0cea7424c67f3dbe168e571
Author: fengyubiao <[email protected]>
AuthorDate: Wed Jan 3 21:51:26 2024 +0800

    [fix] [broker] Update topic policies as much as possible when some ex was 
thrown (#21810)
    
    After the topic policies update, there are many components will be updated 
one by one, even if the config of components has not been modified. There are 
the 11 components that need update:
    - `7` rate limiters(`publish`, `dispatch topic-level`, `dispatch 
subscription-level`,  `dispatch resourceGroup-level`, `subscribe API`, 
`replication`, `shadow topic replication`)
    - update ManagedLedger configs(`retention`, `offloader`)
    - start/stop replication
    - start/stop compaction
    - start/stop deduplication
    
    Once a component update fails, the following update will be skipped. It 
would cause a confusing thing: you want to set a retention policy, but it will 
be skipped due to the `update subscribe rate limiter` failure (you did not edit 
the `subscribe rate limitation policy`)
    
    Since none of the components in the above list have any additional 
dependencies for individual updates, ensuring success as much as possible is 
appropriate.
    
    - Update topic policies as much as possible even if some component updates 
fail, all component updates are still in the same thread, and they still update 
one by one, just throw the error later.
    - Rename `updatePublishDispatcher` to `updatePublishRateLimiter`
    
    (cherry picked from commit ed599673c7e60ab5bb02e1fb0615a7ff8e5d6430)
---
 .../pulsar/broker/service/AbstractTopic.java       |   2 +-
 .../pulsar/broker/service/BrokerService.java       |   2 +-
 .../service/nonpersistent/NonPersistentTopic.java  |   2 +-
 .../broker/service/persistent/PersistentTopic.java | 122 ++++++++++-----------
 .../pulsar/broker/admin/TopicPoliciesTest.java     |  51 +++++++++
 .../org/apache/pulsar/common/util/FutureUtil.java  |   6 +
 6 files changed, 118 insertions(+), 67 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 472a972c0d7..7f5c30a729f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -1222,7 +1222,7 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener<TopicP
     /**
      * update topic publish dispatcher for this topic.
      */
-    public void updatePublishDispatcher() {
+    public void updatePublishRateLimiter() {
         synchronized (topicPublishRateLimiterLock) {
             PublishRate publishRate = topicPolicies.getPublishRate().get();
             if (publishRate.publishThrottlingRateInByte > 0 || 
publishRate.publishThrottlingRateInMsg > 0) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 0ab8cffeb36..6860ab57a68 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2449,7 +2449,7 @@ public class BrokerService implements Closeable {
             forEachTopic(topic -> {
                 if (topic instanceof AbstractTopic) {
                     ((AbstractTopic) topic).updateBrokerPublishRate();
-                    ((AbstractTopic) topic).updatePublishDispatcher();
+                    ((AbstractTopic) topic).updatePublishRateLimiter();
                 }
             }));
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index a021e7ce94d..e3ecd39b718 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -169,7 +169,7 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
                         isAllowAutoUpdateSchema = 
policies.is_allow_auto_update_schema;
                         schemaValidationEnforced = 
policies.schema_validation_enforced;
                     }
-                    updatePublishDispatcher();
+                    updatePublishRateLimiter();
                     updateResourceGroupLimiter(policies);
                 });
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 75633860583..cf6b82294dc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -318,7 +318,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 .thenAccept(optPolicies -> {
                     if (!optPolicies.isPresent()) {
                         isEncryptionRequired = false;
-                        updatePublishDispatcher();
+                        updatePublishRateLimiter();
                         updateResourceGroupLimiter(new Policies());
                         initializeDispatchRateLimiterIfNeeded();
                         updateSubscribeRateLimiter();
@@ -333,7 +333,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 
                     updateSubscribeRateLimiter();
 
-                    updatePublishDispatcher();
+                    updatePublishRateLimiter();
 
                     updateResourceGroupLimiter(policies);
 
@@ -2447,44 +2447,58 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             return CompletableFuture.completedFuture(null);
         }
 
+        // Update props.
+        // The component "EntryFilters" is update in the method 
"updateTopicPolicyByNamespacePolicy(data)".
+        //   see more detail: https://github.com/apache/pulsar/pull/19364.
         updateTopicPolicyByNamespacePolicy(data);
         checkReplicatedSubscriptionControllerState();
         isEncryptionRequired = data.encryption_required;
-
         isAllowAutoUpdateSchema = data.is_allow_auto_update_schema;
 
-        schemaValidationEnforced = data.schema_validation_enforced;
+        // Apply policies for components.
+        List<CompletableFuture<Void>> applyPolicyTasks = 
applyUpdatedTopicPolicies();
+        applyPolicyTasks.add(applyUpdatedNamespacePolicies(data));
+        return FutureUtil.waitForAll(applyPolicyTasks)
+            .thenAccept(__ -> log.info("[{}] namespace-level policies updated 
successfully", topic))
+            .exceptionally(ex -> {
+                log.error("[{}] update namespace polices : {} error", 
this.getName(), data, ex);
+                throw FutureUtil.wrapToCompletionException(ex);
+            });
+    }
 
-        updateDispatchRateLimiter();
+    private CompletableFuture<Void> applyUpdatedNamespacePolicies(Policies 
namespaceLevelPolicies) {
+        return FutureUtil.runWithCurrentThread(() -> 
updateResourceGroupLimiter(namespaceLevelPolicies));
+    }
 
-        updateSubscribeRateLimiter();
+    private List<CompletableFuture<Void>> applyUpdatedTopicPolicies() {
+        List<CompletableFuture<Void>> applyPoliciesFutureList = new 
ArrayList<>();
 
-        updatePublishDispatcher();
+        // Client permission check.
+        subscriptions.forEach((subName, sub) -> {
+            sub.getConsumers().forEach(consumer -> 
applyPoliciesFutureList.add(consumer.checkPermissionsAsync()));
+        });
+        producers.values().forEach(producer -> applyPoliciesFutureList.add(
+                
producer.checkPermissionsAsync().thenRun(producer::checkEncryption)));
+        // Check message expiry.
+        applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> 
checkMessageExpiry()));
 
-        updateResourceGroupLimiter(data);
+        // Update rate limiters.
+        applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> 
updateDispatchRateLimiter()));
+        applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> 
updateSubscribeRateLimiter()));
+        applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> 
updatePublishRateLimiter()));
 
-        List<CompletableFuture<Void>> producerCheckFutures = new 
ArrayList<>(producers.size());
-        producers.values().forEach(producer -> producerCheckFutures.add(
-                
producer.checkPermissionsAsync().thenRun(producer::checkEncryption)));
+        applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> 
updateSubscriptionsDispatcherRateLimiter()));
+        applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(
+                () -> replicators.forEach((name, replicator) -> 
replicator.updateRateLimiter())));
 
-        return FutureUtil.waitForAll(producerCheckFutures).thenCompose((__) -> 
{
-            return 
updateSubscriptionsDispatcherRateLimiter().thenCompose((___) -> {
-                replicators.forEach((name, replicator) -> 
replicator.updateRateLimiter());
-                checkMessageExpiry();
-                CompletableFuture<Void> replicationFuture = 
checkReplicationAndRetryOnFailure();
-                CompletableFuture<Void> dedupFuture = 
checkDeduplicationStatus();
-                CompletableFuture<Void> persistentPoliciesFuture = 
checkPersistencePolicies();
-                if (this.subscribeRateLimiter.isPresent()) {
-                    
subscribeRateLimiter.get().onSubscribeRateUpdate(getSubscribeRate());
-                }
+        // Other components.
+        applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> 
checkReplicationAndRetryOnFailure()));
+        applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> 
checkDeduplicationStatus()));
+        applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() -> 
checkPersistencePolicies()));
+        applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(
+                () -> preCreateSubscriptionForCompactionIfNeeded()));
 
-                return CompletableFuture.allOf(replicationFuture, dedupFuture, 
persistentPoliciesFuture,
-                        preCreateSubscriptionForCompactionIfNeeded());
-            });
-        }).exceptionally(ex -> {
-            log.error("[{}] update namespace polices : {} error", 
this.getName(), data, ex);
-            throw FutureUtil.wrapToCompletionException(ex);
-        });
+        return applyPoliciesFutureList;
     }
 
     /**
@@ -3099,49 +3113,29 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         if (policies == null) {
             return;
         }
+        // Update props.
+        // The component "EntryFilters" is update in the method 
"updateTopicPolicy(data)".
+        //   see more detail: https://github.com/apache/pulsar/pull/19364.
         updateTopicPolicy(policies);
-
-        updateDispatchRateLimiter();
         checkReplicatedSubscriptionControllerState();
-        updateSubscriptionsDispatcherRateLimiter().thenRun(() -> {
-            updatePublishDispatcher();
-            
initializeTopicSubscribeRateLimiterIfNeeded(Optional.ofNullable(policies));
-            if (this.subscribeRateLimiter.isPresent()) {
-                subscribeRateLimiter.ifPresent(subscribeRateLimiter ->
-                        
subscribeRateLimiter.onSubscribeRateUpdate(getSubscribeRate()));
-            }
-            replicators.forEach((name, replicator) -> 
replicator.updateRateLimiter());
-            checkMessageExpiry();
-            if (policies.getReplicationClusters() != null) {
-                checkReplicationAndRetryOnFailure();
-            }
 
-            checkDeduplicationStatus();
-
-            preCreateSubscriptionForCompactionIfNeeded();
-
-            // update managed ledger config
-            checkPersistencePolicies();
-        }).exceptionally(e -> {
-            Throwable t = e instanceof CompletionException ? e.getCause() : e;
-            log.error("[{}] update topic policy error: {}", topic, 
t.getMessage(), t);
-            return null;
-        });
+        // Apply policies for components(not contains the specified policies 
which only defined in namespace policies).
+        FutureUtil.waitForAll(applyUpdatedTopicPolicies())
+            .thenAccept(__ -> log.info("[{}] topic-level policies updated 
successfully", topic))
+            .exceptionally(e -> {
+                Throwable t = FutureUtil.unwrapCompletionException(e);
+                log.error("[{}] update topic-level policy error: {}", topic, 
t.getMessage(), t);
+                return null;
+            });
     }
 
-    private CompletableFuture<Void> updateSubscriptionsDispatcherRateLimiter() 
{
-        List<CompletableFuture<Void>> subscriptionCheckFutures = new 
ArrayList<>((int) subscriptions.size());
+    private void updateSubscriptionsDispatcherRateLimiter() {
         subscriptions.forEach((subName, sub) -> {
-            List<CompletableFuture<Void>> consumerCheckFutures = new 
ArrayList<>(sub.getConsumers().size());
-            sub.getConsumers().forEach(consumer -> 
consumerCheckFutures.add(consumer.checkPermissionsAsync()));
-            
subscriptionCheckFutures.add(FutureUtil.waitForAll(consumerCheckFutures).thenRun(()
 -> {
-                Dispatcher dispatcher = sub.getDispatcher();
-                if (dispatcher != null) {
-                    dispatcher.updateRateLimiter();
-                }
-            }));
+            Dispatcher dispatcher = sub.getDispatcher();
+            if (dispatcher != null) {
+                dispatcher.updateRateLimiter();
+            }
         });
-        return FutureUtil.waitForAll(subscriptionCheckFutures);
     }
 
     private void 
initializeTopicSubscribeRateLimiterIfNeeded(Optional<TopicPolicies> policies) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index adfa95e8f4d..376391e6db5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -43,6 +43,8 @@ import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.ConfigHelper;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.namespace.NamespaceService;
@@ -51,6 +53,7 @@ import 
org.apache.pulsar.broker.service.PublishRateLimiterImpl;
 import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -84,8 +87,11 @@ import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.assertj.core.api.Assertions;
 import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -3060,4 +3066,49 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
                 });
     }
 
+    @Test
+    public void testUpdateRetentionWithPartialFailure() throws Exception {
+        String tpName = BrokerTestUtil.newUniqueName("persistent://" + 
myNamespace + "/tp");
+        admin.topics().createNonPartitionedTopic(tpName);
+
+        // Load topic up.
+        admin.topics().getInternalStats(tpName);
+
+        // Inject an error that makes dispatch rate update fail.
+        PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(tpName, 
false).join().get();
+        ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions =
+                WhiteboxImpl.getInternalState(persistentTopic, 
"subscriptions");
+        PersistentSubscription mockedSubscription = 
Mockito.mock(PersistentSubscription.class);
+        Mockito.when(mockedSubscription.getDispatcher()).thenThrow(new 
RuntimeException("Mocked error: getDispatcher"));
+        subscriptions.put("mockedSubscription", mockedSubscription);
+
+        // Update namespace-level retention policies.
+        RetentionPolicies retentionPolicies1 = new RetentionPolicies(1, 1);
+        admin.namespaces().setRetentionAsync(myNamespace, retentionPolicies1);
+
+        // Verify: update retention will be success even if other component 
update throws exception.
+        Awaitility.await().untilAsserted(() -> {
+            ManagedLedgerImpl ML = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
+            assertEquals(ML.getConfig().getRetentionSizeInMB(), 1);
+            assertEquals(ML.getConfig().getRetentionTimeMillis(), 1 * 60 * 
1000);
+        });
+
+        // Update topic-level retention policies.
+        RetentionPolicies retentionPolicies2 = new RetentionPolicies(2, 2);
+        admin.topics().setRetentionAsync(tpName, retentionPolicies2);
+
+        // Verify: update retention will be success even if other component 
update throws exception.
+        Awaitility.await().untilAsserted(() -> {
+            ManagedLedgerImpl ML = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
+            assertEquals(ML.getConfig().getRetentionSizeInMB(), 2);
+            assertEquals(ML.getConfig().getRetentionTimeMillis(), 2 * 60 * 
1000);
+        });
+
+        // Cleanup.
+        subscriptions.clear();
+        admin.namespaces().removeRetention(myNamespace);
+        admin.topics().delete(tpName, false);
+    }
+
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index 79ad0100a96..352a3909854 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.common.util;
 
+import com.google.common.util.concurrent.MoreExecutors;
 import java.time.Duration;
 import java.util.Collection;
 import java.util.List;
@@ -54,6 +55,11 @@ public class FutureUtil {
         return CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[0]));
     }
 
+    public static CompletableFuture<Void> runWithCurrentThread(Runnable 
runnable) {
+        return CompletableFuture.runAsync(
+                () -> runnable.run(), MoreExecutors.directExecutor());
+    }
+
     /**
      * Return a future that represents the completion of the futures in the 
provided Collection.
      *

Reply via email to