This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new c4375a5 [Fix][Task] Task lost resource information (#6292) (#6295)
c4375a5 is described below
commit c4375a54c167f8e96d80c3cc67ad0a607ba2ef7b
Author: LiuBodong <[email protected]>
AuthorDate: Thu Sep 23 22:23:17 2021 +0800
[Fix][Task] Task lost resource information (#6292) (#6295)
Co-authored-by: liubodong <liubodong>
---
.../common/process/ResourceInfo.java | 51 +++++---
.../service/process/ProcessService.java | 69 +++++++++++
.../service/process/ProcessServiceTest.java | 129 ++++++++++++++++++++-
3 files changed, 231 insertions(+), 18 deletions(-)
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/ResourceInfo.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/ResourceInfo.java
index 287f726..cacdbf5 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/ResourceInfo.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/ResourceInfo.java
@@ -20,26 +20,43 @@ package org.apache.dolphinscheduler.common.process;
* resource info
*/
public class ResourceInfo {
- /**
- * res the name of the resource that was uploaded
- */
- private int id;
+ /**
+ * res id of the resource that was uploaded
+ */
+ private int id;
- public int getId() {
- return id;
- }
+ private String res;
- public void setId(int id) {
- this.id = id;
- }
+ /**
+ * full name of the resource that was uploaded
+ */
+ private String resourceName;
- private String res;
+ public ResourceInfo() {
+ // do nothing, void constructor
+ }
- public String getRes() {
- return res;
- }
+ public int getId() {
+ return id;
+ }
- public void setRes(String res) {
- this.res = res;
- }
+ public void setId(int id) {
+ this.id = id;
+ }
+
+ public String getRes() {
+ return res;
+ }
+
+ public void setRes(String res) {
+ this.res = res;
+ }
+
+ public String getResourceName() {
+ return resourceName;
+ }
+
+ public void setResourceName(String resourceName) {
+ this.resourceName = resourceName;
+ }
}
diff --git
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index c652a51..dca69ff 100644
---
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -30,6 +30,7 @@ import static
org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS;
import static java.util.stream.Collectors.toSet;
+import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -1527,11 +1528,79 @@ public class ProcessService {
TaskDefinition taskDefinition =
taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(
taskInstance.getTaskCode(),
taskInstance.getTaskDefinitionVersion());
+ updateTaskDefinitionResources(taskDefinition);
taskInstance.setTaskDefine(taskDefinition);
return taskInstance;
}
/**
+ * Update {@link ResourceInfo} information in {@link TaskDefinition}
+ *
+ * @param taskDefinition the given {@link TaskDefinition}
+ */
+ private void updateTaskDefinitionResources(TaskDefinition taskDefinition) {
+ Map<String, Object> taskParameters = JSONUtils.parseObject(
+ taskDefinition.getTaskParams(),
+ new TypeReference<Map<String, Object>>() { });
+ if (taskParameters != null) {
+ // if contains mainJar field, query resource from database
+ // Flink, Spark, MR
+ if (taskParameters.containsKey("mainJar")) {
+ Object mainJarObj = taskParameters.get("mainJar");
+ ResourceInfo mainJar = JSONUtils.parseObject(
+ JSONUtils.toJsonString(mainJarObj),
+ ResourceInfo.class);
+ ResourceInfo resourceInfo = updateResourceInfo(mainJar);
+ if (resourceInfo != null) {
+ taskParameters.put("mainJar", resourceInfo);
+ }
+ }
+ // update resourceList information
+ if (taskParameters.containsKey("resourceList")) {
+ String resourceListStr =
JSONUtils.toJsonString(taskParameters.get("resourceList"));
+ List<ResourceInfo> resourceInfos =
JSONUtils.toList(resourceListStr, ResourceInfo.class);
+ List<ResourceInfo> updatedResourceInfos = resourceInfos
+ .stream()
+ .map(this::updateResourceInfo)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+ taskParameters.put("resourceList", updatedResourceInfos);
+ }
+ // set task parameters
+
taskDefinition.setTaskParams(JSONUtils.toJsonString(taskParameters));
+ }
+ }
+
+ /**
+ * update {@link ResourceInfo} by given original ResourceInfo
+ *
+ * @param res origin resource info
+ * @return {@link ResourceInfo}
+ */
+ private ResourceInfo updateResourceInfo(ResourceInfo res) {
+ ResourceInfo resourceInfo = null;
+ // only if mainJar is not null and does not contains "resourceName"
field
+ if (res != null) {
+ int resourceId = res.getId();
+ if (resourceId <= 0) {
+ logger.error("invalid resourceId, {}", resourceId);
+ return null;
+ }
+ resourceInfo = new ResourceInfo();
+ // get resource from database, only one resource should be returned
+ Resource resource = getResourceById(resourceId);
+ resourceInfo.setId(resourceId);
+ resourceInfo.setRes(resource.getFileName());
+ resourceInfo.setResourceName(resource.getFullName());
+ if (logger.isInfoEnabled()) {
+ logger.info("updated resource info {}",
+ JSONUtils.toJsonString(resourceInfo));
+ }
+ }
+ return resourceInfo;
+ }
+
+ /**
* get id list by task state
*
* @param instanceId instanceId
diff --git
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 1bbbca3..69b708a 100644
---
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.service.process;
import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
import static
org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID;
-
import static org.mockito.ArgumentMatchers.any;
import org.apache.dolphinscheduler.common.Constants;
@@ -32,6 +31,8 @@ import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.graph.DAG;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
+import org.apache.dolphinscheduler.common.process.ResourceInfo;
+import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
@@ -41,6 +42,8 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
+import org.apache.dolphinscheduler.dao.entity.Resource;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
@@ -51,6 +54,7 @@ import
org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
+import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
@@ -58,10 +62,13 @@ import org.apache.dolphinscheduler.dao.mapper.UserMapper;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtilsTest;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.junit.Assert;
import org.junit.Test;
@@ -70,6 +77,7 @@ import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
+import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -108,6 +116,8 @@ public class ProcessServiceTest {
private ProcessTaskRelationMapper processTaskRelationMapper;
@Mock
private ProcessDefinitionLogMapper processDefineLogMapper;
+ @Mock
+ private ResourceMapper resourceMapper;
@Test
public void testCreateSubCommand() {
@@ -477,4 +487,121 @@ public class ProcessServiceTest {
processService.changeOutParam(taskInstance);
}
+ @Test
+ public void testUpdateTaskDefinitionResources() throws Exception {
+ TaskDefinition taskDefinition = new TaskDefinition();
+ String taskParameters = "{\n"
+ + " \"mainClass\": \"org.apache.dolphinscheduler.SparkTest\",\n"
+ + " \"mainJar\": {\n"
+ + " \"id\": 1\n"
+ + " },\n"
+ + " \"deployMode\": \"cluster\",\n"
+ + " \"resourceList\": [\n"
+ + " {\n"
+ + " \"id\": 3\n"
+ + " },\n"
+ + " {\n"
+ + " \"id\": 4\n"
+ + " }\n"
+ + " ],\n"
+ + " \"localParams\": [],\n"
+ + " \"driverCores\": 1,\n"
+ + " \"driverMemory\": \"512M\",\n"
+ + " \"numExecutors\": 2,\n"
+ + " \"executorMemory\": \"2G\",\n"
+ + " \"executorCores\": 2,\n"
+ + " \"appName\": \"\",\n"
+ + " \"mainArgs\": \"\",\n"
+ + " \"others\": \"\",\n"
+ + " \"programType\": \"JAVA\",\n"
+ + " \"sparkVersion\": \"SPARK2\",\n"
+ + " \"dependence\": {},\n"
+ + " \"conditionResult\": {\n"
+ + " \"successNode\": [\n"
+ + " \"\"\n"
+ + " ],\n"
+ + " \"failedNode\": [\n"
+ + " \"\"\n"
+ + " ]\n"
+ + " },\n"
+ + " \"waitStartTimeout\": {}\n"
+ + "}";
+ taskDefinition.setTaskParams(taskParameters);
+
+ Map<Integer, Resource> resourceMap =
+ Stream.of(1, 3, 4)
+ .map(i -> {
+ Resource resource = new Resource();
+ resource.setId(i);
+ resource.setFileName("file" + i);
+ resource.setFullName("/file" + i);
+ return resource;
+ })
+ .collect(
+ Collectors.toMap(
+ Resource::getId,
+ resource -> resource)
+ );
+ for (Integer integer : Arrays.asList(1, 3, 4)) {
+ Mockito.when(resourceMapper.selectById(integer))
+ .thenReturn(resourceMap.get(integer));
+ }
+
+ Whitebox.invokeMethod(processService,
+ "updateTaskDefinitionResources",
+ taskDefinition);
+
+ String taskParams = taskDefinition.getTaskParams();
+ SparkParameters sparkParameters = JSONUtils.parseObject(taskParams,
SparkParameters.class);
+ ResourceInfo mainJar = sparkParameters.getMainJar();
+ Assert.assertEquals(1, mainJar.getId());
+ Assert.assertEquals("file1", mainJar.getRes());
+ Assert.assertEquals("/file1", mainJar.getResourceName());
+
+ Assert.assertEquals(2, sparkParameters.getResourceList().size());
+ ResourceInfo res1 = sparkParameters.getResourceList().get(0);
+ ResourceInfo res2 = sparkParameters.getResourceList().get(1);
+ Assert.assertEquals(3, res1.getId());
+ Assert.assertEquals("file3", res1.getRes());
+ Assert.assertEquals("/file3", res1.getResourceName());
+ Assert.assertEquals(4, res2.getId());
+ Assert.assertEquals("file4", res2.getRes());
+ Assert.assertEquals("/file4", res2.getResourceName());
+
+ }
+
+ @Test
+ public void testUpdateResourceInfo() throws Exception {
+ // test if input is null
+ ResourceInfo resourceInfoNull = null;
+ ResourceInfo updatedResourceInfo1 =
Whitebox.invokeMethod(processService,
+ "updateResourceInfo",
+ resourceInfoNull);
+ Assert.assertNull(updatedResourceInfo1);
+
+ // test if resource id less than 1
+ ResourceInfo resourceInfoVoid = new ResourceInfo();
+ ResourceInfo updatedResourceInfo2 =
Whitebox.invokeMethod(processService,
+ "updateResourceInfo",
+ resourceInfoVoid);
+ Assert.assertNull(updatedResourceInfo2);
+
+ // test normal situation
+ ResourceInfo resourceInfoNormal = new ResourceInfo();
+ resourceInfoNormal.setId(1);
+ Resource resource = new Resource();
+ resource.setId(1);
+ resource.setFileName("test.txt");
+ resource.setFullName("/test.txt");
+ Mockito.when(resourceMapper.selectById(1)).thenReturn(resource);
+ ResourceInfo updatedResourceInfo3 =
Whitebox.invokeMethod(processService,
+ "updateResourceInfo",
+ resourceInfoNormal);
+
+ Assert.assertEquals(1, updatedResourceInfo3.getId());
+ Assert.assertEquals("test.txt", updatedResourceInfo3.getRes());
+ Assert.assertEquals("/test.txt",
updatedResourceInfo3.getResourceName());
+
+ }
+
}