This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new c4375a5  [Fix][Task] Task lost resource information (#6292) (#6295)
c4375a5 is described below

commit c4375a54c167f8e96d80c3cc67ad0a607ba2ef7b
Author: LiuBodong <[email protected]>
AuthorDate: Thu Sep 23 22:23:17 2021 +0800

    [Fix][Task] Task lost resource information (#6292) (#6295)
    
    
    
    Co-authored-by: liubodong <liubodong>
---
 .../common/process/ResourceInfo.java               |  51 +++++---
 .../service/process/ProcessService.java            |  69 +++++++++++
 .../service/process/ProcessServiceTest.java        | 129 ++++++++++++++++++++-
 3 files changed, 231 insertions(+), 18 deletions(-)

diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/ResourceInfo.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/ResourceInfo.java
index 287f726..cacdbf5 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/ResourceInfo.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/process/ResourceInfo.java
@@ -20,26 +20,43 @@ package org.apache.dolphinscheduler.common.process;
  * resource info
  */
 public class ResourceInfo {
-  /**
-   * res the name of the resource that was uploaded
-   */
-  private int id;
+    /**
+     * res id of the resource that was uploaded
+     */
+    private int id;
 
-  public int getId() {
-    return id;
-  }
+    private String res;
 
-  public void setId(int id) {
-    this.id = id;
-  }
+    /**
+     * full name of the resource that was uploaded
+     */
+    private String resourceName;
 
-  private String res;
+    public ResourceInfo() {
+        // do nothing, void constructor
+    }
 
-  public String getRes() {
-    return res;
-  }
+    public int getId() {
+        return id;
+    }
 
-  public void setRes(String res) {
-    this.res = res;
-  }
+    public void setId(int id) {
+        this.id = id;
+    }
+
+    public String getRes() {
+        return res;
+    }
+
+    public void setRes(String res) {
+        this.res = res;
+    }
+
+    public String getResourceName() {
+        return resourceName;
+    }
+
+    public void setResourceName(String resourceName) {
+        this.resourceName = resourceName;
+    }
 }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index c652a51..dca69ff 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -30,6 +30,7 @@ import static org.apache.dolphinscheduler.common.Constants.YYYY_MM_DD_HH_MM_SS;
 
 import static java.util.stream.Collectors.toSet;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.AuthorizationType;
 import org.apache.dolphinscheduler.common.enums.CommandType;
@@ -1527,11 +1528,79 @@ public class ProcessService {
         TaskDefinition taskDefinition = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(
             taskInstance.getTaskCode(),
             taskInstance.getTaskDefinitionVersion());
+        updateTaskDefinitionResources(taskDefinition);
         taskInstance.setTaskDefine(taskDefinition);
         return taskInstance;
     }
 
     /**
+     * Update {@link ResourceInfo} information in {@link TaskDefinition}
+     *
+     * @param taskDefinition the given {@link TaskDefinition}
+     */
+    private void updateTaskDefinitionResources(TaskDefinition taskDefinition) {
+        Map<String, Object> taskParameters = JSONUtils.parseObject(
+                taskDefinition.getTaskParams(),
+                new TypeReference<Map<String, Object>>() { });
+        if (taskParameters != null) {
+            // if contains mainJar field, query resource from database
+            // Flink, Spark, MR
+            if (taskParameters.containsKey("mainJar")) {
+                Object mainJarObj = taskParameters.get("mainJar");
+                ResourceInfo mainJar = JSONUtils.parseObject(
+                        JSONUtils.toJsonString(mainJarObj),
+                        ResourceInfo.class);
+                ResourceInfo resourceInfo = updateResourceInfo(mainJar);
+                if (resourceInfo != null) {
+                    taskParameters.put("mainJar", resourceInfo);
+                }
+            }
+            // update resourceList information
+            if (taskParameters.containsKey("resourceList")) {
+                String resourceListStr = JSONUtils.toJsonString(taskParameters.get("resourceList"));
+                List<ResourceInfo> resourceInfos = JSONUtils.toList(resourceListStr, ResourceInfo.class);
+                List<ResourceInfo> updatedResourceInfos = resourceInfos
+                        .stream()
+                        .map(this::updateResourceInfo)
+                        .filter(Objects::nonNull)
+                        .collect(Collectors.toList());
+                taskParameters.put("resourceList", updatedResourceInfos);
+            }
+            // set task parameters
+            taskDefinition.setTaskParams(JSONUtils.toJsonString(taskParameters));
+        }
+    }
+
+    /**
+     * update {@link ResourceInfo} by given original ResourceInfo
+     *
+     * @param res origin resource info
+     * @return {@link ResourceInfo}
+     */
+    private ResourceInfo updateResourceInfo(ResourceInfo res) {
+        ResourceInfo resourceInfo = null;
+        // only if mainJar is not null and does not contains "resourceName" field
+        if (res != null) {
+            int resourceId = res.getId();
+            if (resourceId <= 0) {
+                logger.error("invalid resourceId, {}", resourceId);
+                return null;
+            }
+            resourceInfo = new ResourceInfo();
+            // get resource from database, only one resource should be returned
+            Resource resource = getResourceById(resourceId);
+            resourceInfo.setId(resourceId);
+            resourceInfo.setRes(resource.getFileName());
+            resourceInfo.setResourceName(resource.getFullName());
+            if (logger.isInfoEnabled()) {
+                logger.info("updated resource info {}",
+                        JSONUtils.toJsonString(resourceInfo));
+            }
+        }
+        return resourceInfo;
+    }
+
+    /**
      * get id list by task state
      *
      * @param instanceId instanceId
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
index 1bbbca3..69b708a 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
@@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.service.process;
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
 import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_ID;
-
 import static org.mockito.ArgumentMatchers.any;
 
 import org.apache.dolphinscheduler.common.Constants;
@@ -32,6 +31,8 @@ import org.apache.dolphinscheduler.common.enums.WarningType;
 import org.apache.dolphinscheduler.common.graph.DAG;
 import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
+import org.apache.dolphinscheduler.common.process.ResourceInfo;
+import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.dao.entity.Command;
@@ -41,6 +42,8 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
 import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
 import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
+import org.apache.dolphinscheduler.dao.entity.Resource;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
 import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.dao.entity.User;
@@ -51,6 +54,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.ProcessTaskRelationMapper;
+import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
 import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
@@ -58,10 +62,13 @@ import org.apache.dolphinscheduler.dao.mapper.UserMapper;
 import org.apache.dolphinscheduler.service.quartz.cron.CronUtilsTest;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -70,6 +77,7 @@ import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
+import org.powermock.reflect.Whitebox;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -108,6 +116,8 @@ public class ProcessServiceTest {
     private ProcessTaskRelationMapper processTaskRelationMapper;
     @Mock
     private ProcessDefinitionLogMapper processDefineLogMapper;
+    @Mock
+    private ResourceMapper resourceMapper;
 
     @Test
     public void testCreateSubCommand() {
@@ -477,4 +487,121 @@ public class ProcessServiceTest {
         processService.changeOutParam(taskInstance);
     }
 
+    @Test
+    public void testUpdateTaskDefinitionResources() throws Exception {
+        TaskDefinition taskDefinition = new TaskDefinition();
+        String taskParameters = "{\n"
+            + "    \"mainClass\": \"org.apache.dolphinscheduler.SparkTest\",\n"
+            + "    \"mainJar\": {\n"
+            + "        \"id\": 1\n"
+            + "    },\n"
+            + "    \"deployMode\": \"cluster\",\n"
+            + "    \"resourceList\": [\n"
+            + "        {\n"
+            + "            \"id\": 3\n"
+            + "        },\n"
+            + "        {\n"
+            + "            \"id\": 4\n"
+            + "        }\n"
+            + "    ],\n"
+            + "    \"localParams\": [],\n"
+            + "    \"driverCores\": 1,\n"
+            + "    \"driverMemory\": \"512M\",\n"
+            + "    \"numExecutors\": 2,\n"
+            + "    \"executorMemory\": \"2G\",\n"
+            + "    \"executorCores\": 2,\n"
+            + "    \"appName\": \"\",\n"
+            + "    \"mainArgs\": \"\",\n"
+            + "    \"others\": \"\",\n"
+            + "    \"programType\": \"JAVA\",\n"
+            + "    \"sparkVersion\": \"SPARK2\",\n"
+            + "    \"dependence\": {},\n"
+            + "    \"conditionResult\": {\n"
+            + "        \"successNode\": [\n"
+            + "            \"\"\n"
+            + "        ],\n"
+            + "        \"failedNode\": [\n"
+            + "            \"\"\n"
+            + "        ]\n"
+            + "    },\n"
+            + "    \"waitStartTimeout\": {}\n"
+            + "}";
+        taskDefinition.setTaskParams(taskParameters);
+
+        Map<Integer, Resource> resourceMap =
+            Stream.of(1, 3, 4)
+                .map(i -> {
+                    Resource resource = new Resource();
+                    resource.setId(i);
+                    resource.setFileName("file" + i);
+                    resource.setFullName("/file" + i);
+                    return resource;
+                })
+                .collect(
+                    Collectors.toMap(
+                        Resource::getId,
+                        resource -> resource)
+                );
+        for (Integer integer : Arrays.asList(1, 3, 4)) {
+            Mockito.when(resourceMapper.selectById(integer))
+                .thenReturn(resourceMap.get(integer));
+        }
+
+        Whitebox.invokeMethod(processService,
+            "updateTaskDefinitionResources",
+            taskDefinition);
+
+        String taskParams = taskDefinition.getTaskParams();
+        SparkParameters sparkParameters = JSONUtils.parseObject(taskParams, SparkParameters.class);
+        ResourceInfo mainJar = sparkParameters.getMainJar();
+        Assert.assertEquals(1, mainJar.getId());
+        Assert.assertEquals("file1", mainJar.getRes());
+        Assert.assertEquals("/file1", mainJar.getResourceName());
+
+        Assert.assertEquals(2, sparkParameters.getResourceList().size());
+        ResourceInfo res1 = sparkParameters.getResourceList().get(0);
+        ResourceInfo res2 = sparkParameters.getResourceList().get(1);
+        Assert.assertEquals(3, res1.getId());
+        Assert.assertEquals("file3", res1.getRes());
+        Assert.assertEquals("/file3", res1.getResourceName());
+        Assert.assertEquals(4, res2.getId());
+        Assert.assertEquals("file4", res2.getRes());
+        Assert.assertEquals("/file4", res2.getResourceName());
+
+    }
+
+    @Test
+    public void testUpdateResourceInfo() throws Exception {
+        // test if input is null
+        ResourceInfo resourceInfoNull = null;
+        ResourceInfo updatedResourceInfo1 = Whitebox.invokeMethod(processService,
+            "updateResourceInfo",
+            resourceInfoNull);
+        Assert.assertNull(updatedResourceInfo1);
+
+        // test if resource id less than 1
+        ResourceInfo resourceInfoVoid = new ResourceInfo();
+        ResourceInfo updatedResourceInfo2 = Whitebox.invokeMethod(processService,
+            "updateResourceInfo",
+            resourceInfoVoid);
+        Assert.assertNull(updatedResourceInfo2);
+
+        // test normal situation
+        ResourceInfo resourceInfoNormal = new ResourceInfo();
+        resourceInfoNormal.setId(1);
+        Resource resource = new Resource();
+        resource.setId(1);
+        resource.setFileName("test.txt");
+        resource.setFullName("/test.txt");
+        Mockito.when(resourceMapper.selectById(1)).thenReturn(resource);
+        ResourceInfo updatedResourceInfo3 = Whitebox.invokeMethod(processService,
+            "updateResourceInfo",
+            resourceInfoNormal);
+
+        Assert.assertEquals(1, updatedResourceInfo3.getId());
+        Assert.assertEquals("test.txt", updatedResourceInfo3.getRes());
+        Assert.assertEquals("/test.txt", updatedResourceInfo3.getResourceName());
+
+    }
+
 }

Reply via email to