This is an automated email from the ASF dual-hosted git repository. ddanielr pushed a commit to branch EdColeman/normalize_metrics_names in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 2bddf8da35b10e76c46dea558cd3257d274269ff Author: Daniel Roberts <[email protected]> AuthorDate: Fri Jul 26 19:53:58 2024 +0000 Use constants for ThreadPool Names Create constants for thread pool names to help with metric names and ease of troubleshooting --- .../accumulo/core/clientImpl/ClientContext.java | 9 +- .../core/clientImpl/ConditionalWriterImpl.java | 3 +- .../core/clientImpl/InstanceOperationsImpl.java | 3 +- .../core/clientImpl/TableOperationsImpl.java | 3 +- .../core/clientImpl/TabletServerBatchReader.java | 3 +- .../core/clientImpl/TabletServerBatchWriter.java | 6 +- .../accumulo/core/clientImpl/bulk/BulkImport.java | 9 +- .../accumulo/core/file/BloomFilterLayer.java | 3 +- .../core/metrics/MetricsThreadPoolsDef.java | 53 ----------- .../util/compaction/ExternalCompactionUtil.java | 6 +- .../core/util/threads/ThreadPoolNames.java | 54 +++++++++++ .../accumulo/core/util/threads/ThreadPools.java | 100 +++++++++++---------- .../apache/accumulo/server/rpc/TServerUtils.java | 4 +- .../accumulo/coordinator/CompactionFinalizer.java | 8 +- .../manager/tableOps/bulkVer2/BulkImportMove.java | 4 +- .../tableOps/tableImport/MoveExportedFiles.java | 5 +- .../tserver/TabletServerResourceManager.java | 49 +++++----- .../compactions/InternalCompactionExecutor.java | 9 +- .../org/apache/accumulo/tserver/log/LogSorter.java | 4 +- 19 files changed, 183 insertions(+), 152 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java index 1d333d37dd..4a97f19da4 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java @@ -26,6 +26,8 @@ import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.CONDITIONAL_WRITER_CLEANUP_POOL_NAME; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.SCANNER_READ_AHEAD_POOL_NAME; import java.lang.Thread.UncaughtExceptionHandler; import java.lang.reflect.InvocationTargetException; @@ -260,7 +262,7 @@ public class ClientContext implements AccumuloClient { submitScannerReadAheadTask(Callable<List<KeyValue>> c) { ensureOpen(); if (scannerReadaheadPool == null) { - scannerReadaheadPool = clientThreadPools.getPoolBuilder("client.context.scanner.read.ahead") + scannerReadaheadPool = clientThreadPools.getPoolBuilder(SCANNER_READ_AHEAD_POOL_NAME) .numCoreThreads(0).numMaxThreads(Integer.MAX_VALUE).withTimeOut(3L, SECONDS) .withQueue(new SynchronousQueue<>()).build(); } @@ -270,9 +272,8 @@ public class ClientContext implements AccumuloClient { public synchronized void executeCleanupTask(Runnable r) { ensureOpen(); if (cleanupThreadPool == null) { - cleanupThreadPool = - clientThreadPools.getPoolBuilder("client.context.conditional.writer.cleanup") - .numCoreThreads(1).withTimeOut(3L, SECONDS).build(); + cleanupThreadPool = clientThreadPools.getPoolBuilder(CONDITIONAL_WRITER_CLEANUP_POOL_NAME) + .numCoreThreads(1).withTimeOut(3L, SECONDS).build(); } this.cleanupThreadPool.execute(r); } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java index cb7675196c..fb73e46e40 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ConditionalWriterImpl.java @@ -22,6 +22,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.CONDITIONAL_WRITER_POOL_NAME; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -379,7 +380,7 @@ public class ConditionalWriterImpl implements ConditionalWriter { this.auths = config.getAuthorizations(); this.ve = new VisibilityEvaluator(config.getAuthorizations()); this.threadPool = context.threadPools().createScheduledExecutorService( - config.getMaxWriteThreads(), this.getClass().getSimpleName()); + config.getMaxWriteThreads(), CONDITIONAL_WRITER_POOL_NAME.poolName); this.locator = new SyncingTabletLocator(context, tableId); this.serverQueues = new HashMap<>(); this.tableId = tableId; diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java index 9c26e433bf..ce3003ebd9 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java @@ -28,6 +28,7 @@ import static org.apache.accumulo.core.rpc.ThriftUtil.createClient; import static org.apache.accumulo.core.rpc.ThriftUtil.createTransport; import static org.apache.accumulo.core.rpc.ThriftUtil.getClient; import static org.apache.accumulo.core.rpc.ThriftUtil.returnClient; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.INSTANCE_OPS_COMPACTIONS_FINDER_POOL_NAME; import java.util.ArrayList; import java.util.Collections; @@ -302,7 +303,7 @@ public class InstanceOperationsImpl implements InstanceOperations { int numThreads = Math.max(4, Math.min((tservers.size() + compactors.size()) / 10, 256)); var executorService = - context.threadPools().getPoolBuilder("instance.ops.active.compactions.finder") + context.threadPools().getPoolBuilder(INSTANCE_OPS_COMPACTIONS_FINDER_POOL_NAME) .numCoreThreads(numThreads).build(); try { List<Future<List<ActiveCompaction>>> futures = new ArrayList<>(); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java index 0c2181f43d..a09cd008a1 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java @@ -30,6 +30,7 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly; import static org.apache.accumulo.core.util.Validators.EXISTING_TABLE_NAME; import static org.apache.accumulo.core.util.Validators.NEW_TABLE_NAME; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.SPLIT_POOL_NAME; import java.io.BufferedReader; import java.io.FileNotFoundException; @@ -496,7 +497,7 @@ public class TableOperationsImpl extends TableOperationsHelper { AtomicReference<Exception> exception = new AtomicReference<>(null); ExecutorService executor = - context.threadPools().getPoolBuilder("table.ops.add.splits").numCoreThreads(16).build(); + context.threadPools().getPoolBuilder(SPLIT_POOL_NAME).numCoreThreads(16).build(); try { executor.execute( new SplitTask(new SplitEnv(tableName, tableId, executor, latch, exception), splits)); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java index 1e87d2e9da..78fbe2e33a 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java @@ -19,6 +19,7 @@ package org.apache.accumulo.core.clientImpl; import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.BATCH_SCANNER_POOL_NAME; import java.lang.ref.Cleaner.Cleanable; import java.util.ArrayList; @@ -71,7 +72,7 @@ public class TabletServerBatchReader extends ScannerOptions implements BatchScan this.tableName = tableName; this.numThreads = numQueryThreads; - queryThreadPool = context.threadPools().getPoolBuilder("client.batch.reader.scanner") + queryThreadPool = context.threadPools().getPoolBuilder(BATCH_SCANNER_POOL_NAME) .numCoreThreads(numQueryThreads).build(); // Call shutdown on this thread pool in case the caller does not call close(). cleanable = CleanerUtil.shutdownThreadPoolExecutor(queryThreadPool, closed, log); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java index 46193ac9bb..d6d1fda060 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java @@ -24,6 +24,8 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.function.Function.identity; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.BATCH_WRITER_BIN_MUTATIONS_POOL_NAME; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.BATCH_WRITER_SEND_POOL_NAME; import java.io.IOException; import java.lang.management.CompilationMXBean; @@ -672,10 +674,10 @@ public class TabletServerBatchWriter implements AutoCloseable { public MutationWriter(int numSendThreads) { serversMutations = new HashMap<>(); queued = new HashSet<>(); - sendThreadPool = context.threadPools().getPoolBuilder("batch.writer.send") + sendThreadPool = context.threadPools().getPoolBuilder(BATCH_WRITER_SEND_POOL_NAME) .numCoreThreads(numSendThreads).build(); locators = new HashMap<>(); - binningThreadPool = context.threadPools().getPoolBuilder("batch.writer.bin.mutations") + binningThreadPool = context.threadPools().getPoolBuilder(BATCH_WRITER_BIN_MUTATIONS_POOL_NAME) .numCoreThreads(1).withQueue(new SynchronousQueue<>()).build(); binningThreadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java index 0b185a0a16..d7e5ab7e47 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java @@ -23,8 +23,9 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.stream.Collectors.groupingBy; import static org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.pathToCacheId; -import static org.apache.accumulo.core.metrics.MetricsThreadPoolsDef.METRICS_BULK_IMPORT_CLIENT_PREFIX; import static org.apache.accumulo.core.util.Validators.EXISTING_TABLE_NAME; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POOL_PREFIX; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.BULK_IMPORT_CLIENT_LOAD_POOL_NAME; import java.io.FileNotFoundException; import java.io.IOException; @@ -483,13 +484,13 @@ public class BulkImport implements ImportDestinationArguments, ImportMappingOpti if (this.executor != null) { executor = this.executor; } else if (numThreads > 0) { - executor = service = context.threadPools().getPoolBuilder("client.bulk.load") + executor = service = context.threadPools().getPoolBuilder(BULK_IMPORT_CLIENT_LOAD_POOL_NAME) .numCoreThreads(numThreads).enableThreadPoolMetrics().build(); } else { String threads = context.getConfiguration().get(ClientProperty.BULK_LOAD_THREADS.getKey()); executor = service = context.threadPools() - .getPoolBuilder( - METRICS_BULK_IMPORT_CLIENT_PREFIX + ClientProperty.BULK_LOAD_THREADS.getKey()) + .getPoolBuilder(ACCUMULO_POOL_PREFIX + ".bulk.import.client." + + ClientProperty.BULK_LOAD_THREADS.getKey()) .numCoreThreads(ConfigurationTypeHelper.getNumThreads(threads)).enableThreadPoolMetrics() .build(); } diff --git a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java index a937f3cfeb..19ae922935 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java +++ b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java @@ -19,6 +19,7 @@ package org.apache.accumulo.core.file; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.BLOOM_LOADER_POOL_NAME; import java.io.DataInputStream; import java.io.DataOutputStream; @@ -80,7 +81,7 @@ public class BloomFilterLayer { } if (maxLoadThreads > 0) { - loadThreadPool = ThreadPools.getServerThreadPools().getPoolBuilder("bloom.loader") + loadThreadPool = ThreadPools.getServerThreadPools().getPoolBuilder(BLOOM_LOADER_POOL_NAME) .numCoreThreads(0).numMaxThreads(maxLoadThreads).withTimeOut(60L, SECONDS).build(); } return loadThreadPool; diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsThreadPoolsDef.java b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsThreadPoolsDef.java deleted file mode 100644 index d6a5365886..0000000000 --- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsThreadPoolsDef.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.core.metrics; - -/** - * Defines constants used for thread pool metrics names. - */ -public interface MetricsThreadPoolsDef { - - // metered pools and executor service metric names - String METRICS_POOL_PREFIX = "accumulo.pool."; - String METRICS_BULK_IMPORT_CLIENT_PREFIX = METRICS_POOL_PREFIX + "bulk.import.client."; - String METRICS_COORDINATOR_FINALIZER_BACKGROUND_POOL = - METRICS_POOL_PREFIX + "compaction.finalizer.background.pool"; - String METRICS_COORDINATOR_FINALIZER_NOTIFIER_POOL = - METRICS_POOL_PREFIX + "compaction.coordinator.compaction.finalizer"; - String METRICS_GC_DELETE_POOL = METRICS_POOL_PREFIX + "gc.threads.delete"; - String METRICS_GENERAL_SERVER_POOL = METRICS_POOL_PREFIX + "general.server"; - String METRICS_GENERAL_SERVER_SIMPLETIMER_POOL = - METRICS_POOL_PREFIX + "general.server.simpletimer"; - String METRICS_MANAGER_BULK_IMPORT_POOL = METRICS_POOL_PREFIX + "manager.bulk.import"; - String METRICS_MANAGER_FATE_POOL = METRICS_POOL_PREFIX + "manager.fate"; - String METRICS_MANAGER_RENAME_POOL = METRICS_POOL_PREFIX + "manager.rename"; - String METRICS_MANAGER_STATUS_POOL = METRICS_POOL_PREFIX + "manager.status"; - String METRICS_REPLICATION_WORKER_POOL = METRICS_POOL_PREFIX + "replication.worker"; - String METRICS_TSERVER_ASSIGNMENT_POOL = METRICS_POOL_PREFIX + "tserver.assignment"; - String METRICS_TSERVER_COMPACTION_MINOR_POOL = METRICS_POOL_PREFIX + "tserver.compaction.minor"; - String METRICS_TSERVER_MIGRATIONS_POOL = METRICS_POOL_PREFIX + "tserver.migrations"; - String METRICS_TSERVER_MINOR_COMPACTOR_POOL = METRICS_POOL_PREFIX + "tserver.minor.compactor"; - String METRICS_TSERVER_SUMMARY_PARTITION_POOL = METRICS_POOL_PREFIX + "tserver.summary.partition"; - String METRICS_TSERVER_SUMMARY_REMOTE_POOL = METRICS_POOL_PREFIX + "tserver.summary.remote"; - String METRICS_TSERVER_SUMMARY_RETRIEVAL_POOL = METRICS_POOL_PREFIX + "tserver.summary.retrieval"; - String METRICS_TSERVER_TABLET_MIGRATION_POOL = METRICS_POOL_PREFIX + "tserver.tablet.migration"; - String METRICS_TSERVER_WORKQ_POOL = METRICS_POOL_PREFIX + "tserver.workq"; - String METRICS_TSERV_WAL_SORT_CONCURRENT_POOL = - METRICS_POOL_PREFIX + "tserver.wal.sort.concurrent"; -} diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java index 1eb5af8f1a..9af40818e4 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java @@ -19,6 +19,8 @@ package org.apache.accumulo.core.util.compaction; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.COMPACTION_RUNNING_COMPACTION_IDS_POOL_NAME; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.COMPACTOR_RUNNING_COMPACTIONS_POOL_NAME; import java.util.ArrayList; import java.util.Collection; @@ -224,7 +226,7 @@ public class ExternalCompactionUtil { public static List<RunningCompaction> getCompactionsRunningOnCompactors(ClientContext context) { final List<RunningCompactionFuture> rcFutures = new ArrayList<>(); final ExecutorService executor = ThreadPools.getServerThreadPools() - .getPoolBuilder("compactor.running.compactions").numCoreThreads(16).build(); + .getPoolBuilder(COMPACTOR_RUNNING_COMPACTIONS_POOL_NAME).numCoreThreads(16).build(); getCompactorAddrs(context).forEach((q, hp) -> { hp.forEach(hostAndPort -> { rcFutures.add(new RunningCompactionFuture(q, hostAndPort, @@ -251,7 +253,7 @@ public class ExternalCompactionUtil { public static Collection<ExternalCompactionId> getCompactionIdsRunningOnCompactors(ClientContext context) { final ExecutorService executor = ThreadPools.getServerThreadPools() - .getPoolBuilder("compactor.running.compaction.ids").numCoreThreads(16).build(); + .getPoolBuilder(COMPACTION_RUNNING_COMPACTION_IDS_POOL_NAME).numCoreThreads(16).build(); List<Future<ExternalCompactionId>> futures = new ArrayList<>(); getCompactorAddrs(context).forEach((q, hp) -> { diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java new file mode 100644 index 0000000000..ff8bebcf4b --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java @@ -0,0 +1,54 @@ +package org.apache.accumulo.core.util.threads; + +public enum ThreadPoolNames { + + ACCUMULO_POOL_PREFIX("accumulo.pool"), + BATCH_SCANNER_POOL_NAME("accumulo.pool.client.batch.reader.scanner"), + BATCH_WRITER_SEND_POOL_NAME("accumulo.pool.batch.writer.send"), + BATCH_WRITER_BIN_MUTATIONS_POOL_NAME("accumulo.pool.batch.writer.bin.mutations"), + BULK_IMPORT_CLIENT_LOAD_POOL_NAME("accumulo.pool.bulk.import.client.bulk.load"), + BULK_IMPORT_DIR_MOVE_POOL_NAME("accumulo.pool.bulk.dir.move"), + BLOOM_LOADER_POOL_NAME("accumulo.pool.bloom.loader"), + SCANNER_READ_AHEAD_POOL_NAME("accumulo.pool.client.context.scanner.read.ahead"), + COMPACTOR_RUNNING_COMPACTIONS_POOL_NAME("accumulo.pool.compactor.running.compactions"), + COMPACTION_RUNNING_COMPACTION_IDS_POOL_NAME("accumulo.pool.compactor.running.compaction.ids"), + CONDITIONAL_WRITER_POOL_NAME("accumulo.pool.conditional.writer"), + CONDITIONAL_WRITER_CLEANUP_POOL_NAME("accumulo.pool.client.context.conditional.writer.cleanup"), + COORDINATOR_FINALIZER_BACKGROUND_POOL_NAME("accumulo.pool.compaction.finalizer.background.pool"), + COORDINATOR_FINALIZER_NOTIFIER_POOL_NAME( + "accumulo.pool.compaction.coordinator.compaction.finalizer"), + GC_DELETE_POOL_NAME("accumulo.pool.gc.threads.delete"), + GENERAL_SERVER_POOL_NAME("accumulo.pool.general.server"), + GENERAL_SERVER_SIMPLETIMER_POOL_NAME("accumulo.pool.general.server.simpletimer"), + INSTANCE_OPS_COMPACTIONS_FINDER_POOL_NAME("accumulo.pool.instance.ops.active.compactions.finder"), + IMPORT_TABLE_RENAME_POOL_NAME("accumulo.pool.import.table.rename"), + MANAGER_BULK_IMPORT_POOL_NAME("accumulo.pool.manager.bulk.import"), + MANAGER_FATE_POOL_NAME("accumulo.pool.manager.fate"), + MANAGER_RENAME_POOL_NAME("accumulo.pool.manager.rename"), + MANAGER_STATUS_POOL_NAME("accumulo.pool.manager.status"), + METADATA_DEFAULT_SPLIT_POOL_NAME("accumulo.pool.metadata.tablet.default.splitter"), + METADATA_TABLET_MIGRATION_POOL_NAME("accumulo.pool.metadata.tablet.migration"), + METADATA_TABLET_ASSIGNMENT_POOL_NAME("accumulo.pool.metadata.tablet.assignment"), + REPLICATION_WORKER_POOL_NAME("accumulo.pool.replication.worker"), + SCAN_POOL_NAME("accumulo.pool.scan"), + SPLIT_POOL_NAME("accumulo.pool.table.ops.add.splits"), + SCHEDULED_FUTURE_CHECKER_POOL_NAME("accumulo.pool.scheduled.future.checker"), + TABLET_ASSIGNMENT_POOL_NAME("accumulo.pool.tablet.assignment.pool"), + TSERVER_SUMMARY_RETRIEVAL_POOL_NAME("accumulo.pool.tserver.summary.retrieval"), + TSERVER_SUMMARY_FILE_RETRIEVER_POOL_NAME("accumulo.pool.tserver.summary.file.retriever.pool"), + TSERVER_SUMMARY_REMOTE_POOL_NAME("accumulo.pool.tserver.summary.remote"), + TSERVER_MIGRATIONS_POOL_NAME("accumulo.pool.tserver.migrations"), + TSERVER_ASSIGNMENT_POOL_NAME("accumulo.pool.tserver.assignment"), + TSERVER_COMPACTION_MINOR_POOL_NAME("accumulo.pool.tserver.compaction.minor"), + TSERVER_MINOR_COMPACTOR_POOL_NAME("accumulo.pool.tserver.minor.compactor"), + TSERVER_SUMMARY_PARTITION_POOL_NAME("accumulo.pool.tserver.summary.partition"), + TSERVER_TABLET_MIGRATION_POOL_NAME("accumulo.pool.tserver.tablet.migration"), + TSERVER_WAL_SORT_CONCURRENT_POOL_NAME("accumulo.pool.tserver.wal.sort.concurrent"), + TSERVER_WORKQ_POOL_NAME("accumulo.pool.tserver.workq"); + + public final String poolName; + + ThreadPoolNames(String poolName) { + this.poolName = poolName; + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java index d44f8db728..5a2c08c67f 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java @@ -21,22 +21,23 @@ package org.apache.accumulo.core.util.threads; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.accumulo.core.metrics.MetricsThreadPoolsDef.METRICS_GC_DELETE_POOL; -import static org.apache.accumulo.core.metrics.MetricsThreadPoolsDef.METRICS_GENERAL_SERVER_POOL; -import static org.apache.accumulo.core.metrics.MetricsThreadPoolsDef.METRICS_GENERAL_SERVER_SIMPLETIMER_POOL; -import static org.apache.accumulo.core.metrics.MetricsThreadPoolsDef.METRICS_MANAGER_BULK_IMPORT_POOL; -import static org.apache.accumulo.core.metrics.MetricsThreadPoolsDef.METRICS_MANAGER_FATE_POOL; -import static org.apache.accumulo.core.metrics.MetricsThreadPoolsDef.METRICS_MANAGER_RENAME_POOL; -import static org.apache.accumulo.core.metrics.MetricsThreadPoolsDef.METRICS_MANAGER_STATUS_POOL; -import static org.apache.accumulo.core.metrics.MetricsThreadPoolsDef.METRICS_POOL_PREFIX; -import static org.apache.accumulo.core.metrics.MetricsThreadPoolsDef.METRICS_REPLICATION_WORKER_POOL; -import static org.apache.accumulo.core.metrics.MetricsThreadPoolsDef.METRICS_TSERVER_ASSIGNMENT_POOL; -import static org.apache.accumulo.core.metrics.MetricsThreadPoolsDef.METRICS_TSERVER_COMPACTION_MINOR_POOL; -import static org.apache.accumulo.core.metrics.MetricsThreadPoolsDef.METRICS_TSERVER_MIGRATIONS_POOL; -import static org.apache.accumulo.core.metrics.MetricsThreadPoolsDef.METRICS_TSERVER_SUMMARY_PARTITION_POOL; -import static org.apache.accumulo.core.metrics.MetricsThreadPoolsDef.METRICS_TSERVER_SUMMARY_REMOTE_POOL; -import static org.apache.accumulo.core.metrics.MetricsThreadPoolsDef.METRICS_TSERVER_SUMMARY_RETRIEVAL_POOL; -import static org.apache.accumulo.core.metrics.MetricsThreadPoolsDef.METRICS_TSERVER_WORKQ_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POOL_PREFIX; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.GC_DELETE_POOL_NAME; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.GENERAL_SERVER_POOL_NAME; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.GENERAL_SERVER_SIMPLETIMER_POOL_NAME; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_BULK_IMPORT_POOL_NAME; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_FATE_POOL_NAME; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_RENAME_POOL_NAME; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_STATUS_POOL_NAME; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.REPLICATION_WORKER_POOL_NAME; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.SCHEDULED_FUTURE_CHECKER_POOL_NAME; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_ASSIGNMENT_POOL_NAME; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MIGRATIONS_POOL_NAME; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MINOR_COMPACTOR_POOL_NAME; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_PARTITION_POOL_NAME; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_REMOTE_POOL_NAME; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_RETRIEVAL_POOL_NAME; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_WORKQ_POOL_NAME; import java.lang.Thread.UncaughtExceptionHandler; import java.util.Iterator; @@ -98,8 +99,8 @@ public class ThreadPools { return new ThreadPools(ueh); } - private static final ThreadPoolExecutor SCHEDULED_FUTURE_CHECKER_POOL = - getServerThreadPools().getPoolBuilder("scheduled.future.checker").numCoreThreads(1).build(); + private static final ThreadPoolExecutor SCHEDULED_FUTURE_CHECKER_POOL = getServerThreadPools() + .getPoolBuilder(SCHEDULED_FUTURE_CHECKER_POOL_NAME).numCoreThreads(1).build(); private static final ConcurrentLinkedQueue<ScheduledFuture<?>> CRITICAL_RUNNING_TASKS = new ConcurrentLinkedQueue<>(); @@ -289,12 +290,12 @@ public class ThreadPools { switch (p) { case GENERAL_SIMPLETIMER_THREADPOOL_SIZE: return createScheduledExecutorService(conf.getCount(p), - METRICS_GENERAL_SERVER_SIMPLETIMER_POOL); + GENERAL_SERVER_SIMPLETIMER_POOL_NAME.poolName); case GENERAL_THREADPOOL_SIZE: - return createScheduledExecutorService(conf.getCount(p), METRICS_GENERAL_SERVER_POOL, + return createScheduledExecutorService(conf.getCount(p), GENERAL_SERVER_POOL_NAME.poolName, emitThreadPoolMetrics); case MANAGER_BULK_THREADPOOL_SIZE: - builder = getPoolBuilder(METRICS_MANAGER_BULK_IMPORT_POOL).numCoreThreads(conf.getCount(p)) + builder = getPoolBuilder(MANAGER_BULK_IMPORT_POOL_NAME).numCoreThreads(conf.getCount(p)) .withTimeOut(conf.getTimeInMillis(Property.MANAGER_BULK_THREADPOOL_TIMEOUT), MILLISECONDS); if (emitThreadPoolMetrics) { @@ -302,19 +303,19 @@ public class ThreadPools { } return builder.build(); case MANAGER_RENAME_THREADS: - builder = getPoolBuilder(METRICS_MANAGER_RENAME_POOL).numCoreThreads(conf.getCount(p)); + builder = getPoolBuilder(MANAGER_RENAME_POOL_NAME).numCoreThreads(conf.getCount(p)); if (emitThreadPoolMetrics) { builder.enableThreadPoolMetrics(); } return builder.build(); case MANAGER_FATE_THREADPOOL_SIZE: - builder = getPoolBuilder(METRICS_MANAGER_FATE_POOL).numCoreThreads(conf.getCount(p)); + builder = getPoolBuilder(MANAGER_FATE_POOL_NAME).numCoreThreads(conf.getCount(p)); if (emitThreadPoolMetrics) { builder.enableThreadPoolMetrics(); } return builder.build(); case MANAGER_STATUS_THREAD_POOL_SIZE: - builder = getPoolBuilder(METRICS_MANAGER_STATUS_POOL); + builder = getPoolBuilder(MANAGER_STATUS_POOL_NAME); int threads = conf.getCount(p); if (threads == 0) { builder.numCoreThreads(0).numMaxThreads(Integer.MAX_VALUE).withTimeOut(60L, SECONDS) @@ -327,57 +328,57 @@ public class ThreadPools { } return builder.build(); case TSERV_WORKQ_THREADS: - builder = getPoolBuilder(METRICS_TSERVER_WORKQ_POOL).numCoreThreads(conf.getCount(p)); + builder = getPoolBuilder(TSERVER_WORKQ_POOL_NAME).numCoreThreads(conf.getCount(p)); if (emitThreadPoolMetrics) { builder.enableThreadPoolMetrics(); } return builder.build(); case TSERV_MINC_MAXCONCURRENT: - builder = getPoolBuilder(METRICS_TSERVER_COMPACTION_MINOR_POOL) - .numCoreThreads(conf.getCount(p)).withTimeOut(0L, MILLISECONDS); + builder = getPoolBuilder(TSERVER_MINOR_COMPACTOR_POOL_NAME).numCoreThreads(conf.getCount(p)) + .withTimeOut(0L, MILLISECONDS); if (emitThreadPoolMetrics) { builder.enableThreadPoolMetrics(); } return builder.build(); case TSERV_MIGRATE_MAXCONCURRENT: - builder = getPoolBuilder(METRICS_TSERVER_MIGRATIONS_POOL).numCoreThreads(conf.getCount(p)) + builder = getPoolBuilder(TSERVER_MIGRATIONS_POOL_NAME).numCoreThreads(conf.getCount(p)) .withTimeOut(0L, MILLISECONDS); if (emitThreadPoolMetrics) { builder.enableThreadPoolMetrics(); } return builder.build(); case TSERV_ASSIGNMENT_MAXCONCURRENT: - builder = getPoolBuilder(METRICS_TSERVER_ASSIGNMENT_POOL).numCoreThreads(conf.getCount(p)) + builder = getPoolBuilder(TSERVER_ASSIGNMENT_POOL_NAME).numCoreThreads(conf.getCount(p)) .withTimeOut(0L, MILLISECONDS); if (emitThreadPoolMetrics) { builder.enableThreadPoolMetrics(); } return builder.build(); case TSERV_SUMMARY_RETRIEVAL_THREADS: - builder = getPoolBuilder(METRICS_TSERVER_SUMMARY_RETRIEVAL_POOL) + builder = getPoolBuilder(TSERVER_SUMMARY_RETRIEVAL_POOL_NAME) .numCoreThreads(conf.getCount(p)).withTimeOut(60L, MILLISECONDS); if (emitThreadPoolMetrics) { builder.enableThreadPoolMetrics(); } return builder.build(); case TSERV_SUMMARY_REMOTE_THREADS: - builder = getPoolBuilder(METRICS_TSERVER_SUMMARY_REMOTE_POOL) - .numCoreThreads(conf.getCount(p)).withTimeOut(60L, MILLISECONDS); + builder = getPoolBuilder(TSERVER_SUMMARY_REMOTE_POOL_NAME).numCoreThreads(conf.getCount(p)) + .withTimeOut(60L, MILLISECONDS); if (emitThreadPoolMetrics) { builder.enableThreadPoolMetrics(); } return builder.build(); case TSERV_SUMMARY_PARTITION_THREADS: - builder = getPoolBuilder(METRICS_TSERVER_SUMMARY_PARTITION_POOL) + builder = getPoolBuilder(TSERVER_SUMMARY_PARTITION_POOL_NAME) .numCoreThreads(conf.getCount(p)).withTimeOut(60L, MILLISECONDS); if (emitThreadPoolMetrics) { builder.enableThreadPoolMetrics(); } return builder.build(); case GC_DELETE_THREADS: - return getPoolBuilder(METRICS_GC_DELETE_POOL).numCoreThreads(conf.getCount(p)).build(); + return getPoolBuilder(GC_DELETE_POOL_NAME).numCoreThreads(conf.getCount(p)).build(); case REPLICATION_WORKER_THREADS: - builder = getPoolBuilder(METRICS_REPLICATION_WORKER_POOL).numCoreThreads(conf.getCount(p)); + builder = getPoolBuilder(REPLICATION_WORKER_POOL_NAME).numCoreThreads(conf.getCount(p)); if (emitThreadPoolMetrics) { builder.enableThreadPoolMetrics(); } @@ -391,10 +392,25 @@ public class ThreadPools { /** * Fet a fluent-style pool builder. * - * @param name the pool name - the name will be prepended with METRICS_POOL_PREFIX + * @param pool the constant pool name + */ + public ThreadPoolExecutorBuilder getPoolBuilder(@NonNull final ThreadPoolNames pool) { + return new ThreadPoolExecutorBuilder(pool.poolName); + } + + /** + * Fet a fluent-style pool builder. + * + * @param name the pool name - the name trimed and prepended with the ACCUMULO_POOL_PREFIX so that + * pool names begin with a consistent prefix. */ public ThreadPoolExecutorBuilder getPoolBuilder(@NonNull final String name) { - return new ThreadPoolExecutorBuilder(name); + String trimmed = name.trim(); + if (trimmed.startsWith(ACCUMULO_POOL_PREFIX.poolName)) { + return new ThreadPoolExecutorBuilder(trimmed); + } else { + return new ThreadPoolExecutorBuilder(ACCUMULO_POOL_PREFIX + trimmed); + } } public class ThreadPoolExecutorBuilder { @@ -409,16 +425,10 @@ public class ThreadPools { /** * A fluent-style build to create a ThreadPoolExecutor. The name is used when creating - * named-threads for the pool. The name trimmed and prepended with the METRICS_POOL_PREFIX so - * that pool names begin with a consistent prefix. + * named-threads for the pool. */ ThreadPoolExecutorBuilder(@NonNull final String name) { - String trimmed = name.trim(); - if (trimmed.startsWith(METRICS_POOL_PREFIX)) { - this.name = trimmed; - } else { - this.name = METRICS_POOL_PREFIX + trimmed; - } + this.name = name; } public ThreadPoolExecutor build() { diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java index 1af333074c..17f5088197 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java @@ -20,7 +20,7 @@ package org.apache.accumulo.server.rpc; import static com.google.common.base.Preconditions.checkArgument; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static org.apache.accumulo.core.metrics.MetricsThreadPoolsDef.METRICS_POOL_PREFIX; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POOL_PREFIX; import java.io.IOException; import java.net.InetAddress; @@ -311,7 +311,7 @@ public class TServerUtils { private static ThreadPoolExecutor createSelfResizingThreadPool(final String serverName, final int executorThreads, long threadTimeOut, final AccumuloConfiguration conf, long timeBetweenThreadChecks) { - String poolName = METRICS_POOL_PREFIX + serverName.toLowerCase() + ".client"; + String poolName = ACCUMULO_POOL_PREFIX + serverName.toLowerCase() + ".client"; final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools().getPoolBuilder(poolName).numCoreThreads(executorThreads) .withTimeOut(threadTimeOut, MILLISECONDS).enableThreadPoolMetrics().build(); diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java index b2bbedd187..b14dfb3efd 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java @@ -22,8 +22,8 @@ import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.function.Function.identity; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; -import static org.apache.accumulo.core.metrics.MetricsThreadPoolsDef.METRICS_COORDINATOR_FINALIZER_BACKGROUND_POOL; -import static org.apache.accumulo.core.metrics.MetricsThreadPoolsDef.METRICS_COORDINATOR_FINALIZER_NOTIFIER_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.COORDINATOR_FINALIZER_BACKGROUND_POOL_NAME; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.COORDINATOR_FINALIZER_NOTIFIER_POOL_NAME; import java.util.ArrayList; import java.util.Iterator; @@ -78,11 +78,11 @@ public class CompactionFinalizer { .getCount(Property.COMPACTION_COORDINATOR_FINALIZER_TSERVER_NOTIFIER_MAXTHREADS); this.ntfyExecutor = ThreadPools.getServerThreadPools() - .getPoolBuilder(METRICS_COORDINATOR_FINALIZER_NOTIFIER_POOL).numCoreThreads(3) + .getPoolBuilder(COORDINATOR_FINALIZER_NOTIFIER_POOL_NAME).numCoreThreads(3) .numMaxThreads(max).withTimeOut(1L, MINUTES).enableThreadPoolMetrics().build(); this.backgroundExecutor = ThreadPools.getServerThreadPools() - .getPoolBuilder(METRICS_COORDINATOR_FINALIZER_BACKGROUND_POOL).numCoreThreads(1) + .getPoolBuilder(COORDINATOR_FINALIZER_BACKGROUND_POOL_NAME).numCoreThreads(1) .enableThreadPoolMetrics().build(); backgroundExecutor.execute(() -> { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java index 3b19aaf8f1..a5123baf08 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java @@ -18,7 +18,7 @@ */ package org.apache.accumulo.manager.tableOps.bulkVer2; -import static org.apache.accumulo.core.metrics.MetricsThreadPoolsDef.METRICS_POOL_PREFIX; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.BULK_IMPORT_DIR_MOVE_POOL_NAME; import java.io.IOException; import java.util.HashMap; @@ -119,7 +119,7 @@ class BulkImportMove extends ManagerRepo { oldToNewMap.put(originalPath, newPath); } try { - fs.bulkRename(oldToNewMap, workerCount, METRICS_POOL_PREFIX + "bulk.dir.move", fmtTid); + fs.bulkRename(oldToNewMap, workerCount, BULK_IMPORT_DIR_MOVE_POOL_NAME.poolName, fmtTid); } catch (IOException ioe) { throw new AcceptableThriftTableOperationException(bulkInfo.tableId.canonical(), null, TableOperation.BULK_IMPORT, TableOperationExceptionType.OTHER, diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MoveExportedFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MoveExportedFiles.java index 57763ec56d..a777fb8a32 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MoveExportedFiles.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/tableImport/MoveExportedFiles.java @@ -18,7 +18,7 @@ */ package org.apache.accumulo.manager.tableOps.tableImport; -import static org.apache.accumulo.core.metrics.MetricsThreadPoolsDef.METRICS_POOL_PREFIX; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.IMPORT_TABLE_RENAME_POOL_NAME; import java.io.IOException; import java.util.Arrays; @@ -114,8 +114,7 @@ class MoveExportedFiles extends ManagerRepo { } } try { - fs.bulkRename(oldToNewPaths, workerCount, METRICS_POOL_PREFIX + "import.table.rename", - fmtTid); + fs.bulkRename(oldToNewPaths, workerCount, IMPORT_TABLE_RENAME_POOL_NAME.poolName, fmtTid); } catch (IOException ioe) { throw new AcceptableThriftTableOperationException(tableInfo.tableId.canonical(), null, TableOperation.IMPORT, TableOperationExceptionType.OTHER, ioe.getCause().getMessage()); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java index 5a3af4ea13..e144143ed5 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java @@ -22,10 +22,18 @@ import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toUnmodifiableMap; -import static org.apache.accumulo.core.metrics.MetricsThreadPoolsDef.METRICS_POOL_PREFIX; -import static org.apache.accumulo.core.metrics.MetricsThreadPoolsDef.METRICS_TSERVER_MINOR_COMPACTOR_POOL; -import static org.apache.accumulo.core.metrics.MetricsThreadPoolsDef.METRICS_TSERVER_TABLET_MIGRATION_POOL; import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POOL_PREFIX; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.METADATA_DEFAULT_SPLIT_POOL_NAME; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.METADATA_TABLET_ASSIGNMENT_POOL_NAME; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.METADATA_TABLET_MIGRATION_POOL_NAME; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.SPLIT_POOL_NAME; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TABLET_ASSIGNMENT_POOL_NAME; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MINOR_COMPACTOR_POOL_NAME; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_FILE_RETRIEVER_POOL_NAME; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_PARTITION_POOL_NAME; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_REMOTE_POOL_NAME; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_TABLET_MIGRATION_POOL_NAME; import java.io.IOException; import java.util.ArrayList; @@ -133,13 +141,13 @@ public class TabletServerResourceManager { * pool executor * * @param maxThreads max threads - * @param name name of thread pool + * @param pool name of thread pool * @param tp executor */ - private void modifyThreadPoolSizesAtRuntime(IntSupplier maxThreads, String name, + private void modifyThreadPoolSizesAtRuntime(IntSupplier maxThreads, String pool, final ThreadPoolExecutor tp) { ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay( - () -> ThreadPools.resizePool(tp, maxThreads, name), 1, 10, SECONDS)); + () -> ThreadPools.resizePool(tp, maxThreads, pool), 1, 10, SECONDS)); } private ThreadPoolExecutor createPriorityExecutor(ScanExecutorConfig sec, @@ -187,13 +195,14 @@ public class TabletServerResourceManager { } scanExecQueues.put(sec.name, queue); - - ThreadPoolExecutor es = ThreadPools.getServerThreadPools().getPoolBuilder("scan." + sec.name) + ThreadPoolExecutor es = ThreadPools.getServerThreadPools() + .getPoolBuilder(ACCUMULO_POOL_PREFIX + ".scan." + sec.name) .numCoreThreads(sec.getCurrentMaxThreads()).numMaxThreads(sec.getCurrentMaxThreads()) .withTimeOut(0L, MILLISECONDS).withQueue(queue).atPriority(sec.priority) .enableThreadPoolMetrics(enableMetrics).build(); + modifyThreadPoolSizesAtRuntime(sec::getCurrentMaxThreads, - METRICS_POOL_PREFIX + "scan." + sec.name, es); + ACCUMULO_POOL_PREFIX + ".scan." + sec.name, es); return es; } @@ -310,24 +319,24 @@ public class TabletServerResourceManager { Property.TSERV_MINC_MAXCONCURRENT, true); modifyThreadPoolSizesAtRuntime( () -> context.getConfiguration().getCount(Property.TSERV_MINC_MAXCONCURRENT), - METRICS_TSERVER_MINOR_COMPACTOR_POOL, minorCompactionThreadPool); + TSERVER_MINOR_COMPACTOR_POOL_NAME.poolName, minorCompactionThreadPool); - splitThreadPool = ThreadPools.getServerThreadPools().getPoolBuilder("split").numCoreThreads(0) - .numMaxThreads(1).withTimeOut(1, SECONDS).build(); + splitThreadPool = ThreadPools.getServerThreadPools().getPoolBuilder(SPLIT_POOL_NAME) + .numCoreThreads(0).numMaxThreads(1).withTimeOut(1, SECONDS).build(); defaultSplitThreadPool = - ThreadPools.getServerThreadPools().getPoolBuilder("metadata.tablet.default.splitter") + ThreadPools.getServerThreadPools().getPoolBuilder(METADATA_DEFAULT_SPLIT_POOL_NAME) .numCoreThreads(0).numMaxThreads(1).withTimeOut(60, SECONDS).build(); defaultMigrationPool = - ThreadPools.getServerThreadPools().getPoolBuilder("metadata.tablet.migration") + ThreadPools.getServerThreadPools().getPoolBuilder(METADATA_TABLET_MIGRATION_POOL_NAME) .numCoreThreads(0).numMaxThreads(1).withTimeOut(60, SECONDS).build(); migrationPool = ThreadPools.getServerThreadPools().createExecutorService(acuConf, Property.TSERV_MIGRATE_MAXCONCURRENT, enableMetrics); modifyThreadPoolSizesAtRuntime( () -> context.getConfiguration().getCount(Property.TSERV_MIGRATE_MAXCONCURRENT), - METRICS_TSERVER_TABLET_MIGRATION_POOL, migrationPool); + TSERVER_TABLET_MIGRATION_POOL_NAME.poolName, migrationPool); // not sure if concurrent assignments can run safely... even if they could there is probably no // benefit at startup because @@ -338,10 +347,10 @@ public class TabletServerResourceManager { Property.TSERV_ASSIGNMENT_MAXCONCURRENT, enableMetrics); modifyThreadPoolSizesAtRuntime( () -> context.getConfiguration().getCount(Property.TSERV_ASSIGNMENT_MAXCONCURRENT), - "tablet.assignment.pool", assignmentPool); + TABLET_ASSIGNMENT_POOL_NAME.poolName, assignmentPool); assignMetaDataPool = - ThreadPools.getServerThreadPools().getPoolBuilder("metadata.tablet.assignment") + ThreadPools.getServerThreadPools().getPoolBuilder(METADATA_TABLET_ASSIGNMENT_POOL_NAME) .numCoreThreads(0).numMaxThreads(1).withTimeOut(60, SECONDS).build(); activeAssignments = new ConcurrentHashMap<>(); @@ -350,19 +359,19 @@ public class TabletServerResourceManager { Property.TSERV_SUMMARY_RETRIEVAL_THREADS, enableMetrics); modifyThreadPoolSizesAtRuntime( () -> context.getConfiguration().getCount(Property.TSERV_SUMMARY_RETRIEVAL_THREADS), - "tserver.summary.file.retriever.pool", summaryRetrievalPool); + TSERVER_SUMMARY_FILE_RETRIEVER_POOL_NAME.poolName, summaryRetrievalPool); summaryRemotePool = ThreadPools.getServerThreadPools().createExecutorService(acuConf, Property.TSERV_SUMMARY_REMOTE_THREADS, enableMetrics); modifyThreadPoolSizesAtRuntime( () -> context.getConfiguration().getCount(Property.TSERV_SUMMARY_REMOTE_THREADS), - "tserver.summary.remote.pool", summaryRemotePool); + TSERVER_SUMMARY_REMOTE_POOL_NAME.poolName, summaryRemotePool); summaryPartitionPool = ThreadPools.getServerThreadPools().createExecutorService(acuConf, Property.TSERV_SUMMARY_PARTITION_THREADS, enableMetrics); modifyThreadPoolSizesAtRuntime( () -> context.getConfiguration().getCount(Property.TSERV_SUMMARY_PARTITION_THREADS), - "tserver.summary.partition.pool", summaryPartitionPool); + TSERVER_SUMMARY_PARTITION_POOL_NAME.poolName, summaryPartitionPool); boolean isScanServer = (tserver instanceof ScanServer); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java index 2433357605..67324f3797 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java @@ -19,7 +19,7 @@ package org.apache.accumulo.tserver.compactions; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.accumulo.core.metrics.MetricsThreadPoolsDef.METRICS_POOL_PREFIX; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POOL_PREFIX; import java.util.ArrayList; import java.util.Collections; @@ -175,8 +175,9 @@ public class InternalCompactionExecutor implements CompactionExecutor { queue = new PriorityBlockingQueue<>(100, comparator); threadPool = ThreadPools.getServerThreadPools() - .getPoolBuilder("compaction.service.internal.compaction." + ceid).numCoreThreads(threads) - .numMaxThreads(threads).withTimeOut(60L, SECONDS).withQueue(queue).build(); + .getPoolBuilder(ACCUMULO_POOL_PREFIX + ".compaction.service.internal.compaction." + ceid) + .numCoreThreads(threads).numMaxThreads(threads).withTimeOut(60L, SECONDS).withQueue(queue) + .build(); metricCloser = ceMetrics.addExecutor(ceid, () -> threadPool.getActiveCount(), () -> queuedJob.size()); @@ -206,7 +207,7 @@ public class InternalCompactionExecutor implements CompactionExecutor { public void setThreads(int numThreads) { ThreadPools.resizePool(threadPool, () -> numThreads, - METRICS_POOL_PREFIX + "compaction." + ceid); + ACCUMULO_POOL_PREFIX + "accumulo.pool.compaction." + ceid); } @Override diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java index ccbfeff8fa..1e9970adda 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java @@ -19,7 +19,7 @@ package org.apache.accumulo.tserver.log; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.accumulo.core.metrics.MetricsThreadPoolsDef.METRICS_TSERV_WAL_SORT_CONCURRENT_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_WAL_SORT_CONCURRENT_POOL_NAME; import java.io.DataInputStream; import java.io.EOFException; @@ -298,7 +298,7 @@ public class LogSorter { int threadPoolSize = this.conf.getCount(this.conf .resolve(Property.TSERV_WAL_SORT_MAX_CONCURRENT, Property.TSERV_RECOVERY_MAX_CONCURRENT)); ThreadPoolExecutor threadPool = - ThreadPools.getServerThreadPools().getPoolBuilder(METRICS_TSERV_WAL_SORT_CONCURRENT_POOL) + ThreadPools.getServerThreadPools().getPoolBuilder(TSERVER_WAL_SORT_CONCURRENT_POOL_NAME) .numCoreThreads(threadPoolSize).enableThreadPoolMetrics().build(); new DistributedWorkQueue(context.getZooKeeperRoot() + Constants.ZRECOVERY, sortedLogConf, context).startProcessing(new LogProcessor(), threadPool);
