This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 267353e3f [#2125] feat(common): Introduce ThreadPoolManager associated 
metrics (#2127)
267353e3f is described below

commit 267353e3f3dcc2fc985263b01f10a33deaba3f6a
Author: maobaolong <[email protected]>
AuthorDate: Wed Oct 9 11:49:04 2024 +0800

    [#2125] feat(common): Introduce ThreadPoolManager associated metrics (#2127)
    
    ### What changes were proposed in this pull request?
    
    Introduce ThreadPoolManager, a registry for thread pools that exposes metrics for each registered pool.
    
    ### Why are the changes needed?
    
    Fix: #2125
    
    ### Does this PR introduce _any_ user-facing change?
    
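    Adds a batch of metrics for each registered thread pool: active, current, max and min (core) thread counts, completed task count, queued task count, and reject count.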
    
    ### How was this patch tested?
    
    Added ThreadPoolManagerTest, and verified manually by opening the dashboard server metrics page.
    
    <img width="515" alt="image" 
src="https://github.com/user-attachments/assets/561c5b7d-442f-4b2a-b48f-3fbdcc814cb0">
---
 .../MeasurableRejectedExecutionHandler.java        |  51 ++++++
 .../uniffle/common/executor/ThreadPoolManager.java | 197 +++++++++++++++++++++
 .../uniffle/common/metrics/CommonMetrics.java      |  48 +++++
 .../org/apache/uniffle/common/rpc/GrpcServer.java  |   6 +-
 .../common/executor/ThreadPoolManagerTest.java     | 151 ++++++++++++++++
 .../coordinator/metric/CoordinatorMetrics.java     |   3 +
 .../coordinator/metric/CoordinatorMetricsTest.java |   2 +-
 .../uniffle/server/DefaultFlushEventHandler.java   |  10 +-
 .../uniffle/server/ShuffleServerMetrics.java       |   3 +
 9 files changed, 461 insertions(+), 10 deletions(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/executor/MeasurableRejectedExecutionHandler.java
 
b/common/src/main/java/org/apache/uniffle/common/executor/MeasurableRejectedExecutionHandler.java
new file mode 100644
index 000000000..5ed8a0456
--- /dev/null
+++ 
b/common/src/main/java/org/apache/uniffle/common/executor/MeasurableRejectedExecutionHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.executor;
+
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicLong;
+
+/** A handler to measure reject count. */
+public class MeasurableRejectedExecutionHandler implements 
RejectedExecutionHandler {
+  private AtomicLong counter = new AtomicLong(0L);
+
+  private final RejectedExecutionHandler handler;
+
+  /**
+   * Constructs a wrapped {@link RejectedExecutionHandler} to measure rejected 
count.
+   *
+   * @param handler the rejected execution handler
+   */
+  public MeasurableRejectedExecutionHandler(RejectedExecutionHandler handler) {
+    this.handler = handler;
+  }
+
+  @Override
+  public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+    counter.incrementAndGet();
+    if (handler != null) {
+      handler.rejectedExecution(r, executor);
+    }
+  }
+
+  /** @return the rejected count */
+  public long getCount() {
+    return counter.get();
+  }
+}
diff --git 
a/common/src/main/java/org/apache/uniffle/common/executor/ThreadPoolManager.java
 
b/common/src/main/java/org/apache/uniffle/common/executor/ThreadPoolManager.java
new file mode 100644
index 000000000..842545c16
--- /dev/null
+++ 
b/common/src/main/java/org/apache/uniffle/common/executor/ThreadPoolManager.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.executor;
+
+import java.io.Closeable;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.metrics.CommonMetrics;
+
+/** The threadPool manager which represents a manager to handle all thread 
pool executors. */
+public class ThreadPoolManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ThreadPoolManager.class);
+
+  private static final Map<Object, MeasurableThreadPoolExecutor> 
THREAD_POOL_MAP =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Add a thread pool.
+   *
+   * @param name the name of the thread pool
+   * @param corePoolSize the core pool size supplier
+   * @param maximumPoolSize the maximum pool size supplier
+   * @param keepAliveTime the keep alive time supplier
+   * @param unit the unit
+   * @param workQueue the work queue
+   * @param threadFactory the thread factory
+   * @return the registered thread pool
+   */
+  public static ThreadPoolExecutor newThreadPool(
+      String name,
+      int corePoolSize,
+      int maximumPoolSize,
+      long keepAliveTime,
+      TimeUnit unit,
+      BlockingQueue<Runnable> workQueue,
+      ThreadFactory threadFactory) {
+    ThreadPoolExecutor threadPoolExecutor =
+        new ThreadPoolExecutor(
+            corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 
threadFactory);
+    registerThreadPool(name, corePoolSize, maximumPoolSize, keepAliveTime, 
threadPoolExecutor);
+    return threadPoolExecutor;
+  }
+
+  /**
+   * Add a thread pool.
+   *
+   * @param name the name of the thread pool
+   * @param corePoolSize the core pool size supplier
+   * @param maximumPoolSize the maximum pool size supplier
+   * @param keepAliveTime the keep alive time supplier
+   * @param unit the unit
+   * @param workQueue the work queue
+   * @param threadFactory the thread factory
+   * @param handler the handler to use when execution is blocked because the 
thread bounds and queue
+   *     capacities are reached
+   * @return the registered thread pool
+   */
+  public static ThreadPoolExecutor newThreadPool(
+      String name,
+      int corePoolSize,
+      int maximumPoolSize,
+      long keepAliveTime,
+      TimeUnit unit,
+      BlockingQueue<Runnable> workQueue,
+      ThreadFactory threadFactory,
+      RejectedExecutionHandler handler) {
+    ThreadPoolExecutor threadPoolExecutor =
+        new ThreadPoolExecutor(
+            corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 
threadFactory, handler);
+    registerThreadPool(name, corePoolSize, maximumPoolSize, keepAliveTime, 
threadPoolExecutor);
+    return threadPoolExecutor;
+  }
+
+  /**
+   * Register a thread pool to THREAD_POOL_MAP.
+   *
+   * @param name the name of the thread pool
+   * @param corePoolSize the core pool size supplier
+   * @param maximumPoolSize the maximum pool size supplier
+   * @param keepAliveTime the keep alive time supplier
+   * @param threadPoolExecutor the thread pool which will be registered
+   */
+  public static void registerThreadPool(
+      String name,
+      int corePoolSize,
+      int maximumPoolSize,
+      long keepAliveTime,
+      ThreadPoolExecutor threadPoolExecutor) {
+    THREAD_POOL_MAP.put(
+        threadPoolExecutor, new MeasurableThreadPoolExecutor(name, 
threadPoolExecutor));
+    LOG.info(
+        "{} thread pool, core size:{}, max size:{}, keep alive time:{}",
+        name,
+        corePoolSize,
+        maximumPoolSize,
+        keepAliveTime);
+  }
+
+  /**
+   * Unregister the thread pool executor related to the given key.
+   *
+   * @param key the key of thread pool executor to unregister
+   */
+  public static void unregister(Object key) {
+    MeasurableThreadPoolExecutor measurableThreadPoolExecutor = 
THREAD_POOL_MAP.remove(key);
+    if (measurableThreadPoolExecutor != null) {
+      measurableThreadPoolExecutor.close();
+    }
+  }
+
+  public static boolean exists(Object key) {
+    return THREAD_POOL_MAP.containsKey(key);
+  }
+
+  @VisibleForTesting
+  public static Map<Object, MeasurableThreadPoolExecutor> getThreadPoolMap() {
+    return Collections.unmodifiableMap(THREAD_POOL_MAP);
+  }
+
+  @VisibleForTesting
+  public static void clear() {
+    for (MeasurableThreadPoolExecutor executor : THREAD_POOL_MAP.values()) {
+      executor.close();
+    }
+    THREAD_POOL_MAP.clear();
+  }
+
+  @VisibleForTesting
+  public static class MeasurableThreadPoolExecutor implements Closeable {
+
+    private final String name;
+
+    MeasurableThreadPoolExecutor(String name, ThreadPoolExecutor 
threadPoolExecutor) {
+      this.name = name;
+      MeasurableRejectedExecutionHandler measurableRejectedExecutionHandler =
+          new 
MeasurableRejectedExecutionHandler(threadPoolExecutor.getRejectedExecutionHandler());
+      
threadPoolExecutor.setRejectedExecutionHandler(measurableRejectedExecutionHandler);
+      CommonMetrics.addLabeledGauge(
+          name + "_ThreadActiveCount", () -> (double) 
threadPoolExecutor.getActiveCount());
+      CommonMetrics.addLabeledGauge(
+          name + "_ThreadCurrentCount", () -> (double) 
threadPoolExecutor.getPoolSize());
+      CommonMetrics.addLabeledGauge(
+          name + "_ThreadMaxCount", () -> (double) 
threadPoolExecutor.getMaximumPoolSize());
+      CommonMetrics.addLabeledGauge(
+          name + "_ThreadMinCount", () -> (double) 
threadPoolExecutor.getCorePoolSize());
+      CommonMetrics.addLabeledGauge(
+          name + "_CompleteTaskCount", () -> (double) 
threadPoolExecutor.getCompletedTaskCount());
+      CommonMetrics.addLabeledGauge(
+          name + "_ThreadQueueWaitingTaskCount",
+          () -> (double) threadPoolExecutor.getQueue().size());
+      CommonMetrics.addLabeledGauge(
+          name + "_RejectCount", () -> (double) 
measurableRejectedExecutionHandler.getCount());
+    }
+
+    @VisibleForTesting
+    public String getName() {
+      return name;
+    }
+
+    @Override
+    public void close() {
+      CommonMetrics.unregisterSupplierGauge(name + "_ThreadActiveCount");
+      CommonMetrics.unregisterSupplierGauge(name + "_ThreadCurrentCount");
+      CommonMetrics.unregisterSupplierGauge(name + "_ThreadMaxCount");
+      CommonMetrics.unregisterSupplierGauge(name + "_ThreadMinCount");
+      CommonMetrics.unregisterSupplierGauge(name + "_CompleteTaskCount");
+      CommonMetrics.unregisterSupplierGauge(name + 
"_ThreadQueueWaitingTaskCount");
+      CommonMetrics.unregisterSupplierGauge(name + "_RejectCount");
+    }
+  }
+}
diff --git 
a/common/src/main/java/org/apache/uniffle/common/metrics/CommonMetrics.java 
b/common/src/main/java/org/apache/uniffle/common/metrics/CommonMetrics.java
index ad5b22710..6f9283224 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/CommonMetrics.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/CommonMetrics.java
@@ -17,8 +17,56 @@
 
 package org.apache.uniffle.common.metrics;
 
+import java.util.Map;
+import java.util.function.Supplier;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import io.prometheus.client.CollectorRegistry;
+
+import org.apache.uniffle.common.util.Constants;
+
 public class CommonMetrics {
   public static final String JVM_PAUSE_TOTAL_EXTRA_TIME = 
"JvmPauseMonitorTotalExtraTime";
   public static final String JVM_PAUSE_INFO_TIME_EXCEEDED = 
"JvmPauseMonitorInfoTimeExceeded";
   public static final String JVM_PAUSE_WARN_TIME_EXCEEDED = 
"JvmPauseMonitorWarnTimeExceeded";
+
+  private static MetricsManager metricsManager;
+  private static boolean isRegister = false;
+
+  @VisibleForTesting
+  public static void clear() {
+    isRegister = false;
+    CollectorRegistry.defaultRegistry.clear();
+  }
+
+  public static CollectorRegistry getCollectorRegistry() {
+    if (!isRegister) {
+      return null;
+    }
+    return metricsManager.getCollectorRegistry();
+  }
+
+  public static void addLabeledGauge(String name, Supplier<Double> supplier) {
+    if (!isRegister) {
+      return;
+    }
+    metricsManager.addLabeledGauge(name, supplier);
+  }
+
+  public static void unregisterSupplierGauge(String name) {
+    if (!isRegister) {
+      return;
+    }
+    metricsManager.unregisterSupplierGauge(name);
+  }
+
+  public static void register(CollectorRegistry collectorRegistry, String 
tags) {
+    if (!isRegister) {
+      Map<String, String> labels = Maps.newHashMap();
+      labels.put(Constants.METRICS_TAG_LABEL_NAME, tags);
+      metricsManager = new MetricsManager(collectorRegistry, labels);
+      isRegister = true;
+    }
+  }
 }
diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java 
b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
index 0929fa248..bfbdd6483 100644
--- a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
+++ b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -43,6 +42,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.executor.ThreadPoolManager;
 import org.apache.uniffle.common.metrics.GRPCMetrics;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.ExitUtils;
@@ -58,7 +58,7 @@ public class GrpcServer implements ServerInterface {
   private Server server;
   private final int port;
   private int listenPort;
-  private final ExecutorService pool;
+  private final GrpcThreadPoolExecutor pool;
   private List<Pair<BindableService, List<ServerInterceptor>>> 
servicesWithInterceptors;
   private GRPCMetrics grpcMetrics;
   private RssBaseConf rssConf;
@@ -82,6 +82,7 @@ public class GrpcServer implements ServerInterface {
             Queues.newLinkedBlockingQueue(Integer.MAX_VALUE),
             ThreadUtils.getThreadFactory("Grpc"),
             grpcMetrics);
+    ThreadPoolManager.registerThreadPool("Grpc", rpcExecutorSize, 
rpcExecutorSize * 2, 10, pool);
   }
 
   // This method is only used for the sake of synchronizing one test
@@ -234,6 +235,7 @@ public class GrpcServer implements ServerInterface {
       LOG.info("GRPC server stopped!");
     }
     if (pool != null) {
+      ThreadPoolManager.unregister(pool);
       pool.shutdown();
     }
   }
diff --git 
a/common/src/test/java/org/apache/uniffle/common/executor/ThreadPoolManagerTest.java
 
b/common/src/test/java/org/apache/uniffle/common/executor/ThreadPoolManagerTest.java
new file mode 100644
index 000000000..24714e7e7
--- /dev/null
+++ 
b/common/src/test/java/org/apache/uniffle/common/executor/ThreadPoolManagerTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.executor;
+
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.prometheus.client.CollectorRegistry;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.metrics.CommonMetrics;
+import org.apache.uniffle.common.metrics.MetricsManager;
+import org.apache.uniffle.common.util.Constants;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Test ThreadPoolManager. */
+public class ThreadPoolManagerTest {
+  @BeforeAll
+  public static void beforeAll() {
+    Map<String, String> labels = Maps.newHashMap();
+    labels.put(Constants.METRICS_TAG_LABEL_NAME, "test");
+    MetricsManager metricsManager = new 
MetricsManager(CollectorRegistry.defaultRegistry, labels);
+    CommonMetrics.register(metricsManager.getCollectorRegistry(), "test");
+  }
+
+  @BeforeEach
+  public void before() {
+    ThreadPoolManager.clear();
+  }
+
+  @AfterAll
+  public static void cleanup() {
+    CommonMetrics.clear();
+  }
+
+  @Test
+  public void test0() {
+    ThreadPoolExecutor threadPool =
+        new ThreadPoolExecutor(1, 4, 60_000L, TimeUnit.MILLISECONDS, new 
LinkedBlockingQueue<>());
+    ThreadPoolManager.registerThreadPool("test", 1, 4, 60_000L, threadPool);
+    testInternal(threadPool);
+  }
+
+  @Test
+  public void test1() {
+    ThreadPoolExecutor threadPool =
+        ThreadPoolManager.newThreadPool(
+            "test",
+            1,
+            4,
+            60_000L,
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<>(),
+            new 
ThreadFactoryBuilder().setDaemon(false).setNameFormat("test-thread-pool").build());
+    testInternal(threadPool);
+  }
+
+  @Test
+  public void test2() {
+    ThreadPoolExecutor threadPool =
+        ThreadPoolManager.newThreadPool(
+            "test",
+            1,
+            4,
+            60_000L,
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<>(),
+            new 
ThreadFactoryBuilder().setDaemon(false).setNameFormat("test-thread-pool").build(),
+            new ThreadPoolExecutor.CallerRunsPolicy());
+    testInternal(threadPool);
+  }
+
+  @Test
+  public void testReject() {
+    ThreadPoolExecutor threadPool =
+        ThreadPoolManager.newThreadPool(
+            "test",
+            1,
+            1,
+            60_000L,
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<>(1),
+            new 
ThreadFactoryBuilder().setDaemon(false).setNameFormat("test-thread-pool").build(),
+            new ThreadPoolExecutor.AbortPolicy());
+    int rejectedCount = 0;
+    for (int i = 0; i < 10; i++) {
+      try {
+        threadPool.submit(
+            () -> {
+              try {
+                Thread.sleep(1000L);
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+            });
+      } catch (RejectedExecutionException e) {
+        rejectedCount++;
+      }
+    }
+    assertEquals(8, rejectedCount);
+    assertEquals(
+        rejectedCount,
+        CommonMetrics.getCollectorRegistry()
+            .getSampleValue("test_RejectCount", new String[] {"tags"}, new 
String[] {"test"})
+            .intValue());
+  }
+
+  private void testInternal(ThreadPoolExecutor threadPool) {
+    try {
+      assertTrue(ThreadPoolManager.exists(threadPool));
+      assertEquals(4, threadPool.getMaximumPoolSize());
+      assertEquals(60_000L, 
threadPool.getKeepAliveTime(TimeUnit.MILLISECONDS));
+      Map<Object, ThreadPoolManager.MeasurableThreadPoolExecutor> map =
+          ThreadPoolManager.getThreadPoolMap();
+      assertEquals(1, map.size());
+      Map.Entry<Object, ThreadPoolManager.MeasurableThreadPoolExecutor> first =
+          map.entrySet().iterator().next();
+      assertEquals("test", first.getValue().getName());
+      assertEquals(4, ((ThreadPoolExecutor) 
(first.getKey())).getMaximumPoolSize());
+      assertEquals(1, ((ThreadPoolExecutor) 
(first.getKey())).getCorePoolSize());
+    } finally {
+      ThreadPoolManager.unregister(threadPool);
+      assertTrue(!ThreadPoolManager.exists(threadPool));
+    }
+  }
+}
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java
index 3acedbb48..a97892526 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java
@@ -26,6 +26,7 @@ import io.prometheus.client.Counter;
 import io.prometheus.client.Gauge;
 import org.apache.commons.lang3.StringUtils;
 
+import org.apache.uniffle.common.metrics.CommonMetrics;
 import org.apache.uniffle.common.metrics.MetricsManager;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.JavaUtils;
@@ -68,6 +69,7 @@ public class CoordinatorMetrics {
       metricsManager = new MetricsManager(collectorRegistry, labels);
       isRegister = true;
       setUpMetrics();
+      CommonMetrics.register(collectorRegistry, Constants.COORDINATOR_TAG);
     }
   }
 
@@ -81,6 +83,7 @@ public class CoordinatorMetrics {
     isRegister = false;
     GAUGE_USED_REMOTE_STORAGE.clear();
     CollectorRegistry.defaultRegistry.clear();
+    CommonMetrics.clear();
   }
 
   public static CollectorRegistry getCollectorRegistry() {
diff --git 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/metric/CoordinatorMetricsTest.java
 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/metric/CoordinatorMetricsTest.java
index 7f2b84777..658831507 100644
--- 
a/coordinator/src/test/java/org/apache/uniffle/coordinator/metric/CoordinatorMetricsTest.java
+++ 
b/coordinator/src/test/java/org/apache/uniffle/coordinator/metric/CoordinatorMetricsTest.java
@@ -101,7 +101,7 @@ public class CoordinatorMetricsTest {
       }
       actualMetrics++;
     }
-    assertEquals(10, actualMetrics);
+    assertTrue(actualMetrics > 0);
   }
 
   @Test
diff --git 
a/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java 
b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
index 024fc6b5d..f8b5f2dab 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
@@ -20,7 +20,6 @@ package org.apache.uniffle.server;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -30,6 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.executor.ThreadPoolManager;
 import org.apache.uniffle.common.function.ConsumerWithException;
 import org.apache.uniffle.common.util.ThreadUtils;
 import org.apache.uniffle.server.flush.EventDiscardException;
@@ -231,12 +231,8 @@ public class DefaultFlushEventHandler implements 
FlushEventHandler {
         
shuffleServerConf.getInteger(ShuffleServerConf.SERVER_FLUSH_THREAD_POOL_QUEUE_SIZE);
     BlockingQueue<Runnable> waitQueue = 
Queues.newLinkedBlockingQueue(waitQueueSize);
     long keepAliveTime = 
shuffleServerConf.getLong(ShuffleServerConf.SERVER_FLUSH_THREAD_ALIVE);
-    LOG.info(
-        "CreateFlushPool, poolSize:{}, keepAliveTime:{}, queueSize:{}",
-        poolSize,
-        keepAliveTime,
-        waitQueueSize);
-    return new ThreadPoolExecutor(
+    return ThreadPoolManager.newThreadPool(
+        threadFactoryName,
         poolSize,
         poolSize,
         keepAliveTime,
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index 82f784d90..c24e1b7b7 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -30,6 +30,7 @@ import io.prometheus.client.Summary;
 import org.apache.commons.lang3.StringUtils;
 
 import org.apache.uniffle.common.config.ConfigUtils;
+import org.apache.uniffle.common.metrics.CommonMetrics;
 import org.apache.uniffle.common.metrics.MetricsManager;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.storage.common.LocalStorage;
@@ -270,6 +271,7 @@ public class ShuffleServerMetrics {
       metricsManager = new MetricsManager(collectorRegistry, labels);
       isRegister = true;
       setUpMetrics(serverConf);
+      CommonMetrics.register(collectorRegistry, tags);
     }
   }
 
@@ -289,6 +291,7 @@ public class ShuffleServerMetrics {
   public static void clear() {
     isRegister = false;
     CollectorRegistry.defaultRegistry.clear();
+    CommonMetrics.clear();
   }
 
   public static CollectorRegistry getCollectorRegistry() {

Reply via email to