This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-webui.git


The following commit(s) were added to refs/heads/main by this push:
     new f82a728  [Feature] Introduce CDC job submission interface (#237)
f82a728 is described below

commit f82a7287ebbad95aee0dd1a300b2644b7a5016ed
Author: yangyang zhong <[email protected]>
AuthorDate: Wed May 22 21:27:02 2024 +0800

    [Feature] Introduce CDC job submission interface (#237)
---
 .../web/api/action/context/ActionContextUtil.java  |  22 +++
 .../FlinkCdcDatabasesSyncActionContext.java        |  10 +-
 .../context/MysqlSyncDatabaseActionContext.java    |   4 +-
 ...xt.java => PostgresSyncTableActionContext.java} |   2 +-
 .../ActionContextFactory.java}                     |  21 +--
 .../ActionContextFactoryServiceLoadUtil.java       |  45 +++++
 .../FlinkCdcActionContextFactory.java}             |  21 +--
 .../MysqlSyncTableActionContextFactory.java        |  66 +++++++
 .../FlinkCdcOptions.java}                          |  30 +--
 .../api/action/service/FlinkCdcActionService.java  |  24 +++
 .../web/api/catalog/PaimonServiceFactory.java      |  11 +-
 .../FlinkCdcDataSourceType.java}                   |  24 +--
 .../paimon/web/api/enums/FlinkCdcSyncType.java     |  54 ++++++
 .../PostgresSyncTableActionContextTest.java        |   5 +-
 paimon-web-common/pom.xml                          |  10 +
 .../apache/paimon/web/common/util/JSONUtils.java   | 201 +++++++++++++++++++++
 .../org/apache/paimon/web/common/util/MapUtil.java |  27 ++-
 .../controller/CdcJobDefinitionController.java     |   6 +
 .../web/server/data/dto/CdcJobSubmitDTO.java       |  22 +--
 .../paimon/web/server/data/model/cdc/CdcGraph.java |  87 +++++++++
 .../paimon/web/server/data/model/cdc/CdcNode.java  |  23 +--
 .../server/service/CdcJobDefinitionService.java    |   3 +
 .../service/impl/CdcJobDefinitionServiceImpl.java  | 113 ++++++++++++
 23 files changed, 717 insertions(+), 114 deletions(-)

diff --git 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/ActionContextUtil.java
 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/ActionContextUtil.java
index bf06e94..f184823 100644
--- 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/ActionContextUtil.java
+++ 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/ActionContextUtil.java
@@ -18,8 +18,12 @@
 
 package org.apache.paimon.web.api.action.context;
 
+import org.apache.paimon.web.api.exception.ActionException;
+
 import org.apache.commons.lang3.StringUtils;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 /** ActionContext Util. */
@@ -41,4 +45,22 @@ public class ActionContextUtil {
             }
         }
     }
+
+    public static String getActionJarPath() {
+        String actionJarPath = System.getenv("ACTION_JAR_PATH");
+        if (StringUtils.isBlank(actionJarPath)) {
+            actionJarPath = System.getProperty("ACTION_JAR_PATH");
+        }
+        if (StringUtils.isBlank(actionJarPath)) {
+            throw new ActionException("ACTION_JAR_PATH is null");
+        }
+        return actionJarPath;
+    }
+
+    public static List<String> getConfListFromString(String value, String 
spiltCharacter) {
+        if (StringUtils.isBlank(value)) {
+            return new ArrayList<>();
+        }
+        return Arrays.asList(value.split(spiltCharacter));
+    }
 }
diff --git 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/FlinkCdcDatabasesSyncActionContext.java
 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/FlinkCdcDatabasesSyncActionContext.java
index 9cc42d6..546312b 100644
--- 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/FlinkCdcDatabasesSyncActionContext.java
+++ 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/FlinkCdcDatabasesSyncActionContext.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.web.api.action.context;
 
+import org.apache.paimon.web.api.action.context.options.FlinkCdcOptions;
+
 import lombok.experimental.SuperBuilder;
 
 import javax.annotation.Nullable;
@@ -29,17 +31,17 @@ import java.util.List;
 public abstract class FlinkCdcDatabasesSyncActionContext extends 
FlinkActionContext
         implements ActionContext {
 
-    @ActionConf(value = "warehouse")
+    @ActionConf(value = FlinkCdcOptions.WAREHOUSE)
     protected String warehouse;
 
-    @ActionConf(value = "database")
+    @ActionConf(value = FlinkCdcOptions.DATABASE)
     protected String database;
 
-    @ActionConf(value = "catalog_conf")
+    @ActionConf(value = FlinkCdcOptions.CATALOG_CONF)
     @Nullable
     protected List<String> catalogConfList;
 
-    @ActionConf(value = "table_conf")
+    @ActionConf(value = FlinkCdcOptions.TABLE_CONF)
     @Nullable
     protected List<String> tableConfList;
 }
diff --git 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/MysqlSyncDatabaseActionContext.java
 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/MysqlSyncDatabaseActionContext.java
index 03e5621..d725176 100644
--- 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/MysqlSyncDatabaseActionContext.java
+++ 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/MysqlSyncDatabaseActionContext.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.web.api.action.context;
 
+import org.apache.paimon.web.api.action.context.options.FlinkCdcOptions;
+
 import lombok.experimental.SuperBuilder;
 
 import javax.annotation.Nullable;
@@ -29,7 +31,7 @@ import java.util.List;
 public class MysqlSyncDatabaseActionContext extends 
