This is an automated email from the ASF dual-hosted git repository.

zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 06d365dbe4 [Fix-17637] [API] Workflow lineage deletion optimization (#17641)
06d365dbe4 is described below

commit 06d365dbe4b045a349ef78a2a886c5256757aff4
Author: [email protected] <[email protected]>
AuthorDate: Tue Nov 11 15:03:26 2025 +0800

    [Fix-17637] [API] Workflow lineage deletion optimization (#17641)
---
 .../impl/WorkflowDefinitionServiceImpl.java        |  18 ++-
 .../service/impl/WorkflowLineageServiceImpl.java   |  16 +++
 .../api/service/WorkflowDefinitionServiceTest.java |   9 ++
 .../service/WorkflowTaskLineageServiceTest.java    | 128 +++++++++++++++++++++
 4 files changed, 165 insertions(+), 6 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java
index 7fe3d00a51..998fb9de92 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java
@@ -1123,6 +1123,18 @@ public class WorkflowDefinitionServiceImpl extends BaseServiceImpl implements Wo
         // If delete error, we can call this interface again.
         
workflowDefinitionDao.deleteByWorkflowDefinitionCode(workflowDefinition.getCode());
         metricsCleanUpService.cleanUpWorkflowMetricsByDefinitionCode(code);
+
+        // delete workflow lineage (lineage data only keeps one record per 
workflow code)
+        // It's safe to return 0 if no lineage exists (idempotent)
+        int deleteWorkflowLineageResult = workflowLineageService
+                
.deleteWorkflowLineage(Collections.singletonList(workflowDefinition.getCode()));
+        if (deleteWorkflowLineageResult <= 0) {
+            if (deleteWorkflowLineageResult < 0) {
+                throw new 
ServiceException(Status.DELETE_WORKFLOW_LINEAGE_ERROR);
+            } else {
+                log.warn("No workflow lineage to delete, 
workflowDefinitionCode: {}", code);
+            }
+        }
         log.info("Success delete workflow definition workflowDefinitionCode: 
{}", code);
     }
 
@@ -2427,12 +2439,6 @@ public class WorkflowDefinitionServiceImpl extends BaseServiceImpl implements Wo
         }
         log.info("Delete version: {} of workflow: {}, projectCode: {}", 
version, code, projectCode);
 
-        // delete workflow lineage
-        int deleteWorkflowLineageResult = 
workflowLineageService.deleteWorkflowLineage(Collections.singletonList(code));
-        if (deleteWorkflowLineageResult <= 0) {
-            log.error("Delete workflow lineage by workflow definition code 
error, workflowDefinitionCode: {}", code);
-            throw new ServiceException(Status.DELETE_WORKFLOW_LINEAGE_ERROR);
-        }
     }
 
     private void updateWorkflowValid(User user, WorkflowDefinition 
oldWorkflowDefinition,
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowLineageServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowLineageServiceImpl.java
index 4f4f6b4c05..9e56e72ae9 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowLineageServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowLineageServiceImpl.java
@@ -189,10 +189,26 @@ public class WorkflowLineageServiceImpl extends BaseServiceImpl implements Workf
             if (workflowTaskLineage.getTaskDefinitionCode() != 0) {
                 TaskDefinition taskDefinition =
                         
taskDefinitionMapper.queryByCode(workflowTaskLineage.getTaskDefinitionCode());
+                // Handle dirty data scenario caused by historical bugs:
+                // There may be orphaned lineage records referencing deleted 
task definitions.
+                // Skip these records to prevent NPE and ensure the method 
continues processing.
+                // Note: These orphaned records should be cleaned up by a 
background cleanup task,
+                // not here to avoid side effects in a read-only query method.
+                if (taskDefinition == null) {
+                    log.warn(
+                            "Orphaned lineage record detected: 
taskDefinitionCode {} not found, workflowTaskLineageId: {}. "
+                                    + "This dirty data should be cleaned up by 
a background task.",
+                            workflowTaskLineage.getTaskDefinitionCode(), 
workflowTaskLineage.getId());
+                    continue;
+                }
                 taskName = taskDefinition.getName();
             }
             taskDepStrList.add(String.format(Constants.FORMAT_S_S_COLON, 
workflowDefinition.getName(), taskName));
         }
+        // If no valid task dependencies found, return empty Optional to 
indicate no dependencies.
+        if (taskDepStrList.isEmpty()) {
+            return Optional.empty();
+        }
 
         String taskDepStr = String.join(Constants.COMMA, taskDepStrList);
         if (taskCode != 0) {
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowDefinitionServiceTest.java
index ecee150ee5..fc788c7684 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowDefinitionServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowDefinitionServiceTest.java
@@ -618,6 +618,7 @@ public class WorkflowDefinitionServiceTest extends BaseServiceTestTool {
         when(scheduleMapper.deleteById(46)).thenReturn(1);
         when(workflowLineageService.taskDependentMsg(project.getCode(), 
workflowDefinition.getCode(), 0))
                 .thenReturn(Optional.empty());
+        
when(workflowLineageService.deleteWorkflowLineage(anyList())).thenReturn(1);
         processDefinitionService.deleteWorkflowDefinitionByCode(user, 46L);
         Mockito.verify(metricsCleanUpService, 
times(1)).cleanUpWorkflowMetricsByDefinitionCode(46L);
 
@@ -643,8 +644,16 @@ public class WorkflowDefinitionServiceTest extends BaseServiceTestTool {
         when(scheduleMapper.deleteById(schedule.getId())).thenReturn(1);
         when(workflowLineageService.taskDependentMsg(project.getCode(), 
workflowDefinition.getCode(), 0))
                 .thenReturn(Optional.empty());
+        
when(workflowLineageService.deleteWorkflowLineage(anyList())).thenReturn(1);
         Assertions.assertDoesNotThrow(() -> 
processDefinitionService.deleteWorkflowDefinitionByCode(user, 46L));
         Mockito.verify(metricsCleanUpService, 
times(2)).cleanUpWorkflowMetricsByDefinitionCode(46L);
+
+        // delete success with no lineage (deleteWorkflowLineageResult == 0)
+        // This tests the new logic that handles idempotent deletion gracefully
+        
when(workflowLineageService.deleteWorkflowLineage(anyList())).thenReturn(0);
+        Assertions.assertDoesNotThrow(() -> 
processDefinitionService.deleteWorkflowDefinitionByCode(user, 46L));
+        Mockito.verify(metricsCleanUpService, 
times(3)).cleanUpWorkflowMetricsByDefinitionCode(46L);
+        Mockito.verify(workflowLineageService, 
times(3)).deleteWorkflowLineage(anyList());
     }
 
     @Test
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowTaskLineageServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowTaskLineageServiceTest.java
index 26eb126929..806225e989 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowTaskLineageServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowTaskLineageServiceTest.java
@@ -21,18 +21,23 @@ import static org.mockito.Mockito.when;
 
 import org.apache.dolphinscheduler.api.service.impl.WorkflowLineageServiceImpl;
 import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
 import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation;
 import org.apache.dolphinscheduler.dao.entity.WorkFlowRelationDetail;
+import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
 import org.apache.dolphinscheduler.dao.entity.WorkflowTaskLineage;
 import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper;
 import org.apache.dolphinscheduler.dao.repository.WorkflowTaskLineageDao;
 
 import org.apache.commons.collections4.CollectionUtils;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -57,6 +62,12 @@ public class WorkflowTaskLineageServiceTest {
     @Mock
     private TaskDefinitionLogMapper taskDefinitionLogMapper;
 
+    @Mock
+    private TaskDefinitionMapper taskDefinitionMapper;
+
+    @Mock
+    private WorkflowDefinitionMapper workflowDefinitionMapper;
+
     /**
      * get mock Project
      *
@@ -128,4 +139,121 @@ public class WorkflowTaskLineageServiceTest {
         return workFlowLineages;
     }
 
+    @Test
+    public void testTaskDependentMsgWithOrphanedLineageRecord() {
+        // Test case: Handle dirty data scenario where taskDefinition is null
+        long projectCode = 1L;
+        long workflowDefinitionCode = 100L;
+        long taskCode = 200L;
+
+        // Create orphaned lineage record (taskDefinitionCode exists but 
taskDefinition is null)
+        List<WorkflowTaskLineage> dependentWorkflowList = new ArrayList<>();
+        WorkflowTaskLineage orphanedLineage = new WorkflowTaskLineage();
+        orphanedLineage.setId(1);
+        orphanedLineage.setDeptWorkflowDefinitionCode(50L);
+        orphanedLineage.setTaskDefinitionCode(999L); // This task definition 
doesn't exist
+        dependentWorkflowList.add(orphanedLineage);
+
+        WorkflowDefinition workflowDefinition = new WorkflowDefinition();
+        workflowDefinition.setCode(50L);
+        workflowDefinition.setName("TestWorkflow");
+
+        when(workflowTaskLineageDao.queryWorkFlowLineageByDept(projectCode, 
workflowDefinitionCode, taskCode))
+                .thenReturn(dependentWorkflowList);
+        
when(workflowDefinitionMapper.queryByCode(50L)).thenReturn(workflowDefinition);
+        when(taskDefinitionMapper.queryByCode(999L)).thenReturn(null); // Task 
definition not found (dirty data)
+
+        // Should return Optional.empty() because all records are orphaned
+        Optional<String> result =
+                workflowLineageService.taskDependentMsg(projectCode, 
workflowDefinitionCode, taskCode);
+        Assertions.assertFalse(result.isPresent());
+    }
+
+    @Test
+    public void testTaskDependentMsgWithMixedValidAndOrphanedRecords() {
+        // Test case: Some records are valid, some are orphaned
+        long projectCode = 1L;
+        long workflowDefinitionCode = 100L;
+        long taskCode = 200L;
+
+        List<WorkflowTaskLineage> dependentWorkflowList = new ArrayList<>();
+
+        // Valid lineage record
+        WorkflowTaskLineage validLineage = new WorkflowTaskLineage();
+        validLineage.setId(1);
+        validLineage.setDeptWorkflowDefinitionCode(50L);
+        validLineage.setTaskDefinitionCode(300L);
+        dependentWorkflowList.add(validLineage);
+
+        // Orphaned lineage record (dirty data)
+        WorkflowTaskLineage orphanedLineage = new WorkflowTaskLineage();
+        orphanedLineage.setId(2);
+        orphanedLineage.setDeptWorkflowDefinitionCode(60L);
+        orphanedLineage.setTaskDefinitionCode(999L); // This task definition 
doesn't exist
+        dependentWorkflowList.add(orphanedLineage);
+
+        WorkflowDefinition workflowDefinition1 = new WorkflowDefinition();
+        workflowDefinition1.setCode(50L);
+        workflowDefinition1.setName("ValidWorkflow");
+
+        WorkflowDefinition workflowDefinition2 = new WorkflowDefinition();
+        workflowDefinition2.setCode(60L);
+        workflowDefinition2.setName("OrphanedWorkflow");
+
+        TaskDefinition validTaskDefinition = new TaskDefinition();
+        validTaskDefinition.setCode(300L);
+        validTaskDefinition.setName("ValidTask");
+
+        when(workflowTaskLineageDao.queryWorkFlowLineageByDept(projectCode, 
workflowDefinitionCode, taskCode))
+                .thenReturn(dependentWorkflowList);
+        
when(workflowDefinitionMapper.queryByCode(50L)).thenReturn(workflowDefinition1);
+        
when(workflowDefinitionMapper.queryByCode(60L)).thenReturn(workflowDefinition2);
+        
when(taskDefinitionMapper.queryByCode(300L)).thenReturn(validTaskDefinition);
+        when(taskDefinitionMapper.queryByCode(999L)).thenReturn(null); // 
Orphaned record
+
+        TaskDefinition taskDefinition = new TaskDefinition();
+        taskDefinition.setCode(taskCode);
+        taskDefinition.setName("TestTask");
+        
when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(taskDefinition);
+
+        // Should return a message with only the valid record, skipping the 
orphaned one
+        Optional<String> result =
+                workflowLineageService.taskDependentMsg(projectCode, 
workflowDefinitionCode, taskCode);
+        Assertions.assertTrue(result.isPresent());
+        String message = result.get();
+        Assertions.assertTrue(message.contains("ValidWorkflow"));
+        Assertions.assertTrue(message.contains("ValidTask"));
+        // Orphaned record should be skipped, so it shouldn't appear in the 
message
+        Assertions.assertFalse(message.contains("OrphanedWorkflow"));
+    }
+
+    @Test
+    public void 
testTaskDependentMsgWithEmptyListAfterFilteringOrphanedRecords() {
+        // Test case: All records are orphaned, resulting in empty list
+        long projectCode = 1L;
+        long workflowDefinitionCode = 100L;
+        long taskCode = 0L; // No specific task code
+
+        List<WorkflowTaskLineage> dependentWorkflowList = new ArrayList<>();
+        WorkflowTaskLineage orphanedLineage = new WorkflowTaskLineage();
+        orphanedLineage.setId(1);
+        orphanedLineage.setDeptWorkflowDefinitionCode(50L);
+        orphanedLineage.setTaskDefinitionCode(999L); // This task definition 
doesn't exist
+        dependentWorkflowList.add(orphanedLineage);
+
+        WorkflowDefinition workflowDefinition = new WorkflowDefinition();
+        workflowDefinition.setCode(50L);
+        workflowDefinition.setName("TestWorkflow");
+
+        when(workflowTaskLineageDao.queryWorkFlowLineageByDept(projectCode, 
workflowDefinitionCode, 0L))
+                .thenReturn(dependentWorkflowList);
+        
when(workflowDefinitionMapper.queryByCode(50L)).thenReturn(workflowDefinition);
+        when(taskDefinitionMapper.queryByCode(999L)).thenReturn(null); // Task 
definition not found
+
+        // Should return Optional.empty() because all records are orphaned
+        Optional<String> result =
+                workflowLineageService.taskDependentMsg(projectCode, 
workflowDefinitionCode, taskCode);
+        Assertions.assertFalse(result.isPresent());
+    }
+
 }

Reply via email to