This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new fca05bf7d3 Reuse ThreadPools instead of create new ones (#3167)
fca05bf7d3 is described below

commit fca05bf7d302e97d8160e2af4dcf8d9571466880
Author: Dave Marion <[email protected]>
AuthorDate: Fri Mar 17 14:20:09 2023 -0400

    Reuse ThreadPools instead of create new ones (#3167)
    
    Reuse ThreadPools where possible instead of recreating each time method is 
called
    
    Closes #3062
---
 .../java/org/apache/accumulo/core/fate/Fate.java   | 24 +++-----
 .../coordinator/CompactionCoordinator.java         | 55 ++++++++++--------
 .../java/org/apache/accumulo/manager/Manager.java  | 45 ++++++++++-----
 .../metrics/CompactionExecutorsMetrics.java        |  4 --
 .../accumulo/test/fate/zookeeper/FateIT.java       | 67 +++++-----------------
 5 files changed, 86 insertions(+), 109 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java 
b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index 28cd5e29af..17e07fc0a6 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -60,8 +60,8 @@ public class Fate<T> {
 
   private final TStore<T> store;
   private final T environment;
-  private ScheduledThreadPoolExecutor fatePoolWatcher;
-  private ExecutorService executor;
+  private final ScheduledThreadPoolExecutor fatePoolWatcher;
+  private final ExecutorService executor;
 
   private static final EnumSet<TStatus> FINISHED_STATES = EnumSet.of(FAILED, 
SUCCESSFUL, UNKNOWN);
 
@@ -220,24 +220,16 @@ public class Fate<T> {
 
   /**
    * Creates a Fault-tolerant executor.
-   * <p>
-   * Note: Users of this class should call {@link 
#startTransactionRunners(AccumuloConfiguration)}
-   * to launch the worker threads after creating a Fate object.
    *
    * @param toLogStrFunc A function that converts Repo to Strings that are 
suitable for logging
    */
-  public Fate(T environment, TStore<T> store, Function<Repo<T>,String> 
toLogStrFunc) {
+  public Fate(T environment, TStore<T> store, Function<Repo<T>,String> 
toLogStrFunc,
+      AccumuloConfiguration conf) {
     this.store = FateLogger.wrap(store, toLogStrFunc);
     this.environment = environment;
-  }
-
-  /**
-   * Launches the specified number of worker threads.
-   */
-  public void startTransactionRunners(AccumuloConfiguration conf) {
     final ThreadPoolExecutor pool = 
ThreadPools.getServerThreadPools().createExecutorService(conf,
         Property.MANAGER_FATE_THREADPOOL_SIZE, true);
-    fatePoolWatcher =
+    this.fatePoolWatcher =
         
ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(conf);
     ThreadPools.watchCriticalScheduledTask(fatePoolWatcher.schedule(() -> {
       // resize the pool if the property changed
@@ -262,7 +254,7 @@ public class Fate<T> {
         }
       }
     }, 3, SECONDS));
-    executor = pool;
+    this.executor = pool;
   }
 
   // get a transaction id back to the requester before doing any work
@@ -400,7 +392,9 @@ public class Fate<T> {
   public void shutdown() {
     keepRunning.set(false);
     fatePoolWatcher.shutdown();
-    executor.shutdown();
+    if (executor != null) {
+      executor.shutdown();
+    }
   }
 
 }
diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index a25498f325..2edbc368c5 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -22,6 +22,8 @@ import static 
com.google.common.util.concurrent.Uninterruptibles.sleepUninterrup
 
 import java.lang.reflect.InvocationTargetException;
 import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -29,6 +31,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -95,6 +98,7 @@ import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.google.common.collect.Sets;
 import com.google.common.net.HostAndPort;
+import com.google.common.util.concurrent.Uninterruptibles;
 
 public class CompactionCoordinator extends AbstractServer
     implements CompactionCoordinatorService.Iface, LiveTServerSet.Listener {
@@ -130,7 +134,8 @@ public class CompactionCoordinator extends AbstractServer
   // Exposed for tests
   protected volatile Boolean shutdown = false;
 
-  private ScheduledThreadPoolExecutor schedExecutor;
+  private final ScheduledThreadPoolExecutor schedExecutor;
+  private final ExecutorService summariesExecutor;
 
   protected CompactionCoordinator(ConfigOpts opts, String[] args) {
     this(opts, args, null);
@@ -140,6 +145,8 @@ public class CompactionCoordinator extends AbstractServer
     super("compaction-coordinator", opts, args);
     aconf = conf == null ? super.getConfiguration() : conf;
     schedExecutor = 
ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(aconf);
+    summariesExecutor = 
ThreadPools.getServerThreadPools().createFixedThreadPool(10,
+        "Compaction Summary Gatherer", false);
     compactionFinalizer = createCompactionFinalizer(schedExecutor);
     tserverSet = createLiveTServerSet();
     setupSecurity();
@@ -311,39 +318,39 @@ public class CompactionCoordinator extends AbstractServer
       }
     }
 
+    summariesExecutor.shutdownNow();
     LOG.info("Shutting down");
   }
 
   private void updateSummaries() {
-    ExecutorService executor = 
ThreadPools.getServerThreadPools().createFixedThreadPool(10,
-        "Compaction Summary Gatherer", false);
-    try {
-      Set<String> queuesSeen = new ConcurrentSkipListSet<>();
 
-      tserverSet.getCurrentServers().forEach(tsi -> {
-        executor.execute(() -> updateSummaries(tsi, queuesSeen));
-      });
+    final ArrayList<Future<?>> tasks = new ArrayList<>();
+    Set<String> queuesSeen = new ConcurrentSkipListSet<>();
 
-      executor.shutdown();
+    tserverSet.getCurrentServers().forEach(tsi -> {
+      tasks.add(summariesExecutor.submit(() -> updateSummaries(tsi, 
queuesSeen)));
+    });
 
-      try {
-        while (!executor.awaitTermination(1, TimeUnit.MINUTES)) {}
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new RuntimeException(e);
+    // Wait for all tasks to complete
+    while (!tasks.isEmpty()) {
+      Iterator<Future<?>> iter = tasks.iterator();
+      while (iter.hasNext()) {
+        Future<?> f = iter.next();
+        if (f.isDone()) {
+          iter.remove();
+        }
       }
+      Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+    }
 
-      // remove any queues that were seen in the past, but were not seen in 
the latest gathering of
-      // summaries
-      TIME_COMPACTOR_LAST_CHECKED.keySet().retainAll(queuesSeen);
+    // remove any queues that were seen in the past, but were not seen in the 
latest gathering of
+    // summaries
+    TIME_COMPACTOR_LAST_CHECKED.keySet().retainAll(queuesSeen);
 
-      // add any queues that were never seen before
-      queuesSeen.forEach(q -> {
-        TIME_COMPACTOR_LAST_CHECKED.computeIfAbsent(q, k -> 
System.currentTimeMillis());
-      });
-    } finally {
-      executor.shutdownNow();
-    }
+    // add any queues that were never seen before
+    queuesSeen.forEach(q -> {
+      TIME_COMPACTOR_LAST_CHECKED.computeIfAbsent(q, k -> 
System.currentTimeMillis());
+    });
   }
 
   private void updateSummaries(TServerInstance tsi, Set<String> queuesSeen) {
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 36a9c6ec88..3565bc8b89 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -157,6 +157,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.ImmutableSortedMap;
 import com.google.common.util.concurrent.RateLimiter;
+import com.google.common.util.concurrent.Uninterruptibles;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import io.opentelemetry.api.trace.Span;
@@ -224,6 +225,8 @@ public class Manager extends AbstractServer
   private final AtomicBoolean managerInitialized = new AtomicBoolean(false);
   private final AtomicBoolean managerUpgrading = new AtomicBoolean(false);
 
+  private ExecutorService tableInformationStatusPool = null;
+
   @Override
   public synchronized ManagerState getManagerState() {
     return state;
@@ -958,11 +961,10 @@ public class Manager extends AbstractServer
       Set<TServerInstance> currentServers, 
SortedMap<TabletServerId,TServerStatus> balancerMap) {
     final long rpcTimeout = 
getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT);
     int threads = 
getConfiguration().getCount(Property.MANAGER_STATUS_THREAD_POOL_SIZE);
-    ExecutorService tp = ThreadPools.getServerThreadPools()
-        .createExecutorService(getConfiguration(), 
Property.MANAGER_STATUS_THREAD_POOL_SIZE, false);
     long start = System.currentTimeMillis();
     final SortedMap<TServerInstance,TabletServerStatus> result = new 
ConcurrentSkipListMap<>();
     final RateLimiter shutdownServerRateLimiter = 
RateLimiter.create(MAX_SHUTDOWNS_PER_SEC);
+    final ArrayList<Future<?>> tasks = new ArrayList<>();
     for (TServerInstance serverInstance : currentServers) {
       final TServerInstance server = serverInstance;
       if (threads == 0) {
@@ -971,7 +973,7 @@ public class Manager extends AbstractServer
         // unresponsive tservers.
         sleepUninterruptibly(Math.max(1, rpcTimeout / 120_000), MILLISECONDS);
       }
-      tp.execute(() -> {
+      tasks.add(tableInformationStatusPool.submit(() -> {
         try {
           Thread t = Thread.currentThread();
           String oldName = t.getName();
@@ -1020,17 +1022,28 @@ public class Manager extends AbstractServer
             badServers.remove(server);
           }
         }
-      });
-    }
-    tp.shutdown();
-    try {
-      tp.awaitTermination(Math.max(10000, rpcTimeout / 3), MILLISECONDS);
-    } catch (InterruptedException e) {
-      log.debug("Interrupted while fetching status");
+      }));
+    }
+    final long timeToWaitForCompletion = Math.max(10000, rpcTimeout / 3);
+    final long startTime = System.nanoTime();
+    final long timeToCancelTasks = startTime + 
MILLISECONDS.toNanos(timeToWaitForCompletion);
+    // Wait for all tasks to complete
+    while (!tasks.isEmpty()) {
+      boolean cancel = ((System.nanoTime() - startTime) > timeToCancelTasks);
+      Iterator<Future<?>> iter = tasks.iterator();
+      while (iter.hasNext()) {
+        Future<?> f = iter.next();
+        if (cancel) {
+          f.cancel(true);
+        } else {
+          if (f.isDone()) {
+            iter.remove();
+          }
+        }
+      }
+      Uninterruptibles.sleepUninterruptibly(1, MILLISECONDS);
     }
 
-    tp.shutdownNow();
-
     // Threads may still modify map after shutdownNow is called, so create an 
immutable snapshot.
     SortedMap<TServerInstance,TabletServerStatus> info = 
ImmutableSortedMap.copyOf(result);
     tserverStatus.forEach((tsi, status) -> balancerMap.put(new 
TabletServerIdImpl(tsi),
@@ -1105,6 +1118,9 @@ public class Manager extends AbstractServer
 
     context.getTableManager().addObserver(this);
 
+    tableInformationStatusPool = ThreadPools.getServerThreadPools()
+        .createExecutorService(getConfiguration(), 
Property.MANAGER_STATUS_THREAD_POOL_SIZE, false);
+
     Thread statusThread = Threads.createThread("Status Thread", new 
StatusThread());
     statusThread.start();
 
@@ -1188,8 +1204,7 @@ public class Manager extends AbstractServer
               context.getZooReaderWriter()),
           HOURS.toMillis(8), System::currentTimeMillis);
 
-      Fate<Manager> f = new Fate<>(this, store, TraceRepo::toLogString);
-      f.startTransactionRunners(getConfiguration());
+      Fate<Manager> f = new Fate<>(this, store, TraceRepo::toLogString, 
getConfiguration());
       fateRef.set(f);
       fateReadyLatch.countDown();
 
@@ -1261,6 +1276,8 @@ public class Manager extends AbstractServer
       throw new IllegalStateException("Exception stopping status thread", e);
     }
 
+    tableInformationStatusPool.shutdownNow();
+
     // Signal that we want it to stop, and wait for it to do so.
     if (authenticationTokenKeyManager != null) {
       authenticationTokenKeyManager.gracefulStop();
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java
index 159528f1b2..78696e6c49 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java
@@ -63,10 +63,6 @@ public class CompactionExecutorsMetrics implements 
MetricsProducer {
   }
 
   public CompactionExecutorsMetrics() {
-    startUpdateThread();
-  }
-
-  protected void startUpdateThread() {
     ScheduledExecutorService scheduler = ThreadPools.getServerThreadPools()
         .createScheduledExecutorService(1, "compactionExecutorsMetricsPoller", 
false);
     Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdownNow));
diff --git 
a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java 
b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
index af441015a1..5834179fc0 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java
@@ -158,12 +158,11 @@ public class FateIT {
     expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes();
     replay(manager, sctx);
 
-    Fate<Manager> fate = new Fate<Manager>(manager, store, 
TraceRepo::toLogString);
+    ConfigurationCopy config = new ConfigurationCopy();
+    config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
+    config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
+    Fate<Manager> fate = new Fate<Manager>(manager, store, 
TraceRepo::toLogString, config);
     try {
-      ConfigurationCopy config = new ConfigurationCopy();
-      config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
-      config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
-      fate.startTransactionRunners(config);
 
       // Wait for the transaction runner to be scheduled.
       UtilWaitThread.sleep(3000);
@@ -219,12 +218,11 @@ public class FateIT {
     expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes();
     replay(manager, sctx);
 
-    Fate<Manager> fate = new Fate<Manager>(manager, store, 
TraceRepo::toLogString);
+    ConfigurationCopy config = new ConfigurationCopy();
+    config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
+    config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
+    Fate<Manager> fate = new Fate<Manager>(manager, store, 
TraceRepo::toLogString, config);
     try {
-      ConfigurationCopy config = new ConfigurationCopy();
-      config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
-      config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
-      fate.startTransactionRunners(config);
 
       // Wait for the transaction runner to be scheduled.
       UtilWaitThread.sleep(3000);
@@ -247,7 +245,7 @@ public class FateIT {
   }
 
   @Test
-  public void testCancelWhileSubmittedNotRunning() throws Exception {
+  public void testCancelWhileSubmittedAndRunning() throws Exception {
     final ZooStore<Manager> zooStore = new ZooStore<Manager>(ZK_ROOT + 
Constants.ZFATE, zk);
     final AgeOffStore<Manager> store =
         new AgeOffStore<Manager>(zooStore, 3000, System::currentTimeMillis);
@@ -259,45 +257,11 @@ public class FateIT {
     expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes();
     replay(manager, sctx);
 
-    Fate<Manager> fate = new Fate<Manager>(manager, store, 
TraceRepo::toLogString);
     ConfigurationCopy config = new ConfigurationCopy();
     config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
-
-    // Notice that we did not start the transaction runners
-
-    // Wait for the transaction runner to be scheduled.
-    UtilWaitThread.sleep(3000);
-
-    callStarted = new CountDownLatch(1);
-    finishCall = new CountDownLatch(1);
-
-    long txid = fate.startTransaction();
-    LOG.debug("Starting test testCancelWhileSubmitted with {}", 
FateTxId.formatTid(txid));
-    assertEquals(NEW, getTxStatus(zk, txid));
-    fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID), 
true, "Test Op");
-    assertEquals(SUBMITTED, getTxStatus(zk, txid));
-    assertTrue(fate.cancel(txid));
-  }
-
-  @Test
-  public void testCancelWhileSubmittedAndRunning() throws Exception {
-    final ZooStore<Manager> zooStore = new ZooStore<Manager>(ZK_ROOT + 
Constants.ZFATE, zk);
-    final AgeOffStore<Manager> store =
-        new AgeOffStore<Manager>(zooStore, 3000, System::currentTimeMillis);
-
-    Manager manager = createMock(Manager.class);
-    ServerContext sctx = createMock(ServerContext.class);
-    expect(manager.getContext()).andReturn(sctx).anyTimes();
-    expect(sctx.getZooKeeperRoot()).andReturn(ZK_ROOT).anyTimes();
-    expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes();
-    replay(manager, sctx);
-
-    Fate<Manager> fate = new Fate<Manager>(manager, store, 
TraceRepo::toLogString);
+    config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
+    Fate<Manager> fate = new Fate<Manager>(manager, store, 
TraceRepo::toLogString, config);
     try {
-      ConfigurationCopy config = new ConfigurationCopy();
-      config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
-      config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
-      fate.startTransactionRunners(config);
 
       // Wait for the transaction runner to be scheduled.
       UtilWaitThread.sleep(3000);
@@ -334,12 +298,11 @@ public class FateIT {
     expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes();
     replay(manager, sctx);
 
-    Fate<Manager> fate = new Fate<Manager>(manager, store, 
TraceRepo::toLogString);
+    ConfigurationCopy config = new ConfigurationCopy();
+    config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
+    config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
+    Fate<Manager> fate = new Fate<Manager>(manager, store, 
TraceRepo::toLogString, config);
     try {
-      ConfigurationCopy config = new ConfigurationCopy();
-      config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
-      config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
-      fate.startTransactionRunners(config);
 
       // Wait for the transaction runner to be scheduled.
       UtilWaitThread.sleep(3000);

Reply via email to