This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new d7df899909 [Fix-11003]Task group queue is not updated to final state
(#11004)
d7df899909 is described below
commit d7df89990965155e7695da402dcf322ec276355a
Author: Mr.An <[email protected]>
AuthorDate: Tue Jul 19 17:48:16 2022 +0800
[Fix-11003]Task group queue is not updated to final state (#11004)
* fix after the task group is forced to start the task. Task group status
has not changed
---
.../service/process/ProcessServiceImpl.java | 50 +++++++++++-----------
1 file changed, 25 insertions(+), 25 deletions(-)
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 476cb03d7f..a305ca35a0 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.service.process;
+import static java.util.stream.Collectors.toSet;
import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
import static
org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
@@ -31,8 +32,6 @@ import static
org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR
import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
import static
org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
-import static java.util.stream.Collectors.toSet;
-
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -322,14 +321,14 @@ public class ProcessServiceImpl implements ProcessService
{
//when we get the running instance(or waiting instance) only get the
priority instance(by id)
if (processDefinition.getExecutionType().typeIsSerialWait()) {
List<ProcessInstance> runningProcessInstances =
this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
- processInstance.getProcessDefinitionVersion(),
Constants.RUNNING_PROCESS_STATE, processInstance.getId());
+ processInstance.getProcessDefinitionVersion(),
Constants.RUNNING_PROCESS_STATE, processInstance.getId());
if (CollectionUtils.isEmpty(runningProcessInstances)) {
processInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
saveProcessInstance(processInstance);
}
} else if (processDefinition.getExecutionType().typeIsSerialDiscard())
{
List<ProcessInstance> runningProcessInstances =
this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
- processInstance.getProcessDefinitionVersion(),
Constants.RUNNING_PROCESS_STATE, processInstance.getId());
+ processInstance.getProcessDefinitionVersion(),
Constants.RUNNING_PROCESS_STATE, processInstance.getId());
if (CollectionUtils.isNotEmpty(runningProcessInstances)) {
processInstance.setState(ExecutionStatus.STOP);
saveProcessInstance(processInstance);
@@ -339,7 +338,7 @@ public class ProcessServiceImpl implements ProcessService {
saveProcessInstance(processInstance);
} else if
(processDefinition.getExecutionType().typeIsSerialPriority()) {
List<ProcessInstance> runningProcessInstances =
this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
- processInstance.getProcessDefinitionVersion(),
Constants.RUNNING_PROCESS_STATE, processInstance.getId());
+ processInstance.getProcessDefinitionVersion(),
Constants.RUNNING_PROCESS_STATE, processInstance.getId());
if (CollectionUtils.isEmpty(runningProcessInstances)) {
processInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
saveProcessInstance(processInstance);
@@ -356,13 +355,13 @@ public class ProcessServiceImpl implements ProcessService
{
// determine whether the process is normal
if (update > 0) {
StateEventChangeCommand stateEventChangeCommand = new
StateEventChangeCommand(
- info.getId(), 0, info.getState(), info.getId(), 0
+ info.getId(), 0, info.getState(), info.getId(), 0
);
try {
Host host = new Host(info.getHost());
stateEventCallbackService.sendResult(host,
stateEventChangeCommand.convert2Command());
} catch (Exception e) {
- logger.error("sendResultError",e );
+ logger.error("sendResultError", e);
}
}
}
@@ -798,10 +797,10 @@ public class ProcessServiceImpl implements ProcessService
{
}
String globalParams =
curingGlobalParamsService.curingGlobalParams(processInstance.getId(),
- processDefinition.getGlobalParamMap(),
- processDefinition.getGlobalParamList(),
- getCommandTypeIfComplement(processInstance, command),
- processInstance.getScheduleTime(), timezoneId);
+ processDefinition.getGlobalParamMap(),
+ processDefinition.getGlobalParamList(),
+ getCommandTypeIfComplement(processInstance, command),
+ processInstance.getScheduleTime(), timezoneId);
processInstance.setGlobalParams(globalParams);
// set process instance priority
@@ -952,10 +951,10 @@ public class ProcessServiceImpl implements ProcessService
{
// Recalculate global parameters after rerun.
String globalParams =
curingGlobalParamsService.curingGlobalParams(processInstance.getId(),
- processDefinition.getGlobalParamMap(),
- processDefinition.getGlobalParamList(),
- commandTypeIfComplement,
- processInstance.getScheduleTime(), timezoneId);
+ processDefinition.getGlobalParamMap(),
+ processDefinition.getGlobalParamList(),
+ commandTypeIfComplement,
+ processInstance.getScheduleTime(), timezoneId);
processInstance.setGlobalParams(globalParams);
processInstance.setProcessDefinition(processDefinition);
}
@@ -1146,9 +1145,9 @@ public class ProcessServiceImpl implements ProcessService
{
String timezoneId = cmdParam.get(Constants.SCHEDULE_TIMEZONE);
String globalParams =
curingGlobalParamsService.curingGlobalParams(processInstance.getId(),
- processDefinition.getGlobalParamMap(),
- processDefinition.getGlobalParamList(),
- CommandType.COMPLEMENT_DATA,
processInstance.getScheduleTime(), timezoneId);
+ processDefinition.getGlobalParamMap(),
+ processDefinition.getGlobalParamList(),
+ CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime(),
timezoneId);
processInstance.setGlobalParams(globalParams);
}
@@ -1310,9 +1309,9 @@ public class ProcessServiceImpl implements ProcessService
{
TaskInstance task = submitTaskInstanceToDB(taskInstance,
processInstance);
if (task == null) {
logger.error("Save taskInstance to db error, task name:{}, process
id:{} state: {} ",
- taskInstance.getName(),
- taskInstance.getProcessInstance().getId(),
- processInstance.getState());
+ taskInstance.getName(),
+ taskInstance.getProcessInstance().getId(),
+ processInstance.getState());
return null;
}
@@ -1559,9 +1558,9 @@ public class ProcessServiceImpl implements ProcessService
{
ExecutionStatus processInstanceState = processInstance.getState();
if (processInstanceState.typeIsFinished() || processInstanceState ==
ExecutionStatus.READY_STOP) {
logger.warn("processInstance: {} state was: {}, skip submit this
task, taskCode: {}",
- processInstance.getId(),
- processInstanceState,
- taskInstance.getTaskCode());
+ processInstance.getId(),
+ processInstanceState,
+ taskInstance.getTaskCode());
return null;
}
if (processInstanceState == ExecutionStatus.READY_PAUSE) {
@@ -2989,7 +2988,7 @@ public class ProcessServiceImpl implements ProcessService
{
return null;
}
try {
- while (taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(),
taskGroup.getUseSize()
+ while (thisTaskGroupQueue.getForceStart() == Flag.NO.getCode() &&
taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(),
taskGroup.getUseSize()
, thisTaskGroupQueue.getId(),
TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1) {
thisTaskGroupQueue =
this.taskGroupQueueMapper.queryByTaskId(taskInstance.getId());
if (thisTaskGroupQueue.getStatus() ==
TaskGroupQueueStatus.RELEASE) {
@@ -3089,6 +3088,7 @@ public class ProcessServiceImpl implements ProcessService
{
throw new ServiceException("delete command fail, id:" + commandId);
}
}
+
/**
* find k8s config yaml by clusterName
*