This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.2.0
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.2.0 by this push:
new fc3be7dc0 refactor(spark): get scala version from
util.properties.versionNumber… (#2419)
fc3be7dc0 is described below
commit fc3be7dc009c532f14bcd09c304d97d4da6ade3e
Author: Jack Xu <[email protected]>
AuthorDate: Fri Jul 8 11:57:09 2022 +0800
refactor(spark): get scala version from util.properties.versionNumber…
(#2419)
* refactor(spark): get scala version from
util.properties.versionNumberString
---
.../spark/config/SparkConfiguration.scala | 2 --
.../spark/executor/SparkScalaExecutor.scala | 28 ++++++++++++----------
2 files changed, 15 insertions(+), 15 deletions(-)
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
index f2bf3e62a..66c27f4ba 100644
---
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
@@ -34,8 +34,6 @@ object SparkConfiguration extends Logging {
val SPARK_LOOP_INIT_TIME =
CommonVars[TimeType]("wds.linkis.engine.spark.spark-loop.init.time", new
TimeType("120s"))
val SPARK_LANGUAGE_REPL_INIT_TIME =
CommonVars[TimeType]("wds.linkis.engine.spark.language-repl.init.time", new
TimeType("30s"))
val SPARK_REPL_CLASSDIR = CommonVars[String]("spark.repl.classdir", "",
"默认master")
- val SPARK_SCALA_VERSION = CommonVars("linkis.spark.scala.version", "2.11")
-
val PROXY_USER = CommonVars[String]("spark.proxy.user", "${UM}")
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkScalaExecutor.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkScalaExecutor.scala
index d0a5b22b8..630cb4dc6 100644
---
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkScalaExecutor.scala
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkScalaExecutor.scala
@@ -18,8 +18,9 @@
package org.apache.linkis.engineplugin.spark.executor
import org.apache.commons.io.IOUtils
-import org.apache.commons.lang.StringUtils
-import org.apache.commons.lang.exception.ExceptionUtils
+import org.apache.commons.lang3.StringUtils
+import org.apache.commons.lang3.exception.ExceptionUtils
+import org.apache.commons.lang3.reflect.FieldUtils
import org.apache.linkis.common.utils.Utils
import
org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext
import org.apache.linkis.engineconn.computation.executor.rs.RsOutputStream
@@ -40,6 +41,7 @@ import org.apache.spark.{SparkConf, SparkContext}
import java.io.{BufferedReader, File}
import _root_.scala.tools.nsc.GenericRunnerSettings
import scala.tools.nsc.interpreter.{IMain, JPrintWriter, NamedParam, Results,
SimpleReader, StdReplTags, isReplPower, replProps}
+import scala.util.Properties.versionNumberString
class SparkScalaExecutor(sparkEngineSession: SparkEngineSession, id: Long)
extends SparkEngineConnExecutor(sparkEngineSession.sparkContext, id) {
@@ -208,7 +210,7 @@ class SparkScalaExecutor(sparkEngineSession:
SparkEngineSession, id: Long) exten
if (StringUtils.isNotBlank(errorMsg)) {
val errorMsgLowCase = errorMsg.toLowerCase
fatalLogs.foreach(fatalLog =>
- if ( errorMsgLowCase.contains(fatalLog) ) {
+ if (errorMsgLowCase.contains(fatalLog)) {
logger.error(s"match engineConn log fatal logs,is $fatalLog")
flag = true
}
@@ -256,10 +258,15 @@ class SparkScalaExecutor(sparkEngineSession:
SparkEngineSession, id: Long) exten
sparkILoop.settings = settings
sparkILoop.createInterpreter()
- val in0 = SparkConfiguration.SPARK_SCALA_VERSION.getValue match {
- case "2.11" => getField(sparkILoop,
"scala$tools$nsc$interpreter$ILoop$$in0").asInstanceOf[Option[BufferedReader]]
- case "2.12" => getDeclareField(sparkILoop,
"in0").asInstanceOf[Option[BufferedReader]]
+ val isScala211 = versionNumberString.startsWith("2.11");
+ val in0 = if (isScala211) {
+ getField(sparkILoop,
"scala$tools$nsc$interpreter$ILoop$$in0").asInstanceOf[Option[BufferedReader]]
+ } else {
+ // TODO: have problem with scala2.13 or higher
+ FieldUtils.readDeclaredField(sparkILoop, "in0", true)
+ .asInstanceOf[Option[BufferedReader]]
}
+
val reader = in0.fold(sparkILoop.chooseReader(settings))(r =>
SimpleReader(r,
jOut, interactive = true))
@@ -273,13 +280,8 @@ class SparkScalaExecutor(sparkEngineSession:
SparkEngineSession, id: Long) exten
field.setAccessible(true)
field.get(obj)
}
- private def getDeclareField(obj: Object, name: String): Object = {
- val field = obj.getClass.getDeclaredField(name)
- field.setAccessible(true)
- field.get(obj)
- }
- def bindSparkSession = {
+ def bindSparkSession: Unit = {
require(sparkContext != null)
require(sparkSession != null)
require(_sqlContext != null)
@@ -334,7 +336,7 @@ class EngineExecutionContextFactory {
def setEngineExecutionContext(engineExecutionContext:
EngineExecutionContext): Unit = this.engineExecutionContext =
engineExecutionContext
- def getEngineExecutionContext = this.engineExecutionContext
+ def getEngineExecutionContext: EngineExecutionContext =
this.engineExecutionContext
}
object SparkScalaExecutor {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]