This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch 2.0.2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/2.0.2-prepare by this push:
     new c956185  [Feature] 2.0.2-prepare bug fix of Pressure tests #7511 
(#7540)
c956185 is described below

commit c956185e00118bde6df99c2aaf3017d974515ebf
Author: zwZjut <[email protected]>
AuthorDate: Wed Dec 22 18:57:51 2021 +0800

    [Feature] 2.0.2-prepare bug fix of Pressure tests #7511 (#7540)
    
    * [Feature][dolphinscheduler-api] parse traceId in HTTP header for cross-system delivery to #7237 (#7238)
    
    * to #7237
    
    * rerun test
    
    Co-authored-by: honghuo.zw <[email protected]>
    
    * cherry-pick 05aef27 and handle conflicts
    
    * to #7065: fix ExecutorService and schedulerService (#7072)
    
    Co-authored-by: honghuo.zw <[email protected]>
    
    * [Feature][dolphinscheduler-api] access control of taskDefinition and 
taskInstance in project to #7081  (#7082)
    
    * to #7081
    
    * fix #7081
    
    * to #7081
    
    Co-authored-by: honghuo.zw <[email protected]>
    
    * cherry-pick 8ebe060 and handle conflicts
    
    * cherry-pick 1f18444 and handle conflicts
    
    * fix #6807: dolphinscheduler.zookeeper.env_vars -> 
dolphinscheduler.registry.env_vars (#6808)
    
    Co-authored-by: honghuo.zw <[email protected]>
    Co-authored-by: Kirs <[email protected]>
    
    * add default constructor (#6780)
    
    Co-authored-by: honghuo.zw <[email protected]>
    
    * to #7108 (#7109)
    
    * to #7511
    
    * to #7511
    
    * to #7511
    
    * to #7511
    
    Co-authored-by: honghuo.zw <[email protected]>
    Co-authored-by: Kirs <[email protected]>
---
 .../application-api.properties.tpl                 |   3 +
 .../conf/dolphinscheduler/master.properties.tpl    |   4 +
 .../conf/dolphinscheduler/quartz.properties.tpl    |   3 +-
 .../conf/dolphinscheduler/registry.properties.tpl  |   1 +
 .../conf/dolphinscheduler/worker.properties.tpl    |   3 +
 docker/kubernetes/dolphinscheduler/values.yaml     |  10 ++
 .../apache/dolphinscheduler/common/Constants.java  |   3 +-
 .../dao/entity/ProcessInstance.java                |  18 ++
 .../dao/mapper/ProcessInstanceMapMapper.java       |   4 +-
 .../dao/mapper/ProcessInstanceMapper.java          |   8 +
 .../dao/mapper/TaskInstanceMapper.java             |   2 +
 .../dao/mapper/ProcessInstanceMapper.xml           |  13 +-
 .../dao/mapper/TaskInstanceMapper.xml              |   6 +
 .../src/main/resources/sql/dolphinscheduler_h2.sql |   1 +
 .../main/resources/sql/dolphinscheduler_mysql.sql  |   1 +
 .../resources/sql/dolphinscheduler_postgre.sql     |   1 +
 .../src/main/resources/sql/soft_version            |   2 +-
 .../2.0.2_schema/mysql/dolphinscheduler_ddl.sql    |  20 +++
 .../2.0.2_schema/mysql/dolphinscheduler_dml.sql    |  16 ++
 .../postgresql/dolphinscheduler_ddl.sql            |  41 +++++
 .../postgresql/dolphinscheduler_dml.sql            |  16 ++
 .../server/master/MasterServer.java                |  23 ++-
 .../server/master/config/MasterConfig.java         |  21 +++
 .../master/consumer/TaskPriorityQueueConsumer.java |   8 +-
 .../processor/queue/TaskResponseService.java       |  26 ++-
 .../master/registry/MasterRegistryClient.java      | 199 +++++++++++++++++----
 .../registry/MasterRegistryDataListener.java       |   4 +-
 .../server/master/registry/ServerNodeManager.java  |   4 +-
 .../master/runner/FailoverExecuteThread.java       |  91 ++++++++++
 .../master/runner/MasterSchedulerService.java      |   1 +
 .../master/runner/WorkflowExecuteThread.java       |  16 +-
 .../master/runner/task/CommonTaskProcessor.java    |   6 +-
 .../server/worker/WorkerServer.java                |  30 +++-
 .../server/worker/config/WorkerConfig.java         |  11 ++
 .../worker/processor/DBTaskAckProcessor.java       |   9 +-
 .../worker/processor/DBTaskResponseProcessor.java  |   5 +-
 .../worker/registry/WorkerRegistryClient.java      |  44 +++++
 .../worker/runner/RetryReportTaskStatusThread.java |  16 +-
 .../master/registry/MasterRegistryClientTest.java  |   6 +-
 .../service/process/ProcessService.java            |  30 +++-
 .../service/quartz/QuartzExecutors.java            |  73 ++++----
 .../service/queue/TaskPriorityQueueImpl.java       |   2 +-
 .../service/registry/RegistryClient.java           |   2 +-
 .../dolphinscheduler/spi/utils/PropertyUtils.java  |   3 +
 44 files changed, 670 insertions(+), 136 deletions(-)

diff --git a/docker/build/conf/dolphinscheduler/application-api.properties.tpl 
b/docker/build/conf/dolphinscheduler/application-api.properties.tpl
index d78db2d..393a33c 100644
--- a/docker/build/conf/dolphinscheduler/application-api.properties.tpl
+++ b/docker/build/conf/dolphinscheduler/application-api.properties.tpl
@@ -38,6 +38,9 @@ 
server.compression.mime-types=text/html,text/xml,text/plain,text/css,text/javasc
 # max http post size
 server.jetty.max-http-form-post-size=5000000
 
+# max http header size
+server.max-http-header-size=81920
+
 # messages encoding
 spring.messages.encoding=UTF-8
 
diff --git a/docker/build/conf/dolphinscheduler/master.properties.tpl 
b/docker/build/conf/dolphinscheduler/master.properties.tpl
index 046d5c1..98ca3dd 100644
--- a/docker/build/conf/dolphinscheduler/master.properties.tpl
+++ b/docker/build/conf/dolphinscheduler/master.properties.tpl
@@ -44,3 +44,7 @@ master.max.cpuload.avg=${MASTER_MAX_CPULOAD_AVG}
 
 # master reserved memory, only lower than system available memory, master 
server can schedule. default value 0.3, the unit is G
 master.reserved.memory=${MASTER_RESERVED_MEMORY}
+# master failover interval, in minutes
+master.failover.interval=${MASTER_FAILOVER_INTERVAL}
+# whether the master kills yarn jobs when handling failover
+master.kill.yarn.job.when.handle.failover=${MASTER_KILL_YARN_JOB_WHEN_HANDLE_FAILOVER}
\ No newline at end of file
diff --git a/docker/build/conf/dolphinscheduler/quartz.properties.tpl 
b/docker/build/conf/dolphinscheduler/quartz.properties.tpl
index 45c61a6..5f011f9 100644
--- a/docker/build/conf/dolphinscheduler/quartz.properties.tpl
+++ b/docker/build/conf/dolphinscheduler/quartz.properties.tpl
@@ -32,7 +32,8 @@
 
 #org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
 #org.quartz.threadPool.makeThreadsDaemons = true
-#org.quartz.threadPool.threadCount = 25
+org.quartz.threadPool.threadCount = ${ORG_QUARTZ_THREADPOOL_THREADCOUNT}
+org.quartz.scheduler.batchTriggerAcquisitionMaxCount = 
${ORG_QUARTZ_SCHEDULER_BATCHTRIGGERACQUISTITIONMAXCOUNT}
 #org.quartz.threadPool.threadPriority = 5
 
 #============================================================================
diff --git a/docker/build/conf/dolphinscheduler/registry.properties.tpl 
b/docker/build/conf/dolphinscheduler/registry.properties.tpl
index 9ee8add..e1ac104 100644
--- a/docker/build/conf/dolphinscheduler/registry.properties.tpl
+++ b/docker/build/conf/dolphinscheduler/registry.properties.tpl
@@ -17,3 +17,4 @@
 
 registry.plugin.name=${REGISTRY_PLUGIN_NAME}
 registry.servers=${REGISTRY_SERVERS}
+session.timeout.ms=${SESSION_TIMEOUT_MS}
\ No newline at end of file
diff --git a/docker/build/conf/dolphinscheduler/worker.properties.tpl 
b/docker/build/conf/dolphinscheduler/worker.properties.tpl
index 94a3352..e1f1574 100644
--- a/docker/build/conf/dolphinscheduler/worker.properties.tpl
+++ b/docker/build/conf/dolphinscheduler/worker.properties.tpl
@@ -41,3 +41,6 @@ worker.groups=${WORKER_GROUPS}
 
 # alert server listen host
 alert.listen.host=${ALERT_LISTEN_HOST}
+
+# interval, in seconds, at which the worker retries reporting task status
+worker.retry.report.task.statues.interval=${WORKER_RETRY_REPORT_TASK_STATUS_INTERVAL}
\ No newline at end of file
diff --git a/docker/kubernetes/dolphinscheduler/values.yaml 
b/docker/kubernetes/dolphinscheduler/values.yaml
index 6ef58d5..f0df9e7 100644
--- a/docker/kubernetes/dolphinscheduler/values.yaml
+++ b/docker/kubernetes/dolphinscheduler/values.yaml
@@ -53,6 +53,10 @@ externalDatabase:
 ## If not exists external zookeeper, by default, Dolphinscheduler's zookeeper 
will use it.
 zookeeper:
   enabled: true
+  tickTime: 3000
+  maxSessionTimeout: 60000
+  initLimit: 300
+  maxClientCnxns: 2000
   fourlwCommandsWhitelist: "srvr,ruok,wchs,cons"
   persistence:
     enabled: false
@@ -158,6 +162,10 @@ master:
     MASTER_TASK_COMMIT_INTERVAL: "1000"
     MASTER_MAX_CPULOAD_AVG: "-1"
     MASTER_RESERVED_MEMORY: "0.3"
+    MASTER_KILL_YARN_JOB_WHEN_HANDLE_FAILOVER: "true"
+    ORG_QUARTZ_THREADPOOL_THREADCOUNT: "25"
+    ORG_QUARTZ_SCHEDULER_BATCHTRIGGERACQUISTITIONMAXCOUNT: "1"
+    SESSION_TIMEOUT_MS: 60000
   ## Periodic probe of container liveness. Container will be restarted if the 
probe fails. Cannot be updated.
   ## More info: 
https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle#container-probes
   livenessProbe:
@@ -225,6 +233,8 @@ worker:
     WORKER_MAX_CPULOAD_AVG: "-1"
     WORKER_RESERVED_MEMORY: "0.3"
     WORKER_GROUPS: "default"
+    SESSION_TIMEOUT_MS: 60000
+    WORKER_RETRY_REPORT_TASK_STATUS_INTERVAL: 600
   ## Periodic probe of container liveness. Container will be restarted if the 
probe fails. Cannot be updated.
   ## More info: 
https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle#container-probes
   livenessProbe:
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 5a7b7cd..809bcb1 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -53,7 +53,7 @@ public final class Constants {
     public static final String ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK = 
"org.quartz.jobStore.acquireTriggersWithinLock";
     public static final String ORG_QUARTZ_JOBSTORE_DATASOURCE = 
"org.quartz.jobStore.dataSource";
     public static final String 
ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS = 
"org.quartz.dataSource.myDs.connectionProvider.class";
-
+    public static final String 
ORG_QUARTZ_SCHEDULER_BATCHTRIGGERACQUISTITIONMAXCOUNT = 
"org.quartz.scheduler.batchTriggerAcquisitionMaxCount";
     /**
      * quartz config default value
      */
@@ -66,6 +66,7 @@ public final class Constants {
     public static final String QUARTZ_INSTANCENAME = "DolphinScheduler";
     public static final String QUARTZ_INSTANCEID = "AUTO";
     public static final String QUARTZ_ACQUIRETRIGGERSWITHINLOCK = "true";
+    public static final String QUARTZ_BATCHTRIGGERACQUISTITIONMAXCOUNT = "100";
 
     /**
      * common properties path
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
index 18c386b..f20b13a 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
@@ -244,6 +244,12 @@ public class ProcessInstance {
      */
     private int dryRun;
 
+    /**
+     * re-start time
+     */
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
+    private Date restartTime;
+
     public ProcessInstance() {
 
     }
@@ -516,6 +522,14 @@ public class ProcessInstance {
         this.dryRun = dryRun;
     }
 
+    public Date getRestartTime() {
+        return restartTime;
+    }
+
+    public void setRestartTime(Date restartTime) {
+        this.restartTime = restartTime;
+    }
+
     /**
      * add command to history
      *
@@ -684,6 +698,10 @@ public class ProcessInstance {
             + ", dryRun='"
             + dryRun
             + '\''
+            + '}'
+            + ", restartTime='"
+            + restartTime
+            + '\''
             + '}';
     }
 
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapMapper.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapMapper.java
index 0e5a381..8ad7e89 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapMapper.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapMapper.java
@@ -17,11 +17,13 @@
 package org.apache.dolphinscheduler.dao.mapper;
 
 import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
-import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+
 import org.apache.ibatis.annotations.Param;
 
 import java.util.List;
 
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+
 /**
  * process instance map mapper interface
  */
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
index 51f6029..4a156ce 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
@@ -44,6 +44,14 @@ public interface ProcessInstanceMapper extends 
BaseMapper<ProcessInstance> {
     ProcessInstance queryDetailById(@Param("processId") int processId);
 
     /**
+     * query process instance host by stateArray
+     *
+     * @param stateArray
+     * @return
+     */
+    List<String> queryNeedFailoverProcessInstanceHost(@Param("states") int[] 
stateArray);
+
+    /**
      * query process instance by host and stateArray
      *
      * @param host host
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
index 795004d..5e2597d 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
@@ -73,4 +73,6 @@ public interface TaskInstanceMapper extends 
BaseMapper<TaskInstance> {
                                                     @Param("startTime") Date 
startTime,
                                                     @Param("endTime") Date 
endTime
     );
+
+    int updateHostAndSubmitTimeById(@Param("id") int id, @Param("host") String 
host, @Param("submitTime") Date submitTime);
 }
diff --git 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
index 77d96b5..7729695 100644
--- 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
+++ 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
@@ -23,7 +23,7 @@
         command_type, command_param, task_depend_type, max_try_times, 
failure_strategy, warning_type,
         warning_group_id, schedule_time, command_start_time, global_params, 
flag,
         update_time, is_sub_process, executor_id, history_cmd,
-        process_instance_priority, worker_group,environment_code, timeout, 
tenant_id, var_pool, dry_run
+        process_instance_priority, worker_group,environment_code, timeout, 
tenant_id, var_pool, dry_run, restart_time
     </sql>
     <select id="queryDetailById" 
resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
         select
@@ -45,7 +45,14 @@
         </foreach>
         order by id asc
     </select>
-
+    <select id="queryNeedFailoverProcessInstanceHost" resultType="String">
+        select distinct host
+        from t_ds_process_instance
+        where state in
+        <foreach collection="states" item="i" open="(" close=")" separator=",">
+            #{i}
+        </foreach>
+    </select>
     <select id="queryTopNProcessInstance" 
resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
         select
         <include refid="baseSql"/>
@@ -93,7 +100,7 @@
     <select id="queryProcessInstanceListPaging" 
resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
         select instance.id, instance.command_type, instance.executor_id, 
instance.process_definition_version,
         instance.process_definition_code, instance.name, instance.state, 
instance.schedule_time, instance.start_time,
-        instance.end_time, instance.run_times, instance.recovery, 
instance.host, instance.dry_run
+        instance.end_time, instance.run_times, instance.recovery, 
instance.host, instance.dry_run, instance.restart_time
         from t_ds_process_instance instance
         join t_ds_process_definition define ON 
instance.process_definition_code = define.code
         where instance.is_sub_process=0
diff --git 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
index f41b58a..bdbc538 100644
--- 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
+++ 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
@@ -162,4 +162,10 @@
         </if>
         order by instance.start_time desc
     </select>
+    <update id="updateHostAndSubmitTimeById">
+        update t_ds_task_instance
+        set host        = #{host},
+            submit_time = #{submitTime}
+        where id = #{id}
+    </update>
 </mapper>
diff --git 
a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql 
b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
index c85e106..4f6e900 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
@@ -601,6 +601,7 @@ CREATE TABLE t_ds_process_instance
     tenant_id                  int(11) NOT NULL DEFAULT '-1',
     var_pool                   longtext,
     dry_run                    int NULL DEFAULT 0,
+    restart_time               datetime     DEFAULT NULL,
     PRIMARY KEY (id)
 );
 
diff --git 
a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql 
b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
index 35e8d5a..9e52d60 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
@@ -596,6 +596,7 @@ CREATE TABLE `t_ds_process_instance` (
   `var_pool` longtext COMMENT 'var_pool',
   `dry_run` tinyint(4) DEFAULT '0' COMMENT 'dry run flag:0 normal, 1 dry run',
   `next_process_instance_id` int(11) DEFAULT '0' COMMENT 'serial queue next 
processInstanceId',
+  `restart_time` datetime DEFAULT NULL COMMENT 'process instance restart time',
   PRIMARY KEY (`id`),
   KEY `process_instance_index` (`process_definition_code`,`id`) USING BTREE,
   KEY `start_time_index` (`start_time`,`end_time`) USING BTREE
diff --git 
a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgre.sql 
b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgre.sql
index 3683b9b..5c02006 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgre.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgre.sql
@@ -501,6 +501,7 @@ CREATE TABLE t_ds_process_instance (
   var_pool text ,
   dry_run int DEFAULT '0' ,
   next_process_instance_id int DEFAULT '0',
+  restart_time timestamp DEFAULT NULL ,
   PRIMARY KEY (id)
 ) ;
 
diff --git a/dolphinscheduler-dao/src/main/resources/sql/soft_version 
b/dolphinscheduler-dao/src/main/resources/sql/soft_version
index 10bf840..f93ea0c 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/soft_version
+++ b/dolphinscheduler-dao/src/main/resources/sql/soft_version
@@ -1 +1 @@
-2.0.1
\ No newline at end of file
+2.0.2
\ No newline at end of file
diff --git 
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_ddl.sql
 
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_ddl.sql
new file mode 100644
index 0000000..c7a2396
--- /dev/null
+++ 
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_ddl.sql
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+SET sql_mode=(SELECT REPLACE(@@sql_mode,'ONLY_FULL_GROUP_BY',''));
+
+alter table t_ds_process_instance add column if not exists `restart_time` 
datetime DEFAULT NULL COMMENT 'process instance restart time';
\ No newline at end of file
diff --git 
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_dml.sql
  
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_dml.sql
 
new file mode 100644
index 0000000..38964cc
--- /dev/null
+++ 
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_dml.sql
   
@@ -0,0 +1,16 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
\ No newline at end of file
diff --git 
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_ddl.sql
 
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_ddl.sql
new file mode 100644
index 0000000..d26cf8e
--- /dev/null
+++ 
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_ddl.sql
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+delimiter d//
+CREATE OR REPLACE FUNCTION public.dolphin_update_metadata(
+       )
+    RETURNS character varying
+    LANGUAGE 'plpgsql'
+    COST 100
+    VOLATILE PARALLEL UNSAFE
+AS $BODY$
+DECLARE
+v_schema varchar;
+BEGIN
+    ---get schema name
+    v_schema =current_schema();
+
+EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_process_instance ADD 
COLUMN IF NOT EXISTS "restart_time" timestamp DEFAULT NULL';
+return 'Success!';
+exception when others then
+               ---Raise EXCEPTION '(%)',SQLERRM;
+        return SQLERRM;
+END;
+$BODY$;
+
+select dolphin_update_metadata();
+
+d//
\ No newline at end of file
diff --git 
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_dml.sql
 
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_dml.sql
new file mode 100644
index 0000000..38964cc
--- /dev/null
+++ 
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_dml.sql
@@ -0,0 +1,16 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
\ No newline at end of file
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index ce0a080..e9d0cc5 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -17,6 +17,8 @@
 
 package org.apache.dolphinscheduler.server.master;
 
+import static 
org.apache.dolphinscheduler.common.Constants.SPRING_DATASOURCE_DRIVER_CLASS_NAME;
+
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.IStoppable;
 import org.apache.dolphinscheduler.common.thread.Stopper;
@@ -31,8 +33,9 @@ import 
org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProce
 import 
org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
 import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
 import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
-import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
+import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread;
 import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
+import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.quartz.QuartzExecutors;
 
@@ -51,8 +54,6 @@ import org.springframework.context.annotation.ComponentScan;
 import org.springframework.context.annotation.FilterType;
 import org.springframework.transaction.annotation.EnableTransactionManagement;
 
-import static 
org.apache.dolphinscheduler.common.Constants.SPRING_DATASOURCE_DRIVER_CLASS_NAME;
-
 /**
  *  master server
  */
@@ -105,6 +106,9 @@ public class MasterServer implements IStoppable {
     @Autowired
     private EventExecuteService eventExecuteService;
 
+    @Autowired
+    private FailoverExecuteThread failoverExecuteThread;
+
     @Value("${spring.datasource.driver-class-name}")
     private String driverClassName;
 
@@ -145,8 +149,8 @@ public class MasterServer implements IStoppable {
 
         // self tolerant
         this.masterRegistryClient.init(this.processInstanceExecMaps);
-        this.masterRegistryClient.start();
         this.masterRegistryClient.setRegistryStoppable(this);
+        this.masterRegistryClient.start();
 
         this.eventExecuteService.init(this.processInstanceExecMaps);
         this.eventExecuteService.start();
@@ -155,6 +159,8 @@ public class MasterServer implements IStoppable {
 
         this.masterSchedulerService.start();
 
+        this.failoverExecuteThread.start();
+
         // start QuartzExecutors
         // what system should do if exception
         try {
@@ -217,8 +223,17 @@ public class MasterServer implements IStoppable {
             }
             // close spring Context and will invoke method with @PreDestroy 
annotation to destory beans. like 
ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
             springApplicationContext.close();
+            logger.info("springApplicationContext close");
         } catch (Exception e) {
             logger.error("master server stop exception ", e);
+        } finally {
+            try {
+                // thread sleep 60 seconds for quietly stop
+                Thread.sleep(60000L);
+            } catch (Exception e) {
+                logger.warn("thread sleep exception ", e);
+            }
+            System.exit(1);
         }
     }
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index 124eceb..13a68c4 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -63,6 +63,12 @@ public class MasterConfig {
     @Value("${master.cache.process.definition:true}")
     private boolean masterCacheProcessDefinition;
 
+    @Value("${master.failover.interval:10}")
+    private int failoverInterval;
+
+    @Value("${master.kill.yarn.job.when.handle.fail.over:true}")
+    private boolean masterKillYarnJobWhenHandleFailOver;
+
     public int getListenPort() {
         return listenPort;
     }
@@ -162,4 +168,19 @@ public class MasterConfig {
         this.masterCacheProcessDefinition = masterCacheProcessDefinition;
     }
 
+    public int getFailoverInterval() {
+        return failoverInterval;
+    }
+
+    public void setFailoverInterval(int failoverInterval) {
+        this.failoverInterval = failoverInterval;
+    }
+
+    public boolean getMasterKillYarnJobWhenHandleFailOver() {
+        return masterKillYarnJobWhenHandleFailOver;
+    }
+
+    public void setMasterKillYarnJobWhenHandleFailOver(boolean 
masterKillYarnJobWhenHandleFailOver) {
+        this.masterKillYarnJobWhenHandleFailOver = 
masterKillYarnJobWhenHandleFailOver;
+    }
 }
\ No newline at end of file
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index 7b18e2b..574f5db 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -31,6 +31,7 @@ import 
org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
 import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
 
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
@@ -136,8 +137,13 @@ public class TaskPriorityQueueConsumer extends Thread {
             } else {
                 result = dispatcher.dispatch(executionContext);
             }
+            if (result) {
+                
processService.updateHostAndSubmitTimeById(taskPriority.getTaskId(), 
executionContext.getHost().getAddress(), new Date());
+            }
         } catch (ExecuteException e) {
-            logger.error("dispatch error: {}", e.getMessage(),e);
+            logger.error("ExecuteException dispatch error: {}", 
e.getMessage(), e);
+        } catch (Throwable t) {
+            logger.error("dispatch error: {}", t, t);
         }
         return result;
     }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
index 27b96e1..a320a70 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
@@ -110,6 +110,7 @@ public class TaskResponseService {
     public void addResponse(TaskResponseEvent taskResponseEvent) {
         try {
             eventQueue.put(taskResponseEvent);
+            logger.debug("eventQueue size:{}", eventQueue.size());
         } catch (InterruptedException e) {
             logger.error("put task : {} error :{}", taskResponseEvent, e);
             Thread.currentThread().interrupt();
@@ -155,36 +156,49 @@ public class TaskResponseService {
                 try {
                     if (taskInstance != null) {
                         ExecutionStatus status = 
taskInstance.getState().typeIsFinished() ? taskInstance.getState() : 
taskResponseEvent.getState();
-                        processService.changeTaskState(taskInstance, status,
+                        boolean result = 
processService.changeTaskState(taskInstance, status,
                                 taskResponseEvent.getStartTime(),
                                 taskResponseEvent.getWorkerAddress(),
                                 taskResponseEvent.getExecutePath(),
                                 taskResponseEvent.getLogPath(),
                                 taskResponseEvent.getTaskInstanceId());
+                        logger.debug("changeTaskState in ACK , changed in 
meta:{} ,task instance state:{}, task response event state:{}, taskInstance 
id:{},taskInstance host:{}",
+                                result, taskInstance.getState(), 
taskResponseEvent.getState(), taskInstance.getId(), taskInstance.getHost());
                     }
                     // if taskInstance is null (maybe deleted) . retry will be 
meaningless . so ack success
                     DBTaskAckCommand taskAckCommand = new 
DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(), 
taskResponseEvent.getTaskInstanceId());
                     channel.writeAndFlush(taskAckCommand.convert2Command());
+                    logger.debug("worker ack master success, taskInstance 
id:{},taskInstance host:{}", taskInstance.getId(), taskInstance.getHost());
                 } catch (Exception e) {
                     logger.error("worker ack master error", e);
-                    DBTaskAckCommand taskAckCommand = new 
DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
+                    DBTaskAckCommand taskAckCommand = new 
DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), taskInstance == null ? -1 : 
taskInstance.getId());
                     channel.writeAndFlush(taskAckCommand.convert2Command());
                 }
                 break;
             case RESULT:
                 try {
+                    boolean result = true;
                     if (taskInstance != null) {
-                        processService.changeTaskState(taskInstance, 
taskResponseEvent.getState(),
+                        result = processService.changeTaskState(taskInstance, 
taskResponseEvent.getState(),
                                 taskResponseEvent.getEndTime(),
                                 taskResponseEvent.getProcessId(),
                                 taskResponseEvent.getAppIds(),
                                 taskResponseEvent.getTaskInstanceId(),
                                 taskResponseEvent.getVarPool()
                         );
+                        logger.debug("changeTaskState in RESULT , changed in 
meta:{} task instance state:{}, task response event state:{}, taskInstance 
id:{},taskInstance host:{}",
+                                result, taskInstance.getState(), 
taskResponseEvent.getState(), taskInstance.getId(), taskInstance.getHost());
+                    }
+                    if (!result) {
+                        DBTaskResponseCommand taskResponseCommand = new 
DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), 
taskResponseEvent.getTaskInstanceId());
+                        
channel.writeAndFlush(taskResponseCommand.convert2Command());
+                        logger.debug("worker response master failure, 
taskInstance id:{},taskInstance host:{}", taskInstance.getId(), 
taskInstance.getHost());
+                    } else {
+                        // if taskInstance is null (maybe deleted) . retry 
will be meaningless . so response success
+                        DBTaskResponseCommand taskResponseCommand = new 
DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(), 
taskResponseEvent.getTaskInstanceId());
+                        
channel.writeAndFlush(taskResponseCommand.convert2Command());
+                        logger.debug("worker response master success, 
taskInstance id:{},taskInstance host:{}", taskInstance.getId(), 
taskInstance.getHost());
                     }
-                    // if taskInstance is null (maybe deleted) . retry will be 
meaningless . so response success
-                    DBTaskResponseCommand taskResponseCommand = new 
DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(), 
taskResponseEvent.getTaskInstanceId());
-                    
channel.writeAndFlush(taskResponseCommand.convert2Command());
                 } catch (Exception e) {
                     logger.error("worker response master error", e);
                     DBTaskResponseCommand taskResponseCommand = new 
DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), -1);
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index ae2b969..5f1eff6 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -43,6 +43,7 @@ import 
org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 
 import java.util.Collections;
@@ -127,16 +128,16 @@ public class MasterRegistryClient {
                 ThreadUtils.sleep(SLEEP_TIME_MILLIS);
             }
 
-            // self tolerant
-            if (registryClient.getActiveMasterNum() == 1) {
-                removeNodePath(null, NodeType.MASTER, true);
-                removeNodePath(null, NodeType.WORKER, true);
-            }
             registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new 
MasterRegistryDataListener());
         } catch (Exception e) {
             logger.error("master start up exception", e);
+            this.registryClient.getStoppable().stop("master start up 
exception");
         } finally {
-            registryClient.releaseLock(nodeLock);
+            try {
+                registryClient.releaseLock(nodeLock);
+            } catch (Exception e) {
+                logger.error("release lock error", e);
+            }
         }
     }
 
@@ -150,18 +151,57 @@ public class MasterRegistryClient {
     }
 
     /**
-     * remove zookeeper node path
+     * remove master node path
      *
-     * @param path zookeeper node path
-     * @param nodeType zookeeper node type
+     * @param path node path
+     * @param nodeType node type
      * @param failover is failover
      */
-    public void removeNodePath(String path, NodeType nodeType, boolean 
failover) {
+    public void removeMasterNodePath(String path, NodeType nodeType, boolean 
failover) {
         logger.info("{} node deleted : {}", nodeType, path);
-        String failoverPath = getFailoverLockPath(nodeType);
+
+        if (StringUtils.isEmpty(path)) {
+            logger.error("server down error: empty path: {}, nodeType:{}", 
path, nodeType);
+            return;
+        }
+
+        String serverHost = registryClient.getHostByEventDataPath(path);
+        if (StringUtils.isEmpty(serverHost)) {
+            logger.error("server down error: unknown path: {}, nodeType:{}", 
path, nodeType);
+            return;
+        }
+
+        String failoverPath = getFailoverLockPath(nodeType, serverHost);
         try {
             registryClient.getLock(failoverPath);
 
+            if (!registryClient.exists(path)) {
+                logger.info("path: {} not exists", path);
+                // handle dead server
+                registryClient.handleDeadServer(Collections.singleton(path), 
nodeType, Constants.ADD_OP);
+            }
+
+            //failover server
+            if (failover) {
+                failoverServerWhenDown(serverHost, nodeType);
+            }
+        } catch (Exception e) {
+            logger.error("{} server failover failed, host:{}", nodeType, 
serverHost, e);
+        } finally {
+            registryClient.releaseLock(failoverPath);
+        }
+    }
+
+    /**
+     * remove worker node path
+     *
+     * @param path     node path
+     * @param nodeType node type
+     * @param failover is failover
+     */
+    public void removeWorkerNodePath(String path, NodeType nodeType, boolean 
failover) {
+        logger.info("{} node deleted : {}", nodeType, path);
+        try {
             String serverHost = null;
             if (!StringUtils.isEmpty(path)) {
                 serverHost = registryClient.getHostByEventDataPath(path);
@@ -169,18 +209,18 @@ public class MasterRegistryClient {
                     logger.error("server down error: unknown path: {}", path);
                     return;
                 }
-                // handle dead server
-                registryClient.handleDeadServer(Collections.singleton(path), 
nodeType, Constants.ADD_OP);
+                if (!registryClient.exists(path)) {
+                    logger.info("path: {} not exists", path);
+                    // handle dead server
+                    
registryClient.handleDeadServer(Collections.singleton(path), nodeType, 
Constants.ADD_OP);
+                }
             }
             //failover server
             if (failover) {
                 failoverServerWhenDown(serverHost, nodeType);
             }
         } catch (Exception e) {
-            logger.error("{} server failover failed.", nodeType);
-            logger.error("failover exception ", e);
-        } finally {
-            registryClient.releaseLock(failoverPath);
+            logger.error("{} server failover failed", nodeType, e);
         }
     }
 
@@ -209,12 +249,12 @@ public class MasterRegistryClient {
      * @param nodeType zookeeper node type
      * @return fail over lock path
      */
-    private String getFailoverLockPath(NodeType nodeType) {
+    public String getFailoverLockPath(NodeType nodeType, String host) {
         switch (nodeType) {
             case MASTER:
-                return 
Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS;
+                return 
Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS + "/" + host;
             case WORKER:
-                return 
Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS;
+                return 
Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS + "/" + host;
             default:
                 return "";
         }
@@ -226,7 +266,11 @@ public class MasterRegistryClient {
      * @param taskInstance task instance
      * @return true if task instance need fail over
      */
-    private boolean checkTaskInstanceNeedFailover(TaskInstance taskInstance) {
+    private boolean checkTaskInstanceNeedFailover(List<Server> workerServers, 
TaskInstance taskInstance) {
+
+        // first submit: host is null
+        // dispatch succeed: host is not null &&  submit_time is null
+        // ACK || RESULT from worker: host is not null && start_time is not 
null
 
         boolean taskNeedFailover = true;
 
@@ -234,14 +278,15 @@ public class MasterRegistryClient {
         if (taskInstance.getHost() == null) {
             return false;
         }
-
-        // if the worker node exists in zookeeper, we must check the task 
starts after the worker
-        if (registryClient.checkNodeExists(taskInstance.getHost(), 
NodeType.WORKER)) {
-            //if task start after worker starts, there is no need to failover 
the task.
-            if (checkTaskAfterWorkerStart(taskInstance)) {
-                taskNeedFailover = false;
-            }
+        // host is not null and submit time is null, master will retry
+        if (taskInstance.getSubmitTime() == null) {
+            return false;
+        }
+        //if task start after worker starts, there is no need to failover the 
task.
+        if (checkTaskAfterWorkerStart(workerServers, taskInstance)) {
+            taskNeedFailover = false;
         }
+
         return taskNeedFailover;
     }
 
@@ -270,6 +315,54 @@ public class MasterRegistryClient {
     }
 
     /**
+     * check task start after the worker server starts.
+     *
+     * @param taskInstance task instance
+     * @return true if task instance start time after worker server start date
+     */
+    private boolean checkTaskAfterWorkerStart(List<Server> workerServers, 
TaskInstance taskInstance) {
+        if (StringUtils.isEmpty(taskInstance.getHost())) {
+            return false;
+        }
+
+        Date taskTime = taskInstance.getStartTime() == null ? 
taskInstance.getSubmitTime() : taskInstance.getStartTime();
+
+        Date workerServerStartDate = getServerStartupTime(workerServers, 
taskInstance.getHost());
+        if (workerServerStartDate != null) {
+            return taskTime.after(workerServerStartDate);
+        }
+        return false;
+    }
+
+    /**
+     * get server startup time
+     */
+    private Date getServerStartupTime(List<Server> servers, String host) {
+        if (CollectionUtils.isEmpty(servers)) {
+            return null;
+        }
+        Date serverStartupTime = null;
+        for (Server server : servers) {
+            if (host.equals(server.getHost() + Constants.COLON + 
server.getPort())) {
+                serverStartupTime = server.getCreateTime();
+                break;
+            }
+        }
+        return serverStartupTime;
+    }
+
+    /**
+     * get server startup time
+     */
+    private Date getServerStartupTime(NodeType nodeType, String host) {
+        if (StringUtils.isEmpty(host)) {
+            return null;
+        }
+        List<Server> servers = registryClient.getServerList(nodeType);
+        return getServerStartupTime(servers, host);
+    }
+
+    /**
      * failover worker tasks
      * <p>
      * 1. kill yarn job if there are yarn jobs in tasks.
@@ -279,10 +372,13 @@ public class MasterRegistryClient {
      * @param workerHost worker host
      */
     private void failoverWorker(String workerHost) {
+
         if (StringUtils.isEmpty(workerHost)) {
             return;
         }
 
+        List<Server> workerServers = 
registryClient.getServerList(NodeType.WORKER);
+
         long startTime = System.currentTimeMillis();
         List<TaskInstance> needFailoverTaskInstanceList = 
processService.queryNeedFailoverTaskInstances(workerHost);
         Map<Integer, ProcessInstance> processInstanceCacheMap = new 
HashMap<>();
@@ -300,11 +396,17 @@ public class MasterRegistryClient {
                 processInstanceCacheMap.put(processInstance.getId(), 
processInstance);
             }
 
+            if (!checkTaskInstanceNeedFailover(workerServers, taskInstance)) {
+                continue;
+            }
+
             // only failover the task owned myself if worker down.
-            if (processInstance.getHost().equalsIgnoreCase(getLocalAddress())) 
{
-                logger.info("failover task instance id: {}, process instance 
id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
-                failoverTaskInstance(processInstance, taskInstance);
+            if 
(!processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
+                continue;
             }
+
+            logger.info("failover task instance id: {}, process instance id: 
{}", taskInstance.getId(), taskInstance.getProcessInstanceId());
+            failoverTaskInstance(processInstance, taskInstance);
         }
         logger.info("end worker[{}] failover, useTime:{}ms", workerHost, 
System.currentTimeMillis() - startTime);
     }
@@ -316,11 +418,15 @@ public class MasterRegistryClient {
      *
      * @param masterHost master host
      */
-    private void failoverMaster(String masterHost) {
+    public void failoverMaster(String masterHost) {
+
         if (StringUtils.isEmpty(masterHost)) {
             return;
         }
 
+        Date serverStartupTime = getServerStartupTime(NodeType.MASTER, 
masterHost);
+        List<Server> workerServers = 
registryClient.getServerList(NodeType.WORKER);
+
         long startTime = System.currentTimeMillis();
         List<ProcessInstance> needFailoverProcessInstanceList = 
processService.queryNeedFailoverProcessInstances(masterHost);
         logger.info("start master[{}] failover, process list size:{}", 
masterHost, needFailoverProcessInstanceList.size());
@@ -330,6 +436,11 @@ public class MasterRegistryClient {
                 continue;
             }
 
+            if (serverStartupTime != null && processInstance.getRestartTime() 
!= null
+                    && 
processInstance.getRestartTime().after(serverStartupTime)) {
+                continue;
+            }
+
             logger.info("failover process instance id: {}", 
processInstance.getId());
 
             List<TaskInstance> validTaskInstanceList = 
processService.findValidTaskListByProcessId(processInstance.getId());
@@ -337,6 +448,12 @@ public class MasterRegistryClient {
                 if (Constants.NULL.equals(taskInstance.getHost())) {
                     continue;
                 }
+                if (taskInstance.getState().typeIsFinished()) {
+                    continue;
+                }
+                if (!checkTaskInstanceNeedFailover(workerServers, 
taskInstance)) {
+                    continue;
+                }
                 logger.info("failover task instance id: {}, process instance 
id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
                 failoverTaskInstance(processInstance, taskInstance);
             }
@@ -347,6 +464,13 @@ public class MasterRegistryClient {
         logger.info("master[{}] failover end, useTime:{}ms", masterHost, 
System.currentTimeMillis() - startTime);
     }
 
+    /**
+     * failover task instance
+     * <p>
+     * 1. kill yarn job if there are yarn jobs in tasks.
+     * 2. change task state from running to need failover.
+     * 3. try to notify local master
+     */
     private void failoverTaskInstance(ProcessInstance processInstance, 
TaskInstance taskInstance) {
         if (taskInstance == null) {
             logger.error("failover task instance error, taskInstance is null");
@@ -359,24 +483,23 @@ public class MasterRegistryClient {
             return;
         }
 
-        if (!checkTaskInstanceNeedFailover(taskInstance)) {
-            return;
-        }
-
         taskInstance.setProcessInstance(processInstance);
         TaskExecutionContext taskExecutionContext = 
TaskExecutionContextBuilder.get()
                 .buildTaskInstanceRelatedInfo(taskInstance)
                 .buildProcessInstanceRelatedInfo(processInstance)
                 .create();
 
-        // only kill yarn job if exists , the local thread has exited
-        ProcessUtils.killYarnJob(taskExecutionContext);
+        if (masterConfig.getMasterKillYarnJobWhenHandleFailOver()) {
+            // only kill yarn job if exists , the local thread has exited
+            ProcessUtils.killYarnJob(taskExecutionContext);
+        }
 
         taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
         processService.saveTaskInstance(taskInstance);
 
         WorkflowExecuteThread workflowExecuteThreadNotify = 
processInstanceExecMaps.get(processInstance.getId());
         if (workflowExecuteThreadNotify == null) {
+            logger.info("workflowExecuteThreadNotify is null, just return, 
task id:{},process id:{}", taskInstance.getId(), processInstance.getId());
             return;
         }
         StateEvent stateEvent = new StateEvent();
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
index cb5b6be..361f09f 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryDataListener.java
@@ -63,7 +63,7 @@ public class MasterRegistryDataListener implements 
SubscribeListener {
                 logger.info("master node added : {}", path);
                 break;
             case REMOVE:
-                masterRegistryClient.removeNodePath(path, NodeType.MASTER, 
true);
+                masterRegistryClient.removeMasterNodePath(path, 
NodeType.MASTER, true);
                 break;
             default:
                 break;
@@ -78,7 +78,7 @@ public class MasterRegistryDataListener implements 
SubscribeListener {
                 break;
             case REMOVE:
                 logger.info("worker node deleted : {}", path);
-                masterRegistryClient.removeNodePath(path, NodeType.WORKER, 
true);
+                masterRegistryClient.removeWorkerNodePath(path, 
NodeType.WORKER, true);
                 break;
             default:
                 break;
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index b7e904b..6b30a1a 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -51,6 +51,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
 
 import javax.annotation.PreDestroy;
 
@@ -387,7 +388,8 @@ public class ServerNodeManager implements InitializingBean {
             workerGroup = workerGroup.toLowerCase();
             Set<String> nodes = workerGroupNodes.get(workerGroup);
             if (CollectionUtils.isNotEmpty(nodes)) {
-                return Collections.unmodifiableSet(nodes);
+                // avoid ConcurrentModificationException
+                return 
Collections.unmodifiableSet(nodes.stream().collect(Collectors.toSet()));
             }
             return nodes;
         } finally {
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
new file mode 100644
index 0000000..81d02a9
--- /dev/null
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.NodeType;
+import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.apache.dolphinscheduler.service.registry.RegistryClient;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class FailoverExecuteThread extends Thread {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(FailoverExecuteThread.class);
+
+    @Autowired
+    private MasterRegistryClient masterRegistryClient;
+
+    @Autowired
+    private RegistryClient registryClient;
+
+    @Autowired
+    private MasterConfig masterConfig;
+
+    /**
+     * process service
+     */
+    @Autowired
+    private ProcessService processService;
+
+    @Override
+    public synchronized void start() {
+        super.setName("FailoverExecuteThread");
+        super.start();
+    }
+
+    @Override
+    public void run() {
+        while (Stopper.isRunning()) {
+            logger.info("failover execute started");
+            try {
+                List<String> hosts = 
processService.queryNeedFailoverProcessInstanceHost();
+                if (CollectionUtils.isEmpty(hosts)) {
+                    continue;
+                }
+                for (String host : hosts) {
+                    String failoverPath = 
masterRegistryClient.getFailoverLockPath(NodeType.MASTER, host);
+                    try {
+                        registryClient.getLock(failoverPath);
+                        masterRegistryClient.failoverMaster(host);
+                    } catch (Exception e) {
+                        logger.error("{} server failover failed, host:{}", 
NodeType.MASTER, host, e);
+                    } finally {
+                        registryClient.releaseLock(failoverPath);
+                    }
+                }
+            } catch (Exception e) {
+                logger.error("failover execute error", e);
+            } finally {
+                ThreadUtils.sleep((long) Constants.SLEEP_TIME_MILLIS * 
masterConfig.getFailoverInterval() * 60);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
index ca44ead..33c84b3 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java
@@ -234,6 +234,7 @@ public class MasterSchedulerService extends Thread {
             if (ServerNodeManager.MASTER_SIZE == 0) {
                 return null;
             }
+            logger.debug("master size:{}",ServerNodeManager.MASTER_SIZE);
             List<Command> commandList = 
processService.findCommandPage(ServerNodeManager.MASTER_SIZE, pageNumber);
             if (commandList.size() == 0) {
                 return null;
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 640abb7..bc6159d 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -409,9 +409,10 @@ public class WorkflowExecuteThread implements Runnable {
 
     private boolean checkStateEvent(StateEvent stateEvent) {
         if (this.processInstance.getId() != stateEvent.getProcessInstanceId()) 
{
-            logger.error("mismatch process instance id: {}, state event:{}",
+            logger.error("mismatch process instance id: {}, state event:{}, 
task instance id:{}",
                     this.processInstance.getId(),
-                    stateEvent.toString());
+                    stateEvent.toString(),
+                    stateEvent.getTaskInstanceId());
             return false;
         }
         return true;
@@ -482,6 +483,7 @@ public class WorkflowExecuteThread implements Runnable {
                 processDefinition.getGlobalParamList(),
                 CommandType.COMPLEMENT_DATA, 
processInstance.getScheduleTime()));
         processInstance.setStartTime(new Date());
+        processInstance.setRestartTime(processInstance.getStartTime());
         processInstance.setEndTime(null);
         processService.saveProcessInstance(processInstance);
         this.taskInstanceHashMap.clear();
@@ -876,11 +878,11 @@ public class WorkflowExecuteThread implements Runnable {
             }
 
             if 
(completeTaskList.containsKey(Long.toString(task.getTaskCode()))) {
-                logger.info("task {} has already run success", task.getName());
+                logger.info("task {} has already run success, task id:{}", 
task.getName(), task.getId());
                 continue;
             }
             if (task.getState().typeIsPause() || 
task.getState().typeIsCancel()) {
-                logger.info("task {} stopped, the state is {}", 
task.getName(), task.getState());
+                logger.info("task {} stopped, the state is {}, task id:{}", 
task.getName(), task.getState(), task.getId());
             } else {
                 addTaskToStandByList(task);
             }
@@ -1167,13 +1169,13 @@ public class WorkflowExecuteThread implements Runnable {
      * @param taskInstance task instance
      */
     private void addTaskToStandByList(TaskInstance taskInstance) {
-        logger.info("add task to stand by list: {}", taskInstance.getName());
+        logger.info("add task to stand by list, task name: {} , task id:{}", 
taskInstance.getName(), taskInstance.getId());
         try {
             if (!readyToSubmitTaskQueue.contains(taskInstance)) {
                 readyToSubmitTaskQueue.put(taskInstance);
             }
         } catch (Exception e) {
-            logger.error("add task instance to readyToSubmitTaskQueue error, 
taskName: {}", taskInstance.getName(), e);
+            logger.error("add task instance to readyToSubmitTaskQueue, 
taskName: {}, task id: {}", taskInstance.getName(), taskInstance.getId(), e);
         }
     }
 
@@ -1253,7 +1255,7 @@ public class WorkflowExecuteThread implements Runnable {
                     TaskInstance retryTask = 
processService.findTaskInstanceById(task.getId());
                     if (retryTask != null && 
retryTask.getState().equals(ExecutionStatus.FORCED_SUCCESS)) {
                         task.setState(retryTask.getState());
-                        logger.info("task: {} has been forced success, put it 
into complete task list and stop retrying", task.getName());
+                        logger.info("task name: {} has been forced success, 
put it into complete task list and stop retrying, task id:{}", task.getName(), 
task.getId());
                         removeTaskFromStandbyList(task);
                         
completeTaskList.put(Long.toString(task.getTaskCode()), task);
                         submitPostNode(Long.toString(task.getTaskCode()));
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index ee1c548..23988f9 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -70,8 +70,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
         if (this.taskInstance == null) {
             return false;
         }
-        dispatchTask(taskInstance, processInstance);
-        return true;
+        return dispatchTask(taskInstance, processInstance);
     }
 
     @Override
@@ -127,7 +126,8 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
             taskPriority.setTaskExecutionContext(taskExecutionContext);
 
             taskUpdateQueue.put(taskPriority);
-            logger.info(String.format("master submit success, task : %s", 
taskInstance.getName()));
+            logger.info("master submit success, task id:{}, task name:{}, 
process id:{}",
+                    taskInstance.getId(), taskInstance.getName(), 
taskInstance.getProcessInstanceId());
             return true;
         } catch (Exception e) {
             logger.error("submit task  Exception: ", e);
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 66f4e53..c66718f 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -21,7 +21,6 @@ import static 
org.apache.dolphinscheduler.common.Constants.SPRING_DATASOURCE_DRI
 
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.IStoppable;
-import org.apache.dolphinscheduler.common.enums.NodeType;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.utils.PropertyUtils;
 import org.apache.dolphinscheduler.remote.NettyRemotingServer;
@@ -29,12 +28,19 @@ import 
org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.plugin.TaskPluginManager;
-import org.apache.dolphinscheduler.server.worker.processor.*;
+import org.apache.dolphinscheduler.server.worker.processor.DBTaskAckProcessor;
+import 
org.apache.dolphinscheduler.server.worker.processor.DBTaskResponseProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.HostUpdateProcessor;
+import 
org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
+import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
 import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
 import 
org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread;
 import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
 import org.apache.dolphinscheduler.service.alert.AlertClientService;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+
+import javax.annotation.PostConstruct;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -45,9 +51,6 @@ import org.springframework.context.annotation.ComponentScan;
 import org.springframework.context.annotation.FilterType;
 import org.springframework.transaction.annotation.EnableTransactionManagement;
 
-import javax.annotation.PostConstruct;
-import java.util.Set;
-
 /**
  * worker server
  */
@@ -146,14 +149,14 @@ public class WorkerServer implements IStoppable {
         try {
             this.workerRegistryClient.registry();
             this.workerRegistryClient.setRegistryStoppable(this);
-            Set<String> workerZkPaths = 
this.workerRegistryClient.getWorkerZkPaths();
-
-            this.workerRegistryClient.handleDeadServer(workerZkPaths, 
NodeType.WORKER, Constants.DELETE_OP);
         } catch (Exception e) {
-            logger.error(e.getMessage(), e);
+            logger.error("worker registry error", e);
             throw new RuntimeException(e);
         }
 
+        // solve dead lock
+        
logger.info(org.apache.dolphinscheduler.spi.utils.PropertyUtils.dumpProperties());
+
         // task execute manager
         this.workerManagerThread.start();
 
@@ -194,8 +197,17 @@ public class WorkerServer implements IStoppable {
             this.workerRegistryClient.unRegistry();
             this.alertClientService.close();
             this.springApplicationContext.close();
+            logger.info("springApplicationContext close");
         } catch (Exception e) {
             logger.error("worker server stop exception ", e);
+        } finally {
+            try {
+                // thread sleep 60 seconds for quietly stop
+                Thread.sleep(60000L);
+            } catch (Exception e) {
+                logger.warn("thread sleep exception ", e);
+            }
+            System.exit(1);
         }
     }
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index a3feb77..57119a7 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -65,6 +65,17 @@ public class WorkerConfig {
     @Value("${task.plugin.binding:}")
     private String taskPluginBinding;
 
+    @Value("${worker.retry.report.task.statues.interval:10}")
+    private int retryReportTaskStatusInterval;
+
+    public int getRetryReportTaskStatusInterval() {
+        return retryReportTaskStatusInterval;
+    }
+
+    public void setRetryReportTaskStatusInterval(int 
retryReportTaskStatusInterval) {
+        this.retryReportTaskStatusInterval = retryReportTaskStatusInterval;
+    }
+
     public int getListenPort() {
         return listenPort;
     }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java
index a340ad7..3aac840 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskAckProcessor.java
@@ -17,17 +17,21 @@
 
 package org.apache.dolphinscheduler.server.worker.processor;
 
-import io.netty.channel.Channel;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.remote.command.*;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
+import io.netty.channel.Channel;
+
 /**
  *  db task ack processor
  */
@@ -50,6 +54,7 @@ public class DBTaskAckProcessor implements 
NettyRequestProcessor {
 
         if (taskAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()){
             
ResponceCache.get().removeAckCache(taskAckCommand.getTaskInstanceId());
+            logger.debug("removeAckCache: taskinstance id:{}", 
taskAckCommand.getTaskInstanceId());
         }
     }
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
index 97a9cf5..6da9fdd 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/DBTaskResponseProcessor.java
@@ -17,7 +17,6 @@
 
 package org.apache.dolphinscheduler.server.worker.processor;
 
-import io.netty.channel.Channel;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
@@ -25,11 +24,14 @@ import 
org.apache.dolphinscheduler.remote.command.CommandType;
 import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
 import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
 import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
+import io.netty.channel.Channel;
+
 /**
  *  db task response processor
  */
@@ -51,6 +53,7 @@ public class DBTaskResponseProcessor implements 
NettyRequestProcessor {
 
         if (taskResponseCommand.getStatus() == 
ExecutionStatus.SUCCESS.getCode()){
             
ResponceCache.get().removeResponseCache(taskResponseCommand.getTaskInstanceId());
+            logger.debug("removeResponseCache: taskinstance id:{}", 
taskResponseCommand.getTaskInstanceId());
         }
     }
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
index 54ec49c..326fcb2 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
@@ -20,11 +20,14 @@ package org.apache.dolphinscheduler.server.worker.registry;
 import static 
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
 import static 
org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
 import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
+import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
 
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.IStoppable;
 import org.apache.dolphinscheduler.common.enums.NodeType;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.registry.api.ConnectionState;
 import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
 import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
@@ -104,6 +107,14 @@ public class WorkerRegistryClient {
             logger.info("worker node : {} registry to ZK {} successfully", 
address, workerZKPath);
         }
 
+        while (!this.checkNodeExists()) {
+            ThreadUtils.sleep(SLEEP_TIME_MILLIS);
+        }
+
+        this.handleDeadServer(workerZkPaths, NodeType.WORKER, 
Constants.DELETE_OP);
+
+        registryClient.addConnectionStateListener(this::handleConnectionState);
+
         HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
                 workerConfig.getWorkerMaxCpuloadAvg(),
                 workerConfig.getWorkerReservedMemory(),
@@ -119,6 +130,32 @@ public class WorkerRegistryClient {
         logger.info("worker node : {} heartbeat interval {} s", address, 
workerHeartbeatInterval);
     }
 
+    public void handleConnectionState(ConnectionState state) {
+        switch (state) {
+            case CONNECTED:
+                logger.info("registry connection state is {}", state);
+                break;
+            case SUSPENDED:
+                logger.info("registry connection state is {}, ready to stop 
myself", state);
+                registryClient.getStoppable().stop("registry connection state 
is SUSPENDED, stop myself");
+                break;
+            case RECONNECTED:
+                logger.info("registry connection state is {}, clean the node 
info", state);
+                String address = 
NetUtils.getAddr(workerConfig.getListenPort());
+                Set<String> workerZkPaths = getWorkerZkPaths();
+                for (String workerZKPath : workerZkPaths) {
+                    registryClient.persistEphemeral(workerZKPath, "");
+                    logger.info("worker node : {} reconnect to ZK {} 
successfully", address, workerZKPath);
+                }
+                break;
+            case DISCONNECTED:
+                logger.info("registry connection state is {}, ready to stop 
myself", state);
+                registryClient.getStoppable().stop("registry connection state 
is DISCONNECTED, stop myself");
+                break;
+            default:
+        }
+    }
+
     /**
      * remove registry info
      */
@@ -177,4 +214,11 @@ public class WorkerRegistryClient {
         registryClient.setStoppable(stoppable);
     }
 
+    public boolean checkNodeExists() {
+        boolean result = registryClient.checkNodeExists(NetUtils.getHost(), 
NodeType.WORKER);
+        if (result) {
+            logger.info("check worker, node exist success, host:{}", 
NetUtils.getHost());
+        }
+        return result;
+    }
 }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
index b2d0031..f52be9d 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java
@@ -18,18 +18,20 @@
 package org.apache.dolphinscheduler.server.worker.runner;
 
 import org.apache.dolphinscheduler.common.thread.Stopper;
-
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+
+import java.util.Map;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import java.util.Map;
-
 /**
  * Retry Report Task Status Thread
  */
@@ -38,10 +40,8 @@ public class RetryReportTaskStatusThread implements Runnable 
{
 
     private final Logger logger = 
LoggerFactory.getLogger(RetryReportTaskStatusThread.class);
 
-    /**
-     * every 5 minutes
-     */
-    private static long RETRY_REPORT_TASK_STATUS_INTERVAL = 5 * 60 * 1000L;
+    @Autowired
+    WorkerConfig workerConfig;
 
     /**
      *  task callback service
@@ -68,7 +68,7 @@ public class RetryReportTaskStatusThread implements Runnable {
         while (Stopper.isRunning()){
 
             // sleep 5 minutes
-            ThreadUtils.sleep(RETRY_REPORT_TASK_STATUS_INTERVAL);
+            ThreadUtils.sleep(workerConfig.getRetryReportTaskStatusInterval() 
* 1000);
 
             try {
                 if (!responceCache.getAckCache().isEmpty()){
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
index 65d6b89..7148105 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClientTest.java
@@ -114,9 +114,9 @@ public class MasterRegistryClientTest {
 
     @Test
     public void removeNodePathTest() {
-        masterRegistryClient.removeNodePath("/path", NodeType.MASTER, false);
-        masterRegistryClient.removeNodePath("/path", NodeType.MASTER, true);
+        masterRegistryClient.removeMasterNodePath("/path", NodeType.MASTER, 
false);
+        masterRegistryClient.removeMasterNodePath("/path", NodeType.MASTER, 
true);
         //Cannot mock static methods
-        masterRegistryClient.removeNodePath("/path", NodeType.WORKER, true);
+        masterRegistryClient.removeWorkerNodePath("/path", NodeType.WORKER, 
true);
     }
 }
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index df7549c..cc968f9 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.service.process;
 
+import static java.util.stream.Collectors.toSet;
 import static 
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
 import static 
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
 import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
@@ -27,8 +28,6 @@ import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS
 import static 
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
 import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
 
-import static java.util.stream.Collectors.toSet;
-
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.AuthorizationType;
 import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -579,6 +578,7 @@ public class ProcessService {
         processInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
         processInstance.setRecovery(Flag.NO);
         processInstance.setStartTime(new Date());
+        processInstance.setRestartTime(processInstance.getStartTime());
         processInstance.setRunTimes(1);
         processInstance.setMaxTryTimes(0);
         processInstance.setCommandParam(command.getCommandParam());
@@ -775,6 +775,7 @@ public class ProcessService {
             processInstance.setScheduleTime(command.getScheduleTime());
         }
         processInstance.setHost(host);
+        processInstance.setRestartTime(new Date());
         ExecutionStatus runStatus = ExecutionStatus.RUNNING_EXECUTION;
         int runTime = processInstance.getRunTimes();
         switch (commandType) {
@@ -844,6 +845,7 @@ public class ProcessService {
                     updateTaskInstance(taskInstance);
                 }
                 processInstance.setStartTime(new Date());
+                processInstance.setRestartTime(processInstance.getStartTime());
                 processInstance.setEndTime(null);
                 processInstance.setRunTimes(runTime + 1);
                 initComplementDataParam(processDefinition, processInstance, 
cmdParam);
@@ -1015,6 +1017,7 @@ public class ProcessService {
         }
         taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
         updateTaskInstance(taskInstance);
+        logger.debug("update task instance, task instance id:{}", 
taskInstance.getId());
     }
 
     /**
@@ -1317,9 +1320,6 @@ public class ProcessService {
         taskInstance.setExecutorId(processInstance.getExecutorId());
         
taskInstance.setProcessInstancePriority(processInstance.getProcessInstancePriority());
         taskInstance.setState(getSubmitTaskState(taskInstance, 
processInstanceState));
-        if (taskInstance.getSubmitTime() == null) {
-            taskInstance.setSubmitTime(new Date());
-        }
         if (taskInstance.getFirstSubmitTime() == null) {
             taskInstance.setFirstSubmitTime(taskInstance.getSubmitTime());
         }
@@ -1435,6 +1435,11 @@ public class ProcessService {
         }
     }
 
+    public boolean updateHostAndSubmitTimeById(int id, String host, Date date) 
{
+        int count = taskInstanceMapper.updateHostAndSubmitTimeById(id, host, 
date);
+        return count > 0;
+    }
+
     /**
      * insert task instance
      *
@@ -1454,6 +1459,7 @@ public class ProcessService {
      */
     public boolean updateTaskInstance(TaskInstance taskInstance) {
         int count = taskInstanceMapper.updateById(taskInstance);
+        logger.debug("updateTaskInstance, task instance id:{}, state;{}", 
taskInstance.getId(), taskInstance.getState());
         return count > 0;
     }
 
@@ -1691,8 +1697,9 @@ public class ProcessService {
      * @param executePath executePath
      * @param logPath logPath
      * @param taskInstId taskInstId
+     * @reutrn
      */
-    public void changeTaskState(TaskInstance taskInstance, ExecutionStatus 
state, Date startTime, String host,
+    public boolean changeTaskState(TaskInstance taskInstance, ExecutionStatus 
state, Date startTime, String host,
                                 String executePath,
                                 String logPath,
                                 int taskInstId) {
@@ -1701,7 +1708,7 @@ public class ProcessService {
         taskInstance.setHost(host);
         taskInstance.setExecutePath(executePath);
         taskInstance.setLogPath(logPath);
-        saveTaskInstance(taskInstance);
+        return saveTaskInstance(taskInstance);
     }
 
     /**
@@ -1721,8 +1728,9 @@ public class ProcessService {
      * @param endTime endTime
      * @param taskInstId taskInstId
      * @param varPool varPool
+     * @return
      */
-    public void changeTaskState(TaskInstance taskInstance, ExecutionStatus 
state,
+    public boolean changeTaskState(TaskInstance taskInstance, ExecutionStatus 
state,
                                 Date endTime,
                                 int processId,
                                 String appIds,
@@ -1734,7 +1742,7 @@ public class ProcessService {
         taskInstance.setEndTime(endTime);
         taskInstance.setVarPool(varPool);
         changeOutParam(taskInstance);
-        saveTaskInstance(taskInstance);
+        return saveTaskInstance(taskInstance);
     }
 
     /**
@@ -1819,6 +1827,10 @@ public class ProcessService {
         return processInstanceMapper.queryByHostAndStatus(host, stateArray);
     }
 
+    public List<String> queryNeedFailoverProcessInstanceHost() {
+        return 
processInstanceMapper.queryNeedFailoverProcessInstanceHost(stateArray);
+    }
+
     /**
      * process need failover process instance
      *
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java
index 622206a..86d73b6 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java
@@ -17,39 +17,6 @@
 
 package org.apache.dolphinscheduler.service.quartz;
 
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.commons.lang.StringUtils;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.PropertyUtils;
-import org.apache.dolphinscheduler.dao.entity.Schedule;
-import org.apache.dolphinscheduler.service.exceptions.ServiceException;
-import org.quartz.CronTrigger;
-import org.quartz.Job;
-import org.quartz.JobDetail;
-import org.quartz.JobKey;
-import org.quartz.Scheduler;
-import org.quartz.SchedulerException;
-import org.quartz.TriggerKey;
-import org.quartz.impl.StdSchedulerFactory;
-import org.quartz.impl.jdbcjobstore.JobStoreTX;
-import org.quartz.impl.jdbcjobstore.PostgreSQLDelegate;
-import org.quartz.impl.jdbcjobstore.StdJDBCDelegate;
-import org.quartz.impl.matchers.GroupMatcher;
-import org.quartz.simpl.SimpleThreadPool;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import static 
org.apache.dolphinscheduler.common.Constants.ORG_POSTGRESQL_DRIVER;
 import static 
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS;
 import static 
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK;
@@ -61,6 +28,7 @@ import static 
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_I
 import static 
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD;
 import static 
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_TABLEPREFIX;
 import static 
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_JOBSTORE_USEPROPERTIES;
+import static 
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_SCHEDULER_BATCHTRIGGERACQUISTITIONMAXCOUNT;
 import static 
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_SCHEDULER_INSTANCEID;
 import static 
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_SCHEDULER_INSTANCENAME;
 import static 
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON;
@@ -70,6 +38,7 @@ import static 
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_THREADPOOL
 import static 
org.apache.dolphinscheduler.common.Constants.ORG_QUARTZ_THREADPOOL_THREADPRIORITY;
 import static org.apache.dolphinscheduler.common.Constants.PROJECT_ID;
 import static 
org.apache.dolphinscheduler.common.Constants.QUARTZ_ACQUIRETRIGGERSWITHINLOCK;
+import static 
org.apache.dolphinscheduler.common.Constants.QUARTZ_BATCHTRIGGERACQUISTITIONMAXCOUNT;
 import static 
org.apache.dolphinscheduler.common.Constants.QUARTZ_CLUSTERCHECKININTERVAL;
 import static org.apache.dolphinscheduler.common.Constants.QUARTZ_DATASOURCE;
 import static org.apache.dolphinscheduler.common.Constants.QUARTZ_INSTANCEID;
@@ -91,6 +60,42 @@ import static org.quartz.CronScheduleBuilder.cronSchedule;
 import static org.quartz.JobBuilder.newJob;
 import static org.quartz.TriggerBuilder.newTrigger;
 
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.PropertyUtils;
+import org.apache.dolphinscheduler.dao.entity.Schedule;
+import org.apache.dolphinscheduler.service.exceptions.ServiceException;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.lang.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.quartz.CronTrigger;
+import org.quartz.Job;
+import org.quartz.JobDetail;
+import org.quartz.JobKey;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.TriggerKey;
+import org.quartz.impl.StdSchedulerFactory;
+import org.quartz.impl.jdbcjobstore.JobStoreTX;
+import org.quartz.impl.jdbcjobstore.PostgreSQLDelegate;
+import org.quartz.impl.jdbcjobstore.StdJDBCDelegate;
+import org.quartz.impl.matchers.GroupMatcher;
+import org.quartz.simpl.SimpleThreadPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * single Quartz executors instance
  */
@@ -170,6 +175,8 @@ public class QuartzExecutors {
             
properties.setProperty(ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK, 
conf.getString(ORG_QUARTZ_JOBSTORE_ACQUIRETRIGGERSWITHINLOCK, 
QUARTZ_ACQUIRETRIGGERSWITHINLOCK));
             properties.setProperty(ORG_QUARTZ_JOBSTORE_DATASOURCE, 
conf.getString(ORG_QUARTZ_JOBSTORE_DATASOURCE, QUARTZ_DATASOURCE));
             
properties.setProperty(ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS, 
conf.getString(ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS, 
HikariConnectionProvider.class.getName()));
+            
properties.setProperty(ORG_QUARTZ_SCHEDULER_BATCHTRIGGERACQUISTITIONMAXCOUNT,
+                                        
conf.getString(ORG_QUARTZ_SCHEDULER_BATCHTRIGGERACQUISTITIONMAXCOUNT, 
QUARTZ_BATCHTRIGGERACQUISTITIONMAXCOUNT));
 
             schedulerFactory.initialize(properties);
             scheduler = schedulerFactory.getScheduler();
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
index 8d630be..d9168d4 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
@@ -32,7 +32,7 @@ public class TaskPriorityQueueImpl implements 
TaskPriorityQueue<TaskPriority> {
     /**
      * queue size
      */
-    private static final Integer QUEUE_MAX_SIZE = 3000;
+    private static final Integer QUEUE_MAX_SIZE = 10000;
 
     /**
      * queue
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
index f384678..775fc10 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
@@ -330,7 +330,7 @@ public class RegistryClient {
             if (serverPath.startsWith(serverType + UNDERLINE + host)) {
                 String server = REGISTRY_DOLPHINSCHEDULER_DEAD_SERVERS + 
SINGLE_SLASH + serverPath;
                 remove(server);
-                logger.info("{} server {} deleted from zk dead server path 
success", serverType, host);
+                logger.info("{} server {} deleted from zk dead server path:{} 
success", serverType, host,server);
             }
         }
     }
diff --git 
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/PropertyUtils.java
 
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/PropertyUtils.java
index 2b1adff..8a4068c 100644
--- 
a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/PropertyUtils.java
+++ 
b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/utils/PropertyUtils.java
@@ -181,4 +181,7 @@ public class PropertyUtils {
         properties.setProperty(key, value);
     }
 
+    public static String dumpProperties() {
+        return properties.toString();
+    }
 }

Reply via email to