This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-webui.git


The following commit(s) were added to refs/heads/main by this push:
     new 7628d43e [Feature] CDC integration supports full mysql database 
synchronization (#434)
7628d43e is described below

commit 7628d43ec180f8854c45b47d774d8bc7fbc28432
Author: yangyang zhong <[email protected]>
AuthorDate: Thu Jun 20 19:34:16 2024 +0800

    [Feature] CDC integration supports full mysql database synchronization 
(#434)
---
 paimon-web-api/pom.xml                             |  4 --
 ...=> MysqlSyncDatabasesActionContextFactory.java} | 18 ++++----
 .../MysqlSyncTableActionContextFactory.java        |  2 -
 ...on.context.factory.FlinkCdcActionContextFactory | 22 ++++++++++
 .../service/impl/CdcJobDefinitionServiceImpl.java  | 30 ++++++++-----
 .../controller/CdcJobDefinitionControllerTest.java | 51 +++++++++++++++++++++-
 pom.xml                                            |  8 ----
 7 files changed, 98 insertions(+), 37 deletions(-)

diff --git a/paimon-web-api/pom.xml b/paimon-web-api/pom.xml
index 451792bc..22f4f074 100644
--- a/paimon-web-api/pom.xml
+++ b/paimon-web-api/pom.xml
@@ -120,9 +120,5 @@ under the License.
             <artifactId>paimon-oss</artifactId>
             <version>${paimon.version}</version>
         </dependency>
-        <dependency>
-            <groupId>com.google.auto.service</groupId>
-            <artifactId>auto-service</artifactId>
-        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/MysqlSyncTableActionContextFactory.java
 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/MysqlSyncDatabasesActionContextFactory.java
similarity index 79%
copy from 
paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/MysqlSyncTableActionContextFactory.java
copy to 
paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/MysqlSyncDatabasesActionContextFactory.java
index f67f0e14..312b59c1 100644
--- 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/MysqlSyncTableActionContextFactory.java
+++ 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/MysqlSyncDatabasesActionContextFactory.java
@@ -20,7 +20,7 @@ package org.apache.paimon.web.api.action.context.factory;
 
 import org.apache.paimon.web.api.action.context.ActionContext;
 import org.apache.paimon.web.api.action.context.ActionContextUtil;
-import org.apache.paimon.web.api.action.context.MysqlSyncTableActionContext;
+import org.apache.paimon.web.api.action.context.MysqlSyncDatabaseActionContext;
 import org.apache.paimon.web.api.action.context.options.FlinkCdcOptions;
 import org.apache.paimon.web.api.enums.FlinkCdcDataSourceType;
 import org.apache.paimon.web.api.enums.FlinkCdcSyncType;
@@ -28,12 +28,12 @@ import org.apache.paimon.web.api.enums.FlinkJobType;
 import org.apache.paimon.web.common.util.JSONUtils;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.auto.service.AutoService;
-
-/** MysqlSyncTableActionContextFactory. */
-@AutoService(FlinkCdcActionContextFactory.class)
-public class MysqlSyncTableActionContextFactory implements 
FlinkCdcActionContextFactory {
 
+/**
+ * A factory designed for creating {@link FlinkCdcActionContextFactory}, 
implementing full database
+ * synchronization with MySQL.
+ */
+public class MysqlSyncDatabasesActionContextFactory implements 
FlinkCdcActionContextFactory {
     @Override
     public String sourceType() {
         return FlinkCdcDataSourceType.MYSQL.getType();
@@ -46,18 +46,16 @@ public class MysqlSyncTableActionContextFactory implements 
FlinkCdcActionContext
 
     @Override
     public FlinkCdcSyncType cdcType() {
-        return FlinkCdcSyncType.SINGLE_TABLE_SYNC;
+        return FlinkCdcSyncType.ALL_DATABASES_SYNC;
     }
 
     @Override
     public ActionContext getActionContext(ObjectNode actionConfigs) {
-        return MysqlSyncTableActionContext.builder()
+        return MysqlSyncDatabaseActionContext.builder()
                 
.sessionUrl(String.valueOf(actionConfigs.get(FlinkCdcOptions.SESSION_URL)))
                 .flinkJobType(FlinkJobType.SESSION)
                 .warehouse(JSONUtils.getString(actionConfigs, 
FlinkCdcOptions.WAREHOUSE))
                 .database(JSONUtils.getString(actionConfigs, 
FlinkCdcOptions.DATABASE))
-                .table(JSONUtils.getString(actionConfigs, 
FlinkCdcOptions.TABLE))
-                .primaryKeys(JSONUtils.getString(actionConfigs, 
FlinkCdcOptions.PRIMARY_KEYS))
                 .actionPath(ActionContextUtil.getActionJarPath())
                 .catalogConfList(JSONUtils.getList(actionConfigs, 
FlinkCdcOptions.CATALOG_CONF))
                 .mysqlConfList(JSONUtils.getList(actionConfigs, 
FlinkCdcOptions.MYSQL_CONF))
diff --git 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/MysqlSyncTableActionContextFactory.java
 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/MysqlSyncTableActionContextFactory.java
index f67f0e14..a71d772b 100644
--- 
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/MysqlSyncTableActionContextFactory.java
+++ 
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/MysqlSyncTableActionContextFactory.java
@@ -28,10 +28,8 @@ import org.apache.paimon.web.api.enums.FlinkJobType;
 import org.apache.paimon.web.common.util.JSONUtils;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.auto.service.AutoService;
 
 /** MysqlSyncTableActionContextFactory. */
-@AutoService(FlinkCdcActionContextFactory.class)
 public class MysqlSyncTableActionContextFactory implements 
FlinkCdcActionContextFactory {
 
     @Override
diff --git 
a/paimon-web-api/src/main/resources/META-INF/services/org.apache.paimon.web.api.action.context.factory.FlinkCdcActionContextFactory
 
b/paimon-web-api/src/main/resources/META-INF/services/org.apache.paimon.web.api.action.context.factory.FlinkCdcActionContextFactory
new file mode 100644
index 00000000..089d3c2a
--- /dev/null
+++ 
b/paimon-web-api/src/main/resources/META-INF/services/org.apache.paimon.web.api.action.context.factory.FlinkCdcActionContextFactory
@@ -0,0 +1,22 @@
+#
+# /*
+#  * Licensed to the Apache Software Foundation (ASF) under one
+#  * or more contributor license agreements.  See the NOTICE file
+#  * distributed with this work for additional information
+#  * regarding copyright ownership.  The ASF licenses this file
+#  * to you under the Apache License, Version 2.0 (the
+#  * "License"); you may not use this file except in compliance
+#  * with the License.  You may obtain 
org.apache.paimon.web.api.action.context.factory.FlinkCdcActionContextFactory 
copy of the License at
+#  *
+#  *     http://www.apache.org/licenses/LICENSE-2.0
+#  *
+#  * Unless required by applicable law or agreed to in writing, software
+#  * distributed under the License is distributed on an "AS IS" BASIS,
+#  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  * See the License for the specific language governing permissions and
+#  * limitations under the License.
+#  */
+#
+
+org.apache.paimon.web.api.action.context.factory.MysqlSyncTableActionContextFactory
+org.apache.paimon.web.api.action.context.factory.MysqlSyncDatabasesActionContextFactory
\ No newline at end of file
diff --git 
a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobDefinitionServiceImpl.java
 
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobDefinitionServiceImpl.java
index 7a99ee3c..25bb5ad5 100644
--- 
a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobDefinitionServiceImpl.java
+++ 
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobDefinitionServiceImpl.java
@@ -160,9 +160,9 @@ public class CdcJobDefinitionServiceImpl
         ClusterInfo clusterInfo = clusterService.getById(clusterId);
         actionConfigs.put(
                 FlinkCdcOptions.SESSION_URL,
-                String.format("http://%s:%s";, clusterInfo.getHost(), 
clusterInfo.getPort()));
-        handleCdcGraphNodeData(actionConfigs, cdcGraph.getSource());
-        handleCdcGraphNodeData(actionConfigs, cdcGraph.getTarget());
+                String.format("%s:%s", clusterInfo.getHost(), 
clusterInfo.getPort()));
+        handleCdcGraphNodeData(actionConfigs, cdcGraph.getSource(), 
flinkCdcSyncType);
+        handleCdcGraphNodeData(actionConfigs, cdcGraph.getTarget(), 
flinkCdcSyncType);
         ActionContext actionContext = factory.getActionContext(actionConfigs);
         try {
             actionService.execute(actionContext);
@@ -172,19 +172,21 @@ public class CdcJobDefinitionServiceImpl
         return R.succeed();
     }
 
-    private void handleCdcGraphNodeData(ObjectNode actionConfigs, CdcNode 
node) {
+    private void handleCdcGraphNodeData(
+            ObjectNode actionConfigs, CdcNode node, FlinkCdcSyncType 
cdcSyncType) {
         String type = node.getType();
         switch (type) {
             case "Paimon":
-                handlePaimonNodeData(actionConfigs, node.getData());
+                handlePaimonNodeData(actionConfigs, node.getData(), 
cdcSyncType);
                 break;
             case "MySQL":
-                handleMysqlNodeData(actionConfigs, node.getData());
+                handleMysqlNodeData(actionConfigs, node.getData(), 
cdcSyncType);
                 break;
         }
     }
 
-    private void handleMysqlNodeData(ObjectNode actionConfigs, ObjectNode 
mysqlData) {
+    private void handleMysqlNodeData(
+            ObjectNode actionConfigs, ObjectNode mysqlData, FlinkCdcSyncType 
cdcSyncType) {
         String otherConfigs = JSONUtils.getString(mysqlData, "other_configs");
         List<String> mysqlConfList;
         if (StringUtils.isBlank(otherConfigs)) {
@@ -198,18 +200,24 @@ public class CdcJobDefinitionServiceImpl
         mysqlConfList.add(buildKeyValueString("port", 
JSONUtils.getString(mysqlData, "port")));
         mysqlConfList.add(
                 buildKeyValueString("database-name", 
JSONUtils.getString(mysqlData, "database")));
-        mysqlConfList.add(
-                buildKeyValueString("table-name", 
JSONUtils.getString(mysqlData, "table_name")));
+        if (cdcSyncType == FlinkCdcSyncType.SINGLE_TABLE_SYNC) {
+            mysqlConfList.add(
+                    buildKeyValueString(
+                            "table-name", JSONUtils.getString(mysqlData, 
"table_name")));
+        }
         mysqlConfList.add(
                 buildKeyValueString("password", JSONUtils.getString(mysqlData, 
"password")));
         actionConfigs.putPOJO(FlinkCdcOptions.MYSQL_CONF, mysqlConfList);
     }
 
-    private void handlePaimonNodeData(ObjectNode actionConfigs, ObjectNode 
paimonData) {
+    private void handlePaimonNodeData(
+            ObjectNode actionConfigs, ObjectNode paimonData, FlinkCdcSyncType 
cdcSyncType) {
         Integer catalog = JSONUtils.getInteger(paimonData, "catalog");
         CatalogInfo catalogInfo = catalogService.getById(catalog);
         actionConfigs.put(FlinkCdcOptions.WAREHOUSE, 
catalogInfo.getWarehouse());
-        actionConfigs.put(FlinkCdcOptions.TABLE, 
JSONUtils.getString(paimonData, "table_name"));
+        if (cdcSyncType == FlinkCdcSyncType.SINGLE_TABLE_SYNC) {
+            actionConfigs.put(FlinkCdcOptions.TABLE, 
JSONUtils.getString(paimonData, "table_name"));
+        }
         actionConfigs.put(FlinkCdcOptions.DATABASE, 
JSONUtils.getString(paimonData, "database"));
         actionConfigs.put(
                 FlinkCdcOptions.PRIMARY_KEYS, JSONUtils.getString(paimonData, 
"primary_key"));
diff --git 
a/paimon-web-server/src/test/java/org/apache/paimon/web/server/controller/CdcJobDefinitionControllerTest.java
 
b/paimon-web-server/src/test/java/org/apache/paimon/web/server/controller/CdcJobDefinitionControllerTest.java
index 14d3a8c2..42848197 100644
--- 
a/paimon-web-server/src/test/java/org/apache/paimon/web/server/controller/CdcJobDefinitionControllerTest.java
+++ 
b/paimon-web-server/src/test/java/org/apache/paimon/web/server/controller/CdcJobDefinitionControllerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.web.server.controller;
 
+import org.apache.paimon.web.api.enums.FlinkCdcSyncType;
 import org.apache.paimon.web.server.data.dto.CdcJobDefinitionDTO;
 import org.apache.paimon.web.server.data.dto.CdcJobSubmitDTO;
 import org.apache.paimon.web.server.data.dto.LoginDTO;
@@ -54,9 +55,18 @@ public class CdcJobDefinitionControllerTest extends 
ControllerTestBase {
     private CdcJobDefinitionDTO cdcJobDefinitionDto() {
         CdcJobDefinitionDTO cdcJobDefinitionDTO = new CdcJobDefinitionDTO();
         cdcJobDefinitionDTO.setName("1");
-        cdcJobDefinitionDTO.setCdcType(0);
-        cdcJobDefinitionDTO.setConfig("d");
+        
cdcJobDefinitionDTO.setCdcType(FlinkCdcSyncType.SINGLE_TABLE_SYNC.getValue());
+        cdcJobDefinitionDTO.setDescription("d");
+        cdcJobDefinitionDTO.setCreateUser("admin");
+        return cdcJobDefinitionDTO;
+    }
+
+    private CdcJobDefinitionDTO cdcDatabaseSyncJobDefinitionDto() {
+        CdcJobDefinitionDTO cdcJobDefinitionDTO = new CdcJobDefinitionDTO();
+        cdcJobDefinitionDTO.setName("2");
+        
cdcJobDefinitionDTO.setCdcType(FlinkCdcSyncType.ALL_DATABASES_SYNC.getValue());
         cdcJobDefinitionDTO.setDescription("d");
+        cdcJobDefinitionDTO.setCreateUser("admin");
         return cdcJobDefinitionDTO;
     }
 
@@ -209,6 +219,43 @@ public class CdcJobDefinitionControllerTest extends 
ControllerTestBase {
                 .andDo(MockMvcResultHandlers.print());
         CdcJobDefinitionDTO cdcJobDefinitionDTO = cdcJobDefinitionDto();
         CdcJobSubmitDTO cdcJobSubmitDTO = new CdcJobSubmitDTO();
+        cdcJobSubmitDTO.setClusterId("1");
+        mockMvc.perform(
+                        MockMvcRequestBuilders.get(
+                                        cdcJobDefinitionPath + "/" + 
cdcJobDefinitionDTO.getId())
+                                .cookie(cookie)
+                                .accept(MediaType.APPLICATION_JSON_VALUE)
+                                
.content(ObjectMapperUtils.toJSON(cdcJobSubmitDTO))
+                                .contentType(MediaType.APPLICATION_JSON_VALUE))
+                .andExpect(MockMvcResultMatchers.status().isOk())
+                .andDo(MockMvcResultHandlers.print())
+                .andReturn()
+                .getResponse();
+    }
+
+    @Order(4)
+    @Test
+    public void submitDatabaseSyncCdcJob() throws Exception {
+        System.setProperty("FLINK_HOME", "/opt/flink");
+        System.setProperty("ACTION_JAR_PATH", "/opt/flink/jar");
+        ClusterInfo cluster = new ClusterInfo();
+        cluster.setId(2);
+        cluster.setClusterName("clusterName");
+        cluster.setHost("127.0.0.1");
+        cluster.setPort(8083);
+        cluster.setType("Flink");
+        cluster.setEnabled(true);
+        mockMvc.perform(
+                        MockMvcRequestBuilders.post("/api/cluster")
+                                .cookie(cookie)
+                                .content(ObjectMapperUtils.toJSON(cluster))
+                                .contentType(MediaType.APPLICATION_JSON_VALUE)
+                                .accept(MediaType.APPLICATION_JSON_VALUE))
+                .andExpect(MockMvcResultMatchers.status().isOk())
+                .andDo(MockMvcResultHandlers.print());
+        CdcJobDefinitionDTO cdcJobDefinitionDTO = 
cdcDatabaseSyncJobDefinitionDto();
+        CdcJobSubmitDTO cdcJobSubmitDTO = new CdcJobSubmitDTO();
+        cdcJobSubmitDTO.setClusterId("2");
         mockMvc.perform(
                         MockMvcRequestBuilders.get(
                                         cdcJobDefinitionPath + "/" + 
cdcJobDefinitionDTO.getId())
diff --git a/pom.xml b/pom.xml
index b35213aa..a4f1a3a6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -117,7 +117,6 @@ under the License.
         <common-lang3.version>3.12.0</common-lang3.version>
         <common-io.version>2.7</common-io.version>
         <gson.version>2.10.1</gson.version>
-        <auto-service.version>1.0.1</auto-service.version>
     </properties>
 
     <dependencyManagement>
@@ -221,13 +220,6 @@ under the License.
                 <version>${springdoc-openapi-ui.version}</version>
             </dependency>
 
-            <dependency>
-                <groupId>com.google.auto.service</groupId>
-                <artifactId>auto-service</artifactId>
-                <version>${auto-service.version}</version>
-                <scope>provided</scope>
-            </dependency>
-
             <dependency>
                 <groupId>commons-io</groupId>
                 <artifactId>commons-io</artifactId>

Reply via email to