This is an automated email from the ASF dual-hosted git repository.

cancai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9f9e26a96 [Improve] spark-submit some improvements (#3901)
9f9e26a96 is described below

commit 9f9e26a96cc696eb0d481ebf8d78b17eec5d7672
Author: benjobs <[email protected]>
AuthorDate: Sat Jul 20 22:10:33 2024 +0800

    [Improve] spark-submit some improvements (#3901)
    
    * [Improve] spark-submit some improvements
    
    * [Improve] enum minor improvements
---
 .../common/enums/SparkDevelopmentMode.java         | 60 ++++++++++++++++++++++
 .../common/enums/SparkExecutionMode.java           |  1 +
 .../console/core/entity/SparkApplication.java      |  4 +-
 .../impl/SparkApplicationActionServiceImpl.java    | 27 ++++------
 .../flink/client/bean/SubmitRequest.scala          |  7 ++-
 .../streampark/spark/client/bean/StopRequest.scala |  6 +--
 .../spark/client/bean/StopResponse.scala           |  2 +-
 .../spark/client/bean/SubmitRequest.scala          | 28 +++++-----
 .../spark/client/bean/SubmitResponse.scala         |  7 +--
 .../spark/client/SparkClientEndpoint.scala         |  8 +--
 ...arnApplicationClient.scala => YarnClient.scala} | 22 ++++----
 .../spark/client/trait/SparkClientTrait.scala      |  2 -
 12 files changed, 111 insertions(+), 63 deletions(-)

diff --git 
a/streampark-common/src/main/java/org/apache/streampark/common/enums/SparkDevelopmentMode.java
 
b/streampark-common/src/main/java/org/apache/streampark/common/enums/SparkDevelopmentMode.java
new file mode 100644
index 000000000..29c5b9727
--- /dev/null
+++ 
b/streampark-common/src/main/java/org/apache/streampark/common/enums/SparkDevelopmentMode.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.common.enums;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+/** The Spark development mode enum. */
+public enum SparkDevelopmentMode {
+
+    /** Unknown mode, used in place of null. */
+    UNKNOWN("Unknown", -1),
+
+    /** custom code */
+    CUSTOM_CODE("Custom Code", 1),
+
+    /** spark SQL */
+    SPARK_SQL("Spark SQL", 2);
+
+    private final String name;
+
+    private final Integer mode;
+
+    SparkDevelopmentMode(@Nonnull String name, @Nonnull Integer mode) {
+        this.name = name;
+        this.mode = mode;
+    }
+
+    /**
+     * Try to resolve the mode value into {@link SparkDevelopmentMode}.
+     *
+     * @param value The mode value of a potential Spark development mode.
+     * @return The matching Spark development mode, or {@link #UNKNOWN} if none matches.
+     */
+    @Nonnull
+    public static SparkDevelopmentMode valueOf(@Nullable Integer value) {
+        for (SparkDevelopmentMode flinkDevelopmentMode : values()) {
+            if (flinkDevelopmentMode.mode.equals(value)) {
+                return flinkDevelopmentMode;
+            }
+        }
+        return SparkDevelopmentMode.UNKNOWN;
+    }
+
+}
diff --git 
a/streampark-common/src/main/java/org/apache/streampark/common/enums/SparkExecutionMode.java
 
b/streampark-common/src/main/java/org/apache/streampark/common/enums/SparkExecutionMode.java
index 25db6f1f5..d82d96f5d 100644
--- 
a/streampark-common/src/main/java/org/apache/streampark/common/enums/SparkExecutionMode.java
+++ 
b/streampark-common/src/main/java/org/apache/streampark/common/enums/SparkExecutionMode.java
@@ -37,6 +37,7 @@ public enum SparkExecutionMode {
 
     /** yarn client */
     YARN_CLIENT(3, "yarn-client");
+
     private final Integer mode;
 
     private final String name;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
index c07e2b160..8b3eb5112 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
@@ -407,11 +407,11 @@ public class SparkApplication extends BaseEntity {
     @JsonIgnore
     @SneakyThrows
     @SuppressWarnings("unchecked")
-    public Map<String, Object> getOptionMap() {
+    public Map<String, String> getOptionMap() {
         if (StringUtils.isBlank(this.options)) {
             return new HashMap<>();
         }
-        Map<String, Object> optionMap = JacksonUtils.read(this.options, 
Map.class);
+        Map<String, String> optionMap = JacksonUtils.read(this.options, 
Map.class);
         optionMap.entrySet().removeIf(entry -> entry.getValue() == null);
         return optionMap;
     }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
index 4ddb22c51..6b585bcf7 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
@@ -21,7 +21,7 @@ import org.apache.streampark.common.Constant;
 import org.apache.streampark.common.conf.ConfigKeys;
 import org.apache.streampark.common.conf.Workspace;
 import org.apache.streampark.common.enums.ApplicationType;
-import org.apache.streampark.common.enums.FlinkDevelopmentMode;
+import org.apache.streampark.common.enums.SparkDevelopmentMode;
 import org.apache.streampark.common.enums.SparkExecutionMode;
 import org.apache.streampark.common.fs.FsOperator;
 import org.apache.streampark.common.util.AssertUtils;
@@ -211,17 +211,15 @@ public class SparkApplicationActionServiceImpl
 
         SparkEnv sparkEnv = 
sparkEnvService.getById(application.getVersionId());
 
-        Map<String, Object> properties = new HashMap<>();
+        Map<String, String> stopProper = new HashMap<>();
 
         StopRequest stopRequest =
             new StopRequest(
                 application.getId(),
                 sparkEnv.getSparkVersion(),
                 SparkExecutionMode.of(application.getExecutionMode()),
-                properties,
-                application.getJobId(),
-                appParam.getDrain(),
-                appParam.getNativeFormat());
+                stopProper,
+                application.getJobId());
 
         CompletableFuture<StopResponse> stopFuture =
             CompletableFuture.supplyAsync(() -> SparkClient.stop(stopRequest), 
executorService);
@@ -322,7 +320,7 @@ public class SparkApplicationActionServiceImpl
             SparkExecutionMode.of(application.getExecutionMode()),
             getProperties(application),
             sparkEnv.getSparkConf(),
-            FlinkDevelopmentMode.of(application.getJobType()),
+            SparkDevelopmentMode.valueOf(application.getJobType()),
             application.getId(),
             jobId,
             application.getJobName(),
@@ -374,15 +372,10 @@ public class SparkApplicationActionServiceImpl
                     }
                 }
                 application.setAppId(response.clusterId());
-                if (StringUtils.isNoneEmpty(response.sparkAppId())) {
-                    application.setJobId(response.sparkAppId());
+                if (StringUtils.isNoneEmpty(response.clusterId())) {
+                    application.setJobId(response.clusterId());
                 }
-
-                if (StringUtils.isNoneEmpty(response.jobManagerUrl())) {
-                    application.setJobManagerUrl(response.jobManagerUrl());
-                    applicationLog.setTrackUrl(response.jobManagerUrl());
-                }
-                applicationLog.setSparkAppId(response.sparkAppId());
+                applicationLog.setSparkAppId(response.clusterId());
                 application.setStartTime(new Date());
                 application.setEndTime(null);
 
@@ -531,8 +524,8 @@ public class SparkApplicationActionServiceImpl
         return Tuple2.of(flinkUserJar, appConf);
     }
 
-    private Map<String, Object> getProperties(SparkApplication application) {
-        Map<String, Object> properties = new 
HashMap<>(application.getOptionMap());
+    private Map<String, String> getProperties(SparkApplication application) {
+        Map<String, String> properties = new 
HashMap<>(application.getOptionMap());
         if 
(SparkExecutionMode.isYarnMode(application.getSparkExecutionMode())) {
             String yarnQueue = (String) 
application.getHotParamsMap().get(ConfigKeys.KEY_YARN_APP_QUEUE());
             String yarnLabelExpr = (String) 
application.getHotParamsMap().get(ConfigKeys.KEY_YARN_APP_NODE_LABEL());
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 5dc294484..cc1e795c0 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -26,6 +26,7 @@ import 
org.apache.streampark.flink.packer.pipeline.{BuildResult, ShadedBuildResp
 import org.apache.streampark.flink.util.FlinkUtils
 import org.apache.streampark.shaded.com.fasterxml.jackson.databind.ObjectMapper
 
+import org.apache.commons.collections.MapUtils
 import org.apache.commons.io.FileUtils
 import org.apache.flink.runtime.jobgraph.{SavepointConfigOptions, 
SavepointRestoreSettings}
 
@@ -118,10 +119,14 @@ case class SubmitRequest(
     }
   }
 
-  def hasProp(key: String): Boolean = properties.containsKey(key)
+  def hasProp(key: String): Boolean = MapUtils.isNotEmpty(properties) && 
properties.containsKey(key)
 
   def getProp(key: String): Any = properties.get(key)
 
+  def hasExtra(key: String): Boolean = MapUtils.isNotEmpty(extraParameter) && 
extraParameter.containsKey(key)
+
+  def getExtra(key: String): Any = extraParameter.get(key)
+
   private[this] def getParameterMap(prefix: String = ""): Map[String, String] 
= {
     if (this.appConf == null) {
       return Map.empty[String, String]
diff --git 
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopRequest.scala
 
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopRequest.scala
index 4e2ab56bc..2f8b5fcbb 100644
--- 
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopRequest.scala
+++ 
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopRequest.scala
@@ -28,7 +28,5 @@ case class StopRequest(
     id: Long,
     sparkVersion: SparkVersion,
     executionMode: SparkExecutionMode,
-    @Nullable properties: JavaMap[String, Any],
-    jobId: String,
-    withDrain: Boolean,
-    nativeFormat: Boolean)
+    @Nullable properties: JavaMap[String, String],
+    jobId: String)
diff --git 
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopResponse.scala
 
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopResponse.scala
index c8655d19b..42b480534 100644
--- 
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopResponse.scala
+++ 
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopResponse.scala
@@ -17,4 +17,4 @@
 
 package org.apache.streampark.spark.client.bean
 
-case class StopResponse(savePointDir: String)
+case class StopResponse(savePoint: String)
diff --git 
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitRequest.scala
 
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitRequest.scala
index 2feca4b0f..cf71c99b9 100644
--- 
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitRequest.scala
+++ 
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitRequest.scala
@@ -25,23 +25,22 @@ import org.apache.streampark.common.util.{DeflaterUtils, 
HdfsUtils, PropertiesUt
 import org.apache.streampark.flink.packer.pipeline.{BuildResult, 
ShadedBuildResponse}
 
 import com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.commons.collections.MapUtils
 
 import javax.annotation.Nullable
 
 import java.io.{File, IOException}
-import java.net.URL
 import java.nio.file.Files
 import java.util.{Map => JavaMap}
 
 import scala.collection.convert.ImplicitConversions._
-import scala.util.Try
 
 case class SubmitRequest(
     sparkVersion: SparkVersion,
     executionMode: SparkExecutionMode,
-    properties: JavaMap[String, Any],
+    properties: JavaMap[String, String],
     sparkYaml: String,
-    developmentMode: FlinkDevelopmentMode,
+    developmentMode: SparkDevelopmentMode,
     id: Long,
     jobId: String,
     appName: String,
@@ -52,7 +51,7 @@ case class SubmitRequest(
     @Nullable buildResult: BuildResult,
     @Nullable extraParameter: JavaMap[String, Any]) {
 
-  val DEFAULT_SUBMIT_PARAM = Map[String, Any](
+  val DEFAULT_SUBMIT_PARAM = Map[String, String](
     "spark.driver.cores" -> "1",
     "spark.driver.memory" -> "1g",
     "spark.executor.cores" -> "1",
@@ -62,8 +61,9 @@ case class SubmitRequest(
     KEY_SPARK_PROPERTY_PREFIX)
 
   lazy val appMain: String = this.developmentMode match {
-    case FlinkDevelopmentMode.FLINK_SQL => 
Constant.STREAMPARK_SPARKSQL_CLIENT_CLASS
-    case _ => appProperties(KEY_FLINK_APPLICATION_MAIN_CLASS)
+    case SparkDevelopmentMode.SPARK_SQL => 
Constant.STREAMPARK_SPARKSQL_CLIENT_CLASS
+    case SparkDevelopmentMode.CUSTOM_CODE => 
appProperties(KEY_FLINK_APPLICATION_MAIN_CLASS)
+    case SparkDevelopmentMode.UNKNOWN => throw new 
IllegalArgumentException("Unknown deployment Mode")
   }
 
   lazy val effectiveAppName: String = if (this.appName == null) {
@@ -72,12 +72,6 @@ case class SubmitRequest(
     this.appName
   }
 
-  lazy val libs: List[URL] = {
-    val path = s"${Workspace.local.APP_WORKSPACE}/$id/lib"
-    Try(new File(path).listFiles().map(_.toURI.toURL).toList)
-      .getOrElse(List.empty[URL])
-  }
-
   lazy val userJarPath: String = {
     executionMode match {
       case _ =>
@@ -86,6 +80,14 @@ case class SubmitRequest(
     }
   }
 
+  def hasProp(key: String): Boolean = MapUtils.isNotEmpty(properties) && 
properties.containsKey(key)
+
+  def getProp(key: String): Any = properties.get(key)
+
+  def hasExtra(key: String): Boolean = MapUtils.isNotEmpty(extraParameter) && 
extraParameter.containsKey(key)
+
+  def getExtra(key: String): Any = extraParameter.get(key)
+
   private[this] def getParameterMap(prefix: String = ""): Map[String, String] 
= {
     if (this.appConf == null) {
       return Map.empty[String, String]
diff --git 
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitResponse.scala
 
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitResponse.scala
index 5ea75af01..b97b1f146 100644
--- 
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitResponse.scala
+++ 
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitResponse.scala
@@ -17,13 +17,8 @@
 
 package org.apache.streampark.spark.client.bean
 
-import javax.annotation.Nullable
-
 import java.util.{Map => JavaMap}
 
 case class SubmitResponse(
     clusterId: String,
-    sparkConfig: JavaMap[String, String],
-    var sparkAppId: String,
-    @Nullable jobId: String = "",
-    @Nullable jobManagerUrl: String = "")
+    sparkConfig: JavaMap[String, String])
diff --git 
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/SparkClientEndpoint.scala
 
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/SparkClientEndpoint.scala
index 437bf0ff2..6590f8c12 100644
--- 
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/SparkClientEndpoint.scala
+++ 
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/SparkClientEndpoint.scala
@@ -26,15 +26,15 @@ import org.apache.streampark.spark.client.impl._
 object SparkClientEndpoint {
 
   private[this] val clients: Map[SparkExecutionMode, SparkClientTrait] = Map(
-    YARN_CLUSTER -> YarnApplicationClient,
-    YARN_CLIENT -> YarnApplicationClient)
+    YARN_CLUSTER -> YarnClient,
+    YARN_CLIENT -> YarnClient)
 
   def submit(submitRequest: SubmitRequest): SubmitResponse = {
     clients.get(submitRequest.executionMode) match {
       case Some(client) => client.submit(submitRequest)
       case _ =>
         throw new UnsupportedOperationException(
-          s"Unsupported ${submitRequest.executionMode} submit ")
+          s"Unsupported ${submitRequest.executionMode} spark submit.")
     }
   }
 
@@ -43,7 +43,7 @@ object SparkClientEndpoint {
       case Some(client) => client.stop(stopRequest)
       case _ =>
         throw new UnsupportedOperationException(
-          s"Unsupported ${stopRequest.executionMode} stop ")
+          s"Unsupported ${stopRequest.executionMode} spark stop.")
     }
   }
 
diff --git 
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnApplicationClient.scala
 
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala
similarity index 81%
rename from 
streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnApplicationClient.scala
rename to 
streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala
index af5776f18..8248a04cb 100644
--- 
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnApplicationClient.scala
+++ 
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala
@@ -22,7 +22,6 @@ import org.apache.streampark.common.util.HadoopUtils
 import org.apache.streampark.spark.client.`trait`.SparkClientTrait
 import org.apache.streampark.spark.client.bean._
 
-import org.apache.commons.collections.MapUtils
 import org.apache.hadoop.yarn.api.records.ApplicationId
 import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}
 
@@ -32,7 +31,7 @@ import scala.collection.convert.ImplicitConversions._
 import scala.util.{Failure, Success, Try}
 
 /** yarn application mode submit */
-object YarnApplicationClient extends SparkClientTrait {
+object YarnClient extends SparkClientTrait {
 
   override def doStop(stopRequest: StopRequest): StopResponse = {
     
HadoopUtils.yarnClient.killApplication(ApplicationId.fromString(stopRequest.jobId))
@@ -51,16 +50,16 @@ object YarnApplicationClient extends SparkClientTrait {
     // 3) launch
     Try(launch(launcher)) match {
       case Success(handle: SparkAppHandle) =>
-        logger.info(s"[StreamPark][YarnApplicationClient] spark job: 
${submitRequest.effectiveAppName} is submit successful, " +
+        logger.info(s"[StreamPark][Spark][YarnClient] spark job: 
${submitRequest.effectiveAppName} is submit successful, " +
           s"appid: ${handle.getAppId}, " +
           s"state: ${handle.getState}")
-        SubmitResponse(null, null, handle.getAppId)
+        SubmitResponse(handle.getAppId, submitRequest.properties)
       case Failure(e) => throw e
     }
   }
 
   private def launch(sparkLauncher: SparkLauncher): SparkAppHandle = {
-    logger.info("[StreamPark][YarnApplicationClient] The spark task start")
+    logger.info("[StreamPark][Spark][YarnClient] The spark job starting")
     val submitFinished: CountDownLatch = new CountDownLatch(1)
     val sparkAppHandle = sparkLauncher.startApplication(new 
SparkAppHandle.Listener() {
       override def infoChanged(sparkAppHandle: SparkAppHandle): Unit = {}
@@ -96,26 +95,23 @@ object YarnApplicationClient extends SparkClientTrait {
       .setDeployMode(submitRequest.executionMode match {
         case SparkExecutionMode.YARN_CLIENT => "client"
         case SparkExecutionMode.YARN_CLUSTER => "cluster"
-        case _ =>
-          throw new UnsupportedOperationException(
-            "[StreamPark][YarnApplicationClient] Yarn mode only support 
\"client\" and \"cluster\".")
+        case _ => throw new IllegalArgumentException("[StreamPark][Spark] 
Invalid spark on yarn deployMode, only support \"client\" and \"cluster\".")
       })
   }
 
   private def setSparkConfig(submitRequest: SubmitRequest, sparkLauncher: 
SparkLauncher): Unit = {
-    logger.info("[StreamPark][SparkClient][YarnApplicationClient] set spark 
configuration.")
+    logger.info("[StreamPark][Spark][YarnClient] set spark configuration.")
     // 1) set spark conf
     submitRequest.properties.foreach(prop => {
       val k = prop._1
-      val v = prop._2.toString
+      val v = prop._2
       logInfo(s"| $k  : $v")
       sparkLauncher.setConf(k, v)
     })
 
     // 2) appArgs...
-    if (MapUtils.isNotEmpty(submitRequest.extraParameter) && 
submitRequest.extraParameter
-        .containsKey("sql")) {
-      sparkLauncher.addAppArgs("--sql", 
submitRequest.extraParameter.get("sql").toString)
+    if (submitRequest.hasExtra("sql")) {
+      sparkLauncher.addAppArgs("--sql", submitRequest.getExtra("sql").toString)
     }
   }
 
diff --git 
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala
 
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala
index af7778ebc..bfefa8892 100644
--- 
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala
+++ 
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala
@@ -66,8 +66,6 @@ trait SparkClientTrait extends Logger {
          |----------------------------------------- spark job stop 
----------------------------------
          |     userSparkHome     : ${stopRequest.sparkVersion.sparkHome}
          |     sparkVersion      : ${stopRequest.sparkVersion.version}
-         |     withDrain         : ${stopRequest.withDrain}
-         |     nativeFormat      : ${stopRequest.nativeFormat}
          |     jobId             : ${stopRequest.jobId}
          
|-------------------------------------------------------------------------------------------
          |""".stripMargin)

Reply via email to