This is an automated email from the ASF dual-hosted git repository. wenjun pushed a commit to branch 3.0.0-prepare in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit f3250bf5fa6c33cacb92bc31cdcb1d734e8cc27b Author: Wenjun Ruan <[email protected]> AuthorDate: Fri Jul 15 20:06:53 2022 +0800 Fix worker cannot shutdown due to resource close failed or heart beat check failed (#10979) * Use try-with-resource to close resource, and add heart error threshold to avoid worker cannot close due to heart beat check failed * Move heartbeat error threshold to applicaiton.yml (cherry picked from commit 2be1d4bf0add2485d4bf322aef7bbaeabfc223de) --- .../apache/dolphinscheduler/common/Constants.java | 1 + .../common/thread/ThreadUtils.java | 14 +++++--- .../server/master/MasterServer.java | 36 +++++++++---------- .../server/master/config/MasterConfig.java | 7 ++++ .../master/registry/MasterRegistryClient.java | 8 +++-- .../master/runner/MasterSchedulerBootstrap.java | 3 +- .../src/main/resources/application.yaml | 2 ++ .../server/registry/HeartBeatTask.java | 19 ++++++++-- .../service/bean/SpringApplicationContext.java | 3 +- .../apache/dolphinscheduler/StandaloneServer.java | 32 +++++++++++++++-- .../src/main/resources/application.yaml | 4 +++ .../server/worker/WorkerServer.java | 41 +++++++--------------- .../server/worker/config/WorkerConfig.java | 7 ++++ .../worker/registry/WorkerRegistryClient.java | 36 +++++++++++-------- .../src/main/resources/application.yaml | 2 ++ 15 files changed, 137 insertions(+), 78 deletions(-) diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 5b813b22c7..b4397f007a 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -50,6 +50,7 @@ public final class Constants { public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS = "/lock/failover/masters"; public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS = "/lock/failover/workers"; public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS = "/lock/failover/startup-masters"; + public static final String FORMAT_SS = "%s%s"; public static final String FORMAT_S_S = "%s/%s"; public static final String FOLDER_SEPARATOR = "/"; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java index 5c8020b7cd..f4f2a17bc7 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java @@ -21,12 +21,18 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.experimental.UtilityClass; @UtilityClass public class ThreadUtils { + + private static final Logger logger = LoggerFactory.getLogger(ThreadUtils.class); + /** * Wrapper over newDaemonFixedThreadExecutor. * @@ -35,10 +41,7 @@ public class ThreadUtils { * @return ExecutorService */ public static ExecutorService newDaemonFixedThreadExecutor(String threadName, int threadsNum) { - ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat(threadName) - .build(); + ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build(); return Executors.newFixedThreadPool(threadsNum, threadFactory); } @@ -48,8 +51,9 @@ public class ThreadUtils { public static void sleep(final long millis) { try { Thread.sleep(millis); - } catch (final InterruptedException ignore) { + } catch (final InterruptedException interruptedException) { Thread.currentThread().interrupt(); + logger.error("Current thread sleep error", interruptedException); } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 66b258d5b2..6711ff3289 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -115,31 +115,27 @@ public class MasterServer implements IStoppable { * @param cause close cause */ public void close(String cause) { - - try { - // set stop signal is true - // execute only once - if (!Stopper.stop()) { - logger.warn("MasterServer is already stopped, current cause: {}", cause); - return; - } + // set stop signal is true + // execute only once + if (!Stopper.stop()) { + logger.warn("MasterServer is already stopped, current cause: {}", cause); + return; + } + // thread sleep 3 seconds for thread quietly stop + ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis()); + try (MasterSchedulerBootstrap closedSchedulerBootstrap = masterSchedulerBootstrap; + MasterRPCServer closedRpcServer = masterRPCServer; + MasterRegistryClient closedMasterRegistryClient = masterRegistryClient; + // close spring Context and will invoke method with @PreDestroy annotation to destroy beans. + // like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc + SpringApplicationContext closedSpringContext = springApplicationContext) { logger.info("Master server is stopping, current cause : {}", cause); - - // thread sleep 3 seconds for thread quietly stop - ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis()); - // close - this.masterSchedulerBootstrap.close(); - this.masterRPCServer.close(); - this.masterRegistryClient.closeRegistry(); - // close spring Context and will invoke method with @PreDestroy annotation to destroy beans. - // like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc - springApplicationContext.close(); - - logger.info("MasterServer stopped, current cause: {}", cause); } catch (Exception e) { logger.error("MasterServer stop failed, current cause: {}", cause, e); + return; } + logger.info("MasterServer stopped, current cause: {}", cause); } @Override diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java index fe51e20984..7f6f124164 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java @@ -67,6 +67,10 @@ public class MasterConfig implements Validator { * Master heart beat task execute interval. */ private Duration heartbeatInterval = Duration.ofSeconds(10); + /** + * Master heart beat task error threshold, if the continuous error count exceed this count, the master will close. + */ + private int heartbeatErrorThreshold = 5; /** * task submit max retry times. */ @@ -129,6 +133,9 @@ public class MasterConfig implements Validator { if (masterConfig.getMaxCpuLoadAvg() <= 0) { masterConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2); } + if (masterConfig.getHeartbeatErrorThreshold() <= 0) { + errors.rejectValue("heartbeat-error-threshold", null, "should be a positive value"); + } masterConfig.setMasterAddress(NetUtils.getAddr(masterConfig.getListenPort())); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java index 486c360117..a59b712828 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java @@ -53,7 +53,7 @@ import com.google.common.collect.Sets; * <p>When the Master node startup, it will register in registry center. And schedule a {@link HeartBeatTask} to update its metadata in registry. */ @Component -public class MasterRegistryClient { +public class MasterRegistryClient implements AutoCloseable { /** * logger @@ -108,7 +108,8 @@ public class MasterRegistryClient { registryClient.setStoppable(stoppable); } - public void closeRegistry() { + @Override + public void close() { // TODO unsubscribe MasterRegistryDataListener deregister(); } @@ -194,7 +195,8 @@ public class MasterRegistryClient { masterConfig.getReservedMemory(), Sets.newHashSet(localNodePath), Constants.MASTER_TYPE, - registryClient); + registryClient, + masterConfig.getHeartbeatErrorThreshold()); // remove before persist registryClient.remove(localNodePath); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java index b013a0e9c4..523e5f839d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java @@ -57,7 +57,7 @@ import org.springframework.stereotype.Service; * Master scheduler thread, this thread will consume the commands from database and trigger processInstance executed. */ @Service -public class MasterSchedulerBootstrap extends BaseDaemonThread { +public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerBootstrap.class); @@ -112,6 +112,7 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread { logger.info("Master schedule bootstrap started..."); } + @Override public void close() { logger.info("Master schedule bootstrap stopping..."); logger.info("Master schedule bootstrap stopped..."); diff --git a/dolphinscheduler-master/src/main/resources/application.yaml b/dolphinscheduler-master/src/main/resources/application.yaml index ff29b19389..b559806d28 100644 --- a/dolphinscheduler-master/src/main/resources/application.yaml +++ b/dolphinscheduler-master/src/main/resources/application.yaml @@ -98,6 +98,8 @@ master: host-selector: lower_weight # master heartbeat interval heartbeat-interval: 10s + # Master heart beat task error threshold, if the continuous error count exceed this count, the master will close. + heartbeat-error-threshold: 5 # master commit task retry times task-commit-retry-times: 5 # master commit task interval diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java index 129aaff0de..c84abb4182 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java @@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.utils.HeartBeat; import org.apache.dolphinscheduler.service.registry.RegistryClient; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,16 +39,22 @@ public class HeartBeatTask implements Runnable { private final String serverType; private final HeartBeat heartBeat; + private final int heartBeatErrorThreshold; + + private final AtomicInteger heartBeatErrorTimes = new AtomicInteger(); + public HeartBeatTask(long startupTime, double maxCpuloadAvg, double reservedMemory, Set<String> heartBeatPaths, String serverType, - RegistryClient registryClient) { + RegistryClient registryClient, + int heartBeatErrorThreshold) { this.heartBeatPaths = heartBeatPaths; this.registryClient = registryClient; this.serverType = serverType; this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory); + this.heartBeatErrorThreshold = heartBeatErrorThreshold; } public HeartBeatTask(long startupTime, @@ -58,13 +65,14 @@ public class HeartBeatTask implements Runnable { String serverType, RegistryClient registryClient, int workerThreadCount, - int workerWaitingTaskCount - ) { + int workerWaitingTaskCount, + int heartBeatErrorThreshold) { this.heartBeatPaths = heartBeatPaths; this.registryClient = registryClient; this.workerWaitingTaskCount = workerWaitingTaskCount; this.serverType = serverType; this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory, hostWeight, workerThreadCount); + this.heartBeatErrorThreshold = heartBeatErrorThreshold; } public String getHeartBeatInfo() { @@ -88,8 +96,13 @@ public class HeartBeatTask implements Runnable { for (String heartBeatPath : heartBeatPaths) { registryClient.persistEphemeral(heartBeatPath, heartBeat.encodeHeartBeat()); } + heartBeatErrorTimes.set(0); } catch (Throwable ex) { logger.error("HeartBeat task execute failed", ex); + if (heartBeatErrorTimes.incrementAndGet() >= heartBeatErrorThreshold) { + registryClient.getStoppable() + .stop("HeartBeat task connect to zk failed too much times: " + heartBeatErrorTimes); + } } } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java index 61dfcb35d7..5b37b1f72d 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java @@ -24,7 +24,7 @@ import org.springframework.context.support.AbstractApplicationContext; import org.springframework.stereotype.Component; @Component -public class SpringApplicationContext implements ApplicationContextAware { +public class SpringApplicationContext implements ApplicationContextAware, AutoCloseable { private static ApplicationContext applicationContext; @@ -36,6 +36,7 @@ public class SpringApplicationContext implements ApplicationContextAware { /** * Close this application context, destroying all beans in its bean factory. */ + @Override public void close() { ((AbstractApplicationContext)applicationContext).close(); } diff --git a/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/StandaloneServer.java b/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/StandaloneServer.java index 3866c53066..728fed8eaa 100644 --- a/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/StandaloneServer.java +++ b/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/StandaloneServer.java @@ -19,15 +19,41 @@ package org.apache.dolphinscheduler; import org.apache.curator.test.TestingServer; +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.event.ApplicationFailedEvent; +import org.springframework.context.ApplicationEvent; +import org.springframework.context.ApplicationListener; +import org.springframework.context.event.ContextClosedEvent; + +import lombok.NonNull; @SpringBootApplication -public class StandaloneServer { +public class StandaloneServer implements ApplicationListener<ApplicationEvent> { + + private static final Logger logger = LoggerFactory.getLogger(StandaloneServer.class); + + private static TestingServer zookeeperServer; public static void main(String[] args) throws Exception { - final TestingServer server = new TestingServer(true); - System.setProperty("registry.zookeeper.connect-string", server.getConnectString()); + zookeeperServer = new TestingServer(true); + System.setProperty("registry.zookeeper.connect-string", zookeeperServer.getConnectString()); SpringApplication.run(StandaloneServer.class, args); } + + @Override + public void onApplicationEvent(@NonNull ApplicationEvent event) { + if (event instanceof ApplicationFailedEvent || event instanceof ContextClosedEvent) { + try (TestingServer closedServer = zookeeperServer) { + // close the zookeeper server + logger.info("Receive spring context close event: {}, will closed zookeeper server", event); + } catch (IOException e) { + logger.error("Close zookeeper server error", e); + } + } + } } diff --git a/dolphinscheduler-standalone-server/src/main/resources/application.yaml b/dolphinscheduler-standalone-server/src/main/resources/application.yaml index 9a940b6c0e..8411360275 100644 --- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml +++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml @@ -116,6 +116,8 @@ master: host-selector: lower_weight # master heartbeat interval heartbeat-interval: 10s + # Master heart beat task error threshold, if the continuous error count exceed this count, the master will close. + heartbeat-error-threshold: 5 # master commit task retry times task-commit-retry-times: 5 # master commit task interval @@ -137,6 +139,8 @@ worker: exec-threads: 10 # worker heartbeat interval heartbeat-interval: 10s + # Worker heart beat task error threshold, if the continuous error count exceed this count, the worker will close. + heartbeat-error-threshold: 5 # worker host weight to dispatch tasks, default value 100 host-weight: 100 # worker tenant auto create diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index 0321b76f44..471bc008eb 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -19,8 +19,8 @@ package org.apache.dolphinscheduler.server.worker; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.IStoppable; -import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.plugin.task.api.ProcessUtils; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; @@ -37,7 +37,6 @@ import org.apache.dolphinscheduler.service.task.TaskPluginManager; import org.apache.commons.collections4.CollectionUtils; import java.util.Collection; -import java.util.Set; import javax.annotation.PostConstruct; @@ -111,8 +110,7 @@ public class WorkerServer implements IStoppable { this.workerRegistryClient.registry(); this.workerRegistryClient.setRegistryStoppable(this); - Set<String> workerZkPaths = this.workerRegistryClient.getWorkerZkPaths(); - this.workerRegistryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP); + this.workerRegistryClient.handleDeadServer(); this.workerManagerThread.start(); @@ -129,37 +127,24 @@ public class WorkerServer implements IStoppable { } public void close(String cause) { - try { - // execute only once - // set stop signal is true - if (!Stopper.stop()) { - logger.warn("WorkerServer is already stopped, current cause: {}", cause); - return; - } + if (!Stopper.stop()) { + logger.warn("WorkerServer is already stopped, current cause: {}", cause); + return; + } + ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis()); + try (WorkerRpcServer closedWorkerRpcServer = workerRpcServer; + WorkerRegistryClient closedRegistryClient = workerRegistryClient; + AlertClientService closedAlertClientService = alertClientService; + SpringApplicationContext closedSpringContext = springApplicationContext;) { logger.info("Worker server is stopping, current cause : {}", cause); - - try { - // thread sleep 3 seconds for thread quitely stop - Thread.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis()); - } catch (Exception e) { - logger.warn("Worker server close wait error", e); - } - - // close - this.workerRpcServer.close(); - this.workerRegistryClient.unRegistry(); - this.alertClientService.close(); - // kill running tasks this.killAllRunningTasks(); - - // close the application context - this.springApplicationContext.close(); - logger.info("Worker server stopped, current cause: {}", cause); } catch (Exception e) { logger.error("Worker server stop failed, current cause: {}", cause, e); + return; } + logger.info("Worker server stopped, current cause: {}", cause); } @Override diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java index aab162d6d4..2367975715 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java @@ -40,6 +40,10 @@ public class WorkerConfig implements Validator { private int listenPort = 1234; private int execThreads = 10; private Duration heartbeatInterval = Duration.ofSeconds(10); + /** + * Worker heart beat task error threshold, if the continuous error count exceed this count, the worker will close. + */ + private int heartbeatErrorThreshold = 5; private int hostWeight = 100; private boolean tenantAutoCreate = true; private boolean tenantDistributedUser = false; @@ -70,6 +74,9 @@ public class WorkerConfig implements Validator { if (workerConfig.getMaxCpuLoadAvg() <= 0) { workerConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2); } + if (workerConfig.getHeartbeatErrorThreshold() <= 0) { + errors.rejectValue("heartbeat-error-threshold", null, "should be a positive value"); + } workerConfig.setWorkerAddress(NetUtils.getAddr(workerConfig.getListenPort())); } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java index b33b3ef0d7..99d18acc30 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java @@ -55,7 +55,7 @@ import com.google.common.collect.Sets; * worker registry */ @Service -public class WorkerRegistryClient { +public class WorkerRegistryClient implements AutoCloseable { private final Logger logger = LoggerFactory.getLogger(WorkerRegistryClient.class); @@ -102,15 +102,15 @@ public class WorkerRegistryClient { long workerHeartbeatInterval = workerConfig.getHeartbeatInterval().getSeconds(); HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime, - workerConfig.getMaxCpuLoadAvg(), - workerConfig.getReservedMemory(), - workerConfig.getHostWeight(), - workerZkPaths, - Constants.WORKER_TYPE, - registryClient, - workerConfig.getExecThreads(), - workerManagerThread.getThreadPoolQueueSize() - ); + workerConfig.getMaxCpuLoadAvg(), + workerConfig.getReservedMemory(), + workerConfig.getHostWeight(), + workerZkPaths, + Constants.WORKER_TYPE, + registryClient, + workerConfig.getExecThreads(), + workerManagerThread.getThreadPoolQueueSize(), + workerConfig.getHeartbeatErrorThreshold()); for (String workerZKPath : workerZkPaths) { // remove before persist @@ -148,8 +148,10 @@ public class WorkerRegistryClient { logger.error("remove worker zk path exception", ex); } - this.heartBeatExecutor.shutdownNow(); - logger.info("heartbeat executor shutdown"); + if (heartBeatExecutor != null) { + heartBeatExecutor.shutdownNow(); + logger.info("Heartbeat executor shutdown"); + } registryClient.close(); logger.info("registry client closed"); @@ -176,8 +178,9 @@ public class WorkerRegistryClient { return workerPaths; } - public void handleDeadServer(Set<String> nodeSet, NodeType nodeType, String opType) { - registryClient.handleDeadServer(nodeSet, nodeType, opType); + public void handleDeadServer() { + Set<String> workerZkPaths = getWorkerZkPaths(); + registryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP); } /** @@ -191,4 +194,9 @@ public class WorkerRegistryClient { registryClient.setStoppable(stoppable); } + @Override + public void close() throws IOException { + unRegistry(); + } + } diff --git a/dolphinscheduler-worker/src/main/resources/application.yaml b/dolphinscheduler-worker/src/main/resources/application.yaml index d764f9d25b..7d653edc24 100644 --- a/dolphinscheduler-worker/src/main/resources/application.yaml +++ b/dolphinscheduler-worker/src/main/resources/application.yaml @@ -60,6 +60,8 @@ worker: exec-threads: 100 # worker heartbeat interval heartbeat-interval: 10s + # Worker heart beat task error threshold, if the continuous error count exceed this count, the worker will close. + heartbeat-error-threshold: 5 # worker host weight to dispatch tasks, default value 100 host-weight: 100 # worker tenant auto create
