This is an automated email from the ASF dual-hosted git repository.
changhaifu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 64e1e67581 Use DefaultUncaughtExceptionHandler to log the uncaught exception (#15496)
64e1e67581 is described below
commit 64e1e675818c1db0dbba9f35b30f32c0ad129021
Author: Wenjun Ruan <[email protected]>
AuthorDate: Thu Jan 18 14:59:17 2024 +0800
Use DefaultUncaughtExceptionHandler to log the uncaught exception (#15496)
Co-authored-by: fuchanghai <[email protected]>
---
.../apache/dolphinscheduler/alert/AlertServer.java | 4 ++
.../alert/metrics/AlertServerMetrics.java | 6 +++
.../dolphinscheduler/api/ApiApplicationServer.java | 4 ++
.../api/metrics/ApiServerMetrics.java | 8 +++
.../common/thread/BaseDaemonThread.java | 2 +
...d.java => DefaultUncaughtExceptionHandler.java} | 33 ++++++++----
.../common/thread/ThreadUtils.java | 20 +++----
.../common/thread/ThreadUtilsTest.java} | 27 +++++-----
.../extract/base/NettyClientHandler.java | 2 +-
.../extract/base/NettyRemotingClient.java | 16 +++---
.../extract/base/NettyRemotingServer.java | 17 ++----
.../base/server/JdkDynamicServerHandler.java | 2 +-
.../extract/base/utils/NamedThreadFactory.java | 61 ----------------------
.../server/master/MasterServer.java | 4 ++
.../server/master/metrics/MasterServerMetrics.java | 6 +++
.../server/master/registry/ServerNodeManager.java | 6 +--
.../execute/AsyncMasterTaskDelayQueueLooper.java | 2 +-
.../execute/MasterAsyncTaskExecutorThreadPool.java | 2 +-
.../execute/MasterSyncTaskExecutorThreadPool.java | 2 +-
.../plugin/task/api/AbstractCommandExecutor.java | 2 +-
.../server/worker/WorkerServer.java | 3 ++
.../server/worker/metrics/WorkerServerMetrics.java | 6 +++
.../runner/WorkerTaskExecutorThreadPool.java | 4 +-
23 files changed, 111 insertions(+), 128 deletions(-)
diff --git
a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
index aeb2baf7a2..fd3d4b02e3 100644
---
a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
+++
b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.alert;
+import org.apache.dolphinscheduler.alert.metrics.AlertServerMetrics;
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
import org.apache.dolphinscheduler.alert.registry.AlertRegistryClient;
import org.apache.dolphinscheduler.alert.rpc.AlertRpcServer;
@@ -24,6 +25,7 @@ import
org.apache.dolphinscheduler.alert.service.AlertBootstrapService;
import org.apache.dolphinscheduler.alert.service.ListenerEventPostService;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
+import
org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import javax.annotation.PreDestroy;
@@ -54,6 +56,8 @@ public class AlertServer {
private AlertRegistryClient alertRegistryClient;
public static void main(String[] args) {
+
AlertServerMetrics.registerUncachedException(DefaultUncaughtExceptionHandler::getUncaughtExceptionCount);
+
Thread.setDefaultUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance());
Thread.currentThread().setName(Constants.THREAD_NAME_ALERT_SERVER);
new SpringApplicationBuilder(AlertServer.class).run(args);
}
diff --git
a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/metrics/AlertServerMetrics.java
b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/metrics/AlertServerMetrics.java
index 4784aa3b62..db75a49371 100644
---
a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/metrics/AlertServerMetrics.java
+++
b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/metrics/AlertServerMetrics.java
@@ -45,6 +45,12 @@ public class AlertServerMetrics {
.register(Metrics.globalRegistry);
}
+ public static void registerUncachedException(final Supplier<Number>
supplier) {
+ Gauge.builder("ds.alert.uncached.exception", supplier)
+ .description("number of uncached exception")
+ .register(Metrics.globalRegistry);
+ }
+
public void incAlertSuccessCount() {
alertSuccessCounter.increment();
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
index 2bb9b7cf51..c7e6d9778f 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
@@ -17,7 +17,9 @@
package org.apache.dolphinscheduler.api;
+import org.apache.dolphinscheduler.api.metrics.ApiServerMetrics;
import org.apache.dolphinscheduler.common.enums.PluginType;
+import
org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler;
import org.apache.dolphinscheduler.dao.PluginDao;
import org.apache.dolphinscheduler.dao.entity.PluginDefine;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
@@ -51,6 +53,8 @@ public class ApiApplicationServer {
private PluginDao pluginDao;
public static void main(String[] args) {
+
ApiServerMetrics.registerUncachedException(DefaultUncaughtExceptionHandler::getUncaughtExceptionCount);
+
Thread.setDefaultUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance());
SpringApplication.run(ApiApplicationServer.class);
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/metrics/ApiServerMetrics.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/metrics/ApiServerMetrics.java
index 08f114dc2e..f97f9afffa 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/metrics/ApiServerMetrics.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/metrics/ApiServerMetrics.java
@@ -18,10 +18,12 @@
package org.apache.dolphinscheduler.api.metrics;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import lombok.experimental.UtilityClass;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
+import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
@@ -120,4 +122,10 @@ public class ApiServerMetrics {
"ds.api.response.time",
"user.id", String.valueOf(userId)));
}
+
+ public static void registerUncachedException(final Supplier<Number>
supplier) {
+ Gauge.builder("ds.api.uncached.exception", supplier)
+ .description("number of uncached exception")
+ .register(Metrics.globalRegistry);
+ }
}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java
index 88a44004cb..b495bb7c56 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java
@@ -25,12 +25,14 @@ public abstract class BaseDaemonThread extends Thread {
protected BaseDaemonThread(Runnable runnable) {
super(runnable);
this.setDaemon(true);
+
this.setUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance());
}
protected BaseDaemonThread(String threadName) {
super();
this.setName(threadName);
this.setDaemon(true);
+
this.setUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance());
}
}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/DefaultUncaughtExceptionHandler.java
similarity index 52%
copy from
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java
copy to
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/DefaultUncaughtExceptionHandler.java
index 88a44004cb..62a0fd6911 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/DefaultUncaughtExceptionHandler.java
@@ -17,20 +17,31 @@
package org.apache.dolphinscheduler.common.thread;
-/**
- * All thread used in DolphinScheduler should extend with this class to avoid
the server hang issue.
- */
-public abstract class BaseDaemonThread extends Thread {
+import java.util.concurrent.atomic.LongAdder;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class DefaultUncaughtExceptionHandler implements
Thread.UncaughtExceptionHandler {
+
+ private static final DefaultUncaughtExceptionHandler INSTANCE = new
DefaultUncaughtExceptionHandler();
- protected BaseDaemonThread(Runnable runnable) {
- super(runnable);
- this.setDaemon(true);
+ private static final LongAdder uncaughtExceptionCount = new LongAdder();
+
+ private DefaultUncaughtExceptionHandler() {
+ }
+
+ public static DefaultUncaughtExceptionHandler getInstance() {
+ return INSTANCE;
}
- protected BaseDaemonThread(String threadName) {
- super();
- this.setName(threadName);
- this.setDaemon(true);
+ public static long getUncaughtExceptionCount() {
+ return uncaughtExceptionCount.longValue();
}
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ uncaughtExceptionCount.add(1);
+ log.error("Caught an exception in {}.", t, e);
+ }
}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
index 5eef04ed82..c4271919b5 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
@@ -31,24 +31,20 @@ import
com.google.common.util.concurrent.ThreadFactoryBuilder;
@Slf4j
public class ThreadUtils {
- /**
- * Wrapper over newDaemonFixedThreadExecutor.
- *
- * @param threadName threadName
- * @param threadsNum threadsNum
- * @return ExecutorService
- */
public static ThreadPoolExecutor newDaemonFixedThreadExecutor(String
threadName, int threadsNum) {
- ThreadFactory threadFactory = new
ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build();
- return (ThreadPoolExecutor) Executors.newFixedThreadPool(threadsNum,
threadFactory);
+ return (ThreadPoolExecutor) Executors.newFixedThreadPool(threadsNum,
newDaemonThreadFactory(threadName));
}
public static ScheduledExecutorService
newSingleDaemonScheduledExecutorService(String threadName) {
- ThreadFactory threadFactory = new ThreadFactoryBuilder()
- .setNameFormat(threadName)
+ return
Executors.newSingleThreadScheduledExecutor(newDaemonThreadFactory(threadName));
+ }
+
+ public static ThreadFactory newDaemonThreadFactory(String threadName) {
+ return new ThreadFactoryBuilder()
.setDaemon(true)
+ .setNameFormat(threadName)
+
.setUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance())
.build();
- return Executors.newSingleThreadScheduledExecutor(threadFactory);
}
/**
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java
b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/thread/ThreadUtilsTest.java
similarity index 59%
copy from
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java
copy to
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/thread/ThreadUtilsTest.java
index 88a44004cb..7d7b2c0ac6 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/BaseDaemonThread.java
+++
b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/thread/ThreadUtilsTest.java
@@ -17,20 +17,21 @@
package org.apache.dolphinscheduler.common.thread;
-/**
- * All thread used in DolphinScheduler should extend with this class to avoid
the server hang issue.
- */
-public abstract class BaseDaemonThread extends Thread {
+import java.util.concurrent.ThreadPoolExecutor;
- protected BaseDaemonThread(Runnable runnable) {
- super(runnable);
- this.setDaemon(true);
- }
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
- protected BaseDaemonThread(String threadName) {
- super();
- this.setName(threadName);
- this.setDaemon(true);
- }
+class ThreadUtilsTest {
+ @Test
+ void newDaemonFixedThreadExecutor() throws InterruptedException {
+ ThreadPoolExecutor threadPoolExecutor =
ThreadUtils.newDaemonFixedThreadExecutor("DemonThread", 1);
+ threadPoolExecutor.execute(() -> {
+ throw new IllegalArgumentException("I am an exception");
+ });
+ Thread.sleep(1_000);
+ Assertions.assertEquals(1,
DefaultUncaughtExceptionHandler.getUncaughtExceptionCount());
+
+ }
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyClientHandler.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyClientHandler.java
index 2b49b2fec2..b0d998af83 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyClientHandler.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyClientHandler.java
@@ -67,7 +67,7 @@ public class NettyClientHandler extends
ChannelInboundHandlerAdapter {
future.release();
if (future.getInvokeCallback() != null) {
future.removeFuture();
- this.callbackExecutor.submit(future::executeInvokeCallback);
+ this.callbackExecutor.execute(future::executeInvokeCallback);
} else {
future.putResponse(deserialize);
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingClient.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingClient.java
index 1ca87091af..e4682f5224 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingClient.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingClient.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.extract.base;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig;
import org.apache.dolphinscheduler.extract.base.exception.RemotingException;
import
org.apache.dolphinscheduler.extract.base.exception.RemotingTimeoutException;
@@ -30,7 +31,6 @@ import
org.apache.dolphinscheduler.extract.base.protocal.TransporterEncoder;
import
org.apache.dolphinscheduler.extract.base.utils.CallerThreadExecutePolicy;
import org.apache.dolphinscheduler.extract.base.utils.Constants;
import org.apache.dolphinscheduler.extract.base.utils.Host;
-import org.apache.dolphinscheduler.extract.base.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.extract.base.utils.NettyUtils;
import java.net.InetSocketAddress;
@@ -40,6 +40,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -80,12 +81,11 @@ public class NettyRemotingClient implements AutoCloseable {
public NettyRemotingClient(final NettyClientConfig clientConfig) {
this.clientConfig = clientConfig;
+ ThreadFactory nettyClientThreadFactory =
ThreadUtils.newDaemonThreadFactory("NettyClientThread-");
if (Epoll.isAvailable()) {
- this.workerGroup =
- new EpollEventLoopGroup(clientConfig.getWorkerThreads(),
new NamedThreadFactory("NettyClient"));
+ this.workerGroup = new
EpollEventLoopGroup(clientConfig.getWorkerThreads(), nettyClientThreadFactory);
} else {
- this.workerGroup =
- new NioEventLoopGroup(clientConfig.getWorkerThreads(), new
NamedThreadFactory("NettyClient"));
+ this.workerGroup = new
NioEventLoopGroup(clientConfig.getWorkerThreads(), nettyClientThreadFactory);
}
this.callbackExecutor = new ThreadPoolExecutor(
Constants.CPUS,
@@ -93,12 +93,12 @@ public class NettyRemotingClient implements AutoCloseable {
1,
TimeUnit.MINUTES,
new LinkedBlockingQueue<>(1000),
- new NamedThreadFactory("CallbackExecutor"),
+
ThreadUtils.newDaemonThreadFactory("NettyClientCallbackThread-"),
new CallerThreadExecutePolicy());
this.clientHandler = new NettyClientHandler(this, callbackExecutor);
- this.responseFutureExecutor =
- Executors.newSingleThreadScheduledExecutor(new
NamedThreadFactory("ResponseFutureExecutor"));
+ this.responseFutureExecutor =
Executors.newSingleThreadScheduledExecutor(
+
ThreadUtils.newDaemonThreadFactory("NettyClientResponseFutureThread-"));
this.start();
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServer.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServer.java
index 7655b804fe..365a17dd03 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServer.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/NettyRemotingServer.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.extract.base;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
import org.apache.dolphinscheduler.extract.base.exception.RemoteException;
import org.apache.dolphinscheduler.extract.base.protocal.TransporterDecoder;
@@ -27,15 +28,11 @@ import
org.apache.dolphinscheduler.extract.base.utils.Constants;
import org.apache.dolphinscheduler.extract.base.utils.NettyUtils;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
@@ -55,8 +52,8 @@ public class NettyRemotingServer {
private final ServerBootstrap serverBootstrap = new ServerBootstrap();
- private final ExecutorService defaultExecutor =
-
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
+ private final ExecutorService defaultExecutor = ThreadUtils
+ .newDaemonFixedThreadExecutor("NettyRemotingServerThread",
Runtime.getRuntime().availableProcessors() * 2);
private final EventLoopGroup bossGroup;
@@ -68,16 +65,12 @@ public class NettyRemotingServer {
private final AtomicBoolean isStarted = new AtomicBoolean(false);
- private static final String NETTY_BIND_FAILURE_MSG = "NettyRemotingServer
bind %s fail";
-
public NettyRemotingServer(final NettyServerConfig serverConfig) {
this.serverConfig = serverConfig;
ThreadFactory bossThreadFactory =
- new
ThreadFactoryBuilder().setDaemon(true).setNameFormat(serverConfig.getServerName()
+ "BossThread_%s")
- .build();
+
ThreadUtils.newDaemonThreadFactory(serverConfig.getServerName() +
"BossThread_%s");
ThreadFactory workerThreadFactory =
- new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat(serverConfig.getServerName() +
"WorkerThread_%s").build();
+
ThreadUtils.newDaemonThreadFactory(serverConfig.getServerName() +
"WorkerThread_%s");
if (Epoll.isAvailable()) {
this.bossGroup = new EpollEventLoopGroup(1, bossThreadFactory);
this.workGroup = new
EpollEventLoopGroup(serverConfig.getWorkerThread(), workerThreadFactory);
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java
index a98362209d..b4978172f1 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java
@@ -90,7 +90,7 @@ public class JdkDynamicServerHandler extends
ChannelInboundHandlerAdapter {
channel.writeAndFlush(response);
return;
}
- nettyRemotingServer.getDefaultExecutor().submit(() -> {
+ nettyRemotingServer.getDefaultExecutor().execute(() -> {
StandardRpcResponse iRpcResponse;
try {
StandardRpcRequest standardRpcRequest =
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/utils/NamedThreadFactory.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/utils/NamedThreadFactory.java
deleted file mode 100644
index 19589a9698..0000000000
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/utils/NamedThreadFactory.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.extract.base.utils;
-
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * thread factory
- */
-public class NamedThreadFactory implements ThreadFactory {
-
- private final AtomicInteger increment = new AtomicInteger(1);
-
- /**
- * name
- */
- private final String name;
-
- /**
- * count
- */
- private final int count;
-
- public NamedThreadFactory(String name) {
- this(name, 0);
- }
-
- public NamedThreadFactory(String name, int count) {
- this.name = name;
- this.count = count;
- }
-
- /**
- * create thread
- * @param r runnable
- * @return thread
- */
- @Override
- public Thread newThread(Runnable r) {
- final String threadName = count > 0 ? String.format("%s_%d_%d", name,
count, increment.getAndIncrement())
- : String.format("%s_%d", name, increment.getAndIncrement());
- Thread t = new Thread(r, threadName);
- t.setDaemon(true);
- return t;
- }
-}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 82f3bda760..7d6600cc43 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
+import
org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
@@ -84,6 +85,9 @@ public class MasterServer implements IStoppable {
private MasterSlotManager masterSlotManager;
public static void main(String[] args) {
+
MasterServerMetrics.registerUncachedException(DefaultUncaughtExceptionHandler::getUncaughtExceptionCount);
+
+
Thread.setDefaultUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance());
Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER);
SpringApplication.run(MasterServer.class);
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/MasterServerMetrics.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/MasterServerMetrics.java
index 78eb0b7216..09ba1cb4ba 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/MasterServerMetrics.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/MasterServerMetrics.java
@@ -61,6 +61,12 @@ public class MasterServerMetrics {
.register(Metrics.globalRegistry);
}
+ public static void registerUncachedException(final Supplier<Number>
supplier) {
+ Gauge.builder("ds.master.uncached.exception", supplier)
+ .description("number of uncached exception")
+ .register(Metrics.globalRegistry);
+ }
+
public void incMasterOverload() {
masterOverloadCounter.increment();
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index 5fc0a1990f..7c7095971f 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -20,11 +20,11 @@ package org.apache.dolphinscheduler.server.master.registry;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
-import org.apache.dolphinscheduler.extract.base.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.Event.Type;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
@@ -116,8 +116,8 @@ public class ServerNodeManager implements InitializingBean {
refreshNodesAndGroupMappings();
// init executor service
- executorService =
- Executors.newSingleThreadScheduledExecutor(new
NamedThreadFactory("ServerNodeManagerExecutor"));
+ executorService = Executors
+
.newSingleThreadScheduledExecutor(ThreadUtils.newDaemonThreadFactory("ServerNodeManagerExecutor"));
executorService.scheduleWithFixedDelay(
new WorkerNodeInfoAndGroupDbSyncTask(),
0,
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskDelayQueueLooper.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskDelayQueueLooper.java
index 86b711f245..e7babaa417 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskDelayQueueLooper.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskDelayQueueLooper.java
@@ -78,7 +78,7 @@ public class AsyncMasterTaskDelayQueueLooper extends
BaseDaemonThread implements
"Cannot find the taskInstance from
TaskExecutionContextCacheManager, the task may already been killed, will stop
the async master task");
continue;
}
- masterAsyncTaskExecutorThreadPool.getThreadPool().submit(() ->
{
+ masterAsyncTaskExecutorThreadPool.getThreadPool().execute(()
-> {
final AsyncTaskExecuteFunction asyncTaskExecuteFunction =
asyncTaskExecutionContext.getAsyncTaskExecuteFunction();
final AsyncTaskCallbackFunction asyncTaskCallbackFunction =
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterAsyncTaskExecutorThreadPool.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterAsyncTaskExecutorThreadPool.java
index 868b66b6df..2761958cd6 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterAsyncTaskExecutorThreadPool.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterAsyncTaskExecutorThreadPool.java
@@ -41,7 +41,7 @@ public class MasterAsyncTaskExecutorThreadPool implements
IMasterTaskExecutorThr
public boolean submitMasterTaskExecutor(AsyncMasterTaskExecutor
asyncMasterTaskExecutor) {
synchronized (MasterAsyncTaskExecutorThreadPool.class) {
// todo: check if the thread pool is overload
- threadPoolExecutor.submit(asyncMasterTaskExecutor);
+ threadPoolExecutor.execute(asyncMasterTaskExecutor);
return true;
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterSyncTaskExecutorThreadPool.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterSyncTaskExecutorThreadPool.java
index 3f683076e6..4ce59f1f49 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterSyncTaskExecutorThreadPool.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterSyncTaskExecutorThreadPool.java
@@ -41,7 +41,7 @@ public class MasterSyncTaskExecutorThreadPool implements
IMasterTaskExecutorThre
public boolean submitMasterTaskExecutor(SyncMasterTaskExecutor
syncMasterTaskExecutor) {
synchronized (MasterSyncTaskExecutorThreadPool.class) {
// todo: check if the thread pool is overload
- threadPoolExecutor.submit(syncMasterTaskExecutor);
+ threadPoolExecutor.execute(syncMasterTaskExecutor);
return true;
}
}
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
index 4203516f42..e20da27bbd 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
@@ -265,7 +265,7 @@ public abstract class AbstractCommandExecutor {
// todo: remove this this thread pool.
ExecutorService getOutputLogService = ThreadUtils
.newSingleDaemonScheduledExecutorService("ResolveOutputLog-thread-" +
taskRequest.getTaskName());
- getOutputLogService.submit(() -> {
+ getOutputLogService.execute(() -> {
TaskOutputParameterParser taskOutputParameterParser = new
TaskOutputParameterParser();
try (BufferedReader inReader = new BufferedReader(new
InputStreamReader(process.getInputStream()))) {
LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 5af0c2617e..2420ae5253 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
+import
org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
@@ -75,6 +76,8 @@ public class WorkerServer implements IStoppable {
* @param args arguments
*/
public static void main(String[] args) {
+
WorkerServerMetrics.registerUncachedException(DefaultUncaughtExceptionHandler::getUncaughtExceptionCount);
+
Thread.setDefaultUncaughtExceptionHandler(DefaultUncaughtExceptionHandler.getInstance());
Thread.currentThread().setName(Constants.THREAD_NAME_WORKER_SERVER);
SpringApplication.run(WorkerServer.class);
}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java
index a73ce541a3..613e7eaf28 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java
@@ -127,4 +127,10 @@ public class WorkerServerMetrics {
.register(Metrics.globalRegistry);
}
+ public static void registerUncachedException(final Supplier<Number>
supplier) {
+ Gauge.builder("ds.worker.uncached.exception", supplier)
+ .description("number of uncached exception")
+ .register(Metrics.globalRegistry);
+ }
+
}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java
index 0e4bb98080..4606d9f7e9 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java
@@ -49,7 +49,7 @@ public class WorkerTaskExecutorThreadPool {
synchronized (WorkerTaskExecutorThreadPool.class) {
if
(TaskExecuteThreadsFullPolicy.CONTINUE.equals(workerConfig.getTaskExecuteThreadsFullPolicy()))
{
WorkerTaskExecutorHolder.put(workerTaskExecutor);
- threadPoolExecutor.submit(workerTaskExecutor);
+ threadPoolExecutor.execute(workerTaskExecutor);
return true;
}
if (isOverload()) {
@@ -58,7 +58,7 @@ public class WorkerTaskExecutorThreadPool {
return false;
}
WorkerTaskExecutorHolder.put(workerTaskExecutor);
- threadPoolExecutor.submit(workerTaskExecutor);
+ threadPoolExecutor.execute(workerTaskExecutor);
return true;
}
}