caishunfeng commented on a change in pull request #7560:
URL: https://github.com/apache/dolphinscheduler/pull/7560#discussion_r774405009



##########
File path: 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
##########
@@ -142,80 +174,62 @@ public void run() {
     }
 
     /**
-     * persist  taskResponseEvent
-     *
-     * @param taskResponseEvent taskResponseEvent
+     * event handler thread
      */
-    private void persist(TaskResponseEvent taskResponseEvent) {
-        Event event = taskResponseEvent.getEvent();
-        Channel channel = taskResponseEvent.getChannel();
+    class EventHandler extends Thread {
 
-        TaskInstance taskInstance = 
processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
-        switch (event) {
-            case ACK:
+        @Override
+        public void run() {
+            logger.info("event handler thread started");
+            while (Stopper.isRunning()) {
                 try {
-                    if (taskInstance != null) {
-                        ExecutionStatus status = 
taskInstance.getState().typeIsFinished() ? taskInstance.getState() : 
taskResponseEvent.getState();
-                        boolean result = 
processService.changeTaskState(taskInstance, status,
-                                taskResponseEvent.getStartTime(),
-                                taskResponseEvent.getWorkerAddress(),
-                                taskResponseEvent.getExecutePath(),
-                                taskResponseEvent.getLogPath(),
-                                taskResponseEvent.getTaskInstanceId());
-                        logger.debug("changeTaskState in ACK , changed in 
meta:{} ,task instance state:{}, task response event state:{}, taskInstance 
id:{},taskInstance host:{}",
-                                result, taskInstance.getState(), 
taskResponseEvent.getState(), taskInstance.getId(), taskInstance.getHost());
-                    }
-                    // if taskInstance is null (maybe deleted) . retry will be 
meaningless . so ack success
-                    DBTaskAckCommand taskAckCommand = new 
DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(), 
taskResponseEvent.getTaskInstanceId());
-                    channel.writeAndFlush(taskAckCommand.convert2Command());
-                    logger.debug("worker ack master success, taskInstance 
id:{},taskInstance host:{}", taskInstance.getId(), taskInstance.getHost());
+                    eventHandler();
+
+                    TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    break;
                 } catch (Exception e) {
-                    logger.error("worker ack master error", e);
-                    DBTaskAckCommand taskAckCommand = new 
DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), taskInstance == null ? -1 : 
taskInstance.getId());
-                    channel.writeAndFlush(taskAckCommand.convert2Command());
+                    logger.error("event handler thread error", e);
                 }
-                break;
-            case RESULT:
-                try {
-                    boolean result = true;
-                    if (taskInstance != null) {
-                        result = processService.changeTaskState(taskInstance, 
taskResponseEvent.getState(),
-                                taskResponseEvent.getEndTime(),
-                                taskResponseEvent.getProcessId(),
-                                taskResponseEvent.getAppIds(),
-                                taskResponseEvent.getTaskInstanceId(),
-                                taskResponseEvent.getVarPool()
-                        );
-                        logger.debug("changeTaskState in RESULT , changed in 
meta:{} task instance state:{}, task response event state:{}, taskInstance 
id:{},taskInstance host:{}",
-                                result, taskInstance.getState(), 
taskResponseEvent.getState(), taskInstance.getId(), taskInstance.getHost());
+            }
+        }
+
+        private void eventHandler() {
+
+            Iterator<Map.Entry<Integer, TaskResponsePersistThread>> iter = 
processTaskResponseMapper.entrySet().iterator();
+
+            while (iter.hasNext()) {
+                Map.Entry<Integer, TaskResponsePersistThread> entry = 
iter.next();
+                int processInstanceId = entry.getKey();
+                TaskResponsePersistThread taskResponsePersistThread = 
entry.getValue();
+                if (taskResponsePersistThread.isEmpty()) {
+                    continue;

Review comment:
       Maybe it would be better to add the removal logic here, for the case 
where the process instance has finished and no task response events remain.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to