This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 06d365dbe4 [Fix-17637] [API] Workflow lineage deletion optimization (#17641)
06d365dbe4 is described below
commit 06d365dbe4b045a349ef78a2a886c5256757aff4
Author: [email protected] <[email protected]>
AuthorDate: Tue Nov 11 15:03:26 2025 +0800
[Fix-17637] [API] Workflow lineage deletion optimization (#17641)
---
.../impl/WorkflowDefinitionServiceImpl.java | 18 ++-
.../service/impl/WorkflowLineageServiceImpl.java | 16 +++
.../api/service/WorkflowDefinitionServiceTest.java | 9 ++
.../service/WorkflowTaskLineageServiceTest.java | 128 +++++++++++++++++++++
4 files changed, 165 insertions(+), 6 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java
index 7fe3d00a51..998fb9de92 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java
@@ -1123,6 +1123,18 @@ public class WorkflowDefinitionServiceImpl extends BaseServiceImpl implements Wo
// If delete error, we can call this interface again.
workflowDefinitionDao.deleteByWorkflowDefinitionCode(workflowDefinition.getCode());
metricsCleanUpService.cleanUpWorkflowMetricsByDefinitionCode(code);
+
+ // delete workflow lineage (lineage data only keeps one record per workflow code)
+ // It's safe to return 0 if no lineage exists (idempotent)
+ int deleteWorkflowLineageResult = workflowLineageService
+ .deleteWorkflowLineage(Collections.singletonList(workflowDefinition.getCode()));
+ if (deleteWorkflowLineageResult <= 0) {
+ if (deleteWorkflowLineageResult < 0) {
+ throw new ServiceException(Status.DELETE_WORKFLOW_LINEAGE_ERROR);
+ } else {
+ log.warn("No workflow lineage to delete, workflowDefinitionCode: {}", code);
+ }
+ }
log.info("Success delete workflow definition workflowDefinitionCode: {}", code);
}
@@ -2427,12 +2439,6 @@ public class WorkflowDefinitionServiceImpl extends BaseServiceImpl implements Wo
}
log.info("Delete version: {} of workflow: {}, projectCode: {}",
version, code, projectCode);
- // delete workflow lineage
- int deleteWorkflowLineageResult = workflowLineageService.deleteWorkflowLineage(Collections.singletonList(code));
- if (deleteWorkflowLineageResult <= 0) {
- log.error("Delete workflow lineage by workflow definition code error, workflowDefinitionCode: {}", code);
- throw new ServiceException(Status.DELETE_WORKFLOW_LINEAGE_ERROR);
- }
}
private void updateWorkflowValid(User user, WorkflowDefinition
oldWorkflowDefinition,
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowLineageServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowLineageServiceImpl.java
index 4f4f6b4c05..9e56e72ae9 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowLineageServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowLineageServiceImpl.java
@@ -189,10 +189,26 @@ public class WorkflowLineageServiceImpl extends BaseServiceImpl implements Workf
if (workflowTaskLineage.getTaskDefinitionCode() != 0) {
TaskDefinition taskDefinition =
taskDefinitionMapper.queryByCode(workflowTaskLineage.getTaskDefinitionCode());
+ // Handle dirty data scenario caused by historical bugs:
+ // There may be orphaned lineage records referencing deleted task definitions.
+ // Skip these records to prevent NPE and ensure the method continues processing.
+ // Note: These orphaned records should be cleaned up by a background cleanup task,
+ // not here to avoid side effects in a read-only query method.
+ if (taskDefinition == null) {
+ log.warn(
+ "Orphaned lineage record detected: taskDefinitionCode {} not found, workflowTaskLineageId: {}. "
+ + "This dirty data should be cleaned up by a background task.",
+ workflowTaskLineage.getTaskDefinitionCode(), workflowTaskLineage.getId());
+ continue;
+ }
taskName = taskDefinition.getName();
}
taskDepStrList.add(String.format(Constants.FORMAT_S_S_COLON,
workflowDefinition.getName(), taskName));
}
+ // If no valid task dependencies found, return empty Optional to indicate no dependencies.
+ if (taskDepStrList.isEmpty()) {
+ return Optional.empty();
+ }
String taskDepStr = String.join(Constants.COMMA, taskDepStrList);
if (taskCode != 0) {
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowDefinitionServiceTest.java
index ecee150ee5..fc788c7684 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowDefinitionServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowDefinitionServiceTest.java
@@ -618,6 +618,7 @@ public class WorkflowDefinitionServiceTest extends BaseServiceTestTool {
when(scheduleMapper.deleteById(46)).thenReturn(1);
when(workflowLineageService.taskDependentMsg(project.getCode(),
workflowDefinition.getCode(), 0))
.thenReturn(Optional.empty());
+ when(workflowLineageService.deleteWorkflowLineage(anyList())).thenReturn(1);
processDefinitionService.deleteWorkflowDefinitionByCode(user, 46L);
Mockito.verify(metricsCleanUpService,
times(1)).cleanUpWorkflowMetricsByDefinitionCode(46L);
@@ -643,8 +644,16 @@ public class WorkflowDefinitionServiceTest extends BaseServiceTestTool {
when(scheduleMapper.deleteById(schedule.getId())).thenReturn(1);
when(workflowLineageService.taskDependentMsg(project.getCode(),
workflowDefinition.getCode(), 0))
.thenReturn(Optional.empty());
+ when(workflowLineageService.deleteWorkflowLineage(anyList())).thenReturn(1);
Assertions.assertDoesNotThrow(() ->
processDefinitionService.deleteWorkflowDefinitionByCode(user, 46L));
Mockito.verify(metricsCleanUpService,
times(2)).cleanUpWorkflowMetricsByDefinitionCode(46L);
+
+ // delete success with no lineage (deleteWorkflowLineageResult == 0)
+ // This tests the new logic that handles idempotent deletion gracefully
+ when(workflowLineageService.deleteWorkflowLineage(anyList())).thenReturn(0);
+ Assertions.assertDoesNotThrow(() -> processDefinitionService.deleteWorkflowDefinitionByCode(user, 46L));
+ Mockito.verify(metricsCleanUpService, times(3)).cleanUpWorkflowMetricsByDefinitionCode(46L);
+ Mockito.verify(workflowLineageService, times(3)).deleteWorkflowLineage(anyList());
}
@Test
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowTaskLineageServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowTaskLineageServiceTest.java
index 26eb126929..806225e989 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowTaskLineageServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowTaskLineageServiceTest.java
@@ -21,18 +21,23 @@ import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.api.service.impl.WorkflowLineageServiceImpl;
import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation;
import org.apache.dolphinscheduler.dao.entity.WorkFlowRelationDetail;
+import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkflowTaskLineage;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper;
import org.apache.dolphinscheduler.dao.repository.WorkflowTaskLineageDao;
import org.apache.commons.collections4.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -57,6 +62,12 @@ public class WorkflowTaskLineageServiceTest {
@Mock
private TaskDefinitionLogMapper taskDefinitionLogMapper;
+ @Mock
+ private TaskDefinitionMapper taskDefinitionMapper;
+
+ @Mock
+ private WorkflowDefinitionMapper workflowDefinitionMapper;
+
/**
* get mock Project
*
@@ -128,4 +139,121 @@ public class WorkflowTaskLineageServiceTest {
return workFlowLineages;
}
+ @Test
+ public void testTaskDependentMsgWithOrphanedLineageRecord() {
+ // Test case: Handle dirty data scenario where taskDefinition is null
+ long projectCode = 1L;
+ long workflowDefinitionCode = 100L;
+ long taskCode = 200L;
+
+ // Create orphaned lineage record (taskDefinitionCode exists but taskDefinition is null)
+ List<WorkflowTaskLineage> dependentWorkflowList = new ArrayList<>();
+ WorkflowTaskLineage orphanedLineage = new WorkflowTaskLineage();
+ orphanedLineage.setId(1);
+ orphanedLineage.setDeptWorkflowDefinitionCode(50L);
+ orphanedLineage.setTaskDefinitionCode(999L); // This task definition doesn't exist
+ dependentWorkflowList.add(orphanedLineage);
+
+ WorkflowDefinition workflowDefinition = new WorkflowDefinition();
+ workflowDefinition.setCode(50L);
+ workflowDefinition.setName("TestWorkflow");
+
+ when(workflowTaskLineageDao.queryWorkFlowLineageByDept(projectCode, workflowDefinitionCode, taskCode))
+ .thenReturn(dependentWorkflowList);
+ when(workflowDefinitionMapper.queryByCode(50L)).thenReturn(workflowDefinition);
+ when(taskDefinitionMapper.queryByCode(999L)).thenReturn(null); // Task definition not found (dirty data)
+
+ // Should return Optional.empty() because all records are orphaned
+ Optional<String> result =
+ workflowLineageService.taskDependentMsg(projectCode, workflowDefinitionCode, taskCode);
+ Assertions.assertFalse(result.isPresent());
+ }
+
+ @Test
+ public void testTaskDependentMsgWithMixedValidAndOrphanedRecords() {
+ // Test case: Some records are valid, some are orphaned
+ long projectCode = 1L;
+ long workflowDefinitionCode = 100L;
+ long taskCode = 200L;
+
+ List<WorkflowTaskLineage> dependentWorkflowList = new ArrayList<>();
+
+ // Valid lineage record
+ WorkflowTaskLineage validLineage = new WorkflowTaskLineage();
+ validLineage.setId(1);
+ validLineage.setDeptWorkflowDefinitionCode(50L);
+ validLineage.setTaskDefinitionCode(300L);
+ dependentWorkflowList.add(validLineage);
+
+ // Orphaned lineage record (dirty data)
+ WorkflowTaskLineage orphanedLineage = new WorkflowTaskLineage();
+ orphanedLineage.setId(2);
+ orphanedLineage.setDeptWorkflowDefinitionCode(60L);
+ orphanedLineage.setTaskDefinitionCode(999L); // This task definition doesn't exist
+ dependentWorkflowList.add(orphanedLineage);
+
+ WorkflowDefinition workflowDefinition1 = new WorkflowDefinition();
+ workflowDefinition1.setCode(50L);
+ workflowDefinition1.setName("ValidWorkflow");
+
+ WorkflowDefinition workflowDefinition2 = new WorkflowDefinition();
+ workflowDefinition2.setCode(60L);
+ workflowDefinition2.setName("OrphanedWorkflow");
+
+ TaskDefinition validTaskDefinition = new TaskDefinition();
+ validTaskDefinition.setCode(300L);
+ validTaskDefinition.setName("ValidTask");
+
+ when(workflowTaskLineageDao.queryWorkFlowLineageByDept(projectCode, workflowDefinitionCode, taskCode))
+ .thenReturn(dependentWorkflowList);
+ when(workflowDefinitionMapper.queryByCode(50L)).thenReturn(workflowDefinition1);
+ when(workflowDefinitionMapper.queryByCode(60L)).thenReturn(workflowDefinition2);
+ when(taskDefinitionMapper.queryByCode(300L)).thenReturn(validTaskDefinition);
+ when(taskDefinitionMapper.queryByCode(999L)).thenReturn(null); // Orphaned record
+
+ TaskDefinition taskDefinition = new TaskDefinition();
+ taskDefinition.setCode(taskCode);
+ taskDefinition.setName("TestTask");
+ when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(taskDefinition);
+
+ // Should return a message with only the valid record, skipping the orphaned one
+ Optional<String> result =
+ workflowLineageService.taskDependentMsg(projectCode, workflowDefinitionCode, taskCode);
+ Assertions.assertTrue(result.isPresent());
+ String message = result.get();
+ Assertions.assertTrue(message.contains("ValidWorkflow"));
+ Assertions.assertTrue(message.contains("ValidTask"));
+ // Orphaned record should be skipped, so it shouldn't appear in the message
+ Assertions.assertFalse(message.contains("OrphanedWorkflow"));
+ }
+
+ @Test
+ public void testTaskDependentMsgWithEmptyListAfterFilteringOrphanedRecords() {
+ // Test case: All records are orphaned, resulting in empty list
+ long projectCode = 1L;
+ long workflowDefinitionCode = 100L;
+ long taskCode = 0L; // No specific task code
+
+ List<WorkflowTaskLineage> dependentWorkflowList = new ArrayList<>();
+ WorkflowTaskLineage orphanedLineage = new WorkflowTaskLineage();
+ orphanedLineage.setId(1);
+ orphanedLineage.setDeptWorkflowDefinitionCode(50L);
+ orphanedLineage.setTaskDefinitionCode(999L); // This task definition doesn't exist
+ dependentWorkflowList.add(orphanedLineage);
+
+ WorkflowDefinition workflowDefinition = new WorkflowDefinition();
+ workflowDefinition.setCode(50L);
+ workflowDefinition.setName("TestWorkflow");
+
+ when(workflowTaskLineageDao.queryWorkFlowLineageByDept(projectCode, workflowDefinitionCode, 0L))
+ .thenReturn(dependentWorkflowList);
+ when(workflowDefinitionMapper.queryByCode(50L)).thenReturn(workflowDefinition);
+ when(taskDefinitionMapper.queryByCode(999L)).thenReturn(null); // Task definition not found
+
+ // Should return Optional.empty() because all records are orphaned
+ Optional<String> result =
+ workflowLineageService.taskDependentMsg(projectCode, workflowDefinitionCode, taskCode);
+ Assertions.assertFalse(result.isPresent());
+ }
+
}