This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 88d2803fe1 fix task dispatch error overload resource pool of task
group (#9667)
88d2803fe1 is described below
commit 88d2803fe1bd59b1fa719e3cddce1a9e7d1313e8
Author: caishunfeng <[email protected]>
AuthorDate: Fri Apr 22 18:39:40 2022 +0800
fix task dispatch error overload resource pool of task group (#9667)
---
.../server/master/runner/WorkflowExecuteThread.java | 4 ----
.../server/master/runner/task/CommonTaskProcessor.java | 7 ++++++-
.../dolphinscheduler/service/process/ProcessServiceImpl.java | 2 +-
3 files changed, 7 insertions(+), 6 deletions(-)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 8f99f1bb93..c22d5811c3 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -351,8 +351,6 @@ public class WorkflowExecuteThread {
if (taskGroupQueue.getForceStart() == Flag.YES.getCode()) {
TaskInstance taskInstance =
this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
ITaskProcessor taskProcessor =
activeTaskProcessorMaps.get(taskInstance.getTaskCode());
- ProcessInstance processInstance =
this.processService.findProcessInstanceById(taskInstance.getProcessInstanceId());
- taskProcessor.init(taskInstance, processInstance);
taskProcessor.action(TaskAction.DISPATCH);
this.processService.updateTaskGroupQueueStatus(taskGroupQueue.getId(),
TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode());
return true;
@@ -362,8 +360,6 @@ public class WorkflowExecuteThread {
if (acquireTaskGroup) {
TaskInstance taskInstance =
this.processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
ITaskProcessor taskProcessor =
activeTaskProcessorMaps.get(taskInstance.getTaskCode());
- ProcessInstance processInstance =
this.processService.findProcessInstanceById(taskInstance.getProcessInstanceId());
- taskProcessor.init(taskInstance, processInstance);
taskProcessor.action(TaskAction.DISPATCH);
return true;
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
index 73e1b1693b..5833bc56c1 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
@@ -61,7 +61,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
taskInstance.getName(),
taskGroupId,
taskInstance.getProcessInstanceId(),
- taskInstance.getTaskInstancePriority().getCode());
+ taskInstance.getTaskGroupPriority());
if (!acquireTaskGroup) {
logger.info("submit task name :{}, but the first time to try
to acquire task group failed", taskInstance.getName());
return true;
@@ -117,6 +117,11 @@ public class CommonTaskProcessor extends BaseTaskProcessor
{
taskInstance.getId(),
org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP);
TaskExecutionContext taskExecutionContext =
getTaskExecutionContext(taskInstance);
+ if (taskExecutionContext == null) {
+ logger.error("task get taskExecutionContext fail: {}",
taskInstance);
+ return false;
+ }
+
taskPriority.setTaskExecutionContext(taskExecutionContext);
taskUpdateQueue.put(taskPriority);
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 1b1421ea6b..c899a7baf5 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -1490,7 +1490,6 @@ public class ProcessServiceImpl implements ProcessService
{
taskInstance.setState(ExecutionStatus.PAUSE);
}
taskInstance.setExecutorId(processInstance.getExecutorId());
-
taskInstance.setProcessInstancePriority(processInstance.getProcessInstancePriority());
taskInstance.setState(getSubmitTaskState(taskInstance,
processInstance));
if (taskInstance.getSubmitTime() == null) {
taskInstance.setSubmitTime(new Date());
@@ -1670,6 +1669,7 @@ public class ProcessServiceImpl implements ProcessService
{
public void packageTaskInstance(TaskInstance taskInstance, ProcessInstance
processInstance) {
taskInstance.setProcessInstance(processInstance);
taskInstance.setProcessDefine(processInstance.getProcessDefinition());
+
taskInstance.setProcessInstancePriority(processInstance.getProcessInstancePriority());
TaskDefinition taskDefinition = this.findTaskDefinition(
taskInstance.getTaskCode(),
taskInstance.getTaskDefinitionVersion());