This is an automated email from the ASF dual-hosted git repository.

zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new d39a702415 [Fix-17638][API] Optimize workflow lineage update logic 
(#17678)
d39a702415 is described below

commit d39a702415a95a9d69fd464ab894ec0a164cd8d6
Author: luxiaolong <[email protected]>
AuthorDate: Fri Nov 21 15:45:10 2025 +0800

    [Fix-17638][API] Optimize workflow lineage update logic (#17678)
---
 .../api/service/WorkflowLineageService.java        | 10 ++-
 .../impl/WorkflowDefinitionServiceImpl.java        | 19 +----
 .../service/impl/WorkflowLineageServiceImpl.java   | 23 +++++-
 .../api/service/WorkflowDefinitionServiceTest.java | 90 ++++++++++++++++++++++
 .../service/WorkflowTaskLineageServiceTest.java    | 80 +++++++++++++++++++
 5 files changed, 203 insertions(+), 19 deletions(-)

diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkflowLineageService.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkflowLineageService.java
index 2857a38ca1..f20503b140 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkflowLineageService.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkflowLineageService.java
@@ -57,7 +57,15 @@ public interface WorkflowLineageService {
 
     int createWorkflowLineage(List<WorkflowTaskLineage> workflowTaskLineages);
 
-    int updateWorkflowLineage(List<WorkflowTaskLineage> workflowTaskLineages);
+    /**
+     * Replace the lineage of given workflow definition by new lineage list.
+     * When the list is empty, existing lineage data will be deleted.
+     *
+     * @param workflowDefinitionCode workflow definition to update
+     * @param workflowTaskLineages   new lineage list, can be empty
+     * @return affected rows
+     */
+    int updateWorkflowLineage(long workflowDefinitionCode, 
List<WorkflowTaskLineage> workflowTaskLineages);
 
     int deleteWorkflowLineage(List<Long> workflowDefinitionCodes);
 }
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java
index 998fb9de92..f0c0605620 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java
@@ -409,23 +409,10 @@ public class WorkflowDefinitionServiceImpl extends 
BaseServiceImpl implements Wo
                                     long workflowDefinitionCode,
                                     int workflowDefinitionVersion,
                                     List<TaskDefinitionLog> 
taskDefinitionLogList) {
-        List<WorkflowTaskLineage> workflowTaskLineageList =
-                generateWorkflowLineageList(taskDefinitionLogList, 
workflowDefinitionCode, workflowDefinitionVersion);
-        if (workflowTaskLineageList.isEmpty()) {
-            return;
-        }
+        List<WorkflowTaskLineage> workflowTaskLineageList = 
generateWorkflowLineageList(taskDefinitionLogList,
+                workflowDefinitionCode, workflowDefinitionVersion);
 
-        int insertWorkflowLineageResult = 
workflowLineageService.updateWorkflowLineage(workflowTaskLineageList);
-        if (insertWorkflowLineageResult <= 0) {
-            log.error(
-                    "Save workflow lineage error, projectCode: {}, 
workflowDefinitionCode: {}, workflowDefinitionVersion: {}",
-                    projectCode, workflowDefinitionCode, 
workflowDefinitionVersion);
-            throw new ServiceException(Status.CREATE_WORKFLOW_LINEAGE_ERROR);
-        } else {
-            log.info(
-                    "Save workflow lineage complete, projectCode: {}, 
workflowDefinitionCode: {}, workflowDefinitionVersion: {}",
-                    projectCode, workflowDefinitionCode, 
workflowDefinitionVersion);
-        }
+        workflowLineageService.updateWorkflowLineage(workflowDefinitionCode, 
workflowTaskLineageList);
     }
 
     private List<WorkflowTaskLineage> 
generateWorkflowLineageList(List<TaskDefinitionLog> taskDefinitionLogList,
diff --git 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowLineageServiceImpl.java
 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowLineageServiceImpl.java
index 9e56e72ae9..bfd8f91dd5 100644
--- 
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowLineageServiceImpl.java
+++ 
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowLineageServiceImpl.java
@@ -38,6 +38,7 @@ import 
org.apache.dolphinscheduler.dao.repository.WorkflowTaskLineageDao;
 
 import java.text.MessageFormat;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
@@ -318,8 +319,26 @@ public class WorkflowLineageServiceImpl extends 
BaseServiceImpl implements Workf
     }
 
     @Override
-    public int updateWorkflowLineage(List<WorkflowTaskLineage> 
workflowTaskLineages) {
-        return 
workflowTaskLineageDao.updateWorkflowTaskLineage(workflowTaskLineages);
+    public int updateWorkflowLineage(long workflowDefinitionCode, 
List<WorkflowTaskLineage> workflowTaskLineages) {
+        // Remove existing lineage first to keep data consistent
+        workflowTaskLineageDao.batchDeleteByWorkflowDefinitionCode(
+                Collections.singletonList(workflowDefinitionCode));
+
+        if (CollectionUtils.isEmpty(workflowTaskLineages)) {
+            log.info("Current lineage is empty, workflowDefinitionCode: {}",
+                    workflowDefinitionCode);
+            return 0;
+        }
+
+        int insertResult = 
workflowTaskLineageDao.batchInsert(workflowTaskLineages);
+        if (insertResult <= 0) {
+            log.error("Save workflow lineage error, workflowDefinitionCode: 
{}", workflowDefinitionCode);
+            throw new ServiceException(Status.CREATE_WORKFLOW_LINEAGE_ERROR);
+        }
+
+        log.info("Save workflow lineage complete, workflowDefinitionCode: {}, 
inserted rows: {}",
+                workflowDefinitionCode, insertResult);
+        return insertResult;
     }
 
     @Override
diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowDefinitionServiceTest.java
 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowDefinitionServiceTest.java
index fc788c7684..d11f1ef9b5 100644
--- 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowDefinitionServiceTest.java
+++ 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowDefinitionServiceTest.java
@@ -35,6 +35,7 @@ import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant;
@@ -1390,4 +1391,93 @@ public class WorkflowDefinitionServiceTest extends 
BaseServiceTestTool {
                 content);
         return multipartFile;
     }
+
+    @Test
+    public void testSaveWorkflowLineageWithEmptyList() {
+        // Test case: Empty lineage list should delete historical lineage
+        long projectCode = 1L;
+        long workflowDefinitionCode = 100L;
+        int workflowDefinitionVersion = 1;
+        List<TaskDefinitionLog> emptyTaskDefinitionLogList = new ArrayList<>();
+
+        // Mock updateWorkflowLineage to return 0 for empty list
+        
when(workflowLineageService.updateWorkflowLineage(eq(workflowDefinitionCode), 
anyList()))
+                .thenReturn(0);
+
+        // Execute - should not throw exception
+        Assertions.assertDoesNotThrow(() -> {
+            processDefinitionService.saveWorkflowLineage(projectCode, 
workflowDefinitionCode,
+                    workflowDefinitionVersion, emptyTaskDefinitionLogList);
+        });
+
+        // Verify that updateWorkflowLineage was called with empty list
+        
verify(workflowLineageService).updateWorkflowLineage(eq(workflowDefinitionCode),
 anyList());
+    }
+
+    @Test
+    public void testSaveWorkflowLineageWithNonEmptyList() {
+        // Test case: Normal save with non-empty lineage list
+        long projectCode = 1L;
+        long workflowDefinitionCode = 100L;
+        int workflowDefinitionVersion = 1;
+
+        // Create task definition logs with dependent tasks
+        List<TaskDefinitionLog> taskDefinitionLogList = new ArrayList<>();
+        TaskDefinitionLog taskLog = new TaskDefinitionLog();
+        taskLog.setCode(200L);
+        taskLog.setVersion(1);
+        taskLog.setProjectCode(projectCode);
+        taskLog.setTaskType("DEPENDENT");
+        // Set taskParams with dependent parameters
+        String taskParams =
+                
"{\"dependence\":{\"dependTaskList\":[{\"dependItemList\":[{\"definitionCode\":50,\"depTaskCode\":300}]}]}}";
+        taskLog.setTaskParams(taskParams);
+        taskDefinitionLogList.add(taskLog);
+
+        // Mock updateWorkflowLineage to return success
+        
when(workflowLineageService.updateWorkflowLineage(eq(workflowDefinitionCode), 
anyList()))
+                .thenReturn(1);
+
+        // Execute - should not throw exception
+        Assertions.assertDoesNotThrow(() -> {
+            processDefinitionService.saveWorkflowLineage(projectCode, 
workflowDefinitionCode,
+                    workflowDefinitionVersion, taskDefinitionLogList);
+        });
+
+        // Verify that updateWorkflowLineage was called
+        
verify(workflowLineageService).updateWorkflowLineage(eq(workflowDefinitionCode),
 anyList());
+    }
+
+    @Test
+    public void testSaveWorkflowLineageWithInsertFailure() {
+        // Test case: Should throw exception when insert fails
+        long projectCode = 1L;
+        long workflowDefinitionCode = 100L;
+        int workflowDefinitionVersion = 1;
+
+        // Create task definition logs
+        List<TaskDefinitionLog> taskDefinitionLogList = new ArrayList<>();
+        TaskDefinitionLog taskLog = new TaskDefinitionLog();
+        taskLog.setCode(200L);
+        taskLog.setVersion(1);
+        taskLog.setProjectCode(projectCode);
+        taskLog.setTaskType("DEPENDENT");
+        String taskParams =
+                
"{\"dependence\":{\"dependTaskList\":[{\"dependItemList\":[{\"definitionCode\":50,\"depTaskCode\":300}]}]}}";
+        taskLog.setTaskParams(taskParams);
+        taskDefinitionLogList.add(taskLog);
+
+        // Mock updateWorkflowLineage to throw exception (insert failure)
+        
when(workflowLineageService.updateWorkflowLineage(eq(workflowDefinitionCode), 
anyList()))
+                .thenThrow(new 
ServiceException(Status.CREATE_WORKFLOW_LINEAGE_ERROR));
+
+        // Execute and verify exception
+        ServiceException exception = 
Assertions.assertThrows(ServiceException.class, () -> {
+            processDefinitionService.saveWorkflowLineage(projectCode, 
workflowDefinitionCode,
+                    workflowDefinitionVersion, taskDefinitionLogList);
+        });
+
+        
Assertions.assertEquals(Status.CREATE_WORKFLOW_LINEAGE_ERROR.getCode(), 
exception.getCode());
+        
verify(workflowLineageService).updateWorkflowLineage(eq(workflowDefinitionCode),
 anyList());
+    }
 }
diff --git 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowTaskLineageServiceTest.java
 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowTaskLineageServiceTest.java
index 806225e989..c8be7bba69 100644
--- 
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowTaskLineageServiceTest.java
+++ 
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowTaskLineageServiceTest.java
@@ -17,8 +17,13 @@
 
 package org.apache.dolphinscheduler.api.service;
 
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
 import org.apache.dolphinscheduler.api.service.impl.WorkflowLineageServiceImpl;
 import org.apache.dolphinscheduler.dao.entity.Project;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
@@ -256,4 +261,79 @@ public class WorkflowTaskLineageServiceTest {
         Assertions.assertFalse(result.isPresent());
     }
 
+    @Test
+    public void testUpdateWorkflowLineageWithNonEmptyList() {
+        // Test case: Normal update with non-empty lineage list
+        long workflowDefinitionCode = 100L;
+        List<WorkflowTaskLineage> workflowTaskLineages = new ArrayList<>();
+
+        WorkflowTaskLineage lineage1 = new WorkflowTaskLineage();
+        lineage1.setWorkflowDefinitionCode(workflowDefinitionCode);
+        lineage1.setTaskDefinitionCode(200L);
+        workflowTaskLineages.add(lineage1);
+
+        WorkflowTaskLineage lineage2 = new WorkflowTaskLineage();
+        lineage2.setWorkflowDefinitionCode(workflowDefinitionCode);
+        lineage2.setTaskDefinitionCode(300L);
+        workflowTaskLineages.add(lineage2);
+
+        // Mock DAO methods
+        
when(workflowTaskLineageDao.batchDeleteByWorkflowDefinitionCode(anyList())).thenReturn(2);
+        
when(workflowTaskLineageDao.batchInsert(workflowTaskLineages)).thenReturn(2);
+
+        // Execute
+        int result = 
workflowLineageService.updateWorkflowLineage(workflowDefinitionCode, 
workflowTaskLineages);
+
+        // Verify
+        Assertions.assertEquals(2, result);
+        verify(workflowTaskLineageDao)
+                
.batchDeleteByWorkflowDefinitionCode(eq(java.util.Collections.singletonList(workflowDefinitionCode)));
+        verify(workflowTaskLineageDao).batchInsert(workflowTaskLineages);
+    }
+
+    @Test
+    public void testUpdateWorkflowLineageWithEmptyList() {
+        // Test case: Empty list should delete historical lineage and return 0
+        long workflowDefinitionCode = 100L;
+        List<WorkflowTaskLineage> emptyList = new ArrayList<>();
+
+        // Mock DAO method
+        
when(workflowTaskLineageDao.batchDeleteByWorkflowDefinitionCode(anyList())).thenReturn(1);
+
+        // Execute
+        int result = 
workflowLineageService.updateWorkflowLineage(workflowDefinitionCode, emptyList);
+
+        // Verify
+        Assertions.assertEquals(0, result);
+        verify(workflowTaskLineageDao)
+                
.batchDeleteByWorkflowDefinitionCode(eq(java.util.Collections.singletonList(workflowDefinitionCode)));
+        // batchInsert should not be called when list is empty
+    }
+
+    @Test
+    public void testUpdateWorkflowLineageWithInsertFailure() {
+        // Test case: Should throw exception when insert fails
+        long workflowDefinitionCode = 100L;
+        List<WorkflowTaskLineage> workflowTaskLineages = new ArrayList<>();
+
+        WorkflowTaskLineage lineage1 = new WorkflowTaskLineage();
+        lineage1.setWorkflowDefinitionCode(workflowDefinitionCode);
+        lineage1.setTaskDefinitionCode(200L);
+        workflowTaskLineages.add(lineage1);
+
+        // Mock DAO methods
+        
when(workflowTaskLineageDao.batchDeleteByWorkflowDefinitionCode(anyList())).thenReturn(1);
+        
when(workflowTaskLineageDao.batchInsert(workflowTaskLineages)).thenReturn(0); 
// Insert failure
+
+        // Execute and verify exception
+        ServiceException exception = 
Assertions.assertThrows(ServiceException.class, () -> {
+            
workflowLineageService.updateWorkflowLineage(workflowDefinitionCode, 
workflowTaskLineages);
+        });
+
+        
Assertions.assertEquals(Status.CREATE_WORKFLOW_LINEAGE_ERROR.getCode(), 
exception.getCode());
+        verify(workflowTaskLineageDao)
+                
.batchDeleteByWorkflowDefinitionCode(eq(java.util.Collections.singletonList(workflowDefinitionCode)));
+        verify(workflowTaskLineageDao).batchInsert(workflowTaskLineages);
+    }
+
 }

Reply via email to