This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 45450e793 [CELEBORN-1832] MapPartitionData should create fixed thread
pool with registration of ThreadPoolSource
45450e793 is described below
commit 45450e793cbf28706f03ea2d385930e12cc2e776
Author: SteNicholas <[email protected]>
AuthorDate: Wed Jan 15 15:15:34 2025 +0800
[CELEBORN-1832] MapPartitionData should create fixed thread pool with
registration of ThreadPoolSource
### What changes were proposed in this pull request?
`MapPartitionData` now creates its fixed reader thread pool via
`ThreadUtils.newFixedThreadPool`, which registers the pool with
`ThreadPoolSource`.
### Why are the changes needed?
At present, `MapPartitionData` creates a fixed thread pool without registering
it with `ThreadPoolSource`. As a result, the worker exposes no thread pool
metrics for its map partition reader threads. Therefore, `MapPartitionData`
should create its fixed thread pool with `ThreadPoolSource` registration.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes #3064 from SteNicholas/CELEBORN-1832.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../org/apache/celeborn/common/util/ThreadUtils.scala | 19 +++++++++++++++++--
.../deploy/worker/storage/MapPartitionData.java | 14 ++++----------
2 files changed, 21 insertions(+), 12 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
index c8b6491b2..33c6d7c82 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
@@ -122,8 +122,15 @@ object ThreadUtils {
* Create a thread factory that names threads with a prefix and also sets
the threads to daemon.
*/
def namedThreadFactory(threadNamePrefix: String): ThreadFactory = {
+ namedThreadFactory(threadNamePrefix, daemon = true)
+ }
+
+ /**
+ * Create a thread factory that generates threads with a specified name
prefix and daemon setting.
+ */
+ def namedThreadFactory(threadNamePrefix: String, daemon: Boolean):
ThreadFactory = {
new ThreadFactoryBuilder()
- .setDaemon(true)
+ .setDaemon(daemon)
.setNameFormat(s"$threadNamePrefix-%d")
.setUncaughtExceptionHandler(new
ThreadExceptionHandler(threadNamePrefix))
.build()
@@ -176,7 +183,15 @@ object ThreadUtils {
* unique, sequentially assigned integer.
*/
def newDaemonFixedThreadPool(nThreads: Int, prefix: String):
ThreadPoolExecutor = {
- val threadPool = Executors.newFixedThreadPool(nThreads,
namedThreadFactory(prefix))
+ newFixedThreadPool(nThreads, prefix, daemon = true)
+ }
+
+ /**
+ * Wrapper over newFixedThreadPool with daemon setting. Thread names are
formatted as prefix-ID, where ID is a
+ * unique, sequentially assigned integer.
+ */
+ def newFixedThreadPool(nThreads: Int, prefix: String, daemon: Boolean):
ThreadPoolExecutor = {
+ val threadPool = Executors.newFixedThreadPool(nThreads,
namedThreadFactory(prefix, daemon))
.asInstanceOf[ThreadPoolExecutor]
ThreadPoolSource.registerSource(prefix, threadPool)
threadPool
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
index f94c17c95..b1372037d 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
@@ -23,12 +23,10 @@ import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import org.apache.commons.io.IOUtils;
@@ -39,6 +37,7 @@ import org.apache.celeborn.common.meta.DiskFileInfo;
import org.apache.celeborn.common.meta.MapFileMeta;
import org.apache.celeborn.common.util.FileChannelUtils;
import org.apache.celeborn.common.util.JavaUtils;
+import org.apache.celeborn.common.util.ThreadUtils;
import org.apache.celeborn.service.deploy.worker.memory.BufferQueue;
import org.apache.celeborn.service.deploy.worker.memory.BufferRecycler;
import org.apache.celeborn.service.deploy.worker.memory.MemoryManager;
@@ -93,15 +92,10 @@ public class MapPartitionData implements
MemoryManager.ReadBufferTargetChangeLis
storageFetcherPool.computeIfAbsent(
mapFileMeta.getMountPoint(),
k ->
- Executors.newFixedThreadPool(
+ ThreadUtils.newFixedThreadPool(
threadsPerMountPoint,
- new ThreadFactoryBuilder()
- .setNameFormat(mapFileMeta.getMountPoint() +
"-reader-thread-%d")
- .setUncaughtExceptionHandler(
- (t1, t2) -> {
- logger.warn("StorageFetcherPool thread:{}:{}",
t1, t2);
- })
- .build()));
+ String.format("worker-map-partition-%s-reader",
mapFileMeta.getMountPoint()),
+ false));
this.dataFileChanel =
FileChannelUtils.openReadableFileChannel(diskFileInfo.getFilePath());
this.indexChannel =
FileChannelUtils.openReadableFileChannel(diskFileInfo.getIndexPath());
this.indexSize = indexChannel.size();