codenohup commented on code in PR #2815:
URL: https://github.com/apache/celeborn/pull/2815#discussion_r1804302634
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java:
##########
@@ -31,24 +33,51 @@
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.network.util.NettyUtils;
import org.apache.celeborn.common.network.util.TransportConf;
+import org.apache.celeborn.common.util.ThreadExceptionHandler;
+import org.apache.celeborn.common.util.ThreadUtils;
-public class ReadBufferDispatcher extends Thread {
+public class ReadBufferDispatcher {
private final Logger logger =
LoggerFactory.getLogger(ReadBufferDispatcher.class);
private final LinkedBlockingQueue<ReadBufferRequest> requests = new
LinkedBlockingQueue<>();
private final MemoryManager memoryManager;
private final PooledByteBufAllocator readBufferAllocator;
private final LongAdder allocatedReadBuffers = new LongAdder();
private final long readBufferAllocationWait;
private volatile boolean stopFlag = false;
+ private final AtomicReference<Thread> dispatcherThread;
public ReadBufferDispatcher(MemoryManager memoryManager, CelebornConf conf) {
this.readBufferAllocationWait = conf.readBufferAllocationWait();
// readBuffer is not a module name, it's a placeholder.
readBufferAllocator =
NettyUtils.getPooledByteBufAllocator(new TransportConf("readBuffer",
conf), null, true);
this.memoryManager = memoryManager;
- this.setName("Read-Buffer-Dispatcher");
- this.start();
+ dispatcherThread =
+ new AtomicReference<>(
+ new Thread(new DispatcherRunnable(memoryManager),
"ReadBufferDispatcher"));
+ dispatcherThread
+ .get()
+ .setUncaughtExceptionHandler(new
ThreadExceptionHandler("ReadBufferDispatcher"));
+ dispatcherThread.get().start();
+
+ ScheduledExecutorService checkAliveThread =
+
ThreadUtils.newDaemonSingleThreadScheduledExecutor("ReadBufferDispatcherChecker");
+ checkAliveThread.scheduleWithFixedDelay(
+ new Thread(
Review Comment:
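Should this be a plain `Runnable` (e.g. just the lambda) instead of `new Thread(...)`? `scheduleWithFixedDelay` only invokes `run()` on the task, so wrapping it in a `Thread` object is unnecessary.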
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java:
##########
@@ -132,4 +107,73 @@ public void close() {
stopFlag = true;
requests.clear();
}
+
+ private class DispatcherRunnable implements Runnable {
+ private final MemoryManager memoryManager;
+
+ public DispatcherRunnable(MemoryManager memoryManager) {
+ this.memoryManager = memoryManager;
+ }
+
+ @Override
+ public void run() {
+ while (!stopFlag) {
+ try {
+ ReadBufferRequest request = null;
+ try {
+ request = requests.poll(1000, TimeUnit.MILLISECONDS);
Review Comment:
If the DispatcherRunnable thread polls a request and then terminates before
notifying that request's listener, the request is lost, and the reader may
end up waiting indefinitely.
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java:
##########
@@ -31,24 +33,51 @@
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.network.util.NettyUtils;
import org.apache.celeborn.common.network.util.TransportConf;
+import org.apache.celeborn.common.util.ThreadExceptionHandler;
+import org.apache.celeborn.common.util.ThreadUtils;
-public class ReadBufferDispatcher extends Thread {
+public class ReadBufferDispatcher {
private final Logger logger =
LoggerFactory.getLogger(ReadBufferDispatcher.class);
private final LinkedBlockingQueue<ReadBufferRequest> requests = new
LinkedBlockingQueue<>();
private final MemoryManager memoryManager;
private final PooledByteBufAllocator readBufferAllocator;
private final LongAdder allocatedReadBuffers = new LongAdder();
private final long readBufferAllocationWait;
private volatile boolean stopFlag = false;
+ private final AtomicReference<Thread> dispatcherThread;
public ReadBufferDispatcher(MemoryManager memoryManager, CelebornConf conf) {
this.readBufferAllocationWait = conf.readBufferAllocationWait();
// readBuffer is not a module name, it's a placeholder.
readBufferAllocator =
NettyUtils.getPooledByteBufAllocator(new TransportConf("readBuffer",
conf), null, true);
this.memoryManager = memoryManager;
- this.setName("Read-Buffer-Dispatcher");
- this.start();
+ dispatcherThread =
+ new AtomicReference<>(
+ new Thread(new DispatcherRunnable(memoryManager),
"ReadBufferDispatcher"));
+ dispatcherThread
+ .get()
+ .setUncaughtExceptionHandler(new
ThreadExceptionHandler("ReadBufferDispatcher"));
Review Comment:
The ThreadExceptionHandler may be unnecessary, because
DispatcherRunnable#run already catches Throwable.
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java:
##########
@@ -31,24 +33,51 @@
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.network.util.NettyUtils;
import org.apache.celeborn.common.network.util.TransportConf;
+import org.apache.celeborn.common.util.ThreadExceptionHandler;
+import org.apache.celeborn.common.util.ThreadUtils;
-public class ReadBufferDispatcher extends Thread {
+public class ReadBufferDispatcher {
private final Logger logger =
LoggerFactory.getLogger(ReadBufferDispatcher.class);
private final LinkedBlockingQueue<ReadBufferRequest> requests = new
LinkedBlockingQueue<>();
private final MemoryManager memoryManager;
private final PooledByteBufAllocator readBufferAllocator;
private final LongAdder allocatedReadBuffers = new LongAdder();
private final long readBufferAllocationWait;
private volatile boolean stopFlag = false;
+ private final AtomicReference<Thread> dispatcherThread;
public ReadBufferDispatcher(MemoryManager memoryManager, CelebornConf conf) {
this.readBufferAllocationWait = conf.readBufferAllocationWait();
// readBuffer is not a module name, it's a placeholder.
readBufferAllocator =
NettyUtils.getPooledByteBufAllocator(new TransportConf("readBuffer",
conf), null, true);
this.memoryManager = memoryManager;
- this.setName("Read-Buffer-Dispatcher");
- this.start();
+ dispatcherThread =
+ new AtomicReference<>(
+ new Thread(new DispatcherRunnable(memoryManager),
"ReadBufferDispatcher"));
+ dispatcherThread
+ .get()
+ .setUncaughtExceptionHandler(new
ThreadExceptionHandler("ReadBufferDispatcher"));
+ dispatcherThread.get().start();
+
+ ScheduledExecutorService checkAliveThread =
+
ThreadUtils.newDaemonSingleThreadScheduledExecutor("ReadBufferDispatcherChecker");
+ checkAliveThread.scheduleWithFixedDelay(
+ new Thread(
+ () -> {
+ if (!dispatcherThread.get().isAlive()) {
+ dispatcherThread.set(
+ new Thread(new DispatcherRunnable(memoryManager),
"ReadBufferDispatcher"));
+ dispatcherThread
+ .get()
+ .setUncaughtExceptionHandler(
+ new ThreadExceptionHandler("ReadBufferDispatcher"));
+ dispatcherThread.get().start();
+ }
+ }),
+ 50,
Review Comment:
Checking every 50 milliseconds may be too frequent; consider a longer check interval.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]