This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 45450e793 [CELEBORN-1832] MapPartitionData should create fixed thread 
pool with registration of ThreadPoolSource
45450e793 is described below

commit 45450e793cbf28706f03ea2d385930e12cc2e776
Author: SteNicholas <[email protected]>
AuthorDate: Wed Jan 15 15:15:34 2025 +0800

    [CELEBORN-1832] MapPartitionData should create fixed thread pool with 
registration of ThreadPoolSource
    
    ### What changes were proposed in this pull request?
    
    `MapPartitionData` creates fixed thread pool with registration of 
`ThreadPoolSource`.
    
    ### Why are the changes needed?
    
    `MapPartitionData` creates fixed thread pool without registering 
`ThreadPoolSource` at present, which causes that map partition reader thread of 
worker is lack of thread pool metrics. Therefore, `MapPartitionData` should 
create fixed thread pool with registration of `ThreadPoolSource`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    CI.
    
    Closes #3064 from SteNicholas/CELEBORN-1832.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../org/apache/celeborn/common/util/ThreadUtils.scala | 19 +++++++++++++++++--
 .../deploy/worker/storage/MapPartitionData.java       | 14 ++++----------
 2 files changed, 21 insertions(+), 12 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
index c8b6491b2..33c6d7c82 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
@@ -122,8 +122,15 @@ object ThreadUtils {
    * Create a thread factory that names threads with a prefix and also sets 
the threads to daemon.
    */
   def namedThreadFactory(threadNamePrefix: String): ThreadFactory = {
+    namedThreadFactory(threadNamePrefix, daemon = true)
+  }
+
+  /**
+   * Create a thread factory that generates threads with a specified name 
prefix and daemon setting.
+   */
+  def namedThreadFactory(threadNamePrefix: String, daemon: Boolean): 
ThreadFactory = {
     new ThreadFactoryBuilder()
-      .setDaemon(true)
+      .setDaemon(daemon)
       .setNameFormat(s"$threadNamePrefix-%d")
       .setUncaughtExceptionHandler(new 
ThreadExceptionHandler(threadNamePrefix))
       .build()
@@ -176,7 +183,15 @@ object ThreadUtils {
    * unique, sequentially assigned integer.
    */
   def newDaemonFixedThreadPool(nThreads: Int, prefix: String): 
ThreadPoolExecutor = {
-    val threadPool = Executors.newFixedThreadPool(nThreads, 
namedThreadFactory(prefix))
+    newFixedThreadPool(nThreads, prefix, daemon = true)
+  }
+
+  /**
+   * Wrapper over newFixedThreadPool with daemon setting. Thread names are 
formatted as prefix-ID, where ID is a
+   * unique, sequentially assigned integer.
+   */
+  def newFixedThreadPool(nThreads: Int, prefix: String, daemon: Boolean): 
ThreadPoolExecutor = {
+    val threadPool = Executors.newFixedThreadPool(nThreads, 
namedThreadFactory(prefix, daemon))
       .asInstanceOf[ThreadPoolExecutor]
     ThreadPoolSource.registerSource(prefix, threadPool)
     threadPool
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
index f94c17c95..b1372037d 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
@@ -23,12 +23,10 @@ import java.util.List;
 import java.util.PriorityQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
 import org.apache.commons.io.IOUtils;
@@ -39,6 +37,7 @@ import org.apache.celeborn.common.meta.DiskFileInfo;
 import org.apache.celeborn.common.meta.MapFileMeta;
 import org.apache.celeborn.common.util.FileChannelUtils;
 import org.apache.celeborn.common.util.JavaUtils;
+import org.apache.celeborn.common.util.ThreadUtils;
 import org.apache.celeborn.service.deploy.worker.memory.BufferQueue;
 import org.apache.celeborn.service.deploy.worker.memory.BufferRecycler;
 import org.apache.celeborn.service.deploy.worker.memory.MemoryManager;
@@ -93,15 +92,10 @@ public class MapPartitionData implements 
MemoryManager.ReadBufferTargetChangeLis
         storageFetcherPool.computeIfAbsent(
             mapFileMeta.getMountPoint(),
             k ->
-                Executors.newFixedThreadPool(
+                ThreadUtils.newFixedThreadPool(
                     threadsPerMountPoint,
-                    new ThreadFactoryBuilder()
-                        .setNameFormat(mapFileMeta.getMountPoint() + 
"-reader-thread-%d")
-                        .setUncaughtExceptionHandler(
-                            (t1, t2) -> {
-                              logger.warn("StorageFetcherPool thread:{}:{}", 
t1, t2);
-                            })
-                        .build()));
+                    String.format("worker-map-partition-%s-reader", 
mapFileMeta.getMountPoint()),
+                    false));
     this.dataFileChanel = 
FileChannelUtils.openReadableFileChannel(diskFileInfo.getFilePath());
     this.indexChannel = 
FileChannelUtils.openReadableFileChannel(diskFileInfo.getIndexPath());
     this.indexSize = indexChannel.size();

Reply via email to