This is an automated email from the ASF dual-hosted git repository.
allenlliu pushed a commit to branch dev-1.0.3
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.0.3 by this push:
new fb1875e Fix issue that Hive and Spark EC have a
ClassNotFoundException bug when using jar UDF close#1250
new 20aba70 Merge pull request #1251 from peacewong/dec-1.0.3-udf
fb1875e is described below
commit fb1875ebfacfee022bc0717ce02b02482d3eb57b
Author: peacewong <[email protected]>
AuthorDate: Wed Dec 29 12:16:40 2021 +0800
Fix issue that Hive and Spark EC have a ClassNotFoundException bug when
using jar UDF close#1250
---
.../apache/linkis/ecm/server/hook/JarUDFLoadECMHook.scala | 2 +-
.../hive/src/main/resources/linkis-engineconn.properties | 2 +-
.../engineplugin/hive/hook/HiveAddJarsEngineHook.scala | 11 +++++++++++
.../launch/SparkSubmitProcessEngineConnLaunchBuilder.scala | 13 ++++++-------
.../engineplugin/common/launch/process/Environment.scala | 12 +++++++++++-
5 files changed, 30 insertions(+), 10 deletions(-)
diff --git
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/hook/JarUDFLoadECMHook.scala
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/hook/JarUDFLoadECMHook.scala
index 62ebcb3..aedabe8 100644
---
a/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/hook/JarUDFLoadECMHook.scala
+++
b/linkis-computation-governance/linkis-engineconn-manager/linkis-engineconn-manager-server/src/main/scala/org/apache/linkis/ecm/server/hook/JarUDFLoadECMHook.scala
@@ -31,7 +31,7 @@ class JarUDFLoadECMHook extends ECMHook with Logging {
info("start loading UDFs")
val udfInfos = UDFClient.getUdfInfos(request.user,"udf").filter{ info
=> info.getUdfType == 0 && info.getExpire == false &&
StringUtils.isNotBlank(info.getPath) && info.getLoad == true }
udfInfos.foreach{ udfInfo =>
- LaunchConstants.addPathToClassPath(pel.environment, udfInfo.getPath)
+ LaunchConstants.addPathToUDFPath(pel.environment, udfInfo.getPath)
}
}
}
diff --git
a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/resources/linkis-engineconn.properties
b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/resources/linkis-engineconn.properties
index b0a1b22..d816917 100644
---
a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/resources/linkis-engineconn.properties
+++
b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/resources/linkis-engineconn.properties
@@ -19,4 +19,4 @@ wds.linkis.engineconn.debug.enable=true
#wds.linkis.keytab.enable=true
wds.linkis.engineconn.plugin.default.class=org.apache.linkis.engineplugin.hive.HiveEngineConnPlugin
wds.linkis.bdp.hive.init.sql.enable=true
-wds.linkis.engine.connector.hooks=org.apache.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,org.apache.linkis.engineconn.computation.executor.hook.JarUdfEngineHook
\ No newline at end of file
+wds.linkis.engine.connector.hooks=org.apache.linkis.engineconn.computation.executor.hook.ComputationEngineConnHook,org.apache.linkis.engineplugin.hive.hook.HiveAddJarsEngineHook,org.apache.linkis.engineconn.computation.executor.hook.JarUdfEngineHook
\ No newline at end of file
diff --git
a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/hook/HiveAddJarsEngineHook.scala
b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/hook/HiveAddJarsEngineHook.scala
index 1aaac35..77b6bfd 100644
---
a/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/hook/HiveAddJarsEngineHook.scala
+++
b/linkis-engineconn-plugins/engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/hook/HiveAddJarsEngineHook.scala
@@ -27,6 +27,8 @@ import
org.apache.linkis.engineplugin.hive.executor.HiveEngineConnExecutor
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine.{CodeLanguageLabel,
RunType}
import org.apache.commons.lang.StringUtils
+import org.apache.linkis.common.conf.CommonVars
+import org.apache.linkis.manager.engineplugin.common.launch.process.Environment
import scala.collection.JavaConversions._
@@ -43,11 +45,20 @@ class HiveAddJarsEngineHook extends EngineConnHook with
Logging {
override def afterExecutionExecute(engineCreationContext:
EngineCreationContext, engineConn: EngineConn): Unit = Utils.tryAndError {
val options = engineCreationContext.getOptions
var jars: String = ""
+ val udf_jars = CommonVars(Environment.UDF_JARS.toString, "", "UDF jar
PAth").getValue
+ logger.info("udf jar_path:" + udf_jars)
options foreach {
case (key, value) => if (JARS.equals(key)) {
jars = value
}
}
+ if (StringUtils.isNotEmpty(udf_jars)) {
+ if (StringUtils.isNotEmpty(jars)) {
+ jars = jars + "," + udf_jars
+ } else {
+ jars = udf_jars
+ }
+ }
val codeLanguageLabel = new CodeLanguageLabel
codeLanguageLabel.setCodeType(RunType.HIVE.toString)
val labels = Array[Label[_]](codeLanguageLabel)
diff --git
a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala
b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala
index 7c22784..5147c81 100644
---
a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala
+++
b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala
@@ -33,16 +33,13 @@ import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel
import org.apache.linkis.protocol.UserWithCreator
import org.apache.commons.lang.StringUtils
+
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import org.apache.linkis.hadoop.common.conf.HadoopConf
-/**
- *
- * @Date 2020/10/23
- */
class SparkSubmitProcessEngineConnLaunchBuilder private extends
JavaProcessEngineConnLaunchBuilder {
private[this] val fsRoot = "hdfs://"
@@ -172,9 +169,11 @@ class SparkSubmitProcessEngineConnLaunchBuilder private
extends JavaProcessEngin
//addOpt("--jars",Some(ENGINEMANAGER_JAR.getValue))
// info("No need to add jars for " + _jars.map(fromPath).exists(x =>
x.equals("hdfs:///")).toString())
- _jars = _jars.filter(_.isNotBlankPath())
-
- if(_jars.nonEmpty) {
+ if (_jars.isEmpty) {
+ _jars += AbsolutePath("")
+ }
+ _jars += AbsolutePath(variable(UDF_JARS))
+ if (_jars.nonEmpty) {
addList("--jars", _jars.map(fromPath))
}
diff --git
a/linkis-engineconn-plugins/linkis-engineconn-plugin-framework/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/process/Environment.scala
b/linkis-engineconn-plugins/linkis-engineconn-plugin-framework/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/process/Environment.scala
index f0e426d..db2966f 100644
---
a/linkis-engineconn-plugins/linkis-engineconn-plugin-framework/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/process/Environment.scala
+++
b/linkis-engineconn-plugins/linkis-engineconn-plugin-framework/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/process/Environment.scala
@@ -17,13 +17,15 @@
package org.apache.linkis.manager.engineplugin.common.launch.process
+import org.apache.commons.lang.StringUtils
+
object Environment extends Enumeration {
type Environment = Value
val USER, ECM_HOME, PWD, PATH, SHELL, JAVA_HOME, CLASSPATH,
HADOOP_HOME, HADOOP_CONF_DIR, HIVE_CONF_DIR, LOG_DIRS, TEMP_DIRS,
- ECM_HOST, ECM_PORT, RANDOM_PORT,
SERVICE_DISCOVERY,EUREKA_PREFER_IP,ENGINECONN_ENVKEYS = Value
+ ECM_HOST, ECM_PORT, RANDOM_PORT, SERVICE_DISCOVERY,EUREKA_PREFER_IP,
UDF_JARS, ENGINECONN_ENVKEYS = Value
def variable(environment: Environment): String =
LaunchConstants.EXPANSION_MARKER_LEFT + environment +
LaunchConstants.EXPANSION_MARKER_RIGHT
@@ -46,4 +48,12 @@ object LaunchConstants {
env.put(Environment.CLASSPATH.toString, v)
}
+ def addPathToUDFPath(env: java.util.Map[String, String], value: String):
Unit = {
+ if (StringUtils.isBlank(value)) return
+ val v = if(env.containsKey(Environment.UDF_JARS.toString)) {
+ env.get(Environment.UDF_JARS.toString) + "," + value
+ } else value
+ env.put(Environment.UDF_JARS.toString, v)
+ }
+
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]