This is an automated email from the ASF dual-hosted git repository.
kriszu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 9aa6900b0 [Improve] spark app support user args (#2627)
9aa6900b0 is described below
commit 9aa6900b0ec534ad47eeef5a74d172f88e54758a
Author: benjobs <[email protected]>
AuthorDate: Fri Apr 14 21:26:48 2023 +0800
[Improve] spark app support user args (#2627)
Co-authored-by: benjobs <[email protected]>
---
.../org/apache/streampark/spark/core/Spark.scala | 27 ++++++++++------------
1 file changed, 12 insertions(+), 15 deletions(-)
diff --git a/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/Spark.scala b/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/Spark.scala
index 4cf44d386..186317fb8 100644
--- a/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/Spark.scala
+++ b/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/Spark.scala
@@ -85,12 +85,9 @@ trait Spark extends Logger {
/** Initialize sparkConf according to user parameters */
final private def init(args: Array[String]): Unit = {
-
- logDebug("init application config ....")
-
var argv = args.toList
-
var conf: String = null
+ val userArgs = ArrayBuffer[(String, String)]()
while (argv.nonEmpty) {
argv match {
@@ -104,6 +101,9 @@ trait Spark extends Logger {
createOnError = value.toBoolean
argv = tail
case Nil =>
+ case other :: value :: tail if other.startsWith("--") =>
+ userArgs += other.drop(2) -> value
+ argv = tail
case tail =>
logError(s"Unrecognized options: ${tail.mkString(" ")}")
printUsageAndExit()
@@ -119,22 +119,19 @@ trait Spark extends Logger {
"[StreamPark] Usage: config file error,must be [properties|yaml|conf]")
}
- localConf.foreach(x => sparkConf.set(x._1, x._2))
-
- val (appMain, appName) = sparkConf.get(KEY_SPARK_MAIN_CLASS, null) match {
- case null | "" => (null, null)
- case other =>
- sparkConf.get(KEY_SPARK_APP_NAME, null) match {
- case null | "" => (other, other)
- case name => (other, name)
- }
- }
+ sparkConf.setAll(localConf).setAll(userArgs)
+ val appMain = sparkConf.get(KEY_SPARK_MAIN_CLASS, null)
if (appMain == null) {
- logError(s"[StreamPark] $KEY_SPARK_MAIN_CLASS must not be empty!")
+ logError(s"[StreamPark] parameter: $KEY_SPARK_MAIN_CLASS must not be empty!")
System.exit(1)
}
+ val appName = sparkConf.get(KEY_SPARK_APP_NAME, null) match {
+ case null | "" => appMain
+ case name => name
+ }
+
// debug mode
val localMode = sparkConf.get("spark.master", null) == "local"
if (localMode) {