This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 9c436cb466 Use General ScheduledExecutorThreadPool where possible
(#5472)
9c436cb466 is described below
commit 9c436cb466490702b953482c07acf5a2d562af8d
Author: Dave Marion <[email protected]>
AuthorDate: Mon Apr 14 12:31:50 2025 -0400
Use General ScheduledExecutorThreadPool where possible (#5472)
Noticed several places in the code where different thread
pools were being created based on the GENERAL_THREADPOOL_SIZE
property instead of re-using the one available via the
ServerContext.
---
.../accumulo/core/clientImpl/ClientContext.java | 1 -
.../java/org/apache/accumulo/core/fate/Fate.java | 55 +++++++++++-----------
.../util/ratelimit/SharedRateLimiterFactory.java | 10 ++--
.../org/apache/accumulo/compactor/Compactor.java | 3 +-
.../apache/accumulo/compactor/CompactorTest.java | 4 ++
.../java/org/apache/accumulo/manager/Manager.java | 2 +-
.../tserver/compactions/CompactionService.java | 4 +-
.../accumulo/test/fate/zookeeper/FateIT.java | 11 +++--
8 files changed, 45 insertions(+), 45 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
index 03bd7b66bc..c9e714f74c 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
@@ -1132,5 +1132,4 @@ public class ClientContext implements AccumuloClient {
}
return this.zkLockChecker;
}
-
}
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index 279a1bf099..aac1921914 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -62,7 +62,6 @@ public class Fate<T> {
private final TStore<T> store;
private final T environment;
- private ScheduledThreadPoolExecutor fatePoolWatcher;
private ExecutorService executor;
private static final EnumSet<TStatus> FINISHED_STATES = EnumSet.of(FAILED,
SUCCESSFUL, UNKNOWN);
@@ -243,8 +242,9 @@ public class Fate<T> {
/**
* Creates a Fault-tolerant executor.
* <p>
- * Note: Users of this class should call {@link
#startTransactionRunners(AccumuloConfiguration)}
- * to launch the worker threads after creating a Fate object.
+ * Note: Users of this class should call
+ * {@link #startTransactionRunners(AccumuloConfiguration,
ScheduledThreadPoolExecutor)} to launch
+ * the worker threads after creating a Fate object.
*
* @param toLogStrFunc A function that converts Repo to Strings that are
suitable for logging
*/
@@ -256,34 +256,34 @@ public class Fate<T> {
/**
* Launches the specified number of worker threads.
*/
- public void startTransactionRunners(AccumuloConfiguration conf) {
+ public void startTransactionRunners(AccumuloConfiguration conf,
+ ScheduledThreadPoolExecutor serverGeneralScheduledThreadPool) {
final ThreadPoolExecutor pool =
ThreadPools.getServerThreadPools().createExecutorService(conf,
Property.MANAGER_FATE_THREADPOOL_SIZE, true);
- fatePoolWatcher =
-
ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(conf);
-
ThreadPools.watchCriticalScheduledTask(fatePoolWatcher.scheduleWithFixedDelay(()
-> {
- // resize the pool if the property changed
- ThreadPools.resizePool(pool, conf,
Property.MANAGER_FATE_THREADPOOL_SIZE);
- // If the pool grew, then ensure that there is a TransactionRunner for
each thread
- int needed = conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE) -
pool.getActiveCount();
- if (needed > 0) {
- for (int i = 0; i < needed; i++) {
- try {
- pool.execute(new TransactionRunner());
- } catch (RejectedExecutionException e) {
- // RejectedExecutionException could be shutting down
- if (pool.isShutdown()) {
- // The exception is expected in this case, no need to spam the
logs.
- log.trace("Error adding transaction runner to FaTE executor
pool.", e);
- } else {
- // This is bad, FaTE may no longer work!
- log.error("Error adding transaction runner to FaTE executor
pool.", e);
+ ThreadPools
+
.watchCriticalScheduledTask(serverGeneralScheduledThreadPool.scheduleWithFixedDelay(()
-> {
+ // resize the pool if the property changed
+ ThreadPools.resizePool(pool, conf,
Property.MANAGER_FATE_THREADPOOL_SIZE);
+ // If the pool grew, then ensure that there is a TransactionRunner
for each thread
+ int needed = conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE) -
pool.getActiveCount();
+ if (needed > 0) {
+ for (int i = 0; i < needed; i++) {
+ try {
+ pool.execute(new TransactionRunner());
+ } catch (RejectedExecutionException e) {
+ // RejectedExecutionException could be shutting down
+ if (pool.isShutdown()) {
+ // The exception is expected in this case, no need to spam
the logs.
+ log.trace("Error adding transaction runner to FaTE executor
pool.", e);
+ } else {
+ // This is bad, FaTE may no longer work!
+ log.error("Error adding transaction runner to FaTE executor
pool.", e);
+ }
+ break;
+ }
}
- break;
}
- }
- }
- }, 3, 30, SECONDS));
+ }, 3, 30, SECONDS));
executor = pool;
}
@@ -421,7 +421,6 @@ public class Fate<T> {
*/
public void shutdown() {
keepRunning.set(false);
- fatePoolWatcher.shutdown();
executor.shutdown();
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
index 95bc63fbac..7b086522d5 100644
---
a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
+++
b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
@@ -30,7 +30,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.util.threads.Threads;
import org.slf4j.Logger;
@@ -52,17 +51,16 @@ public class SharedRateLimiterFactory {
private SharedRateLimiterFactory() {}
/** Get the singleton instance of the SharedRateLimiterFactory. */
- public static synchronized SharedRateLimiterFactory
getInstance(AccumuloConfiguration conf) {
+ public static synchronized SharedRateLimiterFactory
+ getInstance(ScheduledThreadPoolExecutor executor) {
if (instance == null) {
instance = new SharedRateLimiterFactory();
- ScheduledThreadPoolExecutor svc =
-
ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(conf);
- updateTaskFuture = svc.scheduleWithFixedDelay(Threads
+ updateTaskFuture = executor.scheduleWithFixedDelay(Threads
.createNamedRunnable("SharedRateLimiterFactory update polling",
instance::updateAll),
UPDATE_RATE, UPDATE_RATE, MILLISECONDS);
- ScheduledFuture<?> future = svc.scheduleWithFixedDelay(Threads
+ ScheduledFuture<?> future = executor.scheduleWithFixedDelay(Threads
.createNamedRunnable("SharedRateLimiterFactory report polling",
instance::reportAll),
REPORT_RATE, REPORT_RATE, MILLISECONDS);
ThreadPools.watchNonCriticalScheduledTask(future);
diff --git
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 109f287ffa..9d95a73429 100644
---
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -686,8 +686,7 @@ public class Compactor extends AbstractServer
metricsInfo.init(getServiceTags(clientAddress));
var watcher = new CompactionWatcher(getConfiguration());
- var schedExecutor = ThreadPools.getServerThreadPools()
- .createGeneralScheduledExecutorService(getConfiguration());
+ var schedExecutor = getContext().getScheduledExecutor();
startGCLogger(schedExecutor);
startCancelChecker(schedExecutor,
getConfiguration().getTimeInMillis(Property.COMPACTOR_CANCEL_CHECK_INTERVAL));
diff --git
a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
index 74aaf09fa3..3489e8fef8 100644
---
a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
+++
b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
@@ -300,6 +300,10 @@ public class CompactorTest {
return List.of();
}
+ @Override
+ protected void startCancelChecker(ScheduledThreadPoolExecutor
schedExecutor,
+ long timeBetweenChecks) {}
+
}
public class FailedCompactor extends SuccessfulCompactor implements
ServerProcessService.Iface {
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 616dc7dac4..8cb51d160b 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -1353,7 +1353,7 @@ public class Manager extends AbstractServer implements
LiveTServerSet.Listener,
HOURS.toMillis(8), System::currentTimeMillis);
Fate<Manager> f = initializeFateInstance(store);
- f.startTransactionRunners(getConfiguration());
+ f.startTransactionRunners(getConfiguration(),
getContext().getScheduledExecutor());
fateRef.set(f);
fateReadyLatch.countDown();
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
index 52107f19d0..49d5644795 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
@@ -119,9 +119,9 @@ public class CompactionService {
this.rateLimit.set(maxRate);
- this.readLimiter =
SharedRateLimiterFactory.getInstance(this.context.getConfiguration())
+ this.readLimiter =
SharedRateLimiterFactory.getInstance(this.context.getScheduledExecutor())
.create("CS_" + serviceName + "_read", () -> rateLimit.get());
- this.writeLimiter =
SharedRateLimiterFactory.getInstance(this.context.getConfiguration())
+ this.writeLimiter =
SharedRateLimiterFactory.getInstance(this.context.getScheduledExecutor())
.create("CS_" + serviceName + "_write", () -> rateLimit.get());
initParams.getRequestedExecutors().forEach((ceid, numThreads) -> {
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
index bc92312d41..9962d3941f 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
@@ -40,6 +40,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.accumulo.core.Constants;
@@ -235,7 +236,7 @@ public class FateIT {
fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID),
true, "Test Op");
assertEquals(TStatus.SUBMITTED, getTxStatus(zk, txid));
- fate.startTransactionRunners(config);
+ fate.startTransactionRunners(config, new ScheduledThreadPoolExecutor(2));
// Wait for the transaction runner to be scheduled.
UtilWaitThread.sleep(3000);
@@ -300,7 +301,7 @@ public class FateIT {
ConfigurationCopy config = new ConfigurationCopy();
config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
- fate.startTransactionRunners(config);
+ fate.startTransactionRunners(config, new ScheduledThreadPoolExecutor(2));
// Wait for the transaction runner to be scheduled.
UtilWaitThread.sleep(3000);
@@ -376,7 +377,7 @@ public class FateIT {
ConfigurationCopy config = new ConfigurationCopy();
config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
- fate.startTransactionRunners(config);
+ fate.startTransactionRunners(config, new ScheduledThreadPoolExecutor(2));
// Wait for the transaction runner to be scheduled.
UtilWaitThread.sleep(3000);
@@ -430,7 +431,7 @@ public class FateIT {
fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID),
true, "Test Op");
assertEquals(SUBMITTED, getTxStatus(zk, txid));
- fate.startTransactionRunners(config);
+ fate.startTransactionRunners(config, new ScheduledThreadPoolExecutor(2));
// Wait for the transaction runner to be scheduled.
UtilWaitThread.sleep(3000);
@@ -469,7 +470,7 @@ public class FateIT {
ConfigurationCopy config = new ConfigurationCopy();
config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
- fate.startTransactionRunners(config);
+ fate.startTransactionRunners(config, new ScheduledThreadPoolExecutor(2));
// Wait for the transaction runner to be scheduled.
UtilWaitThread.sleep(3000);