pan3793 commented on code in PR #2815:
URL: https://github.com/apache/celeborn/pull/2815#discussion_r1804621282
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java:
##########
@@ -31,24 +33,51 @@
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.network.util.NettyUtils;
import org.apache.celeborn.common.network.util.TransportConf;
+import org.apache.celeborn.common.util.ThreadExceptionHandler;
+import org.apache.celeborn.common.util.ThreadUtils;
-public class ReadBufferDispatcher extends Thread {
+public class ReadBufferDispatcher {
private final Logger logger =
LoggerFactory.getLogger(ReadBufferDispatcher.class);
private final LinkedBlockingQueue<ReadBufferRequest> requests = new
LinkedBlockingQueue<>();
private final MemoryManager memoryManager;
private final PooledByteBufAllocator readBufferAllocator;
private final LongAdder allocatedReadBuffers = new LongAdder();
private final long readBufferAllocationWait;
private volatile boolean stopFlag = false;
+ private final AtomicReference<Thread> dispatcherThread;
public ReadBufferDispatcher(MemoryManager memoryManager, CelebornConf conf) {
this.readBufferAllocationWait = conf.readBufferAllocationWait();
// readBuffer is not a module name, it's a placeholder.
readBufferAllocator =
NettyUtils.getPooledByteBufAllocator(new TransportConf("readBuffer",
conf), null, true);
this.memoryManager = memoryManager;
- this.setName("Read-Buffer-Dispatcher");
- this.start();
+ dispatcherThread =
+ new AtomicReference<>(
+ new Thread(new DispatcherRunnable(memoryManager),
"ReadBufferDispatcher"));
+ dispatcherThread
+ .get()
+ .setUncaughtExceptionHandler(new
ThreadExceptionHandler("ReadBufferDispatcher"));
+ dispatcherThread.get().start();
+
+ ScheduledExecutorService checkAliveThread =
+
ThreadUtils.newDaemonSingleThreadScheduledExecutor("ReadBufferDispatcherChecker");
+ checkAliveThread.scheduleWithFixedDelay(
+ new Thread(
+ () -> {
+ if (!dispatcherThread.get().isAlive()) {
+ dispatcherThread.set(
+ new Thread(new DispatcherRunnable(memoryManager),
"ReadBufferDispatcher"));
+ dispatcherThread
+ .get()
+ .setUncaughtExceptionHandler(
+ new ThreadExceptionHandler("ReadBufferDispatcher"));
+ dispatcherThread.get().start();
+ }
+ }),
+ 50,
Review Comment:
This is more of a diagnostic approach than regular logic that
should run in production. For this case, we can temporarily introduce an
`internal` conf to control it.
- The `internal` conf won't be exposed to end users and can be removed
at any time.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]