This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 267353e3f [#2125] feat(common): Introduce ThreadPoolManager associated
metrics (#2127)
267353e3f is described below
commit 267353e3f3dcc2fc985263b01f10a33deaba3f6a
Author: maobaolong <[email protected]>
AuthorDate: Wed Oct 9 11:49:04 2024 +0800
[#2125] feat(common): Introduce ThreadPoolManager associated metrics (#2127)
### What changes were proposed in this pull request?
Introduce ThreadPoolManager, which registers thread pools and exposes metrics for them.
### Why are the changes needed?
Fix: #2125
### Does this PR introduce _any_ user-facing change?
Adds a batch of metrics related to the registered thread pools.
### How was this patch tested?
Added ThreadPoolManagerTest, and verified manually by opening the dashboard server metrics page.
<img width="515" alt="image"
src="https://github.com/user-attachments/assets/561c5b7d-442f-4b2a-b48f-3fbdcc814cb0">
---
.../MeasurableRejectedExecutionHandler.java | 51 ++++++
.../uniffle/common/executor/ThreadPoolManager.java | 197 +++++++++++++++++++++
.../uniffle/common/metrics/CommonMetrics.java | 48 +++++
.../org/apache/uniffle/common/rpc/GrpcServer.java | 6 +-
.../common/executor/ThreadPoolManagerTest.java | 151 ++++++++++++++++
.../coordinator/metric/CoordinatorMetrics.java | 3 +
.../coordinator/metric/CoordinatorMetricsTest.java | 2 +-
.../uniffle/server/DefaultFlushEventHandler.java | 10 +-
.../uniffle/server/ShuffleServerMetrics.java | 3 +
9 files changed, 461 insertions(+), 10 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/executor/MeasurableRejectedExecutionHandler.java
b/common/src/main/java/org/apache/uniffle/common/executor/MeasurableRejectedExecutionHandler.java
new file mode 100644
index 000000000..5ed8a0456
--- /dev/null
+++
b/common/src/main/java/org/apache/uniffle/common/executor/MeasurableRejectedExecutionHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.executor;
+
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicLong;
+
+/** A handler to measure reject count. */
+public class MeasurableRejectedExecutionHandler implements
RejectedExecutionHandler {
+ private AtomicLong counter = new AtomicLong(0L);
+
+ private final RejectedExecutionHandler handler;
+
+ /**
+ * Constructs a wrapped {@link RejectedExecutionHandler} to measure rejected
count.
+ *
+ * @param handler the rejected execution handler
+ */
+ public MeasurableRejectedExecutionHandler(RejectedExecutionHandler handler) {
+ this.handler = handler;
+ }
+
+ @Override
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+ counter.incrementAndGet();
+ if (handler != null) {
+ handler.rejectedExecution(r, executor);
+ }
+ }
+
+ /** @return the rejected count */
+ public long getCount() {
+ return counter.get();
+ }
+}
diff --git
a/common/src/main/java/org/apache/uniffle/common/executor/ThreadPoolManager.java
b/common/src/main/java/org/apache/uniffle/common/executor/ThreadPoolManager.java
new file mode 100644
index 000000000..842545c16
--- /dev/null
+++
b/common/src/main/java/org/apache/uniffle/common/executor/ThreadPoolManager.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.executor;
+
+import java.io.Closeable;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.metrics.CommonMetrics;
+
+/** The threadPool manager which represents a manager to handle all thread
pool executors. */
+public class ThreadPoolManager {
+ private static final Logger LOG =
LoggerFactory.getLogger(ThreadPoolManager.class);
+
+ private static final Map<Object, MeasurableThreadPoolExecutor>
THREAD_POOL_MAP =
+ new ConcurrentHashMap<>();
+
+ /**
+ * Create a thread pool and register it with this manager.
+ *
+ * @param name the name of the thread pool
+ * @param corePoolSize the core pool size
+ * @param maximumPoolSize the maximum pool size
+ * @param keepAliveTime the keep alive time
+ * @param unit the time unit of the keep alive time
+ * @param workQueue the work queue
+ * @param threadFactory the thread factory
+ * @return the registered thread pool
+ */
+ public static ThreadPoolExecutor newThreadPool(
+ String name,
+ int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue,
+ ThreadFactory threadFactory) {
+ ThreadPoolExecutor threadPoolExecutor =
+ new ThreadPoolExecutor(
+ corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory);
+ registerThreadPool(name, corePoolSize, maximumPoolSize, keepAliveTime,
threadPoolExecutor);
+ return threadPoolExecutor;
+ }
+
+ /**
+ * Create a thread pool with a rejected execution handler and register it with this manager.
+ *
+ * @param name the name of the thread pool
+ * @param corePoolSize the core pool size
+ * @param maximumPoolSize the maximum pool size
+ * @param keepAliveTime the keep alive time
+ * @param unit the time unit of the keep alive time
+ * @param workQueue the work queue
+ * @param threadFactory the thread factory
+ * @param handler the handler to use when execution is blocked because the
thread bounds and queue
+ * capacities are reached
+ * @return the registered thread pool
+ */
+ public static ThreadPoolExecutor newThreadPool(
+ String name,
+ int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue,
+ ThreadFactory threadFactory,
+ RejectedExecutionHandler handler) {
+ ThreadPoolExecutor threadPoolExecutor =
+ new ThreadPoolExecutor(
+ corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, handler);
+ registerThreadPool(name, corePoolSize, maximumPoolSize, keepAliveTime,
threadPoolExecutor);
+ return threadPoolExecutor;
+ }
+
+ /**
+ * Register a thread pool to THREAD_POOL_MAP.
+ *
+ * @param name the name of the thread pool
+ * @param corePoolSize the core pool size (only used in the log message)
+ * @param maximumPoolSize the maximum pool size (only used in the log message)
+ * @param keepAliveTime the keep alive time (only used in the log message)
+ * @param threadPoolExecutor the thread pool which will be registered
+ */
+ public static void registerThreadPool(
+ String name,
+ int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ ThreadPoolExecutor threadPoolExecutor) {
+ THREAD_POOL_MAP.put(
+ threadPoolExecutor, new MeasurableThreadPoolExecutor(name,
threadPoolExecutor));
+ LOG.info(
+ "{} thread pool, core size:{}, max size:{}, keep alive time:{}",
+ name,
+ corePoolSize,
+ maximumPoolSize,
+ keepAliveTime);
+ }
+
+ /**
+ * Unregister the thread pool executor related to the given key.
+ *
+ * @param key the key of thread pool executor to unregister
+ */
+ public static void unregister(Object key) {
+ MeasurableThreadPoolExecutor measurableThreadPoolExecutor =
THREAD_POOL_MAP.remove(key);
+ if (measurableThreadPoolExecutor != null) {
+ measurableThreadPoolExecutor.close();
+ }
+ }
+
+ public static boolean exists(Object key) {
+ return THREAD_POOL_MAP.containsKey(key);
+ }
+
+ @VisibleForTesting
+ public static Map<Object, MeasurableThreadPoolExecutor> getThreadPoolMap() {
+ return Collections.unmodifiableMap(THREAD_POOL_MAP);
+ }
+
+ @VisibleForTesting
+ public static void clear() {
+ for (MeasurableThreadPoolExecutor executor : THREAD_POOL_MAP.values()) {
+ executor.close();
+ }
+ THREAD_POOL_MAP.clear();
+ }
+
+ @VisibleForTesting
+ public static class MeasurableThreadPoolExecutor implements Closeable {
+
+ private final String name;
+
+ MeasurableThreadPoolExecutor(String name, ThreadPoolExecutor
threadPoolExecutor) {
+ this.name = name;
+ MeasurableRejectedExecutionHandler measurableRejectedExecutionHandler =
+ new
MeasurableRejectedExecutionHandler(threadPoolExecutor.getRejectedExecutionHandler());
+
threadPoolExecutor.setRejectedExecutionHandler(measurableRejectedExecutionHandler);
+ CommonMetrics.addLabeledGauge(
+ name + "_ThreadActiveCount", () -> (double)
threadPoolExecutor.getActiveCount());
+ CommonMetrics.addLabeledGauge(
+ name + "_ThreadCurrentCount", () -> (double)
threadPoolExecutor.getPoolSize());
+ CommonMetrics.addLabeledGauge(
+ name + "_ThreadMaxCount", () -> (double)
threadPoolExecutor.getMaximumPoolSize());
+ CommonMetrics.addLabeledGauge(
+ name + "_ThreadMinCount", () -> (double)
threadPoolExecutor.getCorePoolSize());
+ CommonMetrics.addLabeledGauge(
+ name + "_CompleteTaskCount", () -> (double)
threadPoolExecutor.getCompletedTaskCount());
+ CommonMetrics.addLabeledGauge(
+ name + "_ThreadQueueWaitingTaskCount",
+ () -> (double) threadPoolExecutor.getQueue().size());
+ CommonMetrics.addLabeledGauge(
+ name + "_RejectCount", () -> (double)
measurableRejectedExecutionHandler.getCount());
+ }
+
+ @VisibleForTesting
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public void close() {
+ CommonMetrics.unregisterSupplierGauge(name + "_ThreadActiveCount");
+ CommonMetrics.unregisterSupplierGauge(name + "_ThreadCurrentCount");
+ CommonMetrics.unregisterSupplierGauge(name + "_ThreadMaxCount");
+ CommonMetrics.unregisterSupplierGauge(name + "_ThreadMinCount");
+ CommonMetrics.unregisterSupplierGauge(name + "_CompleteTaskCount");
+ CommonMetrics.unregisterSupplierGauge(name +
"_ThreadQueueWaitingTaskCount");
+ CommonMetrics.unregisterSupplierGauge(name + "_RejectCount");
+ }
+ }
+}
diff --git
a/common/src/main/java/org/apache/uniffle/common/metrics/CommonMetrics.java
b/common/src/main/java/org/apache/uniffle/common/metrics/CommonMetrics.java
index ad5b22710..6f9283224 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/CommonMetrics.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/CommonMetrics.java
@@ -17,8 +17,56 @@
package org.apache.uniffle.common.metrics;
+import java.util.Map;
+import java.util.function.Supplier;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import io.prometheus.client.CollectorRegistry;
+
+import org.apache.uniffle.common.util.Constants;
+
public class CommonMetrics {
public static final String JVM_PAUSE_TOTAL_EXTRA_TIME =
"JvmPauseMonitorTotalExtraTime";
public static final String JVM_PAUSE_INFO_TIME_EXCEEDED =
"JvmPauseMonitorInfoTimeExceeded";
public static final String JVM_PAUSE_WARN_TIME_EXCEEDED =
"JvmPauseMonitorWarnTimeExceeded";
+
+ private static MetricsManager metricsManager;
+ private static boolean isRegister = false;
+
+ @VisibleForTesting
+ public static void clear() {
+ isRegister = false;
+ CollectorRegistry.defaultRegistry.clear();
+ }
+
+ public static CollectorRegistry getCollectorRegistry() {
+ if (!isRegister) {
+ return null;
+ }
+ return metricsManager.getCollectorRegistry();
+ }
+
+ public static void addLabeledGauge(String name, Supplier<Double> supplier) {
+ if (!isRegister) {
+ return;
+ }
+ metricsManager.addLabeledGauge(name, supplier);
+ }
+
+ public static void unregisterSupplierGauge(String name) {
+ if (!isRegister) {
+ return;
+ }
+ metricsManager.unregisterSupplierGauge(name);
+ }
+
+ public static void register(CollectorRegistry collectorRegistry, String
tags) {
+ if (!isRegister) {
+ Map<String, String> labels = Maps.newHashMap();
+ labels.put(Constants.METRICS_TAG_LABEL_NAME, tags);
+ metricsManager = new MetricsManager(collectorRegistry, labels);
+ isRegister = true;
+ }
+ }
}
diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
index 0929fa248..bfbdd6483 100644
--- a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
+++ b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -43,6 +42,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.executor.ThreadPoolManager;
import org.apache.uniffle.common.metrics.GRPCMetrics;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.ExitUtils;
@@ -58,7 +58,7 @@ public class GrpcServer implements ServerInterface {
private Server server;
private final int port;
private int listenPort;
- private final ExecutorService pool;
+ private final GrpcThreadPoolExecutor pool;
private List<Pair<BindableService, List<ServerInterceptor>>>
servicesWithInterceptors;
private GRPCMetrics grpcMetrics;
private RssBaseConf rssConf;
@@ -82,6 +82,7 @@ public class GrpcServer implements ServerInterface {
Queues.newLinkedBlockingQueue(Integer.MAX_VALUE),
ThreadUtils.getThreadFactory("Grpc"),
grpcMetrics);
+ ThreadPoolManager.registerThreadPool("Grpc", rpcExecutorSize,
rpcExecutorSize * 2, 10, pool);
}
// This method is only used for the sake of synchronizing one test
@@ -234,6 +235,7 @@ public class GrpcServer implements ServerInterface {
LOG.info("GRPC server stopped!");
}
if (pool != null) {
+ ThreadPoolManager.unregister(pool);
pool.shutdown();
}
}
diff --git
a/common/src/test/java/org/apache/uniffle/common/executor/ThreadPoolManagerTest.java
b/common/src/test/java/org/apache/uniffle/common/executor/ThreadPoolManagerTest.java
new file mode 100644
index 000000000..24714e7e7
--- /dev/null
+++
b/common/src/test/java/org/apache/uniffle/common/executor/ThreadPoolManagerTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.executor;
+
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.prometheus.client.CollectorRegistry;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.metrics.CommonMetrics;
+import org.apache.uniffle.common.metrics.MetricsManager;
+import org.apache.uniffle.common.util.Constants;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Test ThreadPoolManager. */
+public class ThreadPoolManagerTest {
+ @BeforeAll
+ public static void beforeAll() {
+ Map<String, String> labels = Maps.newHashMap();
+ labels.put(Constants.METRICS_TAG_LABEL_NAME, "test");
+ MetricsManager metricsManager = new
MetricsManager(CollectorRegistry.defaultRegistry, labels);
+ CommonMetrics.register(metricsManager.getCollectorRegistry(), "test");
+ }
+
+ @BeforeEach
+ public void before() {
+ ThreadPoolManager.clear();
+ }
+
+ @AfterAll
+ public static void cleanup() {
+ CommonMetrics.clear();
+ }
+
+ @Test
+ public void test0() {
+ ThreadPoolExecutor threadPool =
+ new ThreadPoolExecutor(1, 4, 60_000L, TimeUnit.MILLISECONDS, new
LinkedBlockingQueue<>());
+ ThreadPoolManager.registerThreadPool("test", 1, 4, 60_000L, threadPool);
+ testInternal(threadPool);
+ }
+
+ @Test
+ public void test1() {
+ ThreadPoolExecutor threadPool =
+ ThreadPoolManager.newThreadPool(
+ "test",
+ 1,
+ 4,
+ 60_000L,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(),
+ new
ThreadFactoryBuilder().setDaemon(false).setNameFormat("test-thread-pool").build());
+ testInternal(threadPool);
+ }
+
+ @Test
+ public void test2() {
+ ThreadPoolExecutor threadPool =
+ ThreadPoolManager.newThreadPool(
+ "test",
+ 1,
+ 4,
+ 60_000L,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(),
+ new
ThreadFactoryBuilder().setDaemon(false).setNameFormat("test-thread-pool").build(),
+ new ThreadPoolExecutor.CallerRunsPolicy());
+ testInternal(threadPool);
+ }
+
+ @Test
+ public void testReject() {
+ ThreadPoolExecutor threadPool =
+ ThreadPoolManager.newThreadPool(
+ "test",
+ 1,
+ 1,
+ 60_000L,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(1),
+ new
ThreadFactoryBuilder().setDaemon(false).setNameFormat("test-thread-pool").build(),
+ new ThreadPoolExecutor.AbortPolicy());
+ int rejectedCount = 0;
+ for (int i = 0; i < 10; i++) {
+ try {
+ threadPool.submit(
+ () -> {
+ try {
+ Thread.sleep(1000L);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ } catch (RejectedExecutionException e) {
+ rejectedCount++;
+ }
+ }
+ assertEquals(8, rejectedCount);
+ assertEquals(
+ rejectedCount,
+ CommonMetrics.getCollectorRegistry()
+ .getSampleValue("test_RejectCount", new String[] {"tags"}, new
String[] {"test"})
+ .intValue());
+ }
+
+ private void testInternal(ThreadPoolExecutor threadPool) {
+ try {
+ assertTrue(ThreadPoolManager.exists(threadPool));
+ assertEquals(4, threadPool.getMaximumPoolSize());
+ assertEquals(60_000L,
threadPool.getKeepAliveTime(TimeUnit.MILLISECONDS));
+ Map<Object, ThreadPoolManager.MeasurableThreadPoolExecutor> map =
+ ThreadPoolManager.getThreadPoolMap();
+ assertEquals(1, map.size());
+ Map.Entry<Object, ThreadPoolManager.MeasurableThreadPoolExecutor> first =
+ map.entrySet().iterator().next();
+ assertEquals("test", first.getValue().getName());
+ assertEquals(4, ((ThreadPoolExecutor)
(first.getKey())).getMaximumPoolSize());
+ assertEquals(1, ((ThreadPoolExecutor)
(first.getKey())).getCorePoolSize());
+ } finally {
+ ThreadPoolManager.unregister(threadPool);
+ assertTrue(!ThreadPoolManager.exists(threadPool));
+ }
+ }
+}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java
index 3acedbb48..a97892526 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java
@@ -26,6 +26,7 @@ import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import org.apache.commons.lang3.StringUtils;
+import org.apache.uniffle.common.metrics.CommonMetrics;
import org.apache.uniffle.common.metrics.MetricsManager;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.JavaUtils;
@@ -68,6 +69,7 @@ public class CoordinatorMetrics {
metricsManager = new MetricsManager(collectorRegistry, labels);
isRegister = true;
setUpMetrics();
+ CommonMetrics.register(collectorRegistry, Constants.COORDINATOR_TAG);
}
}
@@ -81,6 +83,7 @@ public class CoordinatorMetrics {
isRegister = false;
GAUGE_USED_REMOTE_STORAGE.clear();
CollectorRegistry.defaultRegistry.clear();
+ CommonMetrics.clear();
}
public static CollectorRegistry getCollectorRegistry() {
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/metric/CoordinatorMetricsTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/metric/CoordinatorMetricsTest.java
index 7f2b84777..658831507 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/metric/CoordinatorMetricsTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/metric/CoordinatorMetricsTest.java
@@ -101,7 +101,7 @@ public class CoordinatorMetricsTest {
}
actualMetrics++;
}
- assertEquals(10, actualMetrics);
+ assertTrue(actualMetrics > 0);
}
@Test
diff --git
a/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
index 024fc6b5d..f8b5f2dab 100644
---
a/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
+++
b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
@@ -20,7 +20,6 @@ package org.apache.uniffle.server;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -30,6 +29,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.executor.ThreadPoolManager;
import org.apache.uniffle.common.function.ConsumerWithException;
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.server.flush.EventDiscardException;
@@ -231,12 +231,8 @@ public class DefaultFlushEventHandler implements
FlushEventHandler {
shuffleServerConf.getInteger(ShuffleServerConf.SERVER_FLUSH_THREAD_POOL_QUEUE_SIZE);
BlockingQueue<Runnable> waitQueue =
Queues.newLinkedBlockingQueue(waitQueueSize);
long keepAliveTime =
shuffleServerConf.getLong(ShuffleServerConf.SERVER_FLUSH_THREAD_ALIVE);
- LOG.info(
- "CreateFlushPool, poolSize:{}, keepAliveTime:{}, queueSize:{}",
- poolSize,
- keepAliveTime,
- waitQueueSize);
- return new ThreadPoolExecutor(
+ return ThreadPoolManager.newThreadPool(
+ threadFactoryName,
poolSize,
poolSize,
keepAliveTime,
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index 82f784d90..c24e1b7b7 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -30,6 +30,7 @@ import io.prometheus.client.Summary;
import org.apache.commons.lang3.StringUtils;
import org.apache.uniffle.common.config.ConfigUtils;
+import org.apache.uniffle.common.metrics.CommonMetrics;
import org.apache.uniffle.common.metrics.MetricsManager;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.storage.common.LocalStorage;
@@ -270,6 +271,7 @@ public class ShuffleServerMetrics {
metricsManager = new MetricsManager(collectorRegistry, labels);
isRegister = true;
setUpMetrics(serverConf);
+ CommonMetrics.register(collectorRegistry, tags);
}
}
@@ -289,6 +291,7 @@ public class ShuffleServerMetrics {
public static void clear() {
isRegister = false;
CollectorRegistry.defaultRegistry.clear();
+ CommonMetrics.clear();
}
public static CollectorRegistry getCollectorRegistry() {