This is an automated email from the ASF dual-hosted git repository.
cancai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 9f9e26a96 [Improve] spark-submit some improvements (#3901)
9f9e26a96 is described below
commit 9f9e26a96cc696eb0d481ebf8d78b17eec5d7672
Author: benjobs <[email protected]>
AuthorDate: Sat Jul 20 22:10:33 2024 +0800
[Improve] spark-submit some improvements (#3901)
* [Improve] spark-submit some improvements
* [Improve] enum minor improvements
---
.../common/enums/SparkDevelopmentMode.java | 60 ++++++++++++++++++++++
.../common/enums/SparkExecutionMode.java | 1 +
.../console/core/entity/SparkApplication.java | 4 +-
.../impl/SparkApplicationActionServiceImpl.java | 27 ++++------
.../flink/client/bean/SubmitRequest.scala | 7 ++-
.../streampark/spark/client/bean/StopRequest.scala | 6 +--
.../spark/client/bean/StopResponse.scala | 2 +-
.../spark/client/bean/SubmitRequest.scala | 28 +++++-----
.../spark/client/bean/SubmitResponse.scala | 7 +--
.../spark/client/SparkClientEndpoint.scala | 8 +--
...arnApplicationClient.scala => YarnClient.scala} | 22 ++++----
.../spark/client/trait/SparkClientTrait.scala | 2 -
12 files changed, 111 insertions(+), 63 deletions(-)
diff --git
a/streampark-common/src/main/java/org/apache/streampark/common/enums/SparkDevelopmentMode.java
b/streampark-common/src/main/java/org/apache/streampark/common/enums/SparkDevelopmentMode.java
new file mode 100644
index 000000000..29c5b9727
--- /dev/null
+++
b/streampark-common/src/main/java/org/apache/streampark/common/enums/SparkDevelopmentMode.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.common.enums;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+/** The spark development mode enum. */
+public enum SparkDevelopmentMode {
+
+ /** Unknown type, used as a replacement for null. */
+ UNKNOWN("Unknown", -1),
+
+ /** custom code */
+ CUSTOM_CODE("Custom Code", 1),
+
+ /** spark SQL */
+ SPARK_SQL("Spark SQL", 2);
+
+ private final String name;
+
+ private final Integer mode;
+
+ SparkDevelopmentMode(@Nonnull String name, @Nonnull Integer mode) {
+ this.name = name;
+ this.mode = mode;
+ }
+
+ /**
+ * Try to resolve the mode value into {@link SparkDevelopmentMode}.
+ *
+ * @param value The mode value of a potential spark development mode.
+ * @return The parsed spark development mode.
+ */
+ @Nonnull
+ public static SparkDevelopmentMode valueOf(@Nullable Integer value) {
+ for (SparkDevelopmentMode flinkDevelopmentMode : values()) {
+ if (flinkDevelopmentMode.mode.equals(value)) {
+ return flinkDevelopmentMode;
+ }
+ }
+ return SparkDevelopmentMode.UNKNOWN;
+ }
+
+}
diff --git
a/streampark-common/src/main/java/org/apache/streampark/common/enums/SparkExecutionMode.java
b/streampark-common/src/main/java/org/apache/streampark/common/enums/SparkExecutionMode.java
index 25db6f1f5..d82d96f5d 100644
---
a/streampark-common/src/main/java/org/apache/streampark/common/enums/SparkExecutionMode.java
+++
b/streampark-common/src/main/java/org/apache/streampark/common/enums/SparkExecutionMode.java
@@ -37,6 +37,7 @@ public enum SparkExecutionMode {
/** yarn client */
YARN_CLIENT(3, "yarn-client");
+
private final Integer mode;
private final String name;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
index c07e2b160..8b3eb5112 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
@@ -407,11 +407,11 @@ public class SparkApplication extends BaseEntity {
@JsonIgnore
@SneakyThrows
@SuppressWarnings("unchecked")
- public Map<String, Object> getOptionMap() {
+ public Map<String, String> getOptionMap() {
if (StringUtils.isBlank(this.options)) {
return new HashMap<>();
}
- Map<String, Object> optionMap = JacksonUtils.read(this.options,
Map.class);
+ Map<String, String> optionMap = JacksonUtils.read(this.options,
Map.class);
optionMap.entrySet().removeIf(entry -> entry.getValue() == null);
return optionMap;
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
index 4ddb22c51..6b585bcf7 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
@@ -21,7 +21,7 @@ import org.apache.streampark.common.Constant;
import org.apache.streampark.common.conf.ConfigKeys;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.enums.ApplicationType;
-import org.apache.streampark.common.enums.FlinkDevelopmentMode;
+import org.apache.streampark.common.enums.SparkDevelopmentMode;
import org.apache.streampark.common.enums.SparkExecutionMode;
import org.apache.streampark.common.fs.FsOperator;
import org.apache.streampark.common.util.AssertUtils;
@@ -211,17 +211,15 @@ public class SparkApplicationActionServiceImpl
SparkEnv sparkEnv =
sparkEnvService.getById(application.getVersionId());
- Map<String, Object> properties = new HashMap<>();
+ Map<String, String> stopProper = new HashMap<>();
StopRequest stopRequest =
new StopRequest(
application.getId(),
sparkEnv.getSparkVersion(),
SparkExecutionMode.of(application.getExecutionMode()),
- properties,
- application.getJobId(),
- appParam.getDrain(),
- appParam.getNativeFormat());
+ stopProper,
+ application.getJobId());
CompletableFuture<StopResponse> stopFuture =
CompletableFuture.supplyAsync(() -> SparkClient.stop(stopRequest),
executorService);
@@ -322,7 +320,7 @@ public class SparkApplicationActionServiceImpl
SparkExecutionMode.of(application.getExecutionMode()),
getProperties(application),
sparkEnv.getSparkConf(),
- FlinkDevelopmentMode.of(application.getJobType()),
+ SparkDevelopmentMode.valueOf(application.getJobType()),
application.getId(),
jobId,
application.getJobName(),
@@ -374,15 +372,10 @@ public class SparkApplicationActionServiceImpl
}
}
application.setAppId(response.clusterId());
- if (StringUtils.isNoneEmpty(response.sparkAppId())) {
- application.setJobId(response.sparkAppId());
+ if (StringUtils.isNoneEmpty(response.clusterId())) {
+ application.setJobId(response.clusterId());
}
-
- if (StringUtils.isNoneEmpty(response.jobManagerUrl())) {
- application.setJobManagerUrl(response.jobManagerUrl());
- applicationLog.setTrackUrl(response.jobManagerUrl());
- }
- applicationLog.setSparkAppId(response.sparkAppId());
+ applicationLog.setSparkAppId(response.clusterId());
application.setStartTime(new Date());
application.setEndTime(null);
@@ -531,8 +524,8 @@ public class SparkApplicationActionServiceImpl
return Tuple2.of(flinkUserJar, appConf);
}
- private Map<String, Object> getProperties(SparkApplication application) {
- Map<String, Object> properties = new
HashMap<>(application.getOptionMap());
+ private Map<String, String> getProperties(SparkApplication application) {
+ Map<String, String> properties = new
HashMap<>(application.getOptionMap());
if
(SparkExecutionMode.isYarnMode(application.getSparkExecutionMode())) {
String yarnQueue = (String)
application.getHotParamsMap().get(ConfigKeys.KEY_YARN_APP_QUEUE());
String yarnLabelExpr = (String)
application.getHotParamsMap().get(ConfigKeys.KEY_YARN_APP_NODE_LABEL());
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 5dc294484..cc1e795c0 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -26,6 +26,7 @@ import
org.apache.streampark.flink.packer.pipeline.{BuildResult, ShadedBuildResp
import org.apache.streampark.flink.util.FlinkUtils
import org.apache.streampark.shaded.com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.commons.collections.MapUtils
import org.apache.commons.io.FileUtils
import org.apache.flink.runtime.jobgraph.{SavepointConfigOptions,
SavepointRestoreSettings}
@@ -118,10 +119,14 @@ case class SubmitRequest(
}
}
- def hasProp(key: String): Boolean = properties.containsKey(key)
+ def hasProp(key: String): Boolean = MapUtils.isNotEmpty(properties) &&
properties.containsKey(key)
def getProp(key: String): Any = properties.get(key)
+ def hasExtra(key: String): Boolean = MapUtils.isNotEmpty(extraParameter) &&
extraParameter.containsKey(key)
+
+ def getExtra(key: String): Any = extraParameter.get(key)
+
private[this] def getParameterMap(prefix: String = ""): Map[String, String]
= {
if (this.appConf == null) {
return Map.empty[String, String]
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopRequest.scala
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopRequest.scala
index 4e2ab56bc..2f8b5fcbb 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopRequest.scala
+++
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopRequest.scala
@@ -28,7 +28,5 @@ case class StopRequest(
id: Long,
sparkVersion: SparkVersion,
executionMode: SparkExecutionMode,
- @Nullable properties: JavaMap[String, Any],
- jobId: String,
- withDrain: Boolean,
- nativeFormat: Boolean)
+ @Nullable properties: JavaMap[String, String],
+ jobId: String)
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopResponse.scala
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopResponse.scala
index c8655d19b..42b480534 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopResponse.scala
+++
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/StopResponse.scala
@@ -17,4 +17,4 @@
package org.apache.streampark.spark.client.bean
-case class StopResponse(savePointDir: String)
+case class StopResponse(savePoint: String)
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitRequest.scala
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitRequest.scala
index 2feca4b0f..cf71c99b9 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitRequest.scala
+++
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitRequest.scala
@@ -25,23 +25,22 @@ import org.apache.streampark.common.util.{DeflaterUtils,
HdfsUtils, PropertiesUt
import org.apache.streampark.flink.packer.pipeline.{BuildResult,
ShadedBuildResponse}
import com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.commons.collections.MapUtils
import javax.annotation.Nullable
import java.io.{File, IOException}
-import java.net.URL
import java.nio.file.Files
import java.util.{Map => JavaMap}
import scala.collection.convert.ImplicitConversions._
-import scala.util.Try
case class SubmitRequest(
sparkVersion: SparkVersion,
executionMode: SparkExecutionMode,
- properties: JavaMap[String, Any],
+ properties: JavaMap[String, String],
sparkYaml: String,
- developmentMode: FlinkDevelopmentMode,
+ developmentMode: SparkDevelopmentMode,
id: Long,
jobId: String,
appName: String,
@@ -52,7 +51,7 @@ case class SubmitRequest(
@Nullable buildResult: BuildResult,
@Nullable extraParameter: JavaMap[String, Any]) {
- val DEFAULT_SUBMIT_PARAM = Map[String, Any](
+ val DEFAULT_SUBMIT_PARAM = Map[String, String](
"spark.driver.cores" -> "1",
"spark.driver.memory" -> "1g",
"spark.executor.cores" -> "1",
@@ -62,8 +61,9 @@ case class SubmitRequest(
KEY_SPARK_PROPERTY_PREFIX)
lazy val appMain: String = this.developmentMode match {
- case FlinkDevelopmentMode.FLINK_SQL =>
Constant.STREAMPARK_SPARKSQL_CLIENT_CLASS
- case _ => appProperties(KEY_FLINK_APPLICATION_MAIN_CLASS)
+ case SparkDevelopmentMode.SPARK_SQL =>
Constant.STREAMPARK_SPARKSQL_CLIENT_CLASS
+ case SparkDevelopmentMode.CUSTOM_CODE =>
appProperties(KEY_FLINK_APPLICATION_MAIN_CLASS)
+ case SparkDevelopmentMode.UNKNOWN => throw new
IllegalArgumentException("Unknown deployment Mode")
}
lazy val effectiveAppName: String = if (this.appName == null) {
@@ -72,12 +72,6 @@ case class SubmitRequest(
this.appName
}
- lazy val libs: List[URL] = {
- val path = s"${Workspace.local.APP_WORKSPACE}/$id/lib"
- Try(new File(path).listFiles().map(_.toURI.toURL).toList)
- .getOrElse(List.empty[URL])
- }
-
lazy val userJarPath: String = {
executionMode match {
case _ =>
@@ -86,6 +80,14 @@ case class SubmitRequest(
}
}
+ def hasProp(key: String): Boolean = MapUtils.isNotEmpty(properties) &&
properties.containsKey(key)
+
+ def getProp(key: String): Any = properties.get(key)
+
+ def hasExtra(key: String): Boolean = MapUtils.isNotEmpty(extraParameter) &&
extraParameter.containsKey(key)
+
+ def getExtra(key: String): Any = extraParameter.get(key)
+
private[this] def getParameterMap(prefix: String = ""): Map[String, String]
= {
if (this.appConf == null) {
return Map.empty[String, String]
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitResponse.scala
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitResponse.scala
index 5ea75af01..b97b1f146 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitResponse.scala
+++
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitResponse.scala
@@ -17,13 +17,8 @@
package org.apache.streampark.spark.client.bean
-import javax.annotation.Nullable
-
import java.util.{Map => JavaMap}
case class SubmitResponse(
clusterId: String,
- sparkConfig: JavaMap[String, String],
- var sparkAppId: String,
- @Nullable jobId: String = "",
- @Nullable jobManagerUrl: String = "")
+ sparkConfig: JavaMap[String, String])
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/SparkClientEndpoint.scala
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/SparkClientEndpoint.scala
index 437bf0ff2..6590f8c12 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/SparkClientEndpoint.scala
+++
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/SparkClientEndpoint.scala
@@ -26,15 +26,15 @@ import org.apache.streampark.spark.client.impl._
object SparkClientEndpoint {
private[this] val clients: Map[SparkExecutionMode, SparkClientTrait] = Map(
- YARN_CLUSTER -> YarnApplicationClient,
- YARN_CLIENT -> YarnApplicationClient)
+ YARN_CLUSTER -> YarnClient,
+ YARN_CLIENT -> YarnClient)
def submit(submitRequest: SubmitRequest): SubmitResponse = {
clients.get(submitRequest.executionMode) match {
case Some(client) => client.submit(submitRequest)
case _ =>
throw new UnsupportedOperationException(
- s"Unsupported ${submitRequest.executionMode} submit ")
+ s"Unsupported ${submitRequest.executionMode} spark submit.")
}
}
@@ -43,7 +43,7 @@ object SparkClientEndpoint {
case Some(client) => client.stop(stopRequest)
case _ =>
throw new UnsupportedOperationException(
- s"Unsupported ${stopRequest.executionMode} stop ")
+ s"Unsupported ${stopRequest.executionMode} spark stop.")
}
}
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnApplicationClient.scala
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala
similarity index 81%
rename from
streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnApplicationClient.scala
rename to
streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala
index af5776f18..8248a04cb 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnApplicationClient.scala
+++
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala
@@ -22,7 +22,6 @@ import org.apache.streampark.common.util.HadoopUtils
import org.apache.streampark.spark.client.`trait`.SparkClientTrait
import org.apache.streampark.spark.client.bean._
-import org.apache.commons.collections.MapUtils
import org.apache.hadoop.yarn.api.records.ApplicationId
import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}
@@ -32,7 +31,7 @@ import scala.collection.convert.ImplicitConversions._
import scala.util.{Failure, Success, Try}
/** yarn application mode submit */
-object YarnApplicationClient extends SparkClientTrait {
+object YarnClient extends SparkClientTrait {
override def doStop(stopRequest: StopRequest): StopResponse = {
HadoopUtils.yarnClient.killApplication(ApplicationId.fromString(stopRequest.jobId))
@@ -51,16 +50,16 @@ object YarnApplicationClient extends SparkClientTrait {
// 3) launch
Try(launch(launcher)) match {
case Success(handle: SparkAppHandle) =>
- logger.info(s"[StreamPark][YarnApplicationClient] spark job:
${submitRequest.effectiveAppName} is submit successful, " +
+ logger.info(s"[StreamPark][Spark][YarnClient] spark job:
${submitRequest.effectiveAppName} is submit successful, " +
s"appid: ${handle.getAppId}, " +
s"state: ${handle.getState}")
- SubmitResponse(null, null, handle.getAppId)
+ SubmitResponse(handle.getAppId, submitRequest.properties)
case Failure(e) => throw e
}
}
private def launch(sparkLauncher: SparkLauncher): SparkAppHandle = {
- logger.info("[StreamPark][YarnApplicationClient] The spark task start")
+ logger.info("[StreamPark][Spark][YarnClient] The spark job starting")
val submitFinished: CountDownLatch = new CountDownLatch(1)
val sparkAppHandle = sparkLauncher.startApplication(new
SparkAppHandle.Listener() {
override def infoChanged(sparkAppHandle: SparkAppHandle): Unit = {}
@@ -96,26 +95,23 @@ object YarnApplicationClient extends SparkClientTrait {
.setDeployMode(submitRequest.executionMode match {
case SparkExecutionMode.YARN_CLIENT => "client"
case SparkExecutionMode.YARN_CLUSTER => "cluster"
- case _ =>
- throw new UnsupportedOperationException(
- "[StreamPark][YarnApplicationClient] Yarn mode only support
\"client\" and \"cluster\".")
+ case _ => throw new IllegalArgumentException("[StreamPark][Spark]
Invalid spark on yarn deployMode, only support \"client\" and \"cluster\".")
})
}
private def setSparkConfig(submitRequest: SubmitRequest, sparkLauncher:
SparkLauncher): Unit = {
- logger.info("[StreamPark][SparkClient][YarnApplicationClient] set spark
configuration.")
+ logger.info("[StreamPark][Spark][YarnClient] set spark configuration.")
// 1) set spark conf
submitRequest.properties.foreach(prop => {
val k = prop._1
- val v = prop._2.toString
+ val v = prop._2
logInfo(s"| $k : $v")
sparkLauncher.setConf(k, v)
})
// 2) appArgs...
- if (MapUtils.isNotEmpty(submitRequest.extraParameter) &&
submitRequest.extraParameter
- .containsKey("sql")) {
- sparkLauncher.addAppArgs("--sql",
submitRequest.extraParameter.get("sql").toString)
+ if (submitRequest.hasExtra("sql")) {
+ sparkLauncher.addAppArgs("--sql", submitRequest.getExtra("sql").toString)
}
}
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala
index af7778ebc..bfefa8892 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala
+++
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/trait/SparkClientTrait.scala
@@ -66,8 +66,6 @@ trait SparkClientTrait extends Logger {
|----------------------------------------- spark job stop
----------------------------------
| userSparkHome : ${stopRequest.sparkVersion.sparkHome}
| sparkVersion : ${stopRequest.sparkVersion.version}
- | withDrain : ${stopRequest.withDrain}
- | nativeFormat : ${stopRequest.nativeFormat}
| jobId : ${stopRequest.jobId}
|-------------------------------------------------------------------------------------------
|""".stripMargin)