This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-webui.git


The following commit(s) were added to refs/heads/main by this push:
     new 6be0bbc1 [Feature] Support Postgres CDC (#437)
6be0bbc1 is described below

commit 6be0bbc143930589921584ea859c86416f3058b3
Author: XianmingZhou00 <[email protected]>
AuthorDate: Fri Jun 21 11:10:26 2024 +0800

    [Feature] Support Postgres CDC (#437)
---
 .../context/FlinkCdcTableSyncActionContext.java    |  4 ++
 .../PostgresSyncTableActionContextFactory.java     | 65 ++++++++++++++++++++++
 .../action/context/options/FlinkCdcOptions.java    |  5 ++
 .../web/api/enums/FlinkCdcDataSourceType.java      | 13 ++++-
 ...on.context.factory.FlinkCdcActionContextFactory |  3 +-
 .../service/impl/CdcJobDefinitionServiceImpl.java  | 56 +++++++++++++++----
 6 files changed, 134 insertions(+), 12 deletions(-)

diff --git a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/FlinkCdcTableSyncActionContext.java b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/FlinkCdcTableSyncActionContext.java
index d2ba6552..c4ca2745 100644
--- a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/FlinkCdcTableSyncActionContext.java
+++ b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/FlinkCdcTableSyncActionContext.java
@@ -46,6 +46,10 @@ public abstract class FlinkCdcTableSyncActionContext extends FlinkActionContext
     @Nullable
     protected String primaryKeys;
 
+    @ActionConf("computed_column")
+    @Nullable
+    protected String computedColumn;
+
     @ActionConf(value = "catalog_conf")
     @Nullable
     protected List<String> catalogConfList;
diff --git a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/PostgresSyncTableActionContextFactory.java b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/PostgresSyncTableActionContextFactory.java
new file mode 100644
index 00000000..3a176b27
--- /dev/null
+++ b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/PostgresSyncTableActionContextFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.web.api.action.context.factory;
+
+import org.apache.paimon.web.api.action.context.ActionContext;
+import org.apache.paimon.web.api.action.context.ActionContextUtil;
+import org.apache.paimon.web.api.action.context.PostgresSyncTableActionContext;
+import org.apache.paimon.web.api.action.context.options.FlinkCdcOptions;
+import org.apache.paimon.web.api.enums.FlinkCdcDataSourceType;
+import org.apache.paimon.web.api.enums.FlinkCdcSyncType;
+import org.apache.paimon.web.api.enums.FlinkJobType;
+import org.apache.paimon.web.common.util.JSONUtils;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+/** PostgresSyncTableActionContextFactory. */
+public class PostgresSyncTableActionContextFactory implements FlinkCdcActionContextFactory {
+    @Override
+    public String sourceType() {
+        return FlinkCdcDataSourceType.POSTGRESQL.getType();
+    }
+
+    @Override
+    public String targetType() {
+        return FlinkCdcDataSourceType.PAIMON.getType();
+    }
+
+    @Override
+    public FlinkCdcSyncType cdcType() {
+        return FlinkCdcSyncType.SINGLE_TABLE_SYNC;
+    }
+
+    @Override
+    public ActionContext getActionContext(ObjectNode actionConfigs) {
+        return PostgresSyncTableActionContext.builder()
+                .sessionUrl(String.valueOf(actionConfigs.get(FlinkCdcOptions.SESSION_URL)))
+                .flinkJobType(FlinkJobType.SESSION)
+                .warehouse(JSONUtils.getString(actionConfigs, FlinkCdcOptions.WAREHOUSE))
+                .database(JSONUtils.getString(actionConfigs, FlinkCdcOptions.DATABASE))
+                .table(JSONUtils.getString(actionConfigs, FlinkCdcOptions.TABLE))
+                .partitionKeys(JSONUtils.getString(actionConfigs, FlinkCdcOptions.PARTITION_KEYS))
+                .primaryKeys(JSONUtils.getString(actionConfigs, FlinkCdcOptions.PRIMARY_KEYS))
+                .computedColumn(JSONUtils.getString(actionConfigs, FlinkCdcOptions.COMPUTED_COLUMN))
+                .actionPath(ActionContextUtil.getActionJarPath())
+                .catalogConfList(JSONUtils.getList(actionConfigs, FlinkCdcOptions.CATALOG_CONF))
+                .postgresConfList(JSONUtils.getList(actionConfigs, FlinkCdcOptions.POSTGRES_CONF))
+                .build();
+    }
+}
diff --git a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/options/FlinkCdcOptions.java b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/options/FlinkCdcOptions.java
index 10fbdb81..ce364dca 100644
--- a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/options/FlinkCdcOptions.java
+++ b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/options/FlinkCdcOptions.java
@@ -24,6 +24,7 @@ public class FlinkCdcOptions {
     private FlinkCdcOptions() {}
 
     public static final String MYSQL_CONF = "mysql_conf";
+    public static final String POSTGRES_CONF = "postgres_conf";
 
     public static final String TABLE_CONF = "table_conf";
 
@@ -33,8 +34,12 @@ public class FlinkCdcOptions {
 
     public static final String TABLE = "table";
 
+    public static final String PARTITION_KEYS = "partition_keys";
+
     public static final String PRIMARY_KEYS = "primary_keys";
 
+    public static final String COMPUTED_COLUMN = "computed_column";
+
     public static final String SESSION_URL = "sessionUrl";
 
     public static final String CATALOG_CONF = "catalog_conf";
diff --git a/paimon-web-api/src/main/java/org/apache/paimon/web/api/enums/FlinkCdcDataSourceType.java b/paimon-web-api/src/main/java/org/apache/paimon/web/api/enums/FlinkCdcDataSourceType.java
index 5d8a863e..9c5920eb 100644
--- a/paimon-web-api/src/main/java/org/apache/paimon/web/api/enums/FlinkCdcDataSourceType.java
+++ b/paimon-web-api/src/main/java/org/apache/paimon/web/api/enums/FlinkCdcDataSourceType.java
@@ -20,15 +20,26 @@ package org.apache.paimon.web.api.enums;
 
 import lombok.Getter;
 
+import java.util.Arrays;
+import java.util.Objects;
+
 /** FlinkCdcDataSourceType. */
 @Getter
 public enum FlinkCdcDataSourceType {
     MYSQL("MySQL"),
-    PAIMON("Paimon");
+    PAIMON("Paimon"),
+    POSTGRESQL("PostgreSQL");
 
     private final String type;
 
     FlinkCdcDataSourceType(String type) {
         this.type = type;
     }
+
+    public static FlinkCdcDataSourceType of(String type) {
+        return Arrays.stream(values())
+                .filter(x -> Objects.equals(x.getType(), type))
+                .findFirst()
+                .orElse(null);
+    }
 }
diff --git a/paimon-web-api/src/main/resources/META-INF/services/org.apache.paimon.web.api.action.context.factory.FlinkCdcActionContextFactory b/paimon-web-api/src/main/resources/META-INF/services/org.apache.paimon.web.api.action.context.factory.FlinkCdcActionContextFactory
index 089d3c2a..b6839721 100644
--- a/paimon-web-api/src/main/resources/META-INF/services/org.apache.paimon.web.api.action.context.factory.FlinkCdcActionContextFactory
+++ b/paimon-web-api/src/main/resources/META-INF/services/org.apache.paimon.web.api.action.context.factory.FlinkCdcActionContextFactory
@@ -19,4 +19,5 @@
 #
 
 org.apache.paimon.web.api.action.context.factory.MysqlSyncTableActionContextFactory
-org.apache.paimon.web.api.action.context.factory.MysqlSyncDatabasesActionContextFactory
\ No newline at end of file
+org.apache.paimon.web.api.action.context.factory.MysqlSyncDatabasesActionContextFactory
+org.apache.paimon.web.api.action.context.factory.PostgresSyncTableActionContextFactory
\ No newline at end of file
diff --git a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobDefinitionServiceImpl.java b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobDefinitionServiceImpl.java
index 25bb5ad5..28844b91 100644
--- a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobDefinitionServiceImpl.java
+++ b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobDefinitionServiceImpl.java
@@ -25,6 +25,7 @@ import org.apache.paimon.web.api.action.context.options.FlinkCdcOptions;
 import org.apache.paimon.web.api.action.service.ActionService;
 import org.apache.paimon.web.api.action.service.FlinkCdcActionService;
 import org.apache.paimon.web.api.catalog.PaimonServiceFactory;
+import org.apache.paimon.web.api.enums.FlinkCdcDataSourceType;
 import org.apache.paimon.web.api.enums.FlinkCdcSyncType;
 import org.apache.paimon.web.common.util.JSONUtils;
 import org.apache.paimon.web.server.data.dto.CdcJobDefinitionDTO;
@@ -49,6 +50,7 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
@@ -174,26 +176,60 @@ public class CdcJobDefinitionServiceImpl
 
     private void handleCdcGraphNodeData(
             ObjectNode actionConfigs, CdcNode node, FlinkCdcSyncType cdcSyncType) {
-        String type = node.getType();
-        switch (type) {
-            case "Paimon":
+        FlinkCdcDataSourceType cdcDataSourceType = FlinkCdcDataSourceType.of(node.getType());
+        Preconditions.checkNotNull(
+                cdcDataSourceType,
+                String.format("CDC datasource type should not be null.", node.getType()));
+        switch (cdcDataSourceType) {
+            case PAIMON:
                 handlePaimonNodeData(actionConfigs, node.getData(), cdcSyncType);
                 break;
-            case "MySQL":
+            case MYSQL:
                 handleMysqlNodeData(actionConfigs, node.getData(), cdcSyncType);
                 break;
+            case POSTGRESQL:
+                handlePostgresNodeData(actionConfigs, node.getData());
+                break;
         }
     }
 
-    private void handleMysqlNodeData(
-            ObjectNode actionConfigs, ObjectNode mysqlData, FlinkCdcSyncType cdcSyncType) {
-        String otherConfigs = JSONUtils.getString(mysqlData, "other_configs");
-        List<String> mysqlConfList;
+    private List<String> getOtherConfigs(ObjectNode node) {
+        String otherConfigs = JSONUtils.getString(node, "other_configs");
+        List<String> configList;
         if (StringUtils.isBlank(otherConfigs)) {
-            mysqlConfList = new ArrayList<>();
+            configList = new ArrayList<>();
         } else {
-            mysqlConfList = new ArrayList<>(Arrays.asList(otherConfigs.split(";")));
+            configList = new ArrayList<>(Arrays.asList(otherConfigs.split(";")));
         }
+        return configList;
+    }
+
+    private void handlePostgresNodeData(ObjectNode actionConfigs, ObjectNode postgresData) {
+        List<String> postgresConfList = getOtherConfigs(postgresData);
+        postgresConfList.add(
+                buildKeyValueString("hostname", JSONUtils.getString(postgresData, "host")));
+        postgresConfList.add(
+                buildKeyValueString("port", JSONUtils.getString(postgresData, "port")));
+        postgresConfList.add(
+                buildKeyValueString("username", JSONUtils.getString(postgresData, "username")));
+        postgresConfList.add(
+                buildKeyValueString("password", JSONUtils.getString(postgresData, "password")));
+        postgresConfList.add(
+                buildKeyValueString(
+                        "database-name", JSONUtils.getString(postgresData, "database")));
+        postgresConfList.add(
+                buildKeyValueString(
+                        "schema-name", JSONUtils.getString(postgresData, "schema_name")));
+        postgresConfList.add(
+                buildKeyValueString("table-name", JSONUtils.getString(postgresData, "table_name")));
+        postgresConfList.add(
+                buildKeyValueString("slot.name", JSONUtils.getString(postgresData, "slot_name")));
+        actionConfigs.putPOJO(FlinkCdcOptions.POSTGRES_CONF, postgresConfList);
+    }
+
+    private void handleMysqlNodeData(
+            ObjectNode actionConfigs, ObjectNode mysqlData, FlinkCdcSyncType cdcSyncType) {
+        List<String> mysqlConfList = getOtherConfigs(actionConfigs);
         mysqlConfList.add(buildKeyValueString("hostname", JSONUtils.getString(mysqlData, "host")));
         mysqlConfList.add(
                 buildKeyValueString("username", JSONUtils.getString(mysqlData, "username")));

Reply via email to