This is an automated email from the ASF dual-hosted git repository.

changhaifu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 64e1e67581 Use DefaultUncaughtExceptionHandler to log the uncaught exception (#15496)
64e1e67581 is described below

commit 64e1e675818c1db0dbba9f35b30f32c0ad129021
Author: Wenjun Ruan <[email protected]>
AuthorDate: Thu Jan 18 14:59:17 2024 +0800

    Use DefaultUncaughtExceptionHandler to log the uncaught exception (#15496)
    
    Co-authored-by: fuchanghai <[email protected]>
---
 .../apache/dolphinscheduler/alert/AlertServer.java |  4 ++
 .../alert/metrics/AlertServerMetrics.java          |  6 +++
 .../dolphinscheduler/api/ApiApplicationServer.java |  4 ++
 .../api/metrics/ApiServerMetrics.java              |  8 +++
 .../common/thread/BaseDaemonThread.java            |  2 +
 ...d.java => DefaultUncaughtExceptionHandler.java} | 33 ++++++++----
 .../common/thread/ThreadUtils.java                 | 20 +++----
 .../common/thread/ThreadUtilsTest.java}            | 27 +++++-----
 .../extract/base/NettyClientHandler.java           |  2 +-
 .../extract/base/NettyRemotingClient.java          | 16 +++---
 .../extract/base/NettyRemotingServer.java          | 17 ++----
 .../base/server/JdkDynamicServerHandler.java       |  2 +-
 .../extract/base/utils/NamedThreadFactory.java     | 61 ----------------------
 .../server/master/MasterServer.java                |  4 ++
 .../server/master/metrics/MasterServerMetrics.java |  6 +++
 .../server/master/registry/ServerNodeManager.java  |  6 +--
 .../execute/AsyncMasterTaskDelayQueueLooper.java   |  2 +-
 .../execute/MasterAsyncTaskExecutorThreadPool.java |  2 +-
 .../execute/MasterSyncTaskExecutorThreadPool.java  |  2 +-
 .../plugin/task/api/AbstractCommandExecutor.java   |  2 +-
 .../server/worker/WorkerServer.java                |  3 ++
 .../server/worker/metrics/WorkerServerMetrics.java |  6 +++
 .../runner/WorkerTaskExecutorThreadPool.java       |  4 +-
 23 files changed, 111 insertions(+), 128 deletions(-)

diff --git 
a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
 
b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
index aeb2baf7a2..fd3d4b02e3 100644
--- 
a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
+++ 
b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.alert;
 
+import org.apache.dolphinscheduler.alert.metrics.AlertServerMetrics;
 import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
 import org.apache.dolphinscheduler.alert.registry.AlertRegistryClient;
 import org.apache.dolphinscheduler.alert.rpc.AlertRpcServer;
@@ -24,6 +25,7 @@ import 
org.apache.dolphinscheduler.alert.service.AlertBootstrapService;
 import org.apache.dolphinscheduler.alert.service.ListenerEventPostService;
 import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
+import 
org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 
 import javax.annotation.PreDestroy;
@@ -54,6 +56,8 @@ public class AlertServer {
     private AlertRegistryClient alertRegistryClient;
 
     public static void main(String[] args) {
+        
AlertServerMetrics.registerUncachedException(DefaultUncaughtExceptionHandler::getUncaughtExceptionCount);
+        
Thread.setDefaultUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance());
         Thread.currentThread().setName(Constants.THREAD_NAME_ALERT_SERVER);
         new SpringApplicationBuilder(AlertServer.class).run(args);
     }
diff --git 
a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/metrics/AlertServerMetrics.java
 
b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/metrics/AlertServerMetrics.java
index 4784aa3b62..db75a49371 100644
--- 
a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/metrics/AlertServerMetrics.java
+++ 
b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/metrics/AlertServerMetrics.java
@@ -45,6 +45,12 @@ public class AlertServerMetrics {
                 .register(Metrics.globalRegistry);
     }
 
+    public static void registerUncachedException(final Supplier<Number> 
supplier) {
+        Gauge.builder("ds.alert.uncached.exception", supplier)
+                .description("number of uncached exception")
+                .register(Metrics.globalRegistry);
+    }
+
     public void incAlertSuccessCount() {
         alertSuccessCounter.increment();
     }
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
index 2bb9b7cf51..c7e6d9778f 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
@@ -17,7 +17,9 @@
 
 package org.apache.dolphinscheduler.api;
 
