This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2be1d4bf0a Fix worker cannot shutdown due to resource close failed or 
heart beat check failed (#10979)
2be1d4bf0a is described below

commit 2be1d4bf0add2485d4bf322aef7bbaeabfc223de
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri Jul 15 20:06:53 2022 +0800

    Fix worker cannot shutdown due to resource close failed or heart beat check 
failed (#10979)
    
    * Use try-with-resources to close resources, and add a heartbeat error threshold to 
avoid the worker being unable to close when the heartbeat check fails
    
    * Move heartbeat error threshold to application.yaml
---
 .../apache/dolphinscheduler/common/Constants.java  |  1 +
 .../common/thread/ThreadUtils.java                 | 14 +++++---
 .../server/master/MasterServer.java                | 38 +++++++++-----------
 .../server/master/config/MasterConfig.java         |  7 ++++
 .../master/registry/MasterRegistryClient.java      | 16 +++++----
 .../master/runner/MasterSchedulerBootstrap.java    |  3 +-
 .../src/main/resources/application.yaml            |  2 ++
 .../server/registry/HeartBeatTask.java             | 19 ++++++++--
 .../service/bean/SpringApplicationContext.java     |  3 +-
 .../apache/dolphinscheduler/StandaloneServer.java  | 32 +++++++++++++++--
 .../src/main/resources/application.yaml            |  4 +++
 .../server/worker/WorkerServer.java                | 41 +++++++---------------
 .../server/worker/config/WorkerConfig.java         |  7 ++++
 .../worker/registry/WorkerRegistryClient.java      | 36 +++++++++++--------
 .../src/main/resources/application.yaml            |  2 ++
 15 files changed, 142 insertions(+), 83 deletions(-)

diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 099620778e..7a84e95a13 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -50,6 +50,7 @@ public final class Constants {
     public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS 
= "/lock/failover/masters";
     public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS 
= "/lock/failover/workers";
     public static final String 
REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS = 
"/lock/failover/startup-masters";
+
     public static final String FORMAT_SS = "%s%s";
     public static final String FORMAT_S_S = "%s/%s";
     public static final String FORMAT_S_S_COLON = "%s:%s";
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
index 5c8020b7cd..f4f2a17bc7 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
@@ -21,12 +21,18 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import lombok.experimental.UtilityClass;
 
 @UtilityClass
 public class ThreadUtils {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(ThreadUtils.class);
+
     /**
      * Wrapper over newDaemonFixedThreadExecutor.
      *
@@ -35,10 +41,7 @@ public class ThreadUtils {
      * @return ExecutorService
      */
     public static ExecutorService newDaemonFixedThreadExecutor(String 
threadName, int threadsNum) {
-        ThreadFactory threadFactory = new ThreadFactoryBuilder()
-            .setDaemon(true)
-            .setNameFormat(threadName)
-            .build();
+        ThreadFactory threadFactory = new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build();
         return Executors.newFixedThreadPool(threadsNum, threadFactory);
     }
 
@@ -48,8 +51,9 @@ public class ThreadUtils {
     public static void sleep(final long millis) {
         try {
             Thread.sleep(millis);
-        } catch (final InterruptedException ignore) {
+        } catch (final InterruptedException interruptedException) {
             Thread.currentThread().interrupt();
+            logger.error("Current thread sleep error", interruptedException);
         }
     }
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 3469cd4852..0bf3c945e7 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -115,32 +115,28 @@ public class MasterServer implements IStoppable {
      * @param cause close cause
      */
     public void close(String cause) {
-
-        try {
-            // set stop signal is true
-            // execute only once
-            if (!Stopper.stop()) {
-                logger.warn("MasterServer is already stopped, current cause: 
{}", cause);
-                return;
-            }
+        // set stop signal is true
+        // execute only once
+        if (!Stopper.stop()) {
+            logger.warn("MasterServer is already stopped, current cause: {}", 
cause);
+            return;
+        }
+        // thread sleep 3 seconds for thread quietly stop
+        ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
+        try (SchedulerApi closedSchedulerApi = schedulerApi;
+             MasterSchedulerBootstrap closedSchedulerBootstrap = 
masterSchedulerBootstrap;
+             MasterRPCServer closedRpcServer = masterRPCServer;
+             MasterRegistryClient closedMasterRegistryClient = 
masterRegistryClient;
+             // close spring Context and will invoke method with @PreDestroy 
annotation to destroy beans.
+             // like 
ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
+             SpringApplicationContext closedSpringContext = 
springApplicationContext) {
 
             logger.info("Master server is stopping, current cause : {}", 
cause);
-
-            // thread sleep 3 seconds for thread quietly stop
-            ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
-            // close
-            this.schedulerApi.close();
-            this.masterSchedulerBootstrap.close();
-            this.masterRPCServer.close();
-            this.masterRegistryClient.closeRegistry();
-            // close spring Context and will invoke method with @PreDestroy 
annotation to destroy beans.
-            // like 
ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
-            springApplicationContext.close();
-
-            logger.info("MasterServer stopped, current cause: {}", cause);
         } catch (Exception e) {
             logger.error("MasterServer stop failed, current cause: {}", cause, 
e);
+            return;
         }
+        logger.info("MasterServer stopped, current cause: {}", cause);
     }
 
     @Override
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index fe51e20984..7f6f124164 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -67,6 +67,10 @@ public class MasterConfig implements Validator {
      * Master heart beat task execute interval.
      */
     private Duration heartbeatInterval = Duration.ofSeconds(10);
+    /**
+     * Master heart beat task error threshold, if the continuous error count 
exceed this count, the master will close.
+     */
+    private int heartbeatErrorThreshold = 5;
     /**
      * task submit max retry times.
      */
@@ -129,6 +133,9 @@ public class MasterConfig implements Validator {
         if (masterConfig.getMaxCpuLoadAvg() <= 0) {
             
masterConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
         }
+        if (masterConfig.getHeartbeatErrorThreshold() <= 0) {
+            errors.rejectValue("heartbeat-error-threshold", null, "should be a 
positive value");
+        }
         
masterConfig.setMasterAddress(NetUtils.getAddr(masterConfig.getListenPort()));
     }
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index 359fdc745e..aa536bae49 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -53,7 +53,7 @@ import com.google.common.collect.Sets;
  * <p>When the Master node startup, it will register in registry center. And 
schedule a {@link HeartBeatTask} to update its metadata in registry.
  */
 @Component
-public class MasterRegistryClient {
+public class MasterRegistryClient implements AutoCloseable {
 
     /**
      * logger
@@ -107,7 +107,8 @@ public class MasterRegistryClient {
         registryClient.setStoppable(stoppable);
     }
 
-    public void closeRegistry() {
+    @Override
+    public void close() {
         // TODO unsubscribe MasterRegistryDataListener
         deregister();
     }
@@ -189,11 +190,12 @@ public class MasterRegistryClient {
         String localNodePath = getCurrentNodePath();
         Duration masterHeartbeatInterval = masterConfig.getHeartbeatInterval();
         HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
-                masterConfig.getMaxCpuLoadAvg(),
-                masterConfig.getReservedMemory(),
-                Sets.newHashSet(localNodePath),
-                Constants.MASTER_TYPE,
-                registryClient);
+                                                        
masterConfig.getMaxCpuLoadAvg(),
+                                                        
masterConfig.getReservedMemory(),
+                                                        
Sets.newHashSet(localNodePath),
+                                                        Constants.MASTER_TYPE,
+                                                        registryClient,
+                                                        
masterConfig.getHeartbeatErrorThreshold());
 
         // remove before persist
         registryClient.remove(localNodePath);
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
index 870cc1b24c..aa0542cb7c 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
@@ -58,7 +58,7 @@ import org.springframework.stereotype.Service;
  * Master scheduler thread, this thread will consume the commands from 
database and trigger processInstance executed.
  */
 @Service
-public class MasterSchedulerBootstrap extends BaseDaemonThread {
+public class MasterSchedulerBootstrap extends BaseDaemonThread implements 
AutoCloseable {
 
     private static final Logger logger = 
LoggerFactory.getLogger(MasterSchedulerBootstrap.class);
 
@@ -116,6 +116,7 @@ public class MasterSchedulerBootstrap extends 
BaseDaemonThread {
         logger.info("Master schedule bootstrap started...");
     }
 
+    @Override
     public void close() {
         logger.info("Master schedule bootstrap stopping...");
         logger.info("Master schedule bootstrap stopped...");
diff --git a/dolphinscheduler-master/src/main/resources/application.yaml 
b/dolphinscheduler-master/src/main/resources/application.yaml
index 7a9fa8fcde..9f191ccd25 100644
--- a/dolphinscheduler-master/src/main/resources/application.yaml
+++ b/dolphinscheduler-master/src/main/resources/application.yaml
@@ -98,6 +98,8 @@ master:
   host-selector: lower_weight
   # master heartbeat interval
   heartbeat-interval: 10s
+  # Master heart beat task error threshold, if the continuous error count 
exceed this count, the master will close.
+  heartbeat-error-threshold: 5
   # master commit task retry times
   task-commit-retry-times: 5
   # master commit task interval
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
index 129aaff0de..c84abb4182 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.common.utils.HeartBeat;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,16 +39,22 @@ public class HeartBeatTask implements Runnable {
     private final String serverType;
     private final HeartBeat heartBeat;
 
+    private final int heartBeatErrorThreshold;
+
+    private final AtomicInteger heartBeatErrorTimes = new AtomicInteger();
+
     public HeartBeatTask(long startupTime,
                          double maxCpuloadAvg,
                          double reservedMemory,
                          Set<String> heartBeatPaths,
                          String serverType,
-                         RegistryClient registryClient) {
+                         RegistryClient registryClient,
+                         int heartBeatErrorThreshold) {
         this.heartBeatPaths = heartBeatPaths;
         this.registryClient = registryClient;
         this.serverType = serverType;
         this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, 
reservedMemory);
+        this.heartBeatErrorThreshold = heartBeatErrorThreshold;
     }
 
     public HeartBeatTask(long startupTime,
@@ -58,13 +65,14 @@ public class HeartBeatTask implements Runnable {
                          String serverType,
                          RegistryClient registryClient,
                          int workerThreadCount,
-                         int workerWaitingTaskCount
-    ) {
+                         int workerWaitingTaskCount,
+                         int heartBeatErrorThreshold) {
         this.heartBeatPaths = heartBeatPaths;
         this.registryClient = registryClient;
         this.workerWaitingTaskCount = workerWaitingTaskCount;
         this.serverType = serverType;
         this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, 
reservedMemory, hostWeight, workerThreadCount);
+        this.heartBeatErrorThreshold = heartBeatErrorThreshold;
     }
 
     public String getHeartBeatInfo() {
@@ -88,8 +96,13 @@ public class HeartBeatTask implements Runnable {
             for (String heartBeatPath : heartBeatPaths) {
                 registryClient.persistEphemeral(heartBeatPath, 
heartBeat.encodeHeartBeat());
             }
+            heartBeatErrorTimes.set(0);
         } catch (Throwable ex) {
             logger.error("HeartBeat task execute failed", ex);
+            if (heartBeatErrorTimes.incrementAndGet() >= 
heartBeatErrorThreshold) {
+                registryClient.getStoppable()
+                              .stop("HeartBeat task connect to zk failed too 
much times: " + heartBeatErrorTimes);
+            }
         }
     }
 }
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java
index 61dfcb35d7..5b37b1f72d 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/bean/SpringApplicationContext.java
@@ -24,7 +24,7 @@ import 
org.springframework.context.support.AbstractApplicationContext;
 import org.springframework.stereotype.Component;
 
 @Component
-public class SpringApplicationContext implements ApplicationContextAware {
+public class SpringApplicationContext implements ApplicationContextAware, 
AutoCloseable {
 
     private static ApplicationContext applicationContext;
 
@@ -36,6 +36,7 @@ public class SpringApplicationContext implements 
ApplicationContextAware {
     /**
      * Close this application context, destroying all beans in its bean 
factory.
      */
+    @Override
     public void close() {
         ((AbstractApplicationContext)applicationContext).close();
     }
diff --git 
a/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/StandaloneServer.java
 
b/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/StandaloneServer.java
index 3866c53066..728fed8eaa 100644
--- 
a/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/StandaloneServer.java
+++ 
b/dolphinscheduler-standalone-server/src/main/java/org/apache/dolphinscheduler/StandaloneServer.java
@@ -19,15 +19,41 @@ package org.apache.dolphinscheduler;
 
 import org.apache.curator.test.TestingServer;
 
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.event.ApplicationFailedEvent;
+import org.springframework.context.ApplicationEvent;
+import org.springframework.context.ApplicationListener;
+import org.springframework.context.event.ContextClosedEvent;
+
+import lombok.NonNull;
 
 @SpringBootApplication
-public class StandaloneServer {
+public class StandaloneServer implements ApplicationListener<ApplicationEvent> 
{
+
+    private static final Logger logger = 
LoggerFactory.getLogger(StandaloneServer.class);
+
+    private static TestingServer zookeeperServer;
 
     public static void main(String[] args) throws Exception {
-        final TestingServer server = new TestingServer(true);
-        System.setProperty("registry.zookeeper.connect-string", 
server.getConnectString());
+        zookeeperServer = new TestingServer(true);
+        System.setProperty("registry.zookeeper.connect-string", 
zookeeperServer.getConnectString());
         SpringApplication.run(StandaloneServer.class, args);
     }
+
+    @Override
+    public void onApplicationEvent(@NonNull ApplicationEvent event) {
+        if (event instanceof ApplicationFailedEvent || event instanceof 
ContextClosedEvent) {
+            try (TestingServer closedServer = zookeeperServer) {
+                // close the zookeeper server
+                logger.info("Receive spring context close event: {}, will 
closed zookeeper server", event);
+            } catch (IOException e) {
+                logger.error("Close zookeeper server error", e);
+            }
+        }
+    }
 }
diff --git 
a/dolphinscheduler-standalone-server/src/main/resources/application.yaml 
b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
index 6c5171e59d..5640467616 100644
--- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml
+++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
@@ -133,6 +133,8 @@ master:
   host-selector: lower_weight
   # master heartbeat interval
   heartbeat-interval: 10s
+  # Master heart beat task error threshold, if the continuous error count 
exceed this count, the master will close.
+  heartbeat-error-threshold: 5
   # master commit task retry times
   task-commit-retry-times: 5
   # master commit task interval
@@ -154,6 +156,8 @@ worker:
   exec-threads: 10
   # worker heartbeat interval
   heartbeat-interval: 10s
+  # Worker heart beat task error threshold, if the continuous error count 
exceed this count, the worker will close.
+  heartbeat-error-threshold: 5
   # worker host weight to dispatch tasks, default value 100
   host-weight: 100
   # tenant corresponds to the user of the system, which is used by the worker 
to submit the job. If system does not have this user, it will be automatically 
created after the parameter worker.tenant.auto.create is true.
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index d97a994249..d03a2a951b 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -19,8 +19,8 @@ package org.apache.dolphinscheduler.server.worker;
 
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.IStoppable;
-import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.plugin.task.api.ProcessUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@@ -37,7 +37,6 @@ import 
org.apache.dolphinscheduler.service.task.TaskPluginManager;
 import org.apache.commons.collections4.CollectionUtils;
 
 import java.util.Collection;
-import java.util.Set;
 
 import javax.annotation.PostConstruct;
 
@@ -121,8 +120,7 @@ public class WorkerServer implements IStoppable {
 
         this.workerRegistryClient.registry();
         this.workerRegistryClient.setRegistryStoppable(this);
-        Set<String> workerZkPaths = 
this.workerRegistryClient.getWorkerZkPaths();
-        this.workerRegistryClient.handleDeadServer(workerZkPaths, 
NodeType.WORKER, Constants.DELETE_OP);
+        this.workerRegistryClient.handleDeadServer();
 
         this.workerManagerThread.start();
 
@@ -139,37 +137,24 @@ public class WorkerServer implements IStoppable {
     }
 
     public void close(String cause) {
-        try {
-            // execute only once
-            // set stop signal is true
-            if (!Stopper.stop()) {
-                logger.warn("WorkerServer is already stopped, current cause: 
{}", cause);
-                return;
-            }
+        if (!Stopper.stop()) {
+            logger.warn("WorkerServer is already stopped, current cause: {}", 
cause);
+            return;
+        }
+        ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
 
+        try (WorkerRpcServer closedWorkerRpcServer = workerRpcServer;
+             WorkerRegistryClient closedRegistryClient = workerRegistryClient;
+             AlertClientService closedAlertClientService = alertClientService;
+             SpringApplicationContext closedSpringContext = 
springApplicationContext;) {
             logger.info("Worker server is stopping, current cause : {}", 
cause);
-
-            try {
-                // thread sleep 3 seconds for thread quitely stop
-                Thread.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
-            } catch (Exception e) {
-                logger.warn("Worker server close wait error", e);
-            }
-
-            // close
-            this.workerRpcServer.close();
-            this.workerRegistryClient.unRegistry();
-            this.alertClientService.close();
-
             // kill running tasks
             this.killAllRunningTasks();
-
-            // close the application context
-            this.springApplicationContext.close();
-            logger.info("Worker server stopped, current cause: {}", cause);
         } catch (Exception e) {
             logger.error("Worker server stop failed, current cause: {}", 
cause, e);
+            return;
         }
+        logger.info("Worker server stopped, current cause: {}", cause);
     }
 
     @Override
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index aab162d6d4..2367975715 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -40,6 +40,10 @@ public class WorkerConfig implements Validator {
     private int listenPort = 1234;
     private int execThreads = 10;
     private Duration heartbeatInterval = Duration.ofSeconds(10);
+    /**
+     * Worker heart beat task error threshold, if the continuous error count 
exceed this count, the worker will close.
+     */
+    private int heartbeatErrorThreshold = 5;
     private int hostWeight = 100;
     private boolean tenantAutoCreate = true;
     private boolean tenantDistributedUser = false;
@@ -70,6 +74,9 @@ public class WorkerConfig implements Validator {
         if (workerConfig.getMaxCpuLoadAvg() <= 0) {
             
workerConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
         }
+        if (workerConfig.getHeartbeatErrorThreshold() <= 0) {
+            errors.rejectValue("heartbeat-error-threshold", null, "should be a 
positive value");
+        }
         
workerConfig.setWorkerAddress(NetUtils.getAddr(workerConfig.getListenPort()));
     }
 }
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
index b94cdaf317..96d41c38c0 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
@@ -54,7 +54,7 @@ import com.google.common.collect.Sets;
  * worker registry
  */
 @Service
-public class WorkerRegistryClient {
+public class WorkerRegistryClient implements AutoCloseable {
 
     private final Logger logger = 
LoggerFactory.getLogger(WorkerRegistryClient.class);
 
@@ -101,15 +101,15 @@ public class WorkerRegistryClient {
         long workerHeartbeatInterval = 
workerConfig.getHeartbeatInterval().getSeconds();
 
         HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
-                workerConfig.getMaxCpuLoadAvg(),
-                workerConfig.getReservedMemory(),
-                workerConfig.getHostWeight(),
-                workerZkPaths,
-                Constants.WORKER_TYPE,
-                registryClient,
-                workerConfig.getExecThreads(),
-                workerManagerThread.getThreadPoolQueueSize()
-        );
+                                                        
workerConfig.getMaxCpuLoadAvg(),
+                                                        
workerConfig.getReservedMemory(),
+                                                        
workerConfig.getHostWeight(),
+                                                        workerZkPaths,
+                                                        Constants.WORKER_TYPE,
+                                                        registryClient,
+                                                        
workerConfig.getExecThreads(),
+                                                        
workerManagerThread.getThreadPoolQueueSize(),
+                                                        
workerConfig.getHeartbeatErrorThreshold());
 
         for (String workerZKPath : workerZkPaths) {
             // remove before persist
@@ -147,8 +147,10 @@ public class WorkerRegistryClient {
             logger.error("remove worker zk path exception", ex);
         }
 
-        this.heartBeatExecutor.shutdownNow();
-        logger.info("heartbeat executor shutdown");
+        if (heartBeatExecutor != null) {
+            heartBeatExecutor.shutdownNow();
+            logger.info("Heartbeat executor shutdown");
+        }
 
         registryClient.close();
         logger.info("registry client closed");
@@ -175,8 +177,9 @@ public class WorkerRegistryClient {
         return workerPaths;
     }
 
-    public void handleDeadServer(Set<String> nodeSet, NodeType nodeType, 
String opType) {
-        registryClient.handleDeadServer(nodeSet, nodeType, opType);
+    public void handleDeadServer() {
+        Set<String> workerZkPaths = getWorkerZkPaths();
+        registryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, 
Constants.DELETE_OP);
     }
 
     /**
@@ -190,4 +193,9 @@ public class WorkerRegistryClient {
         registryClient.setStoppable(stoppable);
     }
 
+    @Override
+    public void close() throws IOException {
+        unRegistry();
+    }
+
 }
diff --git a/dolphinscheduler-worker/src/main/resources/application.yaml 
b/dolphinscheduler-worker/src/main/resources/application.yaml
index ad390ed6af..a9c3eadf3c 100644
--- a/dolphinscheduler-worker/src/main/resources/application.yaml
+++ b/dolphinscheduler-worker/src/main/resources/application.yaml
@@ -60,6 +60,8 @@ worker:
   exec-threads: 100
   # worker heartbeat interval
   heartbeat-interval: 10s
+  # Worker heart beat task error threshold, if the continuous error count 
exceed this count, the worker will close.
+  heartbeat-error-threshold: 5
   # worker host weight to dispatch tasks, default value 100
   host-weight: 100
   # tenant corresponds to the user of the system, which is used by the worker 
to submit the job. If system does not have this user, it will be automatically 
created after the parameter worker.tenant.auto.create is true.

Reply via email to