This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 2119e41800 [Improvement] Move delay calculation to Master (#15278)
2119e41800 is described below
commit 2119e41800de0b0df7341e166a48fd8283fab98e
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Dec 6 09:37:14 2023 +0800
[Improvement] Move delay calculation to Master (#15278)
---
.../master/event/TaskTimeoutStateEventHandler.java | 2 +-
...ogicITaskInstanceDispatchOperationFunction.java | 43 ++++---------
.../LogicITaskInstanceKillOperationFunction.java | 22 +++----
.../LogicITaskInstancePauseOperationFunction.java | 14 ++---
.../server/master/runner/BaseTaskDispatcher.java | 1 -
.../master/runner/BaseTaskExecuteRunnable.java | 55 ++++++++++++++++
.../{execute => }/DefaultTaskExecuteRunnable.java | 4 +-
...a => GlobalMasterTaskExecuteRunnableQueue.java} | 32 ++++++----
...lobalMasterTaskExecuteRunnableQueueLooper.java} | 24 +++----
.../runner/GlobalTaskDispatchWaitingQueue.java | 6 +-
.../GlobalTaskDispatchWaitingQueueLooper.java | 1 -
.../MasterDelayTaskExecuteRunnableDelayQueue.java | 54 ----------------
.../master/runner/MasterTaskExecutorBootstrap.java | 8 +--
....java => PriorityDelayTaskExecuteRunnable.java} | 73 ++++++++--------------
.../master/runner/StreamTaskExecuteRunnable.java | 2 -
.../runner/{execute => }/TaskExecuteRunnable.java | 4 +-
.../{execute => }/TaskExecuteRunnableFactory.java | 7 ++-
.../{execute => }/TaskExecutionContextFactory.java | 2 +-
.../master/runner/WorkflowExecuteRunnable.java | 2 -
.../runner/dispatcher/MasterTaskDispatcher.java | 2 +-
.../master/runner/dispatcher/TaskDispatcher.java | 2 +-
.../runner/dispatcher/WorkerTaskDispatcher.java | 2 +-
...eRunnable.java => AsyncMasterTaskExecutor.java} | 10 +--
...ry.java => AsyncMasterTaskExecutorFactory.java} | 8 +--
.../execute/AsyncTaskCallbackFunctionImpl.java | 22 +++----
.../execute/DefaultTaskExecuteRunnableFactory.java | 3 +
.../execute/MasterDelayTaskExecuteRunnable.java | 68 --------------------
...xecuteRunnable.java => MasterTaskExecutor.java} | 10 +--
...Factory.java => MasterTaskExecutorFactory.java} | 4 +-
....java => MasterTaskExecutorFactoryBuilder.java} | 15 +++--
...leHolder.java => MasterTaskExecutorHolder.java} | 14 ++---
...Pool.java => MasterTaskExecutorThreadPool.java} | 6 +-
...teRunnable.java => SyncMasterTaskExecutor.java} | 10 +--
...ory.java => SyncMasterTaskExecutorFactory.java} | 8 +--
.../BaseTaskExecuteRunnableDispatchOperator.java | 26 +++++++-
.../BaseTaskExecuteRunnableKillOperator.java | 2 +-
.../BaseTaskExecuteRunnablePauseOperator.java | 2 +-
.../BaseTaskExecuteRunnableTimeoutOperator.java | 2 +-
.../LogicTaskExecuteRunnableDispatchOperator.java | 6 +-
.../TaskExecuteRunnableDispatchOperator.java | 14 ++---
.../operator/TaskExecuteRunnableOperator.java | 2 +-
.../TaskExecuteRunnableOperatorManager.java | 2 +-
.../operator/TaskExecuteRunnablePauseOperator.java | 2 +-
.../master/runner/WorkflowExecuteRunnableTest.java | 1 -
.../dispatcher/MasterTaskDispatcherTest.java | 2 +-
.../dispatcher/WorkerTaskDispatcherTest.java | 2 +-
...a => PriorityDelayTaskExecuteRunnableTest.java} | 8 ++-
.../shell/BaseLinuxShellInterceptorBuilder.java | 7 ++-
.../server/worker/WorkerServer.java | 6 +-
.../worker/registry/WorkerWaitingStrategy.java | 6 +-
.../rpc/StreamingTaskInstanceOperatorImpl.java | 8 +--
...faultWorkerDelayTaskExecuteRunnableFactory.java | 59 -----------------
...unnable.java => DefaultWorkerTaskExecutor.java} | 14 ++---
....java => DefaultWorkerTaskExecutorFactory.java} | 44 +++++++------
...ue.java => GlobalTaskInstanceWaitingQueue.java} | 4 +-
...a => GlobalTaskInstanceWaitingQueueLooper.java} | 35 +++--------
.../runner/WorkerDelayTaskExecuteRunnable.java | 69 --------------------
.../server/worker/runner/WorkerExecService.java | 14 ++---
.../server/worker/runner/WorkerManagerThread.java | 23 +++----
...xecuteRunnable.java => WorkerTaskExecutor.java} | 18 +++---
...Factory.java => WorkerTaskExecutorFactory.java} | 4 +-
....java => WorkerTaskExecutorFactoryBuilder.java} | 16 ++---
.../TaskInstanceDispatchOperationFunction.java | 6 +-
.../TaskInstanceKillOperationFunction.java | 8 +--
...est.java => DefaultWorkerTaskExecutorTest.java} | 10 +--
65 files changed, 370 insertions(+), 592 deletions(-)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
index 16476ad7f7..c04eb9c338 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskTimeoutStateEventHandler.java
@@ -22,8 +22,8 @@ import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
+import
org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable;
import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
-import
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
import java.util.Map;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceDispatchOperationFunction.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceDispatchOperationFunction.java
index c60e860aa7..f69ff4fb77 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceDispatchOperationFunction.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceDispatchOperationFunction.java
@@ -17,18 +17,14 @@
package org.apache.dolphinscheduler.server.master.rpc;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
import
org.apache.dolphinscheduler.extract.master.transportor.LogicTaskDispatchRequest;
import
org.apache.dolphinscheduler.extract.master.transportor.LogicTaskDispatchResponse;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
-import
org.apache.dolphinscheduler.server.master.runner.MasterDelayTaskExecuteRunnableDelayQueue;
-import
org.apache.dolphinscheduler.server.master.runner.execute.MasterDelayTaskExecuteRunnable;
-import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableFactoryBuilder;
+import
org.apache.dolphinscheduler.server.master.runner.GlobalMasterTaskExecuteRunnableQueue;
import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutionContextHolder;
-
-import java.util.concurrent.TimeUnit;
+import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor;
+import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
@@ -42,10 +38,10 @@ public class LogicITaskInstanceDispatchOperationFunction
ITaskInstanceOperationFunction<LogicTaskDispatchRequest,
LogicTaskDispatchResponse> {
@Autowired
- private MasterTaskExecuteRunnableFactoryBuilder
masterTaskExecuteRunnableFactoryBuilder;
+ private MasterTaskExecutorFactoryBuilder masterTaskExecutorFactoryBuilder;
@Autowired
- private MasterDelayTaskExecuteRunnableDelayQueue
masterDelayTaskExecuteRunnableDelayQueue;
+ private GlobalMasterTaskExecuteRunnableQueue
globalMasterTaskExecuteRunnableQueue;
@Override
public LogicTaskDispatchResponse operate(LogicTaskDispatchRequest
taskDispatchRequest) {
@@ -63,34 +59,17 @@ public class LogicITaskInstanceDispatchOperationFunction
MasterTaskExecutionContextHolder.putTaskExecutionContext(taskExecutionContext);
- int delayTime = taskExecutionContext.getDelayTime();
- if (delayTime > 0) {
- // todo: calculate the delay in master dispatcher then we
don't need to use a queue to store the task
- final long remainTime =
-
DateUtils.getRemainTime(DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()),
- TimeUnit.SECONDS.toMillis(delayTime));
- if (remainTime > 0) {
- log.info(
- "Current taskInstance: {} is choosing delay
execution, delay time: {}/ms, remainTime: {}/ms",
- taskExecutionContext.getTaskName(),
-
TimeUnit.SECONDS.toMillis(taskExecutionContext.getDelayTime()), remainTime);
-
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.DELAY_EXECUTION);
- // todo: send delay execution message
- return
LogicTaskDispatchResponse.success(taskExecutionContext.getTaskInstanceId());
- }
- }
- final MasterDelayTaskExecuteRunnable
masterDelayTaskExecuteRunnable =
- masterTaskExecuteRunnableFactoryBuilder
-
.createWorkerDelayTaskExecuteRunnableFactory(taskExecutionContext.getTaskType())
-
.createWorkerTaskExecuteRunnable(taskExecutionContext);
- if (masterDelayTaskExecuteRunnableDelayQueue
-
.submitMasterDelayTaskExecuteRunnable(masterDelayTaskExecuteRunnable)) {
+ MasterTaskExecutor masterTaskExecutor =
masterTaskExecutorFactoryBuilder
+
.createMasterTaskExecutorFactory(taskExecutionContext.getTaskType())
+ .createMasterTaskExecutor(taskExecutionContext);
+ if (globalMasterTaskExecuteRunnableQueue
+ .submitMasterTaskExecuteRunnable(masterTaskExecutor)) {
log.info("Submit LogicTask: {} to
MasterDelayTaskExecuteRunnableDelayQueue success", taskInstanceName);
return LogicTaskDispatchResponse.success(taskInstanceId);
} else {
log.error(
"Submit LogicTask: {} to
MasterDelayTaskExecuteRunnableDelayQueue failed, current task waiting queue
size: {} is full",
- taskInstanceName,
masterDelayTaskExecuteRunnableDelayQueue.size());
+ taskInstanceName,
globalMasterTaskExecuteRunnableQueue.size());
return LogicTaskDispatchResponse.failed(taskInstanceId,
"MasterDelayTaskExecuteRunnableDelayQueue is full");
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceKillOperationFunction.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceKillOperationFunction.java
index 6b43a7690a..27fc52333d 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceKillOperationFunction.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceKillOperationFunction.java
@@ -21,10 +21,10 @@ import
org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillReque
import
org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillResponse;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import
org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException;
-import
org.apache.dolphinscheduler.server.master.runner.MasterDelayTaskExecuteRunnableDelayQueue;
-import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable;
-import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableHolder;
+import
org.apache.dolphinscheduler.server.master.runner.GlobalMasterTaskExecuteRunnableQueue;
import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutionContextHolder;
+import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor;
+import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorHolder;
import lombok.extern.slf4j.Slf4j;
@@ -38,7 +38,7 @@ public class LogicITaskInstanceKillOperationFunction
ITaskInstanceOperationFunction<LogicTaskKillRequest,
LogicTaskKillResponse> {
@Autowired
- private MasterDelayTaskExecuteRunnableDelayQueue
masterDelayTaskExecuteRunnableDelayQueue;
+ private GlobalMasterTaskExecuteRunnableQueue
globalMasterTaskExecuteRunnableQueue;
@Override
public LogicTaskKillResponse operate(LogicTaskKillRequest taskKillRequest)
{
@@ -46,16 +46,16 @@ public class LogicITaskInstanceKillOperationFunction
try {
LogUtils.setTaskInstanceIdMDC(taskKillRequest.getTaskInstanceId());
log.info("Received killLogicTask request: {}", taskKillRequest);
- final MasterTaskExecuteRunnable masterTaskExecuteRunnable =
-
MasterTaskExecuteRunnableHolder.getMasterTaskExecuteRunnable(taskInstanceId);
- if (masterTaskExecuteRunnable == null) {
+ final MasterTaskExecutor masterTaskExecutor =
+
MasterTaskExecutorHolder.getMasterTaskExecutor(taskInstanceId);
+ if (masterTaskExecutor == null) {
log.error("Cannot find the MasterTaskExecuteRunnable, this
task may already been killed");
return LogicTaskKillResponse.fail("Cannot find the
MasterTaskExecuteRunnable");
}
try {
- masterTaskExecuteRunnable.cancelTask();
- masterDelayTaskExecuteRunnableDelayQueue
-
.removeMasterDelayTaskExecuteRunnable(masterTaskExecuteRunnable);
+ masterTaskExecutor.cancelTask();
+ globalMasterTaskExecuteRunnableQueue
+ .removeMasterTaskExecuteRunnable(masterTaskExecutor);
return LogicTaskKillResponse.success();
} catch (MasterTaskExecuteException e) {
log.error("Cancel MasterTaskExecuteRunnable failed ", e);
@@ -63,7 +63,7 @@ public class LogicITaskInstanceKillOperationFunction
} finally {
// todo: If cancel failed, we cannot remove the context?
MasterTaskExecutionContextHolder.removeTaskExecutionContext(taskInstanceId);
-
MasterTaskExecuteRunnableHolder.removeMasterTaskExecuteRunnable(taskInstanceId);
+
MasterTaskExecutorHolder.removeMasterTaskExecutor(taskInstanceId);
}
} finally {
LogUtils.removeTaskInstanceIdMDC();
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstancePauseOperationFunction.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstancePauseOperationFunction.java
index 912193b91e..a95d2fc667 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstancePauseOperationFunction.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstancePauseOperationFunction.java
@@ -22,8 +22,8 @@ import
org.apache.dolphinscheduler.extract.master.transportor.LogicTaskPauseResp
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import
org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException;
-import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable;
-import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableHolder;
+import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor;
+import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorHolder;
import lombok.extern.slf4j.Slf4j;
@@ -39,16 +39,16 @@ public class LogicITaskInstancePauseOperationFunction
public LogicTaskPauseResponse operate(LogicTaskPauseRequest
taskPauseRequest) {
try {
LogUtils.setTaskInstanceIdMDC(taskPauseRequest.getTaskInstanceId());
- final MasterTaskExecuteRunnable masterTaskExecuteRunnable =
-
MasterTaskExecuteRunnableHolder.getMasterTaskExecuteRunnable(taskPauseRequest.getTaskInstanceId());
- if (masterTaskExecuteRunnable == null) {
+ final MasterTaskExecutor masterTaskExecutor =
+
MasterTaskExecutorHolder.getMasterTaskExecutor(taskPauseRequest.getTaskInstanceId());
+ if (masterTaskExecutor == null) {
log.info("Cannot find the MasterTaskExecuteRunnable");
return LogicTaskPauseResponse.fail("Cannot find the
MasterTaskExecuteRunnable");
}
- final TaskExecutionContext taskExecutionContext =
masterTaskExecuteRunnable.getTaskExecutionContext();
+ final TaskExecutionContext taskExecutionContext =
masterTaskExecutor.getTaskExecutionContext();
LogUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
- masterTaskExecuteRunnable.pauseTask();
+ masterTaskExecutor.pauseTask();
return LogicTaskPauseResponse.success();
} catch (MasterTaskExecuteException e) {
log.error("Pause MasterTaskExecuteRunnable failed", e);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/BaseTaskDispatcher.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/BaseTaskDispatcher.java
index 0ad515e93a..30ab8fadec 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/BaseTaskDispatcher.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/BaseTaskDispatcher.java
@@ -30,7 +30,6 @@ import
org.apache.dolphinscheduler.server.master.exception.TaskDispatchException
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import
org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
import
org.apache.dolphinscheduler.server.master.runner.dispatcher.TaskDispatcher;
-import
org.apache.dolphinscheduler.server.master.runner.execute.TaskExecuteRunnable;
import java.util.Date;
import java.util.Optional;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/BaseTaskExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/BaseTaskExecuteRunnable.java
new file mode 100644
index 0000000000..fefdbf3493
--- /dev/null
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/BaseTaskExecuteRunnable.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.runner;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+
+public abstract class BaseTaskExecuteRunnable implements TaskExecuteRunnable {
+
+ protected final ProcessInstance workflowInstance;
+ protected final TaskInstance taskInstance;
+ protected final TaskExecutionContext taskExecutionContext;
+
+ public BaseTaskExecuteRunnable(ProcessInstance workflowInstance,
+ TaskInstance taskInstance,
+ TaskExecutionContext taskExecutionContext) {
+ this.taskInstance = checkNotNull(taskInstance);
+ this.workflowInstance = checkNotNull(workflowInstance);
+ this.taskExecutionContext = checkNotNull(taskExecutionContext);
+ }
+
+ @Override
+ public ProcessInstance getWorkflowInstance() {
+ return workflowInstance;
+ }
+
+ @Override
+ public TaskInstance getTaskInstance() {
+ return taskInstance;
+ }
+
+ @Override
+ public TaskExecutionContext getTaskExecutionContext() {
+ return taskExecutionContext;
+ }
+
+}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/DefaultTaskExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/DefaultTaskExecuteRunnable.java
similarity index 94%
rename from
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/DefaultTaskExecuteRunnable.java
rename to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/DefaultTaskExecuteRunnable.java
index 6f736139b6..c1b13717bd 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/DefaultTaskExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/DefaultTaskExecuteRunnable.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.runner.execute;
+package org.apache.dolphinscheduler.server.master.runner;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import
org.apache.dolphinscheduler.server.master.runner.operator.TaskExecuteRunnableOperatorManager;
-public class DefaultTaskExecuteRunnable extends PriorityTaskExecuteRunnable {
+public class DefaultTaskExecuteRunnable extends
PriorityDelayTaskExecuteRunnable {
private final TaskExecuteRunnableOperatorManager
taskExecuteRunnableOperatorManager;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueue.java
similarity index 53%
copy from
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
copy to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueue.java
index a8f8f88498..9005416b92 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueue.java
@@ -17,30 +17,36 @@
package org.apache.dolphinscheduler.server.master.runner;
-import
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
+import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor;
-import java.util.concurrent.PriorityBlockingQueue;
-
-import lombok.extern.slf4j.Slf4j;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import org.springframework.stereotype.Component;
-@Slf4j
+/**
+ *
+ */
@Component
-public class GlobalTaskDispatchWaitingQueue {
+public class GlobalMasterTaskExecuteRunnableQueue {
- private final PriorityBlockingQueue<DefaultTaskExecuteRunnable> queue =
new PriorityBlockingQueue<>();
+ private final BlockingQueue<MasterTaskExecutor>
masterTaskExecutorBlockingQueue =
+ new LinkedBlockingQueue<>();
+
+ public boolean submitMasterTaskExecuteRunnable(MasterTaskExecutor
masterTaskExecutor) {
+ return masterTaskExecutorBlockingQueue.offer(masterTaskExecutor);
+ }
- public void
submitNeedToDispatchTaskExecuteRunnable(DefaultTaskExecuteRunnable
priorityTaskExecuteRunnable) {
- queue.put(priorityTaskExecuteRunnable);
+ public MasterTaskExecutor takeMasterTaskExecuteRunnable() throws
InterruptedException {
+ return masterTaskExecutorBlockingQueue.take();
}
- public DefaultTaskExecuteRunnable takeNeedToDispatchTaskExecuteRunnable()
throws InterruptedException {
- return queue.take();
+ public boolean removeMasterTaskExecuteRunnable(MasterTaskExecutor
masterTaskExecutor) {
+ return masterTaskExecutorBlockingQueue.remove(masterTaskExecutor);
}
- public int getWaitingDispatchTaskNumber() {
- return queue.size();
+ public int size() {
+ return masterTaskExecutorBlockingQueue.size();
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueueLooper.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueueLooper.java
similarity index 74%
rename from
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueueLooper.java
rename to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueueLooper.java
index 557f2ca447..e0b1f80704 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueueLooper.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueueLooper.java
@@ -18,9 +18,9 @@
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
-import
org.apache.dolphinscheduler.server.master.runner.execute.MasterDelayTaskExecuteRunnable;
-import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableHolder;
-import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableThreadPool;
+import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor;
+import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorHolder;
+import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorThreadPool;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -31,17 +31,17 @@ import org.springframework.stereotype.Component;
@Slf4j
@Component
-public class MasterDelayTaskExecuteRunnableDelayQueueLooper extends
BaseDaemonThread implements AutoCloseable {
+public class GlobalMasterTaskExecuteRunnableQueueLooper extends
BaseDaemonThread implements AutoCloseable {
@Autowired
- private MasterDelayTaskExecuteRunnableDelayQueue
masterDelayTaskExecuteRunnableDelayQueue;
+ private GlobalMasterTaskExecuteRunnableQueue
globalMasterTaskExecuteRunnableQueue;
@Autowired
- private MasterTaskExecuteRunnableThreadPool
masterTaskExecuteRunnableThreadPool;
+ private MasterTaskExecutorThreadPool masterTaskExecutorThreadPool;
private final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false);
- public MasterDelayTaskExecuteRunnableDelayQueueLooper() {
+ public GlobalMasterTaskExecuteRunnableQueueLooper() {
super("MasterDelayTaskExecuteRunnableDelayQueueLooper");
}
@@ -53,7 +53,7 @@ public class MasterDelayTaskExecuteRunnableDelayQueueLooper
extends BaseDaemonTh
}
log.info("MasterDelayTaskExecuteRunnableDelayQueueLooper starting...");
super.start();
- masterTaskExecuteRunnableThreadPool.start();
+ masterTaskExecutorThreadPool.start();
log.info("MasterDelayTaskExecuteRunnableDelayQueueLooper started...");
}
@@ -61,10 +61,10 @@ public class MasterDelayTaskExecuteRunnableDelayQueueLooper
extends BaseDaemonTh
public void run() {
while (RUNNING_FLAG.get()) {
try {
- final MasterDelayTaskExecuteRunnable
masterDelayTaskExecuteRunnable =
-
masterDelayTaskExecuteRunnableDelayQueue.takeMasterDelayTaskExecuteRunnable();
-
masterTaskExecuteRunnableThreadPool.submitMasterTaskExecuteRunnable(masterDelayTaskExecuteRunnable);
-
MasterTaskExecuteRunnableHolder.putMasterTaskExecuteRunnable(masterDelayTaskExecuteRunnable);
+ final MasterTaskExecutor masterTaskExecutor =
+
globalMasterTaskExecuteRunnableQueue.takeMasterTaskExecuteRunnable();
+
masterTaskExecutorThreadPool.submitMasterTaskExecutor(masterTaskExecutor);
+
MasterTaskExecutorHolder.putMasterTaskExecuteRunnable(masterTaskExecutor);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
log.warn("MasterDelayTaskExecuteRunnableDelayQueueLooper has
been interrupted, will stop loop");
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
index a8f8f88498..f4d50537c6 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
@@ -17,9 +17,7 @@
package org.apache.dolphinscheduler.server.master.runner;
-import
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
-
-import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.DelayQueue;
import lombok.extern.slf4j.Slf4j;
@@ -29,7 +27,7 @@ import org.springframework.stereotype.Component;
@Component
public class GlobalTaskDispatchWaitingQueue {
- private final PriorityBlockingQueue<DefaultTaskExecuteRunnable> queue =
new PriorityBlockingQueue<>();
+ private final DelayQueue<DefaultTaskExecuteRunnable> queue = new
DelayQueue<>();
public void
submitNeedToDispatchTaskExecuteRunnable(DefaultTaskExecuteRunnable
priorityTaskExecuteRunnable) {
queue.put(priorityTaskExecuteRunnable);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
index c54aa5aff8..b496bea5a5 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
@@ -21,7 +21,6 @@ import
org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import
org.apache.dolphinscheduler.server.master.runner.dispatcher.TaskDispatchFactory;
import
org.apache.dolphinscheduler.server.master.runner.dispatcher.TaskDispatcher;
-import
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueue.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueue.java
deleted file mode 100644
index bdd1510527..0000000000
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueue.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner;
-
-import
org.apache.dolphinscheduler.server.master.runner.execute.MasterDelayTaskExecuteRunnable;
-import
org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable;
-
-import java.util.concurrent.DelayQueue;
-
-import org.springframework.stereotype.Component;
-
-/**
- *
- */
-@Component
-public class MasterDelayTaskExecuteRunnableDelayQueue {
-
- private final DelayQueue<MasterDelayTaskExecuteRunnable>
masterDelayTaskExecuteRunnableDelayQueue =
- new DelayQueue<>();
-
- public boolean
submitMasterDelayTaskExecuteRunnable(MasterDelayTaskExecuteRunnable
masterDelayTaskExecuteRunnable) {
- return
masterDelayTaskExecuteRunnableDelayQueue.offer(masterDelayTaskExecuteRunnable);
- }
-
- public MasterDelayTaskExecuteRunnable takeMasterDelayTaskExecuteRunnable()
throws InterruptedException {
- return masterDelayTaskExecuteRunnableDelayQueue.take();
- }
-
- // todo: if we move the delay process to master, than we don't need this
method, since dispatchProcess can directly
- // submit to thread pool
- public boolean
removeMasterDelayTaskExecuteRunnable(MasterTaskExecuteRunnable
masterTaskExecuteRunnable) {
- return
masterDelayTaskExecuteRunnableDelayQueue.remove(masterTaskExecuteRunnable);
- }
-
- public int size() {
- return masterDelayTaskExecuteRunnableDelayQueue.size();
- }
-
-}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecutorBootstrap.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecutorBootstrap.java
index 744e560c00..3e99d2141c 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecutorBootstrap.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecutorBootstrap.java
@@ -32,7 +32,7 @@ public class MasterTaskExecutorBootstrap implements
AutoCloseable {
private GlobalTaskDispatchWaitingQueueLooper
globalTaskDispatchWaitingQueueLooper;
@Autowired
- private MasterDelayTaskExecuteRunnableDelayQueueLooper
masterDelayTaskExecuteRunnableDelayQueueLooper;
+ private GlobalMasterTaskExecuteRunnableQueueLooper
globalMasterTaskExecuteRunnableQueueLooper;
@Autowired
private AsyncMasterTaskDelayQueueLooper asyncMasterTaskDelayQueueLooper;
@@ -40,7 +40,7 @@ public class MasterTaskExecutorBootstrap implements
AutoCloseable {
public synchronized void start() {
log.info("MasterTaskExecutorBootstrap starting...");
globalTaskDispatchWaitingQueueLooper.start();
- masterDelayTaskExecuteRunnableDelayQueueLooper.start();
+ globalMasterTaskExecuteRunnableQueueLooper.start();
asyncMasterTaskDelayQueueLooper.start();
log.info("MasterTaskExecutorBootstrap started...");
}
@@ -51,8 +51,8 @@ public class MasterTaskExecutorBootstrap implements
AutoCloseable {
try (
final GlobalTaskDispatchWaitingQueueLooper
globalTaskDispatchWaitingQueueLooper1 =
globalTaskDispatchWaitingQueueLooper;
- final MasterDelayTaskExecuteRunnableDelayQueueLooper
masterDelayTaskExecuteRunnableDelayQueueLooper1 =
- masterDelayTaskExecuteRunnableDelayQueueLooper;
+ final GlobalMasterTaskExecuteRunnableQueueLooper
globalMasterTaskExecuteRunnableQueueLooper1 =
+ globalMasterTaskExecuteRunnableQueueLooper;
final AsyncMasterTaskDelayQueueLooper
asyncMasterTaskDelayQueueLooper1 =
asyncMasterTaskDelayQueueLooper) {
// closed the resource
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityTaskExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/PriorityDelayTaskExecuteRunnable.java
similarity index 57%
rename from
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityTaskExecuteRunnable.java
rename to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/PriorityDelayTaskExecuteRunnable.java
index 2e23feb7bd..255ec6c8ac 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityTaskExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/PriorityDelayTaskExecuteRunnable.java
@@ -15,62 +15,57 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.runner.execute;
-
-import static com.google.common.base.Preconditions.checkNotNull;
+package org.apache.dolphinscheduler.server.master.runner;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.jetbrains.annotations.NotNull;
-
-public abstract class PriorityTaskExecuteRunnable implements
TaskExecuteRunnable, Comparable<TaskExecuteRunnable> {
-
- private final ProcessInstance workflowInstance;
- private final TaskInstance taskInstance;
- private final TaskExecutionContext taskExecutionContext;
-
- public PriorityTaskExecuteRunnable(ProcessInstance workflowInstance,
- TaskInstance taskInstance,
- TaskExecutionContext
taskExecutionContext) {
- this.taskInstance = checkNotNull(taskInstance);
- this.workflowInstance = checkNotNull(workflowInstance);
- this.taskExecutionContext = checkNotNull(taskExecutionContext);
- }
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
- @Override
- public ProcessInstance getWorkflowInstance() {
- return workflowInstance;
- }
+public abstract class PriorityDelayTaskExecuteRunnable extends
BaseTaskExecuteRunnable implements Delayed {
- @Override
- public TaskInstance getTaskInstance() {
- return taskInstance;
+ public PriorityDelayTaskExecuteRunnable(ProcessInstance workflowInstance,
+ TaskInstance taskInstance,
+ TaskExecutionContext
taskExecutionContext) {
+ super(workflowInstance, taskInstance, taskExecutionContext);
}
@Override
- public TaskExecutionContext getTaskExecutionContext() {
- return taskExecutionContext;
+ public long getDelay(TimeUnit unit) {
+ return unit.convert(
+
DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
+ taskExecutionContext.getDelayTime() * 60L),
+ TimeUnit.SECONDS);
}
@Override
- public int compareTo(@NotNull TaskExecuteRunnable other) {
+ public int compareTo(Delayed o) {
+ if (o == null) {
+ return 1;
+ }
+ int delayTimeCompareResult =
+ Long.compare(this.getDelay(TimeUnit.MILLISECONDS),
o.getDelay(TimeUnit.MILLISECONDS));
+ if (delayTimeCompareResult != 0) {
+ return delayTimeCompareResult;
+ }
+ PriorityDelayTaskExecuteRunnable other =
(PriorityDelayTaskExecuteRunnable) o;
// the smaller dispatch fail times, the higher priority
int dispatchFailTimesCompareResult =
taskExecutionContext.getDispatchFailTimes()
- other.getTaskExecutionContext().getDispatchFailTimes();
if (dispatchFailTimesCompareResult != 0) {
return dispatchFailTimesCompareResult;
}
-
int workflowInstancePriorityCompareResult =
workflowInstance.getProcessInstancePriority().getCode()
-
other.getWorkflowInstance().getProcessInstancePriority().getCode();
if (workflowInstancePriorityCompareResult != 0) {
return workflowInstancePriorityCompareResult;
}
- int workflowInstanceIdCompareResult = workflowInstance.getId() -
other.getWorkflowInstance().getId();
+ int workflowInstanceIdCompareResult =
workflowInstance.getId().compareTo(other.getWorkflowInstance().getId());
if (workflowInstanceIdCompareResult != 0) {
return workflowInstanceIdCompareResult;
}
int taskInstancePriorityCompareResult =
taskInstance.getTaskInstancePriority().getCode()
- other.getTaskInstance().getTaskInstancePriority().getCode();
@@ -84,21 +79,7 @@ public abstract class PriorityTaskExecuteRunnable implements
TaskExecuteRunnable
return -taskGroupPriorityCompareResult;
}
// The task instance shouldn't be equals
- return taskInstance.getId() - other.getTaskInstance().getId();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof PriorityTaskExecuteRunnable) {
- PriorityTaskExecuteRunnable other = (PriorityTaskExecuteRunnable)
obj;
- return compareTo(other) == 0;
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- return taskInstance.getId();
+ return taskInstance.getId().compareTo(other.getTaskInstance().getId());
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
index 24e524d171..ab6fc555a6 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
@@ -57,9 +57,7 @@ import
org.apache.dolphinscheduler.server.master.event.StateEventHandleException
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import
org.apache.dolphinscheduler.server.master.runner.dispatcher.WorkerTaskDispatcher;
-import
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
import
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory;
-import
org.apache.dolphinscheduler.server.master.runner.execute.TaskExecutionContextFactory;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnable.java
similarity index 90%
rename from
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecuteRunnable.java
rename to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnable.java
index 02980aff49..8f66189617 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnable.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.runner.execute;
+package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -25,7 +25,7 @@ import
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
* This interface is used to define a task which is executing.
* todo: split to MasterTaskExecuteRunnable and WorkerTaskExecuteRunnable
*/
-public interface TaskExecuteRunnable extends Comparable<TaskExecuteRunnable> {
+public interface TaskExecuteRunnable {
void dispatch();
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecuteRunnableFactory.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnableFactory.java
similarity index 89%
rename from
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecuteRunnableFactory.java
rename to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnableFactory.java
index e3afb2d6b3..43ee971160 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecuteRunnableFactory.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecuteRunnableFactory.java
@@ -15,11 +15,16 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.runner.execute;
+package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import
org.apache.dolphinscheduler.server.master.exception.TaskExecuteRunnableCreateException;
+/**
+ * Used to create TaskExecuteRunnable instances.
+ *
+ * @param <T> TaskExecuteRunnable
+ */
public interface TaskExecuteRunnableFactory<T extends TaskExecuteRunnable> {
T createTaskExecuteRunnable(TaskInstance taskInstance) throws
TaskExecuteRunnableCreateException;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecutionContextFactory.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecutionContextFactory.java
similarity index 99%
rename from
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecutionContextFactory.java
rename to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecutionContextFactory.java
index d8f72564bd..3fb2a9c850 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecutionContextFactory.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecutionContextFactory.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.master.runner.execute;
+package org.apache.dolphinscheduler.server.master.runner;
import static org.apache.dolphinscheduler.common.constants.Constants.ADDRESS;
import static org.apache.dolphinscheduler.common.constants.Constants.DATABASE;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 00c1515648..35ad1e8c82 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -80,9 +80,7 @@ import
org.apache.dolphinscheduler.server.master.event.TaskStateEvent;
import org.apache.dolphinscheduler.server.master.event.WorkflowStateEvent;
import org.apache.dolphinscheduler.server.master.graph.IWorkflowGraph;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
-import
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
import
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory;
-import
org.apache.dolphinscheduler.server.master.runner.execute.TaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.utils.TaskUtils;
import org.apache.dolphinscheduler.server.master.utils.WorkflowInstanceUtils;
import org.apache.dolphinscheduler.service.alert.ListenerEventAlertManager;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/MasterTaskDispatcher.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/MasterTaskDispatcher.java
index d30a1e554d..cca7112975 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/MasterTaskDispatcher.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/MasterTaskDispatcher.java
@@ -27,7 +27,7 @@ import
org.apache.dolphinscheduler.server.master.config.MasterConfig;
import
org.apache.dolphinscheduler.server.master.exception.TaskDispatchException;
import
org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
import org.apache.dolphinscheduler.server.master.runner.BaseTaskDispatcher;
-import
org.apache.dolphinscheduler.server.master.runner.execute.TaskExecuteRunnable;
+import org.apache.dolphinscheduler.server.master.runner.TaskExecuteRunnable;
import java.util.Optional;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/TaskDispatcher.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/TaskDispatcher.java
index 32a195fb5a..f595d5a490 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/TaskDispatcher.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/TaskDispatcher.java
@@ -19,7 +19,7 @@ package
org.apache.dolphinscheduler.server.master.runner.dispatcher;
import
org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException;
import
org.apache.dolphinscheduler.server.master.exception.TaskDispatchException;
-import
org.apache.dolphinscheduler.server.master.runner.execute.TaskExecuteRunnable;
+import org.apache.dolphinscheduler.server.master.runner.TaskExecuteRunnable;
/**
* Used to do task dispatcher.
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/WorkerTaskDispatcher.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/WorkerTaskDispatcher.java
index 36739a1163..3320082694 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/WorkerTaskDispatcher.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/WorkerTaskDispatcher.java
@@ -31,7 +31,7 @@ import
org.apache.dolphinscheduler.server.master.dispatch.host.HostManager;
import
org.apache.dolphinscheduler.server.master.exception.TaskDispatchException;
import
org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
import org.apache.dolphinscheduler.server.master.runner.BaseTaskDispatcher;
-import
org.apache.dolphinscheduler.server.master.runner.execute.TaskExecuteRunnable;
+import org.apache.dolphinscheduler.server.master.runner.TaskExecuteRunnable;
import java.util.Optional;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterDelayTaskExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskExecutor.java
similarity index 82%
rename from
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterDelayTaskExecuteRunnable.java
rename to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskExecutor.java
index 8a6e6d8e87..d289a318da 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterDelayTaskExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskExecutor.java
@@ -26,14 +26,14 @@ import
org.apache.dolphinscheduler.server.master.runner.task.LogicTaskPluginFact
import lombok.extern.slf4j.Slf4j;
@Slf4j
-public class AsyncMasterDelayTaskExecuteRunnable extends
MasterDelayTaskExecuteRunnable {
+public class AsyncMasterTaskExecutor extends MasterTaskExecutor {
private final AsyncMasterTaskDelayQueue asyncMasterTaskDelayQueue;
- public AsyncMasterDelayTaskExecuteRunnable(TaskExecutionContext
taskExecutionContext,
- LogicTaskPluginFactoryBuilder
logicTaskPluginFactoryBuilder,
-
LogicTaskInstanceExecutionEventSenderManager
logicTaskInstanceExecutionEventSenderManager,
- AsyncMasterTaskDelayQueue
asyncTaskDelayQueue) {
+ public AsyncMasterTaskExecutor(TaskExecutionContext taskExecutionContext,
+ LogicTaskPluginFactoryBuilder
logicTaskPluginFactoryBuilder,
+
LogicTaskInstanceExecutionEventSenderManager
logicTaskInstanceExecutionEventSenderManager,
+ AsyncMasterTaskDelayQueue
asyncTaskDelayQueue) {
super(taskExecutionContext, logicTaskPluginFactoryBuilder,
logicTaskInstanceExecutionEventSenderManager);
this.asyncMasterTaskDelayQueue = asyncTaskDelayQueue;
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterDelayTaskExecuteRunnableFactory.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskExecutorFactory.java
similarity index 83%
rename from
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterDelayTaskExecuteRunnableFactory.java
rename to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskExecutorFactory.java
index a71f394b7d..6c81cd7862 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterDelayTaskExecuteRunnableFactory.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskExecutorFactory.java
@@ -25,9 +25,9 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
-public class AsyncMasterDelayTaskExecuteRunnableFactory
+public class AsyncMasterTaskExecutorFactory
implements
-
MasterDelayTaskExecuteRunnableFactory<AsyncMasterDelayTaskExecuteRunnable> {
+ MasterTaskExecutorFactory<AsyncMasterTaskExecutor> {
@Autowired
private LogicTaskPluginFactoryBuilder logicTaskPluginFactoryBuilder;
@@ -39,8 +39,8 @@ public class AsyncMasterDelayTaskExecuteRunnableFactory
private AsyncMasterTaskDelayQueue asyncTaskDelayQueue;
@Override
- public AsyncMasterDelayTaskExecuteRunnable
createWorkerTaskExecuteRunnable(TaskExecutionContext taskExecutionContext) {
- return new AsyncMasterDelayTaskExecuteRunnable(taskExecutionContext,
+ public AsyncMasterTaskExecutor
createMasterTaskExecutor(TaskExecutionContext taskExecutionContext) {
+ return new AsyncMasterTaskExecutor(taskExecutionContext,
logicTaskPluginFactoryBuilder,
logicTaskInstanceExecutionEventSenderManager,
asyncTaskDelayQueue);
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncTaskCallbackFunctionImpl.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncTaskCallbackFunctionImpl.java
index e9d4fe4430..6c83a3ccb9 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncTaskCallbackFunctionImpl.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncTaskCallbackFunctionImpl.java
@@ -26,41 +26,41 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class AsyncTaskCallbackFunctionImpl implements
AsyncTaskCallbackFunction {
- private final AsyncMasterDelayTaskExecuteRunnable
asyncMasterDelayTaskExecuteRunnable;
+ private final AsyncMasterTaskExecutor asyncMasterTaskExecuteRunnable;
- public AsyncTaskCallbackFunctionImpl(@NonNull
AsyncMasterDelayTaskExecuteRunnable asyncMasterDelayTaskExecuteRunnable) {
- this.asyncMasterDelayTaskExecuteRunnable =
asyncMasterDelayTaskExecuteRunnable;
+ public AsyncTaskCallbackFunctionImpl(@NonNull AsyncMasterTaskExecutor
asyncMasterTaskExecuteRunnable) {
+ this.asyncMasterTaskExecuteRunnable = asyncMasterTaskExecuteRunnable;
}
@Override
public void executeSuccess() {
- asyncMasterDelayTaskExecuteRunnable.getTaskExecutionContext()
+ asyncMasterTaskExecuteRunnable.getTaskExecutionContext()
.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
executeFinished();
}
@Override
public void executeFailed() {
- asyncMasterDelayTaskExecuteRunnable.getTaskExecutionContext()
+ asyncMasterTaskExecuteRunnable.getTaskExecutionContext()
.setCurrentExecutionStatus(TaskExecutionStatus.FAILURE);
executeFinished();
}
@Override
public void executeThrowing(Throwable throwable) {
- asyncMasterDelayTaskExecuteRunnable.afterThrowing(throwable);
+ asyncMasterTaskExecuteRunnable.afterThrowing(throwable);
}
private void executeFinished() {
TaskInstanceLogHeader.printFinalizeTaskHeader();
- int taskInstanceId =
asyncMasterDelayTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId();
+ int taskInstanceId =
asyncMasterTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId();
MasterTaskExecutionContextHolder.removeTaskExecutionContext(taskInstanceId);
-
MasterTaskExecuteRunnableHolder.removeMasterTaskExecuteRunnable(taskInstanceId);
+ MasterTaskExecutorHolder.removeMasterTaskExecutor(taskInstanceId);
log.info("Task execute finished, removed the TaskExecutionContext");
- asyncMasterDelayTaskExecuteRunnable.sendTaskResult();
+ asyncMasterTaskExecuteRunnable.sendTaskResult();
log.info(
"Execute task finished, will send the task execute result to
master, the current task execute result is {}",
-
asyncMasterDelayTaskExecuteRunnable.getTaskExecutionContext().getCurrentExecutionStatus().name());
- asyncMasterDelayTaskExecuteRunnable.closeLogAppender();
+
asyncMasterTaskExecuteRunnable.getTaskExecutionContext().getCurrentExecutionStatus().name());
+ asyncMasterTaskExecuteRunnable.closeLogAppender();
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/DefaultTaskExecuteRunnableFactory.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/DefaultTaskExecuteRunnableFactory.java
index 64443855fa..ab749b5861 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/DefaultTaskExecuteRunnableFactory.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/DefaultTaskExecuteRunnableFactory.java
@@ -21,6 +21,9 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import
org.apache.dolphinscheduler.server.master.exception.TaskExecuteRunnableCreateException;
import
org.apache.dolphinscheduler.server.master.exception.TaskExecutionContextCreateException;
+import
org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable;
+import
org.apache.dolphinscheduler.server.master.runner.TaskExecuteRunnableFactory;
+import
org.apache.dolphinscheduler.server.master.runner.TaskExecutionContextFactory;
import
org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import
org.apache.dolphinscheduler.server.master.runner.operator.TaskExecuteRunnableOperatorManager;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterDelayTaskExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterDelayTaskExecuteRunnable.java
deleted file mode 100644
index 70f3e93521..0000000000
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterDelayTaskExecuteRunnable.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.runner.execute;
-
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import
org.apache.dolphinscheduler.server.master.runner.message.LogicTaskInstanceExecutionEventSenderManager;
-import
org.apache.dolphinscheduler.server.master.runner.task.LogicTaskPluginFactoryBuilder;
-
-import java.util.concurrent.Delayed;
-import java.util.concurrent.TimeUnit;
-
-public abstract class MasterDelayTaskExecuteRunnable extends
MasterTaskExecuteRunnable implements Delayed {
-
- public MasterDelayTaskExecuteRunnable(TaskExecutionContext
taskExecutionContext,
- LogicTaskPluginFactoryBuilder
logicTaskPluginFactoryBuilder,
-
LogicTaskInstanceExecutionEventSenderManager
logicTaskInstanceExecutionEventSenderManager) {
- super(taskExecutionContext, logicTaskPluginFactoryBuilder,
logicTaskInstanceExecutionEventSenderManager);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof MasterDelayTaskExecuteRunnable)) {
- return false;
- }
- MasterDelayTaskExecuteRunnable other =
(MasterDelayTaskExecuteRunnable) obj;
- return other.getTaskExecutionContext().getTaskInstanceId() ==
this.getTaskExecutionContext()
- .getTaskInstanceId();
- }
-
- @Override
- public int hashCode() {
- return this.getTaskExecutionContext().getTaskInstanceId();
- }
-
- @Override
- public long getDelay(TimeUnit unit) {
- TaskExecutionContext taskExecutionContext = getTaskExecutionContext();
- return unit.convert(
- DateUtils.getRemainTime(
- taskExecutionContext.getFirstSubmitTime(),
taskExecutionContext.getDelayTime() * 60L),
- TimeUnit.SECONDS);
- }
-
- @Override
- public int compareTo(Delayed o) {
- if (o == null) {
- return 1;
- }
- return Long.compare(this.getDelay(TimeUnit.MILLISECONDS),
o.getDelay(TimeUnit.MILLISECONDS));
- }
-
-}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutor.java
similarity index 94%
rename from
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnable.java
rename to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutor.java
index 9e08c1a4d2..8c1e2feaba 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutor.java
@@ -35,16 +35,16 @@ import
org.apache.dolphinscheduler.server.master.runner.task.LogicTaskPluginFact
import lombok.extern.slf4j.Slf4j;
@Slf4j
-public abstract class MasterTaskExecuteRunnable implements Runnable {
+public abstract class MasterTaskExecutor implements Runnable {
protected final TaskExecutionContext taskExecutionContext;
protected final LogicTaskPluginFactoryBuilder
logicTaskPluginFactoryBuilder;
protected final LogicTaskInstanceExecutionEventSenderManager
logicTaskInstanceExecutionEventSenderManager;
protected ILogicTask logicTask;
- public MasterTaskExecuteRunnable(TaskExecutionContext taskExecutionContext,
- LogicTaskPluginFactoryBuilder
logicTaskPluginFactoryBuilder,
-
LogicTaskInstanceExecutionEventSenderManager
logicTaskInstanceExecutionEventSenderManager) {
+ public MasterTaskExecutor(TaskExecutionContext taskExecutionContext,
+ LogicTaskPluginFactoryBuilder
logicTaskPluginFactoryBuilder,
+ LogicTaskInstanceExecutionEventSenderManager
logicTaskInstanceExecutionEventSenderManager) {
this.taskExecutionContext = taskExecutionContext;
this.logicTaskPluginFactoryBuilder = logicTaskPluginFactoryBuilder;
this.logicTaskInstanceExecutionEventSenderManager =
logicTaskInstanceExecutionEventSenderManager;
@@ -68,7 +68,7 @@ public abstract class MasterTaskExecuteRunnable implements
Runnable {
"Get a exception when execute the task, sent the task execute
result to master, the current task execute result is {}",
taskExecutionContext.getCurrentExecutionStatus());
MasterTaskExecutionContextHolder.removeTaskExecutionContext(taskExecutionContext.getTaskInstanceId());
-
MasterTaskExecuteRunnableHolder.removeMasterTaskExecuteRunnable(taskExecutionContext.getTaskInstanceId());
+
MasterTaskExecutorHolder.removeMasterTaskExecutor(taskExecutionContext.getTaskInstanceId());
log.info("Get a exception when execute the task, removed the
TaskExecutionContext");
closeLogAppender();
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterDelayTaskExecuteRunnableFactory.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorFactory.java
similarity index 83%
rename from
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterDelayTaskExecuteRunnableFactory.java
rename to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorFactory.java
index 0fd79dfabe..d1f76aedbc 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterDelayTaskExecuteRunnableFactory.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorFactory.java
@@ -19,8 +19,8 @@ package
org.apache.dolphinscheduler.server.master.runner.execute;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-public interface MasterDelayTaskExecuteRunnableFactory<T extends
MasterDelayTaskExecuteRunnable> {
+public interface MasterTaskExecutorFactory<T extends MasterTaskExecutor> {
- T createWorkerTaskExecuteRunnable(TaskExecutionContext
taskExecutionContext);
+ T createMasterTaskExecutor(TaskExecutionContext taskExecutionContext);
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableFactoryBuilder.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorFactoryBuilder.java
similarity index 75%
rename from
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableFactoryBuilder.java
rename to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorFactoryBuilder.java
index c5689f6a1b..84c342c074 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableFactoryBuilder.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorFactoryBuilder.java
@@ -28,24 +28,27 @@ import org.springframework.stereotype.Component;
import com.google.common.collect.Sets;
+/**
+ * Use to create MasterTaskExecutorFactory
+ */
@Component
-public class MasterTaskExecuteRunnableFactoryBuilder {
+public class MasterTaskExecutorFactoryBuilder {
@Autowired
- private AsyncMasterDelayTaskExecuteRunnableFactory
asyncMasterDelayTaskExecuteRunnableFactory;
+ private AsyncMasterTaskExecutorFactory asyncMasterTaskExecutorFactory;
@Autowired
- private SyncMasterDelayTaskExecuteRunnableFactory
syncMasterDelayTaskExecuteRunnableFactory;
+ private SyncMasterTaskExecutorFactory syncMasterTaskExecutorFactory;
private static final Set<String> ASYNC_TASK_TYPE = Sets.newHashSet(
DependentLogicTask.TASK_TYPE,
SubWorkflowLogicTask.TASK_TYPE,
DynamicLogicTask.TASK_TYPE);
- public MasterDelayTaskExecuteRunnableFactory<? extends
MasterDelayTaskExecuteRunnable>
createWorkerDelayTaskExecuteRunnableFactory(String taskType) {
+ public MasterTaskExecutorFactory<? extends MasterTaskExecutor>
createMasterTaskExecutorFactory(String taskType) {
if (ASYNC_TASK_TYPE.contains(taskType)) {
- return asyncMasterDelayTaskExecuteRunnableFactory;
+ return asyncMasterTaskExecutorFactory;
}
- return syncMasterDelayTaskExecuteRunnableFactory;
+ return syncMasterTaskExecutorFactory;
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableHolder.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorHolder.java
similarity index 66%
rename from
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableHolder.java
rename to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorHolder.java
index 6b29897611..983542cae2 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableHolder.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorHolder.java
@@ -25,20 +25,20 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
@UtilityClass
-public class MasterTaskExecuteRunnableHolder {
+public class MasterTaskExecutorHolder {
- private static final Map<Integer, MasterTaskExecuteRunnable>
SUBMITTED_MASTER_TASK_MAP = new ConcurrentHashMap<>();
+ private static final Map<Integer, MasterTaskExecutor>
SUBMITTED_MASTER_TASK_MAP = new ConcurrentHashMap<>();
- public void putMasterTaskExecuteRunnable(MasterTaskExecuteRunnable
masterTaskExecuteRunnable) {
-
SUBMITTED_MASTER_TASK_MAP.put(masterTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId(),
- masterTaskExecuteRunnable);
+ public void putMasterTaskExecuteRunnable(MasterTaskExecutor
masterTaskExecutor) {
+
SUBMITTED_MASTER_TASK_MAP.put(masterTaskExecutor.getTaskExecutionContext().getTaskInstanceId(),
+ masterTaskExecutor);
}
- public MasterTaskExecuteRunnable getMasterTaskExecuteRunnable(Integer
taskInstanceId) {
+ public MasterTaskExecutor getMasterTaskExecutor(Integer taskInstanceId) {
return SUBMITTED_MASTER_TASK_MAP.get(taskInstanceId);
}
- public void removeMasterTaskExecuteRunnable(Integer taskInstanceId) {
+ public void removeMasterTaskExecutor(Integer taskInstanceId) {
SUBMITTED_MASTER_TASK_MAP.remove(taskInstanceId);
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableThreadPool.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPool.java
similarity index 89%
rename from
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableThreadPool.java
rename to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPool.java
index 4f542556e4..d61d058d22 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableThreadPool.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPool.java
@@ -30,7 +30,7 @@ import com.google.common.util.concurrent.MoreExecutors;
@Slf4j
@Component
-public class MasterTaskExecuteRunnableThreadPool {
+public class MasterTaskExecutorThreadPool {
@Autowired
private MasterConfig masterConfig;
@@ -44,8 +44,8 @@ public class MasterTaskExecuteRunnableThreadPool {
log.info("MasterTaskExecuteRunnableThreadPool started...");
}
- public void submitMasterTaskExecuteRunnable(MasterTaskExecuteRunnable
masterTaskExecuteRunnable) {
- listeningExecutorService.submit(masterTaskExecuteRunnable);
+ public void submitMasterTaskExecutor(MasterTaskExecutor
masterTaskExecutor) {
+ listeningExecutorService.submit(masterTaskExecutor);
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterTaskExecutor.java
similarity index 82%
rename from
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnable.java
rename to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterTaskExecutor.java
index 62365f78cb..7f303487ef 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterTaskExecutor.java
@@ -27,11 +27,11 @@ import
org.apache.dolphinscheduler.server.master.runner.task.LogicTaskPluginFact
import lombok.extern.slf4j.Slf4j;
@Slf4j
-public class SyncMasterDelayTaskExecuteRunnable extends
MasterDelayTaskExecuteRunnable {
+public class SyncMasterTaskExecutor extends MasterTaskExecutor {
- public SyncMasterDelayTaskExecuteRunnable(TaskExecutionContext
taskExecutionContext,
- LogicTaskPluginFactoryBuilder
logicTaskPluginFactoryBuilder,
-
LogicTaskInstanceExecutionEventSenderManager
logicTaskInstanceExecutionEventSenderManager) {
+ public SyncMasterTaskExecutor(TaskExecutionContext taskExecutionContext,
+ LogicTaskPluginFactoryBuilder
logicTaskPluginFactoryBuilder,
+ LogicTaskInstanceExecutionEventSenderManager
logicTaskInstanceExecutionEventSenderManager) {
super(taskExecutionContext, logicTaskPluginFactoryBuilder,
logicTaskInstanceExecutionEventSenderManager);
}
@@ -56,7 +56,7 @@ public class SyncMasterDelayTaskExecuteRunnable extends
MasterDelayTaskExecuteRu
taskExecutionContext.getCurrentExecutionStatus().name());
closeLogAppender();
MasterTaskExecutionContextHolder.removeTaskExecutionContext(taskExecutionContext.getTaskInstanceId());
-
MasterTaskExecuteRunnableHolder.removeMasterTaskExecuteRunnable(taskExecutionContext.getTaskInstanceId());
+
MasterTaskExecutorHolder.removeMasterTaskExecutor(taskExecutionContext.getTaskInstanceId());
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnableFactory.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterTaskExecutorFactory.java
similarity index 81%
rename from
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnableFactory.java
rename to
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterTaskExecutorFactory.java
index 2bf829c021..b591711e3d 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnableFactory.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterTaskExecutorFactory.java
@@ -28,9 +28,9 @@ import org.springframework.stereotype.Component;
@Slf4j
@Component
-public class SyncMasterDelayTaskExecuteRunnableFactory
+public class SyncMasterTaskExecutorFactory
implements
-
MasterDelayTaskExecuteRunnableFactory<SyncMasterDelayTaskExecuteRunnable> {
+ MasterTaskExecutorFactory<SyncMasterTaskExecutor> {
@Autowired
private LogicTaskPluginFactoryBuilder logicTaskPluginFactoryBuilder;
@@ -38,8 +38,8 @@ public class SyncMasterDelayTaskExecuteRunnableFactory
private LogicTaskInstanceExecutionEventSenderManager
logicTaskInstanceExecutionEventSenderManager;
@Override
- public SyncMasterDelayTaskExecuteRunnable
createWorkerTaskExecuteRunnable(TaskExecutionContext taskExecutionContext) {
- return new SyncMasterDelayTaskExecuteRunnable(taskExecutionContext,
logicTaskPluginFactoryBuilder,
+ public SyncMasterTaskExecutor
createMasterTaskExecutor(TaskExecutionContext taskExecutionContext) {
+ return new SyncMasterTaskExecutor(taskExecutionContext,
logicTaskPluginFactoryBuilder,
logicTaskInstanceExecutionEventSenderManager);
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java
index 6294f581ec..6f0419ae97 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java
@@ -17,19 +17,41 @@
package org.apache.dolphinscheduler.server.master.runner.operator;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
+import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
+import
org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable;
import
org.apache.dolphinscheduler.server.master.runner.GlobalTaskDispatchWaitingQueue;
-import
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
public abstract class BaseTaskExecuteRunnableDispatchOperator implements
TaskExecuteRunnableOperator {
private final GlobalTaskDispatchWaitingQueue
globalTaskDispatchWaitingQueue;
- public
BaseTaskExecuteRunnableDispatchOperator(GlobalTaskDispatchWaitingQueue
globalTaskDispatchWaitingQueue) {
+ private final TaskInstanceDao taskInstanceDao;
+
+ public BaseTaskExecuteRunnableDispatchOperator(
+
GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue,
+ TaskInstanceDao
taskInstanceDao) {
this.globalTaskDispatchWaitingQueue = globalTaskDispatchWaitingQueue;
+ this.taskInstanceDao = taskInstanceDao;
}
@Override
public void operate(DefaultTaskExecuteRunnable taskExecuteRunnable) {
+ long remainTime = taskExecuteRunnable.getDelay(TimeUnit.SECONDS);
+ TaskInstance taskInstance = taskExecuteRunnable.getTaskInstance();
+ if (remainTime > 0) {
+ taskInstance.setState(TaskExecutionStatus.DELAY_EXECUTION);
+ taskInstanceDao.updateById(taskInstance);
+ log.info("Current taskInstance: {} is choose delay execution,
delay time: {}/s, remainTime: {}/s",
+ taskInstance.getName(),
+ taskInstance.getDelayTime(), remainTime);
+ }
globalTaskDispatchWaitingQueue.submitNeedToDispatchTaskExecuteRunnable(taskExecuteRunnable);
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableKillOperator.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableKillOperator.java
index a1d0a893fc..1b7a92db98 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableKillOperator.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableKillOperator.java
@@ -20,7 +20,7 @@ package
org.apache.dolphinscheduler.server.master.runner.operator;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
-import
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
+import
org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable;
import java.util.Date;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnablePauseOperator.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnablePauseOperator.java
index 383752d3f3..8163817afc 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnablePauseOperator.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnablePauseOperator.java
@@ -18,7 +18,7 @@
package org.apache.dolphinscheduler.server.master.runner.operator;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
+import
org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable;
import lombok.extern.slf4j.Slf4j;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableTimeoutOperator.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableTimeoutOperator.java
index 4068bfa082..ef9dc80901 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableTimeoutOperator.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableTimeoutOperator.java
@@ -21,7 +21,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
-import
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
+import
org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable;
import java.util.Date;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableDispatchOperator.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableDispatchOperator.java
index d5b0802372..ed1b777aeb 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableDispatchOperator.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/LogicTaskExecuteRunnableDispatchOperator.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.runner.operator;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import
org.apache.dolphinscheduler.server.master.runner.GlobalTaskDispatchWaitingQueue;
import org.springframework.stereotype.Component;
@@ -24,8 +25,9 @@ import org.springframework.stereotype.Component;
@Component
public class LogicTaskExecuteRunnableDispatchOperator extends
BaseTaskExecuteRunnableDispatchOperator {
- public
LogicTaskExecuteRunnableDispatchOperator(GlobalTaskDispatchWaitingQueue
globalTaskDispatchWaitingQueue) {
- super(globalTaskDispatchWaitingQueue);
+ public
LogicTaskExecuteRunnableDispatchOperator(GlobalTaskDispatchWaitingQueue
globalTaskDispatchWaitingQueue,
+ TaskInstanceDao
taskInstanceDao) {
+ super(globalTaskDispatchWaitingQueue, taskInstanceDao);
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableDispatchOperator.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableDispatchOperator.java
index 5a9f070641..5a31f1138d 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableDispatchOperator.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableDispatchOperator.java
@@ -17,20 +17,16 @@
package org.apache.dolphinscheduler.server.master.runner.operator;
+import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import
org.apache.dolphinscheduler.server.master.runner.GlobalTaskDispatchWaitingQueue;
-import
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
-public class TaskExecuteRunnableDispatchOperator implements
TaskExecuteRunnableOperator {
+public class TaskExecuteRunnableDispatchOperator extends
BaseTaskExecuteRunnableDispatchOperator {
- @Autowired
- private GlobalTaskDispatchWaitingQueue globalTaskDispatchWaitingQueue;
-
- @Override
- public void operate(DefaultTaskExecuteRunnable taskExecuteRunnable) {
-
globalTaskDispatchWaitingQueue.submitNeedToDispatchTaskExecuteRunnable(taskExecuteRunnable);
+ public TaskExecuteRunnableDispatchOperator(GlobalTaskDispatchWaitingQueue
globalTaskDispatchWaitingQueue,
+ TaskInstanceDao
taskInstanceDao) {
+ super(globalTaskDispatchWaitingQueue, taskInstanceDao);
}
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableOperator.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableOperator.java
index d270103da1..1d397e3575 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableOperator.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableOperator.java
@@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.server.master.runner.operator;
-import
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
+import
org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable;
public interface TaskExecuteRunnableOperator {
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableOperatorManager.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableOperatorManager.java
index b217aff866..1b92f5e75c 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableOperatorManager.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnableOperatorManager.java
@@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.server.master.runner.operator;
-import
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
+import
org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.utils.TaskUtils;
import org.springframework.beans.factory.annotation.Autowired;
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnablePauseOperator.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnablePauseOperator.java
index 39896bb0cb..f450044377 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnablePauseOperator.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskExecuteRunnablePauseOperator.java
@@ -22,7 +22,7 @@ import
org.apache.dolphinscheduler.extract.base.client.SingletonJdkDynamicRpcCli
import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator;
import
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstancePauseRequest;
import
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstancePauseResponse;
-import
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
+import
org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable;
import org.apache.commons.lang3.StringUtils;
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
index d121d7e198..28c0010e3b 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java
@@ -39,7 +39,6 @@ import
org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.graph.IWorkflowGraph;
import
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory;
-import
org.apache.dolphinscheduler.server.master.runner.execute.TaskExecuteRunnable;
import org.apache.dolphinscheduler.service.alert.ListenerEventAlertManager;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/MasterTaskDispatcherTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/MasterTaskDispatcherTest.java
index c9710209e5..f57c9b6a68 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/MasterTaskDispatcherTest.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/MasterTaskDispatcherTest.java
@@ -20,7 +20,7 @@ package
org.apache.dolphinscheduler.server.master.runner.dispatcher;
import org.apache.dolphinscheduler.extract.base.utils.Host;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import
org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
-import
org.apache.dolphinscheduler.server.master.runner.execute.TaskExecuteRunnable;
+import org.apache.dolphinscheduler.server.master.runner.TaskExecuteRunnable;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/WorkerTaskDispatcherTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/WorkerTaskDispatcherTest.java
index 9b447aada3..46c4f53e1a 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/WorkerTaskDispatcherTest.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/WorkerTaskDispatcherTest.java
@@ -23,7 +23,7 @@ import
org.apache.dolphinscheduler.server.master.config.MasterConfig;
import
org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException;
import org.apache.dolphinscheduler.server.master.dispatch.host.HostManager;
import
org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
-import
org.apache.dolphinscheduler.server.master.runner.execute.TaskExecuteRunnable;
+import org.apache.dolphinscheduler.server.master.runner.TaskExecuteRunnable;
import java.util.Optional;
diff --git
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityTaskExecuteRunnableTest.java
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityDelayTaskExecuteRunnableTest.java
similarity index 89%
rename from
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityTaskExecuteRunnableTest.java
rename to
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityDelayTaskExecuteRunnableTest.java
index 3a3a80b3ca..778884e066 100644
---
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityTaskExecuteRunnableTest.java
+++
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityDelayTaskExecuteRunnableTest.java
@@ -21,12 +21,14 @@ import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import
org.apache.dolphinscheduler.server.master.runner.DefaultTaskExecuteRunnable;
+import
org.apache.dolphinscheduler.server.master.runner.PriorityDelayTaskExecuteRunnable;
import
org.apache.dolphinscheduler.server.master.runner.operator.TaskExecuteRunnableOperatorManager;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
-public class PriorityTaskExecuteRunnableTest {
+public class PriorityDelayTaskExecuteRunnableTest {
@Test
public void testCompareTo() {
@@ -46,9 +48,9 @@ public class PriorityTaskExecuteRunnableTest {
TaskExecutionContext context1 = new TaskExecutionContext();
TaskExecutionContext context2 = new TaskExecutionContext();
- PriorityTaskExecuteRunnable p1 =
+ PriorityDelayTaskExecuteRunnable p1 =
new DefaultTaskExecuteRunnable(workflowInstance, t1, context1,
taskOperatorManager);
- PriorityTaskExecuteRunnable p2 =
+ PriorityDelayTaskExecuteRunnable p2 =
new DefaultTaskExecuteRunnable(workflowInstance, t2, context2,
taskOperatorManager);
Assertions.assertEquals(0, p1.compareTo(p2));
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseLinuxShellInterceptorBuilder.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseLinuxShellInterceptorBuilder.java
index 3c24977c4a..d5a858a437 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseLinuxShellInterceptorBuilder.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseLinuxShellInterceptorBuilder.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.plugin.task.api.shell;
+import org.apache.dolphinscheduler.common.constants.TenantConstants;
import org.apache.dolphinscheduler.common.exception.FileOperateException;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
@@ -67,8 +68,10 @@ public abstract class BaseLinuxShellInterceptorBuilder<T
extends BaseLinuxShellI
protected List<String> generateBootstrapCommand() throws
FileOperateException {
if (sudoEnable) {
- // Set the tenant owner as the working directory
- FileUtils.setDirectoryOwner(Paths.get(shellDirectory), runUser);
+ if (!TenantConstants.BOOTSTRAPT_SYSTEM_USER.equals(runUser)) {
+ // Set the tenant owner as the working directory
+ FileUtils.setDirectoryOwner(Paths.get(shellDirectory),
runUser);
+ }
return bootstrapCommandInSudoMode();
}
return bootstrapCommandInNormalMode();
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 082d784aab..e61409c9d2 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -29,7 +29,7 @@ import
org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer;
-import
org.apache.dolphinscheduler.server.worker.runner.GlobalTaskInstanceDispatchQueueLooper;
+import
org.apache.dolphinscheduler.server.worker.runner.GlobalTaskInstanceWaitingQueueLooper;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.commons.collections4.CollectionUtils;
@@ -68,7 +68,7 @@ public class WorkerServer implements IStoppable {
private MessageRetryRunner messageRetryRunner;
@Autowired
- private GlobalTaskInstanceDispatchQueueLooper
globalTaskInstanceDispatchQueueLooper;
+ private GlobalTaskInstanceWaitingQueueLooper
globalTaskInstanceWaitingQueueLooper;
/**
* worker server startup, not use web service
@@ -91,7 +91,7 @@ public class WorkerServer implements IStoppable {
this.workerManagerThread.start();
this.messageRetryRunner.start();
- this.globalTaskInstanceDispatchQueueLooper.start();
+ this.globalTaskInstanceWaitingQueueLooper.start();
/*
* registry hooks, which are called before the process exits
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java
index 2ab6ed8ecd..dc97a2c9a4 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java
@@ -25,7 +25,7 @@ import org.apache.dolphinscheduler.registry.api.StrategyType;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer;
-import
org.apache.dolphinscheduler.server.worker.runner.GlobalTaskInstanceDispatchQueue;
+import
org.apache.dolphinscheduler.server.worker.runner.GlobalTaskInstanceWaitingQueue;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import java.time.Duration;
@@ -57,7 +57,7 @@ public class WorkerWaitingStrategy implements
WorkerConnectStrategy {
private WorkerManagerThread workerManagerThread;
@Autowired
- private GlobalTaskInstanceDispatchQueue globalTaskInstanceDispatchQueue;
+ private GlobalTaskInstanceWaitingQueue globalTaskInstanceWaitingQueue;
@Override
public void disconnect() {
@@ -121,7 +121,7 @@ public class WorkerWaitingStrategy implements
WorkerConnectStrategy {
workerRpcServer.close();
log.warn("Worker server close the RPC server due to lost connection
from registry");
workerManagerThread.clearTask();
- globalTaskInstanceDispatchQueue.clearTask();
+ globalTaskInstanceWaitingQueue.clearTask();
log.warn("Worker server clear the tasks due to lost connection from
registry");
messageRetryRunner.clearMessage();
log.warn("Worker server clear the retry message due to lost connection
from registry");
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/StreamingTaskInstanceOperatorImpl.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/StreamingTaskInstanceOperatorImpl.java
index 3466d52c28..c57ee2efca 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/StreamingTaskInstanceOperatorImpl.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/StreamingTaskInstanceOperatorImpl.java
@@ -26,7 +26,7 @@ import
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheMana
import org.apache.dolphinscheduler.plugin.task.api.stream.StreamTask;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
-import
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor;
import lombok.extern.slf4j.Slf4j;
@@ -53,12 +53,12 @@ public class StreamingTaskInstanceOperatorImpl implements
IStreamingTaskInstance
log.error("Cannot find TaskExecutionContext for taskInstance:
{}", taskInstanceId);
return TaskInstanceTriggerSavepointResponse.fail("Cannot find
TaskExecutionContext");
}
- WorkerTaskExecuteRunnable workerTaskExecuteRunnable =
workerManager.getTaskExecuteThread(taskInstanceId);
- if (workerTaskExecuteRunnable == null) {
+ WorkerTaskExecutor workerTaskExecutor =
workerManager.getTaskExecuteThread(taskInstanceId);
+ if (workerTaskExecutor == null) {
log.error("Cannot find WorkerTaskExecuteRunnable for
taskInstance: {}", taskInstanceId);
return TaskInstanceTriggerSavepointResponse.fail("Cannot find
WorkerTaskExecuteRunnable");
}
- AbstractTask task = workerTaskExecuteRunnable.getTask();
+ AbstractTask task = workerTaskExecutor.getTask();
if (task == null) {
log.error("Cannot find StreamTask for taskInstance:{}",
taskInstanceId);
return TaskInstanceTriggerSavepointResponse.fail("Cannot find
StreamTask");
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableFactory.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableFactory.java
deleted file mode 100644
index 631b29cc8e..0000000000
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableFactory.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.worker.runner;
-
-import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
-import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
-import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
-
-import javax.annotation.Nullable;
-
-import lombok.NonNull;
-
-public class DefaultWorkerDelayTaskExecuteRunnableFactory
- extends
-
WorkerDelayTaskExecuteRunnableFactory<DefaultWorkerDelayTaskExecuteRunnable> {
-
- protected DefaultWorkerDelayTaskExecuteRunnableFactory(@NonNull
TaskExecutionContext taskExecutionContext,
- @NonNull
WorkerConfig workerConfig,
- @NonNull
WorkerMessageSender workerMessageSender,
- @NonNull
TaskPluginManager taskPluginManager,
- @Nullable
StorageOperate storageOperate,
- @NonNull
WorkerRegistryClient workerRegistryClient) {
- super(taskExecutionContext,
- workerConfig,
- workerMessageSender,
- taskPluginManager,
- storageOperate,
- workerRegistryClient);
- }
-
- @Override
- public DefaultWorkerDelayTaskExecuteRunnable
createWorkerTaskExecuteRunnable() {
- return new DefaultWorkerDelayTaskExecuteRunnable(
- taskExecutionContext,
- workerConfig,
- workerMessageSender,
- taskPluginManager,
- storageOperate,
- workerRegistryClient);
- }
-}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnable.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutor.java
similarity index 76%
rename from
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnable.java
rename to
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutor.java
index 82b91e16ba..19421ee05e 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnable.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutor.java
@@ -30,14 +30,14 @@ import javax.annotation.Nullable;
import lombok.NonNull;
-public class DefaultWorkerDelayTaskExecuteRunnable extends
WorkerDelayTaskExecuteRunnable {
+public class DefaultWorkerTaskExecutor extends WorkerTaskExecutor {
- public DefaultWorkerDelayTaskExecuteRunnable(@NonNull TaskExecutionContext
taskExecutionContext,
- @NonNull WorkerConfig
workerConfig,
- @NonNull WorkerMessageSender
workerMessageSender,
- @NonNull TaskPluginManager
taskPluginManager,
- @Nullable StorageOperate
storageOperate,
- @NonNull WorkerRegistryClient
workerRegistryClient) {
+ public DefaultWorkerTaskExecutor(@NonNull TaskExecutionContext
taskExecutionContext,
+ @NonNull WorkerConfig workerConfig,
+ @NonNull WorkerMessageSender
workerMessageSender,
+ @NonNull TaskPluginManager
taskPluginManager,
+ @Nullable StorageOperate storageOperate,
+ @NonNull WorkerRegistryClient
workerRegistryClient) {
super(taskExecutionContext,
workerConfig,
workerMessageSender,
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnableFactory.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutorFactory.java
similarity index 55%
rename from
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnableFactory.java
rename to
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutorFactory.java
index f2d39835b1..0141a5cd17 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnableFactory.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutorFactory.java
@@ -28,24 +28,23 @@ import javax.annotation.Nullable;
import lombok.NonNull;
-public abstract class WorkerDelayTaskExecuteRunnableFactory<T extends
WorkerDelayTaskExecuteRunnable>
+public class DefaultWorkerTaskExecutorFactory
implements
- WorkerTaskExecuteRunnableFactory<T> {
-
- protected final @NonNull TaskExecutionContext taskExecutionContext;
- protected final @NonNull WorkerConfig workerConfig;
- protected final @NonNull WorkerMessageSender workerMessageSender;
- protected final @NonNull TaskPluginManager taskPluginManager;
- protected final @Nullable StorageOperate storageOperate;
- protected final @NonNull WorkerRegistryClient workerRegistryClient;
-
- protected WorkerDelayTaskExecuteRunnableFactory(
- @NonNull
TaskExecutionContext taskExecutionContext,
- @NonNull WorkerConfig
workerConfig,
- @NonNull
WorkerMessageSender workerMessageSender,
- @NonNull TaskPluginManager
taskPluginManager,
- @Nullable StorageOperate
storageOperate,
- @NonNull
WorkerRegistryClient workerRegistryClient) {
+ WorkerTaskExecutorFactory<DefaultWorkerTaskExecutor> {
+
+ private final @NonNull TaskExecutionContext taskExecutionContext;
+ private final @NonNull WorkerConfig workerConfig;
+ private final @NonNull WorkerMessageSender workerMessageSender;
+ private final @NonNull TaskPluginManager taskPluginManager;
+ private final @Nullable StorageOperate storageOperate;
+ private final @NonNull WorkerRegistryClient workerRegistryClient;
+
+ public DefaultWorkerTaskExecutorFactory(@NonNull TaskExecutionContext
taskExecutionContext,
+ @NonNull WorkerConfig workerConfig,
+ @NonNull WorkerMessageSender
workerMessageSender,
+ @NonNull TaskPluginManager
taskPluginManager,
+ @Nullable StorageOperate
storageOperate,
+ @NonNull WorkerRegistryClient
workerRegistryClient) {
this.taskExecutionContext = taskExecutionContext;
this.workerConfig = workerConfig;
this.workerMessageSender = workerMessageSender;
@@ -54,5 +53,14 @@ public abstract class
WorkerDelayTaskExecuteRunnableFactory<T extends WorkerDela
this.workerRegistryClient = workerRegistryClient;
}
- public abstract T createWorkerTaskExecuteRunnable();
+ @Override
+ public DefaultWorkerTaskExecutor createWorkerTaskExecutor() {
+ return new DefaultWorkerTaskExecutor(
+ taskExecutionContext,
+ workerConfig,
+ workerMessageSender,
+ taskPluginManager,
+ storageOperate,
+ workerRegistryClient);
+ }
}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceDispatchQueue.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueue.java
similarity index 95%
rename from
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceDispatchQueue.java
rename to
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueue.java
index 2b894913a7..e1ac97b2d3 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceDispatchQueue.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueue.java
@@ -31,13 +31,13 @@ import org.springframework.stereotype.Component;
@Slf4j
@Component
-public class GlobalTaskInstanceDispatchQueue {
+public class GlobalTaskInstanceWaitingQueue {
private final WorkerConfig workerConfig;
private final BlockingQueue<TaskExecutionContext> blockingQueue;
- public GlobalTaskInstanceDispatchQueue(WorkerConfig workerConfig) {
+ public GlobalTaskInstanceWaitingQueue(WorkerConfig workerConfig) {
this.workerConfig = workerConfig;
this.blockingQueue = new
ArrayBlockingQueue<>(workerConfig.getExecThreads());
}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceDispatchQueueLooper.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueueLooper.java
similarity index 66%
rename from
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceDispatchQueueLooper.java
rename to
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueueLooper.java
index 654a1dd9fe..6e501a1be6 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceDispatchQueueLooper.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueueLooper.java
@@ -18,12 +18,9 @@
package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import
org.apache.dolphinscheduler.extract.master.transportor.ITaskInstanceExecutionEvent;
import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
-import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
@@ -36,10 +33,10 @@ import org.springframework.stereotype.Component;
@Slf4j
@Component
-public class GlobalTaskInstanceDispatchQueueLooper extends BaseDaemonThread {
+public class GlobalTaskInstanceWaitingQueueLooper extends BaseDaemonThread {
@Autowired
- private GlobalTaskInstanceDispatchQueue globalTaskInstanceDispatchQueue;
+ private GlobalTaskInstanceWaitingQueue globalTaskInstanceWaitingQueue;
@Autowired
private WorkerConfig workerConfig;
@@ -59,7 +56,7 @@ public class GlobalTaskInstanceDispatchQueueLooper extends
BaseDaemonThread {
@Autowired
private WorkerRegistryClient workerRegistryClient;
- protected GlobalTaskInstanceDispatchQueueLooper() {
+ protected GlobalTaskInstanceWaitingQueueLooper() {
super("GlobalTaskDispatchQueueLooper");
}
@@ -72,39 +69,25 @@ public class GlobalTaskInstanceDispatchQueueLooper extends
BaseDaemonThread {
public void run() {
while (true) {
try {
- TaskExecutionContext taskExecutionContext =
globalTaskInstanceDispatchQueue.take();
+ TaskExecutionContext taskExecutionContext =
globalTaskInstanceWaitingQueue.take();
LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath());
LogUtils.setTaskInstanceIdMDC(taskExecutionContext.getTaskInstanceId());
- int delayTime = taskExecutionContext.getDelayTime();
- if (delayTime > 0) {
- // delay task process
- long remainTime =
- DateUtils.getRemainTime(
-
DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()),
- delayTime * 60L);
- if (remainTime > 0) {
- log.info("Current taskInstance is choose delay
execution, delay time: {}s", remainTime);
-
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.DELAY_EXECUTION);
- // todo: use delay running event
- workerMessageSender.sendMessage(taskExecutionContext,
-
ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.FINISH);
- }
- }
- WorkerDelayTaskExecuteRunnable workerTaskExecuteRunnable =
WorkerTaskExecuteRunnableFactoryBuilder
- .createWorkerDelayTaskExecuteRunnableFactory(
+ WorkerTaskExecutor workerTaskExecutor =
WorkerTaskExecutorFactoryBuilder
+ .createWorkerTaskExecutorFactory(
taskExecutionContext,
workerConfig,
workerMessageSender,
taskPluginManager,
storageOperate,
workerRegistryClient)
- .createWorkerTaskExecuteRunnable();
- if (workerManager.offer(workerTaskExecuteRunnable)) {
+ .createWorkerTaskExecutor();
+ if (workerManager.offer(workerTaskExecutor)) {
log.info("Success submit WorkerDelayTaskExecuteRunnable to
WorkerManagerThread's waiting queue");
}
} catch (InterruptedException e) {
log.error("GlobalTaskDispatchQueueLooper interrupted");
+ Thread.currentThread().interrupt();
break;
} catch (Exception ex) {
log.error("GlobalTaskDispatchQueueLooper error", ex);
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnable.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnable.java
deleted file mode 100644
index 07d85dc57b..0000000000
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerDelayTaskExecuteRunnable.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.worker.runner;
-
-import org.apache.dolphinscheduler.common.utils.DateUtils;
-import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
-import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
-import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
-
-import java.util.concurrent.Delayed;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.Nullable;
-
-import lombok.NonNull;
-
-public abstract class WorkerDelayTaskExecuteRunnable extends
WorkerTaskExecuteRunnable implements Delayed {
-
- protected WorkerDelayTaskExecuteRunnable(@NonNull TaskExecutionContext
taskExecutionContext,
- @NonNull WorkerConfig
workerConfig,
- @NonNull WorkerMessageSender
workerMessageSender,
- @NonNull TaskPluginManager
taskPluginManager,
- @Nullable StorageOperate
storageOperate,
- @NonNull WorkerRegistryClient
workerRegistryClient) {
- super(taskExecutionContext,
- workerConfig,
- workerMessageSender,
- taskPluginManager,
- storageOperate,
- workerRegistryClient);
- }
-
- @Override
- public long getDelay(TimeUnit unit) {
- TaskExecutionContext taskExecutionContext = getTaskExecutionContext();
- return unit.convert(
- DateUtils.getRemainTime(
-
DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()),
- taskExecutionContext.getDelayTime() * 60L),
- TimeUnit.SECONDS);
- }
-
- @Override
- public int compareTo(Delayed o) {
- if (o == null) {
- return 1;
- }
- return Long.compare(this.getDelay(TimeUnit.MILLISECONDS),
o.getDelay(TimeUnit.MILLISECONDS));
- }
-
-}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
index f16e887bec..2a6b7feec2 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
@@ -37,25 +37,19 @@ public class WorkerExecService {
private final ListeningExecutorService listeningExecutorService;
- /**
- * thread executor service
- */
private final ExecutorService execService;
- /**
- * running task
- */
- private final ConcurrentHashMap<Integer, WorkerTaskExecuteRunnable>
taskExecuteThreadMap;
+ private final ConcurrentHashMap<Integer, WorkerTaskExecutor>
taskExecuteThreadMap;
public WorkerExecService(ExecutorService execService,
- ConcurrentHashMap<Integer,
WorkerTaskExecuteRunnable> taskExecuteThreadMap) {
+ ConcurrentHashMap<Integer, WorkerTaskExecutor>
taskExecuteThreadMap) {
this.execService = execService;
this.listeningExecutorService =
MoreExecutors.listeningDecorator(this.execService);
this.taskExecuteThreadMap = taskExecuteThreadMap;
WorkerServerMetrics.registerWorkerTaskTotalGauge(taskExecuteThreadMap::size);
}
- public void submit(final WorkerTaskExecuteRunnable taskExecuteThread) {
+ public void submit(final WorkerTaskExecutor taskExecuteThread) {
taskExecuteThreadMap.put(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(),
taskExecuteThread);
ListenableFuture future =
this.listeningExecutorService.submit(taskExecuteThread);
FutureCallback futureCallback = new FutureCallback() {
@@ -90,7 +84,7 @@ public class WorkerExecService {
return ((ThreadPoolExecutor) this.execService).getActiveCount();
}
- public Map<Integer, WorkerTaskExecuteRunnable> getTaskExecuteThreadMap() {
+ public Map<Integer, WorkerTaskExecutor> getTaskExecuteThreadMap() {
return taskExecuteThreadMap;
}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
index 3fdcec4e89..4b666cfb20 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
@@ -25,8 +25,9 @@ import
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheMana
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.DelayQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.Nullable;
@@ -41,28 +42,22 @@ import org.springframework.stereotype.Component;
@Slf4j
public class WorkerManagerThread implements Runnable {
- private final DelayQueue<WorkerDelayTaskExecuteRunnable> waitSubmitQueue;
+ private final BlockingQueue<WorkerTaskExecutor> waitSubmitQueue;
private final WorkerExecService workerExecService;
- private final WorkerConfig workerConfig;
private final int workerExecThreads;
- /**
- * running task
- */
- private final ConcurrentHashMap<Integer, WorkerTaskExecuteRunnable>
taskExecuteThreadMap =
- new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<Integer, WorkerTaskExecutor>
taskExecuteThreadMap = new ConcurrentHashMap<>();
public WorkerManagerThread(WorkerConfig workerConfig) {
- this.workerConfig = workerConfig;
workerExecThreads = workerConfig.getExecThreads();
- this.waitSubmitQueue = new DelayQueue<>();
+ this.waitSubmitQueue = new LinkedBlockingQueue<>();
workerExecService = new WorkerExecService(
ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread",
workerConfig.getExecThreads()),
taskExecuteThreadMap);
}
- public @Nullable WorkerTaskExecuteRunnable getTaskExecuteThread(Integer
taskInstanceId) {
+ public @Nullable WorkerTaskExecutor getTaskExecuteThread(Integer
taskInstanceId) {
return taskExecuteThreadMap.get(taskInstanceId);
}
@@ -95,7 +90,7 @@ public class WorkerManagerThread implements Runnable {
.forEach(waitSubmitQueue::remove);
}
- public boolean offer(WorkerDelayTaskExecuteRunnable
workerDelayTaskExecuteRunnable) {
+ public boolean offer(WorkerTaskExecutor workerDelayTaskExecuteRunnable) {
return waitSubmitQueue.add(workerDelayTaskExecuteRunnable);
}
@@ -122,8 +117,8 @@ public class WorkerManagerThread implements Runnable {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}
if (this.getThreadPoolQueueSize() <= workerExecThreads) {
- final WorkerDelayTaskExecuteRunnable
workerDelayTaskExecuteRunnable = waitSubmitQueue.take();
- workerExecService.submit(workerDelayTaskExecuteRunnable);
+ WorkerTaskExecutor workerTaskExecutor =
waitSubmitQueue.take();
+ workerExecService.submit(workerTaskExecutor);
} else {
WorkerServerMetrics.incWorkerOverloadCount();
log.info("Exec queue is full, waiting submit queue {},
waiting exec queue size {}",
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java
similarity index 95%
rename from
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java
rename to
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java
index 15141ed747..c0c7c35b7d 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java
@@ -67,9 +67,9 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Strings;
-public abstract class WorkerTaskExecuteRunnable implements Runnable {
+public abstract class WorkerTaskExecutor implements Runnable {
- protected static final Logger log =
LoggerFactory.getLogger(WorkerTaskExecuteRunnable.class);
+ protected static final Logger log =
LoggerFactory.getLogger(WorkerTaskExecutor.class);
protected final TaskExecutionContext taskExecutionContext;
protected final WorkerConfig workerConfig;
@@ -80,13 +80,13 @@ public abstract class WorkerTaskExecuteRunnable implements
Runnable {
protected @Nullable AbstractTask task;
- protected WorkerTaskExecuteRunnable(
- @NonNull TaskExecutionContext
taskExecutionContext,
- @NonNull WorkerConfig workerConfig,
- @NonNull WorkerMessageSender
workerMessageSender,
- @NonNull TaskPluginManager
taskPluginManager,
- @Nullable StorageOperate
storageOperate,
- @NonNull WorkerRegistryClient
workerRegistryClient) {
+ protected WorkerTaskExecutor(
+ @NonNull TaskExecutionContext
taskExecutionContext,
+ @NonNull WorkerConfig workerConfig,
+ @NonNull WorkerMessageSender
workerMessageSender,
+ @NonNull TaskPluginManager taskPluginManager,
+ @Nullable StorageOperate storageOperate,
+ @NonNull WorkerRegistryClient
workerRegistryClient) {
this.taskExecutionContext = taskExecutionContext;
this.workerConfig = workerConfig;
this.workerMessageSender = workerMessageSender;
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactory.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactory.java
similarity index 90%
rename from
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactory.java
rename to
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactory.java
index 441662f4bc..5a185dbba0 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactory.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactory.java
@@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.server.worker.runner;
-public interface WorkerTaskExecuteRunnableFactory<T> {
+public interface WorkerTaskExecutorFactory<T> {
- T createWorkerTaskExecuteRunnable();
+ T createWorkerTaskExecutor();
}
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactoryBuilder.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java
similarity index 72%
rename from
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactoryBuilder.java
rename to
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java
index 603462e5b0..c2efdc9c7a 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnableFactoryBuilder.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java
@@ -30,15 +30,15 @@ import lombok.NonNull;
import lombok.experimental.UtilityClass;
@UtilityClass
-public class WorkerTaskExecuteRunnableFactoryBuilder {
+public class WorkerTaskExecutorFactoryBuilder {
- public static WorkerDelayTaskExecuteRunnableFactory<?>
createWorkerDelayTaskExecuteRunnableFactory(@NonNull TaskExecutionContext
taskExecutionContext,
-
@NonNull WorkerConfig workerConfig,
-
@NonNull WorkerMessageSender workerMessageSender,
-
@NonNull TaskPluginManager taskPluginManager,
-
@Nullable StorageOperate storageOperate,
-
@NonNull WorkerRegistryClient workerRegistryClient) {
- return new
DefaultWorkerDelayTaskExecuteRunnableFactory(taskExecutionContext,
+ public static WorkerTaskExecutorFactory<? extends WorkerTaskExecutor>
createWorkerTaskExecutorFactory(@NonNull TaskExecutionContext
taskExecutionContext,
+
@NonNull WorkerConfig workerConfig,
+
@NonNull WorkerMessageSender workerMessageSender,
+
@NonNull TaskPluginManager taskPluginManager,
+
@Nullable StorageOperate storageOperate,
+
@NonNull WorkerRegistryClient workerRegistryClient) {
+ return new DefaultWorkerTaskExecutorFactory(taskExecutionContext,
workerConfig,
workerMessageSender,
taskPluginManager,
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java
index 3ff110c459..4d3ebc9aa9 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java
@@ -24,7 +24,7 @@ import
org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheMana
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.metrics.TaskMetrics;
-import
org.apache.dolphinscheduler.server.worker.runner.GlobalTaskInstanceDispatchQueue;
+import
org.apache.dolphinscheduler.server.worker.runner.GlobalTaskInstanceWaitingQueue;
import lombok.extern.slf4j.Slf4j;
@@ -41,7 +41,7 @@ public class TaskInstanceDispatchOperationFunction
private WorkerConfig workerConfig;
@Autowired
- private GlobalTaskInstanceDispatchQueue globalTaskInstanceDispatchQueue;
+ private GlobalTaskInstanceWaitingQueue globalTaskInstanceWaitingQueue;
@Override
public TaskInstanceDispatchResponse operate(TaskInstanceDispatchRequest
taskInstanceDispatchRequest) {
@@ -57,7 +57,7 @@ public class TaskInstanceDispatchOperationFunction
taskExecutionContext.getTaskInstanceId());
TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());
- if
(!globalTaskInstanceDispatchQueue.addDispatchTask(taskExecutionContext)) {
+ if
(!globalTaskInstanceWaitingQueue.addDispatchTask(taskExecutionContext)) {
log.error("Submit task: {} to wait queue error, current queue
size: {} is full",
taskExecutionContext.getTaskName(),
workerConfig.getExecThreads());
return
TaskInstanceDispatchResponse.failed(taskExecutionContext.getTaskInstanceId(),
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java
index 347a52fa50..c5f4ffb78b 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java
@@ -29,7 +29,7 @@ import
org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
-import
org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable;
+import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor;
import lombok.extern.slf4j.Slf4j;
@@ -103,12 +103,12 @@ public class TaskInstanceKillOperationFunction
}
protected void cancelApplication(int taskInstanceId) {
- WorkerTaskExecuteRunnable workerTaskExecuteRunnable =
workerManager.getTaskExecuteThread(taskInstanceId);
- if (workerTaskExecuteRunnable == null) {
+ WorkerTaskExecutor workerTaskExecutor =
workerManager.getTaskExecuteThread(taskInstanceId);
+ if (workerTaskExecutor == null) {
log.warn("taskExecuteThread not found, taskInstanceId:{}",
taskInstanceId);
return;
}
- AbstractTask task = workerTaskExecuteRunnable.getTask();
+ AbstractTask task = workerTaskExecutor.getTask();
if (task == null) {
log.warn("task not found, taskInstanceId:{}", taskInstanceId);
return;
diff --git
a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutorTest.java
similarity index 91%
rename from
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java
rename to
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutorTest.java
index 79d04edae1..43ef6f87d0 100644
---
a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java
+++
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerTaskExecutorTest.java
@@ -30,7 +30,7 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
-public class DefaultWorkerDelayTaskExecuteRunnableTest {
+public class DefaultWorkerTaskExecutorTest {
private TaskExecutionContext taskExecutionContext =
Mockito.mock(TaskExecutionContext.class);
@@ -54,7 +54,7 @@ public class DefaultWorkerDelayTaskExecuteRunnableTest {
.processDefineId(0)
.firstSubmitTime(System.currentTimeMillis())
.build();
- WorkerTaskExecuteRunnable workerTaskExecuteRunnable = new
DefaultWorkerDelayTaskExecuteRunnable(
+ WorkerTaskExecutor workerTaskExecutor = new DefaultWorkerTaskExecutor(
taskExecutionContext,
workerConfig,
workerMessageSender,
@@ -62,7 +62,7 @@ public class DefaultWorkerDelayTaskExecuteRunnableTest {
storageOperate,
workerRegistryClient);
- Assertions.assertAll(workerTaskExecuteRunnable::run);
+ Assertions.assertAll(workerTaskExecutor::run);
Assertions.assertEquals(TaskExecutionStatus.SUCCESS,
taskExecutionContext.getCurrentExecutionStatus());
}
@@ -78,7 +78,7 @@ public class DefaultWorkerDelayTaskExecuteRunnableTest {
.taskParams(
"{\"localParams\":[],\"resourceList\":[],\"type\":\"POSTGRESQL\",\"datasource\":null,\"sql\":\"select
* from
t_ds_user\",\"sqlType\":\"0\",\"preStatements\":[],\"postStatements\":[],\"segmentSeparator\":\"\",\"displayRows\":10,\"conditionResult\":\"null\",\"dependence\":\"null\",\"switchResult\":\"null\",\"waitStartTimeout\":null}")
.build();
- WorkerTaskExecuteRunnable workerTaskExecuteRunnable = new
DefaultWorkerDelayTaskExecuteRunnable(
+ WorkerTaskExecutor workerTaskExecutor = new DefaultWorkerTaskExecutor(
taskExecutionContext,
workerConfig,
workerMessageSender,
@@ -86,7 +86,7 @@ public class DefaultWorkerDelayTaskExecuteRunnableTest {
storageOperate,
workerRegistryClient);
- Assertions.assertAll(workerTaskExecuteRunnable::run);
+ Assertions.assertAll(workerTaskExecutor::run);
Assertions.assertEquals(TaskExecutionStatus.FAILURE,
taskExecutionContext.getCurrentExecutionStatus());
}
}