+import org.apache.dolphinscheduler.api.metrics.ApiServerMetrics;
 import org.apache.dolphinscheduler.common.enums.PluginType;
+import 
org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler;
 import org.apache.dolphinscheduler.dao.PluginDao;
 import org.apache.dolphinscheduler.dao.entity.PluginDefine;
 import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
@@ -51,6 +53,8 @@ public class ApiApplicationServer {
     private PluginDao pluginDao;
 
     public static void main(String[] args) {
+        
ApiServerMetrics.registerUncachedException(DefaultUncaughtExceptionHandler::getUncaughtExceptionCount);
+        
Thread.setDefaultUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance());
         SpringApplication.run(ApiApplicationServer.class);
     }
 
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/metrics/ApiServerMetrics.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/metrics/ApiServerMetrics.java
index 08f114dc2e..f97f9afffa 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/metrics/ApiServerMetrics.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/metrics/ApiServerMetrics.java
@@ -18,10 +18,12 @@
 package org.apache.dolphinscheduler.api.metrics;
 
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
 import lombok.experimental.UtilityClass;
 import io.micrometer.core.instrument.Counter;
 import io.micrometer.core.instrument.DistributionSummary;
+import io.micrometer.core.instrument.Gauge;
 import io.micrometer.core.instrument.Metrics;
 import io.micrometer.core.instrument.Timer;
 
@@ -120,4 +122,10 @@ public class ApiServerMetrics {
                         "ds.api.response.time",
                         "user.id", String.valueOf(userId)));
     }
+
+    public static void registerUncachedException(final Supplier<Number> 
supplier) {
+        Gauge.builder("ds.api.uncached.exception", supplier)
+                .description("number of uncached exception")
+                .register(Metrics.globalRegistry);
+    }
 }
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java
index 88a44004cb..b495bb7c56 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java
@@ -25,12 +25,14 @@ public abstract class BaseDaemonThread extends Thread {
     protected BaseDaemonThread(Runnable runnable) {
         super(runnable);
         this.setDaemon(true);
+        
this.setUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance());
     }
 
     protected BaseDaemonThread(String threadName) {
         super();
         this.setName(threadName);
         this.setDaemon(true);
+        
this.setUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance());
     }
 
 }
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/DefaultUncaughtExceptionHandler.java
similarity index 52%
copy from 
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java
copy to 
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/DefaultUncaughtExceptionHandler.java
index 88a44004cb..62a0fd6911 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/DefaultUncaughtExceptionHandler.java
@@ -17,20 +17,31 @@
 
 package org.apache.dolphinscheduler.common.thread;
 
