This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-webui.git
The following commit(s) were added to refs/heads/main by this push:
new f82a728 [Feature] Introduce CDC job submission interface (#237)
f82a728 is described below
commit f82a7287ebbad95aee0dd1a300b2644b7a5016ed
Author: yangyang zhong <[email protected]>
AuthorDate: Wed May 22 21:27:02 2024 +0800
[Feature] Introduce CDC job submission interface (#237)
---
.../web/api/action/context/ActionContextUtil.java | 22 +++
.../FlinkCdcDatabasesSyncActionContext.java | 10 +-
.../context/MysqlSyncDatabaseActionContext.java | 4 +-
...xt.java => PostgresSyncTableActionContext.java} | 2 +-
.../ActionContextFactory.java} | 21 +--
.../ActionContextFactoryServiceLoadUtil.java | 45 +++++
.../FlinkCdcActionContextFactory.java} | 21 +--
.../MysqlSyncTableActionContextFactory.java | 66 +++++++
.../FlinkCdcOptions.java} | 30 +--
.../api/action/service/FlinkCdcActionService.java | 24 +++
.../web/api/catalog/PaimonServiceFactory.java | 11 +-
.../FlinkCdcDataSourceType.java} | 24 +--
.../paimon/web/api/enums/FlinkCdcSyncType.java | 54 ++++++
.../PostgresSyncTableActionContextTest.java | 5 +-
paimon-web-common/pom.xml | 10 +
.../apache/paimon/web/common/util/JSONUtils.java | 201 +++++++++++++++++++++
.../org/apache/paimon/web/common/util/MapUtil.java | 27 ++-
.../controller/CdcJobDefinitionController.java | 6 +
.../web/server/data/dto/CdcJobSubmitDTO.java | 22 +--
.../paimon/web/server/data/model/cdc/CdcGraph.java | 87 +++++++++
.../paimon/web/server/data/model/cdc/CdcNode.java | 23 +--
.../server/service/CdcJobDefinitionService.java | 3 +
.../service/impl/CdcJobDefinitionServiceImpl.java | 113 ++++++++++++
23 files changed, 717 insertions(+), 114 deletions(-)
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/ActionContextUtil.java
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/ActionContextUtil.java
index bf06e94..f184823 100644
---
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/ActionContextUtil.java
+++
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/ActionContextUtil.java
@@ -18,8 +18,12 @@
package org.apache.paimon.web.api.action.context;
+import org.apache.paimon.web.api.exception.ActionException;
+
import org.apache.commons.lang3.StringUtils;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
/** ActionContext Util. */
@@ -41,4 +45,22 @@ public class ActionContextUtil {
}
}
}
+
+ public static String getActionJarPath() {
+ String actionJarPath = System.getenv("ACTION_JAR_PATH");
+ if (StringUtils.isBlank(actionJarPath)) {
+ actionJarPath = System.getProperty("ACTION_JAR_PATH");
+ }
+ if (StringUtils.isBlank(actionJarPath)) {
+ throw new ActionException("ACTION_JAR_PATH is null");
+ }
+ return actionJarPath;
+ }
+
+ public static List<String> getConfListFromString(String value, String
spiltCharacter) {
+ if (StringUtils.isBlank(value)) {
+ return new ArrayList<>();
+ }
+ return Arrays.asList(value.split(spiltCharacter));
+ }
}
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/FlinkCdcDatabasesSyncActionContext.java
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/FlinkCdcDatabasesSyncActionContext.java
index 9cc42d6..546312b 100644
---
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/FlinkCdcDatabasesSyncActionContext.java
+++
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/FlinkCdcDatabasesSyncActionContext.java
@@ -18,6 +18,8 @@
package org.apache.paimon.web.api.action.context;
+import org.apache.paimon.web.api.action.context.options.FlinkCdcOptions;
+
import lombok.experimental.SuperBuilder;
import javax.annotation.Nullable;
@@ -29,17 +31,17 @@ import java.util.List;
public abstract class FlinkCdcDatabasesSyncActionContext extends
FlinkActionContext
implements ActionContext {
- @ActionConf(value = "warehouse")
+ @ActionConf(value = FlinkCdcOptions.WAREHOUSE)
protected String warehouse;
- @ActionConf(value = "database")
+ @ActionConf(value = FlinkCdcOptions.DATABASE)
protected String database;
- @ActionConf(value = "catalog_conf")
+ @ActionConf(value = FlinkCdcOptions.CATALOG_CONF)
@Nullable
protected List<String> catalogConfList;
- @ActionConf(value = "table_conf")
+ @ActionConf(value = FlinkCdcOptions.TABLE_CONF)
@Nullable
protected List<String> tableConfList;
}
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/MysqlSyncDatabaseActionContext.java
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/MysqlSyncDatabaseActionContext.java
index 03e5621..d725176 100644
---
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/MysqlSyncDatabaseActionContext.java
+++
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/MysqlSyncDatabaseActionContext.java
@@ -18,6 +18,8 @@
package org.apache.paimon.web.api.action.context;
+import org.apache.paimon.web.api.action.context.options.FlinkCdcOptions;
+
import lombok.experimental.SuperBuilder;
import javax.annotation.Nullable;
@@ -29,7 +31,7 @@ import java.util.List;
public class MysqlSyncDatabaseActionContext extends
FlinkCdcDatabasesSyncActionContext
implements ActionContext {
- @ActionConf(value = "mysql_conf")
+ @ActionConf(value = FlinkCdcOptions.MYSQL_CONF)
@Nullable
private final List<String> mysqlConfList;
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableCdcActionContext.java
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableActionContext.java
similarity index 93%
copy from
paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableCdcActionContext.java
copy to
paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableActionContext.java
index 8cf5dd8..b00f429 100644
---
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableCdcActionContext.java
+++
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableActionContext.java
@@ -26,7 +26,7 @@ import java.util.List;
/** Postgres sync table action context. */
@SuperBuilder
-public class PostgresSyncTableCdcActionContext extends
FlinkCdcTableSyncActionContext {
+public class PostgresSyncTableActionContext extends
FlinkCdcTableSyncActionContext {
@ActionConf(value = "postgres_conf")
@Nullable
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableCdcActionContext.java
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/ActionContextFactory.java
similarity index 63%
copy from
paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableCdcActionContext.java
copy to
paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/ActionContextFactory.java
index 8cf5dd8..3eb4506 100644
---
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableCdcActionContext.java
+++
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/ActionContextFactory.java
@@ -16,23 +16,14 @@
* limitations under the License.
*/
-package org.apache.paimon.web.api.action.context;
+package org.apache.paimon.web.api.action.context.factory;
-import lombok.experimental.SuperBuilder;
+import org.apache.paimon.web.api.action.context.ActionContext;
-import javax.annotation.Nullable;
+import com.fasterxml.jackson.databind.node.ObjectNode;
-import java.util.List;
+/** ActionContextFactory. */
+public interface ActionContextFactory {
-/** Postgres sync table action context. */
-@SuperBuilder
-public class PostgresSyncTableCdcActionContext extends
FlinkCdcTableSyncActionContext {
-
- @ActionConf(value = "postgres_conf")
- @Nullable
- private final List<String> postgresConfList;
-
- public String name() {
- return "postgres_sync_table";
- }
+ ActionContext getActionContext(ObjectNode actionConfigs);
}
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/ActionContextFactoryServiceLoadUtil.java
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/ActionContextFactoryServiceLoadUtil.java
new file mode 100644
index 0000000..2fe07d7
--- /dev/null
+++
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/ActionContextFactoryServiceLoadUtil.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.web.api.action.context.factory;
+
+import org.apache.paimon.web.api.enums.FlinkCdcSyncType;
+import org.apache.paimon.web.api.exception.ActionException;
+
+import java.util.Objects;
+import java.util.ServiceLoader;
+
+/** ActionContextFactoryServiceLoadUtil. */
+public class ActionContextFactoryServiceLoadUtil {
+
+ private ActionContextFactoryServiceLoadUtil() {}
+
+ public static FlinkCdcActionContextFactory getFlinkCdcActionContextFactory(
+ String sourceType, String targetType, FlinkCdcSyncType
flinkCdcSyncType) {
+ ServiceLoader<FlinkCdcActionContextFactory> serviceLoader =
+ ServiceLoader.load(FlinkCdcActionContextFactory.class);
+ for (FlinkCdcActionContextFactory factory : serviceLoader) {
+ if (factory.cdcType() == flinkCdcSyncType
+ && Objects.equals(factory.sourceType(), sourceType)
+ && Objects.equals(factory.targetType(), targetType)) {
+ return factory;
+ }
+ }
+ throw new ActionException(("Could not find suitable
FlinkCdcActionContextFactory."));
+ }
+}
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableCdcActionContext.java
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/FlinkCdcActionContextFactory.java
similarity index 63%
copy from
paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableCdcActionContext.java
copy to
paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/FlinkCdcActionContextFactory.java
index 8cf5dd8..6a52dd0 100644
---
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableCdcActionContext.java
+++
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/FlinkCdcActionContextFactory.java
@@ -16,23 +16,16 @@
* limitations under the License.
*/
-package org.apache.paimon.web.api.action.context;
+package org.apache.paimon.web.api.action.context.factory;
-import lombok.experimental.SuperBuilder;
+import org.apache.paimon.web.api.enums.FlinkCdcSyncType;
-import javax.annotation.Nullable;
+/** FlinkCdcActionContextFactory. */
+public interface FlinkCdcActionContextFactory extends ActionContextFactory {
-import java.util.List;
+ String sourceType();
-/** Postgres sync table action context. */
-@SuperBuilder
-public class PostgresSyncTableCdcActionContext extends
FlinkCdcTableSyncActionContext {
+ String targetType();
- @ActionConf(value = "postgres_conf")
- @Nullable
- private final List<String> postgresConfList;
-
- public String name() {
- return "postgres_sync_table";
- }
+ FlinkCdcSyncType cdcType();
}
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/MysqlSyncTableActionContextFactory.java
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/MysqlSyncTableActionContextFactory.java
new file mode 100644
index 0000000..8b315e1
--- /dev/null
+++
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/MysqlSyncTableActionContextFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.web.api.action.context.factory;
+
+import org.apache.paimon.web.api.action.context.ActionContext;
+import org.apache.paimon.web.api.action.context.ActionContextUtil;
+import org.apache.paimon.web.api.action.context.MysqlSyncTableActionContext;
+import org.apache.paimon.web.api.action.context.options.FlinkCdcOptions;
+import org.apache.paimon.web.api.enums.FlinkCdcDataSourceType;
+import org.apache.paimon.web.api.enums.FlinkCdcSyncType;
+import org.apache.paimon.web.api.enums.FlinkJobType;
+import org.apache.paimon.web.common.util.JSONUtils;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.auto.service.AutoService;
+
+/** MysqlSyncTableActionContextFactory. */
+@AutoService(FlinkCdcActionContextFactory.class)
+public class MysqlSyncTableActionContextFactory implements
FlinkCdcActionContextFactory {
+
+ @Override
+ public String sourceType() {
+ return FlinkCdcDataSourceType.MYSQL.name();
+ }
+
+ @Override
+ public String targetType() {
+ return FlinkCdcDataSourceType.PAIMON.name();
+ }
+
+ @Override
+ public FlinkCdcSyncType cdcType() {
+ return FlinkCdcSyncType.SINGLE_TABLE_SYNC;
+ }
+
+ @Override
+ public ActionContext getActionContext(ObjectNode actionConfigs) {
+ return MysqlSyncTableActionContext.builder()
+
.sessionUrl(String.valueOf(actionConfigs.get(FlinkCdcOptions.SESSION_URL)))
+ .flinkJobType(FlinkJobType.SESSION)
+ .warehouse(JSONUtils.getString(actionConfigs,
FlinkCdcOptions.WAREHOUSE))
+ .database(JSONUtils.getString(actionConfigs,
FlinkCdcOptions.DATABASE))
+ .table(JSONUtils.getString(actionConfigs,
FlinkCdcOptions.TABLE))
+ .primaryKeys(JSONUtils.getString(actionConfigs,
FlinkCdcOptions.PRIMARY_KEYS))
+ .actionPath(ActionContextUtil.getActionJarPath())
+ .catalogConfList(JSONUtils.getList(actionConfigs,
FlinkCdcOptions.CATALOG_CONF))
+ .mysqlConfList(JSONUtils.getList(actionConfigs,
FlinkCdcOptions.MYSQL_CONF))
+ .build();
+ }
+}
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/MysqlSyncDatabaseActionContext.java
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/options/FlinkCdcOptions.java
similarity index 57%
copy from
paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/MysqlSyncDatabaseActionContext.java
copy to
paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/options/FlinkCdcOptions.java
index 03e5621..10fbdb8 100644
---
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/MysqlSyncDatabaseActionContext.java
+++
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/options/FlinkCdcOptions.java
@@ -16,24 +16,26 @@
* limitations under the License.
*/
-package org.apache.paimon.web.api.action.context;
+package org.apache.paimon.web.api.action.context.options;
-import lombok.experimental.SuperBuilder;
+/** FlinkCdcOptions. */
+public class FlinkCdcOptions {
-import javax.annotation.Nullable;
+ private FlinkCdcOptions() {}
-import java.util.List;
+ public static final String MYSQL_CONF = "mysql_conf";
-/** Mysql sync database action context. */
-@SuperBuilder
-public class MysqlSyncDatabaseActionContext extends
FlinkCdcDatabasesSyncActionContext
- implements ActionContext {
+ public static final String TABLE_CONF = "table_conf";
- @ActionConf(value = "mysql_conf")
- @Nullable
- private final List<String> mysqlConfList;
+ public static final String WAREHOUSE = "warehouse";
- public String name() {
- return "mysql_sync_database";
- }
+ public static final String DATABASE = "database";
+
+ public static final String TABLE = "table";
+
+ public static final String PRIMARY_KEYS = "primary_keys";
+
+ public static final String SESSION_URL = "sessionUrl";
+
+ public static final String CATALOG_CONF = "catalog_conf";
}
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/service/FlinkCdcActionService.java
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/service/FlinkCdcActionService.java
index 6020634..cf73f55 100644
---
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/service/FlinkCdcActionService.java
+++
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/service/FlinkCdcActionService.java
@@ -26,14 +26,20 @@ import org.apache.paimon.web.api.exception.ActionException;
import org.apache.paimon.web.api.shell.ShellService;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
/** An abstract Action service that executes actions through the shell. */
@Slf4j
public class FlinkCdcActionService implements ActionService {
+ private static final ExecutorService shellExecutor =
Executors.newFixedThreadPool(5);
private List<String> getCommand(FlinkActionContext actionContext) {
List<String> commandList = new ArrayList<>();
@@ -63,6 +69,24 @@ public class FlinkCdcActionService implements ActionService {
try {
List<String> command = getCommand(flinkActionContext);
Process process = new ShellService(flinkHome, command).execute();
+ shellExecutor.execute(
+ () -> {
+ try (InputStream inputStream =
process.getInputStream();
+ InputStream errorStream =
process.getErrorStream(); ) {
+ List<String> logLines =
+ IOUtils.readLines(inputStream,
StandardCharsets.UTF_8);
+ for (String logLine : logLines) {
+ log.info(logLine);
+ }
+ List<String> errorLines =
+ IOUtils.readLines(errorStream,
StandardCharsets.UTF_8);
+ for (String logLine : errorLines) {
+ log.error(logLine);
+ }
+ } catch (Exception e) {
+ log.error(e.getMessage(), e);
+ }
+ });
result = ActionExecutionResult.success();
} catch (Exception exception) {
log.error(exception.getMessage(), exception);
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/catalog/PaimonServiceFactory.java
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/catalog/PaimonServiceFactory.java
index edd5e91..2b86893 100644
---
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/catalog/PaimonServiceFactory.java
+++
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/catalog/PaimonServiceFactory.java
@@ -37,8 +37,14 @@ public class PaimonServiceFactory {
if (catalogOptions == null) {
catalogOptions = new HashMap<>();
}
- Options options = new Options();
+ Options options = convertToPaimonOptions(catalogOptions);
+ CatalogContext context = CatalogContext.create(options);
options.set(CatalogProperties.WAREHOUSE, warehouse);
+ return new PaimonService(CatalogFactory.createCatalog(context), name);
+ }
+
+ public static Options convertToPaimonOptions(Map<String, String>
catalogOptions) {
+ Options options = new Options();
String fileSystemType = catalogOptions.get("fileSystemType");
if ("s3".equalsIgnoreCase(fileSystemType)) {
options.set(CatalogProperties.S3_ENDPOINT,
catalogOptions.get("endpoint"));
@@ -49,8 +55,7 @@ public class PaimonServiceFactory {
options.set(CatalogProperties.OSS_ACCESS_KEY_ID,
catalogOptions.get("accessKey"));
options.set(CatalogProperties.OSS_ACCESS_KEY_SECRET,
catalogOptions.get("secretKey"));
}
- CatalogContext context = CatalogContext.create(options);
- return new PaimonService(CatalogFactory.createCatalog(context), name);
+ return options;
}
public static PaimonService createHiveCatalogService(
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableCdcActionContext.java
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/enums/FlinkCdcDataSourceType.java
similarity index 65%
copy from
paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableCdcActionContext.java
copy to
paimon-web-api/src/main/java/org/apache/paimon/web/api/enums/FlinkCdcDataSourceType.java
index 8cf5dd8..bf0a286 100644
---
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableCdcActionContext.java
+++
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/enums/FlinkCdcDataSourceType.java
@@ -16,23 +16,19 @@
* limitations under the License.
*/
-package org.apache.paimon.web.api.action.context;
+package org.apache.paimon.web.api.enums;
-import lombok.experimental.SuperBuilder;
+import lombok.Getter;
-import javax.annotation.Nullable;
+/** FlinkCdcDataSourceType. */
+@Getter
+public enum FlinkCdcDataSourceType {
+ MYSQL("MySQL"),
+ PAIMON("Paimon");
-import java.util.List;
+ private final String name;
-/** Postgres sync table action context. */
-@SuperBuilder
-public class PostgresSyncTableCdcActionContext extends
FlinkCdcTableSyncActionContext {
-
- @ActionConf(value = "postgres_conf")
- @Nullable
- private final List<String> postgresConfList;
-
- public String name() {
- return "postgres_sync_table";
+ FlinkCdcDataSourceType(String name) {
+ this.name = name;
}
}
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/enums/FlinkCdcSyncType.java
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/enums/FlinkCdcSyncType.java
new file mode 100644
index 0000000..8d0b727
--- /dev/null
+++
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/enums/FlinkCdcSyncType.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.web.api.enums;
+
+import lombok.Getter;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** FlinkCdcType. */
+@Getter
+public enum FlinkCdcSyncType {
+ SINGLE_TABLE_SYNC(0),
+ ALL_DATABASES_SYNC(1);
+
+ private final Integer value;
+
+ private static final Map<Integer, FlinkCdcSyncType> enumsMap;
+
+ static {
+ enumsMap =
+ Arrays.stream(FlinkCdcSyncType.values())
+ .collect(Collectors.toMap(FlinkCdcSyncType::getValue,
Function.identity()));
+ }
+
+ FlinkCdcSyncType(Integer value) {
+ this.value = value;
+ }
+
+ public static FlinkCdcSyncType valueOf(Integer value) {
+ if (enumsMap.containsKey(value)) {
+ return enumsMap.get(value);
+ }
+ throw new RuntimeException("Invalid cdc sync type.");
+ }
+}
diff --git
a/paimon-web-api/src/test/java/org/apache/paimon/web/api/action/context/PostgresSyncTableActionContextTest.java
b/paimon-web-api/src/test/java/org/apache/paimon/web/api/action/context/PostgresSyncTableActionContextTest.java
index 562e914..bde2458 100644
---
a/paimon-web-api/src/test/java/org/apache/paimon/web/api/action/context/PostgresSyncTableActionContextTest.java
+++
b/paimon-web-api/src/test/java/org/apache/paimon/web/api/action/context/PostgresSyncTableActionContextTest.java
@@ -26,15 +26,14 @@ import java.util.List;
import static org.junit.jupiter.api.Assertions.assertLinesMatch;
/**
- * The test class of postgres sync table action context in {@link
- * PostgresSyncTableCdcActionContext}.
+ * The test class of postgres sync table action context in {@link
PostgresSyncTableActionContext}.
*/
public class PostgresSyncTableActionContextTest extends
FlinkCdcActionContextTestBase {
@Test
public void testBuild() {
List<String> args =
- PostgresSyncTableCdcActionContext.builder()
+ PostgresSyncTableActionContext.builder()
.warehouse(WAREHOUSE)
.database(DATABASE)
.table(TABLE)
diff --git a/paimon-web-common/pom.xml b/paimon-web-common/pom.xml
index cbe8946..7b8545d 100644
--- a/paimon-web-common/pom.xml
+++ b/paimon-web-common/pom.xml
@@ -57,6 +57,16 @@ under the License.
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git
a/paimon-web-common/src/main/java/org/apache/paimon/web/common/util/JSONUtils.java
b/paimon-web-common/src/main/java/org/apache/paimon/web/common/util/JSONUtils.java
new file mode 100644
index 0000000..cb3240d
--- /dev/null
+++
b/paimon-web-common/src/main/java/org/apache/paimon/web/common/util/JSONUtils.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.web.common.util;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.POJONode;
+import com.google.common.base.Strings;
+import lombok.extern.slf4j.Slf4j;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TimeZone;
+
+import static
com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
+import static
com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
+import static
com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
+import static
com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
+
+/** Json utils. */
+@Slf4j
+public class JSONUtils {
+
+ private static final ObjectMapper OBJECT_MAPPER =
+ JsonMapper.builder()
+ .configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
+ .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
+ .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
+ .configure(REQUIRE_SETTERS_FOR_GETTERS, true)
+ .addModule(
+ new SimpleModule()
+ .addSerializer(
+ LocalDateTime.class, new
LocalDateTimeSerializer())
+ .addDeserializer(
+ LocalDateTime.class, new
LocalDateTimeDeserializer()))
+ .defaultTimeZone(TimeZone.getDefault())
+ .defaultDateFormat(new SimpleDateFormat("yyyy-MM-dd
HH:mm:ss"))
+ .build();
+
+ public static ObjectMapper getObjectMapper() {
+ return OBJECT_MAPPER;
+ }
+
+ private JSONUtils() {
+ throw new UnsupportedOperationException("Construct JSONUtils");
+ }
+
+ public static synchronized void setTimeZone(TimeZone timeZone) {
+ OBJECT_MAPPER.setTimeZone(timeZone);
+ }
+
+ public static ArrayNode createArrayNode() {
+ return OBJECT_MAPPER.createArrayNode();
+ }
+
+ public static ObjectNode createObjectNode() {
+ return OBJECT_MAPPER.createObjectNode();
+ }
+
+ public static String getString(JsonNode jsonNode, String fieldName) {
+ return getString(jsonNode, fieldName, "");
+ }
+
+ public static String getString(JsonNode jsonNode, String fieldName, String
defaultValue) {
+ if (jsonNode == null) {
+ return defaultValue;
+ }
+ JsonNode node = jsonNode.get(fieldName);
+ if (node == null || !node.isTextual()) {
+ return defaultValue;
+ }
+ return node.asText();
+ }
+
+ public static Integer getInteger(JsonNode jsonNode, String fieldName) {
+ return getInteger(jsonNode, fieldName, 0);
+ }
+
+ public static Integer getInteger(JsonNode jsonNode, String fieldName,
Integer defaultValue) {
+ if (jsonNode == null) {
+ return defaultValue;
+ }
+ JsonNode node = jsonNode.get(fieldName);
+ if (node == null || !node.isInt()) {
+ return defaultValue;
+ }
+ return node.asInt();
+ }
+
+ public static List<String> getList(JsonNode jsonNode, String fieldName) {
+ return getList(jsonNode, fieldName, String.class);
+ }
+
+ public static <E> List<E> getList(JsonNode jsonNode, String fieldName,
Class<E> clazz) {
+ if (jsonNode == null) {
+ return new ArrayList<>();
+ }
+ JsonNode child = jsonNode.get(fieldName);
+ if (!(child instanceof ArrayNode) && !(child instanceof POJONode)) {
+ return new ArrayList<>();
+ }
+ List<?> childArray;
+ if (child instanceof POJONode) {
+ Object pojo = ((POJONode) child).getPojo();
+ childArray = (List<?>) pojo;
+ } else {
+ childArray = (List<?>) child;
+ }
+
+ List<E> result = new ArrayList<>();
+ childArray.forEach(
+ e -> {
+
result.add(JSONUtils.parseObject(JSONUtils.toJsonString(e), clazz));
+ });
+ return result;
+ }
+
+ /**
+ * object to json string.
+ *
+ * @param object object
+ * @return json string
+ */
+ public static String toJsonString(Object object) {
+ try {
+ return OBJECT_MAPPER.writeValueAsString(object);
+ } catch (Exception e) {
+ throw new RuntimeException("Object json deserialization
exception.", e);
+ }
+ }
+
+ public static @Nullable <T> T parseObject(String json, Class<T> clazz) {
+ if (Strings.isNullOrEmpty(json)) {
+ return null;
+ }
+
+ try {
+ return OBJECT_MAPPER.readValue(json, clazz);
+ } catch (Exception e) {
+ log.error("Parse object exception, jsonStr: {}, class: {}", json,
clazz, e);
+ }
+ return null;
+ }
+
+ /** LocalDateTimeSerializer. */
+ public static class LocalDateTimeSerializer extends
JsonSerializer<LocalDateTime> {
+
+ DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd
HH:mm:ss");
+
+ @Override
+ public void serialize(
+ LocalDateTime value, JsonGenerator gen, SerializerProvider
serializers)
+ throws IOException {
+ gen.writeString(value.format(formatter));
+ }
+ }
+
+ /** LocalDateTimeDeserializer. */
+ public static class LocalDateTimeDeserializer extends
JsonDeserializer<LocalDateTime> {
+
+ DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd
HH:mm:ss");
+
+ @Override
+ public LocalDateTime deserialize(JsonParser p, DeserializationContext
context)
+ throws IOException {
+ return LocalDateTime.parse(p.getValueAsString(), formatter);
+ }
+ }
+}
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/MysqlSyncDatabaseActionContext.java
b/paimon-web-common/src/main/java/org/apache/paimon/web/common/util/MapUtil.java
similarity index 61%
copy from
paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/MysqlSyncDatabaseActionContext.java
copy to
paimon-web-common/src/main/java/org/apache/paimon/web/common/util/MapUtil.java
index 03e5621..8861aab 100644
---
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/MysqlSyncDatabaseActionContext.java
+++
b/paimon-web-common/src/main/java/org/apache/paimon/web/common/util/MapUtil.java
@@ -16,24 +16,23 @@
* limitations under the License.
*/
-package org.apache.paimon.web.api.action.context;
-
-import lombok.experimental.SuperBuilder;
-
-import javax.annotation.Nullable;
+package org.apache.paimon.web.common.util;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
-/** Mysql sync database action context. */
-@SuperBuilder
-public class MysqlSyncDatabaseActionContext extends
FlinkCdcDatabasesSyncActionContext
- implements ActionContext {
+/** Map util. */
+public class MapUtil {
- @ActionConf(value = "mysql_conf")
- @Nullable
- private final List<String> mysqlConfList;
+ private MapUtil() {}
+
+ public static String getString(Map<String, ?> map, String key) {
+ return Optional.ofNullable(map).map(e ->
String.valueOf(e.get(key))).orElse("");
+ }
- public String name() {
- return "mysql_sync_database";
+ public static <E> List<E> getList(Map<String, ?> map, String key) {
+ return Optional.ofNullable(map).map(e -> (List<E>)
map.get(key)).orElse(new ArrayList<>());
}
}
diff --git
a/paimon-web-server/src/main/java/org/apache/paimon/web/server/controller/CdcJobDefinitionController.java
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/controller/CdcJobDefinitionController.java
index f539cba..75fc885 100644
---
a/paimon-web-server/src/main/java/org/apache/paimon/web/server/controller/CdcJobDefinitionController.java
+++
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/controller/CdcJobDefinitionController.java
@@ -19,6 +19,7 @@
package org.apache.paimon.web.server.controller;
import org.apache.paimon.web.server.data.dto.CdcJobDefinitionDTO;
+import org.apache.paimon.web.server.data.dto.CdcJobSubmitDTO;
import org.apache.paimon.web.server.data.model.CdcJobDefinition;
import org.apache.paimon.web.server.data.result.PageR;
import org.apache.paimon.web.server.data.result.R;
@@ -85,4 +86,9 @@ public class CdcJobDefinitionController {
cdcJobDefinitionService.removeById(id);
return R.succeed();
}
+
+ @PostMapping("{id}/submit")
+ public R<Void> submit(@PathVariable Integer id, @RequestBody
CdcJobSubmitDTO cdcJobSubmitDTO) {
+ return cdcJobDefinitionService.submit(id, cdcJobSubmitDTO);
+ }
}
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableCdcActionContext.java
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/CdcJobSubmitDTO.java
similarity index 63%
copy from
paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableCdcActionContext.java
copy to
paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/CdcJobSubmitDTO.java
index 8cf5dd8..6800b73 100644
---
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableCdcActionContext.java
+++
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/dto/CdcJobSubmitDTO.java
@@ -16,23 +16,13 @@
* limitations under the License.
*/
-package org.apache.paimon.web.api.action.context;
+package org.apache.paimon.web.server.data.dto;
-import lombok.experimental.SuperBuilder;
+import lombok.Data;
-import javax.annotation.Nullable;
+/** DTO of CdcJobSubmit. */
+@Data
+public class CdcJobSubmitDTO {
-import java.util.List;
-
-/** Postgres sync table action context. */
-@SuperBuilder
-public class PostgresSyncTableCdcActionContext extends
FlinkCdcTableSyncActionContext {
-
- @ActionConf(value = "postgres_conf")
- @Nullable
- private final List<String> postgresConfList;
-
- public String name() {
- return "postgres_sync_table";
- }
+ private String flinkSessionUrl;
}
diff --git
a/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/model/cdc/CdcGraph.java
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/model/cdc/CdcGraph.java
new file mode 100644
index 0000000..1fd8b13
--- /dev/null
+++
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/model/cdc/CdcGraph.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.web.server.data.model.cdc;
+
+import org.apache.paimon.web.common.util.JSONUtils;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import lombok.Getter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/** CdcGraph. */
+@Getter
+public class CdcGraph {
+
+ private CdcNode source;
+
+ private CdcNode target;
+
+ public static CdcGraph fromCdcGraphJsonString(String json) {
+ CdcGraph cdcGraph = new CdcGraph();
+ try {
+ List<JsonNode> edges = new ArrayList<>();
+ List<JsonNode> nodes = new ArrayList<>();
+ ObjectMapper objectMapper = JSONUtils.getObjectMapper();
+ JsonNode cdcGraphJson = objectMapper.readTree(json);
+ ArrayNode cellsJson = (ArrayNode) cdcGraphJson.get("cells");
+ cellsJson.forEach(
+ e -> {
+ String shape = e.get("shape").asText();
+ if ("dag-edge".equals(shape)) {
+ edges.add(e);
+ } else {
+ nodes.add(e);
+ }
+ });
+ if (edges.size() != 1 && nodes.size() != 2) {
+ throw new RuntimeException("The number of cdc graph nodes is
abnormal.");
+ }
+ JsonNode jsonNode = edges.get(0);
+ JsonNode sourceTypeJson = jsonNode.get("source");
+ String sourceType = sourceTypeJson.get("cell").asText();
+ JsonNode targetTypeJson = jsonNode.get("target");
+ String targetType = targetTypeJson.get("cell").asText();
+ CdcNode source = new CdcNode();
+ CdcNode target = new CdcNode();
+ for (JsonNode node : nodes) {
+ ObjectNode data = (ObjectNode) node.get("data");
+ String type = node.get("id").asText();
+ if (Objects.equals(sourceType, type)) {
+ source.setType(type);
+ source.setData(data);
+ }
+ if (Objects.equals(targetType, type)) {
+ target.setType(type);
+ target.setData(data);
+ }
+ }
+ cdcGraph.source = source;
+ cdcGraph.target = target;
+ } catch (Exception e) {
+ throw new RuntimeException("CdcGraph is not supported:" +
e.getMessage(), e);
+ }
+ return cdcGraph;
+ }
+}
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableCdcActionContext.java
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/model/cdc/CdcNode.java
similarity index 63%
rename from
paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableCdcActionContext.java
rename to
paimon-web-server/src/main/java/org/apache/paimon/web/server/data/model/cdc/CdcNode.java
index 8cf5dd8..15354c2 100644
---
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/PostgresSyncTableCdcActionContext.java
+++
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/data/model/cdc/CdcNode.java
@@ -16,23 +16,16 @@
* limitations under the License.
*/
-package org.apache.paimon.web.api.action.context;
+package org.apache.paimon.web.server.data.model.cdc;
-import lombok.experimental.SuperBuilder;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import lombok.Data;
-import javax.annotation.Nullable;
+/** CdcNode. */
+@Data
+public class CdcNode {
-import java.util.List;
+ private String type;
-/** Postgres sync table action context. */
-@SuperBuilder
-public class PostgresSyncTableCdcActionContext extends
FlinkCdcTableSyncActionContext {
-
- @ActionConf(value = "postgres_conf")
- @Nullable
- private final List<String> postgresConfList;
-
- public String name() {
- return "postgres_sync_table";
- }
+ private ObjectNode data;
}
diff --git
a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/CdcJobDefinitionService.java
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/CdcJobDefinitionService.java
index 9da2679..0ec3814 100644
---
a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/CdcJobDefinitionService.java
+++
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/CdcJobDefinitionService.java
@@ -19,6 +19,7 @@
package org.apache.paimon.web.server.service;
import org.apache.paimon.web.server.data.dto.CdcJobDefinitionDTO;
+import org.apache.paimon.web.server.data.dto.CdcJobSubmitDTO;
import org.apache.paimon.web.server.data.model.CdcJobDefinition;
import org.apache.paimon.web.server.data.result.PageR;
import org.apache.paimon.web.server.data.result.R;
@@ -33,4 +34,6 @@ public interface CdcJobDefinitionService extends
IService<CdcJobDefinition> {
PageR<CdcJobDefinition> listAll(boolean withConfig, long currentPage, long
pageSize);
R<Void> update(CdcJobDefinitionDTO cdcJobDefinitionDTO);
+
+ R<Void> submit(Integer id, CdcJobSubmitDTO cdcJobSubmitDTO);
}
diff --git
a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobDefinitionServiceImpl.java
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobDefinitionServiceImpl.java
index be3979f..f7dce8f 100644
---
a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobDefinitionServiceImpl.java
+++
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobDefinitionServiceImpl.java
@@ -18,25 +18,49 @@
package org.apache.paimon.web.server.service.impl;
+import org.apache.paimon.web.api.action.context.ActionContext;
+import
org.apache.paimon.web.api.action.context.factory.ActionContextFactoryServiceLoadUtil;
+import
org.apache.paimon.web.api.action.context.factory.FlinkCdcActionContextFactory;
+import org.apache.paimon.web.api.action.context.options.FlinkCdcOptions;
+import org.apache.paimon.web.api.action.service.ActionService;
+import org.apache.paimon.web.api.action.service.FlinkCdcActionService;
+import org.apache.paimon.web.api.catalog.PaimonServiceFactory;
+import org.apache.paimon.web.api.enums.FlinkCdcSyncType;
+import org.apache.paimon.web.common.util.JSONUtils;
import org.apache.paimon.web.server.data.dto.CdcJobDefinitionDTO;
+import org.apache.paimon.web.server.data.dto.CdcJobSubmitDTO;
+import org.apache.paimon.web.server.data.model.CatalogInfo;
import org.apache.paimon.web.server.data.model.CdcJobDefinition;
+import org.apache.paimon.web.server.data.model.cdc.CdcGraph;
+import org.apache.paimon.web.server.data.model.cdc.CdcNode;
import org.apache.paimon.web.server.data.result.PageR;
import org.apache.paimon.web.server.data.result.R;
import org.apache.paimon.web.server.data.result.enums.Status;
import org.apache.paimon.web.server.mapper.CdcJobDefinitionMapper;
+import org.apache.paimon.web.server.service.CatalogService;
import org.apache.paimon.web.server.service.CdcJobDefinitionService;
+import org.apache.paimon.web.server.util.StringUtils;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
/** CdcJobDefinitionServiceImpl. */
@Service
public class CdcJobDefinitionServiceImpl
extends ServiceImpl<CdcJobDefinitionMapper, CdcJobDefinition>
implements CdcJobDefinitionService {
+ @Autowired private CatalogService catalogService;
+
@Override
public R<Void> create(CdcJobDefinitionDTO cdcJobDefinitionDTO) {
String name = cdcJobDefinitionDTO.getName();
@@ -102,4 +126,93 @@ public class CdcJobDefinitionServiceImpl
baseMapper.updateById(cdcJobDefinition);
return R.succeed();
}
+
+ @Override
+ public R<Void> submit(Integer id, CdcJobSubmitDTO cdcJobSubmitDTO) {
+ CdcJobDefinition cdcJobDefinition = baseMapper.selectById(id);
+ String config = cdcJobDefinition.getConfig();
+ FlinkCdcSyncType flinkCdcSyncType =
FlinkCdcSyncType.valueOf(cdcJobDefinition.getCdcType());
+ ActionService actionService = new FlinkCdcActionService();
+ CdcGraph cdcGraph = CdcGraph.fromCdcGraphJsonString(config);
+ FlinkCdcActionContextFactory factory =
+
ActionContextFactoryServiceLoadUtil.getFlinkCdcActionContextFactory(
+ cdcGraph.getSource().getType(),
+ cdcGraph.getTarget().getType(),
+ flinkCdcSyncType);
+ ObjectNode actionConfigs = JSONUtils.createObjectNode();
+ actionConfigs.put(FlinkCdcOptions.SESSION_URL,
cdcJobSubmitDTO.getFlinkSessionUrl());
+ handleCdcGraphNodeData(actionConfigs, cdcGraph.getSource());
+ handleCdcGraphNodeData(actionConfigs, cdcGraph.getTarget());
+ ActionContext actionContext = factory.getActionContext(actionConfigs);
+ try {
+ actionService.execute(actionContext);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return R.succeed();
+ }
+
+ private void handleCdcGraphNodeData(ObjectNode actionConfigs, CdcNode
node) {
+ String type = node.getType();
+ switch (type) {
+ case "Paimon":
+ handlePaimonNodeData(actionConfigs, node.getData());
+ break;
+ case "MySQL":
+ handleMysqlNodeData(actionConfigs, node.getData());
+ break;
+ }
+ }
+
+ private void handleMysqlNodeData(ObjectNode actionConfigs, ObjectNode
mysqlData) {
+ String otherConfigs = JSONUtils.getString(mysqlData, "other_configs");
+ List<String> mysqlConfList;
+ if (StringUtils.isBlank(otherConfigs)) {
+ mysqlConfList = new ArrayList<>();
+ } else {
+ mysqlConfList = new
ArrayList<>(Arrays.asList(otherConfigs.split(";")));
+ }
+ mysqlConfList.add(buildKeyValueString("hostname",
JSONUtils.getString(mysqlData, "host")));
+ mysqlConfList.add(
+ buildKeyValueString("username", JSONUtils.getString(mysqlData,
"username")));
+ mysqlConfList.add(buildKeyValueString("port",
JSONUtils.getString(mysqlData, "port")));
+ mysqlConfList.add(
+ buildKeyValueString("database-name",
JSONUtils.getString(mysqlData, "database")));
+ mysqlConfList.add(
+ buildKeyValueString("table-name",
JSONUtils.getString(mysqlData, "table_name")));
+ mysqlConfList.add(
+ buildKeyValueString("password", JSONUtils.getString(mysqlData,
"password")));
+ actionConfigs.putPOJO(FlinkCdcOptions.MYSQL_CONF, mysqlConfList);
+ }
+
+ private void handlePaimonNodeData(ObjectNode actionConfigs, ObjectNode
paimonData) {
+ Integer catalog = JSONUtils.getInteger(paimonData, "catalog");
+ CatalogInfo catalogInfo = catalogService.getById(catalog);
+ actionConfigs.put(FlinkCdcOptions.WAREHOUSE,
catalogInfo.getWarehouse());
+ actionConfigs.put(FlinkCdcOptions.TABLE,
JSONUtils.getString(paimonData, "table_name"));
+ actionConfigs.put(FlinkCdcOptions.DATABASE,
JSONUtils.getString(paimonData, "database"));
+ actionConfigs.put(
+ FlinkCdcOptions.PRIMARY_KEYS, JSONUtils.getString(paimonData,
"primary_key"));
+ String otherConfigs = JSONUtils.getString(paimonData,
"other_configs2");
+ if (StringUtils.isBlank(otherConfigs)) {
+ actionConfigs.putPOJO(FlinkCdcOptions.TABLE_CONF, new
ArrayList<>());
+ } else {
+ actionConfigs.putPOJO(
+ FlinkCdcOptions.TABLE_CONF,
Arrays.asList(otherConfigs.split(";")));
+ }
+ List<String> catalogConfList = new ArrayList<>();
+ Map<String, String> options = catalogInfo.getOptions();
+ PaimonServiceFactory.convertToPaimonOptions(options)
+ .toMap()
+ .forEach(
+ (k, v) -> {
+ catalogConfList.add(buildKeyValueString(k, v));
+ });
+
+ actionConfigs.putPOJO(FlinkCdcOptions.CATALOG_CONF, catalogConfList);
+ }
+
+ private String buildKeyValueString(String key, String value) {
+ return key + "=" + value;
+ }
}