This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 246fab62d [Feature] Backend modifications support the cdc yaml api 
(#4175)
246fab62d is described below

commit 246fab62d049939b4d610e113b47e86a1155a356
Author: ouyangwulin <[email protected]>
AuthorDate: Sun Jan 19 20:38:19 2025 +0800

    [Feature] Backend modifications support the cdc yaml api (#4175)
    
    * [Feature] Backend modifications support the cdc yaml api
    
    * fixd imports
---
 .../streampark/common/enums/FlinkJobType.java      |  31 ++++-
 .../console/core/entity/FlinkApplication.java      | 146 ++++++++++++++++-----
 .../impl/FlinkApplicationActionServiceImpl.java    |  22 +++-
 .../impl/FlinkApplicationBackupServiceImpl.java    |   4 +-
 .../FlinkApplicationBuildPipelineServiceImpl.java  |  31 +++--
 .../impl/FlinkApplicationConfigServiceImpl.java    |   6 +-
 .../impl/FlinkApplicationManageServiceImpl.java    |  14 +-
 .../service/impl/FlinkSavepointServiceImpl.java    |  10 +-
 .../flink/client/bean/SubmitRequest.scala          |   1 +
 .../flink/client/trait/FlinkClientTrait.scala      |   2 +-
 .../impl/FlinkYarnApplicationBuildPipeline.scala   |   4 +-
 11 files changed, 200 insertions(+), 71 deletions(-)

diff --git 
a/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkJobType.java
 
b/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkJobType.java
index b15d2c097..16c63952e 100644
--- 
a/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkJobType.java
+++ 
b/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkJobType.java
@@ -20,20 +20,35 @@ package org.apache.streampark.common.enums;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-/** The flink deployment mode enum. */
+/**
+ * The flink deployment mode enum.
+ */
 public enum FlinkJobType {
 
-    /** Unknown type replace null */
+    /**
+     * Unknown type replace null
+     */
     UNKNOWN("Unknown", -1),
 
-    /** custom code */
+    /**
+     * custom code
+     */
     CUSTOM_CODE("Custom Code", 1),
 
-    /** Flink SQL */
+    /**
+     * Flink SQL
+     */
     FLINK_SQL("Flink SQL", 2),
 
-    /** Py flink Mode */
-    PYFLINK("Python Flink", 3);
+    /**
+     * Py flink Mode
+     */
+    PYFLINK("Python Flink", 3),
+
+    /**
+     * Flink CDC
+     */
+    FLINK_CDC("Flink CDC", 4);
 
     private final String name;
 
@@ -60,7 +75,9 @@ public enum FlinkJobType {
         return FlinkJobType.UNKNOWN;
     }
 
-    /** Get the mode value of the current {@link FlinkJobType} enum. */
+    /**
+     * Get the mode value of the current {@link FlinkJobType} enum.
+     */
     @Nonnull
     public Integer getMode() {
         return mode;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java
index 49557c51e..fcb787b95 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkApplication.java
@@ -71,38 +71,58 @@ public class FlinkApplication extends BaseEntity {
 
     private Long teamId;
 
-    /** 1) custom code 2) flink SQL */
+    /**
+     * 1) custom code 2) flink SQL
+     */
     private Integer jobType;
 
     private Long projectId;
-    /** creator */
+    /**
+     * creator
+     */
     private Long userId;
 
-    /** The name of the frontend and program displayed in yarn */
+    /**
+     * The name of the frontend and program displayed in yarn
+     */
     private String jobName;
 
     @TableField(updateStrategy = FieldStrategy.IGNORED)
     private String jobId;
 
-    /** The address of the jobmanager, that is, the direct access address of 
the Flink web UI */
+    /**
+     * The address of the jobmanager, that is, the direct access address of 
the Flink web UI
+     */
     @TableField(updateStrategy = FieldStrategy.IGNORED)
     private String jobManagerUrl;
 
-    /** flink version */
+    /**
+     * flink version
+     */
     private Long versionId;
 
-    /** 1. yarn application id(on yarn) 2. k8s application id (on k8s 
application) */
+    /**
+     * 1. yarn application id(on yarn) 2. k8s application id (on k8s 
application)
+     */
     private String clusterId;
 
-    /** flink docker base image */
+    /**
+     * flink docker base image
+     */
     private String flinkImage;
 
-    /** k8s namespace */
+    /**
+     * k8s namespace
+     */
     private String k8sNamespace = Constants.DEFAULT;
 
-    /** The exposed type of the rest service of 
K8s(kubernetes.rest-service.exposed.type) */
+    /**
+     * The exposed type of the rest service of 
K8s(kubernetes.rest-service.exposed.type)
+     */
     private Integer k8sRestExposedType;
-    /** flink kubernetes pod template */
+    /**
+     * flink kubernetes pod template
+     */
     private String k8sPodTemplate;
 
     private String k8sJmPodTemplate;
@@ -113,32 +133,46 @@ public class FlinkApplication extends BaseEntity {
     @Setter
     private String defaultModeIngress;
 
-    /** flink-hadoop integration on flink-k8s mode */
+    /**
+     * flink-hadoop integration on flink-k8s mode
+     */
     private Boolean k8sHadoopIntegration;
 
     private Integer state;
-    /** task release status */
+    /**
+     * task release status
+     */
     @TableField("`release`")
     private Integer release;
 
-    /** determine if a task needs to be built */
+    /**
+     * determine if a task needs to be built
+     */
     private Boolean build;
 
-    /** max restart retries after job failed */
+    /**
+     * max restart retries after job failed
+     */
     @TableField(updateStrategy = FieldStrategy.IGNORED)
     private Integer restartSize;
 
-    /** has restart count */
+    /**
+     * has restart count
+     */
     private Integer restartCount;
 
     private Integer optionState;
 
-    /** alert id */
+    /**
+     * alert id
+     */
     @TableField(updateStrategy = FieldStrategy.IGNORED)
     private Long alertId;
 
     private String args;
-    /** application module */
+    /**
+     * application module
+     */
     private String module;
 
     private String options;
@@ -155,7 +189,9 @@ public class FlinkApplication extends BaseEntity {
 
     private Integer appType;
 
-    /** determine if tracking status */
+    /**
+     * determine if tracking status
+     */
     private Integer tracking;
 
     private String jar;
@@ -176,19 +212,27 @@ public class FlinkApplication extends BaseEntity {
     @TableField(updateStrategy = FieldStrategy.IGNORED)
     private Long duration;
 
-    /** checkpoint max failure interval */
+    /**
+     * checkpoint max failure interval
+     */
     @TableField(updateStrategy = FieldStrategy.IGNORED)
     private Integer cpMaxFailureInterval;
 
-    /** checkpoint failure rate interval */
+    /**
+     * checkpoint failure rate interval
+     */
     @TableField(updateStrategy = FieldStrategy.IGNORED)
     private Integer cpFailureRateInterval;
 
-    /** Actions triggered after X minutes failed Y times: 1: send alert 2: 
restart */
+    /**
+     * Actions triggered after X minutes failed Y times: 1: send alert 2: 
restart
+     */
     @TableField(updateStrategy = FieldStrategy.IGNORED)
     private Integer cpFailureAction;
 
-    /** overview */
+    /**
+     * overview
+     */
     @TableField("TOTAL_TM")
     private Integer totalTM;
 
@@ -201,7 +245,9 @@ public class FlinkApplication extends BaseEntity {
     private Integer tmMemory;
     private Integer totalTask;
 
-    /** the cluster id bound to the task in remote mode */
+    /**
+     * the cluster id bound to the task in remote mode
+     */
     @TableField(updateStrategy = FieldStrategy.IGNORED)
     private Long flinkClusterId;
 
@@ -214,13 +260,17 @@ public class FlinkApplication extends BaseEntity {
 
     private Date modifyTime;
 
-    /** 1: cicd (build from csv) 2: upload (upload local jar job) */
+    /**
+     * 1: cicd (build from csv) 2: upload (upload local jar job)
+     */
     private Integer resourceFrom;
 
     @TableField(updateStrategy = FieldStrategy.IGNORED)
     private String tags;
 
-    /** running job */
+    /**
+     * running job
+     */
     private transient JobsOverview.Task overview;
 
     private transient String teamResource;
@@ -254,10 +304,14 @@ public class FlinkApplication extends BaseEntity {
     private transient String yarnQueue;
     private transient String serviceAccount;
 
-    /** Flink Web UI Url */
+    /**
+     * Flink Web UI Url
+     */
     private transient String flinkRestUrl;
 
-    /** refer to {@link 
org.apache.streampark.flink.packer.pipeline.BuildPipeline} */
+    /**
+     * refer to {@link 
org.apache.streampark.flink.packer.pipeline.BuildPipeline}
+     */
     private transient Integer buildStatus;
 
     private transient AppControl appControl;
@@ -367,15 +421,17 @@ public class FlinkApplication extends BaseEntity {
     }
 
     public boolean eqFlinkJob(FlinkApplication other) {
-        if (this.isFlinkSqlJob()
-            && other.isFlinkSqlJob()
+        if (this.isFlinkSqlJobOrCDC()
+            && other.isFlinkSqlJobOrCDC()
             && this.getFlinkSql().trim().equals(other.getFlinkSql().trim())) {
             return 
this.getDependencyObject().equals(other.getDependencyObject());
         }
         return false;
     }
 
-    /** Local compilation and packaging working directory */
+    /**
+     * Local compilation and packaging working directory
+     */
     @JsonIgnore
     public String getDistHome() {
         String path = String.format("%s/%s/%s", Workspace.APP_LOCAL_DIST(), 
projectId.toString(), getModule());
@@ -397,7 +453,9 @@ public class FlinkApplication extends BaseEntity {
         return path;
     }
 
-    /** Automatically identify remoteAppHome or localAppHome based on app 
FlinkDeployMode */
+    /**
+     * Automatically identify remoteAppHome or localAppHome based on app 
FlinkDeployMode
+     */
     @JsonIgnore
     public String getAppHome() {
         switch (this.getDeployModeEnum()) {
@@ -416,6 +474,22 @@ public class FlinkApplication extends BaseEntity {
         }
     }
 
+    public String getMainClass() {
+        FlinkJobType flinkJobType = FlinkJobType.of(jobType);
+        if (flinkJobType == FlinkJobType.FLINK_SQL) {
+            return Constants.STREAMPARK_FLINKSQL_CLIENT_CLASS;
+        } else if (flinkJobType == FlinkJobType.FLINK_CDC) {
+            return Constants.STREAMPARK_FLINKCDC_CLIENT_CLASS;
+        } else if (flinkJobType == FlinkJobType.PYFLINK) {
+            return Constants.PYTHON_FLINK_DRIVER_CLASS_NAME; // Assuming this 
is the default behavior for other enum
+            // values
+        } else if (flinkJobType == FlinkJobType.CUSTOM_CODE) {
+            return mainClass;
+        } else {
+            return null;
+        }
+    }
+
     @JsonIgnore
     public String getAppLib() {
         return getAppHome().concat("/lib");
@@ -439,14 +513,16 @@ public class FlinkApplication extends BaseEntity {
     }
 
     @JsonIgnore
-    public boolean isFlinkSqlJob() {
-        return FlinkJobType.FLINK_SQL.getMode().equals(this.getJobType());
+    public boolean isFlinkSqlJobOrCDC() {
+        return FlinkJobType.FLINK_SQL.getMode().equals(this.getJobType()) ||
+            FlinkJobType.FLINK_CDC.getMode().equals(this.getJobType());
     }
 
     @JsonIgnore
-    public boolean isFlinkSqlJobOrPyFlinkJob() {
+    public boolean isFlinkSqlJobOrPyFlinkJobOrFlinkCDC() {
         return FlinkJobType.FLINK_SQL.getMode().equals(this.getJobType())
-            || FlinkJobType.PYFLINK.getMode().equals(this.getJobType());
+            || FlinkJobType.PYFLINK.getMode().equals(this.getJobType())
+            || FlinkJobType.FLINK_CDC.getMode().equals(this.getJobType());
     }
 
     @JsonIgnore
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java
index d645570f9..717f90b55 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java
@@ -216,7 +216,7 @@ public class FlinkApplicationActionServiceImpl
         // 3) restore related status
         LambdaUpdateWrapper<FlinkApplication> updateWrapper = 
Wrappers.lambdaUpdate();
         updateWrapper.eq(FlinkApplication::getId, application.getId());
-        if (application.isFlinkSqlJob()) {
+        if (application.isFlinkSqlJobOrCDC()) {
             updateWrapper.set(FlinkApplication::getRelease, 
ReleaseStateEnum.FAILED.get());
         } else {
             updateWrapper.set(FlinkApplication::getRelease, 
ReleaseStateEnum.NEED_RELEASE.get());
@@ -452,7 +452,7 @@ public class FlinkApplicationActionServiceImpl
         applicationManageService.toEffective(application);
 
         Map<String, Object> extraParameter = new HashMap<>(0);
-        if (application.isFlinkSqlJob()) {
+        if (application.isFlinkSqlJobOrCDC()) {
             FlinkSql flinkSql = 
flinkSqlService.getEffective(application.getId(), true);
             // Get the sql of the replaced placeholder
             String realSql = 
variableService.replaceVariable(application.getTeamId(), flinkSql.getSql());
@@ -690,6 +690,24 @@ public class FlinkApplicationActionServiceImpl
                 }
                 break;
 
+            case FLINK_CDC:
+                log.info("the current job id: {}", application.getId());
+                FlinkSql flinkCDC = 
flinkSqlService.getEffective(application.getId(), false);
+                AssertUtils.notNull(flinkCDC);
+                // 1) dist_userJar
+                String cdcDistJar = 
ServiceHelper.getFlinkCDCClientJar(flinkEnv);
+                // 2) appConfig
+                appConf =
+                    applicationConfig == null
+                        ? null
+                        : String.format("yaml://%s", 
applicationConfig.getContent());
+                // 3) client
+                if (FlinkDeployMode.YARN_APPLICATION == deployModeEnum) {
+                    String clientPath = Workspace.remote().APP_CLIENT();
+                    flinkUserJar = String.format("%s/%s", clientPath, 
cdcDistJar);
+                }
+                break;
+
             case PYFLINK:
                 Resource resource =
                     
resourceService.findByResourceName(application.getTeamId(), 
application.getJar());
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBackupServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBackupServiceImpl.java
index f8ccdac43..38d1554bd 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBackupServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBackupServiceImpl.java
@@ -89,7 +89,7 @@ public class FlinkApplicationBackupServiceImpl
         // If necessary, perform the backup first
         if (bakParam.isBackup()) {
             application.setBackUpDescription(bakParam.getDescription());
-            if (application.isFlinkSqlJob()) {
+            if (application.isFlinkSqlJobOrCDC()) {
                 FlinkSql flinkSql = 
flinkSqlService.getEffective(application.getId(), false);
                 backup(application, flinkSql);
             } else {
@@ -107,7 +107,7 @@ public class FlinkApplicationBackupServiceImpl
             effectiveService.saveOrUpdate(
                 bakParam.getAppId(), EffectiveTypeEnum.CONFIG, 
bakParam.getId());
             // if flink sql task, will be rollback sql and dependencies
-            if (application.isFlinkSqlJob()) {
+            if (application.isFlinkSqlJobOrCDC()) {
                 effectiveService.saveOrUpdate(
                     bakParam.getAppId(), EffectiveTypeEnum.FLINKSQL, 
bakParam.getSqlId());
             }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBuildPipelineServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBuildPipelineServiceImpl.java
index fc9958cb2..f930fb3cc 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBuildPipelineServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBuildPipelineServiceImpl.java
@@ -19,7 +19,6 @@ package 
org.apache.streampark.console.core.service.application.impl;
 
 import org.apache.streampark.common.conf.Workspace;
 import org.apache.streampark.common.constants.Constants;
-import org.apache.streampark.common.enums.ApplicationType;
 import org.apache.streampark.common.enums.FlinkDeployMode;
 import org.apache.streampark.common.enums.FlinkJobType;
 import org.apache.streampark.common.fs.FsOperator;
@@ -110,6 +109,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static org.apache.streampark.common.enums.ApplicationType.APACHE_FLINK;
 import static org.apache.streampark.console.core.enums.OperationEnum.RELEASE;
 
 @Service
@@ -170,7 +170,7 @@ public class FlinkApplicationBuildPipelineServiceImpl
     /**
      * Build application. This is an async call method.
      *
-     * @param appId application id
+     * @param appId      application id
      * @param forceBuild forced start pipeline or not
      * @return Whether the pipeline was successfully started
      */
@@ -192,14 +192,14 @@ public class FlinkApplicationBuildPipelineServiceImpl
             return true;
         }
         // rollback
-        if (app.isNeedRollback() && app.isFlinkSqlJob()) {
+        if (app.isNeedRollback() && app.isFlinkSqlJobOrCDC()) {
             flinkSqlService.rollback(app);
         }
 
         // 1) flink sql setDependency
         FlinkSql newFlinkSql = flinkSqlService.getCandidate(app.getId(), 
CandidateTypeEnum.NEW);
         FlinkSql effectiveFlinkSql = flinkSqlService.getEffective(app.getId(), 
false);
-        if (app.isFlinkSqlJobOrPyFlinkJob()) {
+        if (app.isFlinkSqlJobOrPyFlinkJobOrFlinkCDC()) {
             FlinkSql flinkSql = newFlinkSql == null ? effectiveFlinkSql : 
newFlinkSql;
             AssertUtils.notNull(flinkSql);
             app.setDependency(flinkSql.getDependency());
@@ -324,7 +324,7 @@ public class FlinkApplicationBuildPipelineServiceImpl
                             // If the current task is not running, or the task 
has just been added, directly
                             // set
                             // the candidate version to the official version
-                            if (app.isFlinkSqlJob()) {
+                            if (app.isFlinkSqlJobOrCDC()) {
                                 applicationManageService.toEffective(app);
                             } else {
                                 if (app.isStreamParkJob()) {
@@ -340,7 +340,7 @@ public class FlinkApplicationBuildPipelineServiceImpl
                         }
                         // backup.
                         if (!app.isNeedRollback()) {
-                            if (app.isFlinkSqlJob() && newFlinkSql != null) {
+                            if (app.isFlinkSqlJobOrCDC() && newFlinkSql != 
null) {
                                 backUpService.backup(app, newFlinkSql);
                             } else {
                                 backUpService.backup(app, null);
@@ -423,7 +423,7 @@ public class FlinkApplicationBuildPipelineServiceImpl
     /**
      * check the build environment
      *
-     * @param appId application id
+     * @param appId      application id
      * @param forceBuild forced start pipeline or not
      */
     private void checkBuildEnv(Long appId, boolean forceBuild) {
@@ -447,7 +447,9 @@ public class FlinkApplicationBuildPipelineServiceImpl
             "The job is invalid, or the job cannot be built while it is 
running");
     }
 
-    /** create building pipeline instance */
+    /**
+     * create building pipeline instance
+     */
     private BuildPipeline createPipelineInstance(@Nonnull FlinkApplication 
app) {
         FlinkEnv flinkEnv = 
flinkEnvService.getByIdOrDefault(app.getVersionId());
         String flinkUserJar = retrieveFlinkUserJar(flinkEnv, app);
@@ -466,7 +468,7 @@ public class FlinkApplicationBuildPipelineServiceImpl
                 String yarnProvidedPath = app.getAppLib();
                 String localWorkspace = app.getLocalAppHome().concat("/lib");
                 if (FlinkJobType.CUSTOM_CODE == app.getJobTypeEnum()
-                    && ApplicationType.APACHE_FLINK == 
app.getApplicationType()) {
+                    && APACHE_FLINK == app.getApplicationType()) {
                     yarnProvidedPath = app.getAppHome();
                     localWorkspace = app.getLocalAppHome();
                 }
@@ -579,7 +581,9 @@ public class FlinkApplicationBuildPipelineServiceImpl
             getMergedDependencyInfo(app));
     }
 
-    /** copy from {@link FlinkApplicationActionService#start(FlinkApplication, 
boolean)} */
+    /**
+     * copy from {@link FlinkApplicationActionService#start(FlinkApplication, 
boolean)}
+     */
     private String retrieveFlinkUserJar(FlinkEnv flinkEnv, FlinkApplication 
app) {
         switch (app.getJobTypeEnum()) {
             case CUSTOM_CODE:
@@ -603,6 +607,13 @@ public class FlinkApplicationBuildPipelineServiceImpl
                     return String.format("%s/%s", clientPath, sqlDistJar);
                 }
                 return 
Workspace.local().APP_CLIENT().concat("/").concat(sqlDistJar);
+            case FLINK_CDC:
+                String cdcDistJar = 
ServiceHelper.getFlinkCDCClientJar(flinkEnv);
+                if (app.getDeployModeEnum() == 
FlinkDeployMode.YARN_APPLICATION) {
+                    String clientPath = Workspace.remote().APP_CLIENT();
+                    return String.format("%s/%s", clientPath, cdcDistJar);
+                }
+                return 
Workspace.local().APP_CLIENT().concat("/").concat(cdcDistJar);
             default:
                 throw new UnsupportedOperationException(
                     "[StreamPark] unsupported JobType: " + 
app.getJobTypeEnum());
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationConfigServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationConfigServiceImpl.java
index 283fc8f39..8eb6ef3a2 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationConfigServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationConfigServiceImpl.java
@@ -99,7 +99,7 @@ public class FlinkApplicationConfigServiceImpl
     public synchronized void update(FlinkApplication appParam, Boolean latest) 
{
         // flink sql job
         FlinkApplicationConfig latestConfig = getLatest(appParam.getId());
-        if (appParam.isFlinkSqlJob()) {
+        if (appParam.isFlinkSqlJobOrCDC()) {
             updateForFlinkSqlJob(appParam, latest, latestConfig);
         } else {
             updateForNonFlinkSqlJob(appParam, latest, latestConfig);
@@ -169,7 +169,9 @@ public class FlinkApplicationConfigServiceImpl
         }
     }
 
-    /** Not running tasks are set to Effective, running tasks are set to 
Latest */
+    /**
+     * Not running tasks are set to Effective, running tasks are set to Latest
+     */
     @Override
     public void setLatestOrEffective(Boolean latest, Long configId, Long 
appId) {
         if (latest) {
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java
index aec454bac..f1bba8c61 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java
@@ -162,7 +162,7 @@ public class FlinkApplicationManageServiceImpl extends 
ServiceImpl<FlinkApplicat
         if (config != null) {
             this.configService.toEffective(appParam.getId(), config.getId());
         }
-        if (appParam.isFlinkSqlJob()) {
+        if (appParam.isFlinkSqlJobOrCDC()) {
             FlinkSql flinkSql = flinkSqlService.getCandidate(appParam.getId(), 
null);
             if (flinkSql != null) {
                 flinkSqlService.toEffective(appParam.getId(), 
flinkSql.getId());
@@ -365,7 +365,7 @@ public class FlinkApplicationManageServiceImpl extends 
ServiceImpl<FlinkApplicat
 
         boolean saveSuccess = save(appParam);
         if (saveSuccess) {
-            if (appParam.isFlinkSqlJobOrPyFlinkJob()) {
+            if (appParam.isFlinkSqlJobOrPyFlinkJobOrFlinkCDC()) {
                 FlinkSql flinkSql = new FlinkSql(appParam);
                 flinkSqlService.create(flinkSql);
             }
@@ -449,7 +449,7 @@ public class FlinkApplicationManageServiceImpl extends 
ServiceImpl<FlinkApplicat
 
         boolean saved = save(newApp);
         if (saved) {
-            if (newApp.isFlinkSqlJob()) {
+            if (newApp.isFlinkSqlJobOrCDC()) {
                 FlinkSql copyFlinkSql = 
flinkSqlService.getLatestFlinkSql(appParam.getId(), true);
                 newApp.setFlinkSql(copyFlinkSql.getSql());
                 newApp.setDependency(copyFlinkSql.getDependency());
@@ -584,7 +584,7 @@ public class FlinkApplicationManageServiceImpl extends 
ServiceImpl<FlinkApplicat
         }
 
         // Flink Sql job...
-        if (application.isFlinkSqlJob()) {
+        if (application.isFlinkSqlJobOrCDC()) {
             updateFlinkSqlJob(application, appParam);
             return true;
         }
@@ -721,7 +721,7 @@ public class FlinkApplicationManageServiceImpl extends 
ServiceImpl<FlinkApplicat
             this.update(update);
 
             // backup
-            if (appParam.isFlinkSqlJob()) {
+            if (appParam.isFlinkSqlJobOrCDC()) {
                 FlinkSql newFlinkSql = 
flinkSqlService.getCandidate(appParam.getId(), CandidateTypeEnum.NEW);
                 if (!appParam.isNeedRollback() && newFlinkSql != null) {
                     backUpService.backup(appParam, newFlinkSql);
@@ -752,7 +752,7 @@ public class FlinkApplicationManageServiceImpl extends 
ServiceImpl<FlinkApplicat
         if (config != null) {
             config.setToApplication(application);
         }
-        if (application.isFlinkSqlJob()) {
+        if (application.isFlinkSqlJobOrCDC()) {
             FlinkSql flinkSql = 
flinkSqlService.getEffective(application.getId(), true);
             if (flinkSql == null) {
                 flinkSql = flinkSqlService.getCandidate(application.getId(), 
CandidateTypeEnum.NEW);
@@ -823,7 +823,7 @@ public class FlinkApplicationManageServiceImpl extends 
ServiceImpl<FlinkApplicat
      *
      * @param application application entity.
      * @return If the deployMode is (Yarn PerJob or application mode) and the 
queue label is not
-     *     (empty or default), return true, false else.
+     * (empty or default), return true, false else.
      */
     private boolean isYarnNotDefaultQueue(FlinkApplication application) {
         return 
FlinkDeployMode.isYarnPerJobOrAppMode(application.getDeployModeEnum())
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java
index 30a4e0152..50afd4749 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java
@@ -342,7 +342,7 @@ public class FlinkSavepointServiceImpl extends 
ServiceImpl<FlinkSavepointMapper,
     @VisibleForTesting
     @Nullable
     public String getSavepointFromConfig(FlinkApplication application) {
-        if (!application.isStreamParkJob() && !application.isFlinkSqlJob()) {
+        if (!application.isStreamParkJob() && 
!application.isFlinkSqlJobOrCDC()) {
             return null;
         }
         FlinkApplicationConfig applicationConfig = 
configService.getEffective(application.getId());
@@ -384,7 +384,9 @@ public class FlinkSavepointServiceImpl extends 
ServiceImpl<FlinkSavepointMapper,
         return config.isEmpty() ? null : config.get(SAVEPOINT_DIRECTORY.key());
     }
 
-    /** Try get the 'state.checkpoints.num-retained' from the dynamic 
properties. */
+    /**
+     * Try get the 'state.checkpoints.num-retained' from the dynamic 
properties.
+     */
     private Optional<Integer> tryGetChkNumRetainedFromDynamicProps(String 
dynamicProps) {
         String rawCfgValue = 
extractDynamicPropertiesAsJava(dynamicProps).get(MAX_RETAINED_CHECKPOINTS.key());
         if (StringUtils.isBlank(rawCfgValue)) {
@@ -404,7 +406,9 @@ public class FlinkSavepointServiceImpl extends 
ServiceImpl<FlinkSavepointMapper,
         return Optional.empty();
     }
 
-    /** Try get the 'state.checkpoints.num-retained' from the flink env. */
+    /**
+     * Try get the 'state.checkpoints.num-retained' from the flink env.
+     */
     private int getChkNumRetainedFromFlinkEnv(
                                               @Nonnull FlinkEnv flinkEnv, 
@Nonnull FlinkApplication application) {
         String flinkConfNumRetained = 
flinkEnv.convertFlinkYamlAsMap().get(MAX_RETAINED_CHECKPOINTS.key());
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 007aa217b..afc99b928 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -68,6 +68,7 @@ case class SubmitRequest(
     case FlinkJobType.FLINK_SQL =>
       Constants.STREAMPARK_FLINKSQL_CLIENT_CLASS
     case FlinkJobType.PYFLINK => Constants.PYTHON_FLINK_DRIVER_CLASS_NAME
+    case FlinkJobType.FLINK_CDC => Constants.STREAMPARK_FLINKCDC_CLIENT_CLASS
     case _ => appProperties(KEY_FLINK_APPLICATION_MAIN_CLASS)
   }
 
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 4b8a25212..cf3ad8cd6 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -478,7 +478,7 @@ trait FlinkClientTrait extends Logger {
       programArgs += PARAM_KEY_FLINK_PARALLELISM += 
getParallelism(submitRequest).toString
 
       submitRequest.jobType match {
-        case FlinkJobType.FLINK_SQL =>
+        case FlinkJobType.FLINK_SQL | FlinkJobType.FLINK_CDC =>
           programArgs += PARAM_KEY_FLINK_SQL += submitRequest.flinkSQL
           if (submitRequest.appConf != null) {
             programArgs += PARAM_KEY_APP_CONF += submitRequest.appConf
diff --git 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
index ac0977b59..402ab1e96 100644
--- 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
+++ 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
@@ -46,7 +46,7 @@ class FlinkYarnApplicationBuildPipeline(request: 
FlinkYarnApplicationBuildReques
   override protected def buildProcess(): SimpleBuildResponse = {
     execStep(1) {
       request.flinkJobType match {
-        case FlinkJobType.FLINK_SQL | FlinkJobType.PYFLINK =>
+        case FlinkJobType.FLINK_SQL | FlinkJobType.FLINK_CDC | 
FlinkJobType.PYFLINK =>
           LfsOperator.mkCleanDirs(request.localWorkspace)
           HdfsOperator.mkCleanDirs(request.yarnProvidedPath)
         case _ =>
@@ -57,7 +57,7 @@ class FlinkYarnApplicationBuildPipeline(request: 
FlinkYarnApplicationBuildReques
     val mavenJars =
       execStep(2) {
         request.flinkJobType match {
-          case FlinkJobType.FLINK_SQL | FlinkJobType.PYFLINK =>
+          case FlinkJobType.FLINK_SQL | FlinkJobType.FLINK_SQL | 
FlinkJobType.PYFLINK =>
             val mavenArts =
               MavenTool.resolveArtifacts(request.dependencyInfo.mavenArts)
             mavenArts.map(_.getAbsolutePath) ++ 
request.dependencyInfo.extJarLibs


Reply via email to