This is an automated email from the ASF dual-hosted git repository.

kriszu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9aa6900b0 [Improve] spark app support user args (#2627)
9aa6900b0 is described below

commit 9aa6900b0ec534ad47eeef5a74d172f88e54758a
Author: benjobs <[email protected]>
AuthorDate: Fri Apr 14 21:26:48 2023 +0800

    [Improve] spark app support user args (#2627)
    
    Co-authored-by: benjobs <[email protected]>
---
 .../org/apache/streampark/spark/core/Spark.scala   | 27 ++++++++++------------
 1 file changed, 12 insertions(+), 15 deletions(-)

diff --git a/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/Spark.scala b/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/Spark.scala
index 4cf44d386..186317fb8 100644
--- a/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/Spark.scala
+++ b/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/Spark.scala
@@ -85,12 +85,9 @@ trait Spark extends Logger {
 
   /** Initialize sparkConf according to user parameters */
   final private def init(args: Array[String]): Unit = {
-
-    logDebug("init application config ....")
-
     var argv = args.toList
-
     var conf: String = null
+    val userArgs = ArrayBuffer[(String, String)]()
 
     while (argv.nonEmpty) {
       argv match {
@@ -104,6 +101,9 @@ trait Spark extends Logger {
           createOnError = value.toBoolean
           argv = tail
         case Nil =>
+        case other :: value :: tail if other.startsWith("--") =>
+          userArgs += other.drop(2) -> value
+          argv = tail
         case tail =>
           logError(s"Unrecognized options: ${tail.mkString(" ")}")
           printUsageAndExit()
@@ -119,22 +119,19 @@ trait Spark extends Logger {
           "[StreamPark] Usage: config file error,must be 
[properties|yaml|conf]")
     }
 
-    localConf.foreach(x => sparkConf.set(x._1, x._2))
-
-    val (appMain, appName) = sparkConf.get(KEY_SPARK_MAIN_CLASS, null) match {
-      case null | "" => (null, null)
-      case other =>
-        sparkConf.get(KEY_SPARK_APP_NAME, null) match {
-          case null | "" => (other, other)
-          case name => (other, name)
-        }
-    }
+    sparkConf.setAll(localConf).setAll(userArgs)
 
+    val appMain = sparkConf.get(KEY_SPARK_MAIN_CLASS, null)
     if (appMain == null) {
-      logError(s"[StreamPark] $KEY_SPARK_MAIN_CLASS must not be empty!")
+      logError(s"[StreamPark] parameter: $KEY_SPARK_MAIN_CLASS must not be empty!")
       System.exit(1)
     }
 
+    val appName = sparkConf.get(KEY_SPARK_APP_NAME, null) match {
+      case null | "" => appMain
+      case name => name
+    }
+
     // debug mode
     val localMode = sparkConf.get("spark.master", null) == "local"
     if (localMode) {

Reply via email to