This is an automated email from the ASF dual-hosted git repository.

leonbao pushed a commit to branch dev-131
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev-131 by this push:
     new d212a63  fix 3112:download udf resource need find the tenant code of 
resource owner (#3114)
d212a63 is described below

commit d212a6340ce1d731f3e717a0d8bd6759b2ae60c1
Author: lgcareer <[email protected]>
AuthorDate: Thu Jul 2 14:42:22 2020 +0800

    fix 3112:download udf resource need find the tenant code of resource owner 
(#3114)
    
    * fix 3112:download udf resource need find the tenant code of resource owner
    
    * fix 2975:download resource need find the tenant code of resource owner
    
    * remove useless code
---
 .../server/entity/SQLTaskExecutionContext.java     | 16 +++---
 .../master/consumer/TaskPriorityQueueConsumer.java | 10 +++-
 .../dolphinscheduler/server/utils/UDFUtils.java    | 65 +++++++---------------
 .../server/worker/task/sql/SqlTask.java            |  3 +-
 4 files changed, 37 insertions(+), 57 deletions(-)

diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java
index 97afb4f..210db5c 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SQLTaskExecutionContext.java
@@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.server.entity;
 import org.apache.dolphinscheduler.dao.entity.UdfFunc;
 
 import java.io.Serializable;
-import java.util.List;
+import java.util.Map;
 
 /**
  *  SQL Task ExecutionContext
@@ -38,9 +38,9 @@ public class SQLTaskExecutionContext implements Serializable {
      */
     private String connectionParams;
     /**
-     * udf function list
+     * udf function tenant code map
      */
-    private List<UdfFunc> udfFuncList;
+    private Map<UdfFunc,String> udfFuncTenantCodeMap;
 
 
     public int getWarningGroupId() {
@@ -51,12 +51,12 @@ public class SQLTaskExecutionContext implements 
Serializable {
         this.warningGroupId = warningGroupId;
     }
 
-    public List<UdfFunc> getUdfFuncList() {
-        return udfFuncList;
+    public Map<UdfFunc, String> getUdfFuncTenantCodeMap() {
+        return udfFuncTenantCodeMap;
     }
 
-    public void setUdfFuncList(List<UdfFunc> udfFuncList) {
-        this.udfFuncList = udfFuncList;
+    public void setUdfFuncTenantCodeMap(Map<UdfFunc, String> 
udfFuncTenantCodeMap) {
+        this.udfFuncTenantCodeMap = udfFuncTenantCodeMap;
     }
 
     public String getConnectionParams() {
@@ -72,7 +72,7 @@ public class SQLTaskExecutionContext implements Serializable {
         return "SQLTaskExecutionContext{" +
                 "warningGroupId=" + warningGroupId +
                 ", connectionParams='" + connectionParams + '\'' +
-                ", udfFuncList=" + udfFuncList +
+                ", udfFuncTenantCodeMap=" + udfFuncTenantCodeMap +
                 '}';
     }
 }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index f68ad95..21995c3 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -327,7 +327,13 @@ public class TaskPriorityQueueConsumer extends Thread{
             }
 
             List<UdfFunc> udfFuncList = 
processService.queryUdfFunListByids(udfFunIdsArray);
-            sqlTaskExecutionContext.setUdfFuncList(udfFuncList);
+            Map<UdfFunc,String> udfFuncMap = new HashMap<>();
+            for(UdfFunc udfFunc : udfFuncList) {
+                String tenantCode = 
processService.queryTenantCodeByResName(udfFunc.getResourceName(), 
ResourceType.UDF);
+                udfFuncMap.put(udfFunc,tenantCode);
+            }
+
+            sqlTaskExecutionContext.setUdfFuncTenantCodeMap(udfFuncMap);
         }
     }
 
@@ -369,7 +375,7 @@ public class TaskPriorityQueueConsumer extends Thread{
 
         if (baseParam != null) {
             List<ResourceInfo> projectResourceFiles = 
baseParam.getResourceFilesList();
-            if (projectResourceFiles != null) {
+            if (CollectionUtils.isNotEmpty(projectResourceFiles)) {
 
                 // filter the resources that the resource id equals 0
                 Set<ResourceInfo> oldVersionResources = 
projectResourceFiles.stream().filter(t -> t.getId() == 
0).collect(Collectors.toSet());
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java
index 63efb24..3a8c8fe 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/UDFUtils.java
@@ -16,6 +16,7 @@
  */
 package org.apache.dolphinscheduler.server.utils;
 
+import org.apache.commons.collections.MapUtils;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.utils.CollectionUtils;
 import org.apache.dolphinscheduler.common.utils.HadoopUtils;
@@ -24,10 +25,8 @@ import org.apache.dolphinscheduler.dao.entity.UdfFunc;
 import org.slf4j.Logger;
 
 import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
+import java.util.stream.Collectors;
 
 import static 
org.apache.dolphinscheduler.common.utils.CollectionUtils.isNotEmpty;
 
@@ -43,53 +42,44 @@ public class UDFUtils {
 
     /**
      * create function list
-     * @param udfFuncs      udf functions
-     * @param tenantCode    tenant code
-     * @param logger        logger
+     * @param udfFuncTenantCodeMap  key is udf function,value is tenant code
+     * @param logger                logger
      * @return create function list
      */
-    public static List<String> createFuncs(List<UdfFunc> udfFuncs, String 
tenantCode,Logger logger){
+    public static List<String> createFuncs(Map<UdfFunc,String> 
udfFuncTenantCodeMap, Logger logger){
 
-        if (CollectionUtils.isEmpty(udfFuncs)){
+        if (MapUtils.isEmpty(udfFuncTenantCodeMap)){
             logger.info("can't find udf function resource");
             return null;
         }
-        // get  hive udf jar path
-        String hiveUdfJarPath = HadoopUtils.getHdfsUdfDir(tenantCode);
-        logger.info("hive udf jar path : {}" , hiveUdfJarPath);
-
-        // is the root directory of udf defined
-        if (StringUtils.isEmpty(hiveUdfJarPath)) {
-            logger.error("not define hive udf jar path");
-            throw new RuntimeException("hive udf jar base path not defined ");
-        }
-        Set<String> resources = getFuncResouces(udfFuncs);
         List<String> funcList = new ArrayList<>();
 
         // build jar sql
-        buildJarSql(funcList, resources, hiveUdfJarPath);
+        buildJarSql(funcList, udfFuncTenantCodeMap);
 
         // build temp function sql
-        buildTempFuncSql(funcList, udfFuncs);
+        buildTempFuncSql(funcList, 
udfFuncTenantCodeMap.keySet().stream().collect(Collectors.toList()));
 
         return funcList;
     }
 
     /**
      * build jar sql
-     * @param sqls          sql list
-     * @param resources     resource set
-     * @param uploadPath    upload path
+     * @param sqls                  sql list
+     * @param udfFuncTenantCodeMap  key is udf function,value is tenant code
      */
-    private static void buildJarSql(List<String> sqls, Set<String> resources, 
String uploadPath) {
+    private static void buildJarSql(List<String> sqls, Map<UdfFunc,String> 
udfFuncTenantCodeMap) {
         String defaultFS = 
HadoopUtils.getInstance().getConfiguration().get(Constants.FS_DEFAULTFS);
-        if (!uploadPath.startsWith("hdfs:")) {
-            uploadPath = defaultFS + uploadPath;
-        }
 
-        for (String resource : resources) {
-            sqls.add(String.format("add jar %s/%s", uploadPath, resource));
+        Set<Map.Entry<UdfFunc,String>> entries = 
udfFuncTenantCodeMap.entrySet();
+        for (Map.Entry<UdfFunc,String> entry:entries){
+            String uploadPath = HadoopUtils.getHdfsUdfDir(entry.getValue());
+            if (!uploadPath.startsWith("hdfs:")) {
+                uploadPath = defaultFS + uploadPath;
+            }
+            sqls.add(String.format("add jar %s%s", uploadPath, 
entry.getKey().getResourceName()));
         }
+
     }
 
     /**
@@ -106,20 +96,5 @@ public class UDFUtils {
         }
     }
 
-    /**
-     * get the resource names of all functions
-     * @param udfFuncs udf function list
-     * @return
-     */
-    private static Set<String> getFuncResouces(List<UdfFunc> udfFuncs) {
-        Set<String> resources = new HashSet<>();
-
-        for (UdfFunc udfFunc : udfFuncs) {
-            resources.add(udfFunc.getResourceName());
-        }
-
-        return resources;
-    }
-
 
 }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
index 5511c6a..941404a 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
@@ -131,8 +131,7 @@ public class SqlTask extends AbstractTask {
                     .map(this::getSqlAndSqlParamsMap)
                     .collect(Collectors.toList());
 
-            List<String> createFuncs = 
UDFUtils.createFuncs(sqlTaskExecutionContext.getUdfFuncList(),
-                    taskExecutionContext.getTenantCode(),
+            List<String> createFuncs = 
UDFUtils.createFuncs(sqlTaskExecutionContext.getUdfFuncTenantCodeMap(),
                     logger);
 
             // execute sql task

Reply via email to