This is an automated email from the ASF dual-hosted git repository.

changhaifu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new a405abec9e [Bug][Master]serial_wait strategy workflow unable to wake 
up (#15270)
a405abec9e is described below

commit a405abec9e7486f1bcefa7aa56a790caa37fc7dc
Author: Gallardot <[email protected]>
AuthorDate: Fri Jan 12 17:53:24 2024 +0800

    [Bug][Master]serial_wait strategy workflow unable to wake up (#15270)
    
    * fix: serial_wait strategy workflow unable to wake up
    
    Signed-off-by: Gallardot <[email protected]>
    
    * fix: serial_wait strategy workflow unable to wake up
    
    Signed-off-by: Gallardot <[email protected]>
    
    ---------
    
    Signed-off-by: Gallardot <[email protected]>
    Co-authored-by: fuchanghai <[email protected]>
---
 .../dolphinscheduler/dao/repository/ProcessInstanceDao.java    |  7 +++++++
 .../dao/repository/impl/ProcessInstanceDaoImpl.java            |  9 +++++++++
 .../server/master/runner/WorkflowExecuteRunnable.java          |  2 +-
 .../dolphinscheduler/service/process/ProcessServiceImpl.java   | 10 +++++-----
 4 files changed, 22 insertions(+), 6 deletions(-)

diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
index 91a29eacba..6aa48ea12d 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/ProcessInstanceDao.java
@@ -31,6 +31,13 @@ public interface ProcessInstanceDao extends 
IDao<ProcessInstance> {
      */
     void upsertProcessInstance(ProcessInstance processInstance);
 
+    /**
+     * performs an "upsert" operation (update or insert) on a ProcessInstance 
object within a new transaction
+     *
+     * @param processInstance processInstance
+     */
+    void performTransactionalUpsert(ProcessInstance processInstance);
+
     /**
      * find last scheduler process instance in the date interval
      *
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
index 688556c2da..fca93da29d 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/ProcessInstanceDaoImpl.java
@@ -30,6 +30,9 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Repository;
+import org.springframework.transaction.annotation.Isolation;
+import org.springframework.transaction.annotation.Propagation;
+import org.springframework.transaction.annotation.Transactional;
 
 @Slf4j
 @Repository
@@ -53,6 +56,12 @@ public class ProcessInstanceDaoImpl extends 
BaseDao<ProcessInstance, ProcessInst
         }
     }
 
+    @Override
+    @Transactional(propagation = Propagation.REQUIRES_NEW, isolation = 
Isolation.READ_COMMITTED, rollbackFor = Exception.class)
+    public void performTransactionalUpsert(ProcessInstance processInstance) {
+        this.upsertProcessInstance(processInstance);
+    }
+
     /**
      * find last scheduler process instance in the date interval
      *
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
index 9006812431..012cc8149a 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
@@ -1842,7 +1842,7 @@ public class WorkflowExecuteRunnable implements 
IWorkflowExecuteRunnable {
                 workflowInstance.setEndTime(new Date());
             }
             try {
-                processInstanceDao.updateById(workflowInstance);
+                
processInstanceDao.performTransactionalUpsert(workflowInstance);
             } catch (Exception ex) {
                 // recover the status
                 workflowInstance.setStateWithDesc(originStates, "recover state 
by DB error");
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
index 44945d03fb..706e183cf0 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
@@ -325,7 +325,7 @@ public class ProcessServiceImpl implements ProcessService {
 
     protected void saveSerialProcess(ProcessInstance processInstance, 
ProcessDefinition processDefinition) {
         processInstance.setStateWithDesc(WorkflowExecutionStatus.SERIAL_WAIT, 
"wait by serial_wait strategy");
-        processInstanceDao.upsertProcessInstance(processInstance);
+        processInstanceDao.performTransactionalUpsert(processInstance);
         // serial wait
         // when we get the running instance(or waiting instance) only get the 
priority instance(by id)
         if (processDefinition.getExecutionType().typeIsSerialWait()) {
@@ -338,7 +338,7 @@ public class ProcessServiceImpl implements ProcessService {
             if (CollectionUtils.isEmpty(runningProcessInstances)) {
                 
processInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION,
                         "submit from serial_wait strategy");
-                processInstanceDao.upsertProcessInstance(processInstance);
+                processInstanceDao.performTransactionalUpsert(processInstance);
             }
         } else if (processDefinition.getExecutionType().typeIsSerialDiscard()) 
{
             List<ProcessInstance> runningProcessInstances =
@@ -349,12 +349,12 @@ public class ProcessServiceImpl implements ProcessService 
{
                             processInstance.getId());
             if (CollectionUtils.isNotEmpty(runningProcessInstances)) {
                 processInstance.setStateWithDesc(WorkflowExecutionStatus.STOP, 
"stop by serial_discard strategy");
-                processInstanceDao.upsertProcessInstance(processInstance);
+                processInstanceDao.performTransactionalUpsert(processInstance);
                 return;
             }
             
processInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION,
                     "submit from serial_discard strategy");
-            processInstanceDao.upsertProcessInstance(processInstance);
+            processInstanceDao.performTransactionalUpsert(processInstance);
         } else if 
(processDefinition.getExecutionType().typeIsSerialPriority()) {
             List<ProcessInstance> runningProcessInstances =
                     
this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(
@@ -384,7 +384,7 @@ public class ProcessServiceImpl implements ProcessService {
             }
             
processInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION,
                     "submit by serial_priority strategy");
-            processInstanceDao.upsertProcessInstance(processInstance);
+            processInstanceDao.performTransactionalUpsert(processInstance);
         }
     }
 

Reply via email to