This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 4ce720b40c6 [fix] [broker] Update topic policies as much as possible
when some ex was thrown (#21810)
4ce720b40c6 is described below
commit 4ce720b40c6991f77cc9eeeadd86f5183c6ea6e1
Author: fengyubiao <[email protected]>
AuthorDate: Wed Jan 3 21:51:26 2024 +0800
[fix] [broker] Update topic policies as much as possible when some ex was
thrown (#21810)
After a topic-policies update, many components are updated one by one,
even if their configuration has not been modified. The following 11
components need to be updated:
- `7` rate limiters(`publish`, `dispatch topic-level`, `dispatch
subscription-level`, `dispatch resourceGroup-level`, `subscribe API`,
`replication`, `shadow topic replication`)
- update ManagedLedger configs(`retention`, `offloader`)
- start/stop replication
- start/stop compaction
- start/stop deduplication
Once one component update fails, all subsequent updates are skipped. This
leads to confusing behavior: you set a retention policy, but it is skipped
because the `update subscribe rate limiter` step failed (even though you did
not edit the `subscribe rate limitation policy`).
Since none of the components in the list above depends on another component
for its own update, it is appropriate to apply as many of the updates as
possible.
- Update topic policies as much as possible even if some component updates
fail. All component updates still run in the same thread, one by one; the
error is just thrown later.
- Rename `updatePublishDispatcher` to `updatePublishRateLimiter`
(cherry picked from commit ed599673c7e60ab5bb02e1fb0615a7ff8e5d6430)
---
.../pulsar/broker/service/AbstractTopic.java | 2 +-
.../pulsar/broker/service/BrokerService.java | 2 +-
.../service/nonpersistent/NonPersistentTopic.java | 2 +-
.../broker/service/persistent/PersistentTopic.java | 117 +++++++++++----------
.../pulsar/broker/admin/TopicPoliciesTest.java | 51 +++++++++
.../functions/worker/PulsarFunctionTlsTest.java | 1 +
.../org/apache/pulsar/common/util/FutureUtil.java | 6 ++
7 files changed, 123 insertions(+), 58 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 9f3d4f73770..73dff214902 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -1261,7 +1261,7 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
/**
* update topic publish dispatcher for this topic.
*/
- public void updatePublishDispatcher() {
+ public void updatePublishRateLimiter() {
synchronized (topicPublishRateLimiterLock) {
PublishRate publishRate = topicPolicies.getPublishRate().get();
if (publishRate.publishThrottlingRateInByte > 0 ||
publishRate.publishThrottlingRateInMsg > 0) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index fd14d43feac..5ba505cc289 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2601,7 +2601,7 @@ public class BrokerService implements Closeable {
forEachTopic(topic -> {
if (topic instanceof AbstractTopic) {
((AbstractTopic) topic).updateBrokerPublishRate();
- ((AbstractTopic) topic).updatePublishDispatcher();
+ ((AbstractTopic) topic).updatePublishRateLimiter();
}
}));
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 80cb828feff..e5670ad93c3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -169,7 +169,7 @@ public class NonPersistentTopic extends AbstractTopic
implements Topic, TopicPol
isEncryptionRequired = policies.encryption_required;
isAllowAutoUpdateSchema =
policies.is_allow_auto_update_schema;
}
- updatePublishDispatcher();
+ updatePublishRateLimiter();
updateResourceGroupLimiter(policies);
});
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index db27057029a..d57ed91ab81 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -322,7 +322,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
.thenAccept(optPolicies -> {
if (!optPolicies.isPresent()) {
isEncryptionRequired = false;
- updatePublishDispatcher();
+ updatePublishRateLimiter();
updateResourceGroupLimiter(new Policies());
initializeDispatchRateLimiterIfNeeded();
updateSubscribeRateLimiter();
@@ -337,7 +337,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
updateSubscribeRateLimiter();
- updatePublishDispatcher();
+ updatePublishRateLimiter();
updateResourceGroupLimiter(policies);
@@ -2477,38 +2477,58 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
return CompletableFuture.completedFuture(null);
}
+ // Update props.
+ // The component "EntryFilters" is update in the method
"updateTopicPolicyByNamespacePolicy(data)".
+ // see more detail: https://github.com/apache/pulsar/pull/19364.
updateTopicPolicyByNamespacePolicy(data);
-
+ checkReplicatedSubscriptionControllerState();
isEncryptionRequired = data.encryption_required;
-
isAllowAutoUpdateSchema = data.is_allow_auto_update_schema;
- updateDispatchRateLimiter();
-
- updateSubscribeRateLimiter();
+ // Apply policies for components.
+ List<CompletableFuture<Void>> applyPolicyTasks =
applyUpdatedTopicPolicies();
+ applyPolicyTasks.add(applyUpdatedNamespacePolicies(data));
+ return FutureUtil.waitForAll(applyPolicyTasks)
+ .thenAccept(__ -> log.info("[{}] namespace-level policies updated
successfully", topic))
+ .exceptionally(ex -> {
+ log.error("[{}] update namespace polices : {} error",
this.getName(), data, ex);
+ throw FutureUtil.wrapToCompletionException(ex);
+ });
+ }
- updatePublishDispatcher();
+ private CompletableFuture<Void> applyUpdatedNamespacePolicies(Policies
namespaceLevelPolicies) {
+ return FutureUtil.runWithCurrentThread(() ->
updateResourceGroupLimiter(namespaceLevelPolicies));
+ }
- updateResourceGroupLimiter(data);
+ private List<CompletableFuture<Void>> applyUpdatedTopicPolicies() {
+ List<CompletableFuture<Void>> applyPoliciesFutureList = new
ArrayList<>();
- List<CompletableFuture<Void>> producerCheckFutures = new
ArrayList<>(producers.size());
- producers.values().forEach(producer -> producerCheckFutures.add(
+ // Client permission check.
+ subscriptions.forEach((subName, sub) -> {
+ sub.getConsumers().forEach(consumer ->
applyPoliciesFutureList.add(consumer.checkPermissionsAsync()));
+ });
+ producers.values().forEach(producer -> applyPoliciesFutureList.add(
producer.checkPermissionsAsync().thenRun(producer::checkEncryption)));
+ // Check message expiry.
+ applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() ->
checkMessageExpiry()));
- return FutureUtil.waitForAll(producerCheckFutures).thenCompose((__) ->
{
- return
updateSubscriptionsDispatcherRateLimiter().thenCompose((___) -> {
- replicators.forEach((name, replicator) ->
replicator.updateRateLimiter());
- checkMessageExpiry();
- CompletableFuture<Void> replicationFuture =
checkReplicationAndRetryOnFailure();
- CompletableFuture<Void> dedupFuture =
checkDeduplicationStatus();
- CompletableFuture<Void> persistentPoliciesFuture =
checkPersistencePolicies();
- return CompletableFuture.allOf(replicationFuture, dedupFuture,
persistentPoliciesFuture,
- preCreateSubscriptionForCompactionIfNeeded());
- });
- }).exceptionally(ex -> {
- log.error("[{}] update namespace polices : {} error",
this.getName(), data, ex);
- throw FutureUtil.wrapToCompletionException(ex);
- });
+ // Update rate limiters.
+ applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() ->
updateDispatchRateLimiter()));
+ applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() ->
updateSubscribeRateLimiter()));
+ applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() ->
updatePublishRateLimiter()));
+
+ applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() ->
updateSubscriptionsDispatcherRateLimiter()));
+ applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(
+ () -> replicators.forEach((name, replicator) ->
replicator.updateRateLimiter())));
+
+ // Other components.
+ applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() ->
checkReplicationAndRetryOnFailure()));
+ applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() ->
checkDeduplicationStatus()));
+ applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(() ->
checkPersistencePolicies()));
+ applyPoliciesFutureList.add(FutureUtil.runWithCurrentThread(
+ () -> preCreateSubscriptionForCompactionIfNeeded()));
+
+ return applyPoliciesFutureList;
}
/**
@@ -3124,42 +3144,29 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
if (policies == null) {
return;
}
+ // Update props.
+ // The component "EntryFilters" is update in the method
"updateTopicPolicy(data)".
+ // see more detail: https://github.com/apache/pulsar/pull/19364.
updateTopicPolicy(policies);
+ checkReplicatedSubscriptionControllerState();
- updateDispatchRateLimiter();
- updateSubscriptionsDispatcherRateLimiter().thenRun(() -> {
- updatePublishDispatcher();
- updateSubscribeRateLimiter();
- replicators.forEach((name, replicator) ->
replicator.updateRateLimiter());
- checkMessageExpiry();
- checkReplicationAndRetryOnFailure();
-
- checkDeduplicationStatus();
-
- preCreateSubscriptionForCompactionIfNeeded();
-
- // update managed ledger config
- checkPersistencePolicies();
- }).exceptionally(e -> {
- Throwable t = e instanceof CompletionException ? e.getCause() : e;
- log.error("[{}] update topic policy error: {}", topic,
t.getMessage(), t);
- return null;
- });
+ // Apply policies for components(not contains the specified policies
which only defined in namespace policies).
+ FutureUtil.waitForAll(applyUpdatedTopicPolicies())
+ .thenAccept(__ -> log.info("[{}] topic-level policies updated
successfully", topic))
+ .exceptionally(e -> {
+ Throwable t = FutureUtil.unwrapCompletionException(e);
+ log.error("[{}] update topic-level policy error: {}", topic,
t.getMessage(), t);
+ return null;
+ });
}
- private CompletableFuture<Void> updateSubscriptionsDispatcherRateLimiter()
{
- List<CompletableFuture<Void>> subscriptionCheckFutures = new
ArrayList<>((int) subscriptions.size());
+ private void updateSubscriptionsDispatcherRateLimiter() {
subscriptions.forEach((subName, sub) -> {
- List<CompletableFuture<Void>> consumerCheckFutures = new
ArrayList<>(sub.getConsumers().size());
- sub.getConsumers().forEach(consumer ->
consumerCheckFutures.add(consumer.checkPermissionsAsync()));
-
subscriptionCheckFutures.add(FutureUtil.waitForAll(consumerCheckFutures).thenRun(()
-> {
- Dispatcher dispatcher = sub.getDispatcher();
- if (dispatcher != null) {
- dispatcher.updateRateLimiter();
- }
- }));
+ Dispatcher dispatcher = sub.getDispatcher();
+ if (dispatcher != null) {
+ dispatcher.updateRateLimiter();
+ }
});
- return FutureUtil.waitForAll(subscriptionCheckFutures);
}
protected CompletableFuture<Void> initTopicPolicy() {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 831e308e241..34ef3666992 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -43,6 +43,8 @@ import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.ConfigHelper;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.namespace.NamespaceService;
@@ -51,6 +53,7 @@ import
org.apache.pulsar.broker.service.PublishRateLimiterImpl;
import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -84,8 +87,11 @@ import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -3093,4 +3099,49 @@ public class TopicPoliciesTest extends
MockedPulsarServiceBaseTest {
assertNull(topicPolicies);
}
+ @Test
+ public void testUpdateRetentionWithPartialFailure() throws Exception {
+ String tpName = BrokerTestUtil.newUniqueName("persistent://" +
myNamespace + "/tp");
+ admin.topics().createNonPartitionedTopic(tpName);
+
+ // Load topic up.
+ admin.topics().getInternalStats(tpName);
+
+ // Inject an error that makes dispatch rate update fail.
+ PersistentTopic persistentTopic =
+ (PersistentTopic) pulsar.getBrokerService().getTopic(tpName,
false).join().get();
+ ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions =
+ WhiteboxImpl.getInternalState(persistentTopic,
"subscriptions");
+ PersistentSubscription mockedSubscription =
Mockito.mock(PersistentSubscription.class);
+ Mockito.when(mockedSubscription.getDispatcher()).thenThrow(new
RuntimeException("Mocked error: getDispatcher"));
+ subscriptions.put("mockedSubscription", mockedSubscription);
+
+ // Update namespace-level retention policies.
+ RetentionPolicies retentionPolicies1 = new RetentionPolicies(1, 1);
+ admin.namespaces().setRetentionAsync(myNamespace, retentionPolicies1);
+
+ // Verify: update retention will be success even if other component
update throws exception.
+ Awaitility.await().untilAsserted(() -> {
+ ManagedLedgerImpl ML = (ManagedLedgerImpl)
persistentTopic.getManagedLedger();
+ assertEquals(ML.getConfig().getRetentionSizeInMB(), 1);
+ assertEquals(ML.getConfig().getRetentionTimeMillis(), 1 * 60 *
1000);
+ });
+
+ // Update topic-level retention policies.
+ RetentionPolicies retentionPolicies2 = new RetentionPolicies(2, 2);
+ admin.topics().setRetentionAsync(tpName, retentionPolicies2);
+
+ // Verify: update retention will be success even if other component
update throws exception.
+ Awaitility.await().untilAsserted(() -> {
+ ManagedLedgerImpl ML = (ManagedLedgerImpl)
persistentTopic.getManagedLedger();
+ assertEquals(ML.getConfig().getRetentionSizeInMB(), 2);
+ assertEquals(ML.getConfig().getRetentionTimeMillis(), 2 * 60 *
1000);
+ });
+
+ // Cleanup.
+ subscriptions.clear();
+ admin.namespaces().removeRetention(myNamespace);
+ admin.topics().delete(tpName, false);
+ }
+
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
index d6c808a337b..c6370dec7b2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
@@ -55,6 +55,7 @@ import
org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.worker.service.WorkerServiceLoader;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index 45d040fa3fe..44e42f361d9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.common.util;
+import com.google.common.util.concurrent.MoreExecutors;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
@@ -52,6 +53,11 @@ public class FutureUtil {
return CompletableFuture.allOf(futures.toArray(new
CompletableFuture[0]));
}
+ public static CompletableFuture<Void> runWithCurrentThread(Runnable
runnable) {
+ return CompletableFuture.runAsync(
+ () -> runnable.run(), MoreExecutors.directExecutor());
+ }
+
public static <T> CompletableFuture<List<T>>
waitForAll(Stream<CompletableFuture<List<T>>> futures) {
return futures.reduce(CompletableFuture.completedFuture(new
ArrayList<>()),
(pre, curr) -> pre.thenCompose(preV -> curr.thenApply(currV ->
{