ruanwenjun commented on code in PR #17678:
URL:
https://github.com/apache/dolphinscheduler/pull/17678#discussion_r2536675872
##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java:
##########
@@ -409,14 +409,16 @@ public void saveWorkflowLineage(long projectCode,
long workflowDefinitionCode,
int workflowDefinitionVersion,
List<TaskDefinitionLog> taskDefinitionLogList) {
- List<WorkflowTaskLineage> workflowTaskLineageList =
- generateWorkflowLineageList(taskDefinitionLogList, workflowDefinitionCode, workflowDefinitionVersion);
- if (workflowTaskLineageList.isEmpty()) {
- return;
- }
+ List<WorkflowTaskLineage> workflowTaskLineageList = generateWorkflowLineageList(taskDefinitionLogList,
+ workflowDefinitionCode, workflowDefinitionVersion);
- int insertWorkflowLineageResult = workflowLineageService.updateWorkflowLineage(workflowTaskLineageList);
- if (insertWorkflowLineageResult <= 0) {
+ int insertWorkflowLineageResult =
+ workflowLineageService.updateWorkflowLineage(workflowDefinitionCode, workflowTaskLineageList);
+ if (CollectionUtils.isEmpty(workflowTaskLineageList)) {
+ log.info(
+ "Delete workflow lineage because current lineage is empty, projectCode: {}, workflowDefinitionCode: {}, workflowDefinitionVersion: {}",
+ projectCode, workflowDefinitionCode, workflowDefinitionVersion);
+ } else if (insertWorkflowLineageResult <= 0) {
Review Comment:
```suggestion
if (CollectionUtils.isNotEmpty(workflowTaskLineageList) && insertWorkflowLineageResult <= 0) {
```
Remove the log here; this service should not be aware of delete operations.
In fact, throwing the exception should be the responsibility of the DAO layer.
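For illustration only, here is a minimal sketch of the check this comment appears to be asking for: fail only when there was lineage to write and the update reported nothing written. `SaveLineageSketch`, `shouldFail`, and the `String` elements are hypothetical stand-ins, not code from this PR.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the service-layer check; not the project's real code.
final class SaveLineageSketch {

    // True only when the lineage list is non-empty and the update wrote nothing.
    // An empty list is treated as a normal case, so no log and no failure.
    static boolean shouldFail(List<String> lineageList, int updateResult) {
        boolean hasLineage = lineageList != null && !lineageList.isEmpty();
        return hasLineage && updateResult <= 0;
    }

    public static void main(String[] args) {
        // Empty lineage: nothing to write, so a result of 0 is not an error.
        System.out.println(shouldFail(Collections.<String>emptyList(), 0));
        // Non-empty lineage with nothing written: this is the failure case.
        System.out.println(shouldFail(Collections.singletonList("taskA -> taskB"), 0));
        // Non-empty lineage with rows written: success.
        System.out.println(shouldFail(Collections.singletonList("taskA -> taskB"), 1));
    }
}
```

With this shape the caller never needs to know whether the underlying update deleted rows, which seems to be the point of the comment.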
##########
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowLineageServiceImpl.java:
##########
@@ -318,8 +319,26 @@ public int createWorkflowLineage(List<WorkflowTaskLineage> workflowTaskLineages)
}
@Override
- public int updateWorkflowLineage(List<WorkflowTaskLineage> workflowTaskLineages) {
- return workflowTaskLineageDao.updateWorkflowTaskLineage(workflowTaskLineages);
+ public int updateWorkflowLineage(long workflowDefinitionCode, List<WorkflowTaskLineage> workflowTaskLineages) {
+ // Remove existing lineage first to keep data consistent
+ workflowTaskLineageDao.batchDeleteByWorkflowDefinitionCode(
+ Collections.singletonList(workflowDefinitionCode));
+
+ if (CollectionUtils.isEmpty(workflowTaskLineages)) {
+ return 0;
+ }
+
+ boolean hasMismatch = workflowTaskLineages.stream()
+ .anyMatch(lineage -> lineage.getWorkflowDefinitionCode() != workflowDefinitionCode);
+ if (hasMismatch) {
+ log.warn("Skip updating lineage due to workflowDefinitionCode mismatch, expected: {}",
+ workflowDefinitionCode);
+ throw new IllegalArgumentException(
+ String.format("All lineage items must belong to workflowDefinitionCode %s",
+ workflowDefinitionCode));
+ }
Review Comment:
```suggestion
```
We don't need to check this here; otherwise we would have to add the same kind
of validation logic to every method.
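As a rough sketch of what the method could look like without the mismatch check (delete the existing rows, then insert whatever was passed in), assuming an in-memory map in place of the real DAO. `UpdateLineageSketch`, `rowCount`, and the `String` rows are hypothetical and only stand in for `WorkflowTaskLineageDao` and `WorkflowTaskLineage`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in; the real service delegates to a DAO.
final class UpdateLineageSketch {

    // Lineage rows keyed by workflow definition code.
    private final Map<Long, List<String>> rowsByWorkflowCode = new HashMap<>();

    // Replace the lineage of one workflow: remove old rows, then store the new ones.
    // No per-item validation here; callers are trusted to pass matching items.
    int updateWorkflowLineage(long workflowDefinitionCode, List<String> lineages) {
        rowsByWorkflowCode.remove(workflowDefinitionCode);
        if (lineages == null || lineages.isEmpty()) {
            return 0;
        }
        rowsByWorkflowCode.put(workflowDefinitionCode, new ArrayList<>(lineages));
        return lineages.size();
    }

    // Number of rows currently stored for the given workflow.
    int rowCount(long workflowDefinitionCode) {
        return rowsByWorkflowCode
                .getOrDefault(workflowDefinitionCode, Collections.<String>emptyList())
                .size();
    }

    public static void main(String[] args) {
        UpdateLineageSketch service = new UpdateLineageSketch();
        System.out.println(service.updateWorkflowLineage(1L, Collections.singletonList("taskA -> taskB")));
        System.out.println(service.updateWorkflowLineage(1L, Collections.<String>emptyList()));
        System.out.println(service.rowCount(1L));
    }
}
```

The method stays a plain replace operation; any invariant on the items would then be enforced once, wherever the list is built.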