This is an automated email from the ASF dual-hosted git repository. peacewong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/linkis.git
commit 922df460d9f0e9e973f28d79ccb0d8f729388a06 Author: peacewong <[email protected]> AuthorDate: Tue Oct 10 21:11:10 2023 +0800 Supports spark.conf parameters to configure multiple parameters --- .../spark/config/SparkConfiguration.scala | 1 + ...SparkSubmitProcessEngineConnLaunchBuilder.scala | 32 ++++++++++++++++++---- 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala index bb079b7b5..14bebcb95 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala @@ -165,6 +165,7 @@ object SparkConfiguration extends Logging { val REPLACE_PACKAGE_TO_HEADER = "org.apache.linkis" + val LINKIS_SPARK_CONF = CommonVars[String]("spark.conf", "") val SPARK_APPLICATION_ARGS = CommonVars("spark.app.args", "") val SPARK_APPLICATION_MAIN_CLASS = CommonVars("spark.app.main.class", "") diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala index 2487ede90..7e2d56888 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala @@ -18,9 +18,11 @@ package org.apache.linkis.engineplugin.spark.launch import org.apache.linkis.common.conf.CommonVars +import org.apache.linkis.common.utils.Logging import org.apache.linkis.engineplugin.spark.config.SparkConfiguration import org.apache.linkis.engineplugin.spark.config.SparkConfiguration.{ ENGINE_JAR, + LINKIS_SPARK_CONF, SPARK_APP_NAME, SPARK_DEFAULT_EXTERNAL_JARS_PATH, SPARK_DEPLOY_MODE, @@ -49,7 +51,8 @@ import org.apache.commons.lang3.StringUtils import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer -class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLaunchBuilder) { +class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLaunchBuilder) + extends Logging { def getCommands( engineConnBuildRequest: EngineConnBuildRequest, @@ -57,10 +60,21 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLa gcLogDir: String, logDir: String ): Array[String] = { - val userEngineResource = engineConnBuildRequest.engineResource - val darResource = userEngineResource.getLockedResource.asInstanceOf[DriverAndYarnResource] val properties = engineConnBuildRequest.engineConnCreationDesc.properties - + val sparkConf = getValueAndRemove(properties, LINKIS_SPARK_CONF) + // sparkcsonf DEMO:spark.sql.shuffle.partitions=10;spark.memory.fraction=0.6 + if (StringUtils.isNotBlank(sparkConf)) { + val strArrary = sparkConf.split(";").toList + strArrary.foreach { keyAndValue => + val key = keyAndValue.split("=")(0).trim + val value = keyAndValue.split("=")(1).trim + if (StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value)) { + engineConnBuildRequest.engineConnCreationDesc.properties.put(key, value) + } else { + logger.warn(s"spark conf has empty value, key:${key}, value:${value}") + } + } + } val className = getValueAndRemove(properties, "className", mainClass) val driverCores = getValueAndRemove(properties, LINKIS_SPARK_DRIVER_CORES) val driverMemory = getValueAndRemove(properties, LINKIS_SPARK_DRIVER_MEMORY) @@ -68,6 +82,8 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLa val executorMemory = getValueAndRemove(properties, LINKIS_SPARK_EXECUTOR_MEMORY) val numExecutors = getValueAndRemove(properties, LINKIS_SPARK_EXECUTOR_INSTANCES) + val userEngineResource = engineConnBuildRequest.engineResource + val darResource = userEngineResource.getLockedResource.asInstanceOf[DriverAndYarnResource] val files: ArrayBuffer[String] = getValueAndRemove(properties, "files", "") .split(",") .filter(isNotBlankPath) @@ -78,7 +94,13 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLa jars ++= getValueAndRemove(properties, SPARK_DEFAULT_EXTERNAL_JARS_PATH) .split(",") .filter(x => { - isNotBlankPath(x) && (new java.io.File(x)).isFile + val isPath = isNotBlankPath(x) + // filter by isFile cannot support this case: + // The cg-linkismanager startup user is inconsistent with the engineconn startup user + + // val isFile = (new java.io.File(x)).isFile + logger.info(s"file:${x}, check isPath:${isPath}") + isPath }) val pyFiles = getValueAndRemove(properties, "py-files", "").split(",").filter(isNotBlankPath) val archives = getValueAndRemove(properties, "archives", "").split(",").filter(isNotBlankPath) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
