This is an automated email from the ASF dual-hosted git repository.

caishunfeng pushed a commit to branch 2.0.2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/2.0.2-prepare by this push:
     new f8942bf  [Improvement][MasterServer] event response handle parallel  
(#7560)
f8942bf is described below

commit f8942bf7982c832bc13a0911877fa8198e716b55
Author: zwZjut <[email protected]>
AuthorDate: Fri Dec 24 10:39:52 2021 +0800

    [Improvement][MasterServer] event response handle parallel  (#7560)
    
    * [Feature][dolphinscheduler-api] parse traceId in HTTP header for cross-system delivery to #7237 (#7238)
    
    * to #7237
    
    * rerun test
    
    Co-authored-by: honghuo.zw <[email protected]>
    
    * cherry-pick 05aef27 and handle conflicts
    
    * to #7065: fix ExecutorService and schedulerService (#7072)
    
    Co-authored-by: honghuo.zw <[email protected]>
    
    * [Feature][dolphinscheduler-api] access control of taskDefinition and taskInstance in project to #7081 (#7082)
    
    * to #7081
    
    * fix #7081
    
    * to #7081
    
    Co-authored-by: honghuo.zw <[email protected]>
    
    * cherry-pick 8ebe060 and handle conflicts
    
    * cherry-pick 1f18444 and handle conflicts
    
    * fix #6807: dolphinscheduler.zookeeper.env_vars -> dolphinscheduler.registry.env_vars (#6808)
    
    Co-authored-by: honghuo.zw <[email protected]>
    Co-authored-by: Kirs <[email protected]>
    
    * add default constructor (#6780)
    
    Co-authored-by: honghuo.zw <[email protected]>
    
    * to #7108 (#7109)
    
    * to #7450
    
    * to #7450: fix parallel bug
    
    * add index
    
    * expose config to user
    
    * fix bug
    
    * fix bug
    
    * add delay delete
    
    * fix bug
    
    * add License
    
    * fix ut
    
    * fix ut
    
    * fix name
    
    Co-authored-by: honghuo.zw <[email protected]>
    Co-authored-by: Kirs <[email protected]>
---
 .../conf/dolphinscheduler/master.properties.tpl    |   4 +-
 docker/kubernetes/dolphinscheduler/values.yaml     |   1 +
 .../main/resources/sql/dolphinscheduler_mysql.sql  |   3 +-
 .../resources/sql/dolphinscheduler_postgresql.sql  |   2 +
 .../2.0.2_schema/mysql/dolphinscheduler_ddl.sql    |  24 ++-
 .../postgresql/dolphinscheduler_ddl.sql            |   3 +
 .../server/master/config/MasterConfig.java         |  13 +-
 ...Service.java => TaskResponsePersistThread.java} | 152 ++++++---------
 .../processor/queue/TaskResponseService.java       | 205 ++++++++++++---------
 .../master/registry/MasterRegistryClient.java      |  18 +-
 .../master/runner/FailoverExecuteThread.java       |  21 ++-
 .../processor/queue/TaskResponseServiceTest.java   |   7 +-
 12 files changed, 252 insertions(+), 201 deletions(-)

diff --git a/docker/build/conf/dolphinscheduler/master.properties.tpl 
b/docker/build/conf/dolphinscheduler/master.properties.tpl
index 98ca3dd..5d130fa 100644
--- a/docker/build/conf/dolphinscheduler/master.properties.tpl
+++ b/docker/build/conf/dolphinscheduler/master.properties.tpl
@@ -47,4 +47,6 @@ master.reserved.memory=${MASTER_RESERVED_MEMORY}
 # master failover interval minutes
 master.failover.interval=${MASTER_FAILOVER_INTERVAL}
 # master kill yarn job when handle failover
-master.kill.yarn.job.when.handle.failover=${MASTER_KILL_YARN_JOB_WHEN_HANDLE_FAILOVER}
\ No newline at end of file
+master.kill.yarn.job.when.handle.failover=${MASTER_KILL_YARN_JOB_WHEN_HANDLE_FAILOVER}
+# master.persist.event.state.threads
+master.persist.event.state.threads=${MASTER_PERSIST_EVENT_STATE_THREADS}
\ No newline at end of file
diff --git a/docker/kubernetes/dolphinscheduler/values.yaml 
b/docker/kubernetes/dolphinscheduler/values.yaml
index f0df9e7..52431f9 100644
--- a/docker/kubernetes/dolphinscheduler/values.yaml
+++ b/docker/kubernetes/dolphinscheduler/values.yaml
@@ -166,6 +166,7 @@ master:
     ORG_QUARTZ_THREADPOOL_THREADCOUNT: "25"
     ORG_QUARTZ_SCHEDULER_BATCHTRIGGERACQUISTITIONMAXCOUNT: "1"
     SESSION_TIMEOUT_MS: 60000
+    MASTER_PERSIST_EVENT_STATE_THREADS: 10
   ## Periodic probe of container liveness. Container will be restarted if the 
probe fails. Cannot be updated.
   ## More info: 
https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle#container-probes
   livenessProbe:
diff --git 
a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql 
b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
index 9e52d60..8e7401b 100644
--- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
+++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
@@ -532,7 +532,8 @@ CREATE TABLE `t_ds_process_task_relation` (
   `condition_params` text COMMENT 'condition params(json)',
   `create_time` datetime NOT NULL COMMENT 'create time',
   `update_time` datetime NOT NULL COMMENT 'update time',
-  PRIMARY KEY (`id`)
+  PRIMARY KEY (`id`),
+  KEY `project_code_process_definition_code_index` 
(`project_code`,`process_definition_code`) USING BTREE
 ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
 
 -- ----------------------------
diff --git 
a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql 
b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
index 5c02006..7bb8a9b 100644
--- 
a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
+++ 
b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
@@ -440,6 +440,8 @@ CREATE TABLE t_ds_process_task_relation (
   PRIMARY KEY (id)
 ) ;
 
+create index project_code_process_definition_code_index on 
t_ds_process_task_relation (project_code,process_definition_code);
+
 DROP TABLE IF EXISTS t_ds_process_task_relation_log;
 CREATE TABLE t_ds_process_task_relation_log (
   id int NOT NULL  ,
diff --git 
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_ddl.sql
 
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_ddl.sql
index 2525b3c..89d5c53 100644
--- 
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_ddl.sql
+++ 
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/mysql/dolphinscheduler_ddl.sql
@@ -17,6 +17,7 @@
 
 SET sql_mode=(SELECT REPLACE(@@sql_mode,'ONLY_FULL_GROUP_BY',''));
 
+
 -- uc_dolphin_T_t_ds_process_instance_A_restart_time
 drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_process_instance_A_restart_time;
 delimiter d//
@@ -35,4 +36,25 @@ d//
 
 delimiter ;
 CALL uc_dolphin_T_t_ds_process_instance_A_restart_time();
-DROP PROCEDURE uc_dolphin_T_t_ds_process_instance_A_restart_time;
\ No newline at end of file
+DROP PROCEDURE uc_dolphin_T_t_ds_process_instance_A_restart_time;
+
+
+-- uc_dolphin_T_t_ds_process_task_relation_A_pc_pd_index
+drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_process_task_relation_A_pc_pd_index;
+delimiter d//
+CREATE PROCEDURE uc_dolphin_T_t_ds_process_task_relation_A_pc_pd_index()
+BEGIN
+       IF NOT EXISTS (SELECT 1 FROM information_schema.STATISTICS
+           WHERE TABLE_NAME='t_ds_process_task_relation'
+           AND TABLE_SCHEMA=(SELECT DATABASE())
+           AND INDEX_NAME ='project_code_process_definition_code_index')
+   THEN
+ALTER TABLE `t_ds_process_task_relation` ADD KEY 
`project_code_process_definition_code_index`(`project_code`,`process_definition_code`)
 USING BTREE;
+END IF;
+END;
+
+d//
+
+delimiter ;
+CALL uc_dolphin_T_t_ds_process_task_relation_A_pc_pd_index();
+DROP PROCEDURE uc_dolphin_T_t_ds_process_task_relation_A_pc_pd_index;
\ No newline at end of file
diff --git 
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_ddl.sql
 
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_ddl.sql
index d26cf8e..75be01f 100644
--- 
a/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_ddl.sql
+++ 
b/dolphinscheduler-dao/src/main/resources/sql/upgrade/2.0.2_schema/postgresql/dolphinscheduler_ddl.sql
@@ -29,6 +29,9 @@ BEGIN
     v_schema =current_schema();
 
 EXECUTE 'ALTER TABLE ' || quote_ident(v_schema) ||'.t_ds_process_instance ADD 
COLUMN IF NOT EXISTS "restart_time" timestamp DEFAULT NULL';
+
+EXECUTE 'CREATE INDEX IF NOT EXISTS project_code_process_definition_code_index 
ON ' || quote_ident(v_schema) ||'.t_ds_process_task_relation USING 
Btree("project_code","process_definition_code")';
+
 return 'Success!';
 exception when others then
                ---Raise EXCEPTION '(%)',SQLERRM;
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index 13a68c4..b7e5642 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -66,9 +66,12 @@ public class MasterConfig {
     @Value("${master.failover.interval:10}")
     private int failoverInterval;
 
-    @Value("${master.kill.yarn.job.when.handle.fail.over:true}")
+    @Value("${master.kill.yarn.job.when.handle.failover:true}")
     private boolean masterKillYarnJobWhenHandleFailOver;
 
+    @Value("${master.persist.event.state.threads:10}")
+    private int masterPersistEventStateThreads;
+
     public int getListenPort() {
         return listenPort;
     }
@@ -183,4 +186,12 @@ public class MasterConfig {
     public void setMasterKillYarnJobWhenHandleFailOver(boolean 
masterKillYarnJobWhenHandleFailOver) {
         this.masterKillYarnJobWhenHandleFailOver = 
masterKillYarnJobWhenHandleFailOver;
     }
+
+    public int getMasterPersistEventStateThreads() {
+        return masterPersistEventStateThreads;
+    }
+
+    public void setMasterPersistEventStateThreads(int 
masterPersistEventStateThreads) {
+        this.masterPersistEventStateThreads = masterPersistEventStateThreads;
+    }
 }
\ No newline at end of file
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java
similarity index 69%
copy from 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
copy to 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java
index a320a70..621dd79 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java
@@ -21,123 +21,60 @@ import org.apache.dolphinscheduler.common.enums.Event;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.StateEvent;
 import org.apache.dolphinscheduler.common.enums.StateEventType;
-import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
 import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
 
 import io.netty.channel.Channel;
 
-/**
- * task manager
- */
-@Component
-public class TaskResponseService {
+public class TaskResponsePersistThread implements Runnable {
 
     /**
-     * logger
+     * logger of TaskResponsePersistThread
      */
-    private final Logger logger = 
LoggerFactory.getLogger(TaskResponseService.class);
+    private static final Logger logger = 
LoggerFactory.getLogger(TaskResponsePersistThread.class);
 
-    /**
-     * attemptQueue
-     */
-    private final BlockingQueue<TaskResponseEvent> eventQueue = new 
LinkedBlockingQueue<>();
+    private final ConcurrentLinkedQueue<TaskResponseEvent>  events = new 
ConcurrentLinkedQueue<>();
+
+    private final Integer processInstanceId;
 
     /**
      * process service
      */
-    @Autowired
     private ProcessService processService;
 
-    /**
-     * task response worker
-     */
-    private Thread taskResponseWorker;
-
     private ConcurrentHashMap<Integer, WorkflowExecuteThread> 
processInstanceMapper;
 
-    public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> 
processInstanceMapper) {
-        if (this.processInstanceMapper == null) {
-            this.processInstanceMapper = processInstanceMapper;
-        }
-    }
-
-    @PostConstruct
-    public void start() {
-        this.taskResponseWorker = new TaskResponseWorker();
-        this.taskResponseWorker.setName("StateEventResponseWorker");
-        this.taskResponseWorker.start();
+    public TaskResponsePersistThread(ProcessService processService,
+                                     ConcurrentHashMap<Integer, 
WorkflowExecuteThread> processInstanceMapper,
+                                     Integer processInstanceId) {
+        this.processService = processService;
+        this.processInstanceMapper = processInstanceMapper;
+        this.processInstanceId = processInstanceId;
     }
 
-    @PreDestroy
-    public void stop() {
-        try {
-            this.taskResponseWorker.interrupt();
-            if (!eventQueue.isEmpty()) {
-                List<TaskResponseEvent> remainEvents = new 
ArrayList<>(eventQueue.size());
-                eventQueue.drainTo(remainEvents);
-                for (TaskResponseEvent event : remainEvents) {
-                    this.persist(event);
+    @Override
+    public void run() {
+        while (!this.events.isEmpty()) {
+            TaskResponseEvent event = this.events.peek();
+            try {
+                boolean result = persist(event);
+                if (!result) {
+                    logger.error("persist meta error, task id:{}, instance 
id:{}", event.getTaskInstanceId(), event.getProcessInstanceId());
                 }
+            } catch (Exception e) {
+                logger.error("persist error, task id:{}, instance id:{}", 
event.getTaskInstanceId(), event.getProcessInstanceId(), e);
+            } finally {
+                this.events.remove(event);
             }
-        } catch (Exception e) {
-            logger.error("stop error:", e);
-        }
-    }
-
-    /**
-     * put task to attemptQueue
-     *
-     * @param taskResponseEvent taskResponseEvent
-     */
-    public void addResponse(TaskResponseEvent taskResponseEvent) {
-        try {
-            eventQueue.put(taskResponseEvent);
-            logger.debug("eventQueue size:{}", eventQueue.size());
-        } catch (InterruptedException e) {
-            logger.error("put task : {} error :{}", taskResponseEvent, e);
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    /**
-     * task worker thread
-     */
-    class TaskResponseWorker extends Thread {
-
-        @Override
-        public void run() {
-
-            while (Stopper.isRunning()) {
-                try {
-                    // if not task , blocking here
-                    TaskResponseEvent taskResponseEvent = eventQueue.take();
-                    persist(taskResponseEvent);
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    break;
-                } catch (Exception e) {
-                    logger.error("persist task error", e);
-                }
-            }
-            logger.info("StateEventResponseWorker stopped");
         }
     }
 
@@ -146,17 +83,20 @@ public class TaskResponseService {
      *
      * @param taskResponseEvent taskResponseEvent
      */
-    private void persist(TaskResponseEvent taskResponseEvent) {
+    private boolean persist(TaskResponseEvent taskResponseEvent) {
         Event event = taskResponseEvent.getEvent();
         Channel channel = taskResponseEvent.getChannel();
 
         TaskInstance taskInstance = 
processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
+
+        boolean result = true;
+
         switch (event) {
             case ACK:
                 try {
                     if (taskInstance != null) {
                         ExecutionStatus status = 
taskInstance.getState().typeIsFinished() ? taskInstance.getState() : 
taskResponseEvent.getState();
-                        boolean result = 
processService.changeTaskState(taskInstance, status,
+                        processService.changeTaskState(taskInstance, status,
                                 taskResponseEvent.getStartTime(),
                                 taskResponseEvent.getWorkerAddress(),
                                 taskResponseEvent.getExecutePath(),
@@ -170,6 +110,7 @@ public class TaskResponseService {
                     channel.writeAndFlush(taskAckCommand.convert2Command());
                     logger.debug("worker ack master success, taskInstance 
id:{},taskInstance host:{}", taskInstance.getId(), taskInstance.getHost());
                 } catch (Exception e) {
+                    result = false;
                     logger.error("worker ack master error", e);
                     DBTaskAckCommand taskAckCommand = new 
DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), taskInstance == null ? -1 : 
taskInstance.getId());
                     channel.writeAndFlush(taskAckCommand.convert2Command());
@@ -177,7 +118,6 @@ public class TaskResponseService {
                 break;
             case RESULT:
                 try {
-                    boolean result = true;
                     if (taskInstance != null) {
                         result = processService.changeTaskState(taskInstance, 
taskResponseEvent.getState(),
                                 taskResponseEvent.getEndTime(),
@@ -200,6 +140,7 @@ public class TaskResponseService {
                         logger.debug("worker response master success, 
taskInstance id:{},taskInstance host:{}", taskInstance.getId(), 
taskInstance.getHost());
                     }
                 } catch (Exception e) {
+                    result = false;
                     logger.error("worker response master error", e);
                     DBTaskResponseCommand taskResponseCommand = new 
DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), -1);
                     
channel.writeAndFlush(taskResponseCommand.convert2Command());
@@ -208,6 +149,7 @@ public class TaskResponseService {
             default:
                 throw new IllegalArgumentException("invalid event type : " + 
event);
         }
+
         WorkflowExecuteThread workflowExecuteThread = 
this.processInstanceMapper.get(taskResponseEvent.getProcessInstanceId());
         if (workflowExecuteThread != null) {
             StateEvent stateEvent = new StateEvent();
@@ -217,9 +159,31 @@ public class TaskResponseService {
             stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
             workflowExecuteThread.addStateEvent(stateEvent);
         }
+        return result;
+    }
+
+    public boolean addEvent(TaskResponseEvent event) {
+        if (event.getProcessInstanceId() != this.processInstanceId) {
+            logger.info("event would be abounded, task instance id:{}, process 
instance id:{}, this.processInstanceId:{}",
+                    event.getTaskInstanceId(), event.getProcessInstanceId(), 
this.processInstanceId);
+            return false;
+        }
+        return this.events.add(event);
+    }
+
+    public int eventSize() {
+        return this.events.size();
+    }
+
+    public boolean isEmpty() {
+        return this.events.isEmpty();
+    }
+
+    public Integer getProcessInstanceId() {
+        return processInstanceId;
     }
 
-    public BlockingQueue<TaskResponseEvent> getEventQueue() {
-        return eventQueue;
+    public String getKey() {
+        return String.valueOf(processInstanceId);
     }
-}
+}
\ No newline at end of file
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
index a320a70..5ef2350 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
@@ -17,22 +17,18 @@
 
 package org.apache.dolphinscheduler.server.master.processor.queue;
 
-import org.apache.dolphinscheduler.common.enums.Event;
-import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.common.enums.StateEvent;
-import org.apache.dolphinscheduler.common.enums.StateEventType;
+import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.thread.Stopper;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
-import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
@@ -42,7 +38,11 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import io.netty.channel.Channel;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 
 /**
  * task manager
@@ -66,37 +66,56 @@ public class TaskResponseService {
     @Autowired
     private ProcessService processService;
 
+    @Autowired
+    private MasterConfig masterConfig;
+
     /**
      * task response worker
      */
     private Thread taskResponseWorker;
 
-    private ConcurrentHashMap<Integer, WorkflowExecuteThread> 
processInstanceMapper;
+    /**
+     * event handler
+     */
+    private Thread taskResponseEventHandler;
+
+    private ConcurrentHashMap<Integer, WorkflowExecuteThread> 
processInstanceMap;
+
+    private final ConcurrentHashMap<String, TaskResponsePersistThread> 
taskResponseEventHandlerMap = new ConcurrentHashMap<>();
+
+    private ListeningExecutorService listeningExecutorService;
+
+    private ExecutorService eventExecService;
+
+    /**
+     * task response mapper
+     */
+    private final ConcurrentHashMap<Integer, TaskResponsePersistThread> 
processTaskResponseMap = new ConcurrentHashMap<>();
 
-    public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> 
processInstanceMapper) {
-        if (this.processInstanceMapper == null) {
-            this.processInstanceMapper = processInstanceMapper;
+    public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> 
processInstanceMap) {
+        if (this.processInstanceMap == null) {
+            this.processInstanceMap = processInstanceMap;
         }
     }
 
     @PostConstruct
     public void start() {
+        eventExecService = 
ThreadUtils.newDaemonFixedThreadExecutor("PersistEventState", 
masterConfig.getMasterPersistEventStateThreads());
+        this.listeningExecutorService = 
MoreExecutors.listeningDecorator(eventExecService);
         this.taskResponseWorker = new TaskResponseWorker();
-        this.taskResponseWorker.setName("StateEventResponseWorker");
+        this.taskResponseWorker.setName("TaskResponseWorker");
         this.taskResponseWorker.start();
+        this.taskResponseEventHandler = new TaskResponseEventHandler();
+        this.taskResponseEventHandler.setName("TaskResponseEventHandler");
+        this.taskResponseEventHandler.start();
     }
 
     @PreDestroy
     public void stop() {
         try {
             this.taskResponseWorker.interrupt();
-            if (!eventQueue.isEmpty()) {
-                List<TaskResponseEvent> remainEvents = new 
ArrayList<>(eventQueue.size());
-                eventQueue.drainTo(remainEvents);
-                for (TaskResponseEvent event : remainEvents) {
-                    this.persist(event);
-                }
-            }
+            this.taskResponseEventHandler.interrupt();
+            this.eventExecService.shutdown();
         } catch (Exception e) {
             logger.error("stop error:", e);
         }
@@ -124,12 +143,26 @@ public class TaskResponseService {
 
         @Override
         public void run() {
-
             while (Stopper.isRunning()) {
                 try {
                     // if not task , blocking here
                     TaskResponseEvent taskResponseEvent = eventQueue.take();
-                    persist(taskResponseEvent);
+                    if 
(processInstanceMap.containsKey(taskResponseEvent.getProcessInstanceId())
+                            && 
!processTaskResponseMap.containsKey(taskResponseEvent.getProcessInstanceId())) {
+                        TaskResponsePersistThread taskResponsePersistThread = 
new TaskResponsePersistThread(
+                                processService, processInstanceMap, 
taskResponseEvent.getProcessInstanceId());
+                        
processTaskResponseMap.put(taskResponseEvent.getProcessInstanceId(), 
taskResponsePersistThread);
+                    }
+                    TaskResponsePersistThread taskResponsePersistThread = 
processTaskResponseMap.get(taskResponseEvent.getProcessInstanceId());
+                    if (null != taskResponsePersistThread) {
+                        if 
(taskResponsePersistThread.addEvent(taskResponseEvent)) {
+                            logger.debug("submit task response persist queue 
success, task instance id:{},process instance id:{}, state:{} ",
+                                    taskResponseEvent.getTaskInstanceId(), 
taskResponseEvent.getProcessInstanceId(), taskResponseEvent.getState());
+                        } else {
+                            logger.error("submit task response persist queue 
error, task instance id:{},process instance id:{} ",
+                                    taskResponseEvent.getTaskInstanceId(), 
taskResponseEvent.getProcessInstanceId());
+                        }
+                    }
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
                     break;
@@ -142,84 +175,72 @@ public class TaskResponseService {
     }
 
     /**
-     * persist  taskResponseEvent
-     *
-     * @param taskResponseEvent taskResponseEvent
+     * event handler thread
      */
-    private void persist(TaskResponseEvent taskResponseEvent) {
-        Event event = taskResponseEvent.getEvent();
-        Channel channel = taskResponseEvent.getChannel();
+    class TaskResponseEventHandler extends Thread {
 
-        TaskInstance taskInstance = processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
-        switch (event) {
-            case ACK:
+        @Override
+        public void run() {
+            logger.info("event handler thread started");
+            while (Stopper.isRunning()) {
                 try {
-                    if (taskInstance != null) {
-                        ExecutionStatus status = taskInstance.getState().typeIsFinished() ? taskInstance.getState() : taskResponseEvent.getState();
-                        boolean result = processService.changeTaskState(taskInstance, status,
-                                taskResponseEvent.getStartTime(),
-                                taskResponseEvent.getWorkerAddress(),
-                                taskResponseEvent.getExecutePath(),
-                                taskResponseEvent.getLogPath(),
-                                taskResponseEvent.getTaskInstanceId());
-                        logger.debug("changeTaskState in ACK , changed in meta:{} ,task instance state:{}, task response event state:{}, taskInstance id:{},taskInstance host:{}",
-                                result, taskInstance.getState(), taskResponseEvent.getState(), taskInstance.getId(), taskInstance.getHost());
-                    }
-                    // if taskInstance is null (maybe deleted) . retry will be meaningless . so ack success
-                    DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
-                    channel.writeAndFlush(taskAckCommand.convert2Command());
-                    logger.debug("worker ack master success, taskInstance id:{},taskInstance host:{}", taskInstance.getId(), taskInstance.getHost());
+                    eventHandler();
+
+                    TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    break;
                 } catch (Exception e) {
-                    logger.error("worker ack master error", e);
-                    DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), taskInstance == null ? -1 : taskInstance.getId());
-                    channel.writeAndFlush(taskAckCommand.convert2Command());
+                    logger.error("event handler thread error", e);
                 }
-                break;
-            case RESULT:
-                try {
-                    boolean result = true;
-                    if (taskInstance != null) {
-                        result = processService.changeTaskState(taskInstance, taskResponseEvent.getState(),
-                                taskResponseEvent.getEndTime(),
-                                taskResponseEvent.getProcessId(),
-                                taskResponseEvent.getAppIds(),
-                                taskResponseEvent.getTaskInstanceId(),
-                                taskResponseEvent.getVarPool()
-                        );
-                        logger.debug("changeTaskState in RESULT , changed in meta:{} task instance state:{}, task response event state:{}, taskInstance id:{},taskInstance host:{}",
-                                result, taskInstance.getState(), taskResponseEvent.getState(), taskInstance.getId(), taskInstance.getHost());
-                    }
-                    if (!result) {
-                        DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), taskResponseEvent.getTaskInstanceId());
-                        channel.writeAndFlush(taskResponseCommand.convert2Command());
-                        logger.debug("worker response master failure, taskInstance id:{},taskInstance host:{}", taskInstance.getId(), taskInstance.getHost());
-                    } else {
-                        // if taskInstance is null (maybe deleted) . retry will be meaningless . so response success
-                        DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
-                        channel.writeAndFlush(taskResponseCommand.convert2Command());
-                        logger.debug("worker response master success, taskInstance id:{},taskInstance host:{}", taskInstance.getId(), taskInstance.getHost());
+            }
+        }
+
+        private void eventHandler() {
+
+            for (TaskResponsePersistThread taskResponsePersistThread: processTaskResponseMap.values()) {
+
+                if (taskResponseEventHandlerMap.containsKey(taskResponsePersistThread.getKey())) {
+                    continue;
+                }
+                if (taskResponsePersistThread.eventSize() == 0) {
+                    if (!processInstanceMap.containsKey(taskResponsePersistThread.getProcessInstanceId())) {
+                        processTaskResponseMap.remove(taskResponsePersistThread.getProcessInstanceId());
+                        logger.info("remove process instance: {}", taskResponsePersistThread.getProcessInstanceId());
                     }
-                } catch (Exception e) {
-                    logger.error("worker response master error", e);
-                    DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), -1);
-                    channel.writeAndFlush(taskResponseCommand.convert2Command());
+                    continue;
                 }
-                break;
-            default:
-                throw new IllegalArgumentException("invalid event type : " + event);
-        }
-        WorkflowExecuteThread workflowExecuteThread = this.processInstanceMapper.get(taskResponseEvent.getProcessInstanceId());
-        if (workflowExecuteThread != null) {
-            StateEvent stateEvent = new StateEvent();
-            stateEvent.setProcessInstanceId(taskResponseEvent.getProcessInstanceId());
-            stateEvent.setTaskInstanceId(taskResponseEvent.getTaskInstanceId());
-            stateEvent.setExecutionStatus(taskResponseEvent.getState());
-            stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
-            workflowExecuteThread.addStateEvent(stateEvent);
+                logger.info("already exists handler process size:{}", taskResponseEventHandlerMap.size());
+                taskResponseEventHandlerMap.put(taskResponsePersistThread.getKey(), taskResponsePersistThread);
+
+                ListenableFuture future = listeningExecutorService.submit(taskResponsePersistThread);
+                FutureCallback futureCallback = new FutureCallback() {
+                    @Override
+                    public void onSuccess(Object o) {
+                        logger.info("persist events {} succeeded.", taskResponsePersistThread.getProcessInstanceId());
+                        if (!processInstanceMap.containsKey(taskResponsePersistThread.getProcessInstanceId())) {
+                            processTaskResponseMap.remove(taskResponsePersistThread.getProcessInstanceId());
+                            logger.info("remove process instance: {}", taskResponsePersistThread.getProcessInstanceId());
+                        }
+                        taskResponseEventHandlerMap.remove(taskResponsePersistThread.getKey());
+                    }
+
+                    @Override
+                    public void onFailure(Throwable throwable) {
+                        logger.error("persist events failed: {}", throwable);
+                        if (!processInstanceMap.containsKey(taskResponsePersistThread.getProcessInstanceId())) {
+                            processTaskResponseMap.remove(taskResponsePersistThread.getProcessInstanceId());
+                            logger.info("remove process instance: {}", taskResponsePersistThread.getProcessInstanceId());
+                        }
+                        taskResponseEventHandlerMap.remove(taskResponsePersistThread.getKey());
+                    }
+                };
+                Futures.addCallback(future, futureCallback, listeningExecutorService);
+            }
         }
     }
 
     public BlockingQueue<TaskResponseEvent> getEventQueue() {
         return eventQueue;
     }
-}
+}
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
index aaf1d90..425d6e9 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java
@@ -436,13 +436,6 @@ public class MasterRegistryClient {
                 continue;
             }
 
-            if (serverStartupTime != null && processInstance.getRestartTime() != null
-                    && processInstance.getRestartTime().after(serverStartupTime)) {
-                continue;
-            }
-
-            logger.info("failover process instance id: {}", processInstance.getId());
-
             List<TaskInstance> validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
             for (TaskInstance taskInstance : validTaskInstanceList) {
                 if (Constants.NULL.equals(taskInstance.getHost())) {
@@ -457,6 +450,13 @@ public class MasterRegistryClient {
                 logger.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
                 failoverTaskInstance(processInstance, taskInstance);
             }
+
+            if (serverStartupTime != null && processInstance.getRestartTime() != null
+                    && processInstance.getRestartTime().after(serverStartupTime)) {
+                continue;
+            }
+
+            logger.info("failover process instance id: {}", processInstance.getId());
             //updateProcessInstance host is null and insert into command
             processService.processNeedFailoverProcessInstances(processInstance);
         }
@@ -576,8 +576,8 @@ public class MasterRegistryClient {
     /**
      * get local address
      */
-    private String getLocalAddress() {
+    public String getLocalAddress() {
         return NetUtils.getAddr(masterConfig.getListenPort());
     }
 
-}
+}
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
index 81d02a9..770062f 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/FailoverExecuteThread.java
@@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.service.registry.RegistryClient;
 
 import org.apache.commons.collections4.CollectionUtils;
 
+import java.util.Iterator;
 import java.util.List;
 
 import org.slf4j.Logger;
@@ -66,10 +67,12 @@ public class FailoverExecuteThread extends Thread {
         while (Stopper.isRunning()) {
             logger.info("failover execute started");
             try {
-                List<String> hosts = processService.queryNeedFailoverProcessInstanceHost();
+                List<String> hosts = getNeedFailoverMasterServers();
                 if (CollectionUtils.isEmpty(hosts)) {
                     continue;
                 }
+                logger.info("need failover hosts:{}", hosts);
+
                 for (String host : hosts) {
                     String failoverPath = masterRegistryClient.getFailoverLockPath(NodeType.MASTER, host);
                     try {
@@ -88,4 +91,20 @@ public class FailoverExecuteThread extends Thread {
             }
         }
     }
+
+    private List<String> getNeedFailoverMasterServers() {
+        // failover myself && failover dead masters
+        List<String> hosts = processService.queryNeedFailoverProcessInstanceHost();
+
+        Iterator<String> iterator = hosts.iterator();
+        while (iterator.hasNext()) {
+            String host = iterator.next();
+            if (registryClient.checkNodeExists(host, NodeType.MASTER)) {
+                if (!host.equals(masterRegistryClient.getLocalAddress())) {
+                    iterator.remove();
+                }
+            }
+        }
+        return hosts;
+    }
 }
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
index 878446c..d787d0c 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.processor.queue;
 
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
 import java.util.Date;
@@ -34,12 +35,15 @@ import org.mockito.junit.MockitoJUnitRunner;
 
 import io.netty.channel.Channel;
 
-@RunWith(MockitoJUnitRunner.class)
+@RunWith(MockitoJUnitRunner.Silent.class)
 public class TaskResponseServiceTest {
 
     @Mock(name = "processService")
     private ProcessService processService;
 
+    @Mock
+    private MasterConfig masterConfig;
+
     @InjectMocks
     TaskResponseService taskRspService;
 
@@ -54,6 +58,7 @@ public class TaskResponseServiceTest {
 
     @Before
     public void before() {
+        Mockito.when(masterConfig.getMasterPersistEventStateThreads()).thenReturn(10);
         taskRspService.start();
 
         ackEvent = TaskResponseEvent.newAck(ExecutionStatus.RUNNING_EXECUTION,

Reply via email to