This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 8323a3c4991 [improve][broker] Don't use forkjoin pool by default for deleting partitioned topics (#22598) 8323a3c4991 is described below commit 8323a3c49912976aee723787fa67bee4d7d8d846 Author: Lari Hotari <lhot...@users.noreply.github.com> AuthorDate: Fri Apr 26 23:17:51 2024 +0300 [improve][broker] Don't use forkjoin pool by default for deleting partitioned topics (#22598) --- .../apache/pulsar/broker/resources/NamespaceResources.java | 14 +++++++++++--- .../apache/pulsar/broker/resources/PulsarResources.java | 12 ++++++++++-- .../main/java/org/apache/pulsar/broker/PulsarService.java | 2 +- 3 files changed, 22 insertions(+), 6 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java index 1ba353dccaa..975b23192f9 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java @@ -24,6 +24,8 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Supplier; @@ -54,10 +56,14 @@ public class NamespaceResources extends BaseResources<Policies> { private static final String NAMESPACE_BASE_PATH = "/namespace"; public NamespaceResources(MetadataStore configurationStore, int operationTimeoutSec) { + this(configurationStore, operationTimeoutSec, ForkJoinPool.commonPool()); + } + + public NamespaceResources(MetadataStore configurationStore, int operationTimeoutSec, Executor executor) { super(configurationStore, Policies.class, operationTimeoutSec); this.configurationStore = configurationStore; isolationPolicies = new IsolationPolicyResources(configurationStore, operationTimeoutSec); - partitionedTopicResources = new PartitionedTopicResources(configurationStore, operationTimeoutSec); + partitionedTopicResources = new PartitionedTopicResources(configurationStore, operationTimeoutSec, executor); } public CompletableFuture<List<String>> listNamespacesAsync(String tenant) { @@ -234,9 +240,11 @@ public class NamespaceResources extends BaseResources<Policies> { public static class PartitionedTopicResources extends BaseResources<PartitionedTopicMetadata> { private static final String PARTITIONED_TOPIC_PATH = "/admin/partitioned-topics"; + private final Executor executor; - public PartitionedTopicResources(MetadataStore configurationStore, int operationTimeoutSec) { + public PartitionedTopicResources(MetadataStore configurationStore, int operationTimeoutSec, Executor executor) { super(configurationStore, PartitionedTopicMetadata.class, operationTimeoutSec); + this.executor = executor; } public CompletableFuture<Void> updatePartitionedTopicAsync(TopicName tn, Function<PartitionedTopicMetadata, @@ -371,7 +379,7 @@ public class NamespaceResources extends BaseResources<Policies> { future.complete(deleteResult); } }); - }); + }, executor); return future; } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java index fe7ffe0bc7b..cc64eeb52f6 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java @@ -19,6 +19,8 @@ package org.apache.pulsar.broker.resources; import java.util.Optional; +import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; import lombok.Getter; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.MetadataStoreConfig; @@ -57,13 +59,19 @@ public class PulsarResources { public PulsarResources(MetadataStore localMetadataStore, MetadataStore configurationMetadataStore) { this(localMetadataStore, configurationMetadataStore, DEFAULT_OPERATION_TIMEOUT_SEC); } + + public PulsarResources(MetadataStore localMetadataStore, MetadataStore configurationMetadataStore, + int operationTimeoutSec) { + this(localMetadataStore, configurationMetadataStore, operationTimeoutSec, ForkJoinPool.commonPool()); + } + public PulsarResources(MetadataStore localMetadataStore, MetadataStore configurationMetadataStore, - int operationTimeoutSec) { + int operationTimeoutSec, Executor executor) { if (configurationMetadataStore != null) { tenantResources = new TenantResources(configurationMetadataStore, operationTimeoutSec); clusterResources = new ClusterResources(localMetadataStore, configurationMetadataStore, operationTimeoutSec); - namespaceResources = new NamespaceResources(configurationMetadataStore, operationTimeoutSec); + namespaceResources = new NamespaceResources(configurationMetadataStore, operationTimeoutSec, executor); resourcegroupResources = new ResourceGroupResources(configurationMetadataStore, operationTimeoutSec); } else { tenantResources = null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 51dffc20d07..96f3653ea99 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -1000,7 +1000,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { @VisibleForTesting protected PulsarResources newPulsarResources() { PulsarResources pulsarResources = new PulsarResources(localMetadataStore, configurationMetadataStore, - config.getMetadataStoreOperationTimeoutSeconds()); + config.getMetadataStoreOperationTimeoutSeconds(), getExecutor()); pulsarResources.getClusterResources().getStore().registerListener(this::handleDeleteCluster); return pulsarResources;