This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.5 by this push:
     new 9e6585ef0 [CELEBORN-1655] Fix read buffer dispatcher thread terminate 
unexpectedly
9e6585ef0 is described below

commit 9e6585ef048000ba9d0c7fd882b81ff5d4e3e4b2
Author: mingji <[email protected]>
AuthorDate: Fri Oct 18 15:53:23 2024 +0800

    [CELEBORN-1655] Fix read buffer dispatcher thread terminate unexpectedly
    
    The read buffer dispatcher may lose its dispatcher thread, which is not acceptable.
    
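    1. Add a scheduled watchdog that restarts the dispatcher thread when it is no longer alive
       (disabled by default; enabled by setting
       celeborn.worker.readBufferDispatcherThreadWatchdog.checkInterval to a value greater than 0).
    2. Install an uncaught-exception handler on the thread to record the exceptions that
       cause the thread to be lost.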
    
    No user-facing change.
    
    Tested with a cluster test.
    
    Closes #2815 from FMX/b1655.
    
    Lead-authored-by: mingji <[email protected]>
    Co-authored-by: Ethan Feng <[email protected]>
    Co-authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Shuang <[email protected]>
    (cherry picked from commit a94147cd9de3469f84bda9f738dc85e3e558117e)
    Signed-off-by: Shuang <[email protected]>
---
 .../apache/celeborn/client/write/DataPusher.java   |  75 ++++++-----
 .../network/ssl/ReloadingX509TrustManager.java     |   5 +-
 .../celeborn/common/util/ShutdownHookManager.java  |  40 +++---
 .../org/apache/celeborn/common/CelebornConf.scala  |  10 ++
 .../apache/celeborn/common/util/ThreadUtils.scala  |  12 ++
 .../org/apache/celeborn/common/util/Utils.scala    |  15 ++-
 .../network/TransportClientFactorySuiteJ.java      |   6 +-
 .../apache/celeborn/common/rpc/RpcEnvSuite.scala   |  14 +-
 .../celeborn/common/rpc/netty/InboxSuite.scala     |  19 +--
 .../service/deploy/master/MasterSuite.scala        |  29 ++--
 .../master/http/api/ApiMasterResourceSuite.scala   |  14 +-
 .../deploy/worker/memory/ReadBufferDispatcher.java | 150 +++++++++++++--------
 .../celeborn/service/deploy/worker/Worker.scala    |  36 ++---
 .../deploy/worker/WorkerStatusManager.scala        |  17 ++-
 .../deploy/memory/ReadBufferDispactherSuite.scala  |  15 ++-
 15 files changed, 272 insertions(+), 185 deletions(-)

diff --git 
a/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java 
b/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java
index 807952d15..4c8b83abe 100644
--- a/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java
+++ b/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java
@@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.celeborn.client.ShuffleClient;
 import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.exception.CelebornIOException;
-import org.apache.celeborn.common.util.ThreadExceptionHandler;
+import org.apache.celeborn.common.util.ThreadUtils;
 
 public class DataPusher {
   private static final Logger logger = 
LoggerFactory.getLogger(DataPusher.class);
@@ -100,45 +100,46 @@ public class DataPusher {
     this.mapStatusLengths = mapStatusLengths;
 
     pushThread =
-        new Thread("celeborn-client-data-pusher-" + taskId) {
-          private void reclaimTask(PushTask task) throws InterruptedException {
-            idleLock.lockInterruptibly();
-            try {
-              idleQueue.put(task);
-              if (idleQueue.remainingCapacity() == 0) {
-                idleFull.signal();
+        ThreadUtils.newDaemonThread(
+            new Runnable() {
+              private void reclaimTask(PushTask task) throws 
InterruptedException {
+                idleLock.lockInterruptibly();
+                try {
+                  idleQueue.put(task);
+                  if (idleQueue.remainingCapacity() == 0) {
+                    idleFull.signal();
+                  }
+                } catch (InterruptedException e) {
+                  logger.error("DataPusher thread interrupted while reclaiming 
data.");
+                  throw e;
+                } finally {
+                  idleLock.unlock();
+                }
               }
-            } catch (InterruptedException e) {
-              logger.error("DataPusher thread interrupted while reclaiming 
data.");
-              throw e;
-            } finally {
-              idleLock.unlock();
-            }
-          }
-
-          @Override
-          public void run() {
-            while (stillRunning()) {
-              try {
-                ArrayList<PushTask> tasks = dataPushQueue.takePushTasks();
-                for (int i = 0; i < tasks.size(); i++) {
-                  PushTask task = tasks.get(i);
-                  pushData(task);
-                  reclaimTask(task);
+
+              @Override
+              public void run() {
+                while (stillRunning()) {
+                  try {
+                    ArrayList<PushTask> tasks = dataPushQueue.takePushTasks();
+                    for (int i = 0; i < tasks.size(); i++) {
+                      PushTask task = tasks.get(i);
+                      pushData(task);
+                      reclaimTask(task);
+                    }
+                  } catch (CelebornIOException e) {
+                    exceptionRef.set(e);
+                  } catch (IOException e) {
+                    exceptionRef.set(new CelebornIOException(e));
+                  } catch (InterruptedException e) {
+                    logger.error("DataPusher push thread interrupted while 
pushing data.");
+                    break;
+                  }
                 }
-              } catch (CelebornIOException e) {
-                exceptionRef.set(e);
-              } catch (IOException e) {
-                exceptionRef.set(new CelebornIOException(e));
-              } catch (InterruptedException e) {
-                logger.error("DataPusher push thread interrupted while pushing 
data.");
-                break;
               }
-            }
-          }
-        };
-    pushThread.setDaemon(true);
-    pushThread.setUncaughtExceptionHandler(new 
ThreadExceptionHandler("DataPusher-" + taskId));
+            },
+            "celeborn-client-data-pusher-" + taskId);
+
     pushThread.start();
   }
 
diff --git 
a/common/src/main/java/org/apache/celeborn/common/network/ssl/ReloadingX509TrustManager.java
 
b/common/src/main/java/org/apache/celeborn/common/network/ssl/ReloadingX509TrustManager.java
index 950b5408d..c6d9f61da 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/network/ssl/ReloadingX509TrustManager.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/network/ssl/ReloadingX509TrustManager.java
@@ -35,6 +35,8 @@ import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.celeborn.common.util.ThreadUtils;
+
 /**
  * A {@link TrustManager} implementation that reloads its configuration when 
the truststore file on
  * disk changes. This implementation is based off of the
@@ -90,8 +92,7 @@ public class ReloadingX509TrustManager implements 
X509TrustManager, Runnable {
 
   /** Starts the reloader thread. */
   public void init() {
-    reloader = new Thread(this, "Truststore reloader thread");
-    reloader.setDaemon(true);
+    reloader = ThreadUtils.newDaemonThread(this, "Truststore reloader thread");
     reloader.start();
   }
 
diff --git 
a/common/src/main/java/org/apache/celeborn/common/util/ShutdownHookManager.java 
b/common/src/main/java/org/apache/celeborn/common/util/ShutdownHookManager.java
index 47659fda0..ef4f60605 100644
--- 
a/common/src/main/java/org/apache/celeborn/common/util/ShutdownHookManager.java
+++ 
b/common/src/main/java/org/apache/celeborn/common/util/ShutdownHookManager.java
@@ -67,25 +67,27 @@ public final class ShutdownHookManager {
     try {
       Runtime.getRuntime()
           .addShutdownHook(
-              new Thread() {
-                @Override
-                public void run() {
-                  if (MGR.shutdownInProgress.getAndSet(true)) {
-                    LOG.info("Shutdown process invoked a second time: 
ignoring");
-                    return;
-                  }
-                  long started = System.currentTimeMillis();
-                  int timeoutCount = executeShutdown();
-                  long ended = System.currentTimeMillis();
-                  LOG.debug(
-                      String.format(
-                          "Completed shutdown in %.3f seconds; Timeouts: %d",
-                          (ended - started) / 1000.0, timeoutCount));
-                  // each of the hooks have executed; now shut down the
-                  // executor itself.
-                  shutdownExecutor(new CelebornConf());
-                }
-              });
+              ThreadUtils.newThread(
+                  new Runnable() {
+                    @Override
+                    public void run() {
+                      if (MGR.shutdownInProgress.getAndSet(true)) {
+                        LOG.info("Shutdown process invoked a second time: 
ignoring");
+                        return;
+                      }
+                      long started = System.currentTimeMillis();
+                      int timeoutCount = executeShutdown();
+                      long ended = System.currentTimeMillis();
+                      LOG.debug(
+                          String.format(
+                              "Completed shutdown in %.3f seconds; Timeouts: 
%d",
+                              (ended - started) / 1000.0, timeoutCount));
+                      // each of the hooks have executed; now shut down the
+                      // executor itself.
+                      shutdownExecutor(new CelebornConf());
+                    }
+                  },
+                  "shutdown-hook-thread"));
     } catch (IllegalStateException ex) {
       // JVM is being shut down. Ignore
       LOG.warn("Failed to add the ShutdownHook", ex);
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index e006e765f..4fd5965a5 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1127,6 +1127,7 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
   def readBufferTargetUpdateInterval: Long = 
get(WORKER_READBUFFER_TARGET_UPDATE_INTERVAL)
   def readBufferTargetNotifyThreshold: Long = 
get(WORKER_READBUFFER_TARGET_NOTIFY_THRESHOLD)
   def readBuffersToTriggerReadMin: Int = 
get(WORKER_READBUFFERS_TOTRIGGERREAD_MIN)
+  def readBufferDispatcherCheckThreadInterval: Long = 
get(WORKER_READBUFFER_CHECK_THREAD_INTERVAL)
 
   // //////////////////////////////////////////////////////
   //                   Decommission                      //
@@ -3499,6 +3500,15 @@ object CelebornConf extends Logging {
       .intConf
       .createWithDefault(32)
 
+  val WORKER_READBUFFER_CHECK_THREAD_INTERVAL: ConfigEntry[Long] =
+    
buildConf("celeborn.worker.readBufferDispatcherThreadWatchdog.checkInterval")
+      .categories("worker")
+      .version("0.5.2")
+      .internal
+      .doc("The interval for worker to check read buffer dispatcher thread. 0 
means disable.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefault(0)
+
   val WORKER_PUSH_HEARTBEAT_ENABLED: ConfigEntry[Boolean] =
     buildConf("celeborn.worker.push.heartbeat.enabled")
       .categories("worker")
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
index caa3719a4..6ddbbb06d 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
@@ -393,6 +393,18 @@ object ThreadUtils {
       pool.shutdownNow()
     }
   }
+
+  def newThread(runnable: Runnable, name: String): Thread = {
+    val thread = new Thread(runnable, name)
+    thread.setUncaughtExceptionHandler(new ThreadExceptionHandler(name))
+    thread
+  }
+
+  def newDaemonThread(runnable: Runnable, name: String): Thread = {
+    val thread = newThread(runnable, name)
+    thread.setDaemon(true)
+    thread
+  }
 }
 
 class ThreadExceptionHandler(executorService: String)
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index 959054554..0b0fa4ed7 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -845,14 +845,15 @@ object Utils extends Logging {
       threadName: String,
       inputStream: InputStream,
       processLine: String => Unit): Thread = {
-    val t = new Thread(threadName) {
-      override def run(): Unit = {
-        for (line <- Source.fromInputStream(inputStream).getLines()) {
-          processLine(line)
+    val t = ThreadUtils.newDaemonThread(
+      new Runnable {
+        override def run(): Unit = {
+          for (line <- Source.fromInputStream(inputStream).getLines()) {
+            processLine(line)
+          }
         }
-      }
-    }
-    t.setDaemon(true)
+      },
+      threadName)
     t.start()
     t
   }
diff --git 
a/common/src/test/java/org/apache/celeborn/common/network/TransportClientFactorySuiteJ.java
 
b/common/src/test/java/org/apache/celeborn/common/network/TransportClientFactorySuiteJ.java
index b77a9c7d0..fb87d1af3 100644
--- 
a/common/src/test/java/org/apache/celeborn/common/network/TransportClientFactorySuiteJ.java
+++ 
b/common/src/test/java/org/apache/celeborn/common/network/TransportClientFactorySuiteJ.java
@@ -35,6 +35,7 @@ import 
org.apache.celeborn.common.network.server.BaseMessageHandler;
 import org.apache.celeborn.common.network.server.TransportServer;
 import org.apache.celeborn.common.network.util.TransportConf;
 import org.apache.celeborn.common.util.JavaUtils;
+import org.apache.celeborn.common.util.ThreadUtils;
 
 public class TransportClientFactorySuiteJ {
 
@@ -94,7 +95,7 @@ public class TransportClientFactorySuiteJ {
     // Launch a bunch of threads to create new clients.
     for (int i = 0; i < attempts.length; i++) {
       attempts[i] =
-          new Thread(
+          ThreadUtils.newThread(
               () -> {
                 try {
                   TransportClient client = 
factory.createClient(getLocalHost(), server1.getPort());
@@ -105,7 +106,8 @@ public class TransportClientFactorySuiteJ {
                 } catch (InterruptedException e) {
                   throw new RuntimeException(e);
                 }
-              });
+              },
+              "test-thread");
 
       if (concurrent) {
         attempts[i].start();
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/rpc/RpcEnvSuite.scala 
b/common/src/test/scala/org/apache/celeborn/common/rpc/RpcEnvSuite.scala
index d1ab75f37..6011960f1 100644
--- a/common/src/test/scala/org/apache/celeborn/common/rpc/RpcEnvSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/rpc/RpcEnvSuite.scala
@@ -383,13 +383,15 @@ abstract class RpcEnvSuite extends CelebornFunSuite {
         })
 
       (0 until 10) foreach { _ =>
-        new Thread {
-          override def run(): Unit = {
-            (0 until 100) foreach { _ =>
-              endpointRef.send("Hello")
+        ThreadUtils.newThread(
+          new Runnable {
+            override def run(): Unit = {
+              (0 until 100) foreach { _ =>
+                endpointRef.send("Hello")
+              }
             }
-          }
-        }.start()
+          },
+          "rpc-env-test-thread").start()
       }
 
       eventually(timeout(5.seconds), interval(5.milliseconds)) {
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/rpc/netty/InboxSuite.scala 
b/common/src/test/scala/org/apache/celeborn/common/rpc/netty/InboxSuite.scala
index ab86a57e8..553b791d8 100644
--- 
a/common/src/test/scala/org/apache/celeborn/common/rpc/netty/InboxSuite.scala
+++ 
b/common/src/test/scala/org/apache/celeborn/common/rpc/netty/InboxSuite.scala
@@ -26,6 +26,7 @@ import org.scalatest.BeforeAndAfter
 import org.apache.celeborn.CelebornFunSuite
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.rpc.{RpcAddress, TestRpcEndpoint}
+import org.apache.celeborn.common.util.ThreadUtils
 
 class InboxSuite extends CelebornFunSuite with BeforeAndAfter {
 
@@ -94,15 +95,17 @@ class InboxSuite extends CelebornFunSuite with 
BeforeAndAfter {
     val exitLatch = new CountDownLatch(10)
 
     for (_ <- 0 until 10) {
-      new Thread {
-        override def run(): Unit = {
-          for (_ <- 0 until 100) {
-            val message = OneWayMessage(null, "hi")
-            inbox.post(message)
+      ThreadUtils.newThread(
+        new Runnable {
+          override def run(): Unit = {
+            for (_ <- 0 until 100) {
+              val message = OneWayMessage(null, "hi")
+              inbox.post(message)
+            }
+            exitLatch.countDown()
           }
-          exitLatch.countDown()
-        }
-      }.start()
+        },
+        "inbox-test-thread").start()
     }
     // Try to process some messages
     inbox.process(dispatcher)
diff --git 
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
index 26ea0b040..564d818a3 100644
--- 
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
+++ 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
@@ -18,15 +18,14 @@
 package org.apache.celeborn.service.deploy.master
 
 import com.google.common.io.Files
-import org.mockito.Mockito.{mock, times, verify}
+import org.mockito.Mockito.mock
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 import org.scalatest.funsuite.AnyFunSuite
 
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.protocol.{PbCheckForWorkerTimeout, 
PbRegisterWorker}
-import 
org.apache.celeborn.common.protocol.message.ControlMessages.{ApplicationLost, 
HeartbeatFromApplication}
-import org.apache.celeborn.common.util.{CelebornExitKind, Utils}
+import org.apache.celeborn.common.util.{CelebornExitKind, ThreadUtils, Utils}
 
 class MasterSuite extends AnyFunSuite
   with BeforeAndAfterAll
@@ -54,11 +53,13 @@ class MasterSuite extends AnyFunSuite
 
     val masterArgs = new MasterArguments(args, conf)
     val master = new Master(conf, masterArgs)
-    new Thread() {
-      override def run(): Unit = {
-        master.initialize()
-      }
-    }.start()
+    ThreadUtils.newThread(
+      new Runnable {
+        override def run(): Unit = {
+          master.initialize()
+        }
+      },
+      "master-init-thread").start()
     Thread.sleep(5000L)
     master.stop(CelebornExitKind.EXIT_IMMEDIATELY)
     master.rpcEnv.shutdown()
@@ -76,11 +77,13 @@ class MasterSuite extends AnyFunSuite
 
     val masterArgs = new MasterArguments(args, conf)
     val master = new Master(conf, masterArgs)
-    new Thread() {
-      override def run(): Unit = {
-        master.initialize()
-      }
-    }.start()
+    ThreadUtils.newThread(
+      new Runnable {
+        override def run(): Unit = {
+          master.initialize()
+        }
+      },
+      "master-init-thread").start()
     Thread.sleep(5000L)
     master.receive.applyOrElse(
       PbCheckForWorkerTimeout.newBuilder().build(),
diff --git 
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala
 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala
index 1fc92793c..1c1daaec9 100644
--- 
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala
+++ 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala
@@ -23,7 +23,7 @@ import javax.ws.rs.core.{Form, MediaType}
 import com.google.common.io.Files
 
 import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.util.{CelebornExitKind, Utils}
+import org.apache.celeborn.common.util.{CelebornExitKind, ThreadUtils, Utils}
 import org.apache.celeborn.server.common.HttpService
 import org.apache.celeborn.server.common.http.ApiBaseResourceSuite
 import org.apache.celeborn.service.deploy.master.{Master, MasterArguments}
@@ -52,11 +52,13 @@ class ApiMasterResourceSuite extends ApiBaseResourceSuite {
 
     val masterArgs = new MasterArguments(args, celebornConf)
     master = new Master(celebornConf, masterArgs)
-    new Thread() {
-      override def run(): Unit = {
-        master.initialize()
-      }
-    }.start()
+    ThreadUtils.newThread(
+      new Runnable {
+        override def run(): Unit = {
+          master.initialize()
+        }
+      },
+      "master-init-thread").start()
     super.beforeAll()
   }
 
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
index b433d3c2e..1646095ec 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
@@ -20,9 +20,12 @@ package org.apache.celeborn.service.deploy.worker.memory;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.LongAdder;
 
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 import org.slf4j.Logger;
@@ -31,24 +34,45 @@ import org.slf4j.LoggerFactory;
 import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.network.util.NettyUtils;
 import org.apache.celeborn.common.network.util.TransportConf;
+import org.apache.celeborn.common.util.ThreadUtils;
 
-public class ReadBufferDispatcher extends Thread {
+public class ReadBufferDispatcher {
   private final Logger logger = 
LoggerFactory.getLogger(ReadBufferDispatcher.class);
   private final LinkedBlockingQueue<ReadBufferRequest> requests = new 
LinkedBlockingQueue<>();
   private final MemoryManager memoryManager;
   private final PooledByteBufAllocator readBufferAllocator;
   private final LongAdder allocatedReadBuffers = new LongAdder();
   private final long readBufferAllocationWait;
-  private volatile boolean stopFlag = false;
+  @VisibleForTesting public volatile boolean stopFlag = false;
+  @VisibleForTesting public final AtomicReference<Thread> dispatcherThread;
 
   public ReadBufferDispatcher(MemoryManager memoryManager, CelebornConf conf) {
-    this.readBufferAllocationWait = conf.readBufferAllocationWait();
+    readBufferAllocationWait = conf.readBufferAllocationWait();
+    long checkThreadInterval = conf.readBufferDispatcherCheckThreadInterval();
     // readBuffer is not a module name, it's a placeholder.
     readBufferAllocator =
         NettyUtils.getPooledByteBufAllocator(new TransportConf("readBuffer", 
conf), null, true);
     this.memoryManager = memoryManager;
-    this.setName("Read-Buffer-Dispatcher");
-    this.start();
+    dispatcherThread =
+        new AtomicReference<>(
+            ThreadUtils.newThread(new DispatcherRunnable(), 
"read-buffer-dispatcher"));
+    dispatcherThread.get().start();
+
+    if (checkThreadInterval > 0) {
+      ScheduledExecutorService checkAliveThread =
+          
ThreadUtils.newDaemonSingleThreadScheduledExecutor("read-buffer-dispatcher-checker");
+      checkAliveThread.scheduleWithFixedDelay(
+          () -> {
+            if (!dispatcherThread.get().isAlive()) {
+              dispatcherThread.set(
+                  ThreadUtils.newThread(new DispatcherRunnable(), 
"read-buffer-dispatcher"));
+              dispatcherThread.get().start();
+            }
+          },
+          checkThreadInterval,
+          checkThreadInterval,
+          TimeUnit.MILLISECONDS);
+    }
   }
 
   public void addBufferRequest(ReadBufferRequest request) {
@@ -66,60 +90,6 @@ public class ReadBufferDispatcher extends Thread {
     memoryManager.changeReadBufferCounter(-1 * bufferSize);
   }
 
-  @Override
-  public void run() {
-    while (!stopFlag) {
-      ReadBufferRequest request = null;
-      try {
-        request = requests.poll(1000, TimeUnit.MILLISECONDS);
-      } catch (InterruptedException e) {
-        logger.info("Buffer dispatcher is closing");
-      }
-
-      List<ByteBuf> buffers = null;
-      try {
-        if (request != null) {
-          long start = System.nanoTime();
-          int bufferSize = request.getBufferSize();
-          buffers = new ArrayList<>();
-          while (buffers.size() < request.getNumber()) {
-            if (memoryManager.readBufferAvailable(bufferSize)) {
-              ByteBuf buf = readBufferAllocator.buffer(bufferSize, bufferSize);
-              buffers.add(buf);
-              memoryManager.changeReadBufferCounter(bufferSize);
-              allocatedReadBuffers.increment();
-            } else {
-              try {
-                // If dispatcher can not allocate requested buffers, it will 
wait here until
-                // necessary buffers are get.
-                Thread.sleep(this.readBufferAllocationWait);
-              } catch (InterruptedException e) {
-                logger.info("Buffer dispatcher is closing");
-              }
-            }
-          }
-          long end = System.nanoTime();
-          logger.debug(
-              "process read buffer request using {} ms",
-              TimeUnit.NANOSECONDS.toMillis(end - start));
-          request.getBufferListener().notifyBuffers(buffers, null);
-        } else {
-          // Free buffer pool memory to main direct memory when dispatcher is 
idle.
-          readBufferAllocator.trimCurrentThreadCache();
-        }
-      } catch (Throwable e) {
-        logger.error(e.getMessage(), e);
-        // recycle all allocated buffers
-        if (buffers != null) {
-          buffers.forEach(this::recycle);
-        }
-
-        // notify listener has exception
-        request.getBufferListener().notifyBuffers(null, e);
-      }
-    }
-  }
-
   public int requestsLength() {
     return requests.size();
   }
@@ -132,4 +102,66 @@ public class ReadBufferDispatcher extends Thread {
     stopFlag = true;
     requests.clear();
   }
+
+  private class DispatcherRunnable implements Runnable {
+
+    public DispatcherRunnable() {}
+
+    @Override
+    public void run() {
+      while (!stopFlag) {
+        try {
+          ReadBufferRequest request;
+          request = requests.poll(1000, TimeUnit.MILLISECONDS);
+          List<ByteBuf> buffers = new ArrayList<>();
+          try {
+            if (request != null) {
+              processBufferRequest(request, buffers);
+            } else {
+              // Free buffer pool memory to main direct memory when dispatcher 
is idle.
+              readBufferAllocator.trimCurrentThreadCache();
+            }
+          } catch (Throwable e) {
+            logger.error(e.getMessage(), e);
+            try {
+              // recycle all allocated buffers
+              for (ByteBuf buffer : buffers) {
+                recycle(buffer);
+              }
+            } catch (Throwable e1) {
+              logger.error("Recycle read buffer failed.", e1);
+            }
+            request.getBufferListener().notifyBuffers(null, e);
+          }
+        } catch (Throwable e) {
+          logger.error("Read buffer dispatcher encountered error: {}", 
e.getMessage(), e);
+        }
+      }
+    }
+
+    void processBufferRequest(ReadBufferRequest request, List<ByteBuf> 
buffers) {
+      long start = System.nanoTime();
+      int bufferSize = request.getBufferSize();
+      while (buffers.size() < request.getNumber()) {
+        if (memoryManager.readBufferAvailable(bufferSize)) {
+          ByteBuf buf = readBufferAllocator.buffer(bufferSize, bufferSize);
+          buffers.add(buf);
+          memoryManager.changeReadBufferCounter(bufferSize);
+          allocatedReadBuffers.increment();
+        } else {
+          try {
+            // If dispatcher can not allocate requested buffers, it will wait 
here until
+            // necessary buffers are get.
+            Thread.sleep(readBufferAllocationWait);
+          } catch (InterruptedException e) {
+            logger.warn("Buffer dispatcher is closing");
+          }
+        }
+      }
+      long end = System.nanoTime();
+      logger.debug(
+          "process read buffer request using {} ms", 
TimeUnit.NANOSECONDS.toMillis(end - start));
+      request.getBufferListener().notifyBuffers(buffers, null);
+    }
+  }
 }
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index a64031f06..552f46ff6 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -933,25 +933,27 @@ private[celeborn] class Worker(
   }
 
   ShutdownHookManager.get().addShutdownHook(
-    new Thread(new Runnable {
-      override def run(): Unit = {
-        logInfo("Shutdown hook called.")
-        workerStatusManager.exitEventType match {
-          case WorkerEventType.Graceful =>
-            shutdownGracefully()
-          case WorkerEventType.Decommission =>
-            decommissionWorker()
-          case _ =>
-            exitImmediately()
-        }
+    ThreadUtils.newThread(
+      new Runnable {
+        override def run(): Unit = {
+          logInfo("Shutdown hook called.")
+          workerStatusManager.exitEventType match {
+            case WorkerEventType.Graceful =>
+              shutdownGracefully()
+            case WorkerEventType.Decommission =>
+              decommissionWorker()
+            case _ =>
+              exitImmediately()
+          }
 
-        if (workerStatusManager.exitEventType == WorkerEventType.Graceful) {
-          stop(CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN)
-        } else {
-          stop(CelebornExitKind.EXIT_IMMEDIATELY)
+          if (workerStatusManager.exitEventType == WorkerEventType.Graceful) {
+            stop(CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN)
+          } else {
+            stop(CelebornExitKind.EXIT_IMMEDIATELY)
+          }
         }
-      }
-    }),
+      },
+      "worker-shutdown-hook-thread"),
     WORKER_SHUTDOWN_PRIORITY)
 
   @VisibleForTesting
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManager.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManager.scala
index d6648c2fa..849dee70d 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManager.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManager.scala
@@ -27,8 +27,9 @@ import com.google.common.collect.Sets
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.meta.WorkerStatus
-import org.apache.celeborn.common.protocol.{PbWorkerStatus, WorkerEventType}
 import org.apache.celeborn.common.protocol.PbWorkerStatus.State
+import org.apache.celeborn.common.protocol.WorkerEventType
+import org.apache.celeborn.common.util.ThreadUtils
 import org.apache.celeborn.service.deploy.worker.storage.StorageManager
 
 private[celeborn] class WorkerStatusManager(conf: CelebornConf) extends 
Logging {
@@ -113,12 +114,14 @@ private[celeborn] class WorkerStatusManager(conf: 
CelebornConf) extends Logging
 
     // Compatible with current exit logic
     // trigger shutdown hook to exit
-    new Thread() {
-      override def run(): Unit = {
-        Thread.sleep(10000)
-        System.exit(0)
-      }
-    }.start()
+    ThreadUtils.newThread(
+      new Runnable {
+        override def run(): Unit = {
+          Thread.sleep(10000)
+          System.exit(0)
+        }
+      },
+      "worker-exit-thread").start()
   }
 
   def transitionState(state: State): Unit = this.synchronized {
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/ReadBufferDispactherSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/ReadBufferDispactherSuite.scala
index 031bb7c55..c1d6978f7 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/ReadBufferDispactherSuite.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/ReadBufferDispactherSuite.scala
@@ -28,8 +28,7 @@ import org.mockito.stubbing.Answer
 
 import org.apache.celeborn.CelebornFunSuite
 import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.service.deploy.worker.memory.{MemoryManager, 
ReadBufferListener, ReadBufferRequest}
-import org.apache.celeborn.service.deploy.worker.memory.ReadBufferDispatcher
+import org.apache.celeborn.service.deploy.worker.memory.{MemoryManager, 
ReadBufferDispatcher, ReadBufferListener, ReadBufferRequest}
 
 class ReadBufferDispactherSuite extends CelebornFunSuite {
 
@@ -64,4 +63,16 @@ class ReadBufferDispactherSuite extends CelebornFunSuite {
     requestFuture.get(5, TimeUnit.SECONDS)
   }
 
+  test("Test check thread alive") {
+    val mockedMemoryManager = mock(classOf[MemoryManager])
+    val conf = new CelebornConf()
+    
conf.set("celeborn.worker.readBufferDispatcherThreadWatchdog.checkInterval", 
"100ms")
+    val readBufferDispatcher = new ReadBufferDispatcher(mockedMemoryManager, 
conf)
+    val threadId1 = readBufferDispatcher.dispatcherThread.get().getId
+    readBufferDispatcher.stopFlag = true
+    Thread.sleep(1500)
+    readBufferDispatcher.stopFlag = false
+    val threadId2 = readBufferDispatcher.dispatcherThread.get().getId
+    assert(threadId1 != threadId2)
+  }
 }

Reply via email to