This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 27e97a625 [Fix] Resolve Spark configuration update issues (#4099)
27e97a625 is described below

commit 27e97a625bc1c7e25183b73c70e8c4995e0502df
Author: lenoxzhao <[email protected]>
AuthorDate: Sat Sep 28 05:39:13 2024 +0800

    [Fix] Resolve Spark configuration update issues (#4099)
---
 .../streampark/common/util/PropertiesUtils.scala   | 26 +++++++++++++++++++++-
 .../impl/SparkApplicationActionServiceImpl.java    |  4 +---
 .../impl/SparkApplicationManageServiceImpl.java    | 24 +++++++++-----------
 .../src/main/resources/spark-application.conf      |  2 +-
 .../src/views/spark/app/components/DetailTab.vue   |  4 ++--
 .../streampark/spark/client/impl/YarnClient.scala  |  2 +-
 6 files changed, 40 insertions(+), 22 deletions(-)

diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
index 2f5567b7b..f7f68ae45 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
@@ -37,6 +37,12 @@ object PropertiesUtils extends Logger {
 
   private[this] lazy val PROPERTY_PATTERN = Pattern.compile("(.*?)=(.*?)")
 
+  private[this] lazy val SPARK_PROPERTY_COMPLEX_PATTERN = 
Pattern.compile("^[\"']?(.*?)=(.*?)[\"']?$")
+
+  // scalastyle:off
+  private[this] lazy val SPARK_ARGUMENT_REGEXP = 
"\"?(\\s+|$)(?=(([^\"]*\"){2})*[^\"]*$)\"?"
+  // scalastyle:on
+
   private[this] lazy val MULTI_PROPERTY_REGEXP = 
"-D(.*?)\\s*=\\s*[\\\"|'](.*)[\\\"|']"
 
   private[this] lazy val MULTI_PROPERTY_PATTERN = 
Pattern.compile(MULTI_PROPERTY_REGEXP)
@@ -392,7 +398,7 @@ object PropertiesUtils extends Logger {
         case d if Utils.isNotEmpty(d) =>
           d.foreach(x => {
             if (x.nonEmpty) {
-              val p = PROPERTY_PATTERN.matcher(x)
+              val p = SPARK_PROPERTY_COMPLEX_PATTERN.matcher(x)
               if (p.matches) {
                 map += p.group(1).trim -> p.group(2).trim
               }
@@ -403,4 +409,22 @@ object PropertiesUtils extends Logger {
       map.toMap
     }
   }
+
+  /** extract spark configuration from sparkApplication.appArgs */
+  @Nonnull def extractSparkArgumentsAsJava(arguments: String): 
JavaList[String] = {
+    val list = new JavaArrayList[String]()
+    if (StringUtils.isEmpty(arguments)) list
+    else {
+      arguments.split(SPARK_ARGUMENT_REGEXP) match {
+        case d if Utils.isNotEmpty(d) =>
+          d.foreach(x => {
+            if (x.nonEmpty) {
+              list.add(x)
+            }
+          })
+        case _ =>
+      }
+      list
+    }
+  }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
index e6e89b479..a6dce8bbb 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
@@ -80,7 +80,6 @@ import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.stereotype.Service;
 
 import java.io.File;
-import java.util.Arrays;
 import java.util.Date;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -310,7 +309,6 @@ public class SparkApplicationActionServiceImpl
 
         // Get the args after placeholder replacement
         String applicationArgs = 
variableService.replaceVariable(application.getTeamId(), 
application.getAppArgs());
-        List<String> sparkArgs = Arrays.asList(applicationArgs.split("\\s+"));
 
         SubmitRequest submitRequest = new SubmitRequest(
             sparkEnv.getSparkVersion(),
@@ -322,7 +320,7 @@ public class SparkApplicationActionServiceImpl
             application.getMainClass(),
             appConf,
             
PropertiesUtils.extractSparkPropertiesAsJava(application.getAppProperties()),
-            sparkArgs,
+            PropertiesUtils.extractSparkArgumentsAsJava(applicationArgs),
             application.getApplicationType(),
             application.getHadoopUser(),
             buildResult,
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
index c60667478..112ed1cf5 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
@@ -458,18 +458,15 @@ public class SparkApplicationManageServiceImpl
                 break;
         }
 
-        // Spark Sql job...
         if (application.isSparkSqlJob()) {
             updateSparkSqlJob(application, appParam);
-            return true;
-        }
-
-        if (application.isStreamParkJob()) {
-            // configService.update(appParam, application.isRunning());
-        } else {
+        } else if (application.isSparkJarJob()) {
             application.setJar(appParam.getJar());
             application.setMainClass(appParam.getMainClass());
         }
+
+        // update config
+        configService.update(appParam, application.isRunning());
         this.updateById(application);
         return true;
     }
@@ -492,19 +489,19 @@ public class SparkApplicationManageServiceImpl
             sparkSqlService.create(sql);
             application.setBuild(true);
         } else {
-            // get previous flink sql and decode
+            // get previous spark sql and decode
             SparkSql copySourceSparkSql = 
sparkSqlService.getById(appParam.getSqlId());
             ApiAlertException.throwIfNull(
-                copySourceSparkSql, "Flink sql is null, update flink sql job 
failed.");
+                copySourceSparkSql, "Spark sql is null, update spark sql job 
failed.");
             copySourceSparkSql.decode();
 
-            // get submit flink sql
-            SparkSql targetFlinkSql = new SparkSql(appParam);
+            // get submit spark sql
+            SparkSql targetSparkSql = new SparkSql(appParam);
 
             // judge sql and dependency has changed
-            ChangeTypeEnum changeTypeEnum = 
copySourceSparkSql.checkChange(targetFlinkSql);
+            ChangeTypeEnum changeTypeEnum = 
copySourceSparkSql.checkChange(targetSparkSql);
 
-            log.info("updateFlinkSqlJob changeTypeEnum: {}", changeTypeEnum);
+            log.info("updateSparkSqlJob changeTypeEnum: {}", changeTypeEnum);
 
             // if has been changed
             if (changeTypeEnum.hasChanged()) {
@@ -542,7 +539,6 @@ public class SparkApplicationManageServiceImpl
             }
         }
         this.updateById(application);
-        // this.configService.update(appParam, application.isRunning());
     }
 
     @Override
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/spark-application.conf
 
b/streampark-console/streampark-console-service/src/main/resources/spark-application.conf
index c9d30f409..48fd83ff7 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/spark-application.conf
+++ 
b/streampark-console/streampark-console-service/src/main/resources/spark-application.conf
@@ -23,7 +23,7 @@ spark: #@see: 
https://spark.apache.org/docs/latest/configuration.html
     memoryOverhead:
     memoryOverheadFactor:
   executor:
-    instances: 4
+    instances:
     cores:
     memory:
     memoryOverhead:
diff --git 
a/streampark-console/streampark-console-webapp/src/views/spark/app/components/DetailTab.vue
 
b/streampark-console/streampark-console-webapp/src/views/spark/app/components/DetailTab.vue
index 595f0b5fd..a4bb9c8ec 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/spark/app/components/DetailTab.vue
+++ 
b/streampark-console/streampark-console-webapp/src/views/spark/app/components/DetailTab.vue
@@ -252,11 +252,11 @@
   }
 
   async function handleYarnUrl(id: string) {
-    window.open(baseUrl() + '/proxy/yarn/' + id + '/');
+    window.open(baseUrl() + '/spark/proxy/yarn/' + id + '/');
   }
 
   async function handleViewHistory(id: string) {
-    window.open(baseUrl() + '/proxy/history/' + id + '/');
+    window.open(baseUrl() + '/spark/proxy/history/' + id + '/');
   }
 
   function getBackupAction(record: Recordable): ActionItem[] {
diff --git 
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala
 
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala
index 7eb1ed4b2..d26ea8098 100644
--- 
a/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala
+++ 
b/streampark-spark/streampark-spark-client/streampark-spark-client-core/src/main/scala/org/apache/streampark/spark/client/impl/YarnClient.scala
@@ -40,7 +40,7 @@ object YarnClient extends SparkClientTrait {
   override def doStop(stopRequest: StopRequest): StopResponse = {
     val sparkAppHandle = sparkHandles.remove(stopRequest.appId)
     if (sparkAppHandle != null) {
-      Try(sparkAppHandle.kill()) match {
+      Try(sparkAppHandle.stop()) match {
         case Success(_) =>
           logger.info(s"[StreamPark][Spark][YarnClient] spark job: 
${stopRequest.appId} is stopped successfully.")
           StopResponse(null)

Reply via email to