This is an automated email from the ASF dual-hosted git repository.

journey pushed a commit to branch 1.3.1-release
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/1.3.1-release by this push:
     new 07a6aec  fix 2975:download resource need find the tenant code of resource owner (#3109)
07a6aec is described below

commit 07a6aec1fc4b89b6704d669ee92eee38ed776fcc
Author: lgcareer <[email protected]>
AuthorDate: Wed Jul 1 19:04:03 2020 +0800

    fix 2975:download resource need find the tenant code of resource owner (#3109)
---
 .../dolphinscheduler/dao/entity/TaskInstance.java  | 20 ++++++++--------
 .../server/entity/TaskExecutionContext.java        |  9 ++++----
 .../master/consumer/TaskPriorityQueueConsumer.java | 27 +++++++++++-----------
 .../server/worker/runner/TaskExecuteThread.java    | 23 ++++++++++--------
 4 files changed, 42 insertions(+), 37 deletions(-)

diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index 53b56e5..5a6aa49 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -16,20 +16,20 @@
  */
 package org.apache.dolphinscheduler.dao.entity;
 
+import com.baomidou.mybatisplus.annotation.IdType;
 import com.baomidou.mybatisplus.annotation.TableField;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.Flag;
 import org.apache.dolphinscheduler.common.enums.Priority;
 import org.apache.dolphinscheduler.common.enums.TaskType;
 import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import com.baomidou.mybatisplus.annotation.IdType;
-import com.baomidou.mybatisplus.annotation.TableId;
-import com.baomidou.mybatisplus.annotation.TableName;
 
 import java.io.Serializable;
 import java.util.Date;
-import java.util.List;
+import java.util.Map;
 
 /**
  * task instance
@@ -209,7 +209,7 @@ public class TaskInstance implements Serializable {
 
 
     @TableField(exist = false)
-    private List<String> resources;
+    private Map<String,String> resources;
 
 
 
@@ -451,10 +451,14 @@ public class TaskInstance implements Serializable {
                 || (this.getState().typeIsFailure() && !taskCanRetry());
     }
 
-    public List<String> getResources() {
+    public Map<String, String> getResources() {
         return resources;
     }
 
+    public void setResources(Map<String, String> resources) {
+        this.resources = resources;
+    }
+
     public boolean isSubProcess(){
         return TaskType.SUB_PROCESS.equals(TaskType.valueOf(this.taskType));
     }
@@ -467,9 +471,7 @@ public class TaskInstance implements Serializable {
         return TaskType.CONDITIONS.equals(TaskType.valueOf(this.taskType));
     }
 
-    public void setResources(List<String> resources) {
-        this.resources = resources;
-    }
+
 
     /**
      * determine if you can try again
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
index 563f5c8..41331ab 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
@@ -23,7 +23,6 @@ import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
 
 import java.io.Serializable;
 import java.util.Date;
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -168,9 +167,9 @@ public class TaskExecutionContext implements Serializable{
     private String workerGroup;
 
     /**
-     * resources full name
+     * resources full name and tenant code
      */
-    private List<String> resources;
+    private Map<String,String> resources;
 
     /**
      *  sql TaskExecutionContext
@@ -443,11 +442,11 @@ public class TaskExecutionContext implements Serializable{
         this.dependenceTaskExecutionContext = dependenceTaskExecutionContext;
     }
 
-    public List<String> getResources() {
+    public Map<String, String> getResources() {
         return resources;
     }
 
-    public void setResources(List<String> resources) {
+    public void setResources(Map<String, String> resources) {
         this.resources = resources;
     }
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index 904914a..ee48ca0 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.consumer;
 import com.alibaba.fastjson.JSONObject;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.ResourceType;
 import org.apache.dolphinscheduler.common.enums.TaskType;
 import org.apache.dolphinscheduler.common.enums.UdfType;
 import org.apache.dolphinscheduler.common.model.TaskNode;
@@ -32,7 +33,6 @@ import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
 import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter;
 import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter;
 import org.apache.dolphinscheduler.common.thread.Stopper;
-import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.*;
 import org.apache.dolphinscheduler.dao.entity.*;
 import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
@@ -50,14 +50,10 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
-
 /**
  * TaskUpdateQueue consumer
  */
@@ -362,10 +358,10 @@ public class TaskPriorityQueueConsumer extends Thread{
     }
 
     /**
-     * get resource full name list
+     * get resource map key is full name and value is tenantCode
      */
-    private List<String> getResourceFullNames(TaskNode taskNode) {
-        List<String> resourceFullNameList = new ArrayList<>();
+    private Map<String,String> getResourceFullNames(TaskNode taskNode) {
+        Map<String,String> resourceMap = new HashMap<>();
         AbstractParameters baseParam = TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams());
 
         if (baseParam != null) {
@@ -375,7 +371,10 @@ public class TaskPriorityQueueConsumer extends Thread{
                 // filter the resources that the resource id equals 0
                 Set<ResourceInfo> oldVersionResources = projectResourceFiles.stream().filter(t -> t.getId() == 0).collect(Collectors.toSet());
                 if (CollectionUtils.isNotEmpty(oldVersionResources)) {
-                    resourceFullNameList.addAll(oldVersionResources.stream().map(resource -> resource.getRes()).collect(Collectors.toSet()));
+
+                    oldVersionResources.forEach(
+                            (t)->resourceMap.put(t.getRes(), processService.queryTenantCodeByResName(t.getRes(), ResourceType.FILE))
+                    );
                 }
 
                 // get the resource id in order to get the resource names in batch
@@ -386,13 +385,13 @@ public class TaskPriorityQueueConsumer extends Thread{
                     Integer[] resourceIds = resourceIdsSet.toArray(new Integer[resourceIdsSet.size()]);
 
                     List<Resource> resources = processService.listResourceByIds(resourceIds);
-                    resourceFullNameList.addAll(resources.stream()
-                            .map(resourceInfo -> resourceInfo.getFullName())
-                            .collect(Collectors.toList()));
+                    resources.forEach(
+                            (t)->resourceMap.put(t.getFullName(),processService.queryTenantCodeByResName(t.getFullName(), ResourceType.FILE))
+                    );
                 }
             }
         }
 
-        return resourceFullNameList;
+        return resourceMap;
     }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index d2d783a..592060b 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -18,13 +18,16 @@ package org.apache.dolphinscheduler.server.worker.runner;
 
 
 import com.alibaba.fastjson.JSONObject;
+import org.apache.commons.collections.MapUtils;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.model.TaskNode;
 import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
-import org.apache.dolphinscheduler.common.utils.*;
+import org.apache.dolphinscheduler.common.utils.CommonUtils;
+import org.apache.dolphinscheduler.common.utils.HadoopUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
@@ -94,7 +97,6 @@ public class TaskExecuteThread implements Runnable {
             // copy hdfs/minio file to local
             downloadResource(taskExecutionContext.getExecutePath(),
                     taskExecutionContext.getResources(),
-                    taskExecutionContext.getTenantCode(),
                     logger);
 
             taskExecutionContext.setTaskParams(taskNode.getParams());
@@ -224,22 +226,25 @@ public class TaskExecuteThread implements Runnable {
      * @param logger
      */
     private void downloadResource(String execLocalPath,
-                                  List<String> projectRes,
-                                  String tenantCode,
+                                  Map<String,String> projectRes,
                                   Logger logger) throws Exception {
-        if (CollectionUtils.isEmpty(projectRes)){
+        if (MapUtils.isEmpty(projectRes)){
             return;
         }
 
-        for (String resource : projectRes) {
-            File resFile = new File(execLocalPath, resource);
+        Set<Map.Entry<String, String>> resEntries = projectRes.entrySet();
+
+        for (Map.Entry<String,String> resource : resEntries) {
+            String fullName = resource.getKey();
+            String tenantCode = resource.getValue();
+            File resFile = new File(execLocalPath, fullName);
             if (!resFile.exists()) {
                 try {
                     // query the tenant code of the resource according to the name of the resource
-                    String resHdfsPath = HadoopUtils.getHdfsResourceFileName(tenantCode, resource);
+                    String resHdfsPath = HadoopUtils.getHdfsResourceFileName(tenantCode, fullName);
 
                     logger.info("get resource file from hdfs :{}", resHdfsPath);
-                    HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + resource, false, true);
+                    HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + fullName, false, true);
                 }catch (Exception e){
                     logger.error(e.getMessage(),e);
                     throw new RuntimeException(e.getMessage());

Reply via email to