FlinkCdcDatabasesSyncActionContext
         implements ActionContext {
 
-    @ActionConf(value = "mysql_conf")
+    @ActionConf(value = FlinkCdcOptions.MYSQL_CONF)
     @Nullable
     private final List<String> mysqlConfList;
 
diff --git 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableCdcActionContext.java
 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableActionContext.java
similarity index 93%
copy from 
paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableCdcActionContext.java
copy to 
paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableActionContext.java
index 8cf5dd8..b00f429 100644
--- 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableCdcActionContext.java
+++ 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableActionContext.java
@@ -26,7 +26,7 @@ import java.util.List;
 
 /** Postgres sync table action context. */
 @SuperBuilder
-public class PostgresSyncTableCdcActionContext extends 
FlinkCdcTableSyncActionContext {
+public class PostgresSyncTableActionContext extends 
FlinkCdcTableSyncActionContext {
 
     @ActionConf(value = "postgres_conf")
     @Nullable
diff --git 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableCdcActionContext.java
 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/ActionContextFactory.java
similarity index 63%
copy from 
paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableCdcActionContext.java
copy to 
paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/ActionContextFactory.java
index 8cf5dd8..3eb4506 100644
--- 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableCdcActionContext.java
+++ 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/ActionContextFactory.java
@@ -16,23 +16,14 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.web.api.action.context;
+package org.apache.paimon.web.api.action.context.factory;
 
-import lombok.experimental.SuperBuilder;
+import org.apache.paimon.web.api.action.context.ActionContext;
 
-import javax.annotation.Nullable;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 
-import java.util.List;
+/** ActionContextFactory. */
+public interface ActionContextFactory {
 
-/** Postgres sync table action context. */
-@SuperBuilder
-public class PostgresSyncTableCdcActionContext extends 
FlinkCdcTableSyncActionContext {
-
-    @ActionConf(value = "postgres_conf")
-    @Nullable
-    private final List<String> postgresConfList;
-
-    public String name() {
-        return "postgres_sync_table";
-    }
+    ActionContext getActionContext(ObjectNode actionConfigs);
 }
diff --git 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/ActionContextFactoryServiceLoadUtil.java
 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/ActionContextFactoryServiceLoadUtil.java
new file mode 100644
index 0000000..2fe07d7
--- /dev/null
+++ 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/ActionContextFactoryServiceLoadUtil.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.web.api.action.context.factory;
+
+import org.apache.paimon.web.api.enums.FlinkCdcSyncType;
+import org.apache.paimon.web.api.exception.ActionException;
+
+import java.util.Objects;
+import java.util.ServiceLoader;
+
+/** ActionContextFactoryServiceLoadUtil. */
+public class ActionContextFactoryServiceLoadUtil {
+
+    private ActionContextFactoryServiceLoadUtil() {}
+
+    public static FlinkCdcActionContextFactory getFlinkCdcActionContextFactory(
+            String sourceType, String targetType, FlinkCdcSyncType 
flinkCdcSyncType) {
+        ServiceLoader<FlinkCdcActionContextFactory> serviceLoader =
+                ServiceLoader.load(FlinkCdcActionContextFactory.class);
+        for (FlinkCdcActionContextFactory factory : serviceLoader) {
+            if (factory.cdcType() == flinkCdcSyncType
+                    && Objects.equals(factory.sourceType(), sourceType)
+                    && Objects.equals(factory.targetType(), targetType)) {
+                return factory;
+            }
+        }
+        throw new ActionException(("Could not find suitable 
FlinkCdcActionContextFactory."));
+    }
+}
diff --git 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableCdcActionContext.java
 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/FlinkCdcActionContextFactory.java
similarity index 63%
copy from 
paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableCdcActionContext.java
copy to 
paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/FlinkCdcActionContextFactory.java
index 8cf5dd8..6a52dd0 100644
--- 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableCdcActionContext.java
+++ 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/FlinkCdcActionContextFactory.java
@@ -16,23 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.web.api.action.context;
+package org.apache.paimon.web.api.action.context.factory;
 
-import lombok.experimental.SuperBuilder;
+import org.apache.paimon.web.api.enums.FlinkCdcSyncType;
 
-import javax.annotation.Nullable;
+/** FlinkCdcActionContextFactory. */
+public interface FlinkCdcActionContextFactory extends ActionContextFactory {
 
-import java.util.List;
+    String sourceType();
 
-/** Postgres sync table action context. */
-@SuperBuilder
-public class PostgresSyncTableCdcActionContext extends 
FlinkCdcTableSyncActionContext {
+    String targetType();
 
-    @ActionConf(value = "postgres_conf")
-    @Nullable
-    private final List<String> postgresConfList;
-
-    public String name() {
-        return "postgres_sync_table";
-    }
+    FlinkCdcSyncType cdcType();
 }
diff --git 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/MysqlSyncTableActionContextFactory.java
 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/MysqlSyncTableActionContextFactory.java
new file mode 100644
index 0000000..8b315e1
--- /dev/null
+++ 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/MysqlSyncTableActionContextFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.web.api.action.context.factory;
+
+import org.apache.paimon.web.api.action.context.ActionContext;
+import org.apache.paimon.web.api.action.context.ActionContextUtil;
+import org.apache.paimon.web.api.action.context.MysqlSyncTableActionContext;
+import org.apache.paimon.web.api.action.context.options.FlinkCdcOptions;
+import org.apache.paimon.web.api.enums.FlinkCdcDataSourceType;
+import org.apache.paimon.web.api.enums.FlinkCdcSyncType;
+import org.apache.paimon.web.api.enums.FlinkJobType;
+import org.apache.paimon.web.common.util.JSONUtils;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.auto.service.AutoService;
+
+/** MysqlSyncTableActionContextFactory. */
+@AutoService(FlinkCdcActionContextFactory.class)
+public class MysqlSyncTableActionContextFactory implements 
FlinkCdcActionContextFactory {
+
+    @Override
+    public String sourceType() {
+        return FlinkCdcDataSourceType.MYSQL.name();
+    }
+
+    @Override
+    public String targetType() {
+        return FlinkCdcDataSourceType.PAIMON.name();
+    }
+
+    @Override
+    public FlinkCdcSyncType cdcType() {
+        return FlinkCdcSyncType.SINGLE_TABLE_SYNC;
+    }
+
+    @Override
+    public ActionContext getActionContext(ObjectNode actionConfigs) {
+        return MysqlSyncTableActionContext.builder()
+                
.sessionUrl(String.valueOf(actionConfigs.get(FlinkCdcOptions.SESSION_URL)))
+                .flinkJobType(FlinkJobType.SESSION)
+                .warehouse(JSONUtils.getString(actionConfigs, 
FlinkCdcOptions.WAREHOUSE))
+                .database(JSONUtils.getString(actionConfigs, 
FlinkCdcOptions.DATABASE))
+                .table(JSONUtils.getString(actionConfigs, 
FlinkCdcOptions.TABLE))
+                .primaryKeys(JSONUtils.getString(actionConfigs, 
FlinkCdcOptions.PRIMARY_KEYS))
+                .actionPath(ActionContextUtil.getActionJarPath())
+                .catalogConfList(JSONUtils.getList(actionConfigs, 
FlinkCdcOptions.CATALOG_CONF))
+                .mysqlConfList(JSONUtils.getList(actionConfigs, 
FlinkCdcOptions.MYSQL_CONF))
+                .build();
+    }
+}
diff --git 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/MysqlSyncDatabaseActionContext.java
 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/options/FlinkCdcOptions.java
similarity index 57%
copy from 
paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/MysqlSyncDatabaseActionContext.java
copy to 
paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/options/FlinkCdcOptions.java
index 03e5621..10fbdb8 100644
--- 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/MysqlSyncDatabaseActionContext.java
+++ 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/options/FlinkCdcOptions.java
@@ -16,24 +16,26 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.web.api.action.context;
+package org.apache.paimon.web.api.action.context.options;
 
-import lombok.experimental.SuperBuilder;
+/** FlinkCdcOptions. */
+public class FlinkCdcOptions {
 
-import javax.annotation.Nullable;
+    private FlinkCdcOptions() {}
 
-import java.util.List;
+    public static final String MYSQL_CONF = "mysql_conf";
 
-/** Mysql sync database action context. */
-@SuperBuilder
-public class MysqlSyncDatabaseActionContext extends 
FlinkCdcDatabasesSyncActionContext
-        implements ActionContext {
+    public static final String TABLE_CONF = "table_conf";
 
-    @ActionConf(value = "mysql_conf")
-    @Nullable
-    private final List<String> mysqlConfList;
+    public static final String WAREHOUSE = "warehouse";
 
-    public String name() {
-        return "mysql_sync_database";
-    }
+    public static final String DATABASE = "database";
+
+    public static final String TABLE = "table";
+
+    public static final String PRIMARY_KEYS = "primary_keys";
+
+    public static final String SESSION_URL = "sessionUrl";
+
+    public static final String CATALOG_CONF = "catalog_conf";
 }
diff --git 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/service/FlinkCdcActionService.java
 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/service/FlinkCdcActionService.java
index 6020634..cf73f55 100644
--- 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/service/FlinkCdcActionService.java
+++ 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/service/FlinkCdcActionService.java
@@ -26,14 +26,20 @@ import org.apache.paimon.web.api.exception.ActionException;
 import org.apache.paimon.web.api.shell.ShellService;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 /** An abstract Action service that executes actions through the shell. */
 @Slf4j
 public class FlinkCdcActionService implements ActionService {
+    private static final ExecutorService shellExecutor = 
Executors.newFixedThreadPool(5);
 
     private List<String> getCommand(FlinkActionContext actionContext) {
         List<String> commandList = new ArrayList<>();
@@ -63,6 +69,24 @@ public class FlinkCdcActionService implements ActionService {
         try {
             List<String> command = getCommand(flinkActionContext);
             Process process = new ShellService(flinkHome, command).execute();
+            shellExecutor.execute(
+                    () -> {
+                        try (InputStream inputStream = 
process.getInputStream();
+                                InputStream errorStream = 
process.getErrorStream(); ) {
+                            List<String> logLines =
+                                    IOUtils.readLines(inputStream, 
StandardCharsets.UTF_8);
+                            for (String logLine : logLines) {
+                                log.info(logLine);
+                            }
+                            List<String> errorLines =
+                                    IOUtils.readLines(errorStream, 
StandardCharsets.UTF_8);
+                            for (String logLine : errorLines) {
+                                log.error(logLine);
+                            }
+                        } catch (Exception e) {
+                            log.error(e.getMessage(), e);
+                        }
+                    });
             result = ActionExecutionResult.success();
         } catch (Exception exception) {
             log.error(exception.getMessage(), exception);
diff --git 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/catalog/PaimonServiceFactory.java
 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/catalog/PaimonServiceFactory.java
index edd5e91..2b86893 100644
--- 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/catalog/PaimonServiceFactory.java
+++ 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/catalog/PaimonServiceFactory.java
@@ -37,8 +37,14 @@ public class PaimonServiceFactory {
         if (catalogOptions == null) {
             catalogOptions = new HashMap<>();
         }
-        Options options = new Options();
+        Options options = convertToPaimonOptions(catalogOptions);
+        CatalogContext context = CatalogContext.create(options);
         options.set(CatalogProperties.WAREHOUSE, warehouse);
+        return new PaimonService(CatalogFactory.createCatalog(context), name);
+    }
+
+    public static Options convertToPaimonOptions(Map<String, String> 
catalogOptions) {
+        Options options = new Options();
         String fileSystemType = catalogOptions.get("fileSystemType");
         if ("s3".equalsIgnoreCase(fileSystemType)) {
             options.set(CatalogProperties.S3_ENDPOINT, 
catalogOptions.get("endpoint"));
@@ -49,8 +55,7 @@ public class PaimonServiceFactory {
             options.set(CatalogProperties.OSS_ACCESS_KEY_ID, 
catalogOptions.get("accessKey"));
             options.set(CatalogProperties.OSS_ACCESS_KEY_SECRET, 
catalogOptions.get("secretKey"));
         }
-        CatalogContext context = CatalogContext.create(options);
-        return new PaimonService(CatalogFactory.createCatalog(context), name);
+        return options;
     }
 
     public static PaimonService createHiveCatalogService(
diff --git 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableCdcActionContext.java
 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/enums/FlinkCdcDataSourceType.java
similarity index 65%
copy from 
paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableCdcActionContext.java
copy to 
paimon-web-api/src/main/java/org/apache/paimon/web/api/enums/FlinkCdcDataSourceType.java
index 8cf5dd8..bf0a286 100644
--- 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableCdcActionContext.java
+++ 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/enums/FlinkCdcDataSourceType.java
@@ -16,23 +16,19 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.web.api.action.context;
+package org.apache.paimon.web.api.enums;
 
-import lombok.experimental.SuperBuilder;
+import lombok.Getter;
 
-import javax.annotation.Nullable;
+/** FlinkCdcDataSourceType. */
+@Getter
+public enum FlinkCdcDataSourceType {
+    MYSQL("MySQL"),
+    PAIMON("Paimon");
 
-import java.util.List;
+    private final String name;
 
-/** Postgres sync table action context. */
-@SuperBuilder
-public class PostgresSyncTableCdcActionContext extends 
FlinkCdcTableSyncActionContext {
-
-    @ActionConf(value = "postgres_conf")
-    @Nullable
-    private final List<String> postgresConfList;
-
-    public String name() {
-        return "postgres_sync_table";
+    FlinkCdcDataSourceType(String name) {
+        this.name = name;
     }
 }
diff --git 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/enums/FlinkCdcSyncType.java
 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/enums/FlinkCdcSyncType.java
new file mode 100644
index 0000000..8d0b727
--- /dev/null
+++ 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/enums/FlinkCdcSyncType.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.web.api.enums;
+
+import lombok.Getter;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** FlinkCdcType. */
+@Getter
+public enum FlinkCdcSyncType {
+    SINGLE_TABLE_SYNC(0),
+    ALL_DATABASES_SYNC(1);
+
+    private final Integer value;
+
+    private static final Map<Integer, FlinkCdcSyncType> enumsMap;
+
+    static {
+        enumsMap =
+                Arrays.stream(FlinkCdcSyncType.values())
+                        .collect(Collectors.toMap(FlinkCdcSyncType::getValue, 
Function.identity()));
+    }
+
+    FlinkCdcSyncType(Integer value) {
+        this.value = value;
+    }
+
+    public static FlinkCdcSyncType valueOf(Integer value) {
+        if (enumsMap.containsKey(value)) {
+            return enumsMap.get(value);
+        }
+        throw new RuntimeException("Invalid cdc sync type.");
+    }
+}
diff --git 
a/paimon-web-api/src/test/java/org/apache/paimon/web/api/action/context/PostgresSyncTableActionContextTest.java
 
b/paimon-web-api/src/test/java/org/apache/paimon/web/api/action/context/PostgresSyncTableActionContextTest.java
index 562e914..bde2458 100644
--- 
a/paimon-web-api/src/test/java/org/apache/paimon/web/api/action/context/PostgresSyncTableActionContextTest.java
+++ 
b/paimon-web-api/src/test/java/org/apache/paimon/web/api/action/context/PostgresSyncTableActionContextTest.java
@@ -26,15 +26,14 @@ import java.util.List;
 import static org.junit.jupiter.api.Assertions.assertLinesMatch;
 
 /**
- * The test class of postgres sync table action context in {@link
- * PostgresSyncTableCdcActionContext}.
+ * The test class of postgres sync table action context in {@link 
PostgresSyncTableActionContext}.
  */
 public class PostgresSyncTableActionContextTest extends 
FlinkCdcActionContextTestBase {
 
     @Test
     public void testBuild() {
         List<String> args =
-                PostgresSyncTableCdcActionContext.builder()
+                PostgresSyncTableActionContext.builder()
                         .warehouse(WAREHOUSE)
                         .database(DATABASE)
                         .table(TABLE)
diff --git a/paimon-web-common/pom.xml b/paimon-web-common/pom.xml
index cbe8946..7b8545d 100644
--- a/paimon-web-common/pom.xml
+++ b/paimon-web-common/pom.xml
@@ -57,6 +57,16 @@ under the License.
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git 
a/paimon-web-common/src/main/java/org/apache/paimon/web/common/util/JSONUtils.java
 
b/paimon-web-common/src/main/java/org/apache/paimon/web/common/util/JSONUtils.java
new file mode 100644
index 0000000..cb3240d
--- /dev/null
+++ 
b/paimon-web-common/src/main/java/org/apache/paimon/web/common/util/JSONUtils.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.web.common.util;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.POJONode;
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TimeZone;
+
+import static 
com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
+import static 
com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
+import static 
com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
+import static 
com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
+
+/** Json utils. */
+@Slf4j
+public class JSONUtils {
+
+    private static final ObjectMapper OBJECT_MAPPER =
+            JsonMapper.builder()
+                    .configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
+                    .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
+                    .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
+                    .configure(REQUIRE_SETTERS_FOR_GETTERS, true)
+                    .addModule(
+                            new SimpleModule()
+                                    .addSerializer(
+                                            LocalDateTime.class, new 
LocalDateTimeSerializer())
+                                    .addDeserializer(
+                                            LocalDateTime.class, new 
LocalDateTimeDeserializer()))
+                    .defaultTimeZone(TimeZone.getDefault())
+                    .defaultDateFormat(new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss"))
+                    .build();
+
+    public static ObjectMapper getObjectMapper() {
+        return OBJECT_MAPPER;
+    }
+
+    private JSONUtils() {
+        throw new UnsupportedOperationException("Construct JSONUtils");
+    }
+
+    public static synchronized void setTimeZone(TimeZone timeZone) {
+        OBJECT_MAPPER.setTimeZone(timeZone);
+    }
+
+    public static ArrayNode createArrayNode() {
+        return OBJECT_MAPPER.createArrayNode();
+    }
+
+    public static ObjectNode createObjectNode() {
+        return OBJECT_MAPPER.createObjectNode();
+    }
+
+    public static String getString(JsonNode jsonNode, String fieldName) {
+        return getString(jsonNode, fieldName, "");
+    }
+
+    public static String getString(JsonNode jsonNode, String fieldName, String 
defaultValue) {
+        if (jsonNode == null) {
+            return defaultValue;
+        }
+        JsonNode node = jsonNode.get(fieldName);
+        if (node == null || !node.isTextual()) {
+            return defaultValue;
+        }
+        return node.asText();
+    }
+
+    public static Integer getInteger(JsonNode jsonNode, String fieldName) {
+        return getInteger(jsonNode, fieldName, 0);
+    }
+
+    public static Integer getInteger(JsonNode jsonNode, String fieldName, 
Integer defaultValue) {
+        if (jsonNode == null) {
+            return defaultValue;
+        }
+        JsonNode node = jsonNode.get(fieldName);
+        if (node == null || !node.isInt()) {
+            return defaultValue;
+        }
+        return node.asInt();
+    }
+
+    public static List<String> getList(JsonNode jsonNode, String fieldName) {
+        return getList(jsonNode, fieldName, String.class);
+    }
+
+    public static <E> List<E> getList(JsonNode jsonNode, String fieldName, 
Class<E> clazz) {
+        if (jsonNode == null) {
+            return new ArrayList<>();
+        }
+        JsonNode child = jsonNode.get(fieldName);
+        if (!(child instanceof ArrayNode) && !(child instanceof POJONode)) {
+            return new ArrayList<>();
+        }
+        List<?> childArray;
+        if (child instanceof POJONode) {
+            Object pojo = ((POJONode) child).getPojo();
+            childArray = (List<?>) pojo;
+        } else {
+            childArray = (List<?>) child;
+        }
+
+        List<E> result = new ArrayList<>();
+        childArray.forEach(
+                e -> {
+                    
result.add(JSONUtils.parseObject(JSONUtils.toJsonString(e), clazz));
+                });
+        return result;
+    }
+
+    /**
+     * object to json string.
+     *
+     * @param object object
+     * @return json string
+     */
+    public static String toJsonString(Object object) {
+        try {
+            return OBJECT_MAPPER.writeValueAsString(object);
+        } catch (Exception e) {
+            throw new RuntimeException("Object json deserialization 
exception.", e);
+        }
+    }
+
+    public static @Nullable <T> T parseObject(String json, Class<T> clazz) {
+        if (Strings.isNullOrEmpty(json)) {
+            return null;
+        }
+
+        try {
+            return OBJECT_MAPPER.readValue(json, clazz);
+        } catch (Exception e) {
+            log.error("Parse object exception, jsonStr: {}, class: {}", json, 
clazz, e);
+        }
+        return null;
+    }
+
+    /** LocalDateTimeSerializer. */
+    public static class LocalDateTimeSerializer extends 
JsonSerializer<LocalDateTime> {
+
+        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd 
HH:mm:ss");
+
+        @Override
+        public void serialize(
+                LocalDateTime value, JsonGenerator gen, SerializerProvider 
serializers)
+                throws IOException {
+            gen.writeString(value.format(formatter));
+        }
+    }
+
+    /** LocalDateTimeDeserializer. */
+    public static class LocalDateTimeDeserializer extends 
JsonDeserializer<LocalDateTime> {
+
+        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd 
HH:mm:ss");
+
+        @Override
+        public LocalDateTime deserialize(JsonParser p, DeserializationContext 
context)
+                throws IOException {
+            return LocalDateTime.parse(p.getValueAsString(), formatter);
+        }
+    }
+}
diff --git 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/MysqlSyncDatabaseActionContext.java
 
b/paimon-web-common/src/main/java/org/apache/paimon/web/common/util/MapUtil.java
similarity index 61%
copy from 
paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/MysqlSyncDatabaseActionContext.java
copy to 
paimon-web-common/src/main/java/org/apache/paimon/web/common/util/MapUtil.java
index 03e5621..8861aab 100644
--- 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/MysqlSyncDatabaseActionContext.java
+++ 
b/paimon-web-common/src/main/java/org/apache/paimon/web/common/util/MapUtil.java
@@ -16,24 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.web.api.action.context;
-
-import lombok.experimental.SuperBuilder;
-
-import javax.annotation.Nullable;
+package org.apache.paimon.web.common.util;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 
-/** Mysql sync database action context. */
-@SuperBuilder
-public class MysqlSyncDatabaseActionContext extends 
FlinkCdcDatabasesSyncActionContext
-        implements ActionContext {
+/** Map util. */
+public class MapUtil {
 
-    @ActionConf(value = "mysql_conf")
-    @Nullable
-    private final List<String> mysqlConfList;
+    private MapUtil() {}
+
+    public static String getString(Map<String, ?> map, String key) {
+        return Optional.ofNullable(map).map(e -> 
String.valueOf(e.get(key))).orElse("");
+    }
 
-    public String name() {
-        return "mysql_sync_database";
+    public static <E> List<E> getList(Map<String, ?> map, String key) {
+        return Optional.ofNullable(map).map(e -> (List<E>) 
map.get(key)).orElse(new ArrayList<>());
     }
 }
diff --git 
a/paimon-web-server/src/main/java/org/apache/paimon/web/server/controller/CdcJobDefinitionController.java
 
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/controller/CdcJobDefinitionController.java
index f539cba..75fc885 100644
--- 
a/paimon-web-server/src/main/java/org/apache/paimon/web/server/controller/CdcJobDefinitionController.java
+++ 
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/controller/CdcJobDefinitionController.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.web.server.controller;
 
 import org.apache.paimon.web.server.data.dto.CdcJobDefinitionDTO;
+import org.apache.paimon.web.server.data.dto.CdcJobSubmitDTO;
 import org.apache.paimon.web.server.data.model.CdcJobDefinition;
 import org.apache.paimon.web.server.data.result.PageR;
 import org.apache.paimon.web.server.data.result.R;
@@ -85,4 +86,9 @@ public class CdcJobDefinitionController {
         cdcJobDefinitionService.removeById(id);
         return R.succeed();
     }
+
+    @PostMapping("{id}/submit")
+    public R<Void> submit(@PathVariable Integer id, @RequestBody 
CdcJobSubmitDTO cdcJobSubmitDTO) {
+        return cdcJobDefinitionService.submit(id, cdcJobSubmitDTO);
+    }
 }
diff --git 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableCdcActionContext.java
 
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/CdcJobSubmitDTO.java
similarity index 63%
copy from 
paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableCdcActionContext.java
copy to 
paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/CdcJobSubmitDTO.java
index 8cf5dd8..6800b73 100644
--- 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableCdcActionContext.java
+++ 
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/CdcJobSubmitDTO.java
@@ -16,23 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.web.api.action.context;
+package org.apache.paimon.web.server.data.dto;
 
-import lombok.experimental.SuperBuilder;
+import lombok.Data;
 
-import javax.annotation.Nullable;
+/** DTO of CdcJobSubmit. */
+@Data
+public class CdcJobSubmitDTO {
 
-import java.util.List;
-
-/** Postgres sync table action context. */
-@SuperBuilder
-public class PostgresSyncTableCdcActionContext extends 
FlinkCdcTableSyncActionContext {
-
-    @ActionConf(value = "postgres_conf")
-    @Nullable
-    private final List<String> postgresConfList;
-
-    public String name() {
-        return "postgres_sync_table";
-    }
+    private String flinkSessionUrl;
 }
diff --git 
a/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/model/cdc/CdcGraph.java
 
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/model/cdc/CdcGraph.java
new file mode 100644
index 0000000..1fd8b13
--- /dev/null
+++ 
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/model/cdc/CdcGraph.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.web.server.data.model.cdc;
+
+import org.apache.paimon.web.common.util.JSONUtils;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import lombok.Getter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/** CdcGraph. */
+@Getter
+public class CdcGraph {
+
+    private CdcNode source;
+
+    private CdcNode target;
+
+    public static CdcGraph fromCdcGraphJsonString(String json) {
+        CdcGraph cdcGraph = new CdcGraph();
+        try {
+            List<JsonNode> edges = new ArrayList<>();
+            List<JsonNode> nodes = new ArrayList<>();
+            ObjectMapper objectMapper = JSONUtils.getObjectMapper();
+            JsonNode cdcGraphJson = objectMapper.readTree(json);
+            ArrayNode cellsJson = (ArrayNode) cdcGraphJson.get("cells");
+            cellsJson.forEach(
+                    e -> {
+                        String shape = e.get("shape").asText();
+                        if ("dag-edge".equals(shape)) {
+                            edges.add(e);
+                        } else {
+                            nodes.add(e);
+                        }
+                    });
+            if (edges.size() != 1 && nodes.size() != 2) {
+                throw new RuntimeException("The number of cdc graph nodes is 
abnormal.");
+            }
+            JsonNode jsonNode = edges.get(0);
+            JsonNode sourceTypeJson = jsonNode.get("source");
+            String sourceType = sourceTypeJson.get("cell").asText();
+            JsonNode targetTypeJson = jsonNode.get("target");
+            String targetType = targetTypeJson.get("cell").asText();
+            CdcNode source = new CdcNode();
+            CdcNode target = new CdcNode();
+            for (JsonNode node : nodes) {
+                ObjectNode data = (ObjectNode) node.get("data");
+                String type = node.get("id").asText();
+                if (Objects.equals(sourceType, type)) {
+                    source.setType(type);
+                    source.setData(data);
+                }
+                if (Objects.equals(targetType, type)) {
+                    target.setType(type);
+                    target.setData(data);
+                }
+            }
+            cdcGraph.source = source;
+            cdcGraph.target = target;
+        } catch (Exception e) {
+            throw new RuntimeException("CdcGraph is not supported:" + 
e.getMessage(), e);
+        }
+        return cdcGraph;
+    }
+}
diff --git 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableCdcActionContext.java
 
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/model/cdc/CdcNode.java
similarity index 63%
rename from 
paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableCdcActionContext.java
rename to 
paimon-web-server/src/main/java/org/apache/paimon/web/server/data/model/cdc/CdcNode.java
index 8cf5dd8..15354c2 100644
--- 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableCdcActionContext.java
+++ 
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/model/cdc/CdcNode.java
@@ -16,23 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.web.api.action.context;
+package org.apache.paimon.web.server.data.model.cdc;
 
-import lombok.experimental.SuperBuilder;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import lombok.Data;
 
-import javax.annotation.Nullable;
+/** CdcNode. */
+@Data
+public class CdcNode {
 
-import java.util.List;
+    private String type;
 
-/** Postgres sync table action context. */
-@SuperBuilder
-public class PostgresSyncTableCdcActionContext extends 
FlinkCdcTableSyncActionContext {
-
-    @ActionConf(value = "postgres_conf")
-    @Nullable
-    private final List<String> postgresConfList;
-
-    public String name() {
-        return "postgres_sync_table";
-    }
+    private ObjectNode data;
 }
diff --git 
a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/CdcJobDefinitionService.java
 
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/CdcJobDefinitionService.java
index 9da2679..0ec3814 100644
--- 
a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/CdcJobDefinitionService.java
+++ 
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/CdcJobDefinitionService.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.web.server.service;
 
 import org.apache.paimon.web.server.data.dto.CdcJobDefinitionDTO;
+import org.apache.paimon.web.server.data.dto.CdcJobSubmitDTO;
 import org.apache.paimon.web.server.data.model.CdcJobDefinition;
 import org.apache.paimon.web.server.data.result.PageR;
 import org.apache.paimon.web.server.data.result.R;
@@ -33,4 +34,6 @@ public interface CdcJobDefinitionService extends 
IService<CdcJobDefinition> {
     PageR<CdcJobDefinition> listAll(boolean withConfig, long currentPage, long 
pageSize);
 
     R<Void> update(CdcJobDefinitionDTO cdcJobDefinitionDTO);
+
+    R<Void> submit(Integer id, CdcJobSubmitDTO cdcJobSubmitDTO);
 }
diff --git 
a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobDefinitionServiceImpl.java
 
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobDefinitionServiceImpl.java
index be3979f..f7dce8f 100644
--- 
a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobDefinitionServiceImpl.java
+++ 
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobDefinitionServiceImpl.java
@@ -18,25 +18,49 @@
 
 package org.apache.paimon.web.server.service.impl;
 
+import org.apache.paimon.web.api.action.context.ActionContext;
+import 
org.apache.paimon.web.api.action.context.factory.ActionContextFactoryServiceLoadUtil;
+import 
org.apache.paimon.web.api.action.context.factory.FlinkCdcActionContextFactory;
+import org.apache.paimon.web.api.action.context.options.FlinkCdcOptions;
+import org.apache.paimon.web.api.action.service.ActionService;
+import org.apache.paimon.web.api.action.service.FlinkCdcActionService;
+import org.apache.paimon.web.api.catalog.PaimonServiceFactory;
+import org.apache.paimon.web.api.enums.FlinkCdcSyncType;
+import org.apache.paimon.web.common.util.JSONUtils;
 import org.apache.paimon.web.server.data.dto.CdcJobDefinitionDTO;
+import org.apache.paimon.web.server.data.dto.CdcJobSubmitDTO;
+import org.apache.paimon.web.server.data.model.CatalogInfo;
 import org.apache.paimon.web.server.data.model.CdcJobDefinition;
+import org.apache.paimon.web.server.data.model.cdc.CdcGraph;
+import org.apache.paimon.web.server.data.model.cdc.CdcNode;
 import org.apache.paimon.web.server.data.result.PageR;
 import org.apache.paimon.web.server.data.result.R;
 import org.apache.paimon.web.server.data.result.enums.Status;
 import org.apache.paimon.web.server.mapper.CdcJobDefinitionMapper;
+import org.apache.paimon.web.server.service.CatalogService;
 import org.apache.paimon.web.server.service.CdcJobDefinitionService;
+import org.apache.paimon.web.server.util.StringUtils;
 
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
 /** CdcJobDefinitionServiceImpl. */
 @Service
 public class CdcJobDefinitionServiceImpl
         extends ServiceImpl<CdcJobDefinitionMapper, CdcJobDefinition>
         implements CdcJobDefinitionService {
 
+    @Autowired private CatalogService catalogService;
+
     @Override
     public R<Void> create(CdcJobDefinitionDTO cdcJobDefinitionDTO) {
         String name = cdcJobDefinitionDTO.getName();
@@ -102,4 +126,93 @@ public class CdcJobDefinitionServiceImpl
         baseMapper.updateById(cdcJobDefinition);
         return R.succeed();
     }
+
+    @Override
+    public R<Void> submit(Integer id, CdcJobSubmitDTO cdcJobSubmitDTO) {
+        CdcJobDefinition cdcJobDefinition = baseMapper.selectById(id);
+        String config = cdcJobDefinition.getConfig();
+        FlinkCdcSyncType flinkCdcSyncType = 
FlinkCdcSyncType.valueOf(cdcJobDefinition.getCdcType());
+        ActionService actionService = new FlinkCdcActionService();
+        CdcGraph cdcGraph = CdcGraph.fromCdcGraphJsonString(config);
+        FlinkCdcActionContextFactory factory =
+                
ActionContextFactoryServiceLoadUtil.getFlinkCdcActionContextFactory(
+                        cdcGraph.getSource().getType(),
+                        cdcGraph.getTarget().getType(),
+                        flinkCdcSyncType);
+        ObjectNode actionConfigs = JSONUtils.createObjectNode();
+        actionConfigs.put(FlinkCdcOptions.SESSION_URL, 
cdcJobSubmitDTO.getFlinkSessionUrl());
+        handleCdcGraphNodeData(actionConfigs, cdcGraph.getSource());
+        handleCdcGraphNodeData(actionConfigs, cdcGraph.getTarget());
+        ActionContext actionContext = factory.getActionContext(actionConfigs);
+        try {
+            actionService.execute(actionContext);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return R.succeed();
+    }
+
+    private void handleCdcGraphNodeData(ObjectNode actionConfigs, CdcNode 
node) {
+        String type = node.getType();
+        switch (type) {
+            case "Paimon":
+                handlePaimonNodeData(actionConfigs, node.getData());
+                break;
+            case "MySQL":
+                handleMysqlNodeData(actionConfigs, node.getData());
+                break;
+        }
+    }
+
+    private void handleMysqlNodeData(ObjectNode actionConfigs, ObjectNode 
mysqlData) {
+        String otherConfigs = JSONUtils.getString(mysqlData, "other_configs");
+        List<String> mysqlConfList;
+        if (StringUtils.isBlank(otherConfigs)) {
+            mysqlConfList = new ArrayList<>();
+        } else {
+            mysqlConfList = new 
ArrayList<>(Arrays.asList(otherConfigs.split(";")));
+        }
+        mysqlConfList.add(buildKeyValueString("hostname", 
JSONUtils.getString(mysqlData, "host")));
+        mysqlConfList.add(
+                buildKeyValueString("username", JSONUtils.getString(mysqlData, 
"username")));
+        mysqlConfList.add(buildKeyValueString("port", 
JSONUtils.getString(mysqlData, "port")));
+        mysqlConfList.add(
+                buildKeyValueString("database-name", 
JSONUtils.getString(mysqlData, "database")));
+        mysqlConfList.add(
+                buildKeyValueString("table-name", 
JSONUtils.getString(mysqlData, "table_name")));
+        mysqlConfList.add(
+                buildKeyValueString("password", JSONUtils.getString(mysqlData, 
"password")));
+        actionConfigs.putPOJO(FlinkCdcOptions.MYSQL_CONF, mysqlConfList);
+    }
+
+    private void handlePaimonNodeData(ObjectNode actionConfigs, ObjectNode 
paimonData) {
+        Integer catalog = JSONUtils.getInteger(paimonData, "catalog");
+        CatalogInfo catalogInfo = catalogService.getById(catalog);
+        actionConfigs.put(FlinkCdcOptions.WAREHOUSE, 
catalogInfo.getWarehouse());
+        actionConfigs.put(FlinkCdcOptions.TABLE, 
JSONUtils.getString(paimonData, "table_name"));
+        actionConfigs.put(FlinkCdcOptions.DATABASE, 
JSONUtils.getString(paimonData, "database"));
+        actionConfigs.put(
+                FlinkCdcOptions.PRIMARY_KEYS, JSONUtils.getString(paimonData, 
"primary_key"));
+        String otherConfigs = JSONUtils.getString(paimonData, 
"other_configs2");
+        if (StringUtils.isBlank(otherConfigs)) {
+            actionConfigs.putPOJO(FlinkCdcOptions.TABLE_CONF, new 
ArrayList<>());
+        } else {
+            actionConfigs.putPOJO(
+                    FlinkCdcOptions.TABLE_CONF, 
Arrays.asList(otherConfigs.split(";")));
+        }
+        List<String> catalogConfList = new ArrayList<>();
+        Map<String, String> options = catalogInfo.getOptions();
+        PaimonServiceFactory.convertToPaimonOptions(options)
+                .toMap()
+                .forEach(
+                        (k, v) -> {
+                            catalogConfList.add(buildKeyValueString(k, v));
+                        });
+
+        actionConfigs.putPOJO(FlinkCdcOptions.CATALOG_CONF, catalogConfList);
+    }
+
+    private String buildKeyValueString(String key, String value) {
+        return key + "=" + value;
+    }
 }

Reply via email to