This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch 1.3.1-release
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/1.3.1-release by this push:
new 07a6aec fix 2975:download resource need find the tenant code of
resource owner (#3109)
07a6aec is described below
commit 07a6aec1fc4b89b6704d669ee92eee38ed776fcc
Author: lgcareer <[email protected]>
AuthorDate: Wed Jul 1 19:04:03 2020 +0800
fix 2975:download resource need find the tenant code of resource owner
(#3109)
---
.../dolphinscheduler/dao/entity/TaskInstance.java | 20 ++++++++--------
.../server/entity/TaskExecutionContext.java | 9 ++++----
.../master/consumer/TaskPriorityQueueConsumer.java | 27 +++++++++++-----------
.../server/worker/runner/TaskExecuteThread.java | 23 ++++++++++--------
4 files changed, 42 insertions(+), 37 deletions(-)
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index 53b56e5..5a6aa49 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -16,20 +16,20 @@
*/
package org.apache.dolphinscheduler.dao.entity;
+import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import com.baomidou.mybatisplus.annotation.IdType;
-import com.baomidou.mybatisplus.annotation.TableId;
-import com.baomidou.mybatisplus.annotation.TableName;
import java.io.Serializable;
import java.util.Date;
-import java.util.List;
+import java.util.Map;
/**
* task instance
@@ -209,7 +209,7 @@ public class TaskInstance implements Serializable {
@TableField(exist = false)
- private List<String> resources;
+ private Map<String,String> resources;
@@ -451,10 +451,14 @@ public class TaskInstance implements Serializable {
|| (this.getState().typeIsFailure() && !taskCanRetry());
}
- public List<String> getResources() {
+ public Map<String, String> getResources() {
return resources;
}
+ public void setResources(Map<String, String> resources) {
+ this.resources = resources;
+ }
+
public boolean isSubProcess(){
return TaskType.SUB_PROCESS.equals(TaskType.valueOf(this.taskType));
}
@@ -467,9 +471,7 @@ public class TaskInstance implements Serializable {
return TaskType.CONDITIONS.equals(TaskType.valueOf(this.taskType));
}
- public void setResources(List<String> resources) {
- this.resources = resources;
- }
+
/**
* determine if you can try again
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
index 563f5c8..41331ab 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java
@@ -23,7 +23,6 @@ import
org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.Date;
-import java.util.List;
import java.util.Map;
/**
@@ -168,9 +167,9 @@ public class TaskExecutionContext implements Serializable{
private String workerGroup;
/**
- * resources full name
+ * resources full name and tenant code
*/
- private List<String> resources;
+ private Map<String,String> resources;
/**
* sql TaskExecutionContext
@@ -443,11 +442,11 @@ public class TaskExecutionContext implements Serializable{
this.dependenceTaskExecutionContext = dependenceTaskExecutionContext;
}
- public List<String> getResources() {
+ public Map<String, String> getResources() {
return resources;
}
- public void setResources(List<String> resources) {
+ public void setResources(Map<String, String> resources) {
this.resources = resources;
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index 904914a..ee48ca0 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.consumer;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
+import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.UdfType;
import org.apache.dolphinscheduler.common.model.TaskNode;
@@ -32,7 +33,6 @@ import
org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import
org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter;
import
org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter;
import org.apache.dolphinscheduler.common.thread.Stopper;
-import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
@@ -50,14 +50,10 @@ import
org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
-
/**
* TaskUpdateQueue consumer
*/
@@ -362,10 +358,10 @@ public class TaskPriorityQueueConsumer extends Thread{
}
/**
- * get resource full name list
+ * get resource map key is full name and value is tenantCode
*/
- private List<String> getResourceFullNames(TaskNode taskNode) {
- List<String> resourceFullNameList = new ArrayList<>();
+ private Map<String,String> getResourceFullNames(TaskNode taskNode) {
+ Map<String,String> resourceMap = new HashMap<>();
AbstractParameters baseParam =
TaskParametersUtils.getParameters(taskNode.getType(), taskNode.getParams());
if (baseParam != null) {
@@ -375,7 +371,10 @@ public class TaskPriorityQueueConsumer extends Thread{
// filter the resources that the resource id equals 0
Set<ResourceInfo> oldVersionResources =
projectResourceFiles.stream().filter(t -> t.getId() ==
0).collect(Collectors.toSet());
if (CollectionUtils.isNotEmpty(oldVersionResources)) {
-
resourceFullNameList.addAll(oldVersionResources.stream().map(resource ->
resource.getRes()).collect(Collectors.toSet()));
+
+ oldVersionResources.forEach(
+ (t)->resourceMap.put(t.getRes(),
processService.queryTenantCodeByResName(t.getRes(), ResourceType.FILE))
+ );
}
// get the resource id in order to get the resource names in
batch
@@ -386,13 +385,13 @@ public class TaskPriorityQueueConsumer extends Thread{
Integer[] resourceIds = resourceIdsSet.toArray(new
Integer[resourceIdsSet.size()]);
List<Resource> resources =
processService.listResourceByIds(resourceIds);
- resourceFullNameList.addAll(resources.stream()
- .map(resourceInfo -> resourceInfo.getFullName())
- .collect(Collectors.toList()));
+ resources.forEach(
+
(t)->resourceMap.put(t.getFullName(),processService.queryTenantCodeByResName(t.getFullName(),
ResourceType.FILE))
+ );
}
}
}
- return resourceFullNameList;
+ return resourceMap;
}
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index d2d783a..592060b 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -18,13 +18,16 @@ package org.apache.dolphinscheduler.server.worker.runner;
import com.alibaba.fastjson.JSONObject;
+import org.apache.commons.collections.MapUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
-import org.apache.dolphinscheduler.common.utils.*;
+import org.apache.dolphinscheduler.common.utils.CommonUtils;
+import org.apache.dolphinscheduler.common.utils.HadoopUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import
org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
@@ -94,7 +97,6 @@ public class TaskExecuteThread implements Runnable {
// copy hdfs/minio file to local
downloadResource(taskExecutionContext.getExecutePath(),
taskExecutionContext.getResources(),
- taskExecutionContext.getTenantCode(),
logger);
taskExecutionContext.setTaskParams(taskNode.getParams());
@@ -224,22 +226,25 @@ public class TaskExecuteThread implements Runnable {
* @param logger
*/
private void downloadResource(String execLocalPath,
- List<String> projectRes,
- String tenantCode,
+ Map<String,String> projectRes,
Logger logger) throws Exception {
- if (CollectionUtils.isEmpty(projectRes)){
+ if (MapUtils.isEmpty(projectRes)){
return;
}
- for (String resource : projectRes) {
- File resFile = new File(execLocalPath, resource);
+ Set<Map.Entry<String, String>> resEntries = projectRes.entrySet();
+
+ for (Map.Entry<String,String> resource : resEntries) {
+ String fullName = resource.getKey();
+ String tenantCode = resource.getValue();
+ File resFile = new File(execLocalPath, fullName);
if (!resFile.exists()) {
try {
// query the tenant code of the resource according to the
name of the resource
- String resHdfsPath =
HadoopUtils.getHdfsResourceFileName(tenantCode, resource);
+ String resHdfsPath =
HadoopUtils.getHdfsResourceFileName(tenantCode, fullName);
logger.info("get resource file from hdfs :{}",
resHdfsPath);
- HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath,
execLocalPath + File.separator + resource, false, true);
+ HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath,
execLocalPath + File.separator + fullName, false, true);
}catch (Exception e){
logger.error(e.getMessage(),e);
throw new RuntimeException(e.getMessage());