This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 93459f032f [flink-cdc] kafka_sync_database supports table name mapping when prefix and postfix could not fit the need. (#4660)
93459f032f is described below

commit 93459f032f088580d72258d1b5a317e6f8d861e7
Author: JackeyLee007 <[email protected]>
AuthorDate: Tue Dec 10 10:32:23 2024 +0800

    [flink-cdc] kafka_sync_database supports table name mapping when prefix and postfix could not fit the need. (#4660)
---
 docs/content/cdc-ingestion/kafka-cdc.md            |  1 +
 .../shortcodes/generated/kafka_sync_database.html  |  4 ++
 .../flink/action/cdc/CdcActionCommonUtils.java     |  1 +
 .../flink/action/cdc/SyncDatabaseActionBase.java   | 11 +++++-
 .../action/cdc/SyncDatabaseActionFactoryBase.java  |  2 +
 .../flink/action/cdc/TableNameConverter.java       | 31 ++++++++++++++-
 .../action/cdc/mysql/MySqlSyncDatabaseAction.java  |  3 +-
 .../flink/action/cdc/TableNameConverterTest.java   | 45 ++++++++++++++++++++++
 8 files changed, 94 insertions(+), 4 deletions(-)

diff --git a/docs/content/cdc-ingestion/kafka-cdc.md 
b/docs/content/cdc-ingestion/kafka-cdc.md
index f57260275e..b037937c55 100644
--- a/docs/content/cdc-ingestion/kafka-cdc.md
+++ b/docs/content/cdc-ingestion/kafka-cdc.md
@@ -198,6 +198,7 @@ To use this feature through `flink run`, run the following 
shell command.
     kafka_sync_database
     --warehouse <warehouse-path> \
     --database <database-name> \
+    [--table_mapping <table-name>=<paimon-table-name>] \
     [--table_prefix <paimon-table-prefix>] \
     [--table_suffix <paimon-table-suffix>] \
     [--including_tables <table-name|name-regular-expr>] \
diff --git a/docs/layouts/shortcodes/generated/kafka_sync_database.html 
b/docs/layouts/shortcodes/generated/kafka_sync_database.html
index 888901991d..6c90f1d7f7 100644
--- a/docs/layouts/shortcodes/generated/kafka_sync_database.html
+++ b/docs/layouts/shortcodes/generated/kafka_sync_database.html
@@ -37,6 +37,10 @@ under the License.
         <td><h5>--ignore_incompatible</h5></td>
         <td>It is default false, in this case, if MySQL table name exists in 
Paimon and their schema is incompatible,an exception will be thrown. You can 
specify it to true explicitly to ignore the incompatible tables and 
exception.</td>
     </tr>
+    <tr>
+        <td><h5>--table_mapping</h5></td>
+        <td>The table name mapping between source database and Paimon. For example, if you want to synchronize a source table named "test" to a Paimon table named "paimon_test", you can specify "--table_mapping test=paimon_test". Multiple mappings could be specified with multiple "--table_mapping" options. "--table_mapping" has higher priority than "--table_prefix" and "--table_suffix".</td>
+    </tr>
     <tr>
         <td><h5>--table_prefix</h5></td>
         <td>The prefix of all Paimon tables to be synchronized. For example, 
if you want all synchronized tables to have "ods_" as prefix, you can specify 
"--table_prefix ods_".</td>
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
index 8f96022bde..83891c90b8 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
@@ -56,6 +56,7 @@ public class CdcActionCommonUtils {
     public static final String PULSAR_CONF = "pulsar_conf";
     public static final String TABLE_PREFIX = "table_prefix";
     public static final String TABLE_SUFFIX = "table_suffix";
+    public static final String TABLE_MAPPING = "table_mapping";
     public static final String INCLUDING_TABLES = "including_tables";
     public static final String EXCLUDING_TABLES = "excluding_tables";
     public static final String TYPE_MAPPING = "type_mapping";
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
index 4ab56bdcf1..ac3483ac23 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
@@ -52,6 +52,7 @@ public abstract class SyncDatabaseActionBase extends 
SynchronizationActionBase {
     protected MultiTablesSinkMode mode = COMBINED;
     protected String tablePrefix = "";
     protected String tableSuffix = "";
+    protected Map<String, String> tableMapping = new HashMap<>();
     protected String includingTables = ".*";
     protected List<String> partitionKeys = new ArrayList<>();
     protected List<String> primaryKeys = new ArrayList<>();
@@ -97,6 +98,13 @@ public abstract class SyncDatabaseActionBase extends 
SynchronizationActionBase {
         return this;
     }
 
+    public SyncDatabaseActionBase withTableMapping(Map<String, String> 
tableMapping) {
+        if (tableMapping != null) {
+            this.tableMapping = tableMapping;
+        }
+        return this;
+    }
+
     public SyncDatabaseActionBase includingTables(@Nullable String 
includingTables) {
         if (includingTables != null) {
             this.includingTables = includingTables;
@@ -155,7 +163,8 @@ public abstract class SyncDatabaseActionBase extends 
SynchronizationActionBase {
         Pattern excludingPattern =
                 excludingTables == null ? null : 
Pattern.compile(excludingTables);
         TableNameConverter tableNameConverter =
-                new TableNameConverter(allowUpperCase, mergeShards, 
tablePrefix, tableSuffix);
+                new TableNameConverter(
+                        allowUpperCase, mergeShards, tablePrefix, tableSuffix, 
tableMapping);
         Set<String> createdTables;
         try {
             createdTables = new HashSet<>(catalog.listTables(database));
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
index e7a386979d..2135f2a281 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
@@ -29,6 +29,7 @@ import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.INCLUDING_
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MULTIPLE_TABLE_PARTITION_KEYS;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PARTITION_KEYS;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PRIMARY_KEYS;
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_MAPPING;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_SUFFIX;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TYPE_MAPPING;
@@ -51,6 +52,7 @@ public abstract class SyncDatabaseActionFactoryBase<T extends 
SyncDatabaseAction
     protected void withParams(MultipleParameterToolAdapter params, T action) {
         action.withTablePrefix(params.get(TABLE_PREFIX))
                 .withTableSuffix(params.get(TABLE_SUFFIX))
+                .withTableMapping(optionalConfigMap(params, TABLE_MAPPING))
                 .includingTables(params.get(INCLUDING_TABLES))
                 .excludingTables(params.get(EXCLUDING_TABLES))
                 .withPartitionKeyMultiple(
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java
index 67c70aa58c..4eca8b903e 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java
@@ -21,6 +21,8 @@ package org.apache.paimon.flink.action.cdc;
 import org.apache.paimon.catalog.Identifier;
 
 import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
 
 /** Used to convert a MySQL source table name to corresponding Paimon table 
name. */
 public class TableNameConverter implements Serializable {
@@ -31,20 +33,31 @@ public class TableNameConverter implements Serializable {
     private final boolean mergeShards;
     private final String prefix;
     private final String suffix;
+    private final Map<String, String> tableMapping;
 
     public TableNameConverter(boolean caseSensitive) {
-        this(caseSensitive, true, "", "");
+        this(caseSensitive, true, "", "", null);
     }
 
     public TableNameConverter(
-            boolean caseSensitive, boolean mergeShards, String prefix, String 
suffix) {
+            boolean caseSensitive,
+            boolean mergeShards,
+            String prefix,
+            String suffix,
+            Map<String, String> tableMapping) {
         this.caseSensitive = caseSensitive;
         this.mergeShards = mergeShards;
         this.prefix = prefix;
         this.suffix = suffix;
+        this.tableMapping = lowerMapKey(tableMapping);
     }
 
     public String convert(String originName) {
+        if (tableMapping.containsKey(originName.toLowerCase())) {
+            String mappedName = tableMapping.get(originName.toLowerCase());
+            return caseSensitive ? mappedName : mappedName.toLowerCase();
+        }
+
         String tableName = caseSensitive ? originName : 
originName.toLowerCase();
         return prefix + tableName + suffix;
     }
@@ -58,4 +71,18 @@ public class TableNameConverter implements Serializable {
                                 + originIdentifier.getObjectName();
         return convert(rawName);
     }
+
+    private Map<String, String> lowerMapKey(Map<String, String> map) {
+        int size = map == null ? 0 : map.size();
+        Map<String, String> lowerKeyMap = new HashMap<>(size);
+        if (size == 0) {
+            return lowerKeyMap;
+        }
+
+        for (String key : map.keySet()) {
+            lowerKeyMap.put(key.toLowerCase(), map.get(key));
+        }
+
+        return lowerKeyMap;
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index f8ea8cdc44..235b3f9a32 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -138,7 +138,8 @@ public class MySqlSyncDatabaseAction extends 
SyncDatabaseActionBase {
                         + ", or MySQL database does not exist.");
 
         TableNameConverter tableNameConverter =
-                new TableNameConverter(allowUpperCase, mergeShards, 
tablePrefix, tableSuffix);
+                new TableNameConverter(
+                        allowUpperCase, mergeShards, tablePrefix, tableSuffix, 
tableMapping);
         for (JdbcTableInfo tableInfo : jdbcTableInfos) {
             Identifier identifier =
                     Identifier.create(
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/TableNameConverterTest.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/TableNameConverterTest.java
new file mode 100644
index 0000000000..dfbe32e3d3
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/TableNameConverterTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Tests for {@link TableNameConverter}. */
+public class TableNameConverterTest {
+
+    @Test
+    public void testConvertTableName() {
+        Map<String, String> tableMapping = new HashMap<>(1);
+        tableMapping.put("mapped_src", "mapped_TGT");
+        TableNameConverter caseConverter =
+                new TableNameConverter(true, true, "pre_", "_pos", 
tableMapping);
+        Assert.assertEquals(caseConverter.convert("mapped_SRC"), "mapped_TGT");
+
+        Assert.assertEquals(caseConverter.convert("unmapped_src"), 
"pre_unmapped_src_pos");
+
+        TableNameConverter noCaseConverter =
+                new TableNameConverter(false, true, "pre_", "_pos", 
tableMapping);
+        Assert.assertEquals(noCaseConverter.convert("mapped_src"), 
"mapped_tgt");
+        Assert.assertEquals(noCaseConverter.convert("unmapped_src"), 
"pre_unmapped_src_pos");
+    }
+}

Reply via email to