This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 5b1a1b4eb [Improve] job type custom_code rename to flink_jar (#4176)
5b1a1b4eb is described below
commit 5b1a1b4ebd19f3a200a637ee10c9470ac7ac2e51
Author: benjobs <[email protected]>
AuthorDate: Mon Jan 20 12:34:54 2025 +0800
[Improve] job type custom_code rename to flink_jar (#4176)
---
.../streampark/common/enums/FlinkJobType.java | 4 +-
.../main/assembly/script/schema/mysql-schema.sql | 2 +-
.../console/core/entity/FlinkApplication.java | 88 +++++++---------------
.../console/core/entity/SparkApplication.java | 8 +-
.../console/core/enums/ResourceFromEnum.java | 4 +-
.../impl/FlinkApplicationActionServiceImpl.java | 8 +-
.../impl/FlinkApplicationBackupServiceImpl.java | 6 +-
.../FlinkApplicationBuildPipelineServiceImpl.java | 27 +++----
.../impl/FlinkApplicationConfigServiceImpl.java | 2 +-
.../impl/FlinkApplicationManageServiceImpl.java | 23 +++---
.../impl/SparkApplicationActionServiceImpl.java | 2 +-
.../impl/SparkApplicationBackupServiceImpl.java | 2 +-
.../SparkApplicationBuildPipelineServiceImpl.java | 6 +-
.../impl/SparkApplicationManageServiceImpl.java | 6 +-
.../service/impl/FlinkSavepointServiceImpl.java | 2 +-
.../src/main/resources/db/schema-h2.sql | 2 +-
.../core/service/FlinkSavepointServiceTest.java | 4 +-
.../src/assets/icons/fjar.svg | 1 +
.../app/components/AppDetail/ExecOptionModal.vue | 2 +-
.../src/views/flink/app/data/detail.data.ts | 2 +-
.../src/views/flink/app/hooks/useAppTableAction.ts | 2 +-
.../flink/app/hooks/useCreateAndEditSchema.ts | 2 +-
.../src/views/flink/app/hooks/useCreateSchema.ts | 4 +-
.../pages/flink/applications/ApplicationForm.java | 6 +-
24 files changed, 93 insertions(+), 122 deletions(-)
diff --git
a/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkJobType.java
b/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkJobType.java
index 16c63952e..2c50978ef 100644
---
a/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkJobType.java
+++
b/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkJobType.java
@@ -31,9 +31,9 @@ public enum FlinkJobType {
UNKNOWN("Unknown", -1),
/**
- * custom code
+ * Flink Jar
*/
- CUSTOM_CODE("Custom Code", 1),
+ FLINK_JAR("Flink JAR", 1),
/**
* Flink SQL
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
index a0885a319..25cc8c9ca 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
@@ -593,7 +593,7 @@ drop table if exists `t_spark_app`;
create table `t_spark_app` (
`id` bigint not null auto_increment,
`team_id` bigint not null,
- `job_type` tinyint default null comment '(1)custom code(2)spark SQL',
+ `job_type` tinyint default null comment '(1) spark Jar(2) spark SQL',
`app_type` tinyint default null comment '(1)Apache Spark(2)StreamPark Spark',
`version_id` bigint default null comment 'spark version',
`app_name` varchar(255) collate utf8mb4_general_ci default null comment
'spark.app.name',
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java
index fcb787b95..a88cc25a9 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java
@@ -46,7 +46,6 @@ import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
-import com.baomidou.mybatisplus.core.toolkit.support.SFunction;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Getter;
import lombok.Setter;
@@ -72,7 +71,7 @@ public class FlinkApplication extends BaseEntity {
private Long teamId;
/**
- * 1) custom code 2) flink SQL
+ * 1) flink jar 2) flink SQL
*/
private Integer jobType;
@@ -130,6 +129,7 @@ public class FlinkApplication extends BaseEntity {
@Getter
private String ingressTemplate;
+
@Setter
private String defaultModeIngress;
@@ -261,7 +261,7 @@ public class FlinkApplication extends BaseEntity {
private Date modifyTime;
/**
- * 1: cicd (build from csv) 2: upload (upload local jar job)
+ * 1: build (build from cvs) 2: upload (upload local jar job)
*/
private Integer resourceFrom;
@@ -420,15 +420,6 @@ public class FlinkApplication extends BaseEntity {
&& this.cpFailureAction != null;
}
- public boolean eqFlinkJob(FlinkApplication other) {
- if (this.isFlinkSqlJobOrCDC()
- && other.isFlinkSqlJobOrCDC()
- && this.getFlinkSql().trim().equals(other.getFlinkSql().trim())) {
- return
this.getDependencyObject().equals(other.getDependencyObject());
- }
- return false;
- }
-
/**
* Local compilation and packaging working directory
*/
@@ -475,18 +466,19 @@ public class FlinkApplication extends BaseEntity {
}
public String getMainClass() {
- FlinkJobType flinkJobType = FlinkJobType.of(jobType);
- if (flinkJobType == FlinkJobType.FLINK_SQL) {
- return Constants.STREAMPARK_FLINKSQL_CLIENT_CLASS;
- } else if (flinkJobType == FlinkJobType.FLINK_CDC) {
- return Constants.STREAMPARK_FLINKCDC_CLIENT_CLASS;
- } else if (flinkJobType == FlinkJobType.PYFLINK) {
- return Constants.PYTHON_FLINK_DRIVER_CLASS_NAME; // Assuming this
is the default behavior for other enum
- // values
- } else if (flinkJobType == FlinkJobType.CUSTOM_CODE) {
- return mainClass;
- } else {
- return null;
+ FlinkJobType flinkJobType = this.getJobTypeEnum();
+ switch (flinkJobType) {
+ case FLINK_SQL:
+ return Constants.STREAMPARK_FLINKSQL_CLIENT_CLASS;
+ case FLINK_CDC:
+ return Constants.STREAMPARK_FLINKCDC_CLIENT_CLASS;
+ case PYFLINK:
+ return Constants.PYTHON_FLINK_DRIVER_CLASS_NAME;
+ case FLINK_JAR:
+ return mainClass;
+ case UNKNOWN:
+ default:
+ return null;
}
}
@@ -513,42 +505,35 @@ public class FlinkApplication extends BaseEntity {
}
@JsonIgnore
- public boolean isFlinkSqlJobOrCDC() {
+ public boolean isJobTypeFlinkSqlOrCDC() {
return FlinkJobType.FLINK_SQL.getMode().equals(this.getJobType()) ||
FlinkJobType.FLINK_CDC.getMode().equals(this.getJobType());
}
@JsonIgnore
- public boolean isFlinkSqlJobOrPyFlinkJobOrFlinkCDC() {
- return FlinkJobType.FLINK_SQL.getMode().equals(this.getJobType())
- || FlinkJobType.PYFLINK.getMode().equals(this.getJobType())
- || FlinkJobType.FLINK_CDC.getMode().equals(this.getJobType());
+ public boolean isJobTypeFlinkJar() {
+ return FlinkJobType.FLINK_JAR.getMode().equals(this.getJobType());
}
@JsonIgnore
- public boolean isCustomCodeJob() {
- return FlinkJobType.CUSTOM_CODE.getMode().equals(this.getJobType());
- }
-
- @JsonIgnore
- public boolean isCustomCodeOrPyFlinkJob() {
- return FlinkJobType.CUSTOM_CODE.getMode().equals(this.getJobType())
+ public boolean isJobTypeFlinkJarOrPyFlink() {
+ return FlinkJobType.FLINK_JAR.getMode().equals(this.getJobType())
|| FlinkJobType.PYFLINK.getMode().equals(this.getJobType());
}
@JsonIgnore
- public boolean isUploadJob() {
- return isCustomCodeOrPyFlinkJob()
+ public boolean isResourceFromUpload() {
+ return isJobTypeFlinkJarOrPyFlink()
&&
ResourceFromEnum.UPLOAD.getValue().equals(this.getResourceFrom());
}
@JsonIgnore
- public boolean isCICDJob() {
- return isCustomCodeOrPyFlinkJob()
- && ResourceFromEnum.CICD.getValue().equals(this.getResourceFrom());
+ public boolean isResourceFromBuild() {
+ return isJobTypeFlinkJarOrPyFlink()
+ &&
ResourceFromEnum.BUILD.getValue().equals(this.getResourceFrom());
}
- public boolean isStreamParkJob() {
+ public boolean isAppTypeStreamPark() {
return this.getAppType() == ApplicationType.STREAMPARK_FLINK.getType();
}
@@ -673,23 +658,4 @@ public class FlinkApplication extends BaseEntity {
return FlinkDeployMode.isKubernetesMode(this.getDeployModeEnum());
}
- public static class SFunc {
-
- public static final SFunction<FlinkApplication, Long> ID =
FlinkApplication::getId;
- public static final SFunction<FlinkApplication, String> JOB_ID =
FlinkApplication::getJobId;
- public static final SFunction<FlinkApplication, Date> START_TIME =
FlinkApplication::getStartTime;
- public static final SFunction<FlinkApplication, Date> END_TIME =
FlinkApplication::getEndTime;
- public static final SFunction<FlinkApplication, Long> DURATION =
FlinkApplication::getDuration;
- public static final SFunction<FlinkApplication, Integer> TOTAL_TASK =
FlinkApplication::getTotalTask;
- public static final SFunction<FlinkApplication, Integer> TOTAL_TM =
FlinkApplication::getTotalTM;
- public static final SFunction<FlinkApplication, Integer> TOTAL_SLOT =
FlinkApplication::getTotalSlot;
- public static final SFunction<FlinkApplication, Integer> JM_MEMORY =
FlinkApplication::getJmMemory;
- public static final SFunction<FlinkApplication, Integer> TM_MEMORY =
FlinkApplication::getTmMemory;
- public static final SFunction<FlinkApplication, Integer> STATE =
FlinkApplication::getState;
- public static final SFunction<FlinkApplication, String> OPTIONS =
FlinkApplication::getOptions;
- public static final SFunction<FlinkApplication, Integer>
AVAILABLE_SLOT = FlinkApplication::getAvailableSlot;
- public static final SFunction<FlinkApplication, Integer>
EXECUTION_MODE = FlinkApplication::getDeployMode;
- public static final SFunction<FlinkApplication, String>
JOB_MANAGER_URL = FlinkApplication::getJobManagerUrl;
- }
-
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
index 56894baa9..80937fdf5 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
@@ -81,7 +81,7 @@ public class SparkApplication extends BaseEntity {
private Integer deployMode;
- /** 1: cicd (build from csv) 2: upload (upload local jar job) */
+ /** 1: build (build from cvs) 2: upload (upload local jar job) */
private Integer resourceFrom;
private Long projectId;
@@ -426,15 +426,15 @@ public class SparkApplication extends BaseEntity {
}
@JsonIgnore
- public boolean isUploadJob() {
+ public boolean isFromUploadJob() {
return isSparkJarOrPySparkJob()
&&
ResourceFromEnum.UPLOAD.getValue().equals(this.getResourceFrom());
}
@JsonIgnore
- public boolean isCICDJob() {
+ public boolean isFromBuildJob() {
return isSparkJarOrPySparkJob()
- && ResourceFromEnum.CICD.getValue().equals(this.getResourceFrom());
+ &&
ResourceFromEnum.BUILD.getValue().equals(this.getResourceFrom());
}
public boolean isStreamParkJob() {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/ResourceFromEnum.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/ResourceFromEnum.java
index c657e7e62..7159f3c1c 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/ResourceFromEnum.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/ResourceFromEnum.java
@@ -24,8 +24,8 @@ import java.util.Arrays;
@Getter
public enum ResourceFromEnum {
- /** cicd(build from cvs) */
- CICD(1),
+ /** build from cvs */
+ BUILD(1),
/** upload local jar */
UPLOAD(2);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java
index 717f90b55..3c6c43b6d 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java
@@ -216,7 +216,7 @@ public class FlinkApplicationActionServiceImpl
// 3) restore related status
LambdaUpdateWrapper<FlinkApplication> updateWrapper =
Wrappers.lambdaUpdate();
updateWrapper.eq(FlinkApplication::getId, application.getId());
- if (application.isFlinkSqlJobOrCDC()) {
+ if (application.isJobTypeFlinkSqlOrCDC()) {
updateWrapper.set(FlinkApplication::getRelease,
ReleaseStateEnum.FAILED.get());
} else {
updateWrapper.set(FlinkApplication::getRelease,
ReleaseStateEnum.NEED_RELEASE.get());
@@ -452,7 +452,7 @@ public class FlinkApplicationActionServiceImpl
applicationManageService.toEffective(application);
Map<String, Object> extraParameter = new HashMap<>(0);
- if (application.isFlinkSqlJobOrCDC()) {
+ if (application.isJobTypeFlinkSqlOrCDC()) {
FlinkSql flinkSql =
flinkSqlService.getEffective(application.getId(), true);
// Get the sql of the replaced placeholder
String realSql =
variableService.replaceVariable(application.getTeamId(), flinkSql.getSql());
@@ -725,8 +725,8 @@ public class FlinkApplicationActionServiceImpl
flinkUserJar = resource.getFilePath();
break;
- case CUSTOM_CODE:
- if (application.isUploadJob()) {
+ case FLINK_JAR:
+ if (application.isResourceFromUpload()) {
appConf =
String.format(
"json://{\"%s\":\"%s\"}",
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBackupServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBackupServiceImpl.java
index 38d1554bd..2b5fc93dd 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBackupServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBackupServiceImpl.java
@@ -89,7 +89,7 @@ public class FlinkApplicationBackupServiceImpl
// If necessary, perform the backup first
if (bakParam.isBackup()) {
application.setBackUpDescription(bakParam.getDescription());
- if (application.isFlinkSqlJobOrCDC()) {
+ if (application.isJobTypeFlinkSqlOrCDC()) {
FlinkSql flinkSql =
flinkSqlService.getEffective(application.getId(), false);
backup(application, flinkSql);
} else {
@@ -107,7 +107,7 @@ public class FlinkApplicationBackupServiceImpl
effectiveService.saveOrUpdate(
bakParam.getAppId(), EffectiveTypeEnum.CONFIG,
bakParam.getId());
// if flink sql task, will be rollback sql and dependencies
- if (application.isFlinkSqlJobOrCDC()) {
+ if (application.isJobTypeFlinkSqlOrCDC()) {
effectiveService.saveOrUpdate(
bakParam.getAppId(), EffectiveTypeEnum.FLINKSQL,
bakParam.getSqlId());
}
@@ -190,7 +190,7 @@ public class FlinkApplicationBackupServiceImpl
@Override
public void backup(FlinkApplication appParam, FlinkSql flinkSqlParam) {
// basic configuration file backup
- String appHome = (appParam.isCustomCodeJob() && appParam.isCICDJob())
+ String appHome = (appParam.isJobTypeFlinkJar() &&
appParam.isResourceFromBuild())
? appParam.getDistHome()
: appParam.getAppHome();
FsOperator fsOperator = appParam.getFsOperator();
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBuildPipelineServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBuildPipelineServiceImpl.java
index f930fb3cc..066aea636 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBuildPipelineServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBuildPipelineServiceImpl.java
@@ -192,14 +192,15 @@ public class FlinkApplicationBuildPipelineServiceImpl
return true;
}
// rollback
- if (app.isNeedRollback() && app.isFlinkSqlJobOrCDC()) {
+ if (app.isNeedRollback() && app.isJobTypeFlinkSqlOrCDC()) {
flinkSqlService.rollback(app);
}
// 1) flink sql setDependency
FlinkSql newFlinkSql = flinkSqlService.getCandidate(app.getId(),
CandidateTypeEnum.NEW);
FlinkSql effectiveFlinkSql = flinkSqlService.getEffective(app.getId(),
false);
- if (app.isFlinkSqlJobOrPyFlinkJobOrFlinkCDC()) {
+ FlinkJobType jobType = app.getJobTypeEnum();
+ if (jobType == FlinkJobType.FLINK_SQL || jobType ==
FlinkJobType.PYFLINK || jobType == FlinkJobType.FLINK_CDC) {
FlinkSql flinkSql = newFlinkSql == null ? effectiveFlinkSql :
newFlinkSql;
AssertUtils.notNull(flinkSql);
app.setDependency(flinkSql.getDependency());
@@ -235,12 +236,12 @@ public class FlinkApplicationBuildPipelineServiceImpl
// 2) some preparatory work
String appUploads = app.getWorkspace().APP_UPLOADS();
- if (app.isCustomCodeOrPyFlinkJob()) {
- // customCode upload jar to appHome...
+ if (app.isJobTypeFlinkJarOrPyFlink()) {
+ // flinkJar upload jar to appHome...
String appHome = app.getAppHome();
FsOperator fsOperator = app.getFsOperator();
fsOperator.delete(appHome);
- if (app.isUploadJob()) {
+ if (app.isResourceFromUpload()) {
String uploadJar =
appUploads.concat("/").concat(app.getJar());
File localJar = new File(
String.format(
@@ -274,7 +275,7 @@ public class FlinkApplicationBuildPipelineServiceImpl
break;
default:
throw new IllegalArgumentException(
- "[StreamPark] unsupported
ApplicationType of custom code: "
+ "[StreamPark] unsupported
ApplicationType of FlinkJar: "
+ app.getApplicationType());
}
} else {
@@ -324,10 +325,10 @@ public class FlinkApplicationBuildPipelineServiceImpl
// If the current task is not running, or the task
has just been added, directly
// set
// the candidate version to the official version
- if (app.isFlinkSqlJobOrCDC()) {
+ if (app.isJobTypeFlinkSqlOrCDC()) {
applicationManageService.toEffective(app);
} else {
- if (app.isStreamParkJob()) {
+ if (app.isAppTypeStreamPark()) {
FlinkApplicationConfig config =
applicationConfigService.getLatest(app.getId());
if (config != null) {
@@ -340,7 +341,7 @@ public class FlinkApplicationBuildPipelineServiceImpl
}
// backup.
if (!app.isNeedRollback()) {
- if (app.isFlinkSqlJobOrCDC() && newFlinkSql !=
null) {
+ if (app.isJobTypeFlinkSqlOrCDC() && newFlinkSql !=
null) {
backUpService.backup(app, newFlinkSql);
} else {
backUpService.backup(app, null);
@@ -467,7 +468,7 @@ public class FlinkApplicationBuildPipelineServiceImpl
case YARN_APPLICATION:
String yarnProvidedPath = app.getAppLib();
String localWorkspace = app.getLocalAppHome().concat("/lib");
- if (FlinkJobType.CUSTOM_CODE == app.getJobTypeEnum()
+ if (FlinkJobType.FLINK_JAR == app.getJobTypeEnum()
&& APACHE_FLINK == app.getApplicationType()) {
yarnProvidedPath = app.getAppHome();
localWorkspace = app.getLocalAppHome();
@@ -574,7 +575,7 @@ public class FlinkApplicationBuildPipelineServiceImpl
app.getLocalAppHome(),
mainClass,
flinkUserJar,
- app.isCustomCodeJob(),
+ app.isJobTypeFlinkJar(),
app.getDeployModeEnum(),
app.getJobTypeEnum(),
flinkEnv.getFlinkVersion(),
@@ -586,7 +587,7 @@ public class FlinkApplicationBuildPipelineServiceImpl
*/
private String retrieveFlinkUserJar(FlinkEnv flinkEnv, FlinkApplication
app) {
switch (app.getJobTypeEnum()) {
- case CUSTOM_CODE:
+ case FLINK_JAR:
switch (app.getApplicationType()) {
case STREAMPARK_FLINK:
return String.format(
@@ -595,7 +596,7 @@ public class FlinkApplicationBuildPipelineServiceImpl
return String.format("%s/%s", app.getAppHome(),
app.getJar());
default:
throw new IllegalArgumentException(
- "[StreamPark] unsupported ApplicationType of
custom code: "
+ "[StreamPark] unsupported ApplicationType of
FlinkJar: "
+ app.getApplicationType());
}
case PYFLINK:
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationConfigServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationConfigServiceImpl.java
index 8eb6ef3a2..a6447f9ec 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationConfigServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationConfigServiceImpl.java
@@ -99,7 +99,7 @@ public class FlinkApplicationConfigServiceImpl
public synchronized void update(FlinkApplication appParam, Boolean latest)
{
// flink sql job
FlinkApplicationConfig latestConfig = getLatest(appParam.getId());
- if (appParam.isFlinkSqlJobOrCDC()) {
+ if (appParam.isJobTypeFlinkSqlOrCDC()) {
updateForFlinkSqlJob(appParam, latest, latestConfig);
} else {
updateForNonFlinkSqlJob(appParam, latest, latestConfig);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java
index f1bba8c61..a4ed4dbdf 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java
@@ -20,6 +20,7 @@ package
org.apache.streampark.console.core.service.application.impl;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.FlinkDeployMode;
+import org.apache.streampark.common.enums.FlinkJobType;
import org.apache.streampark.common.enums.StorageType;
import org.apache.streampark.common.fs.HdfsOperator;
import org.apache.streampark.common.util.DeflaterUtils;
@@ -162,7 +163,7 @@ public class FlinkApplicationManageServiceImpl extends
ServiceImpl<FlinkApplicat
if (config != null) {
this.configService.toEffective(appParam.getId(), config.getId());
}
- if (appParam.isFlinkSqlJobOrCDC()) {
+ if (appParam.isJobTypeFlinkSqlOrCDC()) {
FlinkSql flinkSql = flinkSqlService.getCandidate(appParam.getId(),
null);
if (flinkSql != null) {
flinkSqlService.toEffective(appParam.getId(),
flinkSql.getId());
@@ -347,7 +348,7 @@ public class FlinkApplicationManageServiceImpl extends
ServiceImpl<FlinkApplicat
String.format(ERROR_APP_QUEUE_HINT, appParam.getYarnQueue(),
appParam.getTeamId()));
appParam.doSetHotParams();
- if (appParam.isUploadJob()) {
+ if (appParam.isResourceFromUpload()) {
String jarPath = String.format(
"%s/%d/%s", Workspace.local().APP_UPLOADS(),
appParam.getTeamId(), appParam.getJar());
if (!new File(jarPath).exists()) {
@@ -365,7 +366,9 @@ public class FlinkApplicationManageServiceImpl extends
ServiceImpl<FlinkApplicat
boolean saveSuccess = save(appParam);
if (saveSuccess) {
- if (appParam.isFlinkSqlJobOrPyFlinkJobOrFlinkCDC()) {
+ FlinkJobType jobType = appParam.getJobTypeEnum();
+ if (jobType == FlinkJobType.FLINK_SQL || jobType ==
FlinkJobType.PYFLINK
+ || jobType == FlinkJobType.FLINK_CDC) {
FlinkSql flinkSql = new FlinkSql(appParam);
flinkSqlService.create(flinkSql);
}
@@ -449,7 +452,7 @@ public class FlinkApplicationManageServiceImpl extends
ServiceImpl<FlinkApplicat
boolean saved = save(newApp);
if (saved) {
- if (newApp.isFlinkSqlJobOrCDC()) {
+ if (newApp.isJobTypeFlinkSqlOrCDC()) {
FlinkSql copyFlinkSql =
flinkSqlService.getLatestFlinkSql(appParam.getId(), true);
newApp.setFlinkSql(copyFlinkSql.getSql());
newApp.setDependency(copyFlinkSql.getDependency());
@@ -500,7 +503,7 @@ public class FlinkApplicationManageServiceImpl extends
ServiceImpl<FlinkApplicat
application.setRelease(ReleaseStateEnum.NEED_RELEASE.get());
// 1) jar job jar file changed
- if (application.isUploadJob()) {
+ if (application.isResourceFromUpload()) {
if (!Objects.equals(application.getJar(), appParam.getJar())) {
application.setBuild(true);
} else {
@@ -584,12 +587,12 @@ public class FlinkApplicationManageServiceImpl extends
ServiceImpl<FlinkApplicat
}
// Flink Sql job...
- if (application.isFlinkSqlJobOrCDC()) {
+ if (application.isJobTypeFlinkSqlOrCDC()) {
updateFlinkSqlJob(application, appParam);
return true;
}
- if (application.isStreamParkJob()) {
+ if (application.isAppTypeStreamPark()) {
configService.update(appParam, application.isRunning());
} else {
application.setJar(appParam.getJar());
@@ -721,7 +724,7 @@ public class FlinkApplicationManageServiceImpl extends
ServiceImpl<FlinkApplicat
this.update(update);
// backup
- if (appParam.isFlinkSqlJobOrCDC()) {
+ if (appParam.isJobTypeFlinkSqlOrCDC()) {
FlinkSql newFlinkSql =
flinkSqlService.getCandidate(appParam.getId(), CandidateTypeEnum.NEW);
if (!appParam.isNeedRollback() && newFlinkSql != null) {
backUpService.backup(appParam, newFlinkSql);
@@ -752,7 +755,7 @@ public class FlinkApplicationManageServiceImpl extends
ServiceImpl<FlinkApplicat
if (config != null) {
config.setToApplication(application);
}
- if (application.isFlinkSqlJobOrCDC()) {
+ if (application.isJobTypeFlinkSqlOrCDC()) {
FlinkSql flinkSql =
flinkSqlService.getEffective(application.getId(), true);
if (flinkSql == null) {
flinkSql = flinkSqlService.getCandidate(application.getId(),
CandidateTypeEnum.NEW);
@@ -760,7 +763,7 @@ public class FlinkApplicationManageServiceImpl extends
ServiceImpl<FlinkApplicat
}
flinkSql.setToApplication(application);
} else {
- if (application.isCICDJob()) {
+ if (application.isResourceFromBuild()) {
String path =
this.projectService.getAppConfPath(application.getProjectId(),
application.getModule());
application.setConfPath(path);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
index 4a9b24d50..4c5114abd 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java
@@ -487,7 +487,7 @@ public class SparkApplicationActionServiceImpl
break;
case SPARK_JAR:
- if (application.isUploadJob()) {
+ if (application.isFromUploadJob()) {
appConf = applicationConfig == null
? null
: String.format("yaml://%s",
applicationConfig.getContent());
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBackupServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBackupServiceImpl.java
index 1a62563a8..912d13b4b 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBackupServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBackupServiceImpl.java
@@ -189,7 +189,7 @@ public class SparkApplicationBackupServiceImpl
@Override
public void backup(SparkApplication appParam, SparkSql sparkSqlParam) {
// basic configuration file backup
- String appHome = (appParam.isCICDJob())
+ String appHome = (appParam.isFromBuildJob())
? appParam.getDistHome()
: appParam.getAppHome();
FsOperator fsOperator = appParam.getFsOperator();
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBuildPipelineServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBuildPipelineServiceImpl.java
index 1ab5410e8..7764d08cb 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBuildPipelineServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBuildPipelineServiceImpl.java
@@ -214,7 +214,7 @@ public class SparkApplicationBuildPipelineServiceImpl
String appHome = app.getAppHome();
FsOperator fsOperator = app.getFsOperator();
fsOperator.delete(appHome);
- if (app.isUploadJob()) {
+ if (app.isFromUploadJob()) {
String uploadJar =
appUploads.concat("/").concat(app.getJar());
File localJar = new File(
String.format(
@@ -244,7 +244,7 @@ public class SparkApplicationBuildPipelineServiceImpl
break;
default:
throw new IllegalArgumentException(
- "[StreamPark] unsupported
ApplicationType of custom code: "
+ "[StreamPark] unsupported
ApplicationType of SparkJar: "
+ app.getApplicationType());
}
} else {
@@ -453,7 +453,7 @@ public class SparkApplicationBuildPipelineServiceImpl
return String.format("%s/%s", app.getAppHome(),
app.getJar());
default:
throw new IllegalArgumentException(
- "[StreamPark] unsupported ApplicationType of
custom code: "
+ "[StreamPark] unsupported ApplicationType of
SparkJar: "
+ app.getApplicationType());
}
case PYSPARK:
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
index 635afa2dd..e2e00b422 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java
@@ -280,7 +280,7 @@ public class SparkApplicationManageServiceImpl
appParam.setMainClass(Constants.STREAMPARK_SPARKSQL_CLIENT_CLASS);
}
}
- if (appParam.isUploadJob()) {
+ if (appParam.isFromUploadJob()) {
String jarPath = String.format(
"%s/%d/%s", Workspace.local().APP_UPLOADS(),
appParam.getTeamId(), appParam.getJar());
if (!new File(jarPath).exists()) {
@@ -412,7 +412,7 @@ public class SparkApplicationManageServiceImpl
application.setRelease(ReleaseStateEnum.NEED_RELEASE.get());
// 1) jar job jar file changed
- if (application.isUploadJob()) {
+ if (application.isFromUploadJob()) {
if (!Objects.equals(application.getJar(), appParam.getJar())) {
application.setBuild(true);
} else {
@@ -640,7 +640,7 @@ public class SparkApplicationManageServiceImpl
}
sparkSql.setToApplication(application);
} else {
- if (application.isCICDJob()) {
+ if (application.isFromBuildJob()) {
String path =
this.projectService.getAppConfPath(application.getProjectId(),
application.getModule());
application.setConfPath(path);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java
index 50afd4749..277f9dc87 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java
@@ -342,7 +342,7 @@ public class FlinkSavepointServiceImpl extends ServiceImpl<FlinkSavepointMapper,
@VisibleForTesting
@Nullable
public String getSavepointFromConfig(FlinkApplication application) {
- if (!application.isStreamParkJob() && !application.isFlinkSqlJobOrCDC()) {
+ if (!application.isAppTypeStreamPark() && !application.isJobTypeFlinkSqlOrCDC()) {
return null;
}
FlinkApplicationConfig applicationConfig =
configService.getEffective(application.getId());
diff --git a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
index d8f1b64fe..d857cad66 100644
--- a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
+++ b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql
@@ -557,7 +557,7 @@ create table if not exists `t_spark_env` (
create table if not exists `t_spark_app` (
`id` bigint generated by default as identity not null,
`team_id` bigint not null,
- `job_type` tinyint default null comment '(1)custom code(2)spark SQL',
+ `job_type` tinyint default null comment '(1) Spark jar (2) Spark SQL',
`app_type` tinyint default null comment '(1)Apache Spark(2)StreamPark Spark',
`version_id` bigint default null comment 'spark version',
`app_name` varchar(255) default null comment 'spark.app.name',
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkSavepointServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkSavepointServiceTest.java
index 66f43f940..14cbc3fdb 100644
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkSavepointServiceTest.java
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkSavepointServiceTest.java
@@ -104,14 +104,14 @@ class FlinkSavepointServiceTest extends SpringUnitTestBase {
app.setAppType(ApplicationType.APACHE_FLINK.getType());
assertThat(savepointServiceImpl.getSavepointFromConfig(app)).isNull();
app.setAppType(ApplicationType.STREAMPARK_FLINK.getType());
- app.setJobType(FlinkJobType.CUSTOM_CODE.getMode());
+ app.setJobType(FlinkJobType.FLINK_JAR.getMode());
assertThat(savepointServiceImpl.getSavepointFromConfig(app)).isNull();
// Test for (StreamPark job Or FlinkSQL job) without application config.
app.setAppType(ApplicationType.STREAMPARK_FLINK.getType());
assertThat(savepointServiceImpl.getSavepointFromConfig(app)).isNull();
app.setAppType(ApplicationType.STREAMPARK_FLINK.getType());
- app.setJobType(FlinkJobType.CUSTOM_CODE.getMode());
+ app.setJobType(FlinkJobType.FLINK_JAR.getMode());
assertThat(savepointServiceImpl.getSavepointFromConfig(app)).isNull();
// Test for (StreamPark job Or FlinkSQL job) with application config just disabled checkpoint.
diff --git a/streampark-console/streampark-console-webapp/src/assets/icons/fjar.svg b/streampark-console/streampark-console-webapp/src/assets/icons/fjar.svg
new file mode 100644
index 000000000..fc152c4be
--- /dev/null
+++ b/streampark-console/streampark-console-webapp/src/assets/icons/fjar.svg
@@ -0,0 +1 @@
+<?xml version="1.0" standalone="no"?><!DOCTYPE svg PUBLIC "-//W3C//DTD SVG
1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd"><svg
t="1737299962791" class="icon" viewBox="0 0 1024 1024" version="1.1"
xmlns="http://www.w3.org/2000/svg" p-id="27402"
xmlns:xlink="http://www.w3.org/1999/xlink" width="200" height="200"><path
d="M960 32H64a31.36 31.36 0 0 0-32 32v896a31.36 31.36 0 0 0 32 32h896a31.36
31.36 0 0 0 32-32V64a31.36 31.36 0 0 0-32-32zM928 896a31.36 31.36 0 0 1-30.72
30.72H1 [...]
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/ExecOptionModal.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/ExecOptionModal.vue
index d453e848f..4ef249491 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/ExecOptionModal.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/ExecOptionModal.vue
@@ -100,7 +100,7 @@
:height="600"
>
<template #title>
- <SvgIcon name="code" style="color: red" />
+ <SvgIcon name="fjar" style="color: red" />
{{ t('flink.app.detail.exceptionModal.title') }}
</template>
<div class="startExp h-540px" ref="startExp"></div>
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/data/detail.data.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/data/detail.data.ts
index aee124287..3da83a68e 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/data/detail.data.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/data/detail.data.ts
@@ -42,7 +42,7 @@ export const getDescSchema = (): DescItem[] => {
'div',
{ class: 'bold-tag' },
h(Tag, { color: curVal === 1 ? '#545454' : '#0C7EF2', class:
'mr-8px' }, () =>
- curVal === 1 ? 'Custom Code' : 'Flink SQL',
+ curVal === 1 ? 'Flink JAR' : 'Flink SQL',
),
),
},
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useAppTableAction.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useAppTableAction.ts
index bf9d7e897..c65ca155f 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useAppTableAction.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useAppTableAction.ts
@@ -232,7 +232,7 @@ export const useAppTableAction = (
sessionStorage.setItem('appPageNo', String(currentPageNo || 1));
flinkAppStore.setApplicationId(app.id);
if (app.appType == AppTypeEnum.STREAMPARK_FLINK) {
- // jobType( 1 custom code 2: flinkSQL)
+ // jobType( 1 flinkJar 2: flinkSQL)
router.push({ path: '/flink/app/edit_streampark', query: { appId: app.id } });
} else if (app.appType == AppTypeEnum.APACHE_FLINK) {
//Apache Flink
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
index 59e6d4bed..62f20eab5 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
@@ -510,7 +510,7 @@ export const useCreateAndEditSchema = (
icon: 'ant-design:code-outlined',
style: { color: '#108ee9' },
}),
- h('span', { class: 'pl-8px' }, 'Custom Code'),
+ h('span', { class: 'pl-8px' }, 'Flink JAR'),
],
},
);
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts
index 3166e6fac..59bdad48a 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts
@@ -39,8 +39,8 @@ const getJobTypeOptions = () => {
return [
{
label: h('div', {}, [
- h(SvgIcon, { name: 'code', color: '#108ee9' }, ''),
- h('span', { class: 'pl-10px' }, 'Custom Code'),
+ h(SvgIcon, { name: 'fjar', color: '#108ee9' }, ''),
+ h('span', { class: 'pl-10px' }, 'Flink JAR'),
]),
value: String(JobTypeEnum.JAR),
},
diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/ApplicationForm.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/ApplicationForm.java
index 85b5943ff..b9f4ed1e7 100644
--- a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/ApplicationForm.java
+++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/ApplicationForm.java
@@ -96,9 +96,9 @@ public final class ApplicationForm {
new WebDriverWait(driver, Constants.DEFAULT_WEBDRIVER_WAIT_DURATION)
.until(ExpectedConditions.visibilityOfAllElements(selectJobType));
switch (jobType) {
- case CUSTOM_CODE:
+ case FLINK_JAR:
selectJobType.stream()
- .filter(e -> e.getText().equalsIgnoreCase(FlinkJobType.CUSTOM_CODE.desc))
+ .filter(e -> e.getText().equalsIgnoreCase(FlinkJobType.FLINK_JAR.desc))
.findFirst()
.orElseThrow(
() -> new IllegalArgumentException(
@@ -250,7 +250,7 @@ public final class ApplicationForm {
@Getter
public enum FlinkJobType {
- CUSTOM_CODE("custom code"),
+ FLINK_JAR("flink jar"),
FLINK_SQL("flink sql"),
PYTHON_FLINK("python flink");