This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 2be1d4bf0a Fix worker being unable to shut down when closing a resource fails or
the heartbeat check fails (#10979)
2be1d4bf0a is described below
commit 2be1d4bf0add2485d4bf322aef7bbaeabfc223de
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri Jul 15 20:06:53 2022 +0800
Fix worker being unable to shut down when closing a resource fails or the heartbeat
check fails (#10979)
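* Use try-with-resources to close resources, and add a heartbeat error threshold so
that a server (master or worker) stops itself once consecutive heartbeat failures
reach the threshold, instead of being unable to close when the heartbeat check fails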
* Move the heartbeat error threshold to application.yaml
---
.../apache/dolphinscheduler/common/Constants.java | 1 +
.../common/thread/ThreadUtils.java | 14 +++++---
.../server/master/MasterServer.java | 38 +++++++++-----------
.../server/master/config/MasterConfig.java | 7 ++++
.../master/registry/MasterRegistryClient.java | 16 +++++----
.../master/runner/MasterSchedulerBootstrap.java | 3 +-
.../src/main/resources/application.yaml | 2 ++
.../server/registry/HeartBeatTask.java | 19 ++++++++--
.../service/bean/SpringApplicationContext.java | 3 +-
.../apache/dolphinscheduler/StandaloneServer.java | 32 +++++++++++++++--
.../src/main/resources/application.yaml | 4 +++
.../server/worker/WorkerServer.java | 41 +++++++---------------
.../server/worker/config/WorkerConfig.java | 7 ++++
.../worker/registry/WorkerRegistryClient.java | 36 +++++++++++--------
.../src/main/resources/application.yaml | 2 ++
15 files changed, 142 insertions(+), 83 deletions(-)
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 099620778e..7a84e95a13 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -50,6 +50,7 @@ public final class Constants {
public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS
= "/lock/failover/masters";
public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS
= "/lock/failover/workers";
public static final String
REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS =
"/lock/failover/startup-masters";
+
public static final String FORMAT_SS = "%s%s";
public static final String FORMAT_S_S = "%s/%s";
public static final String FORMAT_S_S_COLON = "%s:%s";
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
index 5c8020b7cd..f4f2a17bc7 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
@@ -21,12 +21,18 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.experimental.UtilityClass;
@UtilityClass
public class ThreadUtils {
+
+ private static final Logger logger =
LoggerFactory.getLogger(ThreadUtils.class);
+
/**
* Wrapper over newDaemonFixedThreadExecutor.
*
@@ -35,10 +41,7 @@ public class ThreadUtils {
* @return ExecutorService
*/
public static ExecutorService newDaemonFixedThreadExecutor(String
threadName, int threadsNum) {
- ThreadFactory threadFactory = new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat(threadName)
- .build();
+ ThreadFactory threadFactory = new
ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build();
return Executors.newFixedThreadPool(threadsNum, threadFactory);
}
@@ -48,8 +51,9 @@ public class ThreadUtils {
public static void sleep(final long millis) {
try {
Thread.sleep(millis);
- } catch (final InterruptedException ignore) {
+ } catch (final InterruptedException interruptedException) {
Thread.currentThread().interrupt();
+ logger.error("Current thread sleep error", interruptedException);
}
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 3469cd4852..0bf3c945e7 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -115,32 +115,28 @@ public class MasterServer implements IStoppable {
* @param cause close cause
*/
public void close(String cause) {
-
- try {
- // set stop signal is true
- // execute only once
- if (!Stopper.stop()) {
- logger.warn("MasterServer is already stopped, current cause:
{}", cause);
- return;
- }
+ // set stop signal is true
+ // execute only once
+ if (!Stopper.stop()) {
+ logger.warn("MasterServer is already stopped, current cause: {}",
cause);
+ return;
+ }
+ // thread sleep 3 seconds for thread quietly stop
+ ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
+ try (SchedulerApi closedSchedulerApi = schedulerApi;
+ MasterSchedulerBootstrap closedSchedulerBootstrap =
masterSchedulerBootstrap;
+ MasterRPCServer closedRpcServer = masterRPCServer;
+ MasterRegistryClient closedMasterRegistryClient =
masterRegistryClient;
+ // close spring Context and will invoke method with @PreDestroy
annotation to destroy beans.
+ // like
ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
+ SpringApplicationContext closedSpringContext =
springApplicationContext) {
logger.info("Master server is stopping, current cause : {}",
cause);
-
- // thread sleep 3 seconds for thread quietly stop
- ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
- // close
- this.schedulerApi.close();
- this.masterSchedulerBootstrap.close();
- this.masterRPCServer.close();
- this.masterRegistryClient.closeRegistry();
- // close spring Context and will invoke method with @PreDestroy
annotation to destroy beans.
- // like
ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
- springApplicationContext.close();
-
- logger.info("MasterServer stopped, current cause: {}", cause);
} catch (Exception e) {
logger.error("MasterServer stop failed, current cause: {}", cause,
e);
+ return;
}
+ logger.info("MasterServer stopped, current cause: {}", cause);
}
@Override
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index fe51e20984..7f6f124164 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -67,6 +67,10 @@ public class MasterConfig implements Validator {
* Master heart beat task execute interval.
*/
private Duration heartbeatInterval = Duration.ofSeconds(10);
+ /**
+ * Master heart beat task error threshold, if the continuous error count
exceed this count, the master will close.
+ */
+ private int heartbeatErrorThreshold = 5;
/**
* task submit max retry times.
*/
@@ -129,6 +133,9 @@ public class MasterConfig implements Validator {
if (masterConfig.getMaxCpuLoadAvg() <= 0) {
masterConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
}
+ if (masterConfig.getHeartbeatErrorThreshold() <= 0) {
+ errors.rejectValue("heartbeat-error-threshold", null, "should be a
positive value");
+ }
masterConfig.setMasterAddress(NetUtils.getAddr(masterConfig.getListenPort()));
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index 359fdc745e..aa536bae49 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -53,7 +53,7 @@ import com.google.common.collect.Sets;
* <p>When the Master node startup, it will register in registry center. And
schedule a {@link HeartBeatTask} to update its metadata in registry.
*/
@Component
-public class MasterRegistryClient {
+public class MasterRegistryClient implements AutoCloseable {
/**
* logger
@@ -107,7 +107,8 @@ public class MasterRegistryClient {
registryClient.setStoppable(stoppable);
}
- public void closeRegistry() {
+ @Override
+ public void close() {
// TODO unsubscribe MasterRegistryDataListener
deregister();
}
@@ -189,11 +190,12 @@ public class MasterRegistryClient {
String localNodePath = getCurrentNodePath();
Duration masterHeartbeatInterval = masterConfig.getHeartbeatInterval();
HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
- masterConfig.getMaxCpuLoadAvg(),
- masterConfig.getReservedMemory(),
- Sets.newHashSet(localNodePath),
- Constants.MASTER_TYPE,
- registryClient);
+
masterConfig.getMaxCpuLoadAvg(),
+
masterConfig.getReservedMemory(),
+
Sets.newHashSet(localNodePath),
+ Constants.MASTER_TYPE,
+ registryClient,
+
masterConfig.getHeartbeatErrorThreshold());
// remove before persist
registryClient.remove(localNodePath);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
index 870cc1b24c..aa0542cb7c 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
@@ -58,7 +58,7 @@ import org.springframework.stereotype.Service;
* Master scheduler thread, this thread will consume the commands from
database and trigger processInstance executed.
*/
@Service
-public class MasterSchedulerBootstrap extends BaseDaemonThread {
+public class MasterSchedulerBootstrap extends BaseDaemonThread implements
AutoCloseable {
private static final Logger logger =
LoggerFactory.getLogger(MasterSchedulerBootstrap.class);
@@ -116,6 +116,7 @@ public class MasterSchedulerBootstrap extends
BaseDaemonThread {
logger.info("Master schedule bootstrap started...");
}
+ @Override
public void close() {
logger.info("Master schedule bootstrap stopping...");
logger.info("Master schedule bootstrap stopped...");
diff --git a/dolphinscheduler-master/src/main/resources/application.yaml
b/dolphinscheduler-master/src/main/resources/application.yaml
index 7a9fa8fcde..9f191ccd25 100644
--- a/dolphinscheduler-master/src/main/resources/application.yaml
+++ b/dolphinscheduler-master/src/main/resources/application.yaml
@@ -98,6 +98,8 @@ master:
host-selector: lower_weight
# master heartbeat interval
heartbeat-interval: 10s
+ # Master heart beat task error threshold, if the continuous error count
exceed this count, the master will close.
+ heartbeat-error-threshold: 5
# master commit task retry times
task-commit-retry-times: 5
# master commit task interval
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
index 129aaff0de..c84abb4182 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.utils.HeartBeat;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,16 +39,22 @@ public class HeartBeatTask implements Runnable {
private final String serverType;
private final HeartBeat heartBeat;
+ private final int heartBeatErrorThreshold;
+
+ private final AtomicInteger heartBeatErrorTimes = new AtomicInteger();
+
public HeartBeatTask(long startupTime,
double maxCpuloadAvg,
double reservedMemory,
Set<String> heartBeatPaths,
String serverType,
- RegistryClient registryClient) {
+ RegistryClient registryClient,
+ int heartBeatErrorThreshold) {
this.heartBeatPaths = heartBeatPaths;
this.registryClient = registryClient;
this.serverType = serverType;
this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg,
reservedMemory);
+ this.heartBeatErrorThreshold = heartBeatErrorThreshold;
}
public HeartBeatTask(long startupTime,
@@ -58,13 +65,14 @@ public class HeartBeatTask implements Runnable {
String serverType,
RegistryClient registryClient,
int workerThreadCount,
- int workerWaitingTaskCount
- ) {
+ int workerWaitingTaskCount,
+ int heartBeatErrorThreshold) {
this.heartBeatPaths = heartBeatPaths;
this.registryClient = registryClient;
this.workerWaitingTaskCount = workerWaitingTaskCount;
this.serverType = serverType;
this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg,
reservedMemory, hostWeight, workerThreadCount);
+ this.heartBeatErrorThreshold = heartBeatErrorThreshold;
}
public String getHeartBeatInfo() {
@@ -88,8 +96,13 @@ public class HeartBeatTask implements Runnable {
for (String heartBeatPath : heartBeatPaths) {
registryClient.persistEphemeral(heartBeatPath,
heartBeat.encodeHeartBeat());
}
+ heartBeatErrorTimes.set(0);
} catch (Throwable ex) {
logger.error("HeartBeat task execute failed", ex);
+ if (heartBeatErrorTimes.incrementAndGet() >=
heartBeatErrorThreshold) {
+ registryClient.getStoppable()
+ .stop("HeartBeat task connect to zk failed too
much times: " + heartBeatErrorTimes);
+ }
}
}
}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java
index 61dfcb35d7..5b37b1f72d 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java
@@ -24,7 +24,7 @@ import
org.springframework.context.support.AbstractApplicationContext;
import org.springframework.stereotype.Component;
@Component
-public class SpringApplicationContext implements ApplicationContextAware {
+public class SpringApplicationContext implements ApplicationContextAware,
AutoCloseable {
private static ApplicationContext applicationContext;
@@ -36,6 +36,7 @@ public class SpringApplicationContext implements
ApplicationContextAware {
/**
* Close this application context, destroying all beans in its bean
factory.
*/
+ @Override
public void close() {
((AbstractApplicationContext)applicationContext).close();
}
diff --git
a/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/StandaloneServer.java
b/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/StandaloneServer.java
index 3866c53066..728fed8eaa 100644
---
a/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/StandaloneServer.java
+++
b/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/StandaloneServer.java
@@ -19,15 +19,41 @@ package org.apache.dolphinscheduler;
import org.apache.curator.test.TestingServer;
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.event.ApplicationFailedEvent;
+import org.springframework.context.ApplicationEvent;
+import org.springframework.context.ApplicationListener;
+import org.springframework.context.event.ContextClosedEvent;
+
+import lombok.NonNull;
@SpringBootApplication
-public class StandaloneServer {
+public class StandaloneServer implements ApplicationListener<ApplicationEvent>
{
+
+ private static final Logger logger =
LoggerFactory.getLogger(StandaloneServer.class);
+
+ private static TestingServer zookeeperServer;
public static void main(String[] args) throws Exception {
- final TestingServer server = new TestingServer(true);
- System.setProperty("registry.zookeeper.connect-string",
server.getConnectString());
+ zookeeperServer = new TestingServer(true);
+ System.setProperty("registry.zookeeper.connect-string",
zookeeperServer.getConnectString());
SpringApplication.run(StandaloneServer.class, args);
}
+
+ @Override
+ public void onApplicationEvent(@NonNull ApplicationEvent event) {
+ if (event instanceof ApplicationFailedEvent || event instanceof
ContextClosedEvent) {
+ try (TestingServer closedServer = zookeeperServer) {
+ // close the zookeeper server
+ logger.info("Receive spring context close event: {}, will
closed zookeeper server", event);
+ } catch (IOException e) {
+ logger.error("Close zookeeper server error", e);
+ }
+ }
+ }
}
diff --git
a/dolphinscheduler-standalone-server/src/main/resources/application.yaml
b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
index 6c5171e59d..5640467616 100644
--- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml
+++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
@@ -133,6 +133,8 @@ master:
host-selector: lower_weight
# master heartbeat interval
heartbeat-interval: 10s
+ # Master heart beat task error threshold, if the continuous error count
exceed this count, the master will close.
+ heartbeat-error-threshold: 5
# master commit task retry times
task-commit-retry-times: 5
# master commit task interval
@@ -154,6 +156,8 @@ worker:
exec-threads: 10
# worker heartbeat interval
heartbeat-interval: 10s
+ # Worker heart beat task error threshold, if the continuous error count
exceed this count, the worker will close.
+ heartbeat-error-threshold: 5
# worker host weight to dispatch tasks, default value 100
host-weight: 100
# tenant corresponds to the user of the system, which is used by the worker
to submit the job. If system does not have this user, it will be automatically
created after the parameter worker.tenant.auto.create is true.
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index d97a994249..d03a2a951b 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -19,8 +19,8 @@ package org.apache.dolphinscheduler.server.worker;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
-import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.plugin.task.api.ProcessUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -37,7 +37,6 @@ import
org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.commons.collections4.CollectionUtils;
import java.util.Collection;
-import java.util.Set;
import javax.annotation.PostConstruct;
@@ -121,8 +120,7 @@ public class WorkerServer implements IStoppable {
this.workerRegistryClient.registry();
this.workerRegistryClient.setRegistryStoppable(this);
- Set<String> workerZkPaths =
this.workerRegistryClient.getWorkerZkPaths();
- this.workerRegistryClient.handleDeadServer(workerZkPaths,
NodeType.WORKER, Constants.DELETE_OP);
+ this.workerRegistryClient.handleDeadServer();
this.workerManagerThread.start();
@@ -139,37 +137,24 @@ public class WorkerServer implements IStoppable {
}
public void close(String cause) {
- try {
- // execute only once
- // set stop signal is true
- if (!Stopper.stop()) {
- logger.warn("WorkerServer is already stopped, current cause:
{}", cause);
- return;
- }
+ if (!Stopper.stop()) {
+ logger.warn("WorkerServer is already stopped, current cause: {}",
cause);
+ return;
+ }
+ ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
+ try (WorkerRpcServer closedWorkerRpcServer = workerRpcServer;
+ WorkerRegistryClient closedRegistryClient = workerRegistryClient;
+ AlertClientService closedAlertClientService = alertClientService;
+ SpringApplicationContext closedSpringContext =
springApplicationContext;) {
logger.info("Worker server is stopping, current cause : {}",
cause);
-
- try {
- // thread sleep 3 seconds for thread quitely stop
- Thread.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
- } catch (Exception e) {
- logger.warn("Worker server close wait error", e);
- }
-
- // close
- this.workerRpcServer.close();
- this.workerRegistryClient.unRegistry();
- this.alertClientService.close();
-
// kill running tasks
this.killAllRunningTasks();
-
- // close the application context
- this.springApplicationContext.close();
- logger.info("Worker server stopped, current cause: {}", cause);
} catch (Exception e) {
logger.error("Worker server stop failed, current cause: {}",
cause, e);
+ return;
}
+ logger.info("Worker server stopped, current cause: {}", cause);
}
@Override
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index aab162d6d4..2367975715 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -40,6 +40,10 @@ public class WorkerConfig implements Validator {
private int listenPort = 1234;
private int execThreads = 10;
private Duration heartbeatInterval = Duration.ofSeconds(10);
+ /**
+ * Worker heart beat task error threshold, if the continuous error count
exceed this count, the worker will close.
+ */
+ private int heartbeatErrorThreshold = 5;
private int hostWeight = 100;
private boolean tenantAutoCreate = true;
private boolean tenantDistributedUser = false;
@@ -70,6 +74,9 @@ public class WorkerConfig implements Validator {
if (workerConfig.getMaxCpuLoadAvg() <= 0) {
workerConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
}
+ if (workerConfig.getHeartbeatErrorThreshold() <= 0) {
+ errors.rejectValue("heartbeat-error-threshold", null, "should be a
positive value");
+ }
workerConfig.setWorkerAddress(NetUtils.getAddr(workerConfig.getListenPort()));
}
}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
index b94cdaf317..96d41c38c0 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
@@ -54,7 +54,7 @@ import com.google.common.collect.Sets;
* worker registry
*/
@Service
-public class WorkerRegistryClient {
+public class WorkerRegistryClient implements AutoCloseable {
private final Logger logger =
LoggerFactory.getLogger(WorkerRegistryClient.class);
@@ -101,15 +101,15 @@ public class WorkerRegistryClient {
long workerHeartbeatInterval =
workerConfig.getHeartbeatInterval().getSeconds();
HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
- workerConfig.getMaxCpuLoadAvg(),
- workerConfig.getReservedMemory(),
- workerConfig.getHostWeight(),
- workerZkPaths,
- Constants.WORKER_TYPE,
- registryClient,
- workerConfig.getExecThreads(),
- workerManagerThread.getThreadPoolQueueSize()
- );
+
workerConfig.getMaxCpuLoadAvg(),
+
workerConfig.getReservedMemory(),
+
workerConfig.getHostWeight(),
+ workerZkPaths,
+ Constants.WORKER_TYPE,
+ registryClient,
+
workerConfig.getExecThreads(),
+
workerManagerThread.getThreadPoolQueueSize(),
+
workerConfig.getHeartbeatErrorThreshold());
for (String workerZKPath : workerZkPaths) {
// remove before persist
@@ -147,8 +147,10 @@ public class WorkerRegistryClient {
logger.error("remove worker zk path exception", ex);
}
- this.heartBeatExecutor.shutdownNow();
- logger.info("heartbeat executor shutdown");
+ if (heartBeatExecutor != null) {
+ heartBeatExecutor.shutdownNow();
+ logger.info("Heartbeat executor shutdown");
+ }
registryClient.close();
logger.info("registry client closed");
@@ -175,8 +177,9 @@ public class WorkerRegistryClient {
return workerPaths;
}
- public void handleDeadServer(Set<String> nodeSet, NodeType nodeType,
String opType) {
- registryClient.handleDeadServer(nodeSet, nodeType, opType);
+ public void handleDeadServer() {
+ Set<String> workerZkPaths = getWorkerZkPaths();
+ registryClient.handleDeadServer(workerZkPaths, NodeType.WORKER,
Constants.DELETE_OP);
}
/**
@@ -190,4 +193,9 @@ public class WorkerRegistryClient {
registryClient.setStoppable(stoppable);
}
+ @Override
+ public void close() throws IOException {
+ unRegistry();
+ }
+
}
diff --git a/dolphinscheduler-worker/src/main/resources/application.yaml
b/dolphinscheduler-worker/src/main/resources/application.yaml
index ad390ed6af..a9c3eadf3c 100644
--- a/dolphinscheduler-worker/src/main/resources/application.yaml
+++ b/dolphinscheduler-worker/src/main/resources/application.yaml
@@ -60,6 +60,8 @@ worker:
exec-threads: 100
# worker heartbeat interval
heartbeat-interval: 10s
+ # Worker heart beat task error threshold, if the continuous error count
exceed this count, the worker will close.
+ heartbeat-error-threshold: 5
# worker host weight to dispatch tasks, default value 100
host-weight: 100
# tenant corresponds to the user of the system, which is used by the worker
to submit the job. If system does not have this user, it will be automatically
created after the parameter worker.tenant.auto.create is true.