This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new a94ed3a5b [Improve] Adjust to fit the Spark page (#4042)
a94ed3a5b is described below
commit a94ed3a5b1b4ef453caded7f1d7ee39d05bc3357
Author: lenoxzhao <[email protected]>
AuthorDate: Sat Sep 7 21:58:54 2024 +0800
[Improve] Adjust to fit the Spark page (#4042)
* feat: spark multiversion support
* feat: change custom code to spark jar
* feat: adjust to frontend
---
.../common/enums/SparkDevelopmentMode.java | 4 ++--
.../streampark/common/conf/SparkVersion.scala | 4 ++--
.../src/main/assembly/script/data/mysql-data.sql | 6 ++++++
.../console/core/entity/SparkApplication.java | 25 +++++++++++-----------
.../impl/SparkApplicationActionServiceImpl.java | 2 +-
.../impl/SparkApplicationManageServiceImpl.java | 9 ++++++--
.../service/impl/SparkAppBuildPipeServiceImpl.java | 12 +++++------
.../impl/SparkApplicationBackUpServiceImpl.java | 2 +-
.../mapper/core/SparkApplicationMapper.xml | 17 ---------------
.../spark/client/bean/SubmitRequest.scala | 2 +-
.../spark/client/proxy/SparkShimsProxy.scala | 14 ++++--------
.../org/apache/streampark/spark/core/Spark.scala | 6 ++----
12 files changed, 44 insertions(+), 59 deletions(-)
diff --git
a/streampark-common/src/main/java/org/apache/streampark/common/enums/SparkDevelopmentMode.java
b/streampark-common/src/main/java/org/apache/streampark/common/enums/SparkDevelopmentMode.java
index efdd66c00..a1531379a 100644
---
a/streampark-common/src/main/java/org/apache/streampark/common/enums/SparkDevelopmentMode.java
+++
b/streampark-common/src/main/java/org/apache/streampark/common/enums/SparkDevelopmentMode.java
@@ -26,8 +26,8 @@ public enum SparkDevelopmentMode {
/** Unknown type replace null */
UNKNOWN("Unknown", -1),
- /** custom code */
- CUSTOM_CODE("Custom Code", 1),
+ /** Spark Jar */
+ SPARK_JAR("Spark Jar", 1),
/** Spark SQL */
SPARK_SQL("Spark SQL", 2),
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/SparkVersion.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/SparkVersion.scala
index 8b52adfb3..cbbe11321 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/SparkVersion.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/SparkVersion.scala
@@ -38,7 +38,7 @@ class SparkVersion(val sparkHome: String) extends
Serializable with Logger {
val (version, scalaVersion) = {
var sparkVersion: String = null
var scalaVersion: String = null
- val cmd = List(s"$sparkHome/bin/spark-submit --version")
+ val cmd = List(s"export SPARK_HOME=$sparkHome&&$sparkHome/bin/spark-submit
--version")
val buffer = new mutable.StringBuilder
CommandUtils.execute(
@@ -91,7 +91,7 @@ class SparkVersion(val sparkHome: String) extends
Serializable with Logger {
def checkVersion(throwException: Boolean = true): Boolean = {
version.split("\\.").map(_.trim.toInt) match {
- case Array(3, v, _) if v >= 1 && v <= 3 => true
+ case Array(v, _, _) if v == 2 || v == 3 => true
case _ =>
if (throwException) {
throw new UnsupportedOperationException(s"Unsupported spark version:
$version")
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
index a0eb0160e..0fe8fdf87 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql
@@ -86,6 +86,12 @@ insert into `t_menu` values (110118, 110100, 'app sql
delete', null, null, 'sql:
insert into `t_menu` values (110301, 110300, 'cluster add',
'/flink/add_cluster', 'flink/cluster/Add', 'cluster:create', '', '0', 0, null,
now(), now());
insert into `t_menu` values (110302, 110300, 'cluster edit',
'/flink/edit_cluster', 'flink/cluster/Edit', 'cluster:update', '', '0', 0,
null, now(), now());
+insert into `t_menu` values (120100, 120000, 'spark.application',
'/spark/app', 'spark/app/index', null, null, '0', 1, 2, now(), now());
+insert into `t_menu` values (120200, 120000, 'spark.sparkHome', '/spark/home',
'spark/home/index', null, null, '0', 1, 3, now(), now());
+insert into `t_menu` values (120300, 120000, 'spark.createApplication',
'/spark/app/create', 'spark/app/create', 'app:create', '', '0', 0, null, now(),
now());
+insert into `t_menu` values (120400, 120000, 'spark.updateApplication',
'/spark/app/edit', 'spark/app/edit', 'app:update', '', '0', 0, null, now(),
now());
+insert into `t_menu` values (120500, 120000, 'spark.applicationDetail',
'/spark/app/detail', 'spark/app/detail', 'app:detail', '', '0', 0, null, now(),
now());
+
insert into `t_menu` values (130100, 130000, 'resource.project',
'/resource/project', 'resource/project/View', null, 'github', '0', 1, 3, now(),
now());
insert into `t_menu` values (130200, 130000, 'resource.variable',
'/resource/variable', 'resource/variable/View', null, null, '0', 1, 4, now(),
now());
insert into `t_menu` values (130300, 130000, 'resource.upload',
'/resource/upload', 'resource/upload/View', null, null, '0', 1, 3, now(),
now());
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
index 64b8ed460..1685de3ea 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
@@ -64,7 +64,7 @@ public class SparkApplication extends BaseEntity {
private Long teamId;
- /** 1) custom code 2) spark SQL */
+ /** 1) spark jar 2) spark SQL 3) pyspark*/
private Integer jobType;
/** 1) Apache Spark 2) StreamPark Spark */
@@ -386,40 +386,41 @@ public class SparkApplication extends BaseEntity {
}
@JsonIgnore
- public boolean isSparkSqlJob() {
- return
SparkDevelopmentMode.SPARK_SQL.getMode().equals(this.getJobType());
+ public boolean isSparkOnYarnJob() {
+ return SparkExecutionMode.YARN_CLUSTER.getMode() ==
(this.getExecutionMode())
+ || SparkExecutionMode.YARN_CLIENT.getMode() ==
(this.getExecutionMode());
}
@JsonIgnore
- public boolean isCustomCodeJob() {
- return
SparkDevelopmentMode.CUSTOM_CODE.getMode().equals(this.getJobType());
+ public boolean isSparkSqlJob() {
+ return
SparkDevelopmentMode.SPARK_SQL.getMode().equals(this.getJobType());
}
@JsonIgnore
- public boolean isCustomCodeOrSparkSqlJob() {
- return isSparkSqlJob() || isCustomCodeJob();
+ public boolean isSparkJarJob() {
+ return
SparkDevelopmentMode.SPARK_JAR.getMode().equals(this.getJobType());
}
@JsonIgnore
- public boolean isCustomCodeOrPySparkJob() {
- return
SparkDevelopmentMode.CUSTOM_CODE.getMode().equals(this.getJobType())
+ public boolean isSparkJarOrPySparkJob() {
+ return
SparkDevelopmentMode.SPARK_JAR.getMode().equals(this.getJobType())
||
SparkDevelopmentMode.PYSPARK.getMode().equals(this.getJobType());
}
@JsonIgnore
public boolean isUploadJob() {
- return isCustomCodeOrPySparkJob()
+ return isSparkJarOrPySparkJob()
&&
ResourceFromEnum.UPLOAD.getValue().equals(this.getResourceFrom());
}
@JsonIgnore
public boolean isCICDJob() {
- return isCustomCodeOrPySparkJob()
+ return isSparkJarOrPySparkJob()
&& ResourceFromEnum.CICD.getValue().equals(this.getResourceFrom());
}
public boolean isStreamParkJob() {
- return this.getAppType() == ApplicationType.STREAMPARK_FLINK.getType();
+ return this.getAppType() == ApplicationType.STREAMPARK_SPARK.getType();
}
@JsonIgnore
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
index 6b2ad804e..7b098c7a3 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
@@ -452,7 +452,7 @@ public class SparkApplicationActionServiceImpl
sparkUserJar = resource.getFilePath();
break;
- case CUSTOM_CODE:
+ case SPARK_JAR:
if (application.isUploadJob()) {
appConf = applicationConfig == null
? null
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
index eaceffc45..f171594b7 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
@@ -18,6 +18,7 @@
package org.apache.streampark.console.core.service.application.impl;
import org.apache.streampark.common.conf.Workspace;
+import org.apache.streampark.common.constants.Constants;
import org.apache.streampark.common.enums.SparkExecutionMode;
import org.apache.streampark.common.enums.StorageType;
import org.apache.streampark.common.fs.HdfsOperator;
@@ -267,8 +268,12 @@ public class SparkApplicationManageServiceImpl
ApiAlertException.throwIfFalse(
success,
String.format(ERROR_APP_QUEUE_HINT, appParam.getYarnQueue(),
appParam.getTeamId()));
- appParam.resolveYarnQueue();
-
+ if (appParam.isSparkOnYarnJob()) {
+ appParam.resolveYarnQueue();
+ if (appParam.isSparkSqlJob()) {
+
appParam.setMainClass(Constants.STREAMPARK_SPARKSQL_CLIENT_CLASS);
+ }
+ }
if (appParam.isUploadJob()) {
String jarPath = String.format(
"%s/%d/%s", Workspace.local().APP_UPLOADS(),
appParam.getTeamId(), appParam.getJar());
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkAppBuildPipeServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkAppBuildPipeServiceImpl.java
index 040ce28b1..61b17c53c 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkAppBuildPipeServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkAppBuildPipeServiceImpl.java
@@ -20,7 +20,6 @@ package org.apache.streampark.console.core.service.impl;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.constants.Constants;
import org.apache.streampark.common.enums.ApplicationType;
-import org.apache.streampark.common.enums.SparkDevelopmentMode;
import org.apache.streampark.common.enums.SparkExecutionMode;
import org.apache.streampark.common.fs.FsOperator;
import org.apache.streampark.common.util.AssertUtils;
@@ -201,8 +200,8 @@ public class SparkAppBuildPipeServiceImpl
// 2) some preparatory work
String appUploads = app.getWorkspace().APP_UPLOADS();
- if (app.isCustomCodeOrPySparkJob()) {
- // customCode upload jar to appHome...
+ if (app.isSparkJarOrPySparkJob()) {
+ // spark jar and pyspark upload resource to appHome...
String appHome = app.getAppHome();
FsOperator fsOperator = app.getFsOperator();
fsOperator.delete(appHome);
@@ -286,7 +285,7 @@ public class SparkAppBuildPipeServiceImpl
// If the current task is not running, or the task
has just been added, directly
// set
// the candidate version to the official version
- if (app.isCustomCodeOrSparkSqlJob()) {
+ if (app.isSparkOnYarnJob()) {
applicationManageService.toEffective(app);
} else {
if (app.isStreamParkJob()) {
@@ -378,8 +377,7 @@ public class SparkAppBuildPipeServiceImpl
case YARN_CLIENT:
String yarnProvidedPath = app.getAppLib();
String localWorkspace = app.getLocalAppHome().concat("/lib");
- if (SparkDevelopmentMode.CUSTOM_CODE ==
app.getDevelopmentMode()
- && ApplicationType.APACHE_SPARK ==
app.getApplicationType()) {
+ if (ApplicationType.APACHE_SPARK == app.getApplicationType()) {
yarnProvidedPath = app.getAppHome();
localWorkspace = app.getLocalAppHome();
}
@@ -400,7 +398,7 @@ public class SparkAppBuildPipeServiceImpl
private String retrieveSparkUserJar(SparkEnv sparkEnv, SparkApplication
app) {
switch (app.getDevelopmentMode()) {
- case CUSTOM_CODE:
+ case SPARK_JAR:
switch (app.getApplicationType()) {
case STREAMPARK_SPARK:
return String.format(
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationBackUpServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationBackUpServiceImpl.java
index df094eb92..eaec6ab0d 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationBackUpServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkApplicationBackUpServiceImpl.java
@@ -196,7 +196,7 @@ public class SparkApplicationBackUpServiceImpl
@Override
public void backup(SparkApplication appParam, SparkSql sparkSqlParam) {
// basic configuration file backup
- String appHome = (appParam.isCustomCodeJob() && appParam.isCICDJob())
+ String appHome = (appParam.isCICDJob())
? appParam.getDistHome()
: appParam.getAppHome();
FsOperator fsOperator = appParam.getFsOperator();
diff --git
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/SparkApplicationMapper.xml
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/SparkApplicationMapper.xml
index b58fe3a4b..2e0b6ce8b 100644
---
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/SparkApplicationMapper.xml
+++
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/SparkApplicationMapper.xml
@@ -86,7 +86,6 @@
<select id="selectPage"
resultType="org.apache.streampark.console.core.entity.SparkApplication"
parameterType="org.apache.streampark.console.core.entity.SparkApplication">
select
t.*,
- p.name as projectName,
u.username,
case
when trim(u.nick_name) = ''
@@ -104,37 +103,21 @@
<if test="app.jobType != null and app.jobType != ''">
and t.job_type = #{app.jobType}
</if>
- <if test="app.jobTypeArray != null and app.jobTypeArray.length>0">
- and t.job_type in
- <foreach item="item" index="index"
collection="app.jobTypeArray" open="(" close=")" separator=",">
- #{item}
- </foreach>
- </if>
<if test="app.executionMode != null and app.executionMode != ''">
and t.execution_mode = #{app.executionMode}
</if>
<if test="app.appName != null and app.appName != ''">
and t.app_name like concat('%', '${app.appName}', '%')
</if>
- <if test="app.projectName != null and app.projectName != ''">
- and p.name like concat('%', '${app.projectName}', '%')
- </if>
<if test="app.appId != null and app.appId != ''">
and t.app_id = #{app.appId}
</if>
<if test="app.state != null and app.state != ''">
and t.state = #{app.state}
</if>
-
<if test="app.userId != null and app.userId != ''">
and t.user_id = #{app.userId}
</if>
- <if test="app.stateArray != null and app.stateArray.length>0">
- and t.state in
- <foreach item="item" index="index" collection="app.stateArray"
open="(" close=")" separator=",">
- #{item}
- </foreach>
- </if>
<if test="app.tags != null and app.tags != ''">
and t.tags like concat('%', '${app.tags}', '%')
</if>
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitRequest.scala
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitRequest.scala
index 7b99ffdf7..2e871b9cd 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitRequest.scala
+++
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/bean/SubmitRequest.scala
@@ -61,7 +61,7 @@ case class SubmitRequest(
lazy val appMain: String = this.developmentMode match {
case SparkDevelopmentMode.SPARK_SQL =>
Constants.STREAMPARK_SPARKSQL_CLIENT_CLASS
- case SparkDevelopmentMode.CUSTOM_CODE | SparkDevelopmentMode.PYSPARK =>
mainClass
+ case SparkDevelopmentMode.SPARK_JAR | SparkDevelopmentMode.PYSPARK =>
mainClass
case SparkDevelopmentMode.UNKNOWN => throw new
IllegalArgumentException("Unknown deployment Mode")
}
diff --git
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala
index df4829211..a62ed677c 100644
---
a/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala
+++
b/streampark-spark/streampark-spark-client/streampark-spark-client-api/src/main/scala/org/apache/streampark/spark/client/proxy/SparkShimsProxy.scala
@@ -75,17 +75,11 @@ object SparkShimsProxy extends Logger {
logInfo(s"Add verify sql lib,spark version: $sparkVersion")
VERIFY_SQL_CLASS_LOADER_CACHE.getOrElseUpdate(
s"${sparkVersion.fullVersion}", {
- val getSparkTable: File => Boolean =
_.getName.startsWith("spark-table")
- // 1) spark/lib/spark-table*
- val libTableURL =
- getSparkHomeLib(sparkVersion.sparkHome, "lib", getSparkTable)
+ val libUrl = getSparkHomeLib(sparkVersion.sparkHome, "jars", f =>
!f.getName.startsWith("log4j") && !f.getName.startsWith("slf4j"))
+ val shimsUrls = ListBuffer[URL](libUrl: _*)
- // 2) After version 1.15 need add spark/opt/spark-table*
- val optTableURL =
- getSparkHomeLib(sparkVersion.sparkHome, "opt", getSparkTable)
- val shimsUrls = ListBuffer[URL](libTableURL ++ optTableURL: _*)
+ // TODO If there are compatibility issues with different versions
- // 3) add only streampark shims jar
addShimsUrls(
sparkVersion,
file => {
@@ -135,7 +129,7 @@ object SparkShimsProxy extends Logger {
if (INCLUDE_PATTERN.matcher(jarName).matches()) {
addShimUrl(jar)
logInfo(s"Include jar lib: $jarName")
- } else if (jarName.matches(s"^streampark-.*_$scalaVersion.*$$")) {
+ } else if
(jarName.matches(s"^streampark-(?!flink).*_$scalaVersion.*$$")) {
addShimUrl(jar)
logInfo(s"Include streampark lib: $jarName")
}
diff --git
a/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/Spark.scala
b/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/Spark.scala
index 52123e760..26aaeaad9 100644
---
a/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/Spark.scala
+++
b/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/Spark.scala
@@ -156,11 +156,9 @@ trait Spark extends Logger {
throw new IllegalArgumentException(
"[StreamPark] Usage: config file error,must be
[properties|yaml|conf]")
}
-
- sparkConf.setAll(localConf)
+ localConf.foreach(arg => sparkConf.set(arg._1, arg._2))
}
-
- sparkConf.setAll(userArgs)
+ userArgs.foreach(arg => sparkConf.set(arg._1, arg._2))
val appMain = sparkConf.get(KEY_SPARK_MAIN_CLASS,
"org.apache.streampark.spark.cli.SqlClient")
if (appMain == null) {