This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 705271176 [#2169] feat(server)(coordinator): Support dynamic thread
number update about ThreadPool (#2168)
705271176 is described below
commit 7052711760ce9d3469175dd155a8fd5da468d853
Author: maobaolong <[email protected]>
AuthorDate: Wed Oct 16 11:26:24 2024 +0800
[#2169] feat(server)(coordinator): Support dynamic thread number update
about ThreadPool (#2168)
### What changes were proposed in this pull request?
Support dynamically updating the core size, maximum size and keep-alive time of registered thread pools.
### Why are the changes needed?
Fix: #2169
### Does this PR introduce _any_ user-facing change?
Changes the update-conf REST API to support updating multiple keys in one request.
### How was this patch tested?
- curl command line
```
➜ ~ curl -X POST http://localhost:19978/api/shuffleServer/confOps/update \
-H "Content-Type: application/json" \
-d '{"update":{"rss.server.flush.localfile.threadPool.size": "122",
"rss.server.flush.thread.alive": "112"}}'
temporarily effective until restart: Update successfully%
➜ ~ curl -X POST
http://localhost:19978/api/shuffleServer/confOps/update \
-H "Content-Type: application/json" \
-d '{"update":{"rss.server.flush.localfile.threadPool.size": "122",
"rss.server.flush.thread.alive": "112", "rss.rpc.executor.size": "123"}}'
temporarily effective until restart: Update successfully%
```
- server.log
```
[2024-10-11 23:48:37.508] [Jetty-5] [INFO] ConfOpsResource - Dynamic
updating ConfVO{update={rss.server.flush.localfile.threadPool.size=122,
rss.server.flush.thread.alive=112}, delete=[]}
[2024-10-11 23:48:37.513] [Jetty-5] [INFO] ThreadPoolManager - Updated
thread pool LocalFileFlushEventThreadPool keep alive time from 5000 to 112000
[2024-10-11 23:48:37.513] [Jetty-5] [INFO] ThreadPoolManager - Updated
thread pool LocalFileFlushEventThreadPool MaximumPoolSize from 10 to 122
[2024-10-11 23:48:37.513] [Jetty-5] [INFO] ThreadPoolManager - Updated
thread pool LocalFileFlushEventThreadPool CorePoolSize from 10 to 122
[2024-10-11 23:48:37.513] [Jetty-5] [INFO] ThreadPoolManager - Updated
thread pool HadoopFlushEventThreadPool keep alive time from 5000 to 112000
[2024-10-11 23:48:37.513] [Jetty-5] [INFO] ThreadPoolManager - Updated
thread pool FallBackFlushEventThreadPool keep alive time from 5000 to 112000
[2024-10-11 23:50:53.749] [Jetty-8] [INFO] ConfOpsResource - Dynamic
updating ConfVO{update={rss.server.flush.localfile.threadPool.size=122,
rss.server.flush.thread.alive=112, rss.rpc.executor.size=123}, delete=[]}
[2024-10-11 23:50:53.757] [Jetty-8] [INFO] ThreadPoolManager - Updated
thread pool Grpc MaximumPoolSize from 2000 to 246
[2024-10-11 23:50:53.757] [Jetty-8] [INFO] ThreadPoolManager - Updated
thread pool Grpc CorePoolSize from 1000 to 123
```
---
.../uniffle/common/executor/ThreadPoolManager.java | 252 ++++++++++++++++++---
.../uniffle/common/metrics/CommonMetrics.java | 9 +-
.../org/apache/uniffle/common/rpc/GrpcServer.java | 7 +-
.../common/executor/ThreadPoolManagerTest.java | 44 ++--
.../uniffle/server/DefaultFlushEventHandler.java | 30 ++-
5 files changed, 274 insertions(+), 68 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/executor/ThreadPoolManager.java
b/common/src/main/java/org/apache/uniffle/common/executor/ThreadPoolManager.java
index 842545c16..6f5f9ac7e 100644
---
a/common/src/main/java/org/apache/uniffle/common/executor/ThreadPoolManager.java
+++
b/common/src/main/java/org/apache/uniffle/common/executor/ThreadPoolManager.java
@@ -20,17 +20,21 @@ package org.apache.uniffle.common.executor;
import java.io.Closeable;
import java.util.Collections;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.common.ReconfigurableRegistry;
+import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.metrics.CommonMetrics;
/** The threadPool manager which represents a manager to handle all thread
pool executors. */
@@ -44,14 +48,15 @@ public class ThreadPoolManager {
* Add a thread pool.
*
* @param name the name of the thread pool
- * @param corePoolSize the core pool size supplier
- * @param maximumPoolSize the maximum pool size supplier
- * @param keepAliveTime the keep alive time supplier
+ * @param corePoolSize the core pool size
+ * @param maximumPoolSize the maximum pool size
+ * @param keepAliveTime the keep alive time
* @param unit the unit
* @param workQueue the work queue
* @param threadFactory the thread factory
* @return the registered thread pool
*/
+ @VisibleForTesting
public static ThreadPoolExecutor newThreadPool(
String name,
int corePoolSize,
@@ -60,20 +65,23 @@ public class ThreadPoolManager {
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
- ThreadPoolExecutor threadPoolExecutor =
- new ThreadPoolExecutor(
- corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory);
- registerThreadPool(name, corePoolSize, maximumPoolSize, keepAliveTime,
threadPoolExecutor);
- return threadPoolExecutor;
+ return newThreadPool(
+ name,
+ () -> corePoolSize,
+ () -> maximumPoolSize,
+ () -> keepAliveTime,
+ unit,
+ workQueue,
+ threadFactory);
}
/**
* Add a thread pool.
*
* @param name the name of the thread pool
- * @param corePoolSize the core pool size supplier
- * @param maximumPoolSize the maximum pool size supplier
- * @param keepAliveTime the keep alive time supplier
+ * @param corePoolSize the core pool size
+ * @param maximumPoolSize the maximum pool size
+ * @param keepAliveTime the keep alive time
* @param unit the unit
* @param workQueue the work queue
* @param threadFactory the thread factory
@@ -81,6 +89,7 @@ public class ThreadPoolManager {
* capacities are reached
* @return the registered thread pool
*/
+ @VisibleForTesting
public static ThreadPoolExecutor newThreadPool(
String name,
int corePoolSize,
@@ -90,36 +99,145 @@ public class ThreadPoolManager {
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
+ return newThreadPool(
+ name,
+ () -> corePoolSize,
+ () -> maximumPoolSize,
+ () -> keepAliveTime,
+ unit,
+ workQueue,
+ threadFactory,
+ handler);
+ }
+
+ /**
+ * Add a thread pool.
+ *
+ * @param name the name of the thread pool
+ * @param corePoolSizeSupplier the core pool size supplier
+ * @param maximumPoolSizeSupplier the maximum pool size supplier
+ * @param keepAliveTimeSupplier the keep alive time supplier
+ * @param unit the unit
+ * @param workQueue the work queue
+ * @param threadFactory the thread factory
+ * @return the registered thread pool
+ */
+ public static ThreadPoolExecutor newThreadPool(
+ String name,
+ Supplier<Integer> corePoolSizeSupplier,
+ Supplier<Integer> maximumPoolSizeSupplier,
+ Supplier<Long> keepAliveTimeSupplier,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue,
+ ThreadFactory threadFactory) {
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(
- corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, handler);
- registerThreadPool(name, corePoolSize, maximumPoolSize, keepAliveTime,
threadPoolExecutor);
+ corePoolSizeSupplier.get(),
+ maximumPoolSizeSupplier.get(),
+ keepAliveTimeSupplier.get(),
+ unit,
+ workQueue,
+ threadFactory);
+ registerThreadPool(
+ name,
+ corePoolSizeSupplier,
+ maximumPoolSizeSupplier,
+ () -> keepAliveTimeSupplier.get() * unit.toMillis(1),
+ threadPoolExecutor);
return threadPoolExecutor;
}
/**
- * Register a thread pool to THREAD_POOL_MAP.
+ * Add a thread pool.
+ *
+ * @param name the name of the thread pool
+ * @param corePoolSizeSupplier the core pool size supplier
+ * @param maximumPoolSizeSupplier the maximum pool size supplier
+ * @param keepAliveTimeSupplier the keep alive time supplier
+ * @param unit the unit
+ * @param workQueue the work queue
+ * @param threadFactory the thread factory
+ * @param handler the handler to use when execution is blocked because the
thread bounds and queue
+ * capacities are reached
+ * @return the registered thread pool
+ */
+ public static ThreadPoolExecutor newThreadPool(
+ String name,
+ Supplier<Integer> corePoolSizeSupplier,
+ Supplier<Integer> maximumPoolSizeSupplier,
+ Supplier<Long> keepAliveTimeSupplier,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue,
+ ThreadFactory threadFactory,
+ RejectedExecutionHandler handler) {
+ ThreadPoolExecutor threadPoolExecutor =
+ new ThreadPoolExecutor(
+ corePoolSizeSupplier.get(),
+ maximumPoolSizeSupplier.get(),
+ keepAliveTimeSupplier.get(),
+ unit,
+ workQueue,
+ threadFactory,
+ handler);
+ registerThreadPool(
+ name,
+ corePoolSizeSupplier,
+ maximumPoolSizeSupplier,
+ // The registered keep-alive supplier is read back in milliseconds when the pool is
+ // reconfigured, so convert from the caller's unit here, as the overload without a
+ // RejectedExecutionHandler does.
+ () -> unit.toMillis(keepAliveTimeSupplier.get()),
+ threadPoolExecutor);
+ return threadPoolExecutor;
+ }
+
+ /**
+ * Register a thread pool to THREAD_POOL_MAP with fixed core size, maximum size and keep alive time.
*
* @param name the name of the thread pool
- * @param corePoolSize the core pool size supplier
- * @param maximumPoolSize the maximum pool size supplier
- * @param keepAliveTime the keep alive time supplier
+ * @param corePoolSize the core pool size
+ * @param maximumPoolSize the maximum pool size
+ * @param keepAliveTime the keep alive time
* @param threadPoolExecutor the thread pool which will be registered
*/
+ @VisibleForTesting
public static void registerThreadPool(
String name,
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
ThreadPoolExecutor threadPoolExecutor) {
+ registerThreadPool(
+ name, () -> corePoolSize, () -> maximumPoolSize, () -> keepAliveTime,
threadPoolExecutor);
+ }
+
+ /**
+ * Register a thread pool to THREAD_POOL_MAP.
+ *
+ * @param name the name of the thread pool
+ * @param corePoolSizeSupplier the core pool size supplier
+ * @param maximumPoolSizeSupplier the maximum pool size supplier
+ * @param keepAliveTimeSupplier the keep alive time supplier
+ * @param threadPoolExecutor the thread pool which will be registered
+ */
+ public static void registerThreadPool(
+ String name,
+ Supplier<Integer> corePoolSizeSupplier,
+ Supplier<Integer> maximumPoolSizeSupplier,
+ Supplier<Long> keepAliveTimeSupplier,
+ ThreadPoolExecutor threadPoolExecutor) {
THREAD_POOL_MAP.put(
- threadPoolExecutor, new MeasurableThreadPoolExecutor(name,
threadPoolExecutor));
+ threadPoolExecutor,
+ new MeasurableThreadPoolExecutor(
+ name,
+ threadPoolExecutor,
+ corePoolSizeSupplier,
+ maximumPoolSizeSupplier,
+ keepAliveTimeSupplier));
+ // Log the resolved values: passing the suppliers themselves would print lambda identities
+ // instead of the configured numbers.
LOG.info(
"{} thread pool, core size:{}, max size:{}, keep alive time:{}",
name,
- corePoolSize,
- maximumPoolSize,
- keepAliveTime);
+ corePoolSizeSupplier.get(),
+ maximumPoolSizeSupplier.get(),
+ // The keep-alive supplier is treated as optional when the pool is reconfigured.
+ keepAliveTimeSupplier == null ? null : keepAliveTimeSupplier.get());
}
/**
@@ -152,30 +270,43 @@ public class ThreadPoolManager {
}
@VisibleForTesting
- public static class MeasurableThreadPoolExecutor implements Closeable {
-
+ public static class MeasurableThreadPoolExecutor
+ implements Closeable, ReconfigurableRegistry.ReconfigureListener {
private final String name;
+ private final ThreadPoolExecutor threadPoolExecutor;
+ private final Supplier<Integer> corePoolSizeSupplier;
+ private final Supplier<Integer> maximumPoolSizeSupplier;
+ private final Supplier<Long> keepAliveTimeSupplier;
- MeasurableThreadPoolExecutor(String name, ThreadPoolExecutor
threadPoolExecutor) {
+ MeasurableThreadPoolExecutor(
+ String name,
+ ThreadPoolExecutor threadPoolExecutor,
+ Supplier<Integer> corePoolSizeSupplier,
+ Supplier<Integer> maximumPoolSizeSupplier,
+ Supplier<Long> keepAliveTimeSupplier) {
this.name = name;
+ this.threadPoolExecutor = threadPoolExecutor;
+ this.corePoolSizeSupplier = corePoolSizeSupplier;
+ this.maximumPoolSizeSupplier = maximumPoolSizeSupplier;
+ this.keepAliveTimeSupplier = keepAliveTimeSupplier;
+
MeasurableRejectedExecutionHandler measurableRejectedExecutionHandler =
new
MeasurableRejectedExecutionHandler(threadPoolExecutor.getRejectedExecutionHandler());
threadPoolExecutor.setRejectedExecutionHandler(measurableRejectedExecutionHandler);
CommonMetrics.addLabeledGauge(
- name + "_ThreadActiveCount", () -> (double)
threadPoolExecutor.getActiveCount());
- CommonMetrics.addLabeledGauge(
- name + "_ThreadCurrentCount", () -> (double)
threadPoolExecutor.getPoolSize());
- CommonMetrics.addLabeledGauge(
- name + "_ThreadMaxCount", () -> (double)
threadPoolExecutor.getMaximumPoolSize());
+ name + "_ThreadActiveCount", threadPoolExecutor::getActiveCount);
+ CommonMetrics.addLabeledGauge(name + "_ThreadCurrentCount",
threadPoolExecutor::getPoolSize);
CommonMetrics.addLabeledGauge(
- name + "_ThreadMinCount", () -> (double)
threadPoolExecutor.getCorePoolSize());
+ name + "_ThreadMaxCount", threadPoolExecutor::getMaximumPoolSize);
+ CommonMetrics.addLabeledGauge(name + "_ThreadMinCount",
threadPoolExecutor::getCorePoolSize);
CommonMetrics.addLabeledGauge(
- name + "_CompleteTaskCount", () -> (double)
threadPoolExecutor.getCompletedTaskCount());
+ name + "_CompleteTaskCount",
threadPoolExecutor::getCompletedTaskCount);
CommonMetrics.addLabeledGauge(
- name + "_ThreadQueueWaitingTaskCount",
- () -> (double) threadPoolExecutor.getQueue().size());
+ name + "_ThreadQueueWaitingTaskCount",
threadPoolExecutor.getQueue()::size);
CommonMetrics.addLabeledGauge(
- name + "_RejectCount", () -> (double)
measurableRejectedExecutionHandler.getCount());
+ name + "_RejectCount", measurableRejectedExecutionHandler::getCount);
+
+ ReconfigurableRegistry.register(this);
}
@VisibleForTesting
@@ -185,6 +316,7 @@ public class ThreadPoolManager {
@Override
public void close() {
+ ReconfigurableRegistry.unregister(this);
CommonMetrics.unregisterSupplierGauge(name + "_ThreadActiveCount");
CommonMetrics.unregisterSupplierGauge(name + "_ThreadCurrentCount");
CommonMetrics.unregisterSupplierGauge(name + "_ThreadMaxCount");
@@ -193,5 +325,57 @@ public class ThreadPoolManager {
CommonMetrics.unregisterSupplierGauge(name +
"_ThreadQueueWaitingTaskCount");
CommonMetrics.unregisterSupplierGauge(name + "_RejectCount");
}
+
+ /**
+ * Re-reads the registered suppliers and applies any changed keep-alive time, core pool size
+ * or maximum pool size to the wrapped executor.
+ *
+ * @param conf the updated configuration; not read directly, values come from the suppliers
+ * @param changedProperties the changed keys; not read directly
+ */
+ @Override
+ public void update(RssConf conf, Set<String> changedProperties) {
+ int newCorePoolSize = this.corePoolSizeSupplier.get();
+ int newMaximumPoolSize = this.maximumPoolSizeSupplier.get();
+ if (this.keepAliveTimeSupplier != null) {
+ long keepAliveTime = keepAliveTimeSupplier.get();
+ long currentKeepAliveTime = threadPoolExecutor.getKeepAliveTime(TimeUnit.MILLISECONDS);
+ if (keepAliveTime > 0 && keepAliveTime != currentKeepAliveTime) {
+ LOG.info(
+ "Updated thread pool {} keep alive time from {} to {}",
+ name,
+ currentKeepAliveTime,
+ keepAliveTime);
+ threadPoolExecutor.setKeepAliveTime(keepAliveTime, TimeUnit.MILLISECONDS);
+ }
+ }
+ // Compare against the configured core size, not getPoolSize(): the latter is the number of
+ // live threads and can coincidentally equal the new core size, which would skip the update.
+ int currentCorePoolSize = threadPoolExecutor.getCorePoolSize();
+ int currentMaximumPoolSize = threadPoolExecutor.getMaximumPoolSize();
+ boolean coreChanged = newCorePoolSize != currentCorePoolSize;
+ boolean maximumChanged = newMaximumPoolSize != currentMaximumPoolSize;
+ if (maximumChanged) {
+ LOG.info(
+ "Updated thread pool {} MaximumPoolSize from {} to {}",
+ name,
+ currentMaximumPoolSize,
+ newMaximumPoolSize);
+ }
+ if (coreChanged) {
+ LOG.info(
+ "Updated thread pool {} CorePoolSize from {} to {}",
+ name,
+ currentCorePoolSize,
+ newCorePoolSize);
+ }
+ if (coreChanged && maximumChanged) {
+ // Order the two setters so that core <= max holds after each call; ThreadPoolExecutor
+ // rejects a maximum below the core size with IllegalArgumentException.
+ if (newCorePoolSize > currentMaximumPoolSize) {
+ threadPoolExecutor.setMaximumPoolSize(newMaximumPoolSize);
+ threadPoolExecutor.setCorePoolSize(newCorePoolSize);
+ } else {
+ threadPoolExecutor.setCorePoolSize(newCorePoolSize);
+ threadPoolExecutor.setMaximumPoolSize(newMaximumPoolSize);
+ }
+ } else if (maximumChanged) {
+ threadPoolExecutor.setMaximumPoolSize(newMaximumPoolSize);
+ } else if (coreChanged) {
+ threadPoolExecutor.setCorePoolSize(newCorePoolSize);
+ }
+ }
}
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/metrics/CommonMetrics.java
b/common/src/main/java/org/apache/uniffle/common/metrics/CommonMetrics.java
index 6f9283224..ef8a98d20 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/CommonMetrics.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/CommonMetrics.java
@@ -47,11 +47,16 @@ public class CommonMetrics {
return metricsManager.getCollectorRegistry();
}
- public static void addLabeledGauge(String name, Supplier<Double> supplier) {
+ public static <T extends Number> void addLabeledGauge(String name,
Supplier<T> supplier) {
+ addLabeledCacheGauge(name, supplier, 0);
+ }
+
+ public static <T extends Number> void addLabeledCacheGauge(
+ String name, Supplier<T> supplier, long updateInterval) {
if (!isRegister) {
return;
}
- metricsManager.addLabeledGauge(name, supplier);
+ metricsManager.addLabeledCacheGauge(name, supplier, updateInterval);
}
public static void unregisterSupplierGauge(String name) {
diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
index bfbdd6483..2e51cce55 100644
--- a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
+++ b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
@@ -82,7 +82,12 @@ public class GrpcServer implements ServerInterface {
Queues.newLinkedBlockingQueue(Integer.MAX_VALUE),
ThreadUtils.getThreadFactory("Grpc"),
grpcMetrics);
- ThreadPoolManager.registerThreadPool("Grpc", rpcExecutorSize,
rpcExecutorSize * 2, 10, pool);
+ ThreadPoolManager.registerThreadPool(
+ "Grpc",
+ () -> conf.getInteger(RssBaseConf.RPC_EXECUTOR_SIZE),
+ () -> conf.getInteger(RssBaseConf.RPC_EXECUTOR_SIZE) * 2,
+ () -> TimeUnit.MINUTES.toMillis(10),
+ pool);
}
// This method is only used for the sake of synchronizing one test
diff --git
a/common/src/test/java/org/apache/uniffle/common/executor/ThreadPoolManagerTest.java
b/common/src/test/java/org/apache/uniffle/common/executor/ThreadPoolManagerTest.java
index 24714e7e7..f1a60410e 100644
---
a/common/src/test/java/org/apache/uniffle/common/executor/ThreadPoolManagerTest.java
+++
b/common/src/test/java/org/apache/uniffle/common/executor/ThreadPoolManagerTest.java
@@ -107,27 +107,32 @@ public class ThreadPoolManagerTest {
new LinkedBlockingQueue<>(1),
new
ThreadFactoryBuilder().setDaemon(false).setNameFormat("test-thread-pool").build(),
new ThreadPoolExecutor.AbortPolicy());
- int rejectedCount = 0;
- for (int i = 0; i < 10; i++) {
- try {
- threadPool.submit(
- () -> {
- try {
- Thread.sleep(1000L);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- });
- } catch (RejectedExecutionException e) {
- rejectedCount++;
+ try {
+ int rejectedCount = 0;
+ for (int i = 0; i < 10; i++) {
+ try {
+ threadPool.submit(
+ () -> {
+ try {
+ Thread.sleep(1000L);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ } catch (RejectedExecutionException e) {
+ rejectedCount++;
+ }
}
+ assertEquals(8, rejectedCount);
+ assertEquals(
+ rejectedCount,
+ CommonMetrics.getCollectorRegistry()
+ .getSampleValue("test_RejectCount", new String[] {"tags"}, new
String[] {"test"})
+ .intValue());
+ } finally {
+ ThreadPoolManager.unregister(threadPool);
+ threadPool.shutdown();
}
- assertEquals(8, rejectedCount);
- assertEquals(
- rejectedCount,
- CommonMetrics.getCollectorRegistry()
- .getSampleValue("test_RejectCount", new String[] {"tags"}, new
String[] {"test"})
- .intValue());
}
private void testInternal(ThreadPoolExecutor threadPool) {
@@ -146,6 +151,7 @@ public class ThreadPoolManagerTest {
} finally {
ThreadPoolManager.unregister(threadPool);
assertTrue(!ThreadPoolManager.exists(threadPool));
+ threadPool.shutdown();
}
}
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
index f8b5f2dab..5b06bce76 100644
---
a/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
+++
b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
@@ -22,6 +22,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Queues;
@@ -165,17 +166,22 @@ public class DefaultFlushEventHandler implements
FlushEventHandler {
protected void initFlushEventExecutor() {
if (StorageType.withLocalfile(storageType)) {
- int poolSize =
-
shuffleServerConf.getInteger(ShuffleServerConf.SERVER_FLUSH_LOCALFILE_THREAD_POOL_SIZE);
localFileThreadPoolExecutor =
- createFlushEventExecutor(poolSize, "LocalFileFlushEventThreadPool");
+ createFlushEventExecutor(
+ () ->
+ shuffleServerConf.getInteger(
+
ShuffleServerConf.SERVER_FLUSH_LOCALFILE_THREAD_POOL_SIZE),
+ "LocalFileFlushEventThreadPool");
}
if (StorageType.withHadoop(storageType)) {
- int poolSize =
-
shuffleServerConf.getInteger(ShuffleServerConf.SERVER_FLUSH_HADOOP_THREAD_POOL_SIZE);
- hadoopThreadPoolExecutor = createFlushEventExecutor(poolSize,
"HadoopFlushEventThreadPool");
+ hadoopThreadPoolExecutor =
+ createFlushEventExecutor(
+ () ->
+ shuffleServerConf.getInteger(
+ ShuffleServerConf.SERVER_FLUSH_HADOOP_THREAD_POOL_SIZE),
+ "HadoopFlushEventThreadPool");
}
- fallbackThreadPoolExecutor = createFlushEventExecutor(5,
"FallBackFlushEventThreadPool");
+ fallbackThreadPoolExecutor = createFlushEventExecutor(() -> 5,
"FallBackFlushEventThreadPool");
ShuffleServerMetrics.addLabeledGauge(EVENT_QUEUE_SIZE, () -> (double)
flushQueue.size());
startEventProcessor();
}
@@ -226,16 +232,16 @@ public class DefaultFlushEventHandler implements
FlushEventHandler {
}
}
- protected Executor createFlushEventExecutor(int poolSize, String
threadFactoryName) {
+ protected Executor createFlushEventExecutor(
+ Supplier<Integer> poolSizeSupplier, String threadFactoryName) {
int waitQueueSize =
shuffleServerConf.getInteger(ShuffleServerConf.SERVER_FLUSH_THREAD_POOL_QUEUE_SIZE);
BlockingQueue<Runnable> waitQueue =
Queues.newLinkedBlockingQueue(waitQueueSize);
- long keepAliveTime =
shuffleServerConf.getLong(ShuffleServerConf.SERVER_FLUSH_THREAD_ALIVE);
return ThreadPoolManager.newThreadPool(
threadFactoryName,
- poolSize,
- poolSize,
- keepAliveTime,
+ poolSizeSupplier,
+ poolSizeSupplier,
+ () ->
shuffleServerConf.getLong(ShuffleServerConf.SERVER_FLUSH_THREAD_ALIVE),
TimeUnit.SECONDS,
waitQueue,
ThreadUtils.getThreadFactory(threadFactoryName));