This is an automated email from the ASF dual-hosted git repository.

chengjie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 71fc77ce9 [Improve] spark app submit bug fixed. (#4095)
71fc77ce9 is described below

commit 71fc77ce9c0cc055b43089b484b5fae95fb17485
Author: benjobs <[email protected]>
AuthorDate: Fri Sep 27 10:09:19 2024 +0800

    [Improve] spark app submit bug fixed. (#4095)
---
 qodana.yaml                                        | 47 ++++++++++++++++++++++
 .../streampark/common/util/PropertiesUtils.scala   | 26 +-----------
 .../impl/SparkApplicationActionServiceImpl.java    |  4 +-
 .../streampark/spark/client/impl/YarnClient.scala  |  7 ++--
 4 files changed, 54 insertions(+), 30 deletions(-)

diff --git a/qodana.yaml b/qodana.yaml
new file mode 100644
index 000000000..636032aba
--- /dev/null
+++ b/qodana.yaml
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#-------------------------------------------------------------------------------#
+#               Qodana analysis is configured by qodana.yaml file               #
+#             https://www.jetbrains.com/help/qodana/qodana-yaml.html            #
+#-------------------------------------------------------------------------------#
+version: "1.0"
+
+#Specify inspection profile for code analysis
+profile:
+  name: qodana.starter
+
+#Enable inspections
+#include:
+#  - name: <SomeEnabledInspectionId>
+
+#Disable inspections
+#exclude:
+#  - name: <SomeDisabledInspectionId>
+#    paths:
+#      - <path/where/not/run/inspection>
+
+projectJDK: 8 #(Applied in CI/CD pipeline)
+
+#Execute shell command before Qodana execution (Applied in CI/CD pipeline)
+#bootstrap: sh ./prepare-qodana.sh
+
+#Install IDE plugins before Qodana execution (Applied in CI/CD pipeline)
+#plugins:
+#  - id: <plugin.id> #(plugin id can be found at https://plugins.jetbrains.com)
+
+#Specify Qodana linter for analysis (Applied in CI/CD pipeline)
+linter: jetbrains/qodana-jvm:latest
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
index f7f68ae45..2f5567b7b 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
@@ -37,12 +37,6 @@ object PropertiesUtils extends Logger {
 
   private[this] lazy val PROPERTY_PATTERN = Pattern.compile("(.*?)=(.*?)")
 
-  private[this] lazy val SPARK_PROPERTY_COMPLEX_PATTERN = Pattern.compile("^[\"']?(.*?)=(.*?)[\"']?$")
-
-  // scalastyle:off
-  private[this] lazy val SPARK_ARGUMENT_REGEXP = "\"?(\\s+|$)(?=(([^\"]*\"){2})*[^\"]*$)\"?"
-  // scalastyle:on
-
   private[this] lazy val MULTI_PROPERTY_REGEXP = "-D(.*?)\\s*=\\s*[\\\"|'](.*)[\\\"|']"
 
   private[this] lazy val MULTI_PROPERTY_PATTERN = Pattern.compile(MULTI_PROPERTY_REGEXP)
@@ -398,7 +392,7 @@ object PropertiesUtils extends Logger {
         case d if Utils.isNotEmpty(d) =>
           d.foreach(x => {
             if (x.nonEmpty) {
-              val p = SPARK_PROPERTY_COMPLEX_PATTERN.matcher(x)
+              val p = PROPERTY_PATTERN.matcher(x)
               if (p.matches) {
                 map += p.group(1).trim -> p.group(2).trim
               }
@@ -409,22 +403,4 @@ object PropertiesUtils extends Logger {
       map.toMap
     }
   }
-
-  /** extract spark configuration from sparkApplication.appArgs */
-  @Nonnull def extractSparkArgumentsAsJava(arguments: String): JavaList[String] = {
-    val list = new JavaArrayList[String]()
-    if (StringUtils.isEmpty(arguments)) list
-    else {
-      arguments.split(SPARK_ARGUMENT_REGEXP) match {
-        case d if Utils.isNotEmpty(d) =>
-          d.foreach(x => {
-            if (x.nonEmpty) {
-              list.add(x)
-            }
-          })
-        case _ =>
-      }
-      list
-    }
-  }
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
index 0c7296a3c..f68dc3ea1 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
@@ -80,6 +80,7 @@ import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.stereotype.Service;
 
 import java.io.File;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -309,6 +310,7 @@ public class SparkApplicationActionServiceImpl
 
         // Get the args after placeholder replacement
+        String applicationArgs = variableService.replaceVariable(application.getTeamId(), application.getAppArgs());
+        List<String> sparkArgs = Arrays.asList(applicationArgs.split("\\s+"));
 
         SubmitRequest submitRequest = new SubmitRequest(
             sparkEnv.getSparkVersion(),
@@ -320,7 +322,7 @@ public class SparkApplicationActionServiceImpl
             application.getMainClass(),
             appConf,
             
PropertiesUtils.extractSparkPropertiesAsJava(application.getAppProperties()),
-            PropertiesUtils.extractSparkArgumentsAsJava(applicationArgs),
+            sparkArgs,
             application.getApplicationType(),
             application.getHadoopUser(),
             buildResult,
diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala
index 031400825..55b6a1a03 100644
--- a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala
+++ b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala
@@ -131,9 +131,8 @@ object YarnClient extends SparkClientTrait {
       .setAppResource(submitRequest.userJarPath)
       .setMainClass(submitRequest.appMain)
       .setAppName(submitRequest.appName)
-      .setConf(
-        "spark.yarn.jars",
-        submitRequest.hdfsWorkspace.sparkLib + "/*.jar")
+      .setConf("spark.yarn.dist.jars", submitRequest.hdfsWorkspace.sparkLib)
+      .setConf("spark.yarn.applicationType", "StreamPark Spark")
       .setVerbose(true)
       .setMaster("yarn")
       .setDeployMode(submitRequest.executionMode match {
@@ -166,7 +165,7 @@ object YarnClient extends SparkClientTrait {
     }
   }
 
-  protected def setYarnQueue(submitRequest: SubmitRequest): Unit = {
+  private def setYarnQueue(submitRequest: SubmitRequest): Unit = {
     if (submitRequest.hasExtra(KEY_SPARK_YARN_QUEUE_NAME)) {
       submitRequest.appProperties.put(KEY_SPARK_YARN_QUEUE, submitRequest.getExtra(KEY_SPARK_YARN_QUEUE_NAME).asInstanceOf[String])
     }

Reply via email to