This is an automated email from the ASF dual-hosted git repository.
ddanielr pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new a48b1e6c22 Micrometer thread pool fixes (#5571)
a48b1e6c22 is described below
commit a48b1e6c22ccdfa22128b8500f18fa021a88e980
Author: Dave Marion <[email protected]>
AuthorDate: Thu May 22 11:42:56 2025 -0400
Micrometer thread pool fixes (#5571)
* Remove duplicate thread pools, disable metrics
Modified code to use existing shared general
scheduled thread pool from the context. Disabled
metrics for the general scheduled thread pool so
that messages regarding duplicate gauges being
registered does not pollute the logs.
* Modified BatchWriter and ThreadPools
---
.../DefaultContextClassLoaderFactory.java | 24 +++++++------
.../core/clientImpl/TabletServerBatchWriter.java | 5 +--
.../accumulo/core/util/threads/ThreadPools.java | 38 +++++++++++++++++----
.../accumulo/server/metrics/MetricsInfoImpl.java | 4 +++
.../coordinator/CompactionCoordinator.java | 39 ++++++++++------------
.../coordinator/CompactionCoordinatorTest.java | 10 +++---
.../TestCompactionCoordinatorForOfflineTable.java | 10 +++---
7 files changed, 79 insertions(+), 51 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/classloader/DefaultContextClassLoaderFactory.java
b/core/src/main/java/org/apache/accumulo/core/classloader/DefaultContextClassLoaderFactory.java
index 19a8d38579..cc829a908e 100644
---
a/core/src/main/java/org/apache/accumulo/core/classloader/DefaultContextClassLoaderFactory.java
+++
b/core/src/main/java/org/apache/accumulo/core/classloader/DefaultContextClassLoaderFactory.java
@@ -23,6 +23,7 @@ import static java.util.concurrent.TimeUnit.MINUTES;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -72,17 +73,18 @@ public class DefaultContextClassLoaderFactory implements
ContextClassLoaderFacto
private static void startCleanupThread(final AccumuloConfiguration conf,
final Supplier<Map<String,String>> contextConfigSupplier) {
- ScheduledFuture<?> future = ThreadPools.getClientThreadPools((t, e) -> {
- LOG.error("context classloader cleanup thread has failed.", e);
- }).createGeneralScheduledExecutorService(conf)
- .scheduleWithFixedDelay(Threads.createNamedRunnable(className +
"-cleanup", () -> {
- LOG.trace("{}-cleanup thread, properties: {}", className, conf);
- Set<String> contextsInUse =
contextConfigSupplier.get().keySet().stream()
- .map(p ->
p.substring(VFS_CONTEXT_CLASSPATH_PROPERTY.getKey().length()))
- .collect(Collectors.toSet());
- LOG.trace("{}-cleanup thread, contexts in use: {}", className,
contextsInUse);
- removeUnusedContexts(contextsInUse);
- }), 1, 1, MINUTES);
+ ScheduledFuture<?> future =
+ ((ScheduledThreadPoolExecutor) ThreadPools.getClientThreadPools((t, e)
-> {
+ LOG.error("context classloader cleanup thread has failed.", e);
+ }).createExecutorService(conf, Property.GENERAL_THREADPOOL_SIZE,
false))
+ .scheduleWithFixedDelay(Threads.createNamedRunnable(className +
"-cleanup", () -> {
+ LOG.trace("{}-cleanup thread, properties: {}", className, conf);
+ Set<String> contextsInUse =
contextConfigSupplier.get().keySet().stream()
+ .map(p ->
p.substring(VFS_CONTEXT_CLASSPATH_PROPERTY.getKey().length()))
+ .collect(Collectors.toSet());
+ LOG.trace("{}-cleanup thread, contexts in use: {}", className,
contextsInUse);
+ removeUnusedContexts(contextsInUse);
+ }), 1, 1, MINUTES);
ThreadPools.watchNonCriticalScheduledTask(future);
LOG.debug("Context cleanup timer started at 60s intervals");
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
index 46fce95cfc..cda943d5c5 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
@@ -117,6 +117,7 @@ import io.opentelemetry.context.Scope;
public class TabletServerBatchWriter implements AutoCloseable {
private static final Logger log =
LoggerFactory.getLogger(TabletServerBatchWriter.class);
+ private static final AtomicInteger numWritersCreated = new AtomicInteger(0);
// basic configuration
private final ClientContext context;
@@ -210,8 +211,8 @@ public class TabletServerBatchWriter implements
AutoCloseable {
public TabletServerBatchWriter(ClientContext context, BatchWriterConfig
config) {
this.context = context;
- this.executor = context.threadPools()
-
.createGeneralScheduledExecutorService(this.context.getConfiguration());
+ this.executor = context.threadPools().createScheduledExecutorService(2,
+ "BatchWriterThreads-" + numWritersCreated.incrementAndGet(), true);
this.failedMutations = new FailedMutations();
this.maxMem = config.getMaxMemory();
this.maxLatency = config.getMaxLatency(MILLISECONDS) <= 0 ? Long.MAX_VALUE
diff --git
a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
index 136065beb4..b30505c1fc 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
@@ -41,6 +41,7 @@ import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMM
import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_WORKQ_POOL;
import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.OptionalInt;
@@ -57,6 +58,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.IntSupplier;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -69,6 +71,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
@@ -97,7 +100,9 @@ public class ThreadPools {
}
public static final ThreadPools
getClientThreadPools(UncaughtExceptionHandler ueh) {
- return new ThreadPools(ueh);
+ ThreadPools clientPools = new ThreadPools(ueh);
+ clientPools.setMeterRegistry(Metrics.globalRegistry);
+ return clientPools;
}
private static final ThreadPoolExecutor SCHEDULED_FUTURE_CHECKER_POOL =
@@ -601,7 +606,7 @@ public class ThreadPools {
result.allowCoreThreadTimeOut(true);
}
if (emitThreadPoolMetrics) {
- ThreadPools.addExecutorServiceMetrics(result, name);
+ addExecutorServiceMetrics(result, name);
}
return result;
}
@@ -644,7 +649,7 @@ public class ThreadPools {
* errors over long time periods.
* @return ScheduledThreadPoolExecutor
*/
- private ScheduledThreadPoolExecutor createScheduledExecutorService(int
numThreads,
+ public ScheduledThreadPoolExecutor createScheduledExecutorService(int
numThreads,
final String name, boolean emitThreadPoolMetrics) {
LOG.trace("Creating ScheduledThreadPoolExecutor for {} with {} threads",
name, numThreads);
var result =
@@ -708,13 +713,34 @@ public class ThreadPools {
};
if (emitThreadPoolMetrics) {
- ThreadPools.addExecutorServiceMetrics(result, name);
+ addExecutorServiceMetrics(result, name);
}
return result;
}
- private static void addExecutorServiceMetrics(ExecutorService executor,
String name) {
- new ExecutorServiceMetrics(executor, name,
List.of()).bindTo(Metrics.globalRegistry);
+ private final AtomicReference<MeterRegistry> registry = new
AtomicReference<>();
+ private final List<ExecutorServiceMetrics> earlyExecutorServices = new
ArrayList<>();
+
+ private void addExecutorServiceMetrics(ExecutorService executor, String
name) {
+ ExecutorServiceMetrics esm = new ExecutorServiceMetrics(executor, name,
List.of());
+ synchronized (earlyExecutorServices) {
+ MeterRegistry r = registry.get();
+ if (r != null) {
+ esm.bindTo(r);
+ } else {
+ earlyExecutorServices.add(esm);
+ }
+ }
+ }
+
+ public void setMeterRegistry(MeterRegistry r) {
+ if (registry.compareAndSet(null, r)) {
+ synchronized (earlyExecutorServices) {
+ earlyExecutorServices.forEach(e -> e.bindTo(r));
+ }
+ } else {
+ throw new IllegalStateException("setMeterRegistry called more than
once");
+ }
}
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java
b/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java
index 679c70cdae..1249d901c0 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java
@@ -31,6 +31,7 @@ import org.apache.accumulo.core.classloader.ClassLoaderUtil;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.metrics.MetricsInfo;
import org.apache.accumulo.core.metrics.MetricsProducer;
+import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.server.ServerContext;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.slf4j.Logger;
@@ -163,6 +164,9 @@ public class MetricsInfoImpl implements MetricsInfo {
}
}
+ // Set the MeterRegistry on the ThreadPools
+
ThreadPools.getServerThreadPools().setMeterRegistry(Metrics.globalRegistry);
+
if (jvmMetricsEnabled) {
LOG.info("enabling detailed jvm, classloader, jvm gc and process
metrics");
new ClassLoaderMetrics().bindTo(Metrics.globalRegistry);
diff --git
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index 11b89fccf6..59a0dd2782 100644
---
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -34,7 +34,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -139,8 +138,6 @@ public class CompactionCoordinator extends AbstractServer
implements
private ServiceLock coordinatorLock;
- private final ScheduledThreadPoolExecutor schedExecutor;
-
private final LoadingCache<String,Integer> compactorCounts;
protected CompactionCoordinator(ServerOpts opts, String[] args) {
@@ -150,14 +147,13 @@ public class CompactionCoordinator extends AbstractServer
implements
protected CompactionCoordinator(ServerOpts opts, String[] args,
AccumuloConfiguration conf) {
super("compaction-coordinator", opts, args);
aconf = conf == null ? super.getConfiguration() : conf;
- schedExecutor =
ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(aconf);
- compactionFinalizer = createCompactionFinalizer(schedExecutor);
+ compactionFinalizer = createCompactionFinalizer();
tserverSet = createLiveTServerSet();
setupSecurity();
- startGCLogger(schedExecutor);
+ startGCLogger();
printStartupMsg();
- startCompactionCleaner(schedExecutor);
- startRunningCleaner(schedExecutor);
+ startCompactionCleaner();
+ startRunningCleaner();
compactorCounts = Caffeine.newBuilder().expireAfterWrite(30,
TimeUnit.SECONDS)
.build(queue -> ExternalCompactionUtil.countCompactors(queue,
getContext()));
}
@@ -167,9 +163,8 @@ public class CompactionCoordinator extends AbstractServer
implements
return aconf;
}
- protected CompactionFinalizer
- createCompactionFinalizer(ScheduledThreadPoolExecutor schedExecutor) {
- return new CompactionFinalizer(getContext(), schedExecutor);
+ protected CompactionFinalizer createCompactionFinalizer() {
+ return new CompactionFinalizer(getContext(),
getContext().getScheduledExecutor());
}
protected LiveTServerSet createLiveTServerSet() {
@@ -180,22 +175,22 @@ public class CompactionCoordinator extends AbstractServer
implements
security = getContext().getSecurityOperation();
}
- protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) {
- ScheduledFuture<?> future =
- schedExecutor.scheduleWithFixedDelay(() ->
gcLogger.logGCInfo(getConfiguration()), 0,
- TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS);
+ protected void startGCLogger() {
+ ScheduledFuture<?> future =
getContext().getScheduledExecutor().scheduleWithFixedDelay(
+ () -> gcLogger.logGCInfo(getConfiguration()), 0,
TIME_BETWEEN_GC_CHECKS,
+ TimeUnit.MILLISECONDS);
ThreadPools.watchNonCriticalScheduledTask(future);
}
- protected void startCompactionCleaner(ScheduledThreadPoolExecutor
schedExecutor) {
- ScheduledFuture<?> future =
- schedExecutor.scheduleWithFixedDelay(this::cleanUpCompactors, 0, 5,
TimeUnit.MINUTES);
+ protected void startCompactionCleaner() {
+ ScheduledFuture<?> future = getContext().getScheduledExecutor()
+ .scheduleWithFixedDelay(this::cleanUpCompactors, 0, 5,
TimeUnit.MINUTES);
ThreadPools.watchNonCriticalScheduledTask(future);
}
- protected void startRunningCleaner(ScheduledThreadPoolExecutor
schedExecutor) {
- ScheduledFuture<?> future =
- schedExecutor.scheduleWithFixedDelay(this::cleanUpRunning, 0, 5,
TimeUnit.MINUTES);
+ protected void startRunningCleaner() {
+ ScheduledFuture<?> future = getContext().getScheduledExecutor()
+ .scheduleWithFixedDelay(this::cleanUpRunning, 0, 5, TimeUnit.MINUTES);
ThreadPools.watchNonCriticalScheduledTask(future);
}
@@ -447,7 +442,7 @@ public class CompactionCoordinator extends AbstractServer
implements
}
protected void startDeadCompactionDetector() {
- new DeadCompactionDetector(getContext(), this, schedExecutor).start();
+ new DeadCompactionDetector(getContext(), this,
getContext().getScheduledExecutor()).start();
}
protected long getMissingCompactorWarningTime() {
diff --git
a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
index 1f62ede587..8b320aac34 100644
---
a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
+++
b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
@@ -37,7 +37,6 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
@@ -139,13 +138,16 @@ public class CompactionCoordinatorTest {
}
@Override
- protected void startCompactionCleaner(ScheduledThreadPoolExecutor
schedExecutor) {}
+ protected void startCompactionCleaner() {}
@Override
- protected CompactionFinalizer
createCompactionFinalizer(ScheduledThreadPoolExecutor stpe) {
+ protected CompactionFinalizer createCompactionFinalizer() {
return null;
}
+ @Override
+ protected void startRunningCleaner() {}
+
@Override
protected LiveTServerSet createLiveTServerSet() {
return null;
@@ -155,7 +157,7 @@ public class CompactionCoordinatorTest {
protected void setupSecurity() {}
@Override
- protected void startGCLogger(ScheduledThreadPoolExecutor stpe) {}
+ protected void startGCLogger() {}
@Override
protected void printStartupMsg() {}
diff --git
a/test/src/main/java/org/apache/accumulo/test/compaction/TestCompactionCoordinatorForOfflineTable.java
b/test/src/main/java/org/apache/accumulo/test/compaction/TestCompactionCoordinatorForOfflineTable.java
index a3cc78905e..a49f9439d6 100644
---
a/test/src/main/java/org/apache/accumulo/test/compaction/TestCompactionCoordinatorForOfflineTable.java
+++
b/test/src/main/java/org/apache/accumulo/test/compaction/TestCompactionCoordinatorForOfflineTable.java
@@ -18,8 +18,6 @@
*/
package org.apache.accumulo.test.compaction;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-
import org.apache.accumulo.coordinator.CompactionCoordinator;
import org.apache.accumulo.coordinator.CompactionFinalizer;
import org.apache.accumulo.core.client.BatchWriter;
@@ -45,8 +43,8 @@ public class TestCompactionCoordinatorForOfflineTable extends
CompactionCoordina
private static final Logger LOG =
LoggerFactory.getLogger(NonNotifyingCompactionFinalizer.class);
- NonNotifyingCompactionFinalizer(ServerContext context,
ScheduledThreadPoolExecutor stpe) {
- super(context, stpe);
+ NonNotifyingCompactionFinalizer(ServerContext context) {
+ super(context, context.getScheduledExecutor());
}
@Override
@@ -75,8 +73,8 @@ public class TestCompactionCoordinatorForOfflineTable extends
CompactionCoordina
}
@Override
- protected CompactionFinalizer
createCompactionFinalizer(ScheduledThreadPoolExecutor stpe) {
- return new NonNotifyingCompactionFinalizer(getContext(), stpe);
+ protected CompactionFinalizer createCompactionFinalizer() {
+ return new NonNotifyingCompactionFinalizer(getContext());
}
public static void main(String[] args) throws Exception {