This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-webui.git
The following commit(s) were added to refs/heads/main by this push:
new 7628d43e [Feature] CDC integration supports full mysql database
synchronization (#434)
7628d43e is described below
commit 7628d43ec180f8854c45b47d774d8bc7fbc28432
Author: yangyang zhong <[email protected]>
AuthorDate: Thu Jun 20 19:34:16 2024 +0800
[Feature] CDC integration supports full mysql database synchronization
(#434)
---
paimon-web-api/pom.xml | 4 --
...=> MysqlSyncDatabasesActionContextFactory.java} | 18 ++++----
.../MysqlSyncTableActionContextFactory.java | 2 -
...on.context.factory.FlinkCdcActionContextFactory | 22 ++++++++++
.../service/impl/CdcJobDefinitionServiceImpl.java | 30 ++++++++-----
.../controller/CdcJobDefinitionControllerTest.java | 51 +++++++++++++++++++++-
pom.xml | 8 ----
7 files changed, 98 insertions(+), 37 deletions(-)
diff --git a/paimon-web-api/pom.xml b/paimon-web-api/pom.xml
index 451792bc..22f4f074 100644
--- a/paimon-web-api/pom.xml
+++ b/paimon-web-api/pom.xml
@@ -120,9 +120,5 @@ under the License.
<artifactId>paimon-oss</artifactId>
<version>${paimon.version}</version>
</dependency>
- <dependency>
- <groupId>com.google.auto.service</groupId>
- <artifactId>auto-service</artifactId>
- </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/MysqlSyncTableActionContextFactory.java
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/MysqlSyncDatabasesActionContextFactory.java
similarity index 79%
copy from
paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/MysqlSyncTableActionContextFactory.java
copy to
paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/MysqlSyncDatabasesActionContextFactory.java
index f67f0e14..312b59c1 100644
---
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/MysqlSyncTableActionContextFactory.java
+++
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/MysqlSyncDatabasesActionContextFactory.java
@@ -20,7 +20,7 @@ package org.apache.paimon.web.api.action.context.factory;
import org.apache.paimon.web.api.action.context.ActionContext;
import org.apache.paimon.web.api.action.context.ActionContextUtil;
-import org.apache.paimon.web.api.action.context.MysqlSyncTableActionContext;
+import org.apache.paimon.web.api.action.context.MysqlSyncDatabaseActionContext;
import org.apache.paimon.web.api.action.context.options.FlinkCdcOptions;
import org.apache.paimon.web.api.enums.FlinkCdcDataSourceType;
import org.apache.paimon.web.api.enums.FlinkCdcSyncType;
@@ -28,12 +28,12 @@ import org.apache.paimon.web.api.enums.FlinkJobType;
import org.apache.paimon.web.common.util.JSONUtils;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.auto.service.AutoService;
-
-/** MysqlSyncTableActionContextFactory. */
-@AutoService(FlinkCdcActionContextFactory.class)
-public class MysqlSyncTableActionContextFactory implements
FlinkCdcActionContextFactory {
+/**
+ * A factory designed for creating {@link FlinkCdcActionContextFactory},
implementing full database
+ * synchronization with MySQL.
+ */
+public class MysqlSyncDatabasesActionContextFactory implements
FlinkCdcActionContextFactory {
@Override
public String sourceType() {
return FlinkCdcDataSourceType.MYSQL.getType();
@@ -46,18 +46,16 @@ public class MysqlSyncTableActionContextFactory implements
FlinkCdcActionContext
@Override
public FlinkCdcSyncType cdcType() {
- return FlinkCdcSyncType.SINGLE_TABLE_SYNC;
+ return FlinkCdcSyncType.ALL_DATABASES_SYNC;
}
@Override
public ActionContext getActionContext(ObjectNode actionConfigs) {
- return MysqlSyncTableActionContext.builder()
+ return MysqlSyncDatabaseActionContext.builder()
.sessionUrl(String.valueOf(actionConfigs.get(FlinkCdcOptions.SESSION_URL)))
.flinkJobType(FlinkJobType.SESSION)
.warehouse(JSONUtils.getString(actionConfigs,
FlinkCdcOptions.WAREHOUSE))
.database(JSONUtils.getString(actionConfigs,
FlinkCdcOptions.DATABASE))
- .table(JSONUtils.getString(actionConfigs,
FlinkCdcOptions.TABLE))
- .primaryKeys(JSONUtils.getString(actionConfigs,
FlinkCdcOptions.PRIMARY_KEYS))
.actionPath(ActionContextUtil.getActionJarPath())
.catalogConfList(JSONUtils.getList(actionConfigs,
FlinkCdcOptions.CATALOG_CONF))
.mysqlConfList(JSONUtils.getList(actionConfigs,
FlinkCdcOptions.MYSQL_CONF))
diff --git
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/MysqlSyncTableActionContextFactory.java
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/MysqlSyncTableActionContextFactory.java
index f67f0e14..a71d772b 100644
---
a/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/MysqlSyncTableActionContextFactory.java
+++
b/paimon-web-api/src/main/java/org/apache/paimon/web/api/action/context/factory/MysqlSyncTableActionContextFactory.java
@@ -28,10 +28,8 @@ import org.apache.paimon.web.api.enums.FlinkJobType;
import org.apache.paimon.web.common.util.JSONUtils;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.auto.service.AutoService;
/** MysqlSyncTableActionContextFactory. */
-@AutoService(FlinkCdcActionContextFactory.class)
public class MysqlSyncTableActionContextFactory implements
FlinkCdcActionContextFactory {
@Override
diff --git
a/paimon-web-api/src/main/resources/META-INF/services/org.apache.paimon.web.api.action.context.factory.FlinkCdcActionContextFactory
b/paimon-web-api/src/main/resources/META-INF/services/org.apache.paimon.web.api.action.context.factory.FlinkCdcActionContextFactory
new file mode 100644
index 00000000..089d3c2a
--- /dev/null
+++
b/paimon-web-api/src/main/resources/META-INF/services/org.apache.paimon.web.api.action.context.factory.FlinkCdcActionContextFactory
@@ -0,0 +1,22 @@
+#
+# /*
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements. See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership. The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License. You may obtain
org.apache.paimon.web.api.action.context.factory.FlinkCdcActionContextFactory
copy of the License at
+# *
+# * http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+#
+
+org.apache.paimon.web.api.action.context.factory.MysqlSyncTableActionContextFactory
+org.apache.paimon.web.api.action.context.factory.MysqlSyncDatabasesActionContextFactory
\ No newline at end of file
diff --git
a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobDefinitionServiceImpl.java
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobDefinitionServiceImpl.java
index 7a99ee3c..25bb5ad5 100644
---
a/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobDefinitionServiceImpl.java
+++
b/paimon-web-server/src/main/java/org/apache/paimon/web/server/service/impl/CdcJobDefinitionServiceImpl.java
@@ -160,9 +160,9 @@ public class CdcJobDefinitionServiceImpl
ClusterInfo clusterInfo = clusterService.getById(clusterId);
actionConfigs.put(
FlinkCdcOptions.SESSION_URL,
- String.format("http://%s:%s", clusterInfo.getHost(),
clusterInfo.getPort()));
- handleCdcGraphNodeData(actionConfigs, cdcGraph.getSource());
- handleCdcGraphNodeData(actionConfigs, cdcGraph.getTarget());
+ String.format("%s:%s", clusterInfo.getHost(),
clusterInfo.getPort()));
+ handleCdcGraphNodeData(actionConfigs, cdcGraph.getSource(),
flinkCdcSyncType);
+ handleCdcGraphNodeData(actionConfigs, cdcGraph.getTarget(),
flinkCdcSyncType);
ActionContext actionContext = factory.getActionContext(actionConfigs);
try {
actionService.execute(actionContext);
@@ -172,19 +172,21 @@ public class CdcJobDefinitionServiceImpl
return R.succeed();
}
- private void handleCdcGraphNodeData(ObjectNode actionConfigs, CdcNode
node) {
+ private void handleCdcGraphNodeData(
+ ObjectNode actionConfigs, CdcNode node, FlinkCdcSyncType
cdcSyncType) {
String type = node.getType();
switch (type) {
case "Paimon":
- handlePaimonNodeData(actionConfigs, node.getData());
+ handlePaimonNodeData(actionConfigs, node.getData(),
cdcSyncType);
break;
case "MySQL":
- handleMysqlNodeData(actionConfigs, node.getData());
+ handleMysqlNodeData(actionConfigs, node.getData(),
cdcSyncType);
break;
}
}
- private void handleMysqlNodeData(ObjectNode actionConfigs, ObjectNode
mysqlData) {
+ private void handleMysqlNodeData(
+ ObjectNode actionConfigs, ObjectNode mysqlData, FlinkCdcSyncType
cdcSyncType) {
String otherConfigs = JSONUtils.getString(mysqlData, "other_configs");
List<String> mysqlConfList;
if (StringUtils.isBlank(otherConfigs)) {
@@ -198,18 +200,24 @@ public class CdcJobDefinitionServiceImpl
mysqlConfList.add(buildKeyValueString("port",
JSONUtils.getString(mysqlData, "port")));
mysqlConfList.add(
buildKeyValueString("database-name",
JSONUtils.getString(mysqlData, "database")));
- mysqlConfList.add(
- buildKeyValueString("table-name",
JSONUtils.getString(mysqlData, "table_name")));
+ if (cdcSyncType == FlinkCdcSyncType.SINGLE_TABLE_SYNC) {
+ mysqlConfList.add(
+ buildKeyValueString(
+ "table-name", JSONUtils.getString(mysqlData,
"table_name")));
+ }
mysqlConfList.add(
buildKeyValueString("password", JSONUtils.getString(mysqlData,
"password")));
actionConfigs.putPOJO(FlinkCdcOptions.MYSQL_CONF, mysqlConfList);
}
- private void handlePaimonNodeData(ObjectNode actionConfigs, ObjectNode
paimonData) {
+ private void handlePaimonNodeData(
+ ObjectNode actionConfigs, ObjectNode paimonData, FlinkCdcSyncType
cdcSyncType) {
Integer catalog = JSONUtils.getInteger(paimonData, "catalog");
CatalogInfo catalogInfo = catalogService.getById(catalog);
actionConfigs.put(FlinkCdcOptions.WAREHOUSE,
catalogInfo.getWarehouse());
- actionConfigs.put(FlinkCdcOptions.TABLE,
JSONUtils.getString(paimonData, "table_name"));
+ if (cdcSyncType == FlinkCdcSyncType.SINGLE_TABLE_SYNC) {
+ actionConfigs.put(FlinkCdcOptions.TABLE,
JSONUtils.getString(paimonData, "table_name"));
+ }
actionConfigs.put(FlinkCdcOptions.DATABASE,
JSONUtils.getString(paimonData, "database"));
actionConfigs.put(
FlinkCdcOptions.PRIMARY_KEYS, JSONUtils.getString(paimonData,
"primary_key"));
diff --git
a/paimon-web-server/src/test/java/org/apache/paimon/web/server/controller/CdcJobDefinitionControllerTest.java
b/paimon-web-server/src/test/java/org/apache/paimon/web/server/controller/CdcJobDefinitionControllerTest.java
index 14d3a8c2..42848197 100644
---
a/paimon-web-server/src/test/java/org/apache/paimon/web/server/controller/CdcJobDefinitionControllerTest.java
+++
b/paimon-web-server/src/test/java/org/apache/paimon/web/server/controller/CdcJobDefinitionControllerTest.java
@@ -18,6 +18,7 @@
package org.apache.paimon.web.server.controller;
+import org.apache.paimon.web.api.enums.FlinkCdcSyncType;
import org.apache.paimon.web.server.data.dto.CdcJobDefinitionDTO;
import org.apache.paimon.web.server.data.dto.CdcJobSubmitDTO;
import org.apache.paimon.web.server.data.dto.LoginDTO;
@@ -54,9 +55,18 @@ public class CdcJobDefinitionControllerTest extends
ControllerTestBase {
private CdcJobDefinitionDTO cdcJobDefinitionDto() {
CdcJobDefinitionDTO cdcJobDefinitionDTO = new CdcJobDefinitionDTO();
cdcJobDefinitionDTO.setName("1");
- cdcJobDefinitionDTO.setCdcType(0);
- cdcJobDefinitionDTO.setConfig("d");
+
cdcJobDefinitionDTO.setCdcType(FlinkCdcSyncType.SINGLE_TABLE_SYNC.getValue());
+ cdcJobDefinitionDTO.setDescription("d");
+ cdcJobDefinitionDTO.setCreateUser("admin");
+ return cdcJobDefinitionDTO;
+ }
+
+ private CdcJobDefinitionDTO cdcDatabaseSyncJobDefinitionDto() {
+ CdcJobDefinitionDTO cdcJobDefinitionDTO = new CdcJobDefinitionDTO();
+ cdcJobDefinitionDTO.setName("2");
+
cdcJobDefinitionDTO.setCdcType(FlinkCdcSyncType.ALL_DATABASES_SYNC.getValue());
cdcJobDefinitionDTO.setDescription("d");
+ cdcJobDefinitionDTO.setCreateUser("admin");
return cdcJobDefinitionDTO;
}
@@ -209,6 +219,43 @@ public class CdcJobDefinitionControllerTest extends
ControllerTestBase {
.andDo(MockMvcResultHandlers.print());
CdcJobDefinitionDTO cdcJobDefinitionDTO = cdcJobDefinitionDto();
CdcJobSubmitDTO cdcJobSubmitDTO = new CdcJobSubmitDTO();
+ cdcJobSubmitDTO.setClusterId("1");
+ mockMvc.perform(
+ MockMvcRequestBuilders.get(
+ cdcJobDefinitionPath + "/" +
cdcJobDefinitionDTO.getId())
+ .cookie(cookie)
+ .accept(MediaType.APPLICATION_JSON_VALUE)
+
.content(ObjectMapperUtils.toJSON(cdcJobSubmitDTO))
+ .contentType(MediaType.APPLICATION_JSON_VALUE))
+ .andExpect(MockMvcResultMatchers.status().isOk())
+ .andDo(MockMvcResultHandlers.print())
+ .andReturn()
+ .getResponse();
+ }
+
+ @Order(4)
+ @Test
+ public void submitDatabaseSyncCdcJob() throws Exception {
+ System.setProperty("FLINK_HOME", "/opt/flink");
+ System.setProperty("ACTION_JAR_PATH", "/opt/flink/jar");
+ ClusterInfo cluster = new ClusterInfo();
+ cluster.setId(2);
+ cluster.setClusterName("clusterName");
+ cluster.setHost("127.0.0.1");
+ cluster.setPort(8083);
+ cluster.setType("Flink");
+ cluster.setEnabled(true);
+ mockMvc.perform(
+ MockMvcRequestBuilders.post("/api/cluster")
+ .cookie(cookie)
+ .content(ObjectMapperUtils.toJSON(cluster))
+ .contentType(MediaType.APPLICATION_JSON_VALUE)
+ .accept(MediaType.APPLICATION_JSON_VALUE))
+ .andExpect(MockMvcResultMatchers.status().isOk())
+ .andDo(MockMvcResultHandlers.print());
+ CdcJobDefinitionDTO cdcJobDefinitionDTO =
cdcDatabaseSyncJobDefinitionDto();
+ CdcJobSubmitDTO cdcJobSubmitDTO = new CdcJobSubmitDTO();
+ cdcJobSubmitDTO.setClusterId("2");
mockMvc.perform(
MockMvcRequestBuilders.get(
cdcJobDefinitionPath + "/" +
cdcJobDefinitionDTO.getId())
diff --git a/pom.xml b/pom.xml
index b35213aa..a4f1a3a6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -117,7 +117,6 @@ under the License.
<common-lang3.version>3.12.0</common-lang3.version>
<common-io.version>2.7</common-io.version>
<gson.version>2.10.1</gson.version>
- <auto-service.version>1.0.1</auto-service.version>
</properties>
<dependencyManagement>
@@ -221,13 +220,6 @@ under the License.
<version>${springdoc-openapi-ui.version}</version>
</dependency>
- <dependency>
- <groupId>com.google.auto.service</groupId>
- <artifactId>auto-service</artifactId>
- <version>${auto-service.version}</version>
- <scope>provided</scope>
- </dependency>
-
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>