This is an automated email from the ASF dual-hosted git repository.

allenlliu pushed a commit to branch dev-1.0.3
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.0.3 by this push:
     new fb1875e  Fix ClassNotFoundException in Hive and Spark EC when using a jar UDF (close #1250)
     new 20aba70  Merge pull request #1251 from peacewong/dec-1.0.3-udf
fb1875e is described below

commit fb1875ebfacfee022bc0717ce02b02482d3eb57b
Author: peacewong <[email protected]>
AuthorDate: Wed Dec 29 12:16:40 2021 +0800

    Fix ClassNotFoundException in Hive and Spark EC when using a jar UDF (close #1250)
---
 .../apache/linkis/ecm/server/hook/JarUDFLoadECMHook.scala   |  2 +-
 .../hive/src/main/resources/linkis-engineconn.properties    |  2 +-
 .../engineplugin/hive/hook/HiveAddJarsEngineHook.scala      | 11 +++++++++++
 .../launch/SparkSubmitProcessEngineConnLaunchBuilder.scala  | 13 ++++++-------
 .../engineplugin/common/launch/process/Environment.scala    | 12 +++++++++++-
 5 files changed, 30 insertions(+), 10 deletions(-)
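
For readers following the change: the fix routes UDF jar paths through a new UDF_JARS environment entry instead of pushing them onto the engine classpath, so each engine can feed them to its own jar-loading path (HiveAddJarsEngineHook for Hive, the --jars argument for Spark). Below is a minimal, self-contained sketch of how that entry accumulates comma-separated paths; it mirrors the LaunchConstants.addPathToUDFPath helper added in this commit, and the jar paths are made up for illustration.

    import java.util.{HashMap => JHashMap}

    object UdfJarsEnvSketch {
      // Mirrors LaunchConstants.addPathToUDFPath: comma-join jar paths under one env key.
      def addPathToUDFPath(env: java.util.Map[String, String], value: String): Unit = {
        if (value == null || value.trim.isEmpty) return
        val key = "UDF_JARS"
        val v = if (env.containsKey(key)) env.get(key) + "," + value else value
        env.put(key, v)
      }

      def main(args: Array[String]): Unit = {
        val env = new JHashMap[String, String]()
        // Hypothetical jar paths; the real ones come from UDFClient.getUdfInfos(user, "udf").
        addPathToUDFPath(env, "hdfs:///udfs/my_udf_a.jar")
        addPathToUDFPath(env, "hdfs:///udfs/my_udf_b.jar")
        println(env.get("UDF_JARS"))  // hdfs:///udfs/my_udf_a.jar,hdfs:///udfs/my_udf_b.jar
      }
    }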

diff --git a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/hook/JarUDFLoadECMHook.scala b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/hook/JarUDFLoadECMHook.scala
index 62ebcb3..aedabe8 100644
--- a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/hook/JarUDFLoadECMHook.scala
+++ b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/hook/JarUDFLoadECMHook.scala
@@ -31,7 +31,7 @@ class JarUDFLoadECMHook extends ECMHook with Logging {
         info("start loading UDFs")
         val udfInfos = UDFClient.getUdfInfos(request.user,"udf").filter{ info => info.getUdfType == 0 && info.getExpire == false && StringUtils.isNotBlank(info.getPath) && info.getLoad == true }
         udfInfos.foreach{ udfInfo =>
-          LaunchConstants.addPathToClassPath(pel.environment, udfInfo.getPath)
+          LaunchConstants.addPathToUDFPath(pel.environment, udfInfo.getPath)
         }
     }
   }
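
A rough, REPL-style illustration of the filter above, using a stand-in UdfInfoSketch case class (hypothetical; the real UdfInfo type comes from UDFClient): only loadable, non-expired jar UDFs (udfType == 0) with a non-blank path end up in UDF_JARS.

    // Stand-in for the UDF metadata returned by UDFClient.getUdfInfos (hypothetical shape).
    case class UdfInfoSketch(udfType: Int, expire: Boolean, path: String, load: Boolean)

    val udfInfos = Seq(
      UdfInfoSketch(0, expire = false, path = "hdfs:///udfs/a.jar", load = true),  // kept
      UdfInfoSketch(0, expire = true,  path = "hdfs:///udfs/b.jar", load = true),  // dropped: expired
      UdfInfoSketch(1, expire = false, path = "hdfs:///udfs/c.py",  load = true)   // dropped: not a jar UDF
    )

    // Same predicate as the hook, expressed on the sketch type.
    val jarUdfPaths = udfInfos
      .filter(i => i.udfType == 0 && !i.expire && i.path.trim.nonEmpty && i.load)
      .map(_.path)  // Seq("hdfs:///udfs/a.jar")
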
diff --git a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/resources/linkis-engineconn.properties b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/resources/linkis-engineconn.properties
index b0a1b22..d816917 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/resources/linkis-engineconn.properties
+++ b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/resources/linkis-engineconn.properties
@@ -19,4 +19,4 @@ wds.linkis.engineconn.debug.enable=true
 #wds.linkis.keytab.enable=true
 wds.linkis.engineconn.plugin.default.class=org.apache.linkis.engineplugin.hive.HiveEngineConnPlugin
 wds.linkis.bdp.hive.init.sql.enable=true
-wds.linkis.engine.connector.hooks=org.apache.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,org.apache.linkis.engineconn.computation.executor.hook.JarUdfEngineHook
\ No newline at end of file
+wds.linkis.engine.connector.hooks=org.apache.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,org.apache.linkis.engineplugin.hive.hook.HiveAddJarsEngineHook,org.apache.linkis.engineconn.computation.executor.hook.JarUdfEngineHook
\ No newline at end of file
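
The property change registers HiveAddJarsEngineHook in the engine conn hook chain. The value is a comma-separated list of fully qualified hook class names; a toy sketch of splitting and instantiating such a list reflectively follows (JDK class names are used so the snippet actually resolves; the real hook loading happens inside Linkis, not in this code).

    // Toy example: split a comma-separated class list and instantiate each entry.
    val hookProperty = "java.util.ArrayList,java.util.LinkedList"  // stand-in class names
    val hooks = hookProperty.split(",").toSeq
      .map(_.trim)
      .filter(_.nonEmpty)
      .map(name => Class.forName(name).getDeclaredConstructor().newInstance())
    // hooks: one instance per configured class, in the configured order
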
diff --git a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/hook/HiveAddJarsEngineHook.scala b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/hook/HiveAddJarsEngineHook.scala
index 1aaac35..77b6bfd 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/hook/HiveAddJarsEngineHook.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/hook/HiveAddJarsEngineHook.scala
@@ -27,6 +27,8 @@ import org.apache.linkis.engineplugin.hive.executor.HiveEngineConnExecutor
 import org.apache.linkis.manager.label.entity.Label
 import org.apache.linkis.manager.label.entity.engine.{CodeLanguageLabel, RunType}
 import org.apache.commons.lang.StringUtils
+import org.apache.linkis.common.conf.CommonVars
+import org.apache.linkis.manager.engineplugin.common.launch.process.Environment
 
 import scala.collection.JavaConversions._
 
@@ -43,11 +45,20 @@ class HiveAddJarsEngineHook extends EngineConnHook with Logging {
   override def afterExecutionExecute(engineCreationContext: EngineCreationContext, engineConn: EngineConn): Unit = Utils.tryAndError {
     val options = engineCreationContext.getOptions
     var jars: String = ""
+    val udf_jars = CommonVars(Environment.UDF_JARS.toString, "", "UDF jar Path").getValue
+    logger.info("udf jar_path:" + udf_jars)
     options foreach {
       case (key, value) => if (JARS.equals(key)) {
         jars = value
       }
     }
+    if (StringUtils.isNotEmpty(udf_jars)) {
+      if (StringUtils.isNotEmpty(jars)) {
+        jars = jars + "," + udf_jars
+      } else {
+        jars = udf_jars
+      }
+    }
     val codeLanguageLabel = new CodeLanguageLabel
     codeLanguageLabel.setCodeType(RunType.HIVE.toString)
     val labels = Array[Label[_]](codeLanguageLabel)
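
The added block boils down to a comma-join of the engine's configured jars with the UDF jars that tolerates either side being empty. A REPL-style sketch with made-up paths:

    // Comma-join, skipping empty sides (same effect as the if/else above).
    def mergeJars(existing: String, udfJars: String): String =
      Seq(existing, udfJars).filter(s => s != null && s.nonEmpty).mkString(",")

    mergeJars("hdfs:///libs/engine.jar", "hdfs:///udfs/a.jar,hdfs:///udfs/b.jar")
    // => "hdfs:///libs/engine.jar,hdfs:///udfs/a.jar,hdfs:///udfs/b.jar"
    mergeJars("", "hdfs:///udfs/a.jar")  // => "hdfs:///udfs/a.jar"
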
diff --git a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala
index 7c22784..5147c81 100644
--- a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala
+++ b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala
@@ -33,16 +33,13 @@ import org.apache.linkis.manager.label.entity.Label
 import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel
 import org.apache.linkis.protocol.UserWithCreator
 import org.apache.commons.lang.StringUtils
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import org.apache.linkis.hadoop.common.conf.HadoopConf
 
 
-/**
 
- *
- * @Date 2020/10/23
- */
 class SparkSubmitProcessEngineConnLaunchBuilder private extends JavaProcessEngineConnLaunchBuilder {
 
   private[this] val fsRoot = "hdfs://"
@@ -172,9 +169,11 @@ class SparkSubmitProcessEngineConnLaunchBuilder private extends JavaProcessEngin
 
     //addOpt("--jars",Some(ENGINEMANAGER_JAR.getValue))
 //    info("No need to add jars for " + _jars.map(fromPath).exists(x => x.equals("hdfs:///")).toString())
-    _jars = _jars.filter(_.isNotBlankPath())
-
-    if(_jars.nonEmpty) {
+    if (_jars.isEmpty) {
+      _jars += AbsolutePath("")
+    }
+    _jars += AbsolutePath(variable(UDF_JARS))
+    if (_jars.nonEmpty) {
       addList("--jars", _jars.map(fromPath))
     }
 
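Note that variable(UDF_JARS) does not splice the jar list into the command directly: per Environment.scala below, it emits EXPANSION_MARKER_LEFT + name + EXPANSION_MARKER_RIGHT, a placeholder for the UDF_JARS environment value that is resolved when the launch command is materialized. A toy sketch of that substitution, with made-up marker strings (the real values live in LaunchConstants):

    object PlaceholderSketch {
      // Made-up markers for illustration only; Linkis defines its own in LaunchConstants.
      private val EXPANSION_MARKER_LEFT = "<<"
      private val EXPANSION_MARKER_RIGHT = ">>"

      def variable(name: String): String = EXPANSION_MARKER_LEFT + name + EXPANSION_MARKER_RIGHT

      // Replace every <<KEY>> placeholder with its value from the environment map.
      def expand(command: String, env: Map[String, String]): String =
        env.foldLeft(command) { case (cmd, (k, v)) => cmd.replace(variable(k), v) }

      def main(args: Array[String]): Unit = {
        val cmd = "spark-submit --jars " + variable("UDF_JARS") + " ..."
        println(expand(cmd, Map("UDF_JARS" -> "hdfs:///udfs/a.jar,hdfs:///udfs/b.jar")))
        // spark-submit --jars hdfs:///udfs/a.jar,hdfs:///udfs/b.jar ...
      }
    }
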
diff --git a/linkis-engineconn-plugins/linkis-engineconn-plugin-framework/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/process/Environment.scala b/linkis-engineconn-plugins/linkis-engineconn-plugin-framework/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/process/Environment.scala
index f0e426d..db2966f 100644
--- a/linkis-engineconn-plugins/linkis-engineconn-plugin-framework/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/process/Environment.scala
+++ b/linkis-engineconn-plugins/linkis-engineconn-plugin-framework/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/process/Environment.scala
@@ -17,13 +17,15 @@
  
 package org.apache.linkis.manager.engineplugin.common.launch.process
 
+import org.apache.commons.lang.StringUtils
+
 
 object Environment extends Enumeration {
 
   type Environment = Value
   val USER, ECM_HOME, PWD, PATH, SHELL, JAVA_HOME, CLASSPATH,
       HADOOP_HOME, HADOOP_CONF_DIR, HIVE_CONF_DIR, LOG_DIRS, TEMP_DIRS,
-      ECM_HOST, ECM_PORT, RANDOM_PORT, SERVICE_DISCOVERY,EUREKA_PREFER_IP,ENGINECONN_ENVKEYS = Value
+      ECM_HOST, ECM_PORT, RANDOM_PORT, SERVICE_DISCOVERY,EUREKA_PREFER_IP, UDF_JARS, ENGINECONN_ENVKEYS = Value
 
   def variable(environment: Environment): String = LaunchConstants.EXPANSION_MARKER_LEFT + environment + LaunchConstants.EXPANSION_MARKER_RIGHT
 
@@ -46,4 +48,12 @@ object LaunchConstants {
     env.put(Environment.CLASSPATH.toString, v)
   }
 
+  def addPathToUDFPath(env: java.util.Map[String, String], value: String): Unit = {
+    if (StringUtils.isBlank(value)) return
+    val v = if(env.containsKey(Environment.UDF_JARS.toString)) {
+      env.get(Environment.UDF_JARS.toString) + "," + value
+    } else value
+    env.put(Environment.UDF_JARS.toString, v)
+  }
+
 }
\ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
