This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.5 by this push:
new 9e6585ef0 [CELEBORN-1655] Fix read buffer dispatcher thread terminate
unexpectedly
9e6585ef0 is described below
commit 9e6585ef048000ba9d0c7fd882b81ff5d4e3e4b2
Author: mingji <[email protected]>
AuthorDate: Fri Oct 18 15:53:23 2024 +0800
[CELEBORN-1655] Fix read buffer dispatcher thread terminate unexpectedly
The read buffer dispatcher can lose its dispatcher thread when that thread
terminates unexpectedly; pending read buffer requests are then never
processed, which is not acceptable.
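1. Add a scheduled checker that restarts the dispatcher thread when it is no
longer alive. It is disabled by default and is enabled by setting
celeborn.worker.readBufferDispatcherThreadWatchdog.checkInterval to a value
greater than 0.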
2. Add an uncaught exception handler to record exceptions that cause the
thread to be lost.
User-facing change: NO.
How this was tested: Cluster test.
Closes #2815 from FMX/b1655.
Lead-authored-by: mingji <[email protected]>
Co-authored-by: Ethan Feng <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Signed-off-by: Shuang <[email protected]>
(cherry picked from commit a94147cd9de3469f84bda9f738dc85e3e558117e)
Signed-off-by: Shuang <[email protected]>
---
.../apache/celeborn/client/write/DataPusher.java | 75 ++++++-----
.../network/ssl/ReloadingX509TrustManager.java | 5 +-
.../celeborn/common/util/ShutdownHookManager.java | 40 +++---
.../org/apache/celeborn/common/CelebornConf.scala | 10 ++
.../apache/celeborn/common/util/ThreadUtils.scala | 12 ++
.../org/apache/celeborn/common/util/Utils.scala | 15 ++-
.../network/TransportClientFactorySuiteJ.java | 6 +-
.../apache/celeborn/common/rpc/RpcEnvSuite.scala | 14 +-
.../celeborn/common/rpc/netty/InboxSuite.scala | 19 +--
.../service/deploy/master/MasterSuite.scala | 29 ++--
.../master/http/api/ApiMasterResourceSuite.scala | 14 +-
.../deploy/worker/memory/ReadBufferDispatcher.java | 150 +++++++++++++--------
.../celeborn/service/deploy/worker/Worker.scala | 36 ++---
.../deploy/worker/WorkerStatusManager.scala | 17 ++-
.../deploy/memory/ReadBufferDispactherSuite.scala | 15 ++-
15 files changed, 272 insertions(+), 185 deletions(-)
diff --git
a/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java
b/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java
index 807952d15..4c8b83abe 100644
--- a/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java
+++ b/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java
@@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory;
import org.apache.celeborn.client.ShuffleClient;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.exception.CelebornIOException;
-import org.apache.celeborn.common.util.ThreadExceptionHandler;
+import org.apache.celeborn.common.util.ThreadUtils;
public class DataPusher {
private static final Logger logger =
LoggerFactory.getLogger(DataPusher.class);
@@ -100,45 +100,46 @@ public class DataPusher {
this.mapStatusLengths = mapStatusLengths;
pushThread =
- new Thread("celeborn-client-data-pusher-" + taskId) {
- private void reclaimTask(PushTask task) throws InterruptedException {
- idleLock.lockInterruptibly();
- try {
- idleQueue.put(task);
- if (idleQueue.remainingCapacity() == 0) {
- idleFull.signal();
+ ThreadUtils.newDaemonThread(
+ new Runnable() {
+ private void reclaimTask(PushTask task) throws
InterruptedException {
+ idleLock.lockInterruptibly();
+ try {
+ idleQueue.put(task);
+ if (idleQueue.remainingCapacity() == 0) {
+ idleFull.signal();
+ }
+ } catch (InterruptedException e) {
+ logger.error("DataPusher thread interrupted while reclaiming
data.");
+ throw e;
+ } finally {
+ idleLock.unlock();
+ }
}
- } catch (InterruptedException e) {
- logger.error("DataPusher thread interrupted while reclaiming
data.");
- throw e;
- } finally {
- idleLock.unlock();
- }
- }
-
- @Override
- public void run() {
- while (stillRunning()) {
- try {
- ArrayList<PushTask> tasks = dataPushQueue.takePushTasks();
- for (int i = 0; i < tasks.size(); i++) {
- PushTask task = tasks.get(i);
- pushData(task);
- reclaimTask(task);
+
+ @Override
+ public void run() {
+ while (stillRunning()) {
+ try {
+ ArrayList<PushTask> tasks = dataPushQueue.takePushTasks();
+ for (int i = 0; i < tasks.size(); i++) {
+ PushTask task = tasks.get(i);
+ pushData(task);
+ reclaimTask(task);
+ }
+ } catch (CelebornIOException e) {
+ exceptionRef.set(e);
+ } catch (IOException e) {
+ exceptionRef.set(new CelebornIOException(e));
+ } catch (InterruptedException e) {
+ logger.error("DataPusher push thread interrupted while
pushing data.");
+ break;
+ }
}
- } catch (CelebornIOException e) {
- exceptionRef.set(e);
- } catch (IOException e) {
- exceptionRef.set(new CelebornIOException(e));
- } catch (InterruptedException e) {
- logger.error("DataPusher push thread interrupted while pushing
data.");
- break;
}
- }
- }
- };
- pushThread.setDaemon(true);
- pushThread.setUncaughtExceptionHandler(new
ThreadExceptionHandler("DataPusher-" + taskId));
+ },
+ "celeborn-client-data-pusher-" + taskId);
+
pushThread.start();
}
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/ssl/ReloadingX509TrustManager.java
b/common/src/main/java/org/apache/celeborn/common/network/ssl/ReloadingX509TrustManager.java
index 950b5408d..c6d9f61da 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/ssl/ReloadingX509TrustManager.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/ssl/ReloadingX509TrustManager.java
@@ -35,6 +35,8 @@ import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.celeborn.common.util.ThreadUtils;
+
/**
* A {@link TrustManager} implementation that reloads its configuration when
the truststore file on
* disk changes. This implementation is based off of the
@@ -90,8 +92,7 @@ public class ReloadingX509TrustManager implements
X509TrustManager, Runnable {
/** Starts the reloader thread. */
public void init() {
- reloader = new Thread(this, "Truststore reloader thread");
- reloader.setDaemon(true);
+ reloader = ThreadUtils.newDaemonThread(this, "Truststore reloader thread");
reloader.start();
}
diff --git
a/common/src/main/java/org/apache/celeborn/common/util/ShutdownHookManager.java
b/common/src/main/java/org/apache/celeborn/common/util/ShutdownHookManager.java
index 47659fda0..ef4f60605 100644
---
a/common/src/main/java/org/apache/celeborn/common/util/ShutdownHookManager.java
+++
b/common/src/main/java/org/apache/celeborn/common/util/ShutdownHookManager.java
@@ -67,25 +67,27 @@ public final class ShutdownHookManager {
try {
Runtime.getRuntime()
.addShutdownHook(
- new Thread() {
- @Override
- public void run() {
- if (MGR.shutdownInProgress.getAndSet(true)) {
- LOG.info("Shutdown process invoked a second time:
ignoring");
- return;
- }
- long started = System.currentTimeMillis();
- int timeoutCount = executeShutdown();
- long ended = System.currentTimeMillis();
- LOG.debug(
- String.format(
- "Completed shutdown in %.3f seconds; Timeouts: %d",
- (ended - started) / 1000.0, timeoutCount));
- // each of the hooks have executed; now shut down the
- // executor itself.
- shutdownExecutor(new CelebornConf());
- }
- });
+ ThreadUtils.newThread(
+ new Runnable() {
+ @Override
+ public void run() {
+ if (MGR.shutdownInProgress.getAndSet(true)) {
+ LOG.info("Shutdown process invoked a second time:
ignoring");
+ return;
+ }
+ long started = System.currentTimeMillis();
+ int timeoutCount = executeShutdown();
+ long ended = System.currentTimeMillis();
+ LOG.debug(
+ String.format(
+ "Completed shutdown in %.3f seconds; Timeouts:
%d",
+ (ended - started) / 1000.0, timeoutCount));
+ // each of the hooks have executed; now shut down the
+ // executor itself.
+ shutdownExecutor(new CelebornConf());
+ }
+ },
+ "shutdown-hook-thread"));
} catch (IllegalStateException ex) {
// JVM is being shut down. Ignore
LOG.warn("Failed to add the ShutdownHook", ex);
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index e006e765f..4fd5965a5 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1127,6 +1127,7 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
def readBufferTargetUpdateInterval: Long =
get(WORKER_READBUFFER_TARGET_UPDATE_INTERVAL)
def readBufferTargetNotifyThreshold: Long =
get(WORKER_READBUFFER_TARGET_NOTIFY_THRESHOLD)
def readBuffersToTriggerReadMin: Int =
get(WORKER_READBUFFERS_TOTRIGGERREAD_MIN)
+ def readBufferDispatcherCheckThreadInterval: Long =
get(WORKER_READBUFFER_CHECK_THREAD_INTERVAL)
// //////////////////////////////////////////////////////
// Decommission //
@@ -3499,6 +3500,15 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(32)
+ val WORKER_READBUFFER_CHECK_THREAD_INTERVAL: ConfigEntry[Long] =
+
buildConf("celeborn.worker.readBufferDispatcherThreadWatchdog.checkInterval")
+ .categories("worker")
+ .version("0.5.2")
+ .internal
+ .doc("The interval for worker to check read buffer dispatcher thread. 0
means disable.")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefault(0)
+
val WORKER_PUSH_HEARTBEAT_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.push.heartbeat.enabled")
.categories("worker")
diff --git
a/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
index caa3719a4..6ddbbb06d 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/ThreadUtils.scala
@@ -393,6 +393,18 @@ object ThreadUtils {
pool.shutdownNow()
}
}
+
+ def newThread(runnable: Runnable, name: String): Thread = {
+ val thread = new Thread(runnable, name)
+ thread.setUncaughtExceptionHandler(new ThreadExceptionHandler(name))
+ thread
+ }
+
+ def newDaemonThread(runnable: Runnable, name: String): Thread = {
+ val thread = newThread(runnable, name)
+ thread.setDaemon(true)
+ thread
+ }
}
class ThreadExceptionHandler(executorService: String)
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index 959054554..0b0fa4ed7 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -845,14 +845,15 @@ object Utils extends Logging {
threadName: String,
inputStream: InputStream,
processLine: String => Unit): Thread = {
- val t = new Thread(threadName) {
- override def run(): Unit = {
- for (line <- Source.fromInputStream(inputStream).getLines()) {
- processLine(line)
+ val t = ThreadUtils.newDaemonThread(
+ new Runnable {
+ override def run(): Unit = {
+ for (line <- Source.fromInputStream(inputStream).getLines()) {
+ processLine(line)
+ }
}
- }
- }
- t.setDaemon(true)
+ },
+ threadName)
t.start()
t
}
diff --git
a/common/src/test/java/org/apache/celeborn/common/network/TransportClientFactorySuiteJ.java
b/common/src/test/java/org/apache/celeborn/common/network/TransportClientFactorySuiteJ.java
index b77a9c7d0..fb87d1af3 100644
---
a/common/src/test/java/org/apache/celeborn/common/network/TransportClientFactorySuiteJ.java
+++
b/common/src/test/java/org/apache/celeborn/common/network/TransportClientFactorySuiteJ.java
@@ -35,6 +35,7 @@ import
org.apache.celeborn.common.network.server.BaseMessageHandler;
import org.apache.celeborn.common.network.server.TransportServer;
import org.apache.celeborn.common.network.util.TransportConf;
import org.apache.celeborn.common.util.JavaUtils;
+import org.apache.celeborn.common.util.ThreadUtils;
public class TransportClientFactorySuiteJ {
@@ -94,7 +95,7 @@ public class TransportClientFactorySuiteJ {
// Launch a bunch of threads to create new clients.
for (int i = 0; i < attempts.length; i++) {
attempts[i] =
- new Thread(
+ ThreadUtils.newThread(
() -> {
try {
TransportClient client =
factory.createClient(getLocalHost(), server1.getPort());
@@ -105,7 +106,8 @@ public class TransportClientFactorySuiteJ {
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
- });
+ },
+ "test-thread");
if (concurrent) {
attempts[i].start();
diff --git
a/common/src/test/scala/org/apache/celeborn/common/rpc/RpcEnvSuite.scala
b/common/src/test/scala/org/apache/celeborn/common/rpc/RpcEnvSuite.scala
index d1ab75f37..6011960f1 100644
--- a/common/src/test/scala/org/apache/celeborn/common/rpc/RpcEnvSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/rpc/RpcEnvSuite.scala
@@ -383,13 +383,15 @@ abstract class RpcEnvSuite extends CelebornFunSuite {
})
(0 until 10) foreach { _ =>
- new Thread {
- override def run(): Unit = {
- (0 until 100) foreach { _ =>
- endpointRef.send("Hello")
+ ThreadUtils.newThread(
+ new Runnable {
+ override def run(): Unit = {
+ (0 until 100) foreach { _ =>
+ endpointRef.send("Hello")
+ }
}
- }
- }.start()
+ },
+ "rpc-env-test-thread").start()
}
eventually(timeout(5.seconds), interval(5.milliseconds)) {
diff --git
a/common/src/test/scala/org/apache/celeborn/common/rpc/netty/InboxSuite.scala
b/common/src/test/scala/org/apache/celeborn/common/rpc/netty/InboxSuite.scala
index ab86a57e8..553b791d8 100644
---
a/common/src/test/scala/org/apache/celeborn/common/rpc/netty/InboxSuite.scala
+++
b/common/src/test/scala/org/apache/celeborn/common/rpc/netty/InboxSuite.scala
@@ -26,6 +26,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.rpc.{RpcAddress, TestRpcEndpoint}
+import org.apache.celeborn.common.util.ThreadUtils
class InboxSuite extends CelebornFunSuite with BeforeAndAfter {
@@ -94,15 +95,17 @@ class InboxSuite extends CelebornFunSuite with
BeforeAndAfter {
val exitLatch = new CountDownLatch(10)
for (_ <- 0 until 10) {
- new Thread {
- override def run(): Unit = {
- for (_ <- 0 until 100) {
- val message = OneWayMessage(null, "hi")
- inbox.post(message)
+ ThreadUtils.newThread(
+ new Runnable {
+ override def run(): Unit = {
+ for (_ <- 0 until 100) {
+ val message = OneWayMessage(null, "hi")
+ inbox.post(message)
+ }
+ exitLatch.countDown()
}
- exitLatch.countDown()
- }
- }.start()
+ },
+ "inbox-test-thread").start()
}
// Try to process some messages
inbox.process(dispatcher)
diff --git
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
index 26ea0b040..564d818a3 100644
---
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
+++
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/MasterSuite.scala
@@ -18,15 +18,14 @@
package org.apache.celeborn.service.deploy.master
import com.google.common.io.Files
-import org.mockito.Mockito.{mock, times, verify}
+import org.mockito.Mockito.mock
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.scalatest.funsuite.AnyFunSuite
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.protocol.{PbCheckForWorkerTimeout,
PbRegisterWorker}
-import
org.apache.celeborn.common.protocol.message.ControlMessages.{ApplicationLost,
HeartbeatFromApplication}
-import org.apache.celeborn.common.util.{CelebornExitKind, Utils}
+import org.apache.celeborn.common.util.{CelebornExitKind, ThreadUtils, Utils}
class MasterSuite extends AnyFunSuite
with BeforeAndAfterAll
@@ -54,11 +53,13 @@ class MasterSuite extends AnyFunSuite
val masterArgs = new MasterArguments(args, conf)
val master = new Master(conf, masterArgs)
- new Thread() {
- override def run(): Unit = {
- master.initialize()
- }
- }.start()
+ ThreadUtils.newThread(
+ new Runnable {
+ override def run(): Unit = {
+ master.initialize()
+ }
+ },
+ "master-init-thread").start()
Thread.sleep(5000L)
master.stop(CelebornExitKind.EXIT_IMMEDIATELY)
master.rpcEnv.shutdown()
@@ -76,11 +77,13 @@ class MasterSuite extends AnyFunSuite
val masterArgs = new MasterArguments(args, conf)
val master = new Master(conf, masterArgs)
- new Thread() {
- override def run(): Unit = {
- master.initialize()
- }
- }.start()
+ ThreadUtils.newThread(
+ new Runnable {
+ override def run(): Unit = {
+ master.initialize()
+ }
+ },
+ "master-init-thread").start()
Thread.sleep(5000L)
master.receive.applyOrElse(
PbCheckForWorkerTimeout.newBuilder().build(),
diff --git
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala
index 1fc92793c..1c1daaec9 100644
---
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala
+++
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/http/api/ApiMasterResourceSuite.scala
@@ -23,7 +23,7 @@ import javax.ws.rs.core.{Form, MediaType}
import com.google.common.io.Files
import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.util.{CelebornExitKind, Utils}
+import org.apache.celeborn.common.util.{CelebornExitKind, ThreadUtils, Utils}
import org.apache.celeborn.server.common.HttpService
import org.apache.celeborn.server.common.http.ApiBaseResourceSuite
import org.apache.celeborn.service.deploy.master.{Master, MasterArguments}
@@ -52,11 +52,13 @@ class ApiMasterResourceSuite extends ApiBaseResourceSuite {
val masterArgs = new MasterArguments(args, celebornConf)
master = new Master(celebornConf, masterArgs)
- new Thread() {
- override def run(): Unit = {
- master.initialize()
- }
- }.start()
+ ThreadUtils.newThread(
+ new Runnable {
+ override def run(): Unit = {
+ master.initialize()
+ }
+ },
+ "master-init-thread").start()
super.beforeAll()
}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
index b433d3c2e..1646095ec 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
@@ -20,9 +20,12 @@ package org.apache.celeborn.service.deploy.worker.memory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
+import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import org.slf4j.Logger;
@@ -31,24 +34,45 @@ import org.slf4j.LoggerFactory;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.network.util.NettyUtils;
import org.apache.celeborn.common.network.util.TransportConf;
+import org.apache.celeborn.common.util.ThreadUtils;
-public class ReadBufferDispatcher extends Thread {
+public class ReadBufferDispatcher {
private final Logger logger =
LoggerFactory.getLogger(ReadBufferDispatcher.class);
private final LinkedBlockingQueue<ReadBufferRequest> requests = new
LinkedBlockingQueue<>();
private final MemoryManager memoryManager;
private final PooledByteBufAllocator readBufferAllocator;
private final LongAdder allocatedReadBuffers = new LongAdder();
private final long readBufferAllocationWait;
- private volatile boolean stopFlag = false;
+ @VisibleForTesting public volatile boolean stopFlag = false;
+ @VisibleForTesting public final AtomicReference<Thread> dispatcherThread;
public ReadBufferDispatcher(MemoryManager memoryManager, CelebornConf conf) {
- this.readBufferAllocationWait = conf.readBufferAllocationWait();
+ readBufferAllocationWait = conf.readBufferAllocationWait();
+ long checkThreadInterval = conf.readBufferDispatcherCheckThreadInterval();
// readBuffer is not a module name, it's a placeholder.
readBufferAllocator =
NettyUtils.getPooledByteBufAllocator(new TransportConf("readBuffer",
conf), null, true);
this.memoryManager = memoryManager;
- this.setName("Read-Buffer-Dispatcher");
- this.start();
+ dispatcherThread =
+ new AtomicReference<>(
+ ThreadUtils.newThread(new DispatcherRunnable(),
"read-buffer-dispatcher"));
+ dispatcherThread.get().start();
+
+ if (checkThreadInterval > 0) {
+ ScheduledExecutorService checkAliveThread =
+
ThreadUtils.newDaemonSingleThreadScheduledExecutor("read-buffer-dispatcher-checker");
+ checkAliveThread.scheduleWithFixedDelay(
+ () -> {
+ if (!dispatcherThread.get().isAlive()) {
+ dispatcherThread.set(
+ ThreadUtils.newThread(new DispatcherRunnable(),
"read-buffer-dispatcher"));
+ dispatcherThread.get().start();
+ }
+ },
+ checkThreadInterval,
+ checkThreadInterval,
+ TimeUnit.MILLISECONDS);
+ }
}
public void addBufferRequest(ReadBufferRequest request) {
@@ -66,60 +90,6 @@ public class ReadBufferDispatcher extends Thread {
memoryManager.changeReadBufferCounter(-1 * bufferSize);
}
- @Override
- public void run() {
- while (!stopFlag) {
- ReadBufferRequest request = null;
- try {
- request = requests.poll(1000, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- logger.info("Buffer dispatcher is closing");
- }
-
- List<ByteBuf> buffers = null;
- try {
- if (request != null) {
- long start = System.nanoTime();
- int bufferSize = request.getBufferSize();
- buffers = new ArrayList<>();
- while (buffers.size() < request.getNumber()) {
- if (memoryManager.readBufferAvailable(bufferSize)) {
- ByteBuf buf = readBufferAllocator.buffer(bufferSize, bufferSize);
- buffers.add(buf);
- memoryManager.changeReadBufferCounter(bufferSize);
- allocatedReadBuffers.increment();
- } else {
- try {
- // If dispatcher can not allocate requested buffers, it will
wait here until
- // necessary buffers are get.
- Thread.sleep(this.readBufferAllocationWait);
- } catch (InterruptedException e) {
- logger.info("Buffer dispatcher is closing");
- }
- }
- }
- long end = System.nanoTime();
- logger.debug(
- "process read buffer request using {} ms",
- TimeUnit.NANOSECONDS.toMillis(end - start));
- request.getBufferListener().notifyBuffers(buffers, null);
- } else {
- // Free buffer pool memory to main direct memory when dispatcher is
idle.
- readBufferAllocator.trimCurrentThreadCache();
- }
- } catch (Throwable e) {
- logger.error(e.getMessage(), e);
- // recycle all allocated buffers
- if (buffers != null) {
- buffers.forEach(this::recycle);
- }
-
- // notify listener has exception
- request.getBufferListener().notifyBuffers(null, e);
- }
- }
- }
-
public int requestsLength() {
return requests.size();
}
@@ -132,4 +102,66 @@ public class ReadBufferDispatcher extends Thread {
stopFlag = true;
requests.clear();
}
+
+ private class DispatcherRunnable implements Runnable {
+
+ public DispatcherRunnable() {}
+
+ @Override
+ public void run() {
+ while (!stopFlag) {
+ try {
+ ReadBufferRequest request;
+ request = requests.poll(1000, TimeUnit.MILLISECONDS);
+ List<ByteBuf> buffers = new ArrayList<>();
+ try {
+ if (request != null) {
+ processBufferRequest(request, buffers);
+ } else {
+ // Free buffer pool memory to main direct memory when dispatcher
is idle.
+ readBufferAllocator.trimCurrentThreadCache();
+ }
+ } catch (Throwable e) {
+ logger.error(e.getMessage(), e);
+ try {
+ // recycle all allocated buffers
+ for (ByteBuf buffer : buffers) {
+ recycle(buffer);
+ }
+ } catch (Throwable e1) {
+ logger.error("Recycle read buffer failed.", e1);
+ }
+ request.getBufferListener().notifyBuffers(null, e);
+ }
+ } catch (Throwable e) {
+ logger.error("Read buffer dispatcher encountered error: {}",
e.getMessage(), e);
+ }
+ }
+ }
+
+ void processBufferRequest(ReadBufferRequest request, List<ByteBuf>
buffers) {
+ long start = System.nanoTime();
+ int bufferSize = request.getBufferSize();
+ while (buffers.size() < request.getNumber()) {
+ if (memoryManager.readBufferAvailable(bufferSize)) {
+ ByteBuf buf = readBufferAllocator.buffer(bufferSize, bufferSize);
+ buffers.add(buf);
+ memoryManager.changeReadBufferCounter(bufferSize);
+ allocatedReadBuffers.increment();
+ } else {
+ try {
+ // If dispatcher can not allocate requested buffers, it will wait
here until
+ // necessary buffers are get.
+ Thread.sleep(readBufferAllocationWait);
+ } catch (InterruptedException e) {
+ logger.warn("Buffer dispatcher is closing");
+ }
+ }
+ }
+ long end = System.nanoTime();
+ logger.debug(
+ "process read buffer request using {} ms",
TimeUnit.NANOSECONDS.toMillis(end - start));
+ request.getBufferListener().notifyBuffers(buffers, null);
+ }
+ }
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index a64031f06..552f46ff6 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -933,25 +933,27 @@ private[celeborn] class Worker(
}
ShutdownHookManager.get().addShutdownHook(
- new Thread(new Runnable {
- override def run(): Unit = {
- logInfo("Shutdown hook called.")
- workerStatusManager.exitEventType match {
- case WorkerEventType.Graceful =>
- shutdownGracefully()
- case WorkerEventType.Decommission =>
- decommissionWorker()
- case _ =>
- exitImmediately()
- }
+ ThreadUtils.newThread(
+ new Runnable {
+ override def run(): Unit = {
+ logInfo("Shutdown hook called.")
+ workerStatusManager.exitEventType match {
+ case WorkerEventType.Graceful =>
+ shutdownGracefully()
+ case WorkerEventType.Decommission =>
+ decommissionWorker()
+ case _ =>
+ exitImmediately()
+ }
- if (workerStatusManager.exitEventType == WorkerEventType.Graceful) {
- stop(CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN)
- } else {
- stop(CelebornExitKind.EXIT_IMMEDIATELY)
+ if (workerStatusManager.exitEventType == WorkerEventType.Graceful) {
+ stop(CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN)
+ } else {
+ stop(CelebornExitKind.EXIT_IMMEDIATELY)
+ }
}
- }
- }),
+ },
+ "worker-shutdown-hook-thread"),
WORKER_SHUTDOWN_PRIORITY)
@VisibleForTesting
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManager.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManager.scala
index d6648c2fa..849dee70d 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManager.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManager.scala
@@ -27,8 +27,9 @@ import com.google.common.collect.Sets
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.WorkerStatus
-import org.apache.celeborn.common.protocol.{PbWorkerStatus, WorkerEventType}
import org.apache.celeborn.common.protocol.PbWorkerStatus.State
+import org.apache.celeborn.common.protocol.WorkerEventType
+import org.apache.celeborn.common.util.ThreadUtils
import org.apache.celeborn.service.deploy.worker.storage.StorageManager
private[celeborn] class WorkerStatusManager(conf: CelebornConf) extends
Logging {
@@ -113,12 +114,14 @@ private[celeborn] class WorkerStatusManager(conf:
CelebornConf) extends Logging
// Compatible with current exit logic
// trigger shutdown hook to exit
- new Thread() {
- override def run(): Unit = {
- Thread.sleep(10000)
- System.exit(0)
- }
- }.start()
+ ThreadUtils.newThread(
+ new Runnable {
+ override def run(): Unit = {
+ Thread.sleep(10000)
+ System.exit(0)
+ }
+ },
+ "worker-exit-thread").start()
}
def transitionState(state: State): Unit = this.synchronized {
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/ReadBufferDispactherSuite.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/ReadBufferDispactherSuite.scala
index 031bb7c55..c1d6978f7 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/ReadBufferDispactherSuite.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/ReadBufferDispactherSuite.scala
@@ -28,8 +28,7 @@ import org.mockito.stubbing.Answer
import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.service.deploy.worker.memory.{MemoryManager,
ReadBufferListener, ReadBufferRequest}
-import org.apache.celeborn.service.deploy.worker.memory.ReadBufferDispatcher
+import org.apache.celeborn.service.deploy.worker.memory.{MemoryManager,
ReadBufferDispatcher, ReadBufferListener, ReadBufferRequest}
class ReadBufferDispactherSuite extends CelebornFunSuite {
@@ -64,4 +63,16 @@ class ReadBufferDispactherSuite extends CelebornFunSuite {
requestFuture.get(5, TimeUnit.SECONDS)
}
+ test("Test check thread alive") {
+ val mockedMemoryManager = mock(classOf[MemoryManager])
+ val conf = new CelebornConf()
+
conf.set("celeborn.worker.readBufferDispatcherThreadWatchdog.checkInterval",
"100ms")
+ val readBufferDispatcher = new ReadBufferDispatcher(mockedMemoryManager,
conf)
+ val threadId1 = readBufferDispatcher.dispatcherThread.get().getId
+ readBufferDispatcher.stopFlag = true
+ Thread.sleep(1500)
+ readBufferDispatcher.stopFlag = false
+ val threadId2 = readBufferDispatcher.dispatcherThread.get().getId
+ assert(threadId1 != threadId2)
+ }
}