This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 27e97a625 [Fix] Resolve Spark configuration update issues (#4099)
27e97a625 is described below
commit 27e97a625bc1c7e25183b73c70e8c4995e0502df
Author: lenoxzhao <[email protected]>
AuthorDate: Sat Sep 28 05:39:13 2024 +0800
[Fix] Resolve Spark configuration update issues (#4099)
---
.../streampark/common/util/PropertiesUtils.scala | 26 +++++++++++++++++++++-
.../impl/SparkApplicationActionServiceImpl.java | 4 +---
.../impl/SparkApplicationManageServiceImpl.java | 24 +++++++++-----------
.../src/main/resources/spark-application.conf | 2 +-
.../src/views/spark/app/components/DetailTab.vue | 4 ++--
.../streampark/spark/client/impl/YarnClient.scala | 2 +-
6 files changed, 40 insertions(+), 22 deletions(-)
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
index 2f5567b7b..f7f68ae45 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
@@ -37,6 +37,12 @@ object PropertiesUtils extends Logger {
private[this] lazy val PROPERTY_PATTERN = Pattern.compile("(.*?)=(.*?)")
+ private[this] lazy val SPARK_PROPERTY_COMPLEX_PATTERN =
Pattern.compile("^[\"']?(.*?)=(.*?)[\"']?$")
+
+ // scalastyle:off
+ private[this] lazy val SPARK_ARGUMENT_REGEXP =
"\"?(\\s+|$)(?=(([^\"]*\"){2})*[^\"]*$)\"?"
+ // scalastyle:on
+
private[this] lazy val MULTI_PROPERTY_REGEXP =
"-D(.*?)\\s*=\\s*[\\\"|'](.*)[\\\"|']"
private[this] lazy val MULTI_PROPERTY_PATTERN =
Pattern.compile(MULTI_PROPERTY_REGEXP)
@@ -392,7 +398,7 @@ object PropertiesUtils extends Logger {
case d if Utils.isNotEmpty(d) =>
d.foreach(x => {
if (x.nonEmpty) {
- val p = PROPERTY_PATTERN.matcher(x)
+ val p = SPARK_PROPERTY_COMPLEX_PATTERN.matcher(x)
if (p.matches) {
map += p.group(1).trim -> p.group(2).trim
}
@@ -403,4 +409,22 @@ object PropertiesUtils extends Logger {
map.toMap
}
}
+
+ /** extract spark application arguments from sparkApplication.appArgs */
+ @Nonnull def extractSparkArgumentsAsJava(arguments: String):
JavaList[String] = {
+ val list = new JavaArrayList[String]()
+ if (StringUtils.isEmpty(arguments)) list
+ else {
+ arguments.split(SPARK_ARGUMENT_REGEXP) match {
+ case d if Utils.isNotEmpty(d) =>
+ d.foreach(x => {
+ if (x.nonEmpty) {
+ list.add(x)
+ }
+ })
+ case _ =>
+ }
+ list
+ }
+ }
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
index e6e89b479..a6dce8bbb 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
@@ -80,7 +80,6 @@ import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import java.io.File;
-import java.util.Arrays;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
@@ -310,7 +309,6 @@ public class SparkApplicationActionServiceImpl
// Get the args after placeholder replacement
String applicationArgs =
variableService.replaceVariable(application.getTeamId(),
application.getAppArgs());
- List<String> sparkArgs = Arrays.asList(applicationArgs.split("\\s+"));
SubmitRequest submitRequest = new SubmitRequest(
sparkEnv.getSparkVersion(),
@@ -322,7 +320,7 @@ public class SparkApplicationActionServiceImpl
application.getMainClass(),
appConf,
PropertiesUtils.extractSparkPropertiesAsJava(application.getAppProperties()),
- sparkArgs,
+ PropertiesUtils.extractSparkArgumentsAsJava(applicationArgs),
application.getApplicationType(),
application.getHadoopUser(),
buildResult,
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
index c60667478..112ed1cf5 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
@@ -458,18 +458,15 @@ public class SparkApplicationManageServiceImpl
break;
}
- // Spark Sql job...
if (application.isSparkSqlJob()) {
updateSparkSqlJob(application, appParam);
- return true;
- }
-
- if (application.isStreamParkJob()) {
- // configService.update(appParam, application.isRunning());
- } else {
+ } else if (application.isSparkJarJob()) {
application.setJar(appParam.getJar());
application.setMainClass(appParam.getMainClass());
}
+
+ // update config
+ configService.update(appParam, application.isRunning());
this.updateById(application);
return true;
}
@@ -492,19 +489,19 @@ public class SparkApplicationManageServiceImpl
sparkSqlService.create(sql);
application.setBuild(true);
} else {
- // get previous flink sql and decode
+ // get previous spark sql and decode
SparkSql copySourceSparkSql =
sparkSqlService.getById(appParam.getSqlId());
ApiAlertException.throwIfNull(
- copySourceSparkSql, "Flink sql is null, update flink sql job
failed.");
+ copySourceSparkSql, "Spark sql is null, update spark sql job
failed.");
copySourceSparkSql.decode();
- // get submit flink sql
- SparkSql targetFlinkSql = new SparkSql(appParam);
+ // get submit spark sql
+ SparkSql targetSparkSql = new SparkSql(appParam);
// judge sql and dependency has changed
- ChangeTypeEnum changeTypeEnum =
copySourceSparkSql.checkChange(targetFlinkSql);
+ ChangeTypeEnum changeTypeEnum =
copySourceSparkSql.checkChange(targetSparkSql);
- log.info("updateFlinkSqlJob changeTypeEnum: {}", changeTypeEnum);
+ log.info("updateSparkSqlJob changeTypeEnum: {}", changeTypeEnum);
// if has been changed
if (changeTypeEnum.hasChanged()) {
@@ -542,7 +539,6 @@ public class SparkApplicationManageServiceImpl
}
}
this.updateById(application);
- // this.configService.update(appParam, application.isRunning());
}
@Override
diff --git
a/streampark-console/streampark-console-service/src/main/resources/spark-application.conf
b/streampark-console/streampark-console-service/src/main/resources/spark-application.conf
index c9d30f409..48fd83ff7 100644
---
a/streampark-console/streampark-console-service/src/main/resources/spark-application.conf
+++
b/streampark-console/streampark-console-service/src/main/resources/spark-application.conf
@@ -23,7 +23,7 @@ spark: #@see:
https://spark.apache.org/docs/latest/configuration.html
memoryOverhead:
memoryOverheadFactor:
executor:
- instances: 4
+ instances:
cores:
memory:
memoryOverhead:
diff --git
a/streampark-console/streampark-console-webapp/src/views/spark/app/components/DetailTab.vue
b/streampark-console/streampark-console-webapp/src/views/spark/app/components/DetailTab.vue
index 595f0b5fd..a4bb9c8ec 100644
---
a/streampark-console/streampark-console-webapp/src/views/spark/app/components/DetailTab.vue
+++
b/streampark-console/streampark-console-webapp/src/views/spark/app/components/DetailTab.vue
@@ -252,11 +252,11 @@
}
async function handleYarnUrl(id: string) {
- window.open(baseUrl() + '/proxy/yarn/' + id + '/');
+ window.open(baseUrl() + '/spark/proxy/yarn/' + id + '/');
}
async function handleViewHistory(id: string) {
- window.open(baseUrl() + '/proxy/history/' + id + '/');
+ window.open(baseUrl() + '/spark/proxy/history/' + id + '/');
}
function getBackupAction(record: Recordable): ActionItem[] {
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala
index 7eb1ed4b2..d26ea8098 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala
+++
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala
@@ -40,7 +40,7 @@ object YarnClient extends SparkClientTrait {
override def doStop(stopRequest: StopRequest): StopResponse = {
val sparkAppHandle = sparkHandles.remove(stopRequest.appId)
if (sparkAppHandle != null) {
- Try(sparkAppHandle.kill()) match {
+ Try(sparkAppHandle.stop()) match {
case Success(_) =>
logger.info(s"[StreamPark][Spark][YarnClient] spark job:
${stopRequest.appId} is stopped successfully.")
StopResponse(null)