caishunfeng commented on a change in pull request #7560:
URL: https://github.com/apache/dolphinscheduler/pull/7560#discussion_r774405009
##########
File path:
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
##########
@@ -142,80 +174,62 @@ public void run() {
}
/**
- * persist taskResponseEvent
- *
- * @param taskResponseEvent taskResponseEvent
+ * event handler thread
*/
- private void persist(TaskResponseEvent taskResponseEvent) {
- Event event = taskResponseEvent.getEvent();
- Channel channel = taskResponseEvent.getChannel();
+ class EventHandler extends Thread {
- TaskInstance taskInstance =
processService.findTaskInstanceById(taskResponseEvent.getTaskInstanceId());
- switch (event) {
- case ACK:
+ @Override
+ public void run() {
+ logger.info("event handler thread started");
+ while (Stopper.isRunning()) {
try {
- if (taskInstance != null) {
- ExecutionStatus status =
taskInstance.getState().typeIsFinished() ? taskInstance.getState() :
taskResponseEvent.getState();
- boolean result =
processService.changeTaskState(taskInstance, status,
- taskResponseEvent.getStartTime(),
- taskResponseEvent.getWorkerAddress(),
- taskResponseEvent.getExecutePath(),
- taskResponseEvent.getLogPath(),
- taskResponseEvent.getTaskInstanceId());
- logger.debug("changeTaskState in ACK , changed in
meta:{} ,task instance state:{}, task response event state:{}, taskInstance
id:{},taskInstance host:{}",
- result, taskInstance.getState(),
taskResponseEvent.getState(), taskInstance.getId(), taskInstance.getHost());
- }
- // if taskInstance is null (maybe deleted) . retry will be
meaningless . so ack success
- DBTaskAckCommand taskAckCommand = new
DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(),
taskResponseEvent.getTaskInstanceId());
- channel.writeAndFlush(taskAckCommand.convert2Command());
- logger.debug("worker ack master success, taskInstance
id:{},taskInstance host:{}", taskInstance.getId(), taskInstance.getHost());
+ eventHandler();
+
+ TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ break;
} catch (Exception e) {
- logger.error("worker ack master error", e);
- DBTaskAckCommand taskAckCommand = new
DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), taskInstance == null ? -1 :
taskInstance.getId());
- channel.writeAndFlush(taskAckCommand.convert2Command());
+ logger.error("event handler thread error", e);
}
- break;
- case RESULT:
- try {
- boolean result = true;
- if (taskInstance != null) {
- result = processService.changeTaskState(taskInstance,
taskResponseEvent.getState(),
- taskResponseEvent.getEndTime(),
- taskResponseEvent.getProcessId(),
- taskResponseEvent.getAppIds(),
- taskResponseEvent.getTaskInstanceId(),
- taskResponseEvent.getVarPool()
- );
- logger.debug("changeTaskState in RESULT , changed in
meta:{} task instance state:{}, task response event state:{}, taskInstance
id:{},taskInstance host:{}",
- result, taskInstance.getState(),
taskResponseEvent.getState(), taskInstance.getId(), taskInstance.getHost());
+ }
+ }
+
+ private void eventHandler() {
+
+ Iterator<Map.Entry<Integer, TaskResponsePersistThread>> iter =
processTaskResponseMapper.entrySet().iterator();
+
+ while (iter.hasNext()) {
+ Map.Entry<Integer, TaskResponsePersistThread> entry =
iter.next();
+ int processInstanceId = entry.getKey();
+ TaskResponsePersistThread taskResponsePersistThread =
entry.getValue();
+ if (taskResponsePersistThread.isEmpty()) {
+ continue;
Review comment:
Maybe it's better to add the removal logic here, for the case where the process
instance has finished and there are no task response events left.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]