This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 705271176 [#2169] feat(server)(coordinator): Support dynamic thread 
number update about ThreadPool (#2168)
705271176 is described below

commit 7052711760ce9d3469175dd155a8fd5da468d853
Author: maobaolong <[email protected]>
AuthorDate: Wed Oct 16 11:26:24 2024 +0800

    [#2169] feat(server)(coordinator): Support dynamic thread number update 
about ThreadPool (#2168)
    
    ### What changes were proposed in this pull request?
    
    Support dynamic update ThreadPool
    
    ### Why are the changes needed?
    
    Fix: #2169
    
    ### Does this PR introduce _any_ user-facing change?
    
    Change update conf rest api to support update multiply keys one time.
    
    ### How was this patch tested?
    
    - curl command line
    ```
    
    ➜  ~ curl -X POST http://localhost:19978/api/shuffleServer/confOps/update \
    -H "Content-Type: application/json" \
    -d '{"update":{"rss.server.flush.localfile.threadPool.size": "122", 
"rss.server.flush.thread.alive": "112"}}'
    temporarily effective until restart: Update successfully%                   
                                                                                
                                                                                
       ➜  ~ curl -X POST 
http://localhost:19978/api/shuffleServer/confOps/update \
    -H "Content-Type: application/json" \
    -d '{"update":{"rss.server.flush.localfile.threadPool.size": "122", 
"rss.server.flush.thread.alive": "112", "rss.rpc.executor.size": "123"}}'
    temporarily effective until restart: Update successfully%
    ```
    
    - server.log
    ```
    [2024-10-11 23:48:37.508] [Jetty-5] [INFO] ConfOpsResource - Dynamic 
updating ConfVO{update={rss.server.flush.localfile.threadPool.size=122, 
rss.server.flush.thread.alive=112}, delete=[]}
    [2024-10-11 23:48:37.513] [Jetty-5] [INFO] ThreadPoolManager - Updated 
thread pool LocalFileFlushEventThreadPool keep alive time from 5000 to 112000
    [2024-10-11 23:48:37.513] [Jetty-5] [INFO] ThreadPoolManager - Updated 
thread pool LocalFileFlushEventThreadPool MaximumPoolSize from 10 to 122
    [2024-10-11 23:48:37.513] [Jetty-5] [INFO] ThreadPoolManager - Updated 
thread pool LocalFileFlushEventThreadPool CorePoolSize from 10 to 122
    [2024-10-11 23:48:37.513] [Jetty-5] [INFO] ThreadPoolManager - Updated 
thread pool HadoopFlushEventThreadPool keep alive time from 5000 to 112000
    [2024-10-11 23:48:37.513] [Jetty-5] [INFO] ThreadPoolManager - Updated 
thread pool FallBackFlushEventThreadPool keep alive time from 5000 to 112000
    [2024-10-11 23:50:53.749] [Jetty-8] [INFO] ConfOpsResource - Dynamic 
updating ConfVO{update={rss.server.flush.localfile.threadPool.size=122, 
rss.server.flush.thread.alive=112, rss.rpc.executor.size=123}, delete=[]}
    [2024-10-11 23:50:53.757] [Jetty-8] [INFO] ThreadPoolManager - Updated 
thread pool Grpc MaximumPoolSize from 2000 to 246
    [2024-10-11 23:50:53.757] [Jetty-8] [INFO] ThreadPoolManager - Updated 
thread pool Grpc CorePoolSize from 1000 to 123
    ```
---
 .../uniffle/common/executor/ThreadPoolManager.java | 252 ++++++++++++++++++---
 .../uniffle/common/metrics/CommonMetrics.java      |   9 +-
 .../org/apache/uniffle/common/rpc/GrpcServer.java  |   7 +-
 .../common/executor/ThreadPoolManagerTest.java     |  44 ++--
 .../uniffle/server/DefaultFlushEventHandler.java   |  30 ++-
 5 files changed, 274 insertions(+), 68 deletions(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/executor/ThreadPoolManager.java
 
b/common/src/main/java/org/apache/uniffle/common/executor/ThreadPoolManager.java
index 842545c16..6f5f9ac7e 100644
--- 
a/common/src/main/java/org/apache/uniffle/common/executor/ThreadPoolManager.java
+++ 
b/common/src/main/java/org/apache/uniffle/common/executor/ThreadPoolManager.java
@@ -20,17 +20,21 @@ package org.apache.uniffle.common.executor;
 import java.io.Closeable;
 import java.util.Collections;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.common.ReconfigurableRegistry;
+import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.metrics.CommonMetrics;
 
 /** The threadPool manager which represents a manager to handle all thread 
pool executors. */
@@ -44,14 +48,15 @@ public class ThreadPoolManager {
    * Add a thread pool.
    *
    * @param name the name of the thread pool
-   * @param corePoolSize the core pool size supplier
-   * @param maximumPoolSize the maximum pool size supplier
-   * @param keepAliveTime the keep alive time supplier
+   * @param corePoolSize the core pool size
+   * @param maximumPoolSize the maximum pool size
+   * @param keepAliveTime the keep alive time
    * @param unit the unit
    * @param workQueue the work queue
    * @param threadFactory the thread factory
    * @return the registered thread pool
    */
+  @VisibleForTesting
   public static ThreadPoolExecutor newThreadPool(
       String name,
       int corePoolSize,
@@ -60,20 +65,23 @@ public class ThreadPoolManager {
       TimeUnit unit,
       BlockingQueue<Runnable> workQueue,
       ThreadFactory threadFactory) {
-    ThreadPoolExecutor threadPoolExecutor =
-        new ThreadPoolExecutor(
-            corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 
threadFactory);
-    registerThreadPool(name, corePoolSize, maximumPoolSize, keepAliveTime, 
threadPoolExecutor);
-    return threadPoolExecutor;
+    return newThreadPool(
+        name,
+        () -> corePoolSize,
+        () -> maximumPoolSize,
+        () -> keepAliveTime,
+        unit,
+        workQueue,
+        threadFactory);
   }
 
   /**
    * Add a thread pool.
    *
    * @param name the name of the thread pool
-   * @param corePoolSize the core pool size supplier
-   * @param maximumPoolSize the maximum pool size supplier
-   * @param keepAliveTime the keep alive time supplier
+   * @param corePoolSize the core pool size
+   * @param maximumPoolSize the maximum pool size
+   * @param keepAliveTime the keep alive time
    * @param unit the unit
    * @param workQueue the work queue
    * @param threadFactory the thread factory
@@ -81,6 +89,7 @@ public class ThreadPoolManager {
    *     capacities are reached
    * @return the registered thread pool
    */
+  @VisibleForTesting
   public static ThreadPoolExecutor newThreadPool(
       String name,
       int corePoolSize,
@@ -90,36 +99,145 @@ public class ThreadPoolManager {
       BlockingQueue<Runnable> workQueue,
       ThreadFactory threadFactory,
       RejectedExecutionHandler handler) {
+    return newThreadPool(
+        name,
+        () -> corePoolSize,
+        () -> maximumPoolSize,
+        () -> keepAliveTime,
+        unit,
+        workQueue,
+        threadFactory,
+        handler);
+  }
+
+  /**
+   * Add a thread pool.
+   *
+   * @param name the name of the thread pool
+   * @param corePoolSizeSupplier the core pool size supplier
+   * @param maximumPoolSizeSupplier the maximum pool size supplier
+   * @param keepAliveTimeSupplier the keep alive time supplier
+   * @param unit the unit
+   * @param workQueue the work queue
+   * @param threadFactory the thread factory
+   * @return the registered thread pool
+   */
+  public static ThreadPoolExecutor newThreadPool(
+      String name,
+      Supplier<Integer> corePoolSizeSupplier,
+      Supplier<Integer> maximumPoolSizeSupplier,
+      Supplier<Long> keepAliveTimeSupplier,
+      TimeUnit unit,
+      BlockingQueue<Runnable> workQueue,
+      ThreadFactory threadFactory) {
     ThreadPoolExecutor threadPoolExecutor =
         new ThreadPoolExecutor(
-            corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 
threadFactory, handler);
-    registerThreadPool(name, corePoolSize, maximumPoolSize, keepAliveTime, 
threadPoolExecutor);
+            corePoolSizeSupplier.get(),
+            maximumPoolSizeSupplier.get(),
+            keepAliveTimeSupplier.get(),
+            unit,
+            workQueue,
+            threadFactory);
+    registerThreadPool(
+        name,
+        corePoolSizeSupplier,
+        maximumPoolSizeSupplier,
+        () -> keepAliveTimeSupplier.get() * unit.toMillis(1),
+        threadPoolExecutor);
     return threadPoolExecutor;
   }
 
   /**
-   * Register a thread pool to THREAD_POOL_MAP.
+   * Add a thread pool.
+   *
+   * @param name the name of the thread pool
+   * @param corePoolSizeSupplier the core pool size supplier
+   * @param maximumPoolSizeSupplier the maximum pool size supplier
+   * @param keepAliveTimeSupplier the keep alive time supplier
+   * @param unit the unit
+   * @param workQueue the work queue
+   * @param threadFactory the thread factory
+   * @param handler the handler to use when execution is blocked because the 
thread bounds and queue
+   *     capacities are reached
+   * @return the registered thread pool
+   */
+  public static ThreadPoolExecutor newThreadPool(
+      String name,
+      Supplier<Integer> corePoolSizeSupplier,
+      Supplier<Integer> maximumPoolSizeSupplier,
+      Supplier<Long> keepAliveTimeSupplier,
+      TimeUnit unit,
+      BlockingQueue<Runnable> workQueue,
+      ThreadFactory threadFactory,
+      RejectedExecutionHandler handler) {
+    ThreadPoolExecutor threadPoolExecutor =
+        new ThreadPoolExecutor(
+            corePoolSizeSupplier.get(),
+            maximumPoolSizeSupplier.get(),
+            keepAliveTimeSupplier.get(),
+            unit,
+            workQueue,
+            threadFactory,
+            handler);
+    registerThreadPool(
+        name,
+        corePoolSizeSupplier,
+        maximumPoolSizeSupplier,
+        keepAliveTimeSupplier,
+        threadPoolExecutor);
+    return threadPoolExecutor;
+  }
+
+  /**
+   * Add a thread pool.
    *
    * @param name the name of the thread pool
-   * @param corePoolSize the core pool size supplier
-   * @param maximumPoolSize the maximum pool size supplier
-   * @param keepAliveTime the keep alive time supplier
+   * @param corePoolSize the core pool size
+   * @param maximumPoolSize the maximum pool size
+   * @param keepAliveTime the keep alive time
    * @param threadPoolExecutor the thread pool which will be registered
+   * @return the registered thread pool
    */
+  @VisibleForTesting
   public static void registerThreadPool(
       String name,
       int corePoolSize,
       int maximumPoolSize,
       long keepAliveTime,
       ThreadPoolExecutor threadPoolExecutor) {
+    registerThreadPool(
+        name, () -> corePoolSize, () -> maximumPoolSize, () -> keepAliveTime, 
threadPoolExecutor);
+  }
+
+  /**
+   * Register a thread pool to THREAD_POOL_MAP.
+   *
+   * @param name the name of the thread pool
+   * @param corePoolSizeSupplier the core pool size supplier
+   * @param maximumPoolSizeSupplier the maximum pool size supplier
+   * @param keepAliveTimeSupplier the keep alive time supplier
+   * @param threadPoolExecutor the thread pool which will be registered
+   */
+  public static void registerThreadPool(
+      String name,
+      Supplier<Integer> corePoolSizeSupplier,
+      Supplier<Integer> maximumPoolSizeSupplier,
+      Supplier<Long> keepAliveTimeSupplier,
+      ThreadPoolExecutor threadPoolExecutor) {
     THREAD_POOL_MAP.put(
-        threadPoolExecutor, new MeasurableThreadPoolExecutor(name, 
threadPoolExecutor));
+        threadPoolExecutor,
+        new MeasurableThreadPoolExecutor(
+            name,
+            threadPoolExecutor,
+            corePoolSizeSupplier,
+            maximumPoolSizeSupplier,
+            keepAliveTimeSupplier));
     LOG.info(
         "{} thread pool, core size:{}, max size:{}, keep alive time:{}",
         name,
-        corePoolSize,
-        maximumPoolSize,
-        keepAliveTime);
+        corePoolSizeSupplier,
+        maximumPoolSizeSupplier,
+        keepAliveTimeSupplier);
   }
 
   /**
@@ -152,30 +270,43 @@ public class ThreadPoolManager {
   }
 
   @VisibleForTesting
-  public static class MeasurableThreadPoolExecutor implements Closeable {
-
+  public static class MeasurableThreadPoolExecutor
+      implements Closeable, ReconfigurableRegistry.ReconfigureListener {
     private final String name;
+    private final ThreadPoolExecutor threadPoolExecutor;
+    private final Supplier<Integer> corePoolSizeSupplier;
+    private final Supplier<Integer> maximumPoolSizeSupplier;
+    private final Supplier<Long> keepAliveTimeSupplier;
 
-    MeasurableThreadPoolExecutor(String name, ThreadPoolExecutor 
threadPoolExecutor) {
+    MeasurableThreadPoolExecutor(
+        String name,
+        ThreadPoolExecutor threadPoolExecutor,
+        Supplier<Integer> corePoolSizeSupplier,
+        Supplier<Integer> maximumPoolSizeSupplier,
+        Supplier<Long> keepAliveTimeSupplier) {
       this.name = name;
+      this.threadPoolExecutor = threadPoolExecutor;
+      this.corePoolSizeSupplier = corePoolSizeSupplier;
+      this.maximumPoolSizeSupplier = maximumPoolSizeSupplier;
+      this.keepAliveTimeSupplier = keepAliveTimeSupplier;
+
       MeasurableRejectedExecutionHandler measurableRejectedExecutionHandler =
           new 
MeasurableRejectedExecutionHandler(threadPoolExecutor.getRejectedExecutionHandler());
       
threadPoolExecutor.setRejectedExecutionHandler(measurableRejectedExecutionHandler);
       CommonMetrics.addLabeledGauge(
-          name + "_ThreadActiveCount", () -> (double) 
threadPoolExecutor.getActiveCount());
-      CommonMetrics.addLabeledGauge(
-          name + "_ThreadCurrentCount", () -> (double) 
threadPoolExecutor.getPoolSize());
-      CommonMetrics.addLabeledGauge(
-          name + "_ThreadMaxCount", () -> (double) 
threadPoolExecutor.getMaximumPoolSize());
+          name + "_ThreadActiveCount", threadPoolExecutor::getActiveCount);
+      CommonMetrics.addLabeledGauge(name + "_ThreadCurrentCount", 
threadPoolExecutor::getPoolSize);
       CommonMetrics.addLabeledGauge(
-          name + "_ThreadMinCount", () -> (double) 
threadPoolExecutor.getCorePoolSize());
+          name + "_ThreadMaxCount", threadPoolExecutor::getMaximumPoolSize);
+      CommonMetrics.addLabeledGauge(name + "_ThreadMinCount", 
threadPoolExecutor::getCorePoolSize);
       CommonMetrics.addLabeledGauge(
-          name + "_CompleteTaskCount", () -> (double) 
threadPoolExecutor.getCompletedTaskCount());
+          name + "_CompleteTaskCount", 
threadPoolExecutor::getCompletedTaskCount);
       CommonMetrics.addLabeledGauge(
-          name + "_ThreadQueueWaitingTaskCount",
-          () -> (double) threadPoolExecutor.getQueue().size());
+          name + "_ThreadQueueWaitingTaskCount", 
threadPoolExecutor.getQueue()::size);
       CommonMetrics.addLabeledGauge(
-          name + "_RejectCount", () -> (double) 
measurableRejectedExecutionHandler.getCount());
+          name + "_RejectCount", measurableRejectedExecutionHandler::getCount);
+
+      ReconfigurableRegistry.register(this);
     }
 
     @VisibleForTesting
@@ -185,6 +316,7 @@ public class ThreadPoolManager {
 
     @Override
     public void close() {
+      ReconfigurableRegistry.unregister(this);
       CommonMetrics.unregisterSupplierGauge(name + "_ThreadActiveCount");
       CommonMetrics.unregisterSupplierGauge(name + "_ThreadCurrentCount");
       CommonMetrics.unregisterSupplierGauge(name + "_ThreadMaxCount");
@@ -193,5 +325,57 @@ public class ThreadPoolManager {
       CommonMetrics.unregisterSupplierGauge(name + 
"_ThreadQueueWaitingTaskCount");
       CommonMetrics.unregisterSupplierGauge(name + "_RejectCount");
     }
+
+    @Override
+    public void update(RssConf conf, Set<String> changedProperties) {
+      int newCorePoolSize = this.corePoolSizeSupplier.get();
+      int newMaximumPoolSize = this.maximumPoolSizeSupplier.get();
+      if (this.keepAliveTimeSupplier != null) {
+        long keepAliveTime = keepAliveTimeSupplier.get();
+        if (keepAliveTime > 0
+            && keepAliveTime != 
threadPoolExecutor.getKeepAliveTime(TimeUnit.MILLISECONDS)) {
+          LOG.info(
+              "Updated thread pool {} keep alive time from {} to {}",
+              name,
+              threadPoolExecutor.getKeepAliveTime(TimeUnit.MILLISECONDS),
+              keepAliveTime);
+          threadPoolExecutor.setKeepAliveTime(keepAliveTime, 
TimeUnit.MILLISECONDS);
+        }
+      }
+      if (newCorePoolSize != threadPoolExecutor.getPoolSize()
+          && newMaximumPoolSize != threadPoolExecutor.getMaximumPoolSize()) {
+        LOG.info(
+            "Updated thread pool {} MaximumPoolSize from {} to {}",
+            name,
+            threadPoolExecutor.getMaximumPoolSize(),
+            newMaximumPoolSize);
+        LOG.info(
+            "Updated thread pool {} CorePoolSize from {} to {}",
+            name,
+            threadPoolExecutor.getCorePoolSize(),
+            newCorePoolSize);
+        if (newCorePoolSize > threadPoolExecutor.getMaximumPoolSize()) {
+          threadPoolExecutor.setMaximumPoolSize(newMaximumPoolSize);
+          threadPoolExecutor.setCorePoolSize(newCorePoolSize);
+        } else {
+          threadPoolExecutor.setCorePoolSize(newCorePoolSize);
+          threadPoolExecutor.setMaximumPoolSize(newMaximumPoolSize);
+        }
+      } else if (newMaximumPoolSize != 
threadPoolExecutor.getMaximumPoolSize()) {
+        LOG.info(
+            "Updated thread pool {} MaximumPoolSize from {} to {}",
+            name,
+            threadPoolExecutor.getMaximumPoolSize(),
+            newMaximumPoolSize);
+        threadPoolExecutor.setMaximumPoolSize(newMaximumPoolSize);
+      } else if (newCorePoolSize != threadPoolExecutor.getCorePoolSize()) {
+        LOG.info(
+            "Updated thread pool {} CorePoolSize from {} to {}",
+            name,
+            threadPoolExecutor.getCorePoolSize(),
+            newCorePoolSize);
+        threadPoolExecutor.setCorePoolSize(newCorePoolSize);
+      }
+    }
   }
 }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/metrics/CommonMetrics.java 
b/common/src/main/java/org/apache/uniffle/common/metrics/CommonMetrics.java
index 6f9283224..ef8a98d20 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/CommonMetrics.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/CommonMetrics.java
@@ -47,11 +47,16 @@ public class CommonMetrics {
     return metricsManager.getCollectorRegistry();
   }
 
-  public static void addLabeledGauge(String name, Supplier<Double> supplier) {
+  public static <T extends Number> void addLabeledGauge(String name, 
Supplier<T> supplier) {
+    addLabeledCacheGauge(name, supplier, 0);
+  }
+
+  public static <T extends Number> void addLabeledCacheGauge(
+      String name, Supplier<T> supplier, long updateInterval) {
     if (!isRegister) {
       return;
     }
-    metricsManager.addLabeledGauge(name, supplier);
+    metricsManager.addLabeledCacheGauge(name, supplier, updateInterval);
   }
 
   public static void unregisterSupplierGauge(String name) {
diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java 
b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
index bfbdd6483..2e51cce55 100644
--- a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
+++ b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
@@ -82,7 +82,12 @@ public class GrpcServer implements ServerInterface {
             Queues.newLinkedBlockingQueue(Integer.MAX_VALUE),
             ThreadUtils.getThreadFactory("Grpc"),
             grpcMetrics);
-    ThreadPoolManager.registerThreadPool("Grpc", rpcExecutorSize, 
rpcExecutorSize * 2, 10, pool);
+    ThreadPoolManager.registerThreadPool(
+        "Grpc",
+        () -> conf.getInteger(RssBaseConf.RPC_EXECUTOR_SIZE),
+        () -> conf.getInteger(RssBaseConf.RPC_EXECUTOR_SIZE) * 2,
+        () -> TimeUnit.MINUTES.toMillis(10),
+        pool);
   }
 
   // This method is only used for the sake of synchronizing one test
diff --git 
a/common/src/test/java/org/apache/uniffle/common/executor/ThreadPoolManagerTest.java
 
b/common/src/test/java/org/apache/uniffle/common/executor/ThreadPoolManagerTest.java
index 24714e7e7..f1a60410e 100644
--- 
a/common/src/test/java/org/apache/uniffle/common/executor/ThreadPoolManagerTest.java
+++ 
b/common/src/test/java/org/apache/uniffle/common/executor/ThreadPoolManagerTest.java
@@ -107,27 +107,32 @@ public class ThreadPoolManagerTest {
             new LinkedBlockingQueue<>(1),
             new 
ThreadFactoryBuilder().setDaemon(false).setNameFormat("test-thread-pool").build(),
             new ThreadPoolExecutor.AbortPolicy());
-    int rejectedCount = 0;
-    for (int i = 0; i < 10; i++) {
-      try {
-        threadPool.submit(
-            () -> {
-              try {
-                Thread.sleep(1000L);
-              } catch (InterruptedException e) {
-                throw new RuntimeException(e);
-              }
-            });
-      } catch (RejectedExecutionException e) {
-        rejectedCount++;
+    try {
+      int rejectedCount = 0;
+      for (int i = 0; i < 10; i++) {
+        try {
+          threadPool.submit(
+              () -> {
+                try {
+                  Thread.sleep(1000L);
+                } catch (InterruptedException e) {
+                  throw new RuntimeException(e);
+                }
+              });
+        } catch (RejectedExecutionException e) {
+          rejectedCount++;
+        }
       }
+      assertEquals(8, rejectedCount);
+      assertEquals(
+          rejectedCount,
+          CommonMetrics.getCollectorRegistry()
+              .getSampleValue("test_RejectCount", new String[] {"tags"}, new 
String[] {"test"})
+              .intValue());
+    } finally {
+      ThreadPoolManager.unregister(threadPool);
+      threadPool.shutdown();
     }
-    assertEquals(8, rejectedCount);
-    assertEquals(
-        rejectedCount,
-        CommonMetrics.getCollectorRegistry()
-            .getSampleValue("test_RejectCount", new String[] {"tags"}, new 
String[] {"test"})
-            .intValue());
   }
 
   private void testInternal(ThreadPoolExecutor threadPool) {
@@ -146,6 +151,7 @@ public class ThreadPoolManagerTest {
     } finally {
       ThreadPoolManager.unregister(threadPool);
       assertTrue(!ThreadPoolManager.exists(threadPool));
+      threadPool.shutdown();
     }
   }
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java 
b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
index f8b5f2dab..5b06bce76 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
@@ -22,6 +22,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
@@ -165,17 +166,22 @@ public class DefaultFlushEventHandler implements 
FlushEventHandler {
 
   protected void initFlushEventExecutor() {
     if (StorageType.withLocalfile(storageType)) {
-      int poolSize =
-          
shuffleServerConf.getInteger(ShuffleServerConf.SERVER_FLUSH_LOCALFILE_THREAD_POOL_SIZE);
       localFileThreadPoolExecutor =
-          createFlushEventExecutor(poolSize, "LocalFileFlushEventThreadPool");
+          createFlushEventExecutor(
+              () ->
+                  shuffleServerConf.getInteger(
+                      
ShuffleServerConf.SERVER_FLUSH_LOCALFILE_THREAD_POOL_SIZE),
+              "LocalFileFlushEventThreadPool");
     }
     if (StorageType.withHadoop(storageType)) {
-      int poolSize =
-          
shuffleServerConf.getInteger(ShuffleServerConf.SERVER_FLUSH_HADOOP_THREAD_POOL_SIZE);
-      hadoopThreadPoolExecutor = createFlushEventExecutor(poolSize, 
"HadoopFlushEventThreadPool");
+      hadoopThreadPoolExecutor =
+          createFlushEventExecutor(
+              () ->
+                  shuffleServerConf.getInteger(
+                      ShuffleServerConf.SERVER_FLUSH_HADOOP_THREAD_POOL_SIZE),
+              "HadoopFlushEventThreadPool");
     }
-    fallbackThreadPoolExecutor = createFlushEventExecutor(5, 
"FallBackFlushEventThreadPool");
+    fallbackThreadPoolExecutor = createFlushEventExecutor(() -> 5, 
"FallBackFlushEventThreadPool");
     ShuffleServerMetrics.addLabeledGauge(EVENT_QUEUE_SIZE, () -> (double) 
flushQueue.size());
     startEventProcessor();
   }
@@ -226,16 +232,16 @@ public class DefaultFlushEventHandler implements 
FlushEventHandler {
     }
   }
 
-  protected Executor createFlushEventExecutor(int poolSize, String 
threadFactoryName) {
+  protected Executor createFlushEventExecutor(
+      Supplier<Integer> poolSizeSupplier, String threadFactoryName) {
     int waitQueueSize =
         
shuffleServerConf.getInteger(ShuffleServerConf.SERVER_FLUSH_THREAD_POOL_QUEUE_SIZE);
     BlockingQueue<Runnable> waitQueue = 
Queues.newLinkedBlockingQueue(waitQueueSize);
-    long keepAliveTime = 
shuffleServerConf.getLong(ShuffleServerConf.SERVER_FLUSH_THREAD_ALIVE);
     return ThreadPoolManager.newThreadPool(
         threadFactoryName,
-        poolSize,
-        poolSize,
-        keepAliveTime,
+        poolSizeSupplier,
+        poolSizeSupplier,
+        () -> 
shuffleServerConf.getLong(ShuffleServerConf.SERVER_FLUSH_THREAD_ALIVE),
         TimeUnit.SECONDS,
         waitQueue,
         ThreadUtils.getThreadFactory(threadFactoryName));

Reply via email to