-/**
- * All thread used in DolphinScheduler should extend with this class to avoid 
the server hang issue.
- */
-public abstract class BaseDaemonThread extends Thread {
+import java.util.concurrent.atomic.LongAdder;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class DefaultUncaughtExceptionHandler implements 
Thread.UncaughtExceptionHandler {
+
+    private static final DefaultUncaughtExceptionHandler INSTANCE = new 
DefaultUncaughtExceptionHandler();
 
-    protected BaseDaemonThread(Runnable runnable) {
-        super(runnable);
-        this.setDaemon(true);
+    private static final LongAdder uncaughtExceptionCount = new LongAdder();
+
+    private DefaultUncaughtExceptionHandler() {
+    }
+
+    public static DefaultUncaughtExceptionHandler getInstance() {
+        return INSTANCE;
     }
 
-    protected BaseDaemonThread(String threadName) {
-        super();
-        this.setName(threadName);
-        this.setDaemon(true);
+    public static long getUncaughtExceptionCount() {
+        return uncaughtExceptionCount.longValue();
     }
 
+    @Override
+    public void uncaughtException(Thread t, Throwable e) {
+        uncaughtExceptionCount.add(1);
+        log.error("Caught an exception in {}.", t, e);
+    }
 }
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
index 5eef04ed82..c4271919b5 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
@@ -31,24 +31,20 @@ import 
com.google.common.util.concurrent.ThreadFactoryBuilder;
 @Slf4j
 public class ThreadUtils {
 
-    /**
-     * Wrapper over newDaemonFixedThreadExecutor.
-     *
-     * @param threadName threadName
-     * @param threadsNum threadsNum
-     * @return ExecutorService
-     */
     public static ThreadPoolExecutor newDaemonFixedThreadExecutor(String 
threadName, int threadsNum) {
-        ThreadFactory threadFactory = new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build();
-        return (ThreadPoolExecutor) Executors.newFixedThreadPool(threadsNum, 
threadFactory);
+        return (ThreadPoolExecutor) Executors.newFixedThreadPool(threadsNum, 
newDaemonThreadFactory(threadName));
     }
 
     public static ScheduledExecutorService 
newSingleDaemonScheduledExecutorService(String threadName) {
-        ThreadFactory threadFactory = new ThreadFactoryBuilder()
-                .setNameFormat(threadName)
+        return 
Executors.newSingleThreadScheduledExecutor(newDaemonThreadFactory(threadName));
+    }
+
+    public static ThreadFactory newDaemonThreadFactory(String threadName) {
+        return new ThreadFactoryBuilder()
                 .setDaemon(true)
+                .setNameFormat(threadName)
+                
.setUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance())
                 .build();
-        return Executors.newSingleThreadScheduledExecutor(threadFactory);
     }
 
     /**
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java
 
b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/thread/ThreadUtilsTest.java
similarity index 59%
copy from 
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java
copy to 
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/thread/ThreadUtilsTest.java
index 88a44004cb..7d7b2c0ac6 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java
+++ 
b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/thread/ThreadUtilsTest.java
@@ -17,20 +17,21 @@
 
 package org.apache.dolphinscheduler.common.thread;
 
-/**
- * All thread used in DolphinScheduler should extend with this class to avoid 
the server hang issue.
- */
-public abstract class BaseDaemonThread extends Thread {
+import java.util.concurrent.ThreadPoolExecutor;
 
-    protected BaseDaemonThread(Runnable runnable) {
-        super(runnable);
-        this.setDaemon(true);
-    }
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
 
-    protected BaseDaemonThread(String threadName) {
-        super();
-        this.setName(threadName);
-        this.setDaemon(true);
-    }
+class ThreadUtilsTest {
 
+    @Test
+    void newDaemonFixedThreadExecutor() throws InterruptedException {
+        ThreadPoolExecutor threadPoolExecutor = 
ThreadUtils.newDaemonFixedThreadExecutor("DemonThread", 1);
+        threadPoolExecutor.execute(() -> {
+            throw new IllegalArgumentException("I am an exception");
+        });
+        Thread.sleep(1_000);
+        Assertions.assertEquals(1, 
DefaultUncaughtExceptionHandler.getUncaughtExceptionCount());
+
+    }
 }
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyClientHandler.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyClientHandler.java
index 2b49b2fec2..b0d998af83 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyClientHandler.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyClientHandler.java
@@ -67,7 +67,7 @@ public class NettyClientHandler extends 
ChannelInboundHandlerAdapter {
         future.release();
         if (future.getInvokeCallback() != null) {
             future.removeFuture();
-            this.callbackExecutor.submit(future::executeInvokeCallback);
+            this.callbackExecutor.execute(future::executeInvokeCallback);
         } else {
             future.putResponse(deserialize);
         }
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingClient.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingClient.java
index 1ca87091af..e4682f5224 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingClient.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingClient.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.extract.base;
 
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig;
 import org.apache.dolphinscheduler.extract.base.exception.RemotingException;
 import 
org.apache.dolphinscheduler.extract.base.exception.RemotingTimeoutException;
@@ -30,7 +31,6 @@ import 
org.apache.dolphinscheduler.extract.base.protocal.TransporterEncoder;
 import 
org.apache.dolphinscheduler.extract.base.utils.CallerThreadExecutePolicy;
 import org.apache.dolphinscheduler.extract.base.utils.Constants;
 import org.apache.dolphinscheduler.extract.base.utils.Host;
-import org.apache.dolphinscheduler.extract.base.utils.NamedThreadFactory;
 import org.apache.dolphinscheduler.extract.base.utils.NettyUtils;
 
 import java.net.InetSocketAddress;
@@ -40,6 +40,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -80,12 +81,11 @@ public class NettyRemotingClient implements AutoCloseable {
 
     public NettyRemotingClient(final NettyClientConfig clientConfig) {
         this.clientConfig = clientConfig;
+        ThreadFactory nettyClientThreadFactory = 
ThreadUtils.newDaemonThreadFactory("NettyClientThread-");
         if (Epoll.isAvailable()) {
-            this.workerGroup =
-                    new EpollEventLoopGroup(clientConfig.getWorkerThreads(), 
new NamedThreadFactory("NettyClient"));
+            this.workerGroup = new 
EpollEventLoopGroup(clientConfig.getWorkerThreads(), nettyClientThreadFactory);
         } else {
-            this.workerGroup =
-                    new NioEventLoopGroup(clientConfig.getWorkerThreads(), new 
NamedThreadFactory("NettyClient"));
+            this.workerGroup = new 
NioEventLoopGroup(clientConfig.getWorkerThreads(), nettyClientThreadFactory);
         }
         this.callbackExecutor = new ThreadPoolExecutor(
                 Constants.CPUS,
@@ -93,12 +93,12 @@ public class NettyRemotingClient implements AutoCloseable {
                 1,
                 TimeUnit.MINUTES,
                 new LinkedBlockingQueue<>(1000),
-                new NamedThreadFactory("CallbackExecutor"),
+                
ThreadUtils.newDaemonThreadFactory("NettyClientCallbackThread-"),
                 new CallerThreadExecutePolicy());
         this.clientHandler = new NettyClientHandler(this, callbackExecutor);
 
-        this.responseFutureExecutor =
-                Executors.newSingleThreadScheduledExecutor(new 
NamedThreadFactory("ResponseFutureExecutor"));
+        this.responseFutureExecutor = 
Executors.newSingleThreadScheduledExecutor(
+                
ThreadUtils.newDaemonThreadFactory("NettyClientResponseFutureThread-"));
 
         this.start();
     }
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServer.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServer.java
index 7655b804fe..365a17dd03 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServer.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServer.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.extract.base;
 
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
 import org.apache.dolphinscheduler.extract.base.exception.RemoteException;
 import org.apache.dolphinscheduler.extract.base.protocal.TransporterDecoder;
@@ -27,15 +28,11 @@ import 
org.apache.dolphinscheduler.extract.base.utils.Constants;
 import org.apache.dolphinscheduler.extract.base.utils.NettyUtils;
 
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import lombok.extern.slf4j.Slf4j;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
@@ -55,8 +52,8 @@ public class NettyRemotingServer {
 
     private final ServerBootstrap serverBootstrap = new ServerBootstrap();
 
-    private final ExecutorService defaultExecutor =
-            
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
+    private final ExecutorService defaultExecutor = ThreadUtils
+            .newDaemonFixedThreadExecutor("NettyRemotingServerThread", 
Runtime.getRuntime().availableProcessors() * 2);
 
     private final EventLoopGroup bossGroup;
 
@@ -68,16 +65,12 @@ public class NettyRemotingServer {
 
     private final AtomicBoolean isStarted = new AtomicBoolean(false);
 
-    private static final String NETTY_BIND_FAILURE_MSG = "NettyRemotingServer 
bind %s fail";
-
     public NettyRemotingServer(final NettyServerConfig serverConfig) {
         this.serverConfig = serverConfig;
         ThreadFactory bossThreadFactory =
-                new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat(serverConfig.getServerName()
 + "BossThread_%s")
-                        .build();
+                
ThreadUtils.newDaemonThreadFactory(serverConfig.getServerName() + 
"BossThread_%s");
         ThreadFactory workerThreadFactory =
-                new ThreadFactoryBuilder().setDaemon(true)
-                        .setNameFormat(serverConfig.getServerName() + 
"WorkerThread_%s").build();
+                
ThreadUtils.newDaemonThreadFactory(serverConfig.getServerName() + 
"WorkerThread_%s");
         if (Epoll.isAvailable()) {
             this.bossGroup = new EpollEventLoopGroup(1, bossThreadFactory);
             this.workGroup = new 
EpollEventLoopGroup(serverConfig.getWorkerThread(), workerThreadFactory);
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java
index a98362209d..b4978172f1 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java
@@ -90,7 +90,7 @@ public class JdkDynamicServerHandler extends 
ChannelInboundHandlerAdapter {
                 channel.writeAndFlush(response);
                 return;
             }
-            nettyRemotingServer.getDefaultExecutor().submit(() -> {
+            nettyRemotingServer.getDefaultExecutor().execute(() -> {
                 StandardRpcResponse iRpcResponse;
                 try {
                     StandardRpcRequest standardRpcRequest =
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/utils/NamedThreadFactory.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/utils/NamedThreadFactory.java
deleted file mode 100644
index 19589a9698..0000000000
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/utils/NamedThreadFactory.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.extract.base.utils;
-
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- *  thread factory
- */
-public class NamedThreadFactory implements ThreadFactory {
-
-    private final AtomicInteger increment = new AtomicInteger(1);
-
-    /**
-     *  name
-     */
-    private final String name;
-
-    /**
-     *  count
-     */
-    private final int count;
-
-    public NamedThreadFactory(String name) {
-        this(name, 0);
-    }
-
-    public NamedThreadFactory(String name, int count) {
-        this.name = name;
-        this.count = count;
-    }
-
-    /**
-     *  create thread
-     * @param r runnable
-     * @return thread
-     */
-    @Override
-    public Thread newThread(Runnable r) {
-        final String threadName = count > 0 ? String.format("%s_%d_%d", name, 
count, increment.getAndIncrement())
-                : String.format("%s_%d", name, increment.getAndIncrement());
-        Thread t = new Thread(r, threadName);
-        t.setDaemon(true);
-        return t;
-    }
-}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 82f3bda760..7d6600cc43 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master;
 import org.apache.dolphinscheduler.common.IStoppable;
 import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
+import 
org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
 import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
@@ -84,6 +85,9 @@ public class MasterServer implements IStoppable {
     private MasterSlotManager masterSlotManager;
 
     public static void main(String[] args) {
+        
MasterServerMetrics.registerUncachedException(DefaultUncaughtExceptionHandler::getUncaughtExceptionCount);
+
+        
Thread.setDefaultUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance());
         Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER);
         SpringApplication.run(MasterServer.class);
     }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/MasterServerMetrics.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/MasterServerMetrics.java
index 78eb0b7216..09ba1cb4ba 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/MasterServerMetrics.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/MasterServerMetrics.java
@@ -61,6 +61,12 @@ public class MasterServerMetrics {
                 .register(Metrics.globalRegistry);
     }
 
+    public static void registerUncachedException(final Supplier<Number> 
supplier) {
+        Gauge.builder("ds.master.uncached.exception", supplier)
+                .description("number of uncached exception")
+                .register(Metrics.globalRegistry);
+    }
+
     public void incMasterOverload() {
         masterOverloadCounter.increment();
     }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index 5fc0a1990f..7c7095971f 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -20,11 +20,11 @@ package org.apache.dolphinscheduler.server.master.registry;
 import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
 import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.AlertDao;
 import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
 import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
-import org.apache.dolphinscheduler.extract.base.utils.NamedThreadFactory;
 import org.apache.dolphinscheduler.registry.api.Event;
 import org.apache.dolphinscheduler.registry.api.Event.Type;
 import org.apache.dolphinscheduler.registry.api.RegistryClient;
@@ -116,8 +116,8 @@ public class ServerNodeManager implements InitializingBean {
         refreshNodesAndGroupMappings();
 
         // init executor service
-        executorService =
-                Executors.newSingleThreadScheduledExecutor(new 
NamedThreadFactory("ServerNodeManagerExecutor"));
+        executorService = Executors
+                
.newSingleThreadScheduledExecutor(ThreadUtils.newDaemonThreadFactory("ServerNodeManagerExecutor"));
         executorService.scheduleWithFixedDelay(
                 new WorkerNodeInfoAndGroupDbSyncTask(),
                 0,
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskDelayQueueLooper.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskDelayQueueLooper.java
index 86b711f245..e7babaa417 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskDelayQueueLooper.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskDelayQueueLooper.java
@@ -78,7 +78,7 @@ public class AsyncMasterTaskDelayQueueLooper extends 
BaseDaemonThread implements
                             "Cannot find the taskInstance from 
TaskExecutionContextCacheManager, the task may already been killed, will stop 
the async master task");
                     continue;
                 }
-                masterAsyncTaskExecutorThreadPool.getThreadPool().submit(() -> 
{
+                masterAsyncTaskExecutorThreadPool.getThreadPool().execute(() 
-> {
                     final AsyncTaskExecuteFunction asyncTaskExecuteFunction =
                             
asyncTaskExecutionContext.getAsyncTaskExecuteFunction();
                     final AsyncTaskCallbackFunction asyncTaskCallbackFunction =
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterAsyncTaskExecutorThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterAsyncTaskExecutorThreadPool.java
index 868b66b6df..2761958cd6 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterAsyncTaskExecutorThreadPool.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterAsyncTaskExecutorThreadPool.java
@@ -41,7 +41,7 @@ public class MasterAsyncTaskExecutorThreadPool implements IMasterTaskExecutorThr
     public boolean submitMasterTaskExecutor(AsyncMasterTaskExecutor asyncMasterTaskExecutor) {
         synchronized (MasterAsyncTaskExecutorThreadPool.class) {
             // todo: check if the thread pool is overload
-            threadPoolExecutor.submit(asyncMasterTaskExecutor);
+            threadPoolExecutor.execute(asyncMasterTaskExecutor);
             return true;
         }
     }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterSyncTaskExecutorThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterSyncTaskExecutorThreadPool.java
index 3f683076e6..4ce59f1f49 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterSyncTaskExecutorThreadPool.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterSyncTaskExecutorThreadPool.java
@@ -41,7 +41,7 @@ public class MasterSyncTaskExecutorThreadPool implements IMasterTaskExecutorThre
     public boolean submitMasterTaskExecutor(SyncMasterTaskExecutor syncMasterTaskExecutor) {
         synchronized (MasterSyncTaskExecutorThreadPool.class) {
             // todo: check if the thread pool is overload
-            threadPoolExecutor.submit(syncMasterTaskExecutor);
+            threadPoolExecutor.execute(syncMasterTaskExecutor);
             return true;
         }
     }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
index 4203516f42..e20da27bbd 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
@@ -265,7 +265,7 @@ public abstract class AbstractCommandExecutor {
         // todo: remove this this thread pool.
         ExecutorService getOutputLogService = ThreadUtils
                 .newSingleDaemonScheduledExecutorService("ResolveOutputLog-thread-" + taskRequest.getTaskName());
-        getOutputLogService.submit(() -> {
+        getOutputLogService.execute(() -> {
             TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser();
             try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
                 LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 5af0c2617e..2420ae5253 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker;
 import org.apache.dolphinscheduler.common.IStoppable;
 import org.apache.dolphinscheduler.common.constants.Constants;
 import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
+import org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
 import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
@@ -75,6 +76,8 @@ public class WorkerServer implements IStoppable {
      * @param args arguments
      */
     public static void main(String[] args) {
+        WorkerServerMetrics.registerUncachedException(DefaultUncaughtExceptionHandler::getUncaughtExceptionCount);
+        Thread.setDefaultUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance());
         Thread.currentThread().setName(Constants.THREAD_NAME_WORKER_SERVER);
         SpringApplication.run(WorkerServer.class);
     }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java
index a73ce541a3..613e7eaf28 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java
@@ -127,4 +127,10 @@ public class WorkerServerMetrics {
                 .register(Metrics.globalRegistry);
     }
 
+    public static void registerUncachedException(final Supplier<Number> supplier) {
+        Gauge.builder("ds.worker.uncached.exception", supplier)
+                .description("number of uncached exception")
+                .register(Metrics.globalRegistry);
+    }
+
 }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java
index 0e4bb98080..4606d9f7e9 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java
@@ -49,7 +49,7 @@ public class WorkerTaskExecutorThreadPool {
         synchronized (WorkerTaskExecutorThreadPool.class) {
             if (TaskExecuteThreadsFullPolicy.CONTINUE.equals(workerConfig.getTaskExecuteThreadsFullPolicy())) {
                 WorkerTaskExecutorHolder.put(workerTaskExecutor);
-                threadPoolExecutor.submit(workerTaskExecutor);
+                threadPoolExecutor.execute(workerTaskExecutor);
                 return true;
             }
             if (isOverload()) {
@@ -58,7 +58,7 @@ public class WorkerTaskExecutorThreadPool {
                 return false;
             }
             WorkerTaskExecutorHolder.put(workerTaskExecutor);
-            threadPoolExecutor.submit(workerTaskExecutor);
+            threadPoolExecutor.execute(workerTaskExecutor);
             return true;
         }
     }


Reply via email to