This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new a94ed3a5b [Improve] Adjust to fit the Spark page (#4042)
a94ed3a5b is described below

commit a94ed3a5b1b4ef453caded7f1d7ee39d05bc3357
Author: lenoxzhao <[email protected]>
AuthorDate: Sat Sep 7 21:58:54 2024 +0800

    [Improve] Adjust to fit the Spark page (#4042)
    
    * feat: spark multiversion support
    
    * feat: change custom code to spark jar
    
    * feat: adjust to frontend
---
 .../common/enums/SparkDevelopmentMode.java         |  4 ++--
 .../streampark/common/conf/SparkVersion.scala      |  4 ++--
 .../src/main/assembly/script/data/mysql-data.sql   |  6 ++++++
 .../console/core/entity/SparkApplication.java      | 25 +++++++++++-----------
 .../impl/SparkApplicationActionServiceImpl.java    |  2 +-
 .../impl/SparkApplicationManageServiceImpl.java    |  9 ++++++--
 .../service/impl/SparkAppBuildPipeServiceImpl.java | 12 +++++------
 .../impl/SparkApplicationBackUpServiceImpl.java    |  2 +-
 .../mapper/core/SparkApplicationMapper.xml         | 17 ---------------
 .../spark/client/bean/SubmitRequest.scala          |  2 +-
 .../spark/client/proxy/SparkShimsProxy.scala       | 14 ++++--------
 .../org/apache/streampark/spark/core/Spark.scala   |  6 ++----
 12 files changed, 44 insertions(+), 59 deletions(-)

diff --git 
a/streampark-common/src/main/java/org/apache/streampark/common/enums/SparkDevelopmentMode.java
 
b/streampark-common/src/main/java/org/apache/streampark/common/enums/SparkDevelopmentMode.java
index efdd66c00..a1531379a 100644
--- 
a/streampark-common/src/main/java/org/apache/streampark/common/enums/SparkDevelopmentMode.java
+++ 
b/streampark-common/src/main/java/org/apache/streampark/common/enums/SparkDevelopmentMode.java
@@ -26,8 +26,8 @@ public enum SparkDevelopmentMode {
     /** Unknown type replace null */
     UNKNOWN("Unknown", -1),
 
-    /** custom code */
-    CUSTOM_CODE("Custom Code", 1),
+    /** Spark Jar */
+    SPARK_JAR("Spark Jar", 1),
 
     /** Spark SQL */
     SPARK_SQL("Spark SQL", 2),
diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/SparkVersion.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/SparkVersion.scala
index 8b52adfb3..cbbe11321 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/SparkVersion.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/SparkVersion.scala
@@ -38,7 +38,7 @@ class SparkVersion(val sparkHome: String) extends 
Serializable with Logger {
   val (version, scalaVersion) = {
     var sparkVersion: String = null
     var scalaVersion: String = null
-    val cmd = List(s"$sparkHome/bin/spark-submit --version")
+    val cmd = List(s"export SPARK_HOME=$sparkHome&&$sparkHome/bin/spark-submit 
--version")
     val buffer = new mutable.StringBuilder
 
     CommandUtils.execute(
@@ -91,7 +91,7 @@ class SparkVersion(val sparkHome: String) extends 
Serializable with Logger {
 
   def checkVersion(throwException: Boolean = true): Boolean = {
     version.split("\\.").map(_.trim.toInt) match {
-      case Array(3, v, _) if v >= 1 && v <= 3 => true
+      case Array(v, _, _) if v == 2 || v == 3 => true
       case _ =>
         if (throwException) {
           throw new UnsupportedOperationException(s"Unsupported spark version: 
$version")
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
index a0eb0160e..0fe8fdf87 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
@@ -86,6 +86,12 @@ insert into `t_menu` values (110118, 110100, 'app sql 
delete', null, null, 'sql:
 insert into `t_menu` values (110301, 110300, 'cluster add', 
'/flink/add_cluster', 'flink/cluster/Add', 'cluster:create', '', '0', 0, null, 
now(), now());
 insert into `t_menu` values (110302, 110300, 'cluster edit', 
'/flink/edit_cluster', 'flink/cluster/Edit', 'cluster:update', '', '0', 0, 
null, now(), now());
 
+insert into `t_menu` values (120100, 120000, 'spark.application', 
'/spark/app', 'spark/app/index', null, null, '0', 1, 2, now(), now());
+insert into `t_menu` values (120200, 120000, 'spark.sparkHome', '/spark/home', 
'spark/home/index', null, null, '0', 1, 3, now(), now());
+insert into `t_menu` values (120300, 120000, 'spark.createApplication', 
'/spark/app/create', 'spark/app/create', 'app:create', '', '0', 0, null, now(), 
now());
+insert into `t_menu` values (120400, 120000, 'spark.updateApplication', 
'/spark/app/edit', 'spark/app/edit', 'app:update', '', '0', 0, null, now(), 
now());
+insert into `t_menu` values (120500, 120000, 'spark.applicationDetail', 
'/spark/app/detail', 'spark/app/detail', 'app:detail', '', '0', 0, null, now(), 
now());
+
 insert into `t_menu` values (130100, 130000, 'resource.project', 
'/resource/project', 'resource/project/View', null, 'github', '0', 1, 3, now(), 
now());
 insert into `t_menu` values (130200, 130000, 'resource.variable', 
'/resource/variable', 'resource/variable/View', null, null, '0', 1, 4, now(), 
now());
 insert into `t_menu` values (130300, 130000, 'resource.upload', 
'/resource/upload', 'resource/upload/View', null, null, '0', 1, 3, now(), 
now());
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
index 64b8ed460..1685de3ea 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
@@ -64,7 +64,7 @@ public class SparkApplication extends BaseEntity {
 
     private Long teamId;
 
-    /** 1) custom code 2) spark SQL */
+    /** 1) spark jar 2) spark SQL 3) pyspark*/
     private Integer jobType;
 
     /** 1) Apache Spark 2) StreamPark Spark */
@@ -386,40 +386,41 @@ public class SparkApplication extends BaseEntity {
     }
 
     @JsonIgnore
-    public boolean isSparkSqlJob() {
-        return 
SparkDevelopmentMode.SPARK_SQL.getMode().equals(this.getJobType());
+    public boolean isSparkOnYarnJob() {
+        return SparkExecutionMode.YARN_CLUSTER.getMode() == 
(this.getExecutionMode())
+            || SparkExecutionMode.YARN_CLIENT.getMode() == 
(this.getExecutionMode());
     }
 
     @JsonIgnore
-    public boolean isCustomCodeJob() {
-        return 
SparkDevelopmentMode.CUSTOM_CODE.getMode().equals(this.getJobType());
+    public boolean isSparkSqlJob() {
+        return 
SparkDevelopmentMode.SPARK_SQL.getMode().equals(this.getJobType());
     }
 
     @JsonIgnore
-    public boolean isCustomCodeOrSparkSqlJob() {
-        return isSparkSqlJob() || isCustomCodeJob();
+    public boolean isSparkJarJob() {
+        return 
SparkDevelopmentMode.SPARK_JAR.getMode().equals(this.getJobType());
     }
 
     @JsonIgnore
-    public boolean isCustomCodeOrPySparkJob() {
-        return 
SparkDevelopmentMode.CUSTOM_CODE.getMode().equals(this.getJobType())
+    public boolean isSparkJarOrPySparkJob() {
+        return 
SparkDevelopmentMode.SPARK_JAR.getMode().equals(this.getJobType())
             || 
SparkDevelopmentMode.PYSPARK.getMode().equals(this.getJobType());
     }
 
     @JsonIgnore
     public boolean isUploadJob() {
-        return isCustomCodeOrPySparkJob()
+        return isSparkJarOrPySparkJob()
             && 
ResourceFromEnum.UPLOAD.getValue().equals(this.getResourceFrom());
     }
 
     @JsonIgnore
     public boolean isCICDJob() {
-        return isCustomCodeOrPySparkJob()
+        return isSparkJarOrPySparkJob()
             && ResourceFromEnum.CICD.getValue().equals(this.getResourceFrom());
     }
 
     public boolean isStreamParkJob() {
-        return this.getAppType() == ApplicationType.STREAMPARK_FLINK.getType();
+        return this.getAppType() == ApplicationType.STREAMPARK_SPARK.getType();
     }
 
     @JsonIgnore
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
index 6b2ad804e..7b098c7a3 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
@@ -452,7 +452,7 @@ public class SparkApplicationActionServiceImpl
                 sparkUserJar = resource.getFilePath();
                 break;
 
-            case CUSTOM_CODE:
+            case SPARK_JAR:
                 if (application.isUploadJob()) {
                     appConf = applicationConfig == null
                         ? null
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
index eaceffc45..f171594b7 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
@@ -18,6 +18,7 @@
 package org.apache.streampark.console.core.service.application.impl;
 
 import org.apache.streampark.common.conf.Workspace;
+import org.apache.streampark.common.constants.Constants;
 import org.apache.streampark.common.enums.SparkExecutionMode;
 import org.apache.streampark.common.enums.StorageType;
 import org.apache.streampark.common.fs.HdfsOperator;
@@ -267,8 +268,12 @@ public class SparkApplicationManageServiceImpl
         ApiAlertException.throwIfFalse(
             success,
             String.format(ERROR_APP_QUEUE_HINT, appParam.getYarnQueue(), 
appParam.getTeamId()));
-        appParam.resolveYarnQueue();
-
+        if (appParam.isSparkOnYarnJob()) {
+            appParam.resolveYarnQueue();
+            if (appParam.isSparkSqlJob()) {
+                
appParam.setMainClass(Constants.STREAMPARK_SPARKSQL_CLIENT_CLASS);
+            }
+        }
         if (appParam.isUploadJob()) {
             String jarPath = String.format(
                 "%s/%d/%s", Workspace.local().APP_UPLOADS(), 
appParam.getTeamId(), appParam.getJar());
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkAppBuildPipeServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkAppBuildPipeServiceImpl.java
index 040ce28b1..61b17c53c 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkAppBuildPipeServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkAppBuildPipeServiceImpl.java
@@ -20,7 +20,6 @@ package org.apache.streampark.console.core.service.impl;
 import org.apache.streampark.common.conf.Workspace;
 import org.apache.streampark.common.constants.Constants;
 import org.apache.streampark.common.enums.ApplicationType;
-import org.apache.streampark.common.enums.SparkDevelopmentMode;
 import org.apache.streampark.common.enums.SparkExecutionMode;
 import org.apache.streampark.common.fs.FsOperator;
 import org.apache.streampark.common.util.AssertUtils;
@@ -201,8 +200,8 @@ public class SparkAppBuildPipeServiceImpl
                     // 2) some preparatory work
                     String appUploads = app.getWorkspace().APP_UPLOADS();
 
-                    if (app.isCustomCodeOrPySparkJob()) {
-                        // customCode upload jar to appHome...
+                    if (app.isSparkJarOrPySparkJob()) {
+                        // spark jar and pyspark upload resource to appHome...
                         String appHome = app.getAppHome();
                         FsOperator fsOperator = app.getFsOperator();
                         fsOperator.delete(appHome);
@@ -286,7 +285,7 @@ public class SparkAppBuildPipeServiceImpl
                             // If the current task is not running, or the task 
has just been added, directly
                             // set
                             // the candidate version to the official version
-                            if (app.isCustomCodeOrSparkSqlJob()) {
+                            if (app.isSparkOnYarnJob()) {
                                 applicationManageService.toEffective(app);
                             } else {
                                 if (app.isStreamParkJob()) {
@@ -378,8 +377,7 @@ public class SparkAppBuildPipeServiceImpl
             case YARN_CLIENT:
                 String yarnProvidedPath = app.getAppLib();
                 String localWorkspace = app.getLocalAppHome().concat("/lib");
-                if (SparkDevelopmentMode.CUSTOM_CODE == 
app.getDevelopmentMode()
-                    && ApplicationType.APACHE_SPARK == 
app.getApplicationType()) {
+                if (ApplicationType.APACHE_SPARK == app.getApplicationType()) {
                     yarnProvidedPath = app.getAppHome();
                     localWorkspace = app.getLocalAppHome();
                 }
@@ -400,7 +398,7 @@ public class SparkAppBuildPipeServiceImpl
 
     private String retrieveSparkUserJar(SparkEnv sparkEnv, SparkApplication 
app) {
         switch (app.getDevelopmentMode()) {
-            case CUSTOM_CODE:
+            case SPARK_JAR:
                 switch (app.getApplicationType()) {
                     case STREAMPARK_SPARK:
                         return String.format(
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationBackUpServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationBackUpServiceImpl.java
index df094eb92..eaec6ab0d 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationBackUpServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationBackUpServiceImpl.java
@@ -196,7 +196,7 @@ public class SparkApplicationBackUpServiceImpl
     @Override
     public void backup(SparkApplication appParam, SparkSql sparkSqlParam) {
         // basic configuration file backup
-        String appHome = (appParam.isCustomCodeJob() && appParam.isCICDJob())
+        String appHome = (appParam.isCICDJob())
             ? appParam.getDistHome()
             : appParam.getAppHome();
         FsOperator fsOperator = appParam.getFsOperator();
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/SparkApplicationMapper.xml
 
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/SparkApplicationMapper.xml
index b58fe3a4b..2e0b6ce8b 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/SparkApplicationMapper.xml
+++ 
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/SparkApplicationMapper.xml
@@ -86,7 +86,6 @@
     <select id="selectPage" 
resultType="org.apache.streampark.console.core.entity.SparkApplication" 
parameterType="org.apache.streampark.console.core.entity.SparkApplication">
         select
             t.*,
-            p.name as projectName,
             u.username,
            case
                when trim(u.nick_name) = ''
@@ -104,37 +103,21 @@
             <if test="app.jobType != null and app.jobType != ''">
                 and t.job_type = #{app.jobType}
             </if>
-            <if test="app.jobTypeArray != null and app.jobTypeArray.length>0">
-                and t.job_type in
-                <foreach item="item" index="index" 
collection="app.jobTypeArray" open="("  close=")" separator=",">
-                    #{item}
-                </foreach>
-            </if>
             <if test="app.executionMode != null and app.executionMode != ''">
                 and t.execution_mode = #{app.executionMode}
             </if>
             <if test="app.appName != null and app.appName != ''">
                 and t.app_name like concat('%', '${app.appName}', '%')
             </if>
-            <if test="app.projectName != null and app.projectName != ''">
-                and p.name like concat('%', '${app.projectName}', '%')
-            </if>
             <if test="app.appId != null and app.appId != ''">
                 and t.app_id = #{app.appId}
             </if>
             <if test="app.state != null and app.state != ''">
                 and t.state = #{app.state}
             </if>
-
             <if test="app.userId != null and app.userId != ''">
                 and t.user_id = #{app.userId}
             </if>
-            <if test="app.stateArray != null and app.stateArray.length>0">
-                and t.state in
-                <foreach item="item" index="index" collection="app.stateArray" 
open="("  close=")" separator=",">
-                    #{item}
-                </foreach>
-            </if>
             <if test="app.tags != null and app.tags != ''">
                 and t.tags like concat('%', '${app.tags}', '%')
             </if>
diff --git 
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitRequest.scala
 
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitRequest.scala
index 7b99ffdf7..2e871b9cd 100644
--- 
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitRequest.scala
+++ 
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitRequest.scala
@@ -61,7 +61,7 @@ case class SubmitRequest(
 
   lazy val appMain: String = this.developmentMode match {
     case SparkDevelopmentMode.SPARK_SQL => 
Constants.STREAMPARK_SPARKSQL_CLIENT_CLASS
-    case SparkDevelopmentMode.CUSTOM_CODE | SparkDevelopmentMode.PYSPARK => 
mainClass
+    case SparkDevelopmentMode.SPARK_JAR | SparkDevelopmentMode.PYSPARK => 
mainClass
     case SparkDevelopmentMode.UNKNOWN => throw new 
IllegalArgumentException("Unknown deployment Mode")
   }
 
diff --git 
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala
 
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala
index df4829211..a62ed677c 100644
--- 
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala
+++ 
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala
@@ -75,17 +75,11 @@ object SparkShimsProxy extends Logger {
     logInfo(s"Add verify sql lib,spark version: $sparkVersion")
     VERIFY_SQL_CLASS_LOADER_CACHE.getOrElseUpdate(
       s"${sparkVersion.fullVersion}", {
-        val getSparkTable: File => Boolean = 
_.getName.startsWith("spark-table")
-        // 1) spark/lib/spark-table*
-        val libTableURL =
-          getSparkHomeLib(sparkVersion.sparkHome, "lib", getSparkTable)
+        val libUrl = getSparkHomeLib(sparkVersion.sparkHome, "jars", f => 
!f.getName.startsWith("log4j") && !f.getName.startsWith("slf4j"))
+        val shimsUrls = ListBuffer[URL](libUrl: _*)
 
-        // 2) After version 1.15 need add spark/opt/spark-table*
-        val optTableURL =
-          getSparkHomeLib(sparkVersion.sparkHome, "opt", getSparkTable)
-        val shimsUrls = ListBuffer[URL](libTableURL ++ optTableURL: _*)
+        // TODO If there are compatibility issues with different versions
 
-        // 3) add only streampark shims jar
         addShimsUrls(
           sparkVersion,
           file => {
@@ -135,7 +129,7 @@ object SparkShimsProxy extends Logger {
             if (INCLUDE_PATTERN.matcher(jarName).matches()) {
               addShimUrl(jar)
               logInfo(s"Include jar lib: $jarName")
-            } else if (jarName.matches(s"^streampark-.*_$scalaVersion.*$$")) {
+            } else if 
(jarName.matches(s"^streampark-(?!flink).*_$scalaVersion.*$$")) {
               addShimUrl(jar)
               logInfo(s"Include streampark lib: $jarName")
             }
diff --git 
a/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/Spark.scala
 
b/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/Spark.scala
index 52123e760..26aaeaad9 100644
--- 
a/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/Spark.scala
+++ 
b/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/Spark.scala
@@ -156,11 +156,9 @@ trait Spark extends Logger {
           throw new IllegalArgumentException(
             "[StreamPark] Usage: config file error,must be 
[properties|yaml|conf]")
       }
-
-      sparkConf.setAll(localConf)
+      localConf.foreach(arg => sparkConf.set(arg._1, arg._2))
     }
-
-    sparkConf.setAll(userArgs)
+    userArgs.foreach(arg => sparkConf.set(arg._1, arg._2))
 
     val appMain = sparkConf.get(KEY_SPARK_MAIN_CLASS, 
"org.apache.streampark.spark.cli.SqlClient")
     if (appMain == null) {

Reply via email to