This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new d39a702415 [Fix-17638][API] Optimize workflow lineage update logic
(#17678)
d39a702415 is described below
commit d39a702415a95a9d69fd464ab894ec0a164cd8d6
Author: luxiaolong <[email protected]>
AuthorDate: Fri Nov 21 15:45:10 2025 +0800
[Fix-17638][API] Optimize workflow lineage update logic (#17678)
---
.../api/service/WorkflowLineageService.java | 10 ++-
.../impl/WorkflowDefinitionServiceImpl.java | 19 +----
.../service/impl/WorkflowLineageServiceImpl.java | 23 +++++-
.../api/service/WorkflowDefinitionServiceTest.java | 90 ++++++++++++++++++++++
.../service/WorkflowTaskLineageServiceTest.java | 80 +++++++++++++++++++
5 files changed, 203 insertions(+), 19 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkflowLineageService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkflowLineageService.java
index 2857a38ca1..f20503b140 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkflowLineageService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkflowLineageService.java
@@ -57,7 +57,15 @@ public interface WorkflowLineageService {
int createWorkflowLineage(List<WorkflowTaskLineage> workflowTaskLineages);
- int updateWorkflowLineage(List<WorkflowTaskLineage> workflowTaskLineages);
+ /**
+ * Replace the lineage of given workflow definition by new lineage list.
+ * When the list is empty, existing lineage data will be deleted.
+ *
+ * @param workflowDefinitionCode workflow definition to update
+ * @param workflowTaskLineages new lineage list, can be empty
+ * @return affected rows
+ */
+ int updateWorkflowLineage(long workflowDefinitionCode,
List<WorkflowTaskLineage> workflowTaskLineages);
int deleteWorkflowLineage(List<Long> workflowDefinitionCodes);
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java
index 998fb9de92..f0c0605620 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java
@@ -409,23 +409,10 @@ public class WorkflowDefinitionServiceImpl extends
BaseServiceImpl implements Wo
long workflowDefinitionCode,
int workflowDefinitionVersion,
List<TaskDefinitionLog>
taskDefinitionLogList) {
- List<WorkflowTaskLineage> workflowTaskLineageList =
- generateWorkflowLineageList(taskDefinitionLogList,
workflowDefinitionCode, workflowDefinitionVersion);
- if (workflowTaskLineageList.isEmpty()) {
- return;
- }
+ List<WorkflowTaskLineage> workflowTaskLineageList =
generateWorkflowLineageList(taskDefinitionLogList,
+ workflowDefinitionCode, workflowDefinitionVersion);
- int insertWorkflowLineageResult =
workflowLineageService.updateWorkflowLineage(workflowTaskLineageList);
- if (insertWorkflowLineageResult <= 0) {
- log.error(
- "Save workflow lineage error, projectCode: {},
workflowDefinitionCode: {}, workflowDefinitionVersion: {}",
- projectCode, workflowDefinitionCode,
workflowDefinitionVersion);
- throw new ServiceException(Status.CREATE_WORKFLOW_LINEAGE_ERROR);
- } else {
- log.info(
- "Save workflow lineage complete, projectCode: {},
workflowDefinitionCode: {}, workflowDefinitionVersion: {}",
- projectCode, workflowDefinitionCode,
workflowDefinitionVersion);
- }
+ workflowLineageService.updateWorkflowLineage(workflowDefinitionCode,
workflowTaskLineageList);
}
private List<WorkflowTaskLineage>
generateWorkflowLineageList(List<TaskDefinitionLog> taskDefinitionLogList,
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowLineageServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowLineageServiceImpl.java
index 9e56e72ae9..bfd8f91dd5 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowLineageServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowLineageServiceImpl.java
@@ -38,6 +38,7 @@ import
org.apache.dolphinscheduler.dao.repository.WorkflowTaskLineageDao;
import java.text.MessageFormat;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -318,8 +319,26 @@ public class WorkflowLineageServiceImpl extends
BaseServiceImpl implements Workf
}
@Override
- public int updateWorkflowLineage(List<WorkflowTaskLineage>
workflowTaskLineages) {
- return
workflowTaskLineageDao.updateWorkflowTaskLineage(workflowTaskLineages);
+ public int updateWorkflowLineage(long workflowDefinitionCode,
List<WorkflowTaskLineage> workflowTaskLineages) {
+ // Remove existing lineage first to keep data consistent
+ workflowTaskLineageDao.batchDeleteByWorkflowDefinitionCode(
+ Collections.singletonList(workflowDefinitionCode));
+
+ if (CollectionUtils.isEmpty(workflowTaskLineages)) {
+ log.info("Current lineage is empty, workflowDefinitionCode: {}",
+ workflowDefinitionCode);
+ return 0;
+ }
+
+ int insertResult =
workflowTaskLineageDao.batchInsert(workflowTaskLineages);
+ if (insertResult <= 0) {
+ log.error("Save workflow lineage error, workflowDefinitionCode:
{}", workflowDefinitionCode);
+ throw new ServiceException(Status.CREATE_WORKFLOW_LINEAGE_ERROR);
+ }
+
+ log.info("Save workflow lineage complete, workflowDefinitionCode: {},
inserted rows: {}",
+ workflowDefinitionCode, insertResult);
+ return insertResult;
}
@Override
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowDefinitionServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowDefinitionServiceTest.java
index fc788c7684..d11f1ef9b5 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowDefinitionServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowDefinitionServiceTest.java
@@ -35,6 +35,7 @@ import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant;
@@ -1390,4 +1391,93 @@ public class WorkflowDefinitionServiceTest extends
BaseServiceTestTool {
content);
return multipartFile;
}
+
+ @Test
+ public void testSaveWorkflowLineageWithEmptyList() {
+ // Test case: Empty lineage list should delete historical lineage
+ long projectCode = 1L;
+ long workflowDefinitionCode = 100L;
+ int workflowDefinitionVersion = 1;
+ List<TaskDefinitionLog> emptyTaskDefinitionLogList = new ArrayList<>();
+
+ // Mock updateWorkflowLineage to return 0 for empty list
+
when(workflowLineageService.updateWorkflowLineage(eq(workflowDefinitionCode),
anyList()))
+ .thenReturn(0);
+
+ // Execute - should not throw exception
+ Assertions.assertDoesNotThrow(() -> {
+ processDefinitionService.saveWorkflowLineage(projectCode,
workflowDefinitionCode,
+ workflowDefinitionVersion, emptyTaskDefinitionLogList);
+ });
+
+ // Verify that updateWorkflowLineage was called with empty list
+
verify(workflowLineageService).updateWorkflowLineage(eq(workflowDefinitionCode),
anyList());
+ }
+
+ @Test
+ public void testSaveWorkflowLineageWithNonEmptyList() {
+ // Test case: Normal save with non-empty lineage list
+ long projectCode = 1L;
+ long workflowDefinitionCode = 100L;
+ int workflowDefinitionVersion = 1;
+
+ // Create task definition logs with dependent tasks
+ List<TaskDefinitionLog> taskDefinitionLogList = new ArrayList<>();
+ TaskDefinitionLog taskLog = new TaskDefinitionLog();
+ taskLog.setCode(200L);
+ taskLog.setVersion(1);
+ taskLog.setProjectCode(projectCode);
+ taskLog.setTaskType("DEPENDENT");
+ // Set taskParams with dependent parameters
+ String taskParams =
+
"{\"dependence\":{\"dependTaskList\":[{\"dependItemList\":[{\"definitionCode\":50,\"depTaskCode\":300}]}]}}";
+ taskLog.setTaskParams(taskParams);
+ taskDefinitionLogList.add(taskLog);
+
+ // Mock updateWorkflowLineage to return success
+
when(workflowLineageService.updateWorkflowLineage(eq(workflowDefinitionCode),
anyList()))
+ .thenReturn(1);
+
+ // Execute - should not throw exception
+ Assertions.assertDoesNotThrow(() -> {
+ processDefinitionService.saveWorkflowLineage(projectCode,
workflowDefinitionCode,
+ workflowDefinitionVersion, taskDefinitionLogList);
+ });
+
+ // Verify that updateWorkflowLineage was called
+
verify(workflowLineageService).updateWorkflowLineage(eq(workflowDefinitionCode),
anyList());
+ }
+
+ @Test
+ public void testSaveWorkflowLineageWithInsertFailure() {
+ // Test case: Should throw exception when insert fails
+ long projectCode = 1L;
+ long workflowDefinitionCode = 100L;
+ int workflowDefinitionVersion = 1;
+
+ // Create task definition logs
+ List<TaskDefinitionLog> taskDefinitionLogList = new ArrayList<>();
+ TaskDefinitionLog taskLog = new TaskDefinitionLog();
+ taskLog.setCode(200L);
+ taskLog.setVersion(1);
+ taskLog.setProjectCode(projectCode);
+ taskLog.setTaskType("DEPENDENT");
+ String taskParams =
+
"{\"dependence\":{\"dependTaskList\":[{\"dependItemList\":[{\"definitionCode\":50,\"depTaskCode\":300}]}]}}";
+ taskLog.setTaskParams(taskParams);
+ taskDefinitionLogList.add(taskLog);
+
+ // Mock updateWorkflowLineage to throw exception (insert failure)
+
when(workflowLineageService.updateWorkflowLineage(eq(workflowDefinitionCode),
anyList()))
+ .thenThrow(new
ServiceException(Status.CREATE_WORKFLOW_LINEAGE_ERROR));
+
+ // Execute and verify exception
+ ServiceException exception =
Assertions.assertThrows(ServiceException.class, () -> {
+ processDefinitionService.saveWorkflowLineage(projectCode,
workflowDefinitionCode,
+ workflowDefinitionVersion, taskDefinitionLogList);
+ });
+
+
Assertions.assertEquals(Status.CREATE_WORKFLOW_LINEAGE_ERROR.getCode(),
exception.getCode());
+
verify(workflowLineageService).updateWorkflowLineage(eq(workflowDefinitionCode),
anyList());
+ }
}
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowTaskLineageServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowTaskLineageServiceTest.java
index 806225e989..c8be7bba69 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowTaskLineageServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowTaskLineageServiceTest.java
@@ -17,8 +17,13 @@
package org.apache.dolphinscheduler.api.service;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.impl.WorkflowLineageServiceImpl;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
@@ -256,4 +261,79 @@ public class WorkflowTaskLineageServiceTest {
Assertions.assertFalse(result.isPresent());
}
+ @Test
+ public void testUpdateWorkflowLineageWithNonEmptyList() {
+ // Test case: Normal update with non-empty lineage list
+ long workflowDefinitionCode = 100L;
+ List<WorkflowTaskLineage> workflowTaskLineages = new ArrayList<>();
+
+ WorkflowTaskLineage lineage1 = new WorkflowTaskLineage();
+ lineage1.setWorkflowDefinitionCode(workflowDefinitionCode);
+ lineage1.setTaskDefinitionCode(200L);
+ workflowTaskLineages.add(lineage1);
+
+ WorkflowTaskLineage lineage2 = new WorkflowTaskLineage();
+ lineage2.setWorkflowDefinitionCode(workflowDefinitionCode);
+ lineage2.setTaskDefinitionCode(300L);
+ workflowTaskLineages.add(lineage2);
+
+ // Mock DAO methods
+
when(workflowTaskLineageDao.batchDeleteByWorkflowDefinitionCode(anyList())).thenReturn(2);
+
when(workflowTaskLineageDao.batchInsert(workflowTaskLineages)).thenReturn(2);
+
+ // Execute
+ int result =
workflowLineageService.updateWorkflowLineage(workflowDefinitionCode,
workflowTaskLineages);
+
+ // Verify
+ Assertions.assertEquals(2, result);
+ verify(workflowTaskLineageDao)
+
.batchDeleteByWorkflowDefinitionCode(eq(java.util.Collections.singletonList(workflowDefinitionCode)));
+ verify(workflowTaskLineageDao).batchInsert(workflowTaskLineages);
+ }
+
+ @Test
+ public void testUpdateWorkflowLineageWithEmptyList() {
+ // Test case: Empty list should delete historical lineage and return 0
+ long workflowDefinitionCode = 100L;
+ List<WorkflowTaskLineage> emptyList = new ArrayList<>();
+
+ // Mock DAO method
+
when(workflowTaskLineageDao.batchDeleteByWorkflowDefinitionCode(anyList())).thenReturn(1);
+
+ // Execute
+ int result =
workflowLineageService.updateWorkflowLineage(workflowDefinitionCode, emptyList);
+
+ // Verify
+ Assertions.assertEquals(0, result);
+ verify(workflowTaskLineageDao)
+
.batchDeleteByWorkflowDefinitionCode(eq(java.util.Collections.singletonList(workflowDefinitionCode)));
+ // batchInsert should not be called when list is empty
+ }
+
+ @Test
+ public void testUpdateWorkflowLineageWithInsertFailure() {
+ // Test case: Should throw exception when insert fails
+ long workflowDefinitionCode = 100L;
+ List<WorkflowTaskLineage> workflowTaskLineages = new ArrayList<>();
+
+ WorkflowTaskLineage lineage1 = new WorkflowTaskLineage();
+ lineage1.setWorkflowDefinitionCode(workflowDefinitionCode);
+ lineage1.setTaskDefinitionCode(200L);
+ workflowTaskLineages.add(lineage1);
+
+ // Mock DAO methods
+
when(workflowTaskLineageDao.batchDeleteByWorkflowDefinitionCode(anyList())).thenReturn(1);
+
when(workflowTaskLineageDao.batchInsert(workflowTaskLineages)).thenReturn(0);
// Insert failure
+
+ // Execute and verify exception
+ ServiceException exception =
Assertions.assertThrows(ServiceException.class, () -> {
+
workflowLineageService.updateWorkflowLineage(workflowDefinitionCode,
workflowTaskLineages);
+ });
+
+
Assertions.assertEquals(Status.CREATE_WORKFLOW_LINEAGE_ERROR.getCode(),
exception.getCode());
+ verify(workflowTaskLineageDao)
+
.batchDeleteByWorkflowDefinitionCode(eq(java.util.Collections.singletonList(workflowDefinitionCode)));
+ verify(workflowTaskLineageDao).batchInsert(workflowTaskLineages);
+ }
+
}