This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch flink_cdc in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit 73637dbcebe4fb6b30ab5f609366912467becb0b Author: benjobs <[email protected]> AuthorDate: Sun Jan 19 23:22:27 2025 +0800 [Improve] job type custom_code rename to flink_jar --- .../streampark/common/enums/FlinkJobType.java | 4 +- .../main/assembly/script/schema/mysql-schema.sql | 2 +- .../console/core/entity/FlinkApplication.java | 88 +++++++--------------- .../console/core/entity/SparkApplication.java | 8 +- .../console/core/enums/ResourceFromEnum.java | 4 +- .../impl/FlinkApplicationActionServiceImpl.java | 8 +- .../impl/FlinkApplicationBackupServiceImpl.java | 6 +- .../FlinkApplicationBuildPipelineServiceImpl.java | 27 +++---- .../impl/FlinkApplicationConfigServiceImpl.java | 2 +- .../impl/FlinkApplicationManageServiceImpl.java | 23 +++--- .../impl/SparkApplicationActionServiceImpl.java | 2 +- .../impl/SparkApplicationBackupServiceImpl.java | 2 +- .../SparkApplicationBuildPipelineServiceImpl.java | 6 +- .../impl/SparkApplicationManageServiceImpl.java | 6 +- .../service/impl/FlinkSavepointServiceImpl.java | 2 +- .../src/main/resources/db/schema-h2.sql | 2 +- .../core/service/FlinkSavepointServiceTest.java | 4 +- .../src/assets/icons/fjar.svg | 1 + .../app/components/AppDetail/ExecOptionModal.vue | 2 +- .../src/views/flink/app/data/detail.data.ts | 2 +- .../src/views/flink/app/hooks/useAppTableAction.ts | 2 +- .../flink/app/hooks/useCreateAndEditSchema.ts | 2 +- .../src/views/flink/app/hooks/useCreateSchema.ts | 4 +- .../pages/flink/applications/ApplicationForm.java | 6 +- 24 files changed, 93 insertions(+), 122 deletions(-) diff --git a/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkJobType.java b/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkJobType.java index 16c63952e..2c50978ef 100644 --- a/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkJobType.java +++ b/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkJobType.java @@ -31,9 +31,9 @@ public enum FlinkJobType { UNKNOWN("Unknown", -1), /** - * custom code + * Flink Jar */ - CUSTOM_CODE("Custom Code", 1), + FLINK_JAR("Flink JAR", 1), /** * Flink SQL diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql index a0885a319..25cc8c9ca 100644 --- a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql +++ b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql @@ -593,7 +593,7 @@ drop table if exists `t_spark_app`; create table `t_spark_app` ( `id` bigint not null auto_increment, `team_id` bigint not null, - `job_type` tinyint default null comment '(1)custom code(2)spark SQL', + `job_type` tinyint default null comment '(1) spark Jar(2) spark SQL', `app_type` tinyint default null comment '(1)Apache Spark(2)StreamPark Spark', `version_id` bigint default null comment 'spark version', `app_name` varchar(255) collate utf8mb4_general_ci default null comment 'spark.app.name', diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java index fcb787b95..a88cc25a9 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java @@ -46,7 +46,6 @@ import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; -import com.baomidou.mybatisplus.core.toolkit.support.SFunction; import com.fasterxml.jackson.annotation.JsonIgnore; import lombok.Getter; import lombok.Setter; @@ -72,7 +71,7 @@ public class FlinkApplication extends BaseEntity { private Long teamId; /** - * 1) custom code 2) flink SQL + * 1) flink jar 2) flink SQL */ private Integer jobType; @@ -130,6 +129,7 @@ public class FlinkApplication extends BaseEntity { @Getter private String ingressTemplate; + @Setter private String defaultModeIngress; @@ -261,7 +261,7 @@ public class FlinkApplication extends BaseEntity { private Date modifyTime; /** - * 1: cicd (build from csv) 2: upload (upload local jar job) + * 1: build (build from csv) 2: upload (upload local jar job) */ private Integer resourceFrom; @@ -420,15 +420,6 @@ public class FlinkApplication extends BaseEntity { && this.cpFailureAction != null; } - public boolean eqFlinkJob(FlinkApplication other) { - if (this.isFlinkSqlJobOrCDC() - && other.isFlinkSqlJobOrCDC() - && this.getFlinkSql().trim().equals(other.getFlinkSql().trim())) { - return this.getDependencyObject().equals(other.getDependencyObject()); - } - return false; - } - /** * Local compilation and packaging working directory */ @@ -475,18 +466,19 @@ public class FlinkApplication extends BaseEntity { } public String getMainClass() { - FlinkJobType flinkJobType = FlinkJobType.of(jobType); - if (flinkJobType == FlinkJobType.FLINK_SQL) { - return Constants.STREAMPARK_FLINKSQL_CLIENT_CLASS; - } else if (flinkJobType == FlinkJobType.FLINK_CDC) { - return Constants.STREAMPARK_FLINKCDC_CLIENT_CLASS; - } else if (flinkJobType == FlinkJobType.PYFLINK) { - return Constants.PYTHON_FLINK_DRIVER_CLASS_NAME; // Assuming this is the default behavior for other enum - // values - } else if (flinkJobType == FlinkJobType.CUSTOM_CODE) { - return mainClass; - } else { - return null; + FlinkJobType flinkJobType = this.getJobTypeEnum(); + switch (flinkJobType) { + case FLINK_SQL: + return Constants.STREAMPARK_FLINKSQL_CLIENT_CLASS; + case FLINK_CDC: + return Constants.STREAMPARK_FLINKCDC_CLIENT_CLASS; + case PYFLINK: + return Constants.PYTHON_FLINK_DRIVER_CLASS_NAME; + case FLINK_JAR: + return mainClass; + case UNKNOWN: + default: + return null; } } @@ -513,42 +505,35 @@ public class FlinkApplication extends BaseEntity { } @JsonIgnore - public boolean isFlinkSqlJobOrCDC() { + public boolean isJobTypeFlinkSqlOrCDC() { return FlinkJobType.FLINK_SQL.getMode().equals(this.getJobType()) || FlinkJobType.FLINK_CDC.getMode().equals(this.getJobType()); } @JsonIgnore - public boolean isFlinkSqlJobOrPyFlinkJobOrFlinkCDC() { - return FlinkJobType.FLINK_SQL.getMode().equals(this.getJobType()) - || FlinkJobType.PYFLINK.getMode().equals(this.getJobType()) - || FlinkJobType.FLINK_CDC.getMode().equals(this.getJobType()); + public boolean isJobTypeFlinkJar() { + return FlinkJobType.FLINK_JAR.getMode().equals(this.getJobType()); } @JsonIgnore - public boolean isCustomCodeJob() { - return FlinkJobType.CUSTOM_CODE.getMode().equals(this.getJobType()); - } - - @JsonIgnore - public boolean isCustomCodeOrPyFlinkJob() { - return FlinkJobType.CUSTOM_CODE.getMode().equals(this.getJobType()) + public boolean isJobTypeFlinkJarOrPyFlink() { + return FlinkJobType.FLINK_JAR.getMode().equals(this.getJobType()) || FlinkJobType.PYFLINK.getMode().equals(this.getJobType()); } @JsonIgnore - public boolean isUploadJob() { - return isCustomCodeOrPyFlinkJob() + public boolean isResourceFromUpload() { + return isJobTypeFlinkJarOrPyFlink() && ResourceFromEnum.UPLOAD.getValue().equals(this.getResourceFrom()); } @JsonIgnore - public boolean isCICDJob() { - return isCustomCodeOrPyFlinkJob() - && ResourceFromEnum.CICD.getValue().equals(this.getResourceFrom()); + public boolean isResourceFromBuild() { + return isJobTypeFlinkJarOrPyFlink() + && ResourceFromEnum.BUILD.getValue().equals(this.getResourceFrom()); } - public boolean isStreamParkJob() { + public boolean isAppTypeStreamPark() { return this.getAppType() == ApplicationType.STREAMPARK_FLINK.getType(); } @@ -673,23 +658,4 @@ public class FlinkApplication extends BaseEntity { return FlinkDeployMode.isKubernetesMode(this.getDeployModeEnum()); } - public static class SFunc { - - public static final SFunction<FlinkApplication, Long> ID = FlinkApplication::getId; - public static final SFunction<FlinkApplication, String> JOB_ID = FlinkApplication::getJobId; - public static final SFunction<FlinkApplication, Date> START_TIME = FlinkApplication::getStartTime; - public static final SFunction<FlinkApplication, Date> END_TIME = FlinkApplication::getEndTime; - public static final SFunction<FlinkApplication, Long> DURATION = FlinkApplication::getDuration; - public static final SFunction<FlinkApplication, Integer> TOTAL_TASK = FlinkApplication::getTotalTask; - public static final SFunction<FlinkApplication, Integer> TOTAL_TM = FlinkApplication::getTotalTM; - public static final SFunction<FlinkApplication, Integer> TOTAL_SLOT = FlinkApplication::getTotalSlot; - public static final SFunction<FlinkApplication, Integer> JM_MEMORY = FlinkApplication::getJmMemory; - public static final SFunction<FlinkApplication, Integer> TM_MEMORY = FlinkApplication::getTmMemory; - public static final SFunction<FlinkApplication, Integer> STATE = FlinkApplication::getState; - public static final SFunction<FlinkApplication, String> OPTIONS = FlinkApplication::getOptions; - public static final SFunction<FlinkApplication, Integer> AVAILABLE_SLOT = FlinkApplication::getAvailableSlot; - public static final SFunction<FlinkApplication, Integer> EXECUTION_MODE = FlinkApplication::getDeployMode; - public static final SFunction<FlinkApplication, String> JOB_MANAGER_URL = FlinkApplication::getJobManagerUrl; - } - } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java index 56894baa9..80937fdf5 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java @@ -81,7 +81,7 @@ public class SparkApplication extends BaseEntity { private Integer deployMode; - /** 1: cicd (build from csv) 2: upload (upload local jar job) */ + /** 1: build (build from csv) 2: upload (upload local jar job) */ private Integer resourceFrom; private Long projectId; @@ -426,15 +426,15 @@ public class SparkApplication extends BaseEntity { } @JsonIgnore - public boolean isUploadJob() { + public boolean isFromUploadJob() { return isSparkJarOrPySparkJob() && ResourceFromEnum.UPLOAD.getValue().equals(this.getResourceFrom()); } @JsonIgnore - public boolean isCICDJob() { + public boolean isFromBuildJob() { return isSparkJarOrPySparkJob() - && ResourceFromEnum.CICD.getValue().equals(this.getResourceFrom()); + && ResourceFromEnum.BUILD.getValue().equals(this.getResourceFrom()); } public boolean isStreamParkJob() { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/ResourceFromEnum.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/ResourceFromEnum.java index c657e7e62..7159f3c1c 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/ResourceFromEnum.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/ResourceFromEnum.java @@ -24,8 +24,8 @@ import java.util.Arrays; @Getter public enum ResourceFromEnum { - /** cicd(build from cvs) */ - CICD(1), + /** build from cvs */ + BUILD(1), /** upload local jar */ UPLOAD(2); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java index 717f90b55..3c6c43b6d 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java @@ -216,7 +216,7 @@ public class FlinkApplicationActionServiceImpl // 3) restore related status LambdaUpdateWrapper<FlinkApplication> updateWrapper = Wrappers.lambdaUpdate(); updateWrapper.eq(FlinkApplication::getId, application.getId()); - if (application.isFlinkSqlJobOrCDC()) { + if (application.isJobTypeFlinkSqlOrCDC()) { updateWrapper.set(FlinkApplication::getRelease, ReleaseStateEnum.FAILED.get()); } else { updateWrapper.set(FlinkApplication::getRelease, ReleaseStateEnum.NEED_RELEASE.get()); @@ -452,7 +452,7 @@ public class FlinkApplicationActionServiceImpl applicationManageService.toEffective(application); Map<String, Object> extraParameter = new HashMap<>(0); - if (application.isFlinkSqlJobOrCDC()) { + if (application.isJobTypeFlinkSqlOrCDC()) { FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), true); // Get the sql of the replaced placeholder String realSql = variableService.replaceVariable(application.getTeamId(), flinkSql.getSql()); @@ -725,8 +725,8 @@ public class FlinkApplicationActionServiceImpl flinkUserJar = resource.getFilePath(); break; - case CUSTOM_CODE: - if (application.isUploadJob()) { + case FLINK_JAR: + if (application.isResourceFromUpload()) { appConf = String.format( "json://{\"%s\":\"%s\"}", diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBackupServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBackupServiceImpl.java index 38d1554bd..2b5fc93dd 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBackupServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBackupServiceImpl.java @@ -89,7 +89,7 @@ public class FlinkApplicationBackupServiceImpl // If necessary, perform the backup first if (bakParam.isBackup()) { application.setBackUpDescription(bakParam.getDescription()); - if (application.isFlinkSqlJobOrCDC()) { + if (application.isJobTypeFlinkSqlOrCDC()) { FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), false); backup(application, flinkSql); } else { @@ -107,7 +107,7 @@ public class FlinkApplicationBackupServiceImpl effectiveService.saveOrUpdate( bakParam.getAppId(), EffectiveTypeEnum.CONFIG, bakParam.getId()); // if flink sql task, will be rollback sql and dependencies - if (application.isFlinkSqlJobOrCDC()) { + if (application.isJobTypeFlinkSqlOrCDC()) { effectiveService.saveOrUpdate( bakParam.getAppId(), EffectiveTypeEnum.FLINKSQL, bakParam.getSqlId()); } @@ -190,7 +190,7 @@ public class FlinkApplicationBackupServiceImpl @Override public void backup(FlinkApplication appParam, FlinkSql flinkSqlParam) { // basic configuration file backup - String appHome = (appParam.isCustomCodeJob() && appParam.isCICDJob()) + String appHome = (appParam.isJobTypeFlinkJar() && appParam.isResourceFromBuild()) ? appParam.getDistHome() : appParam.getAppHome(); FsOperator fsOperator = appParam.getFsOperator(); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBuildPipelineServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBuildPipelineServiceImpl.java index f930fb3cc..066aea636 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBuildPipelineServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBuildPipelineServiceImpl.java @@ -192,14 +192,15 @@ public class FlinkApplicationBuildPipelineServiceImpl return true; } // rollback - if (app.isNeedRollback() && app.isFlinkSqlJobOrCDC()) { + if (app.isNeedRollback() && app.isJobTypeFlinkSqlOrCDC()) { flinkSqlService.rollback(app); } // 1) flink sql setDependency FlinkSql newFlinkSql = flinkSqlService.getCandidate(app.getId(), CandidateTypeEnum.NEW); FlinkSql effectiveFlinkSql = flinkSqlService.getEffective(app.getId(), false); - if (app.isFlinkSqlJobOrPyFlinkJobOrFlinkCDC()) { + FlinkJobType jobType = app.getJobTypeEnum(); + if (jobType == FlinkJobType.FLINK_SQL || jobType == FlinkJobType.PYFLINK || jobType == FlinkJobType.FLINK_CDC) { FlinkSql flinkSql = newFlinkSql == null ? effectiveFlinkSql : newFlinkSql; AssertUtils.notNull(flinkSql); app.setDependency(flinkSql.getDependency()); @@ -235,12 +236,12 @@ public class FlinkApplicationBuildPipelineServiceImpl // 2) some preparatory work String appUploads = app.getWorkspace().APP_UPLOADS(); - if (app.isCustomCodeOrPyFlinkJob()) { - // customCode upload jar to appHome... + if (app.isJobTypeFlinkJarOrPyFlink()) { + // flinkJar upload jar to appHome... String appHome = app.getAppHome(); FsOperator fsOperator = app.getFsOperator(); fsOperator.delete(appHome); - if (app.isUploadJob()) { + if (app.isResourceFromUpload()) { String uploadJar = appUploads.concat("/").concat(app.getJar()); File localJar = new File( String.format( @@ -274,7 +275,7 @@ public class FlinkApplicationBuildPipelineServiceImpl break; default: throw new IllegalArgumentException( - "[StreamPark] unsupported ApplicationType of custom code: " + "[StreamPark] unsupported ApplicationType of FlinkJar: " + app.getApplicationType()); } } else { @@ -324,10 +325,10 @@ public class FlinkApplicationBuildPipelineServiceImpl // If the current task is not running, or the task has just been added, directly // set // the candidate version to the official version - if (app.isFlinkSqlJobOrCDC()) { + if (app.isJobTypeFlinkSqlOrCDC()) { applicationManageService.toEffective(app); } else { - if (app.isStreamParkJob()) { + if (app.isAppTypeStreamPark()) { FlinkApplicationConfig config = applicationConfigService.getLatest(app.getId()); if (config != null) { @@ -340,7 +341,7 @@ public class FlinkApplicationBuildPipelineServiceImpl } // backup. if (!app.isNeedRollback()) { - if (app.isFlinkSqlJobOrCDC() && newFlinkSql != null) { + if (app.isJobTypeFlinkSqlOrCDC() && newFlinkSql != null) { backUpService.backup(app, newFlinkSql); } else { backUpService.backup(app, null); @@ -467,7 +468,7 @@ public class FlinkApplicationBuildPipelineServiceImpl case YARN_APPLICATION: String yarnProvidedPath = app.getAppLib(); String localWorkspace = app.getLocalAppHome().concat("/lib"); - if (FlinkJobType.CUSTOM_CODE == app.getJobTypeEnum() + if (FlinkJobType.FLINK_JAR == app.getJobTypeEnum() && APACHE_FLINK == app.getApplicationType()) { yarnProvidedPath = app.getAppHome(); localWorkspace = app.getLocalAppHome(); @@ -574,7 +575,7 @@ public class FlinkApplicationBuildPipelineServiceImpl app.getLocalAppHome(), mainClass, flinkUserJar, - app.isCustomCodeJob(), + app.isJobTypeFlinkJar(), app.getDeployModeEnum(), app.getJobTypeEnum(), flinkEnv.getFlinkVersion(), @@ -586,7 +587,7 @@ public class FlinkApplicationBuildPipelineServiceImpl */ private String retrieveFlinkUserJar(FlinkEnv flinkEnv, FlinkApplication app) { switch (app.getJobTypeEnum()) { - case CUSTOM_CODE: + case FLINK_JAR: switch (app.getApplicationType()) { case STREAMPARK_FLINK: return String.format( @@ -595,7 +596,7 @@ public class FlinkApplicationBuildPipelineServiceImpl return String.format("%s/%s", app.getAppHome(), app.getJar()); default: throw new IllegalArgumentException( - "[StreamPark] unsupported ApplicationType of custom code: " + "[StreamPark] unsupported ApplicationType of FlinkJar: " + app.getApplicationType()); } case PYFLINK: diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationConfigServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationConfigServiceImpl.java index 8eb6ef3a2..a6447f9ec 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationConfigServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationConfigServiceImpl.java @@ -99,7 +99,7 @@ public class FlinkApplicationConfigServiceImpl public synchronized void update(FlinkApplication appParam, Boolean latest) { // flink sql job FlinkApplicationConfig latestConfig = getLatest(appParam.getId()); - if (appParam.isFlinkSqlJobOrCDC()) { + if (appParam.isJobTypeFlinkSqlOrCDC()) { updateForFlinkSqlJob(appParam, latest, latestConfig); } else { updateForNonFlinkSqlJob(appParam, latest, latestConfig); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java index f1bba8c61..a4ed4dbdf 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java @@ -20,6 +20,7 @@ package org.apache.streampark.console.core.service.application.impl; import org.apache.streampark.common.conf.Workspace; import org.apache.streampark.common.enums.ClusterState; import org.apache.streampark.common.enums.FlinkDeployMode; +import org.apache.streampark.common.enums.FlinkJobType; import org.apache.streampark.common.enums.StorageType; import org.apache.streampark.common.fs.HdfsOperator; import org.apache.streampark.common.util.DeflaterUtils; @@ -162,7 +163,7 @@ public class FlinkApplicationManageServiceImpl extends ServiceImpl<FlinkApplicat if (config != null) { this.configService.toEffective(appParam.getId(), config.getId()); } - if (appParam.isFlinkSqlJobOrCDC()) { + if (appParam.isJobTypeFlinkSqlOrCDC()) { FlinkSql flinkSql = flinkSqlService.getCandidate(appParam.getId(), null); if (flinkSql != null) { flinkSqlService.toEffective(appParam.getId(), flinkSql.getId()); @@ -347,7 +348,7 @@ public class FlinkApplicationManageServiceImpl extends ServiceImpl<FlinkApplicat String.format(ERROR_APP_QUEUE_HINT, appParam.getYarnQueue(), appParam.getTeamId())); appParam.doSetHotParams(); - if (appParam.isUploadJob()) { + if (appParam.isResourceFromUpload()) { String jarPath = String.format( "%s/%d/%s", Workspace.local().APP_UPLOADS(), appParam.getTeamId(), appParam.getJar()); if (!new File(jarPath).exists()) { @@ -365,7 +366,9 @@ public class FlinkApplicationManageServiceImpl extends ServiceImpl<FlinkApplicat boolean saveSuccess = save(appParam); if (saveSuccess) { - if (appParam.isFlinkSqlJobOrPyFlinkJobOrFlinkCDC()) { + FlinkJobType jobType = appParam.getJobTypeEnum(); + if (jobType == FlinkJobType.FLINK_SQL || jobType == FlinkJobType.PYFLINK + || jobType == FlinkJobType.FLINK_CDC) { FlinkSql flinkSql = new FlinkSql(appParam); flinkSqlService.create(flinkSql); } @@ -449,7 +452,7 @@ public class FlinkApplicationManageServiceImpl extends ServiceImpl<FlinkApplicat boolean saved = save(newApp); if (saved) { - if (newApp.isFlinkSqlJobOrCDC()) { + if (newApp.isJobTypeFlinkSqlOrCDC()) { FlinkSql copyFlinkSql = flinkSqlService.getLatestFlinkSql(appParam.getId(), true); newApp.setFlinkSql(copyFlinkSql.getSql()); newApp.setDependency(copyFlinkSql.getDependency()); @@ -500,7 +503,7 @@ public class FlinkApplicationManageServiceImpl extends ServiceImpl<FlinkApplicat application.setRelease(ReleaseStateEnum.NEED_RELEASE.get()); // 1) jar job jar file changed - if (application.isUploadJob()) { + if (application.isResourceFromUpload()) { if (!Objects.equals(application.getJar(), appParam.getJar())) { application.setBuild(true); } else { @@ -584,12 +587,12 @@ public class FlinkApplicationManageServiceImpl extends ServiceImpl<FlinkApplicat } // Flink Sql job... - if (application.isFlinkSqlJobOrCDC()) { + if (application.isJobTypeFlinkSqlOrCDC()) { updateFlinkSqlJob(application, appParam); return true; } - if (application.isStreamParkJob()) { + if (application.isAppTypeStreamPark()) { configService.update(appParam, application.isRunning()); } else { application.setJar(appParam.getJar()); @@ -721,7 +724,7 @@ public class FlinkApplicationManageServiceImpl extends ServiceImpl<FlinkApplicat this.update(update); // backup - if (appParam.isFlinkSqlJobOrCDC()) { + if (appParam.isJobTypeFlinkSqlOrCDC()) { FlinkSql newFlinkSql = flinkSqlService.getCandidate(appParam.getId(), CandidateTypeEnum.NEW); if (!appParam.isNeedRollback() && newFlinkSql != null) { backUpService.backup(appParam, newFlinkSql); @@ -752,7 +755,7 @@ public class FlinkApplicationManageServiceImpl extends ServiceImpl<FlinkApplicat if (config != null) { config.setToApplication(application); } - if (application.isFlinkSqlJobOrCDC()) { + if (application.isJobTypeFlinkSqlOrCDC()) { FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), true); if (flinkSql == null) { flinkSql = flinkSqlService.getCandidate(application.getId(), CandidateTypeEnum.NEW); @@ -760,7 +763,7 @@ public class FlinkApplicationManageServiceImpl extends ServiceImpl<FlinkApplicat } flinkSql.setToApplication(application); } else { - if (application.isCICDJob()) { + if (application.isResourceFromBuild()) { String path = this.projectService.getAppConfPath(application.getProjectId(), application.getModule()); application.setConfPath(path); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java index 4a9b24d50..4c5114abd 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java @@ -487,7 +487,7 @@ public class SparkApplicationActionServiceImpl break; case SPARK_JAR: - if (application.isUploadJob()) { + if (application.isFromUploadJob()) { appConf = applicationConfig == null ? null : String.format("yaml://%s", applicationConfig.getContent()); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBackupServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBackupServiceImpl.java index 1a62563a8..912d13b4b 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBackupServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBackupServiceImpl.java @@ -189,7 +189,7 @@ public class SparkApplicationBackupServiceImpl @Override public void backup(SparkApplication appParam, SparkSql sparkSqlParam) { // basic configuration file backup - String appHome = (appParam.isCICDJob()) + String appHome = (appParam.isFromBuildJob()) ? appParam.getDistHome() : appParam.getAppHome(); FsOperator fsOperator = appParam.getFsOperator(); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBuildPipelineServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBuildPipelineServiceImpl.java index 1ab5410e8..7764d08cb 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBuildPipelineServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationBuildPipelineServiceImpl.java @@ -214,7 +214,7 @@ public class SparkApplicationBuildPipelineServiceImpl String appHome = app.getAppHome(); FsOperator fsOperator = app.getFsOperator(); fsOperator.delete(appHome); - if (app.isUploadJob()) { + if (app.isFromUploadJob()) { String uploadJar = appUploads.concat("/").concat(app.getJar()); File localJar = new File( String.format( @@ -244,7 +244,7 @@ public class SparkApplicationBuildPipelineServiceImpl break; default: throw new IllegalArgumentException( - "[StreamPark] unsupported ApplicationType of custom code: " + "[StreamPark] unsupported ApplicationType of FlinkJar: " + app.getApplicationType()); } } else { @@ -453,7 +453,7 @@ public class SparkApplicationBuildPipelineServiceImpl return String.format("%s/%s", app.getAppHome(), app.getJar()); default: throw new IllegalArgumentException( - "[StreamPark] unsupported ApplicationType of custom code: " + "[StreamPark] unsupported ApplicationType of FlinkJar: " + app.getApplicationType()); } case PYSPARK: diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java index 635afa2dd..e2e00b422 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationManageServiceImpl.java @@ -280,7 +280,7 @@ public class SparkApplicationManageServiceImpl appParam.setMainClass(Constants.STREAMPARK_SPARKSQL_CLIENT_CLASS); } } - if (appParam.isUploadJob()) { + if (appParam.isFromUploadJob()) { String jarPath = String.format( "%s/%d/%s", Workspace.local().APP_UPLOADS(), appParam.getTeamId(), appParam.getJar()); if (!new File(jarPath).exists()) { @@ -412,7 +412,7 @@ public class SparkApplicationManageServiceImpl application.setRelease(ReleaseStateEnum.NEED_RELEASE.get()); // 1) jar job jar file changed - if (application.isUploadJob()) { + if (application.isFromUploadJob()) { if (!Objects.equals(application.getJar(), appParam.getJar())) { application.setBuild(true); } else { @@ -640,7 +640,7 @@ public class SparkApplicationManageServiceImpl } sparkSql.setToApplication(application); } else { - if (application.isCICDJob()) { + if (application.isFromBuildJob()) { String path = this.projectService.getAppConfPath(application.getProjectId(), application.getModule()); application.setConfPath(path); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java index 50afd4749..277f9dc87 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java @@ -342,7 +342,7 @@ public class FlinkSavepointServiceImpl extends ServiceImpl<FlinkSavepointMapper, @VisibleForTesting @Nullable public String getSavepointFromConfig(FlinkApplication application) { - if (!application.isStreamParkJob() && !application.isFlinkSqlJobOrCDC()) { + if (!application.isAppTypeStreamPark() && !application.isJobTypeFlinkSqlOrCDC()) { return null; } FlinkApplicationConfig applicationConfig = configService.getEffective(application.getId()); diff --git a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql index d8f1b64fe..d857cad66 100644 --- a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql +++ b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql @@ -557,7 +557,7 @@ create table if not exists `t_spark_env` ( create table if not exists `t_spark_app` ( `id` bigint generated by default as identity not null, `team_id` bigint not null, - `job_type` tinyint default null comment '(1)custom code(2)spark SQL', + `job_type` tinyint default null comment '(1) Spark jar (2) Spark SQL', `app_type` tinyint default null comment '(1)Apache Spark(2)StreamPark Spark', `version_id` bigint default null comment 'spark version', `app_name` varchar(255) default null comment 'spark.app.name', diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkSavepointServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkSavepointServiceTest.java index 66f43f940..14cbc3fdb 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkSavepointServiceTest.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/FlinkSavepointServiceTest.java @@ -104,14 +104,14 @@ class FlinkSavepointServiceTest extends SpringUnitTestBase { app.setAppType(ApplicationType.APACHE_FLINK.getType()); assertThat(savepointServiceImpl.getSavepointFromConfig(app)).isNull(); app.setAppType(ApplicationType.STREAMPARK_FLINK.getType()); - app.setJobType(FlinkJobType.CUSTOM_CODE.getMode()); + app.setJobType(FlinkJobType.FLINK_JAR.getMode()); assertThat(savepointServiceImpl.getSavepointFromConfig(app)).isNull(); // Test for (StreamPark job Or FlinkSQL job) without application config. app.setAppType(ApplicationType.STREAMPARK_FLINK.getType()); assertThat(savepointServiceImpl.getSavepointFromConfig(app)).isNull(); app.setAppType(ApplicationType.STREAMPARK_FLINK.getType()); - app.setJobType(FlinkJobType.CUSTOM_CODE.getMode()); + app.setJobType(FlinkJobType.FLINK_JAR.getMode()); assertThat(savepointServiceImpl.getSavepointFromConfig(app)).isNull(); // Test for (StreamPark job Or FlinkSQL job) with application config just disabled checkpoint. diff --git a/streampark-console/streampark-console-webapp/src/assets/icons/fjar.svg b/streampark-console/streampark-console-webapp/src/assets/icons/fjar.svg new file mode 100644 index 000000000..fc152c4be --- /dev/null +++ b/streampark-console/streampark-console-webapp/src/assets/icons/fjar.svg @@ -0,0 +1 @@ +<?xml version="1.0" standalone="no"?><!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd"><svg t="1737299962791" class="icon" viewBox="0 0 1024 1024" version="1.1" xmlns="http://www.w3.org/2000/svg" p-id="27402" xmlns:xlink="http://www.w3.org/1999/xlink" width="200" height="200"><path d="M960 32H64a31.36 31.36 0 0 0-32 32v896a31.36 31.36 0 0 0 32 32h896a31.36 31.36 0 0 0 32-32V64a31.36 31.36 0 0 0-32-32zM928 896a31.36 31.36 0 0 1-30.72 30.72H1 [...] diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/ExecOptionModal.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/ExecOptionModal.vue index d453e848f..4ef249491 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/ExecOptionModal.vue +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/ExecOptionModal.vue @@ -100,7 +100,7 @@ :height="600" > <template #title> - <SvgIcon name="code" style="color: red" /> + <SvgIcon name="fjar" style="color: red" /> {{ t('flink.app.detail.exceptionModal.title') }} </template> <div class="startExp h-540px" ref="startExp"></div> diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/data/detail.data.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/data/detail.data.ts index aee124287..3da83a68e 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/data/detail.data.ts +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/data/detail.data.ts @@ -42,7 +42,7 @@ export const getDescSchema = (): DescItem[] => { 'div', { class: 'bold-tag' }, h(Tag, { color: curVal === 1 ? '#545454' : '#0C7EF2', class: 'mr-8px' }, () => - curVal === 1 ? 'Custom Code' : 'Flink SQL', + curVal === 1 ? 'Flink JAR' : 'Flink SQL', ), ), }, diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useAppTableAction.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useAppTableAction.ts index bf9d7e897..c65ca155f 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useAppTableAction.ts +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useAppTableAction.ts @@ -232,7 +232,7 @@ export const useAppTableAction = ( sessionStorage.setItem('appPageNo', String(currentPageNo || 1)); flinkAppStore.setApplicationId(app.id); if (app.appType == AppTypeEnum.STREAMPARK_FLINK) { - // jobType( 1 custom code 2: flinkSQL) + // jobType( 1 flinkJar 2: flinkSQL) router.push({ path: '/flink/app/edit_streampark', query: { appId: app.id } }); } else if (app.appType == AppTypeEnum.APACHE_FLINK) { //Apache Flink diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts index 59e6d4bed..62f20eab5 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts @@ -510,7 +510,7 @@ export const useCreateAndEditSchema = ( icon: 'ant-design:code-outlined', style: { color: '#108ee9' }, }), - h('span', { class: 'pl-8px' }, 'Custom Code'), + h('span', { class: 'pl-8px' }, 'Flink JAR'), ], }, ); diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts index 3166e6fac..59bdad48a 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts @@ -39,8 +39,8 @@ const getJobTypeOptions = () => { return [ { label: h('div', {}, [ - h(SvgIcon, { name: 'code', color: '#108ee9' }, ''), - h('span', { class: 'pl-10px' }, 'Custom Code'), + h(SvgIcon, { name: 'fjar', color: '#108ee9' }, ''), + h('span', { class: 'pl-10px' }, 'Flink JAR'), ]), value: String(JobTypeEnum.JAR), }, diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/ApplicationForm.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/ApplicationForm.java index 85b5943ff..b9f4ed1e7 100644 --- a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/ApplicationForm.java +++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/pages/flink/applications/ApplicationForm.java @@ -96,9 +96,9 @@ public final class ApplicationForm { new WebDriverWait(driver, Constants.DEFAULT_WEBDRIVER_WAIT_DURATION) .until(ExpectedConditions.visibilityOfAllElements(selectJobType)); switch (jobType) { - case CUSTOM_CODE: + case FLINK_JAR: selectJobType.stream() - .filter(e -> e.getText().equalsIgnoreCase(FlinkJobType.CUSTOM_CODE.desc)) + .filter(e -> e.getText().equalsIgnoreCase(FlinkJobType.FLINK_JAR.desc)) .findFirst() .orElseThrow( () -> new IllegalArgumentException( @@ -250,7 +250,7 @@ public final class ApplicationForm { @Getter public enum FlinkJobType { - CUSTOM_CODE("custom code"), + FLINK_JAR("flink jar"), FLINK_SQL("flink sql"), PYTHON_FLINK("python flink");
