This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git

commit 922df460d9f0e9e973f28d79ccb0d8f729388a06
Author: peacewong <[email protected]>
AuthorDate: Tue Oct 10 21:11:10 2023 +0800

    Support the spark.conf parameter for configuring multiple Spark parameters at once
---
 .../spark/config/SparkConfiguration.scala          |  1 +
 ...SparkSubmitProcessEngineConnLaunchBuilder.scala | 32 ++++++++++++++++++----
 2 files changed, 28 insertions(+), 5 deletions(-)

diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
index bb079b7b5..14bebcb95 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
@@ -165,6 +165,7 @@ object SparkConfiguration extends Logging {
 
   val REPLACE_PACKAGE_TO_HEADER = "org.apache.linkis"
 
+  val LINKIS_SPARK_CONF = CommonVars[String]("spark.conf", "")
   val SPARK_APPLICATION_ARGS = CommonVars("spark.app.args", "")
   val SPARK_APPLICATION_MAIN_CLASS = CommonVars("spark.app.main.class", "")
 
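For reference, a minimal sketch of how the new variable would be consumed,
assuming CommonVars' usual behavior of returning the default when a job
supplies no value (the demo property value is taken from the comment in the
builder change below):

    import org.apache.linkis.common.conf.CommonVars

    // Mirrors the definition added above; the empty default makes the feature
    // opt-in: nothing happens unless a job sets a "spark.conf" property.
    val LINKIS_SPARK_CONF = CommonVars[String]("spark.conf", "")

    // A job would then carry a property such as:
    //   "spark.conf" -> "spark.sql.shuffle.partitions=10;spark.memory.fraction=0.6"
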
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala
index 2487ede90..7e2d56888 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala
@@ -18,9 +18,11 @@
 package org.apache.linkis.engineplugin.spark.launch
 
 import org.apache.linkis.common.conf.CommonVars
+import org.apache.linkis.common.utils.Logging
 import org.apache.linkis.engineplugin.spark.config.SparkConfiguration
 import org.apache.linkis.engineplugin.spark.config.SparkConfiguration.{
   ENGINE_JAR,
+  LINKIS_SPARK_CONF,
   SPARK_APP_NAME,
   SPARK_DEFAULT_EXTERNAL_JARS_PATH,
   SPARK_DEPLOY_MODE,
@@ -49,7 +51,8 @@ import org.apache.commons.lang3.StringUtils
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
-class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLaunchBuilder) {
+class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLaunchBuilder)
+    extends Logging {
 
   def getCommands(
       engineConnBuildRequest: EngineConnBuildRequest,
@@ -57,10 +60,21 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLa
       gcLogDir: String,
       logDir: String
   ): Array[String] = {
-    val userEngineResource = engineConnBuildRequest.engineResource
-    val darResource = userEngineResource.getLockedResource.asInstanceOf[DriverAndYarnResource]
     val properties = engineConnBuildRequest.engineConnCreationDesc.properties
-
+    val sparkConf = getValueAndRemove(properties, LINKIS_SPARK_CONF)
+    // spark.conf demo: spark.sql.shuffle.partitions=10;spark.memory.fraction=0.6
+    if (StringUtils.isNotBlank(sparkConf)) {
+      val strArray = sparkConf.split(";").toList
+      strArray.foreach { keyAndValue =>
+        val key = keyAndValue.split("=")(0).trim
+        val value = keyAndValue.split("=")(1).trim
+        if (StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value)) {
+          engineConnBuildRequest.engineConnCreationDesc.properties.put(key, value)
+        } else {
+          logger.warn(s"spark conf has empty value, key:${key}, value:${value}")
+        }
+      }
+    }
     val className = getValueAndRemove(properties, "className", mainClass)
     val driverCores = getValueAndRemove(properties, LINKIS_SPARK_DRIVER_CORES)
     val driverMemory = getValueAndRemove(properties, LINKIS_SPARK_DRIVER_MEMORY)
@@ -68,6 +82,8 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLa
     val executorMemory = getValueAndRemove(properties, LINKIS_SPARK_EXECUTOR_MEMORY)
     val numExecutors = getValueAndRemove(properties, LINKIS_SPARK_EXECUTOR_INSTANCES)
 
+    val userEngineResource = engineConnBuildRequest.engineResource
+    val darResource = userEngineResource.getLockedResource.asInstanceOf[DriverAndYarnResource]
     val files: ArrayBuffer[String] = getValueAndRemove(properties, "files", "")
       .split(",")
       .filter(isNotBlankPath)
@@ -78,7 +94,13 @@ class SparkSubmitProcessEngineConnLaunchBuilder(builder: JavaProcessEngineConnLa
     jars ++= getValueAndRemove(properties, SPARK_DEFAULT_EXTERNAL_JARS_PATH)
       .split(",")
       .filter(x => {
-        isNotBlankPath(x) && (new java.io.File(x)).isFile
+        val isPath = isNotBlankPath(x)
+        // filtering by isFile does not work when the cg-linkismanager startup
+        // user differs from the engineconn startup user
+
+        // val isFile = (new java.io.File(x)).isFile
+        logger.info(s"file:${x}, check isPath:${isPath}")
+        isPath
       })
     val pyFiles = getValueAndRemove(properties, "py-files", "").split(",").filter(isNotBlankPath)
     val archives = getValueAndRemove(properties, "archives", "").split(",").filter(isNotBlankPath)

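The parsing above splits each entry on every "=" and indexes element 1
directly, so an entry without "=" throws and a value containing "=" is
truncated. A standalone sketch that guards both cases; parseSparkConf is a
hypothetical helper name, not part of the commit:

    import org.apache.commons.lang3.StringUtils

    // Hypothetical helper: parse a semicolon-separated "key=value;key=value"
    // string as getCommands does, but split on "=" at most once so values
    // containing "=" survive, and skip malformed entries instead of throwing.
    def parseSparkConf(sparkConf: String): Map[String, String] = {
      if (StringUtils.isBlank(sparkConf)) Map.empty[String, String]
      else {
        sparkConf.split(";").toList.flatMap { keyAndValue =>
          val kv = keyAndValue.split("=", 2)
          val key = kv(0).trim
          val value = if (kv.length > 1) kv(1).trim else ""
          if (StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value)) {
            Some(key -> value)
          } else {
            None // the commit logs a warning here rather than dropping silently
          }
        }.toMap
      }
    }

    // parseSparkConf("spark.sql.shuffle.partitions=10;spark.memory.fraction=0.6")
    // => Map(spark.sql.shuffle.partitions -> 10, spark.memory.fraction -> 0.6)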
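
The second hunk drops the (new java.io.File(x)).isFile check because the
launch command is built inside cg-linkismanager, whose startup OS user can
differ from the engineconn startup user, so a jar readable by the engineconn
user may look nonexistent to the builder. A minimal sketch of the relaxed
filter, assuming isNotBlankPath behaves like the builder's helper of the
same name:

    import org.apache.commons.lang3.StringUtils

    // Assumption: simplified stand-in for the builder's isNotBlankPath helper.
    def isNotBlankPath(path: String): Boolean = StringUtils.isNotBlank(path)

    val externalJars = "/opt/libs/a.jar,,/opt/libs/b.jar"
      .split(",")
      .filter { x =>
        // An isFile check here would run as the cg-linkismanager user, who may
        // not be able to read files owned by the engineconn startup user.
        isNotBlankPath(x)
      }
    // externalJars: Array(/opt/libs/a.jar, /opt/libs/b.jar)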

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
