This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new fca05bf7d3 Reuse ThreadPools instead of creating new ones (#3167)
fca05bf7d3 is described below
commit fca05bf7d302e97d8160e2af4dcf8d9571466880
Author: Dave Marion <[email protected]>
AuthorDate: Fri Mar 17 14:20:09 2023 -0400
Reuse ThreadPools instead of creating new ones (#3167)
Reuse ThreadPools where possible instead of recreating them each time the
method is called
Closes #3062
---
.../java/org/apache/accumulo/core/fate/Fate.java | 24 +++-----
.../coordinator/CompactionCoordinator.java | 55 ++++++++++--------
.../java/org/apache/accumulo/manager/Manager.java | 45 ++++++++++-----
.../metrics/CompactionExecutorsMetrics.java | 4 --
.../accumulo/test/fate/zookeeper/FateIT.java | 67 +++++-----------------
5 files changed, 86 insertions(+), 109 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index 28cd5e29af..17e07fc0a6 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -60,8 +60,8 @@ public class Fate<T> {
private final TStore<T> store;
private final T environment;
- private ScheduledThreadPoolExecutor fatePoolWatcher;
- private ExecutorService executor;
+ private final ScheduledThreadPoolExecutor fatePoolWatcher;
+ private final ExecutorService executor;
private static final EnumSet<TStatus> FINISHED_STATES = EnumSet.of(FAILED,
SUCCESSFUL, UNKNOWN);
@@ -220,24 +220,16 @@ public class Fate<T> {
/**
* Creates a Fault-tolerant executor.
- * <p>
- * Note: Users of this class should call {@link
#startTransactionRunners(AccumuloConfiguration)}
- * to launch the worker threads after creating a Fate object.
*
* @param toLogStrFunc A function that converts Repo to Strings that are
suitable for logging
*/
- public Fate(T environment, TStore<T> store, Function<Repo<T>,String>
toLogStrFunc) {
+ public Fate(T environment, TStore<T> store, Function<Repo<T>,String>
toLogStrFunc,
+ AccumuloConfiguration conf) {
this.store = FateLogger.wrap(store, toLogStrFunc);
this.environment = environment;
- }
-
- /**
- * Launches the specified number of worker threads.
- */
- public void startTransactionRunners(AccumuloConfiguration conf) {
final ThreadPoolExecutor pool =
ThreadPools.getServerThreadPools().createExecutorService(conf,
Property.MANAGER_FATE_THREADPOOL_SIZE, true);
- fatePoolWatcher =
+ this.fatePoolWatcher =
ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(conf);
ThreadPools.watchCriticalScheduledTask(fatePoolWatcher.schedule(() -> {
// resize the pool if the property changed
@@ -262,7 +254,7 @@ public class Fate<T> {
}
}
}, 3, SECONDS));
- executor = pool;
+ this.executor = pool;
}
// get a transaction id back to the requester before doing any work
@@ -400,7 +392,9 @@ public class Fate<T> {
public void shutdown() {
keepRunning.set(false);
fatePoolWatcher.shutdown();
- executor.shutdown();
+ if (executor != null) {
+ executor.shutdown();
+ }
}
}
diff --git
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index a25498f325..2edbc368c5 100644
---
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -22,6 +22,8 @@ import static
com.google.common.util.concurrent.Uninterruptibles.sleepUninterrup
import java.lang.reflect.InvocationTargetException;
import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -29,6 +31,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -95,6 +98,7 @@ import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.collect.Sets;
import com.google.common.net.HostAndPort;
+import com.google.common.util.concurrent.Uninterruptibles;
public class CompactionCoordinator extends AbstractServer
implements CompactionCoordinatorService.Iface, LiveTServerSet.Listener {
@@ -130,7 +134,8 @@ public class CompactionCoordinator extends AbstractServer
// Exposed for tests
protected volatile Boolean shutdown = false;
- private ScheduledThreadPoolExecutor schedExecutor;
+ private final ScheduledThreadPoolExecutor schedExecutor;
+ private final ExecutorService summariesExecutor;
protected CompactionCoordinator(ConfigOpts opts, String[] args) {
this(opts, args, null);
@@ -140,6 +145,8 @@ public class CompactionCoordinator extends AbstractServer
super("compaction-coordinator", opts, args);
aconf = conf == null ? super.getConfiguration() : conf;
schedExecutor =
ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(aconf);
+ summariesExecutor =
ThreadPools.getServerThreadPools().createFixedThreadPool(10,
+ "Compaction Summary Gatherer", false);
compactionFinalizer = createCompactionFinalizer(schedExecutor);
tserverSet = createLiveTServerSet();
setupSecurity();
@@ -311,39 +318,39 @@ public class CompactionCoordinator extends AbstractServer
}
}
+ summariesExecutor.shutdownNow();
LOG.info("Shutting down");
}
private void updateSummaries() {
- ExecutorService executor =
ThreadPools.getServerThreadPools().createFixedThreadPool(10,
- "Compaction Summary Gatherer", false);
- try {
- Set<String> queuesSeen = new ConcurrentSkipListSet<>();
- tserverSet.getCurrentServers().forEach(tsi -> {
- executor.execute(() -> updateSummaries(tsi, queuesSeen));
- });
+ final ArrayList<Future<?>> tasks = new ArrayList<>();
+ Set<String> queuesSeen = new ConcurrentSkipListSet<>();
- executor.shutdown();
+ tserverSet.getCurrentServers().forEach(tsi -> {
+ tasks.add(summariesExecutor.submit(() -> updateSummaries(tsi,
queuesSeen)));
+ });
- try {
- while (!executor.awaitTermination(1, TimeUnit.MINUTES)) {}
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
+ // Wait for all tasks to complete
+ while (!tasks.isEmpty()) {
+ Iterator<Future<?>> iter = tasks.iterator();
+ while (iter.hasNext()) {
+ Future<?> f = iter.next();
+ if (f.isDone()) {
+ iter.remove();
+ }
}
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+ }
- // remove any queues that were seen in the past, but were not seen in
the latest gathering of
- // summaries
- TIME_COMPACTOR_LAST_CHECKED.keySet().retainAll(queuesSeen);
+ // remove any queues that were seen in the past, but were not seen in the
latest gathering of
+ // summaries
+ TIME_COMPACTOR_LAST_CHECKED.keySet().retainAll(queuesSeen);
- // add any queues that were never seen before
- queuesSeen.forEach(q -> {
- TIME_COMPACTOR_LAST_CHECKED.computeIfAbsent(q, k ->
System.currentTimeMillis());
- });
- } finally {
- executor.shutdownNow();
- }
+ // add any queues that were never seen before
+ queuesSeen.forEach(q -> {
+ TIME_COMPACTOR_LAST_CHECKED.computeIfAbsent(q, k ->
System.currentTimeMillis());
+ });
}
private void updateSummaries(TServerInstance tsi, Set<String> queuesSeen) {
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 36a9c6ec88..3565bc8b89 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -157,6 +157,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.util.concurrent.RateLimiter;
+import com.google.common.util.concurrent.Uninterruptibles;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.opentelemetry.api.trace.Span;
@@ -224,6 +225,8 @@ public class Manager extends AbstractServer
private final AtomicBoolean managerInitialized = new AtomicBoolean(false);
private final AtomicBoolean managerUpgrading = new AtomicBoolean(false);
+ private ExecutorService tableInformationStatusPool = null;
+
@Override
public synchronized ManagerState getManagerState() {
return state;
@@ -958,11 +961,10 @@ public class Manager extends AbstractServer
Set<TServerInstance> currentServers,
SortedMap<TabletServerId,TServerStatus> balancerMap) {
final long rpcTimeout =
getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT);
int threads =
getConfiguration().getCount(Property.MANAGER_STATUS_THREAD_POOL_SIZE);
- ExecutorService tp = ThreadPools.getServerThreadPools()
- .createExecutorService(getConfiguration(),
Property.MANAGER_STATUS_THREAD_POOL_SIZE, false);
long start = System.currentTimeMillis();
final SortedMap<TServerInstance,TabletServerStatus> result = new
ConcurrentSkipListMap<>();
final RateLimiter shutdownServerRateLimiter =
RateLimiter.create(MAX_SHUTDOWNS_PER_SEC);
+ final ArrayList<Future<?>> tasks = new ArrayList<>();
for (TServerInstance serverInstance : currentServers) {
final TServerInstance server = serverInstance;
if (threads == 0) {
@@ -971,7 +973,7 @@ public class Manager extends AbstractServer
// unresponsive tservers.
sleepUninterruptibly(Math.max(1, rpcTimeout / 120_000), MILLISECONDS);
}
- tp.execute(() -> {
+ tasks.add(tableInformationStatusPool.submit(() -> {
try {
Thread t = Thread.currentThread();
String oldName = t.getName();
@@ -1020,17 +1022,28 @@ public class Manager extends AbstractServer
badServers.remove(server);
}
}
- });
- }
- tp.shutdown();
- try {
- tp.awaitTermination(Math.max(10000, rpcTimeout / 3), MILLISECONDS);
- } catch (InterruptedException e) {
- log.debug("Interrupted while fetching status");
+ }));
+ }
+ final long timeToWaitForCompletion = Math.max(10000, rpcTimeout / 3);
+ final long startTime = System.nanoTime();
+ final long timeToCancelTasks = startTime +
MILLISECONDS.toNanos(timeToWaitForCompletion);
+ // Wait for all tasks to complete
+ while (!tasks.isEmpty()) {
+ boolean cancel = ((System.nanoTime() - startTime) > timeToCancelTasks);
+ Iterator<Future<?>> iter = tasks.iterator();
+ while (iter.hasNext()) {
+ Future<?> f = iter.next();
+ if (cancel) {
+ f.cancel(true);
+ } else {
+ if (f.isDone()) {
+ iter.remove();
+ }
+ }
+ }
+ Uninterruptibles.sleepUninterruptibly(1, MILLISECONDS);
}
- tp.shutdownNow();
-
// Threads may still modify map after shutdownNow is called, so create an
immutable snapshot.
SortedMap<TServerInstance,TabletServerStatus> info =
ImmutableSortedMap.copyOf(result);
tserverStatus.forEach((tsi, status) -> balancerMap.put(new
TabletServerIdImpl(tsi),
@@ -1105,6 +1118,9 @@ public class Manager extends AbstractServer
context.getTableManager().addObserver(this);
+ tableInformationStatusPool = ThreadPools.getServerThreadPools()
+ .createExecutorService(getConfiguration(),
Property.MANAGER_STATUS_THREAD_POOL_SIZE, false);
+
Thread statusThread = Threads.createThread("Status Thread", new
StatusThread());
statusThread.start();
@@ -1188,8 +1204,7 @@ public class Manager extends AbstractServer
context.getZooReaderWriter()),
HOURS.toMillis(8), System::currentTimeMillis);
- Fate<Manager> f = new Fate<>(this, store, TraceRepo::toLogString);
- f.startTransactionRunners(getConfiguration());
+ Fate<Manager> f = new Fate<>(this, store, TraceRepo::toLogString,
getConfiguration());
fateRef.set(f);
fateReadyLatch.countDown();
@@ -1261,6 +1276,8 @@ public class Manager extends AbstractServer
throw new IllegalStateException("Exception stopping status thread", e);
}
+ tableInformationStatusPool.shutdownNow();
+
// Signal that we want it to stop, and wait for it to do so.
if (authenticationTokenKeyManager != null) {
authenticationTokenKeyManager.gracefulStop();
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java
index 159528f1b2..78696e6c49 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java
@@ -63,10 +63,6 @@ public class CompactionExecutorsMetrics implements
MetricsProducer {
}
public CompactionExecutorsMetrics() {
- startUpdateThread();
- }
-
- protected void startUpdateThread() {
ScheduledExecutorService scheduler = ThreadPools.getServerThreadPools()
.createScheduledExecutorService(1, "compactionExecutorsMetricsPoller",
false);
Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdownNow));
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
index af441015a1..5834179fc0 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
@@ -158,12 +158,11 @@ public class FateIT {
expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes();
replay(manager, sctx);
- Fate<Manager> fate = new Fate<Manager>(manager, store,
TraceRepo::toLogString);
+ ConfigurationCopy config = new ConfigurationCopy();
+ config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
+ config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
+ Fate<Manager> fate = new Fate<Manager>(manager, store,
TraceRepo::toLogString, config);
try {
- ConfigurationCopy config = new ConfigurationCopy();
- config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
- config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
- fate.startTransactionRunners(config);
// Wait for the transaction runner to be scheduled.
UtilWaitThread.sleep(3000);
@@ -219,12 +218,11 @@ public class FateIT {
expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes();
replay(manager, sctx);
- Fate<Manager> fate = new Fate<Manager>(manager, store,
TraceRepo::toLogString);
+ ConfigurationCopy config = new ConfigurationCopy();
+ config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
+ config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
+ Fate<Manager> fate = new Fate<Manager>(manager, store,
TraceRepo::toLogString, config);
try {
- ConfigurationCopy config = new ConfigurationCopy();
- config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
- config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
- fate.startTransactionRunners(config);
// Wait for the transaction runner to be scheduled.
UtilWaitThread.sleep(3000);
@@ -247,7 +245,7 @@ public class FateIT {
}
@Test
- public void testCancelWhileSubmittedNotRunning() throws Exception {
+ public void testCancelWhileSubmittedAndRunning() throws Exception {
final ZooStore<Manager> zooStore = new ZooStore<Manager>(ZK_ROOT +
Constants.ZFATE, zk);
final AgeOffStore<Manager> store =
new AgeOffStore<Manager>(zooStore, 3000, System::currentTimeMillis);
@@ -259,45 +257,11 @@ public class FateIT {
expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes();
replay(manager, sctx);
- Fate<Manager> fate = new Fate<Manager>(manager, store,
TraceRepo::toLogString);
ConfigurationCopy config = new ConfigurationCopy();
config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
-
- // Notice that we did not start the transaction runners
-
- // Wait for the transaction runner to be scheduled.
- UtilWaitThread.sleep(3000);
-
- callStarted = new CountDownLatch(1);
- finishCall = new CountDownLatch(1);
-
- long txid = fate.startTransaction();
- LOG.debug("Starting test testCancelWhileSubmitted with {}",
FateTxId.formatTid(txid));
- assertEquals(NEW, getTxStatus(zk, txid));
- fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID),
true, "Test Op");
- assertEquals(SUBMITTED, getTxStatus(zk, txid));
- assertTrue(fate.cancel(txid));
- }
-
- @Test
- public void testCancelWhileSubmittedAndRunning() throws Exception {
- final ZooStore<Manager> zooStore = new ZooStore<Manager>(ZK_ROOT +
Constants.ZFATE, zk);
- final AgeOffStore<Manager> store =
- new AgeOffStore<Manager>(zooStore, 3000, System::currentTimeMillis);
-
- Manager manager = createMock(Manager.class);
- ServerContext sctx = createMock(ServerContext.class);
- expect(manager.getContext()).andReturn(sctx).anyTimes();
- expect(sctx.getZooKeeperRoot()).andReturn(ZK_ROOT).anyTimes();
- expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes();
- replay(manager, sctx);
-
- Fate<Manager> fate = new Fate<Manager>(manager, store,
TraceRepo::toLogString);
+ config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
+ Fate<Manager> fate = new Fate<Manager>(manager, store,
TraceRepo::toLogString, config);
try {
- ConfigurationCopy config = new ConfigurationCopy();
- config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
- config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
- fate.startTransactionRunners(config);
// Wait for the transaction runner to be scheduled.
UtilWaitThread.sleep(3000);
@@ -334,12 +298,11 @@ public class FateIT {
expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes();
replay(manager, sctx);
- Fate<Manager> fate = new Fate<Manager>(manager, store,
TraceRepo::toLogString);
+ ConfigurationCopy config = new ConfigurationCopy();
+ config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
+ config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
+ Fate<Manager> fate = new Fate<Manager>(manager, store,
TraceRepo::toLogString, config);
try {
- ConfigurationCopy config = new ConfigurationCopy();
- config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
- config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
- fate.startTransactionRunners(config);
// Wait for the transaction runner to be scheduled.
UtilWaitThread.sleep(3000);