pan3793 commented on code in PR #2815:
URL: https://github.com/apache/celeborn/pull/2815#discussion_r1805870111
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java:
##########
@@ -31,24 +34,52 @@
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.network.util.NettyUtils;
import org.apache.celeborn.common.network.util.TransportConf;
+import org.apache.celeborn.common.util.ThreadExceptionHandler;
+import org.apache.celeborn.common.util.ThreadUtils;
-public class ReadBufferDispatcher extends Thread {
+public class ReadBufferDispatcher {
private final Logger logger =
LoggerFactory.getLogger(ReadBufferDispatcher.class);
private final LinkedBlockingQueue<ReadBufferRequest> requests = new
LinkedBlockingQueue<>();
private final MemoryManager memoryManager;
private final PooledByteBufAllocator readBufferAllocator;
private final LongAdder allocatedReadBuffers = new LongAdder();
private final long readBufferAllocationWait;
- private volatile boolean stopFlag = false;
+ @VisibleForTesting public volatile boolean stopFlag = false;
+ @VisibleForTesting public final AtomicReference<Thread> dispatcherThread;
public ReadBufferDispatcher(MemoryManager memoryManager, CelebornConf conf) {
- this.readBufferAllocationWait = conf.readBufferAllocationWait();
+ readBufferAllocationWait = conf.readBufferAllocationWait();
+ long checkThreadInterval = conf.readBufferDispatcherCheckThreadInterval();
// readBuffer is not a module name, it's a placeholder.
readBufferAllocator =
NettyUtils.getPooledByteBufAllocator(new TransportConf("readBuffer",
conf), null, true);
this.memoryManager = memoryManager;
- this.setName("Read-Buffer-Dispatcher");
- this.start();
+ dispatcherThread =
+ new AtomicReference<>(
+ ThreadUtils.newThread(new DispatcherRunnable(),
"ReadBufferDispatcher"));
+ dispatcherThread
+ .get()
+ .setUncaughtExceptionHandler(new
ThreadExceptionHandler("ReadBufferDispatcher"));
+ dispatcherThread.get().start();
+
+ ScheduledExecutorService checkAliveThread =
+
ThreadUtils.newDaemonSingleThreadScheduledExecutor("ReadBufferDispatcherChecker");
Review Comment:
should this also be moved inside `if (checkThreadInterval > 0) { }`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]