This is an automated email from the ASF dual-hosted git repository.
changhaifu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new a405abec9e [Bug][Master]serial_wait strategy workflow unable to wake
up (#15270)
a405abec9e is described below
commit a405abec9e7486f1bcefa7aa56a790caa37fc7dc
Author: Gallardot <[email protected]>
AuthorDate: Fri Jan 12 17:53:24 2024 +0800
[Bug][Master]serial_wait strategy workflow unable to wake up (#15270)
* fix: serial_wait strategy workflow unable to wake up
Signed-off-by: Gallardot <[email protected]>
* fix: serial_wait strategy workflow unable to wake up
Signed-off-by: Gallardot <[email protected]>
---------
Signed-off-by: Gallardot <[email protected]>
Co-authored-by: fuchanghai <[email protected]>
---
.../dolphinscheduler/dao/repository/ProcessInstanceDao.java | 7 +++++++
.../dao/repository/impl/ProcessInstanceDaoImpl.java | 9 +++++++++
.../server/master/runner/WorkflowExecuteRunnable.java | 2 +-
.../dolphinscheduler/service/process/ProcessServiceImpl.java | 10 +++++-----
4 files changed, 22 insertions(+), 6 deletions(-)
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
index 91a29eacba..6aa48ea12d 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
@@ -31,6 +31,13 @@ public interface ProcessInstanceDao extends
IDao<ProcessInstance> {
*/
void upsertProcessInstance(ProcessInstance processInstance);
+ /**
+ * performs an "upsert" operation (update or insert) on a ProcessInstance
object within a new transaction
+ *
+ * @param processInstance processInstance
+ */
+ void performTransactionalUpsert(ProcessInstance processInstance);
+
/**
* find last scheduler process instance in the date interval
*
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
index 688556c2da..fca93da29d 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
@@ -30,6 +30,9 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
+import org.springframework.transaction.annotation.Isolation;
+import org.springframework.transaction.annotation.Propagation;
+import org.springframework.transaction.annotation.Transactional;
@Slf4j
@Repository
@@ -53,6 +56,12 @@ public class ProcessInstanceDaoImpl extends
BaseDao<ProcessInstance, ProcessInst
}
}
+ @Override
+ @Transactional(propagation = Propagation.REQUIRES_NEW, isolation =
Isolation.READ_COMMITTED, rollbackFor = Exception.class)
+ public void performTransactionalUpsert(ProcessInstance processInstance) {
+ this.upsertProcessInstance(processInstance);
+ }
+
/**
* find last scheduler process instance in the date interval
*
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 9006812431..012cc8149a 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -1842,7 +1842,7 @@ public class WorkflowExecuteRunnable implements
IWorkflowExecuteRunnable {
workflowInstance.setEndTime(new Date());
}
try {
- processInstanceDao.updateById(workflowInstance);
+
processInstanceDao.performTransactionalUpsert(workflowInstance);
} catch (Exception ex) {
// recover the status
workflowInstance.setStateWithDesc(originStates, "recover state
by DB error");
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 44945d03fb..706e183cf0 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -325,7 +325,7 @@ public class ProcessServiceImpl implements ProcessService {
protected void saveSerialProcess(ProcessInstance processInstance,
ProcessDefinition processDefinition) {
processInstance.setStateWithDesc(WorkflowExecutionStatus.SERIAL_WAIT,
"wait by serial_wait strategy");
- processInstanceDao.upsertProcessInstance(processInstance);
+ processInstanceDao.performTransactionalUpsert(processInstance);
// serial wait
// when we get the running instance(or waiting instance) only get the
priority instance(by id)
if (processDefinition.getExecutionType().typeIsSerialWait()) {
@@ -338,7 +338,7 @@ public class ProcessServiceImpl implements ProcessService {
if (CollectionUtils.isEmpty(runningProcessInstances)) {
processInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION,
"submit from serial_wait strategy");
- processInstanceDao.upsertProcessInstance(processInstance);
+ processInstanceDao.performTransactionalUpsert(processInstance);
}
} else if (processDefinition.getExecutionType().typeIsSerialDiscard())
{
List<ProcessInstance> runningProcessInstances =
@@ -349,12 +349,12 @@ public class ProcessServiceImpl implements ProcessService
{
processInstance.getId());
if (CollectionUtils.isNotEmpty(runningProcessInstances)) {
processInstance.setStateWithDesc(WorkflowExecutionStatus.STOP,
"stop by serial_discard strategy");
- processInstanceDao.upsertProcessInstance(processInstance);
+ processInstanceDao.performTransactionalUpsert(processInstance);
return;
}
processInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION,
"submit from serial_discard strategy");
- processInstanceDao.upsertProcessInstance(processInstance);
+ processInstanceDao.performTransactionalUpsert(processInstance);
} else if
(processDefinition.getExecutionType().typeIsSerialPriority()) {
List<ProcessInstance> runningProcessInstances =
this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(
@@ -384,7 +384,7 @@ public class ProcessServiceImpl implements ProcessService {
}
processInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION,
"submit by serial_priority strategy");
- processInstanceDao.upsertProcessInstance(processInstance);
+ processInstanceDao.performTransactionalUpsert(processInstance);
}
}