This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 246fab62d [Feature] Backend modifications support the cdc yaml api
(#4175)
246fab62d is described below
commit 246fab62d049939b4d610e113b47e86a1155a356
Author: ouyangwulin <[email protected]>
AuthorDate: Sun Jan 19 20:38:19 2025 +0800
[Feature] Backend modifications support the cdc yaml api (#4175)
* [Feature] Backend modifications support the cdc yaml api
* fixed imports
---
.../streampark/common/enums/FlinkJobType.java | 31 ++++-
.../console/core/entity/FlinkApplication.java | 146 ++++++++++++++++-----
.../impl/FlinkApplicationActionServiceImpl.java | 22 +++-
.../impl/FlinkApplicationBackupServiceImpl.java | 4 +-
.../FlinkApplicationBuildPipelineServiceImpl.java | 31 +++--
.../impl/FlinkApplicationConfigServiceImpl.java | 6 +-
.../impl/FlinkApplicationManageServiceImpl.java | 14 +-
.../service/impl/FlinkSavepointServiceImpl.java | 10 +-
.../flink/client/bean/SubmitRequest.scala | 1 +
.../flink/client/trait/FlinkClientTrait.scala | 2 +-
.../impl/FlinkYarnApplicationBuildPipeline.scala | 4 +-
11 files changed, 200 insertions(+), 71 deletions(-)
diff --git
a/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkJobType.java
b/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkJobType.java
index b15d2c097..16c63952e 100644
---
a/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkJobType.java
+++
b/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkJobType.java
@@ -20,20 +20,35 @@ package org.apache.streampark.common.enums;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-/** The flink deployment mode enum. */
+/**
+ * The flink deployment mode enum.
+ */
public enum FlinkJobType {
- /** Unknown type replace null */
+ /**
+ * Unknown type replace null
+ */
UNKNOWN("Unknown", -1),
- /** custom code */
+ /**
+ * custom code
+ */
CUSTOM_CODE("Custom Code", 1),
- /** Flink SQL */
+ /**
+ * Flink SQL
+ */
FLINK_SQL("Flink SQL", 2),
- /** Py flink Mode */
- PYFLINK("Python Flink", 3);
+ /**
+ * Py flink Mode
+ */
+ PYFLINK("Python Flink", 3),
+
+ /**
+ * Flink CDC
+ */
+ FLINK_CDC("Flink CDC", 4);
private final String name;
@@ -60,7 +75,9 @@ public enum FlinkJobType {
return FlinkJobType.UNKNOWN;
}
- /** Get the mode value of the current {@link FlinkJobType} enum. */
+ /**
+ * Get the mode value of the current {@link FlinkJobType} enum.
+ */
@Nonnull
public Integer getMode() {
return mode;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java
index 49557c51e..fcb787b95 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java
@@ -71,38 +71,58 @@ public class FlinkApplication extends BaseEntity {
private Long teamId;
- /** 1) custom code 2) flink SQL */
+ /**
+ * 1) custom code 2) flink SQL
+ */
private Integer jobType;
private Long projectId;
- /** creator */
+ /**
+ * creator
+ */
private Long userId;
- /** The name of the frontend and program displayed in yarn */
+ /**
+ * The name of the frontend and program displayed in yarn
+ */
private String jobName;
@TableField(updateStrategy = FieldStrategy.IGNORED)
private String jobId;
- /** The address of the jobmanager, that is, the direct access address of
the Flink web UI */
+ /**
+ * The address of the jobmanager, that is, the direct access address of
the Flink web UI
+ */
@TableField(updateStrategy = FieldStrategy.IGNORED)
private String jobManagerUrl;
- /** flink version */
+ /**
+ * flink version
+ */
private Long versionId;
- /** 1. yarn application id(on yarn) 2. k8s application id (on k8s
application) */
+ /**
+ * 1. yarn application id(on yarn) 2. k8s application id (on k8s
application)
+ */
private String clusterId;
- /** flink docker base image */
+ /**
+ * flink docker base image
+ */
private String flinkImage;
- /** k8s namespace */
+ /**
+ * k8s namespace
+ */
private String k8sNamespace = Constants.DEFAULT;
- /** The exposed type of the rest service of
K8s(kubernetes.rest-service.exposed.type) */
+ /**
+ * The exposed type of the rest service of
K8s(kubernetes.rest-service.exposed.type)
+ */
private Integer k8sRestExposedType;
- /** flink kubernetes pod template */
+ /**
+ * flink kubernetes pod template
+ */
private String k8sPodTemplate;
private String k8sJmPodTemplate;
@@ -113,32 +133,46 @@ public class FlinkApplication extends BaseEntity {
@Setter
private String defaultModeIngress;
- /** flink-hadoop integration on flink-k8s mode */
+ /**
+ * flink-hadoop integration on flink-k8s mode
+ */
private Boolean k8sHadoopIntegration;
private Integer state;
- /** task release status */
+ /**
+ * task release status
+ */
@TableField("`release`")
private Integer release;
- /** determine if a task needs to be built */
+ /**
+ * determine if a task needs to be built
+ */
private Boolean build;
- /** max restart retries after job failed */
+ /**
+ * max restart retries after job failed
+ */
@TableField(updateStrategy = FieldStrategy.IGNORED)
private Integer restartSize;
- /** has restart count */
+ /**
+ * has restart count
+ */
private Integer restartCount;
private Integer optionState;
- /** alert id */
+ /**
+ * alert id
+ */
@TableField(updateStrategy = FieldStrategy.IGNORED)
private Long alertId;
private String args;
- /** application module */
+ /**
+ * application module
+ */
private String module;
private String options;
@@ -155,7 +189,9 @@ public class FlinkApplication extends BaseEntity {
private Integer appType;
- /** determine if tracking status */
+ /**
+ * determine if tracking status
+ */
private Integer tracking;
private String jar;
@@ -176,19 +212,27 @@ public class FlinkApplication extends BaseEntity {
@TableField(updateStrategy = FieldStrategy.IGNORED)
private Long duration;
- /** checkpoint max failure interval */
+ /**
+ * checkpoint max failure interval
+ */
@TableField(updateStrategy = FieldStrategy.IGNORED)
private Integer cpMaxFailureInterval;
- /** checkpoint failure rate interval */
+ /**
+ * checkpoint failure rate interval
+ */
@TableField(updateStrategy = FieldStrategy.IGNORED)
private Integer cpFailureRateInterval;
- /** Actions triggered after X minutes failed Y times: 1: send alert 2:
restart */
+ /**
+ * Actions triggered after X minutes failed Y times: 1: send alert 2:
restart
+ */
@TableField(updateStrategy = FieldStrategy.IGNORED)
private Integer cpFailureAction;
- /** overview */
+ /**
+ * overview
+ */
@TableField("TOTAL_TM")
private Integer totalTM;
@@ -201,7 +245,9 @@ public class FlinkApplication extends BaseEntity {
private Integer tmMemory;
private Integer totalTask;
- /** the cluster id bound to the task in remote mode */
+ /**
+ * the cluster id bound to the task in remote mode
+ */
@TableField(updateStrategy = FieldStrategy.IGNORED)
private Long flinkClusterId;
@@ -214,13 +260,17 @@ public class FlinkApplication extends BaseEntity {
private Date modifyTime;
- /** 1: cicd (build from csv) 2: upload (upload local jar job) */
+ /**
+ * 1: cicd (build from csv) 2: upload (upload local jar job)
+ */
private Integer resourceFrom;
@TableField(updateStrategy = FieldStrategy.IGNORED)
private String tags;
- /** running job */
+ /**
+ * running job
+ */
private transient JobsOverview.Task overview;
private transient String teamResource;
@@ -254,10 +304,14 @@ public class FlinkApplication extends BaseEntity {
private transient String yarnQueue;
private transient String serviceAccount;
- /** Flink Web UI Url */
+ /**
+ * Flink Web UI Url
+ */
private transient String flinkRestUrl;
- /** refer to {@link
org.apache.streampark.flink.packer.pipeline.BuildPipeline} */
+ /**
+ * refer to {@link
org.apache.streampark.flink.packer.pipeline.BuildPipeline}
+ */
private transient Integer buildStatus;
private transient AppControl appControl;
@@ -367,15 +421,17 @@ public class FlinkApplication extends BaseEntity {
}
public boolean eqFlinkJob(FlinkApplication other) {
- if (this.isFlinkSqlJob()
- && other.isFlinkSqlJob()
+ if (this.isFlinkSqlJobOrCDC()
+ && other.isFlinkSqlJobOrCDC()
&& this.getFlinkSql().trim().equals(other.getFlinkSql().trim())) {
return
this.getDependencyObject().equals(other.getDependencyObject());
}
return false;
}
- /** Local compilation and packaging working directory */
+ /**
+ * Local compilation and packaging working directory
+ */
@JsonIgnore
public String getDistHome() {
String path = String.format("%s/%s/%s", Workspace.APP_LOCAL_DIST(),
projectId.toString(), getModule());
@@ -397,7 +453,9 @@ public class FlinkApplication extends BaseEntity {
return path;
}
- /** Automatically identify remoteAppHome or localAppHome based on app
FlinkDeployMode */
+ /**
+ * Automatically identify remoteAppHome or localAppHome based on app
FlinkDeployMode
+ */
@JsonIgnore
public String getAppHome() {
switch (this.getDeployModeEnum()) {
@@ -416,6 +474,22 @@ public class FlinkApplication extends BaseEntity {
}
}
+ public String getMainClass() {
+ FlinkJobType flinkJobType = FlinkJobType.of(jobType);
+ if (flinkJobType == FlinkJobType.FLINK_SQL) {
+ return Constants.STREAMPARK_FLINKSQL_CLIENT_CLASS;
+ } else if (flinkJobType == FlinkJobType.FLINK_CDC) {
+ return Constants.STREAMPARK_FLINKCDC_CLIENT_CLASS;
+ } else if (flinkJobType == FlinkJobType.PYFLINK) {
+ return Constants.PYTHON_FLINK_DRIVER_CLASS_NAME; // PyFlink jobs are
launched via the Python driver class
+ } else if (flinkJobType == FlinkJobType.CUSTOM_CODE) {
+ return mainClass;
+ } else {
+ return null;
+ }
+ }
+
@JsonIgnore
public String getAppLib() {
return getAppHome().concat("/lib");
@@ -439,14 +513,16 @@ public class FlinkApplication extends BaseEntity {
}
@JsonIgnore
- public boolean isFlinkSqlJob() {
- return FlinkJobType.FLINK_SQL.getMode().equals(this.getJobType());
+ public boolean isFlinkSqlJobOrCDC() {
+ return FlinkJobType.FLINK_SQL.getMode().equals(this.getJobType()) ||
+ FlinkJobType.FLINK_CDC.getMode().equals(this.getJobType());
}
@JsonIgnore
- public boolean isFlinkSqlJobOrPyFlinkJob() {
+ public boolean isFlinkSqlJobOrPyFlinkJobOrFlinkCDC() {
return FlinkJobType.FLINK_SQL.getMode().equals(this.getJobType())
- || FlinkJobType.PYFLINK.getMode().equals(this.getJobType());
+ || FlinkJobType.PYFLINK.getMode().equals(this.getJobType())
+ || FlinkJobType.FLINK_CDC.getMode().equals(this.getJobType());
}
@JsonIgnore
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java
index d645570f9..717f90b55 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java
@@ -216,7 +216,7 @@ public class FlinkApplicationActionServiceImpl
// 3) restore related status
LambdaUpdateWrapper<FlinkApplication> updateWrapper =
Wrappers.lambdaUpdate();
updateWrapper.eq(FlinkApplication::getId, application.getId());
- if (application.isFlinkSqlJob()) {
+ if (application.isFlinkSqlJobOrCDC()) {
updateWrapper.set(FlinkApplication::getRelease,
ReleaseStateEnum.FAILED.get());
} else {
updateWrapper.set(FlinkApplication::getRelease,
ReleaseStateEnum.NEED_RELEASE.get());
@@ -452,7 +452,7 @@ public class FlinkApplicationActionServiceImpl
applicationManageService.toEffective(application);
Map<String, Object> extraParameter = new HashMap<>(0);
- if (application.isFlinkSqlJob()) {
+ if (application.isFlinkSqlJobOrCDC()) {
FlinkSql flinkSql =
flinkSqlService.getEffective(application.getId(), true);
// Get the sql of the replaced placeholder
String realSql =
variableService.replaceVariable(application.getTeamId(), flinkSql.getSql());
@@ -690,6 +690,24 @@ public class FlinkApplicationActionServiceImpl
}
break;
+ case FLINK_CDC:
+ log.info("the current job id: {}", application.getId());
+ FlinkSql flinkCDC =
flinkSqlService.getEffective(application.getId(), false);
+ AssertUtils.notNull(flinkCDC);
+ // 1) dist_userJar
+ String cdcDistJar =
ServiceHelper.getFlinkCDCClientJar(flinkEnv);
+ // 2) appConfig
+ appConf =
+ applicationConfig == null
+ ? null
+ : String.format("yaml://%s",
applicationConfig.getContent());
+ // 3) client
+ if (FlinkDeployMode.YARN_APPLICATION == deployModeEnum) {
+ String clientPath = Workspace.remote().APP_CLIENT();
+ flinkUserJar = String.format("%s/%s", clientPath,
cdcDistJar);
+ }
+ break;
+
case PYFLINK:
Resource resource =
resourceService.findByResourceName(application.getTeamId(),
application.getJar());
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBackupServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBackupServiceImpl.java
index f8ccdac43..38d1554bd 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBackupServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBackupServiceImpl.java
@@ -89,7 +89,7 @@ public class FlinkApplicationBackupServiceImpl
// If necessary, perform the backup first
if (bakParam.isBackup()) {
application.setBackUpDescription(bakParam.getDescription());
- if (application.isFlinkSqlJob()) {
+ if (application.isFlinkSqlJobOrCDC()) {
FlinkSql flinkSql =
flinkSqlService.getEffective(application.getId(), false);
backup(application, flinkSql);
} else {
@@ -107,7 +107,7 @@ public class FlinkApplicationBackupServiceImpl
effectiveService.saveOrUpdate(
bakParam.getAppId(), EffectiveTypeEnum.CONFIG,
bakParam.getId());
// if flink sql task, will be rollback sql and dependencies
- if (application.isFlinkSqlJob()) {
+ if (application.isFlinkSqlJobOrCDC()) {
effectiveService.saveOrUpdate(
bakParam.getAppId(), EffectiveTypeEnum.FLINKSQL,
bakParam.getSqlId());
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBuildPipelineServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBuildPipelineServiceImpl.java
index fc9958cb2..f930fb3cc 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBuildPipelineServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBuildPipelineServiceImpl.java
@@ -19,7 +19,6 @@ package
org.apache.streampark.console.core.service.application.impl;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.constants.Constants;
-import org.apache.streampark.common.enums.ApplicationType;
import org.apache.streampark.common.enums.FlinkDeployMode;
import org.apache.streampark.common.enums.FlinkJobType;
import org.apache.streampark.common.fs.FsOperator;
@@ -110,6 +109,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import static org.apache.streampark.common.enums.ApplicationType.APACHE_FLINK;
import static org.apache.streampark.console.core.enums.OperationEnum.RELEASE;
@Service
@@ -170,7 +170,7 @@ public class FlinkApplicationBuildPipelineServiceImpl
/**
* Build application. This is an async call method.
*
- * @param appId application id
+ * @param appId application id
* @param forceBuild forced start pipeline or not
* @return Whether the pipeline was successfully started
*/
@@ -192,14 +192,14 @@ public class FlinkApplicationBuildPipelineServiceImpl
return true;
}
// rollback
- if (app.isNeedRollback() && app.isFlinkSqlJob()) {
+ if (app.isNeedRollback() && app.isFlinkSqlJobOrCDC()) {
flinkSqlService.rollback(app);
}
// 1) flink sql setDependency
FlinkSql newFlinkSql = flinkSqlService.getCandidate(app.getId(),
CandidateTypeEnum.NEW);
FlinkSql effectiveFlinkSql = flinkSqlService.getEffective(app.getId(),
false);
- if (app.isFlinkSqlJobOrPyFlinkJob()) {
+ if (app.isFlinkSqlJobOrPyFlinkJobOrFlinkCDC()) {
FlinkSql flinkSql = newFlinkSql == null ? effectiveFlinkSql :
newFlinkSql;
AssertUtils.notNull(flinkSql);
app.setDependency(flinkSql.getDependency());
@@ -324,7 +324,7 @@ public class FlinkApplicationBuildPipelineServiceImpl
// If the current task is not running, or the task
has just been added, directly
// set
// the candidate version to the official version
- if (app.isFlinkSqlJob()) {
+ if (app.isFlinkSqlJobOrCDC()) {
applicationManageService.toEffective(app);
} else {
if (app.isStreamParkJob()) {
@@ -340,7 +340,7 @@ public class FlinkApplicationBuildPipelineServiceImpl
}
// backup.
if (!app.isNeedRollback()) {
- if (app.isFlinkSqlJob() && newFlinkSql != null) {
+ if (app.isFlinkSqlJobOrCDC() && newFlinkSql !=
null) {
backUpService.backup(app, newFlinkSql);
} else {
backUpService.backup(app, null);
@@ -423,7 +423,7 @@ public class FlinkApplicationBuildPipelineServiceImpl
/**
* check the build environment
*
- * @param appId application id
+ * @param appId application id
* @param forceBuild forced start pipeline or not
*/
private void checkBuildEnv(Long appId, boolean forceBuild) {
@@ -447,7 +447,9 @@ public class FlinkApplicationBuildPipelineServiceImpl
"The job is invalid, or the job cannot be built while it is
running");
}
- /** create building pipeline instance */
+ /**
+ * create building pipeline instance
+ */
private BuildPipeline createPipelineInstance(@Nonnull FlinkApplication
app) {
FlinkEnv flinkEnv =
flinkEnvService.getByIdOrDefault(app.getVersionId());
String flinkUserJar = retrieveFlinkUserJar(flinkEnv, app);
@@ -466,7 +468,7 @@ public class FlinkApplicationBuildPipelineServiceImpl
String yarnProvidedPath = app.getAppLib();
String localWorkspace = app.getLocalAppHome().concat("/lib");
if (FlinkJobType.CUSTOM_CODE == app.getJobTypeEnum()
- && ApplicationType.APACHE_FLINK ==
app.getApplicationType()) {
+ && APACHE_FLINK == app.getApplicationType()) {
yarnProvidedPath = app.getAppHome();
localWorkspace = app.getLocalAppHome();
}
@@ -579,7 +581,9 @@ public class FlinkApplicationBuildPipelineServiceImpl
getMergedDependencyInfo(app));
}
- /** copy from {@link FlinkApplicationActionService#start(FlinkApplication,
boolean)} */
+ /**
+ * copy from {@link FlinkApplicationActionService#start(FlinkApplication,
boolean)}
+ */
private String retrieveFlinkUserJar(FlinkEnv flinkEnv, FlinkApplication
app) {
switch (app.getJobTypeEnum()) {
case CUSTOM_CODE:
@@ -603,6 +607,13 @@ public class FlinkApplicationBuildPipelineServiceImpl
return String.format("%s/%s", clientPath, sqlDistJar);
}
return
Workspace.local().APP_CLIENT().concat("/").concat(sqlDistJar);
+ case FLINK_CDC:
+ String cdcDistJar =
ServiceHelper.getFlinkCDCClientJar(flinkEnv);
+ if (app.getDeployModeEnum() ==
FlinkDeployMode.YARN_APPLICATION) {
+ String clientPath = Workspace.remote().APP_CLIENT();
+ return String.format("%s/%s", clientPath, cdcDistJar);
+ }
+ return
Workspace.local().APP_CLIENT().concat("/").concat(cdcDistJar);
default:
throw new UnsupportedOperationException(
"[StreamPark] unsupported JobType: " +
app.getJobTypeEnum());
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationConfigServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationConfigServiceImpl.java
index 283fc8f39..8eb6ef3a2 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationConfigServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationConfigServiceImpl.java
@@ -99,7 +99,7 @@ public class FlinkApplicationConfigServiceImpl
public synchronized void update(FlinkApplication appParam, Boolean latest)
{
// flink sql job
FlinkApplicationConfig latestConfig = getLatest(appParam.getId());
- if (appParam.isFlinkSqlJob()) {
+ if (appParam.isFlinkSqlJobOrCDC()) {
updateForFlinkSqlJob(appParam, latest, latestConfig);
} else {
updateForNonFlinkSqlJob(appParam, latest, latestConfig);
@@ -169,7 +169,9 @@ public class FlinkApplicationConfigServiceImpl
}
}
- /** Not running tasks are set to Effective, running tasks are set to
Latest */
+ /**
+ * Not running tasks are set to Effective, running tasks are set to Latest
+ */
@Override
public void setLatestOrEffective(Boolean latest, Long configId, Long
appId) {
if (latest) {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java
index aec454bac..f1bba8c61 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java
@@ -162,7 +162,7 @@ public class FlinkApplicationManageServiceImpl extends
ServiceImpl<FlinkApplicat
if (config != null) {
this.configService.toEffective(appParam.getId(), config.getId());
}
- if (appParam.isFlinkSqlJob()) {
+ if (appParam.isFlinkSqlJobOrCDC()) {
FlinkSql flinkSql = flinkSqlService.getCandidate(appParam.getId(),
null);
if (flinkSql != null) {
flinkSqlService.toEffective(appParam.getId(),
flinkSql.getId());
@@ -365,7 +365,7 @@ public class FlinkApplicationManageServiceImpl extends
ServiceImpl<FlinkApplicat
boolean saveSuccess = save(appParam);
if (saveSuccess) {
- if (appParam.isFlinkSqlJobOrPyFlinkJob()) {
+ if (appParam.isFlinkSqlJobOrPyFlinkJobOrFlinkCDC()) {
FlinkSql flinkSql = new FlinkSql(appParam);
flinkSqlService.create(flinkSql);
}
@@ -449,7 +449,7 @@ public class FlinkApplicationManageServiceImpl extends
ServiceImpl<FlinkApplicat
boolean saved = save(newApp);
if (saved) {
- if (newApp.isFlinkSqlJob()) {
+ if (newApp.isFlinkSqlJobOrCDC()) {
FlinkSql copyFlinkSql =
flinkSqlService.getLatestFlinkSql(appParam.getId(), true);
newApp.setFlinkSql(copyFlinkSql.getSql());
newApp.setDependency(copyFlinkSql.getDependency());
@@ -584,7 +584,7 @@ public class FlinkApplicationManageServiceImpl extends
ServiceImpl<FlinkApplicat
}
// Flink Sql job...
- if (application.isFlinkSqlJob()) {
+ if (application.isFlinkSqlJobOrCDC()) {
updateFlinkSqlJob(application, appParam);
return true;
}
@@ -721,7 +721,7 @@ public class FlinkApplicationManageServiceImpl extends
ServiceImpl<FlinkApplicat
this.update(update);
// backup
- if (appParam.isFlinkSqlJob()) {
+ if (appParam.isFlinkSqlJobOrCDC()) {
FlinkSql newFlinkSql =
flinkSqlService.getCandidate(appParam.getId(), CandidateTypeEnum.NEW);
if (!appParam.isNeedRollback() && newFlinkSql != null) {
backUpService.backup(appParam, newFlinkSql);
@@ -752,7 +752,7 @@ public class FlinkApplicationManageServiceImpl extends
ServiceImpl<FlinkApplicat
if (config != null) {
config.setToApplication(application);
}
- if (application.isFlinkSqlJob()) {
+ if (application.isFlinkSqlJobOrCDC()) {
FlinkSql flinkSql =
flinkSqlService.getEffective(application.getId(), true);
if (flinkSql == null) {
flinkSql = flinkSqlService.getCandidate(application.getId(),
CandidateTypeEnum.NEW);
@@ -823,7 +823,7 @@ public class FlinkApplicationManageServiceImpl extends
ServiceImpl<FlinkApplicat
*
* @param application application entity.
* @return If the deployMode is (Yarn PerJob or application mode) and the
queue label is not
- * (empty or default), return true, false else.
+ * (empty or default), return true, false else.
*/
private boolean isYarnNotDefaultQueue(FlinkApplication application) {
return
FlinkDeployMode.isYarnPerJobOrAppMode(application.getDeployModeEnum())
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java
index 30a4e0152..50afd4749 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java
@@ -342,7 +342,7 @@ public class FlinkSavepointServiceImpl extends
ServiceImpl<FlinkSavepointMapper,
@VisibleForTesting
@Nullable
public String getSavepointFromConfig(FlinkApplication application) {
- if (!application.isStreamParkJob() && !application.isFlinkSqlJob()) {
+ if (!application.isStreamParkJob() &&
!application.isFlinkSqlJobOrCDC()) {
return null;
}
FlinkApplicationConfig applicationConfig =
configService.getEffective(application.getId());
@@ -384,7 +384,9 @@ public class FlinkSavepointServiceImpl extends
ServiceImpl<FlinkSavepointMapper,
return config.isEmpty() ? null : config.get(SAVEPOINT_DIRECTORY.key());
}
- /** Try get the 'state.checkpoints.num-retained' from the dynamic
properties. */
+ /**
+ * Try get the 'state.checkpoints.num-retained' from the dynamic
properties.
+ */
private Optional<Integer> tryGetChkNumRetainedFromDynamicProps(String
dynamicProps) {
String rawCfgValue =
extractDynamicPropertiesAsJava(dynamicProps).get(MAX_RETAINED_CHECKPOINTS.key());
if (StringUtils.isBlank(rawCfgValue)) {
@@ -404,7 +406,9 @@ public class FlinkSavepointServiceImpl extends
ServiceImpl<FlinkSavepointMapper,
return Optional.empty();
}
- /** Try get the 'state.checkpoints.num-retained' from the flink env. */
+ /**
+ * Try get the 'state.checkpoints.num-retained' from the flink env.
+ */
private int getChkNumRetainedFromFlinkEnv(
@Nonnull FlinkEnv flinkEnv,
@Nonnull FlinkApplication application) {
String flinkConfNumRetained =
flinkEnv.convertFlinkYamlAsMap().get(MAX_RETAINED_CHECKPOINTS.key());
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 007aa217b..afc99b928 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -68,6 +68,7 @@ case class SubmitRequest(
case FlinkJobType.FLINK_SQL =>
Constants.STREAMPARK_FLINKSQL_CLIENT_CLASS
case FlinkJobType.PYFLINK => Constants.PYTHON_FLINK_DRIVER_CLASS_NAME
+ case FlinkJobType.FLINK_CDC => Constants.STREAMPARK_FLINKCDC_CLIENT_CLASS
case _ => appProperties(KEY_FLINK_APPLICATION_MAIN_CLASS)
}
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 4b8a25212..cf3ad8cd6 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -478,7 +478,7 @@ trait FlinkClientTrait extends Logger {
programArgs += PARAM_KEY_FLINK_PARALLELISM +=
getParallelism(submitRequest).toString
submitRequest.jobType match {
- case FlinkJobType.FLINK_SQL =>
+ case FlinkJobType.FLINK_SQL | FlinkJobType.FLINK_CDC =>
programArgs += PARAM_KEY_FLINK_SQL += submitRequest.flinkSQL
if (submitRequest.appConf != null) {
programArgs += PARAM_KEY_APP_CONF += submitRequest.appConf
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
index ac0977b59..402ab1e96 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
@@ -46,7 +46,7 @@ class FlinkYarnApplicationBuildPipeline(request:
FlinkYarnApplicationBuildReques
override protected def buildProcess(): SimpleBuildResponse = {
execStep(1) {
request.flinkJobType match {
- case FlinkJobType.FLINK_SQL | FlinkJobType.PYFLINK =>
+ case FlinkJobType.FLINK_SQL | FlinkJobType.FLINK_CDC |
FlinkJobType.PYFLINK =>
LfsOperator.mkCleanDirs(request.localWorkspace)
HdfsOperator.mkCleanDirs(request.yarnProvidedPath)
case _ =>
@@ -57,7 +57,7 @@ class FlinkYarnApplicationBuildPipeline(request:
FlinkYarnApplicationBuildReques
val mavenJars =
execStep(2) {
request.flinkJobType match {
- case FlinkJobType.FLINK_SQL | FlinkJobType.PYFLINK =>
- case FlinkJobType.FLINK_SQL | FlinkJobType.FLINK_CDC |
FlinkJobType.PYFLINK =>
val mavenArts =
MavenTool.resolveArtifacts(request.dependencyInfo.mavenArts)
mavenArts.map(_.getAbsolutePath) ++
request.dependencyInfo.extJarLibs