This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-webui.git
The following commit(s) were added to refs/heads/main by this push:
new 6be0bbc1 [Feature] Support Postgres CDC (#437)
6be0bbc1 is described below
commit 6be0bbc143930589921584ea859c86416f3058b3
Author: XianmingZhou00 <[email protected]>
AuthorDate: Fri Jun 21 11:10:26 2024 +0800
[Feature] Support Postgres CDC (#437)
---
.../context/FlinkCdcTableSyncActionContext.java | 4 ++
.../PostgresSyncTableActionContextFactory.java | 65 ++++++++++++++++++++++
.../action/context/options/FlinkCdcOptions.java | 5 ++
.../web/api/enums/FlinkCdcDataSourceType.java | 13 ++++-
...on.context.factory.FlinkCdcActionContextFactory | 3 +-
.../service/impl/CdcJobDefinitionServiceImpl.java | 56 +++++++++++++++----
6 files changed, 134 insertions(+), 12 deletions(-)
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/FlinkCdcTableSyncActionContext.java
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/FlinkCdcTableSyncActionContext.java
index d2ba6552..c4ca2745 100644
---
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/FlinkCdcTableSyncActionContext.java
+++
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/FlinkCdcTableSyncActionContext.java
@@ -46,6 +46,10 @@ public abstract class FlinkCdcTableSyncActionContext extends
FlinkActionContext
@Nullable
protected String primaryKeys;
+ @ActionConf("computed_column")
+ @Nullable
+ protected String computedColumn; // optional (may be null); presumably emitted as the "computed_column" action argument via @ActionConf — confirm against the annotation processor
+
@ActionConf(value = "catalog_conf")
@Nullable
protected List<String> catalogConfList;
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/PostgresSyncTableActionContextFactory.java
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/PostgresSyncTableActionContextFactory.java
new file mode 100644
index 00000000..3a176b27
--- /dev/null
+++
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/PostgresSyncTableActionContextFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.web.api.action.context.factory;
+
+import org.apache.paimon.web.api.action.context.ActionContext;
+import org.apache.paimon.web.api.action.context.ActionContextUtil;
+import org.apache.paimon.web.api.action.context.PostgresSyncTableActionContext;
+import org.apache.paimon.web.api.action.context.options.FlinkCdcOptions;
+import org.apache.paimon.web.api.enums.FlinkCdcDataSourceType;
+import org.apache.paimon.web.api.enums.FlinkCdcSyncType;
+import org.apache.paimon.web.api.enums.FlinkJobType;
+import org.apache.paimon.web.common.util.JSONUtils;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+/** Factory that builds a {@link PostgresSyncTableActionContext} (PostgreSQL source, Paimon target, single-table sync) from JSON action configs. */
+public class PostgresSyncTableActionContextFactory implements
FlinkCdcActionContextFactory {
+ @Override
+ public String sourceType() { // source side of this factory: the PostgreSQL type string
+ return FlinkCdcDataSourceType.POSTGRESQL.getType();
+ }
+
+ @Override
+ public String targetType() { // target side of this factory: the Paimon type string
+ return FlinkCdcDataSourceType.PAIMON.getType();
+ }
+
+ @Override
+ public FlinkCdcSyncType cdcType() { // this factory only covers single-table sync
+ return FlinkCdcSyncType.SINGLE_TABLE_SYNC;
+ }
+
+ @Override
+ public ActionContext getActionContext(ObjectNode actionConfigs) { // every builder input below is read from actionConfigs by its FlinkCdcOptions key
+ return PostgresSyncTableActionContext.builder()
+
.sessionUrl(String.valueOf(actionConfigs.get(FlinkCdcOptions.SESSION_URL))) // NOTE(review): String.valueOf on a JsonNode gives its toString() form (a text node would keep its JSON quotes, a missing key becomes "null"), unlike JSONUtils.getString used below — confirm this is intended
+ .flinkJobType(FlinkJobType.SESSION) // job type is fixed to SESSION here
+ .warehouse(JSONUtils.getString(actionConfigs,
FlinkCdcOptions.WAREHOUSE))
+ .database(JSONUtils.getString(actionConfigs,
FlinkCdcOptions.DATABASE))
+ .table(JSONUtils.getString(actionConfigs,
FlinkCdcOptions.TABLE))
+ .partitionKeys(JSONUtils.getString(actionConfigs,
FlinkCdcOptions.PARTITION_KEYS))
+ .primaryKeys(JSONUtils.getString(actionConfigs,
FlinkCdcOptions.PRIMARY_KEYS))
+ .computedColumn(JSONUtils.getString(actionConfigs,
FlinkCdcOptions.COMPUTED_COLUMN))
+ .actionPath(ActionContextUtil.getActionJarPath()) // jar path comes from ActionContextUtil, not from actionConfigs
+ .catalogConfList(JSONUtils.getList(actionConfigs,
FlinkCdcOptions.CATALOG_CONF))
+ .postgresConfList(JSONUtils.getList(actionConfigs,
FlinkCdcOptions.POSTGRES_CONF)) // list stored under POSTGRES_CONF by CdcJobDefinitionServiceImpl#handlePostgresNodeData
+ .build();
+ }
+}
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/options/FlinkCdcOptions.java
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/options/FlinkCdcOptions.java
index 10fbdb81..ce364dca 100644
---
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/options/FlinkCdcOptions.java
+++
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/options/FlinkCdcOptions.java
@@ -24,6 +24,7 @@ public class FlinkCdcOptions {
private FlinkCdcOptions() {}
public static final String MYSQL_CONF = "mysql_conf";
+ public static final String POSTGRES_CONF = "postgres_conf"; // action-config key holding the list of PostgreSQL source options
public static final String TABLE_CONF = "table_conf";
@@ -33,8 +34,12 @@ public class FlinkCdcOptions {
public static final String TABLE = "table";
+ public static final String PARTITION_KEYS = "partition_keys"; // action-config key read for the builder's partitionKeys value
+
public static final String PRIMARY_KEYS = "primary_keys";
+ public static final String COMPUTED_COLUMN = "computed_column"; // action-config key read for the builder's computedColumn value
+
public static final String SESSION_URL = "sessionUrl";
public static final String CATALOG_CONF = "catalog_conf";
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/enums/FlinkCdcDataSourceType.java
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/enums/FlinkCdcDataSourceType.java
index 5d8a863e..9c5920eb 100644
---
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/enums/FlinkCdcDataSourceType.java
+++
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/enums/FlinkCdcDataSourceType.java
@@ -20,15 +20,26 @@ package org.apache.paimon.web.api.enums;
import lombok.Getter;
+import java.util.Arrays;
+import java.util.Objects;
+
/** FlinkCdcDataSourceType. */
@Getter
public enum FlinkCdcDataSourceType {
MYSQL("MySQL"),
- PAIMON("Paimon");
+ PAIMON("Paimon"),
+ POSTGRESQL("PostgreSQL");
private final String type;
FlinkCdcDataSourceType(String type) {
this.type = type;
}
+
+ public static FlinkCdcDataSourceType of(String type) { // looks up the constant whose type string equals the argument; returns null (does not throw) when none matches
+ return Arrays.stream(values())
+ .filter(x -> Objects.equals(x.getType(), type)) // exact, case-sensitive comparison; a null argument matches nothing since every constant has a non-null type
+ .findFirst()
+ .orElse(null); // callers must null-check the result
+ }
}
diff --git
a/paimon-web-api/src/main/resources/META-INF/services/org.apache.paimon.web.api.action.context.factory.FlinkCdcActionContextFactory
b/paimon-web-api/src/main/resources/META-INF/services/org.apache.paimon.web.api.action.context.factory.FlinkCdcActionContextFactory
index 089d3c2a..b6839721 100644
---
a/paimon-web-api/src/main/resources/META-INF/services/org.apache.paimon.web.api.action.context.factory.FlinkCdcActionContextFactory
+++
b/paimon-web-api/src/main/resources/META-INF/services/org.apache.paimon.web.api.action.context.factory.FlinkCdcActionContextFactory
@@ -19,4 +19,5 @@
#
org.apache.paimon.web.api.action.context.factory.MysqlSyncTableActionContextFactory
-org.apache.paimon.web.api.action.context.factory.MysqlSyncDatabasesActionContextFactory
\ No newline at end of file
+org.apache.paimon.web.api.action.context.factory.MysqlSyncDatabasesActionContextFactory
+org.apache.paimon.web.api.action.context.factory.PostgresSyncTableActionContextFactory
\ No newline at end of file
diff --git
a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobDefinitionServiceImpl.java
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobDefinitionServiceImpl.java
index 25bb5ad5..28844b91 100644
---
a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobDefinitionServiceImpl.java
+++
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobDefinitionServiceImpl.java
@@ -25,6 +25,7 @@ import
org.apache.paimon.web.api.action.context.options.FlinkCdcOptions;
import org.apache.paimon.web.api.action.service.ActionService;
import org.apache.paimon.web.api.action.service.FlinkCdcActionService;
import org.apache.paimon.web.api.catalog.PaimonServiceFactory;
+import org.apache.paimon.web.api.enums.FlinkCdcDataSourceType;
import org.apache.paimon.web.api.enums.FlinkCdcSyncType;
import org.apache.paimon.web.common.util.JSONUtils;
import org.apache.paimon.web.server.data.dto.CdcJobDefinitionDTO;
@@ -49,6 +50,7 @@ import
com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -174,26 +176,60 @@ public class CdcJobDefinitionServiceImpl
+ /**
+ * Dispatches one CDC graph node to the handler for its data source type, passing along
+ * {@code actionConfigs} and the node's data.
+ *
+ * @throws NullPointerException if the node's type string matches no {@link FlinkCdcDataSourceType}
+ */
private void handleCdcGraphNodeData(
ObjectNode actionConfigs, CdcNode node, FlinkCdcSyncType
cdcSyncType) {
- String type = node.getType();
- switch (type) {
- case "Paimon":
+ // of(...) returns null when the type string matches no enum constant.
+ FlinkCdcDataSourceType cdcDataSourceType =
FlinkCdcDataSourceType.of(node.getType());
+ // Fail fast, naming the offending type string (the format needs %s for the argument to appear).
+ Preconditions.checkNotNull(
+ cdcDataSourceType,
+ String.format("Unsupported CDC datasource type: %s.",
node.getType()));
+ switch (cdcDataSourceType) {
+ case PAIMON:
handlePaimonNodeData(actionConfigs, node.getData(),
cdcSyncType);
break;
- case "MySQL":
+ case MYSQL:
handleMysqlNodeData(actionConfigs, node.getData(),
cdcSyncType);
break;
+ case POSTGRESQL:
+ handlePostgresNodeData(actionConfigs, node.getData());
+ break;
}
}
- private void handleMysqlNodeData(
- ObjectNode actionConfigs, ObjectNode mysqlData, FlinkCdcSyncType
cdcSyncType) {
- String otherConfigs = JSONUtils.getString(mysqlData, "other_configs");
- List<String> mysqlConfList;
+ private List<String> getOtherConfigs(ObjectNode node) { // returns a new mutable list of the ';'-separated entries in node's "other_configs"; empty when that value is blank
+ String otherConfigs = JSONUtils.getString(node, "other_configs");
+ List<String> configList;
if (StringUtils.isBlank(otherConfigs)) {
- mysqlConfList = new ArrayList<>();
+ configList = new ArrayList<>();
} else {
- mysqlConfList = new
ArrayList<>(Arrays.asList(otherConfigs.split(";")));
+ configList = new
ArrayList<>(Arrays.asList(otherConfigs.split(";"))); // copied into an ArrayList because Arrays.asList is fixed-size and callers append to the result; entries are not trimmed
}
+ return configList;
+ }
+
+ private void handlePostgresNodeData(ObjectNode actionConfigs, ObjectNode
postgresData) { // builds the PostgreSQL option list from the node data and stores it in actionConfigs under POSTGRES_CONF
+ List<String> postgresConfList = getOtherConfigs(postgresData); // start from the node's "other_configs" entries, then append the fixed fields below
+ postgresConfList.add(
+ buildKeyValueString("hostname",
JSONUtils.getString(postgresData, "host"))); // node field "host" is emitted under option name "hostname"
+ postgresConfList.add(
+ buildKeyValueString("port", JSONUtils.getString(postgresData,
"port")));
+ postgresConfList.add(
+ buildKeyValueString("username",
JSONUtils.getString(postgresData, "username")));
+ postgresConfList.add(
+ buildKeyValueString("password",
JSONUtils.getString(postgresData, "password")));
+ postgresConfList.add(
+ buildKeyValueString(
+ "database-name", JSONUtils.getString(postgresData,
"database"))); // node field "database" -> option "database-name"
+ postgresConfList.add(
+ buildKeyValueString(
+ "schema-name", JSONUtils.getString(postgresData,
"schema_name"))); // node field "schema_name" -> option "schema-name"
+ postgresConfList.add(
+ buildKeyValueString("table-name",
JSONUtils.getString(postgresData, "table_name"))); // node field "table_name" -> option "table-name"
+ postgresConfList.add(
+ buildKeyValueString("slot.name",
JSONUtils.getString(postgresData, "slot_name"))); // node field "slot_name" -> option "slot.name"
+ actionConfigs.putPOJO(FlinkCdcOptions.POSTGRES_CONF, postgresConfList); // PostgresSyncTableActionContextFactory reads this key back via JSONUtils.getList
+ }
+
+ private void handleMysqlNodeData(
+ ObjectNode actionConfigs, ObjectNode mysqlData, FlinkCdcSyncType
cdcSyncType) {
+ List<String> mysqlConfList = getOtherConfigs(mysqlData); // "other_configs" is read from the MySQL node data (same as handlePostgresNodeData does for its node), not from the actionConfigs being built
mysqlConfList.add(buildKeyValueString("hostname",
JSONUtils.getString(mysqlData, "host")));
mysqlConfList.add(
buildKeyValueString("username", JSONUtils.getString(mysqlData,
"username")));