This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch dev-131
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev-131 by this push:
new d212a63 fix 3112: downloading a udf resource needs to find the tenant code of
the resource owner (#3114)
d212a63 is described below
commit d212a6340ce1d731f3e717a0d8bd6759b2ae60c1
Author: lgcareer <[email protected]>
AuthorDate: Thu Jul 2 14:42:22 2020 +0800
fix 3112: downloading a udf resource needs to find the tenant code of the resource owner
(#3114)
* fix 3112: downloading a udf resource needs to find the tenant code of the resource owner
* fix 2975: downloading a resource needs to find the tenant code of the resource owner
* remove useless code
---
.../server/entity/SQLTaskExecutionContext.java | 16 +++---
.../master/consumer/TaskPriorityQueueConsumer.java | 10 +++-
.../dolphinscheduler/server/utils/UDFUtils.java | 65 +++++++---------------
.../server/worker/task/sql/SqlTask.java | 3 +-
4 files changed, 37 insertions(+), 57 deletions(-)
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java
index 97afb4f..210db5c 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java
@@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.server.entity;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import java.io.Serializable;
-import java.util.List;
+import java.util.Map;
/**
* SQL Task ExecutionContext
@@ -38,9 +38,9 @@ public class SQLTaskExecutionContext implements Serializable {
*/
private String connectionParams;
/**
- * udf function list
+ * udf function tenant code map
*/
- private List<UdfFunc> udfFuncList;
+ private Map<UdfFunc,String> udfFuncTenantCodeMap;
public int getWarningGroupId() {
@@ -51,12 +51,12 @@ public class SQLTaskExecutionContext implements
Serializable {
this.warningGroupId = warningGroupId;
}
- public List<UdfFunc> getUdfFuncList() {
- return udfFuncList;
+ public Map<UdfFunc, String> getUdfFuncTenantCodeMap() {
+ return udfFuncTenantCodeMap;
}
- public void setUdfFuncList(List<UdfFunc> udfFuncList) {
- this.udfFuncList = udfFuncList;
+ public void setUdfFuncTenantCodeMap(Map<UdfFunc, String>
udfFuncTenantCodeMap) {
+ this.udfFuncTenantCodeMap = udfFuncTenantCodeMap;
}
public String getConnectionParams() {
@@ -72,7 +72,7 @@ public class SQLTaskExecutionContext implements Serializable {
return "SQLTaskExecutionContext{" +
"warningGroupId=" + warningGroupId +
", connectionParams='" + connectionParams + '\'' +
- ", udfFuncList=" + udfFuncList +
+ ", udfFuncTenantCodeMap=" + udfFuncTenantCodeMap +
'}';
}
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index f68ad95..21995c3 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -327,7 +327,13 @@ public class TaskPriorityQueueConsumer extends Thread{
}
List<UdfFunc> udfFuncList =
processService.queryUdfFunListByids(udfFunIdsArray);
- sqlTaskExecutionContext.setUdfFuncList(udfFuncList);
+ Map<UdfFunc,String> udfFuncMap = new HashMap<>();
+ for(UdfFunc udfFunc : udfFuncList) {
+ String tenantCode =
processService.queryTenantCodeByResName(udfFunc.getResourceName(),
ResourceType.UDF);
+ udfFuncMap.put(udfFunc,tenantCode);
+ }
+
+ sqlTaskExecutionContext.setUdfFuncTenantCodeMap(udfFuncMap);
}
}
@@ -369,7 +375,7 @@ public class TaskPriorityQueueConsumer extends Thread{
if (baseParam != null) {
List<ResourceInfo> projectResourceFiles =
baseParam.getResourceFilesList();
- if (projectResourceFiles != null) {
+ if (CollectionUtils.isNotEmpty(projectResourceFiles)) {
// filter the resources that the resource id equals 0
Set<ResourceInfo> oldVersionResources =
projectResourceFiles.stream().filter(t -> t.getId() ==
0).collect(Collectors.toSet());
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java
index 63efb24..3a8c8fe 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java
@@ -16,6 +16,7 @@
*/
package org.apache.dolphinscheduler.server.utils;
+import org.apache.commons.collections.MapUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
@@ -24,10 +25,8 @@ import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.slf4j.Logger;
import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
+import java.util.stream.Collectors;
import static
org.apache.dolphinscheduler.common.utils.CollectionUtils.isNotEmpty;
@@ -43,53 +42,44 @@ public class UDFUtils {
/**
* create function list
- * @param udfFuncs udf functions
- * @param tenantCode tenant code
- * @param logger logger
+ * @param udfFuncTenantCodeMap key is udf function,value is tenant code
+ * @param logger logger
* @return create function list
*/
- public static List<String> createFuncs(List<UdfFunc> udfFuncs, String
tenantCode,Logger logger){
+ public static List<String> createFuncs(Map<UdfFunc,String>
udfFuncTenantCodeMap, Logger logger){
- if (CollectionUtils.isEmpty(udfFuncs)){
+ if (MapUtils.isEmpty(udfFuncTenantCodeMap)){
logger.info("can't find udf function resource");
return null;
}
- // get hive udf jar path
- String hiveUdfJarPath = HadoopUtils.getHdfsUdfDir(tenantCode);
- logger.info("hive udf jar path : {}" , hiveUdfJarPath);
-
- // is the root directory of udf defined
- if (StringUtils.isEmpty(hiveUdfJarPath)) {
- logger.error("not define hive udf jar path");
- throw new RuntimeException("hive udf jar base path not defined ");
- }
- Set<String> resources = getFuncResouces(udfFuncs);
List<String> funcList = new ArrayList<>();
// build jar sql
- buildJarSql(funcList, resources, hiveUdfJarPath);
+ buildJarSql(funcList, udfFuncTenantCodeMap);
// build temp function sql
- buildTempFuncSql(funcList, udfFuncs);
+ buildTempFuncSql(funcList,
udfFuncTenantCodeMap.keySet().stream().collect(Collectors.toList()));
return funcList;
}
/**
* build jar sql
- * @param sqls sql list
- * @param resources resource set
- * @param uploadPath upload path
+ * @param sqls sql list
+ * @param udfFuncTenantCodeMap key is udf function,value is tenant code
*/
- private static void buildJarSql(List<String> sqls, Set<String> resources,
String uploadPath) {
+ private static void buildJarSql(List<String> sqls, Map<UdfFunc,String>
udfFuncTenantCodeMap) {
String defaultFS =
HadoopUtils.getInstance().getConfiguration().get(Constants.FS_DEFAULTFS);
- if (!uploadPath.startsWith("hdfs:")) {
- uploadPath = defaultFS + uploadPath;
- }
- for (String resource : resources) {
- sqls.add(String.format("add jar %s/%s", uploadPath, resource));
+ Set<Map.Entry<UdfFunc,String>> entries =
udfFuncTenantCodeMap.entrySet();
+ for (Map.Entry<UdfFunc,String> entry:entries){
+ String uploadPath = HadoopUtils.getHdfsUdfDir(entry.getValue());
+ if (!uploadPath.startsWith("hdfs:")) {
+ uploadPath = defaultFS + uploadPath;
+ }
+ sqls.add(String.format("add jar %s%s", uploadPath,
entry.getKey().getResourceName()));
}
+
}
/**
@@ -106,20 +96,5 @@ public class UDFUtils {
}
}
- /**
- * get the resource names of all functions
- * @param udfFuncs udf function list
- * @return
- */
- private static Set<String> getFuncResouces(List<UdfFunc> udfFuncs) {
- Set<String> resources = new HashSet<>();
-
- for (UdfFunc udfFunc : udfFuncs) {
- resources.add(udfFunc.getResourceName());
- }
-
- return resources;
- }
-
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
index 5511c6a..941404a 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
@@ -131,8 +131,7 @@ public class SqlTask extends AbstractTask {
.map(this::getSqlAndSqlParamsMap)
.collect(Collectors.toList());
- List<String> createFuncs =
UDFUtils.createFuncs(sqlTaskExecutionContext.getUdfFuncList(),
- taskExecutionContext.getTenantCode(),
+ List<String> createFuncs =
UDFUtils.createFuncs(sqlTaskExecutionContext.getUdfFuncTenantCodeMap(),
logger);
// execute sql task