This is an automated email from the ASF dual-hosted git repository.
houxiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 145e985f7b7 [improve][broker] Replace ScheduledExecutorService to
ExecutorService in ModularLoadManagerImpl (#19656)
145e985f7b7 is described below
commit 145e985f7b7ef981d33036b79b24fd5a4e27d43c
Author: houxiaoyu <[email protected]>
AuthorDate: Wed Mar 1 09:43:13 2023 +0800
[improve][broker] Replace ScheduledExecutorService to ExecutorService in
ModularLoadManagerImpl (#19656)
---
.../broker/loadbalance/impl/ModularLoadManagerImpl.java | 14 +++++++-------
1 file changed, 7 insertions(+), 7 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 7a933908962..c1afe6007f6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -33,9 +33,9 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
@@ -174,8 +174,8 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
// Pulsar service used to initialize this.
private PulsarService pulsar;
- // Executor service used to regularly update broker data.
- private final ScheduledExecutorService scheduler;
+ // Executor service used to update broker data.
+ private final ExecutorService executors;
// check if given broker can load persistent/non-persistent topic
private final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate;
@@ -215,7 +215,7 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
loadData = new LoadData();
loadSheddingPipeline = new ArrayList<>();
preallocatedBundleToBroker = new ConcurrentHashMap<>();
- scheduler = Executors.newSingleThreadScheduledExecutor(
+ executors = Executors.newSingleThreadExecutor(
new
ExecutorProvider.ExtendedThreadFactory("pulsar-modular-load-manager"));
this.brokerToFailureDomainMap = new HashMap<>();
this.bundleBrokerAffinityMap = new ConcurrentHashMap<>();
@@ -276,7 +276,7 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
// register listeners for domain changes
pulsar.getPulsarResources().getClusterResources().getFailureDomainResources()
.registerListener(__ -> {
- scheduler.execute(() -> refreshBrokerToFailureDomainMap());
+ executors.execute(() -> refreshBrokerToFailureDomainMap());
});
loadSheddingPipeline.add(createLoadSheddingStrategy());
@@ -290,7 +290,7 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
});
try {
- scheduler.execute(ModularLoadManagerImpl.this::updateAll);
+ executors.execute(ModularLoadManagerImpl.this::updateAll);
} catch (RejectedExecutionException e) {
// Executor is shutting down
}
@@ -982,7 +982,7 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
*/
@Override
public void stop() throws PulsarServerException {
- scheduler.shutdownNow();
+ executors.shutdownNow();
try {
brokersData.close();