This is an automated email from the ASF dual-hosted git repository.
caishunfeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 4893bef5a7 [Improvement][TaskInstance] reduce database queries (#11522)
4893bef5a7 is described below
commit 4893bef5a79fc022b74f8c273bd8079517726f35
Author: longtb <[email protected]>
AuthorDate: Tue Aug 23 14:27:26 2022 +0800
[Improvement][TaskInstance] reduce database queries (#11522)
* [Improvement][TaskInstance] reduce database queries
* Update dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
Co-authored-by: caishunfeng <[email protected]>
* [Improvement][TaskInstance] rename queryByInstanceIdsAndCodes -> queryByProcessInstanceIdsAndTaskCodes
Co-authored-by: zhangshunmin <[email protected]>
Co-authored-by: caishunfeng <[email protected]>
---
.../service/impl/ProcessDefinitionServiceImpl.java | 34 ++++++++++------
.../service/impl/ProcessInstanceServiceImpl.java | 46 ++++++++++++++--------
.../dao/mapper/TaskInstanceMapper.java | 3 ++
.../dao/mapper/TaskInstanceMapper.xml | 18 +++++++++
.../dao/mapper/TaskInstanceMapperTest.java | 21 ++++++++++
5 files changed, 94 insertions(+), 28 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index 1c7b2aba27..9cc12a8f8f 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -114,7 +114,6 @@ import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -722,9 +721,8 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
List<ProcessTaskRelationLog> processTaskRelationLogList =
processTaskRelationLogMapper
.queryByProcessCodeAndVersion(processDefinition.getCode(),
processDefinition.getVersion());
if (taskRelationList.size() == processTaskRelationLogList.size()) {
- Set<ProcessTaskRelationLog> taskRelationSet =
taskRelationList.stream().collect(Collectors.toSet());
- Set<ProcessTaskRelationLog> processTaskRelationLogSet =
-
processTaskRelationLogList.stream().collect(Collectors.toSet());
+ Set<ProcessTaskRelationLog> taskRelationSet = new
HashSet<>(taskRelationList);
+ Set<ProcessTaskRelationLog> processTaskRelationLogSet = new
HashSet<>(processTaskRelationLogList);
if (taskRelationSet.size() ==
processTaskRelationLogSet.size()) {
taskRelationSet.removeAll(processTaskRelationLogSet);
if (!taskRelationSet.isEmpty()) {
@@ -1047,7 +1045,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
@Override
@Transactional
public Map<String, Object> importProcessDefinition(User loginUser, long
projectCode, MultipartFile file) {
- Map<String, Object> result = new HashMap<>();
+ Map<String, Object> result;
String dagDataScheduleJson = FileUtils.file2String(file);
List<DagDataSchedule> dagDataScheduleList =
JSONUtils.toList(dagDataScheduleJson, DagDataSchedule.class);
Project project = projectMapper.queryByCode(projectCode);
@@ -1658,7 +1656,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
*/
@Override
public Map<String, Object> viewTree(User loginUser, long projectCode, long
code, Integer limit) {
- Map<String, Object> result = new HashMap<>();
+ Map<String, Object> result;
Project project = projectMapper.queryByCode(projectCode);
// check user access for project
result = projectService.checkProjectAndAuth(loginUser, project,
projectCode, WORKFLOW_TREE_VIEW);
@@ -1716,9 +1714,17 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
while (!ServerLifeCycleManager.isStopped()) {
Set<String> postNodeList;
- Iterator<Map.Entry<String, List<TreeViewDto>>> iter =
runningNodeMap.entrySet().iterator();
- while (iter.hasNext()) {
- Map.Entry<String, List<TreeViewDto>> en = iter.next();
+ Set<Map.Entry<String, List<TreeViewDto>>> entries =
runningNodeMap.entrySet();
+ List<Integer> processInstanceIds = processInstanceList.stream()
+
.limit(limit).map(ProcessInstance::getId).collect(Collectors.toList());
+ List<Long> nodeCodes = entries.stream().map(e ->
Long.parseLong(e.getKey())).collect(Collectors.toList());
+ List<TaskInstance> taskInstances;
+ if (processInstanceIds.isEmpty() || nodeCodes.isEmpty()) {
+ taskInstances = Collections.emptyList();
+ } else {
+ taskInstances =
taskInstanceMapper.queryByProcessInstanceIdsAndTaskCodes(processInstanceIds,
nodeCodes);
+ }
+ for (Map.Entry<String, List<TreeViewDto>> en : entries) {
String nodeCode = en.getKey();
parentTreeViewDtoList = en.getValue();
@@ -1730,8 +1736,14 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
// set treeViewDto instances
for (int i = limit - 1; i >= 0; i--) {
ProcessInstance processInstance =
processInstanceList.get(i);
- TaskInstance taskInstance =
taskInstanceMapper.queryByInstanceIdAndCode(processInstance.getId(),
- Long.parseLong(nodeCode));
+ TaskInstance taskInstance = null;
+ for (TaskInstance instance : taskInstances) {
+ if (instance.getTaskCode() == Long.parseLong(nodeCode)
+ && instance.getProcessInstanceId() ==
processInstance.getId()) {
+ taskInstance = instance;
+ break;
+ }
+ }
if (taskInstance == null) {
treeViewDto.getInstances().add(new Instance(-1, "not
running", 0, "null"));
} else {
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
index 833a734377..67ff0edb9b 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
@@ -780,24 +780,36 @@ public class ProcessInstanceServiceImpl extends
BaseServiceImpl implements Proce
ganttDto.setTaskNames(nodeList);
List<Task> taskList = new ArrayList<>();
- for (String node : nodeList) {
- TaskInstance taskInstance =
-
taskInstanceMapper.queryByInstanceIdAndCode(processInstanceId,
Long.parseLong(node));
- if (taskInstance == null) {
- continue;
+ if (!nodeList.isEmpty()) {
+ List<Long> taskCodes =
nodeList.stream().map(Long::parseLong).collect(Collectors.toList());
+ List<TaskInstance> taskInstances =
taskInstanceMapper.queryByProcessInstanceIdsAndTaskCodes(
+ Collections.singletonList(processInstanceId), taskCodes
+ );
+ for (String node : nodeList) {
+ TaskInstance taskInstance = null;
+ for (TaskInstance instance : taskInstances) {
+ if (instance.getProcessInstanceId() == processInstanceId
+ && instance.getTaskCode() == Long.parseLong(node))
{
+ taskInstance = instance;
+ break;
+ }
+ }
+ if (taskInstance == null) {
+ continue;
+ }
+ Date startTime = taskInstance.getStartTime() == null ? new
Date() : taskInstance.getStartTime();
+ Date endTime = taskInstance.getEndTime() == null ? new Date()
: taskInstance.getEndTime();
+ Task task = new Task();
+ task.setTaskName(taskInstance.getName());
+ task.getStartDate().add(startTime.getTime());
+ task.getEndDate().add(endTime.getTime());
+ task.setIsoStart(startTime);
+ task.setIsoEnd(endTime);
+ task.setStatus(taskInstance.getState().toString());
+ task.setExecutionDate(taskInstance.getStartTime());
+ task.setDuration(DateUtils.format2Readable(endTime.getTime() -
startTime.getTime()));
+ taskList.add(task);
}
- Date startTime = taskInstance.getStartTime() == null ? new Date()
: taskInstance.getStartTime();
- Date endTime = taskInstance.getEndTime() == null ? new Date() :
taskInstance.getEndTime();
- Task task = new Task();
- task.setTaskName(taskInstance.getName());
- task.getStartDate().add(startTime.getTime());
- task.getEndDate().add(endTime.getTime());
- task.setIsoStart(startTime);
- task.setIsoEnd(endTime);
- task.setStatus(taskInstance.getState().toString());
- task.setExecutionDate(taskInstance.getStartTime());
- task.setDuration(DateUtils.format2Readable(endTime.getTime() -
startTime.getTime()));
- taskList.add(task);
}
ganttDto.setTasks(taskList);
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
index ca68dc57e5..2ab78f5151 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
@@ -53,6 +53,9 @@ public interface TaskInstanceMapper extends
BaseMapper<TaskInstance> {
TaskInstance queryByInstanceIdAndCode(@Param("processInstanceId") int
processInstanceId,
@Param("taskCode") Long taskCode);
+ /**
+ * Batch query of task instances with flag = 1, filtered by process instance ids and task codes.
+ * Per the mapper XML, each filter is applied only when its list is non-null and non-empty;
+ * if neither list is supplied the query matches every task instance with flag = 1,
+ * so callers should guard against empty inputs.
+ *
+ * @param processInstanceIds process instance ids to match against process_instance_id
+ * @param taskCodes task codes to match against task_code
+ * @return the matching task instances
+ */
+ List<TaskInstance>
queryByProcessInstanceIdsAndTaskCodes(@Param("processInstanceIds")
List<Integer> processInstanceIds,
+ @Param("taskCodes")
List<Long> taskCodes);
+
Integer countTask(@Param("projectCodes") Long[] projectCodes,
@Param("taskIds") int[] taskIds);
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
index e09b8aa887..0ba80db75f 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
@@ -134,6 +134,24 @@
and flag = 1
limit 1
</select>
+ <!-- Batch lookup of task instances with flag = 1. Each "in" filter below is emitted
+ only when its list is non-null and non-empty; with neither list supplied the
+ statement matches every row with flag = 1 and has no limit. -->
+ <select id="queryByProcessInstanceIdsAndTaskCodes"
resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">
+ select
+ <include refid="baseSql"/>
+ from t_ds_task_instance
+ where flag = 1
+ <if test="processInstanceIds != null and processInstanceIds.size() !=
0">
+ and process_instance_id in
+ <foreach collection="processInstanceIds" index="index" item="i"
open="(" separator="," close=")">
+ #{i}
+ </foreach>
+ </if>
+ <if test="taskCodes != null and taskCodes.size() != 0">
+ and task_code in
+ <foreach collection="taskCodes" index="index" item="i" open="("
separator="," close=")">
+ #{i}
+ </foreach>
+ </if>
+ </select>
<select id="countTask" resultType="java.lang.Integer">
select count(1) as count
from t_ds_task_instance task,t_ds_task_definition_log define
diff --git
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
index fa5a256caa..014a2a2c3c 100644
---
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
+++
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
@@ -26,6 +26,7 @@ import
org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import java.util.Collections;
import java.util.Date;
import java.util.List;
@@ -278,6 +279,26 @@ public class TaskInstanceMapperTest extends BaseDaoTest {
Assert.assertNotEquals(taskInstance, null);
}
+ /**
+ * Verifies that queryByProcessInstanceIdsAndTaskCodes returns exactly the inserted task instance.
+ */
+ @Test
+ public void testQueryByProcessInstanceIdsAndTaskCodes() {
+ // insert the owning ProcessInstance
+ ProcessInstance processInstance = insertProcessInstance();
+
+ // insert a taskInstance attached to that process instance
+ TaskInstance task = insertTaskInstance(processInstance.getId());
+ task.setHost("111.111.11.11");
+ taskInstanceMapper.updateById(task);
+
+ List<TaskInstance> taskInstances =
taskInstanceMapper.queryByProcessInstanceIdsAndTaskCodes(
+ Collections.singletonList(task.getProcessInstanceId()),
+ Collections.singletonList(task.getTaskCode()));
+ taskInstanceMapper.deleteById(task.getId());
+ Assert.assertEquals(1, taskInstances.size()); // expected value first, per JUnit's (expected, actual) order
+ }
+
/**
* test count task instance
*/