This is an automated email from the ASF dual-hosted git repository.
chengjie pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 71fc77ce9 [Improve] spark app submit bug fixed. (#4095)
71fc77ce9 is described below
commit 71fc77ce9c0cc055b43089b484b5fae95fb17485
Author: benjobs <[email protected]>
AuthorDate: Fri Sep 27 10:09:19 2024 +0800
[Improve] spark app submit bug fixed. (#4095)
---
qodana.yaml | 47 ++++++++++++++++++++++
.../streampark/common/util/PropertiesUtils.scala | 26 +-----------
.../impl/SparkApplicationActionServiceImpl.java | 4 +-
.../streampark/spark/client/impl/YarnClient.scala | 7 ++--
4 files changed, 54 insertions(+), 30 deletions(-)
diff --git a/qodana.yaml b/qodana.yaml
new file mode 100644
index 000000000..636032aba
--- /dev/null
+++ b/qodana.yaml
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#-------------------------------------------------------------------------------#
+# Qodana analysis is configured by qodana.yaml file                             #
+# https://www.jetbrains.com/help/qodana/qodana-yaml.html                        #
+#-------------------------------------------------------------------------------#
+version: "1.0"
+
+#Specify inspection profile for code analysis
+profile:
+ name: qodana.starter
+
+#Enable inspections
+#include:
+# - name: <SomeEnabledInspectionId>
+
+#Disable inspections
+#exclude:
+# - name: <SomeDisabledInspectionId>
+# paths:
+# - <path/where/not/run/inspection>
+
+projectJDK: 8 #(Applied in CI/CD pipeline)
+
+#Execute shell command before Qodana execution (Applied in CI/CD pipeline)
+#bootstrap: sh ./prepare-qodana.sh
+
+#Install IDE plugins before Qodana execution (Applied in CI/CD pipeline)
+#plugins:
+# - id: <plugin.id> #(plugin id can be found at https://plugins.jetbrains.com)
+
+#Specify Qodana linter for analysis (Applied in CI/CD pipeline)
+linter: jetbrains/qodana-jvm:latest
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
index f7f68ae45..2f5567b7b 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
@@ -37,12 +37,6 @@ object PropertiesUtils extends Logger {
private[this] lazy val PROPERTY_PATTERN = Pattern.compile("(.*?)=(.*?)")
-  private[this] lazy val SPARK_PROPERTY_COMPLEX_PATTERN = Pattern.compile("^[\"']?(.*?)=(.*?)[\"']?$")
-
- // scalastyle:off
-  private[this] lazy val SPARK_ARGUMENT_REGEXP = "\"?(\\s+|$)(?=(([^\"]*\"){2})*[^\"]*$)\"?"
- // scalastyle:on
-
   private[this] lazy val MULTI_PROPERTY_REGEXP = "-D(.*?)\\s*=\\s*[\\\"|'](.*)[\\\"|']"
   private[this] lazy val MULTI_PROPERTY_PATTERN = Pattern.compile(MULTI_PROPERTY_REGEXP)
@@ -398,7 +392,7 @@ object PropertiesUtils extends Logger {
case d if Utils.isNotEmpty(d) =>
d.foreach(x => {
if (x.nonEmpty) {
- val p = SPARK_PROPERTY_COMPLEX_PATTERN.matcher(x)
+ val p = PROPERTY_PATTERN.matcher(x)
if (p.matches) {
map += p.group(1).trim -> p.group(2).trim
}
@@ -409,22 +403,4 @@ object PropertiesUtils extends Logger {
map.toMap
}
}
-
- /** extract spark configuration from sparkApplication.appArgs */
-  @Nonnull def extractSparkArgumentsAsJava(arguments: String): JavaList[String] = {
- val list = new JavaArrayList[String]()
- if (StringUtils.isEmpty(arguments)) list
- else {
- arguments.split(SPARK_ARGUMENT_REGEXP) match {
- case d if Utils.isNotEmpty(d) =>
- d.foreach(x => {
- if (x.nonEmpty) {
- list.add(x)
- }
- })
- case _ =>
- }
- list
- }
- }
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
index 0c7296a3c..f68dc3ea1 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
@@ -80,6 +80,7 @@ import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import java.io.File;
+import java.util.Arrays;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
@@ -309,6 +310,7 @@ public class SparkApplicationActionServiceImpl
// Get the args after placeholder replacement
        String applicationArgs = variableService.replaceVariable(application.getTeamId(), application.getAppArgs());
+ List<String> sparkArgs = Arrays.asList(applicationArgs.split("\\s+"));
SubmitRequest submitRequest = new SubmitRequest(
sparkEnv.getSparkVersion(),
@@ -320,7 +322,7 @@ public class SparkApplicationActionServiceImpl
application.getMainClass(),
appConf,
PropertiesUtils.extractSparkPropertiesAsJava(application.getAppProperties()),
- PropertiesUtils.extractSparkArgumentsAsJava(applicationArgs),
+ sparkArgs,
application.getApplicationType(),
application.getHadoopUser(),
buildResult,
diff --git a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala
index 031400825..55b6a1a03 100644
--- a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala
+++ b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala
@@ -131,9 +131,8 @@ object YarnClient extends SparkClientTrait {
.setAppResource(submitRequest.userJarPath)
.setMainClass(submitRequest.appMain)
.setAppName(submitRequest.appName)
- .setConf(
- "spark.yarn.jars",
- submitRequest.hdfsWorkspace.sparkLib + "/*.jar")
+ .setConf("spark.yarn.dist.jars", submitRequest.hdfsWorkspace.sparkLib)
+ .setConf("spark.yarn.applicationType", "StreamPark Spark")
.setVerbose(true)
.setMaster("yarn")
.setDeployMode(submitRequest.executionMode match {
@@ -166,7 +165,7 @@ object YarnClient extends SparkClientTrait {
}
}
- protected def setYarnQueue(submitRequest: SubmitRequest): Unit = {
+ private def setYarnQueue(submitRequest: SubmitRequest): Unit = {
if (submitRequest.hasExtra(KEY_SPARK_YARN_QUEUE_NAME)) {
       submitRequest.appProperties.put(KEY_SPARK_YARN_QUEUE, submitRequest.getExtra(KEY_SPARK_YARN_QUEUE_NAME).asInstanceOf[String])
}