This is an automated email from the ASF dual-hosted git repository. jshao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
commit ba47aa017f67e681af7c311c4ef8578eef740d4b Author: Zhen Wang <643348...@qq.com> AuthorDate: Thu Jun 30 14:56:54 2022 +0800 [Minor] Make clearResourceThread and processEventThread daemon (#207) ### What changes were proposed in this pull request? Make clearResourceThread and processEventThread daemon threads. ### Why are the changes needed? `clearResourceThread` and `processEventThread` never exit, so we can make them daemon threads. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? None --- .../java/com/tencent/rss/server/ShuffleFlushManager.java | 12 ++++++++---- .../main/java/com/tencent/rss/server/ShuffleTaskManager.java | 1 + 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/com/tencent/rss/server/ShuffleFlushManager.java b/server/src/main/java/com/tencent/rss/server/ShuffleFlushManager.java index e246b02..be941ac 100644 --- a/server/src/main/java/com/tencent/rss/server/ShuffleFlushManager.java +++ b/server/src/main/java/com/tencent/rss/server/ShuffleFlushManager.java @@ -29,6 +29,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; import com.google.common.collect.Queues; import com.google.common.collect.RangeMap; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.Uninterruptibles; import org.apache.hadoop.conf.Configuration; import org.roaringbitmap.longlong.Roaring64NavigableMap; @@ -60,7 +61,6 @@ public class ShuffleFlushManager { private Map<String, Map<Integer, RangeMap<Integer, ShuffleWriteHandler>>> handlers = Maps.newConcurrentMap(); // appId -> shuffleId -> committed shuffle blockIds private Map<String, Map<Integer, Roaring64NavigableMap>> committedBlockIds = Maps.newConcurrentMap(); - private Runnable processEventThread; private final int retryMax; private final StorageManager storageManager; @@ -84,11 +84,12 @@ public class ShuffleFlushManager { BlockingQueue<Runnable> waitQueue = 
Queues.newLinkedBlockingQueue(waitQueueSize); int poolSize = shuffleServerConf.getInteger(ShuffleServerConf.SERVER_FLUSH_THREAD_POOL_SIZE); long keepAliveTime = shuffleServerConf.getLong(ShuffleServerConf.SERVER_FLUSH_THREAD_ALIVE); - threadPoolExecutor = new ThreadPoolExecutor(poolSize, poolSize, keepAliveTime, TimeUnit.SECONDS, waitQueue); + threadPoolExecutor = new ThreadPoolExecutor(poolSize, poolSize, keepAliveTime, TimeUnit.SECONDS, waitQueue, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("FlushEventThreadPool").build()); storageBasePaths = shuffleServerConf.getString(ShuffleServerConf.RSS_STORAGE_BASE_PATH).split(","); pendingEventTimeoutSec = shuffleServerConf.getLong(ShuffleServerConf.PENDING_EVENT_TIMEOUT_SEC); // the thread for flush data - processEventThread = () -> { + Runnable processEventRunnable = () -> { while (true) { try { ShuffleDataFlushEvent event = flushQueue.take(); @@ -103,7 +104,10 @@ public class ShuffleFlushManager { } } }; - new Thread(processEventThread).start(); + Thread processEventThread = new Thread(processEventRunnable); + processEventThread.setName("ProcessEventThread"); + processEventThread.setDaemon(true); + processEventThread.start(); // todo: extract a class named Service, and support stop method Thread thread = new Thread("PendingEventProcessThread") { @Override diff --git a/server/src/main/java/com/tencent/rss/server/ShuffleTaskManager.java b/server/src/main/java/com/tencent/rss/server/ShuffleTaskManager.java index e847779..fc37a19 100644 --- a/server/src/main/java/com/tencent/rss/server/ShuffleTaskManager.java +++ b/server/src/main/java/com/tencent/rss/server/ShuffleTaskManager.java @@ -123,6 +123,7 @@ public class ShuffleTaskManager { }; Thread thread = new Thread(clearResourceThread); thread.setName("clearResourceThread"); + thread.setDaemon(true); thread.start(); }