zhanguohao commented on a change in pull request #4812:
URL: 
https://github.com/apache/incubator-dolphinscheduler/pull/4812#discussion_r579630947



##########
File path: 
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDelayExecManagerThread.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.processor;
+
+import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
+
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.ExecutorService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manage tasks that need to be delayed
+ */
+public class TaskDelayExecManagerThread implements Runnable {
+
+    private final Logger logger = 
LoggerFactory.getLogger(TaskDelayExecManagerThread.class);
+
+    private final DelayQueue<TaskExecuteThread> delayQueue = new 
DelayQueue<>();
+
+    private final ExecutorService workerExecService;
+
+    public TaskDelayExecManagerThread(ExecutorService executorService) {
+        this.workerExecService = executorService;

Review comment:
       It makes sense in task life cycle management. Here I refactored it. The 
task management is a single thread, and all tasks are hosted on the queue on 
the task management, and it will solve the kill problem of delayed tasks, which 
is beneficial to follow-up. Expansion and task management of workers
   The previous plan did not consider the kill of the delayed task, which would 
cause the delayed task to be in an incorrect state without being executed.
   The status update of the existing logic assumes that the task has already 
run, and then both normal execution and abnormal execution will trigger a 
response and send it to the Master for update. The previous version did not 
consider this situation
   
   
在任务生命周期管理上面是有意义的,这里我重构了一下,任务管理利器单独一个线程,所有的任务托管到任务管理上的队列上面,并且会解决延时任务的kill问题,有利于后续的worker的扩展和任务管理
   之前的方案未考虑到延迟任务的kill,会导致延迟任务在未执行的情况下,状态不对
   现有逻辑的状态更新是假定任务已经跑起来,然后正常执行和异常执行都会触发响应,发送到Master更新,上一版实现并未考虑到这个情况




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to