This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d61f3d2659 [flink] kafka_sync_database supports different prefix and
suffix for different db (#4704)
d61f3d2659 is described below
commit d61f3d2659572d44fc028e5e16f957504ee1a7f8
Author: JackeyLee007 <[email protected]>
AuthorDate: Sun Dec 15 13:44:47 2024 +0800
[flink] kafka_sync_database supports different prefix and suffix for
different db (#4704)
---
docs/content/cdc-ingestion/kafka-cdc.md | 2 +
.../shortcodes/generated/kafka_sync_database.html | 12 +++++-
.../flink/action/cdc/CdcActionCommonUtils.java | 2 +
.../flink/action/cdc/SyncDatabaseActionBase.java | 34 +++++++++++++++-
.../action/cdc/SyncDatabaseActionFactoryBase.java | 4 ++
.../flink/action/cdc/TableNameConverter.java | 47 +++++++++++++++++++---
.../action/cdc/mysql/MySqlSyncDatabaseAction.java | 3 +-
.../flink/action/cdc/TableNameConverterTest.java | 42 +++++++++++++++++--
pom.xml | 1 +
9 files changed, 133 insertions(+), 14 deletions(-)
diff --git a/docs/content/cdc-ingestion/kafka-cdc.md
b/docs/content/cdc-ingestion/kafka-cdc.md
index b037937c55..26a5be3409 100644
--- a/docs/content/cdc-ingestion/kafka-cdc.md
+++ b/docs/content/cdc-ingestion/kafka-cdc.md
@@ -199,7 +199,9 @@ To use this feature through `flink run`, run the following
shell command.
--warehouse <warehouse-path> \
--database <database-name> \
[--table_mapping <table-name>=<paimon-table-name>] \
+ [--table_prefix_db <paimon-table-prefix-by-db>] \
[--table_prefix <paimon-table-prefix>] \
+ [--table_suffix_db <paimon-table-suffix-by-db>] \
[--table_suffix <paimon-table-suffix>] \
[--including_tables <table-name|name-regular-expr>] \
[--excluding_tables <table-name|name-regular-expr>] \
diff --git a/docs/layouts/shortcodes/generated/kafka_sync_database.html
b/docs/layouts/shortcodes/generated/kafka_sync_database.html
index 6c90f1d7f7..3664128a26 100644
--- a/docs/layouts/shortcodes/generated/kafka_sync_database.html
+++ b/docs/layouts/shortcodes/generated/kafka_sync_database.html
@@ -41,13 +41,21 @@ under the License.
<td><h5>--table_mapping</h5></td>
<td>The table name mapping between source database and Paimon. For
example, if you want to synchronize a source table named "test" to a Paimon
table named "paimon_test", you can specify "--table_mapping test=paimon_test".
Multiple mappings could be specified with multiple "--table_mapping" options.
"--table_mapping" has higher priority than "--table_prefix" and
"--table_suffix".</td>
</tr>
+ <tr>
+ <td><h5>--table_prefix_db</h5></td>
+ <td>The prefix of the Paimon tables to be synchronized from the
specified db. For example, if you want to prefix the tables from db1 with
"ods_db1_", you can specify "--table_prefix_db db1=ods_db1_".
"--table_prefix_db" has higher priority than "--table_prefix".</td>
+ </tr>
<tr>
<td><h5>--table_prefix</h5></td>
- <td>The prefix of all Paimon tables to be synchronized. For example,
if you want all synchronized tables to have "ods_" as prefix, you can specify
"--table_prefix ods_".</td>
+ <td>The prefix of all Paimon tables to be synchronized except those
specified by "--table_mapping" or "--table_prefix_db". For example, if you want
all synchronized tables to have "ods_" as prefix, you can specify
"--table_prefix ods_".</td>
+ </tr>
+ <tr>
+ <td><h5>--table_suffix_db</h5></td>
+ <td>The suffix of the Paimon tables to be synchronized from the
specified db. The usage is the same as "--table_prefix_db".</td>
</tr>
<tr>
<td><h5>--table_suffix</h5></td>
- <td>The suffix of all Paimon tables to be synchronized. The usage is
same as "--table_prefix".</td>
+ <td>The suffix of all Paimon tables to be synchronized except those
specified by "--table_mapping" or "--table_suffix_db". The usage is the same as
"--table_prefix".</td>
</tr>
<tr>
<td><h5>--including_tables</h5></td>
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
index 83891c90b8..c8af6f91c4 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
@@ -56,6 +56,8 @@ public class CdcActionCommonUtils {
public static final String PULSAR_CONF = "pulsar_conf";
public static final String TABLE_PREFIX = "table_prefix";
public static final String TABLE_SUFFIX = "table_suffix";
+ public static final String TABLE_PREFIX_DB = "table_prefix_db";
+ public static final String TABLE_SUFFIX_DB = "table_suffix_db";
public static final String TABLE_MAPPING = "table_mapping";
public static final String INCLUDING_TABLES = "including_tables";
public static final String EXCLUDING_TABLES = "excluding_tables";
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
index ac3483ac23..4fb1339c51 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
@@ -53,6 +53,8 @@ public abstract class SyncDatabaseActionBase extends
SynchronizationActionBase {
protected String tablePrefix = "";
protected String tableSuffix = "";
protected Map<String, String> tableMapping = new HashMap<>();
+ protected Map<String, String> dbPrefix = new HashMap<>();
+ protected Map<String, String> dbSuffix = new HashMap<>();
protected String includingTables = ".*";
protected List<String> partitionKeys = new ArrayList<>();
protected List<String> primaryKeys = new ArrayList<>();
@@ -98,6 +100,30 @@ public abstract class SyncDatabaseActionBase extends
SynchronizationActionBase {
return this;
}
+ public SyncDatabaseActionBase withDbPrefix(Map<String, String> dbPrefix) {
+ if (dbPrefix != null) {
+ this.dbPrefix =
+ dbPrefix.entrySet().stream()
+ .collect(
+ HashMap::new,
+ (m, e) -> m.put(e.getKey().toLowerCase(),
e.getValue()),
+ HashMap::putAll);
+ }
+ return this;
+ }
+
+ public SyncDatabaseActionBase withDbSuffix(Map<String, String> dbSuffix) {
+ if (dbSuffix != null) {
+ this.dbSuffix =
+ dbSuffix.entrySet().stream()
+ .collect(
+ HashMap::new,
+ (m, e) -> m.put(e.getKey().toLowerCase(),
e.getValue()),
+ HashMap::putAll);
+ }
+ return this;
+ }
+
public SyncDatabaseActionBase withTableMapping(Map<String, String>
tableMapping) {
if (tableMapping != null) {
this.tableMapping = tableMapping;
@@ -164,7 +190,13 @@ public abstract class SyncDatabaseActionBase extends
SynchronizationActionBase {
excludingTables == null ? null :
Pattern.compile(excludingTables);
TableNameConverter tableNameConverter =
new TableNameConverter(
- allowUpperCase, mergeShards, tablePrefix, tableSuffix,
tableMapping);
+ allowUpperCase,
+ mergeShards,
+ dbPrefix,
+ dbSuffix,
+ tablePrefix,
+ tableSuffix,
+ tableMapping);
Set<String> createdTables;
try {
createdTables = new HashSet<>(catalog.listTables(database));
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
index 2135f2a281..d497b588c2 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
@@ -31,7 +31,9 @@ import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PARTITION_
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PRIMARY_KEYS;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_MAPPING;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX_DB;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_SUFFIX;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_SUFFIX_DB;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TYPE_MAPPING;
/** Base {@link ActionFactory} for synchronizing into database. */
@@ -52,6 +54,8 @@ public abstract class SyncDatabaseActionFactoryBase<T extends
SyncDatabaseAction
protected void withParams(MultipleParameterToolAdapter params, T action) {
action.withTablePrefix(params.get(TABLE_PREFIX))
.withTableSuffix(params.get(TABLE_SUFFIX))
+ .withDbPrefix(optionalConfigMap(params, TABLE_PREFIX_DB))
+ .withDbSuffix(optionalConfigMap(params, TABLE_SUFFIX_DB))
.withTableMapping(optionalConfigMap(params, TABLE_MAPPING))
.includingTables(params.get(INCLUDING_TABLES))
.excludingTables(params.get(EXCLUDING_TABLES))
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java
index 4eca8b903e..15fc3507ce 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java
@@ -31,6 +31,8 @@ public class TableNameConverter implements Serializable {
private final boolean caseSensitive;
private final boolean mergeShards;
+ private final Map<String, String> dbPrefix;
+ private final Map<String, String> dbSuffix;
private final String prefix;
private final String suffix;
private final Map<String, String> tableMapping;
@@ -45,21 +47,54 @@ public class TableNameConverter implements Serializable {
String prefix,
String suffix,
Map<String, String> tableMapping) {
+ this(
+ caseSensitive,
+ mergeShards,
+ new HashMap<>(),
+ new HashMap<>(),
+ prefix,
+ suffix,
+ tableMapping);
+ }
+
+ public TableNameConverter(
+ boolean caseSensitive,
+ boolean mergeShards,
+ Map<String, String> dbPrefix,
+ Map<String, String> dbSuffix,
+ String prefix,
+ String suffix,
+ Map<String, String> tableMapping) {
this.caseSensitive = caseSensitive;
this.mergeShards = mergeShards;
+ this.dbPrefix = dbPrefix;
+ this.dbSuffix = dbSuffix;
this.prefix = prefix;
this.suffix = suffix;
this.tableMapping = lowerMapKey(tableMapping);
}
- public String convert(String originName) {
- if (tableMapping.containsKey(originName.toLowerCase())) {
- String mappedName = tableMapping.get(originName.toLowerCase());
+ public String convert(String originDbName, String originTblName) {
+ // top priority: table mapping
+ if (tableMapping.containsKey(originTblName.toLowerCase())) {
+ String mappedName = tableMapping.get(originTblName.toLowerCase());
return caseSensitive ? mappedName : mappedName.toLowerCase();
}
- String tableName = caseSensitive ? originName :
originName.toLowerCase();
- return prefix + tableName + suffix;
+ String tblPrefix = prefix;
+ String tblSuffix = suffix;
+
+ // second priority: prefix and suffix specified by db
+ if (dbPrefix.containsKey(originDbName.toLowerCase())) {
+ tblPrefix = dbPrefix.get(originDbName.toLowerCase());
+ }
+ if (dbSuffix.containsKey(originDbName.toLowerCase())) {
+ tblSuffix = dbSuffix.get(originDbName.toLowerCase());
+ }
+
+ // third priority (fallback): the global prefix and suffix assigned above
+ String tableName = caseSensitive ? originTblName :
originTblName.toLowerCase();
+ return tblPrefix + tableName + tblSuffix;
}
public String convert(Identifier originIdentifier) {
@@ -69,7 +104,7 @@ public class TableNameConverter implements Serializable {
: originIdentifier.getDatabaseName()
+ "_"
+ originIdentifier.getObjectName();
- return convert(rawName);
+ return convert(originIdentifier.getDatabaseName(), rawName);
}
private Map<String, String> lowerMapKey(Map<String, String> map) {
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index 235b3f9a32..ce2e9124a6 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -143,7 +143,8 @@ public class MySqlSyncDatabaseAction extends
SyncDatabaseActionBase {
for (JdbcTableInfo tableInfo : jdbcTableInfos) {
Identifier identifier =
Identifier.create(
- database,
tableNameConverter.convert(tableInfo.toPaimonTableName()));
+ database,
+ tableNameConverter.convert("",
tableInfo.toPaimonTableName()));
FileStoreTable table;
Schema fromMySql =
CdcActionCommonUtils.buildPaimonSchema(
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/TableNameConverterTest.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/TableNameConverterTest.java
index dfbe32e3d3..89bbadfeb8 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/TableNameConverterTest.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/TableNameConverterTest.java
@@ -33,13 +33,47 @@ public class TableNameConverterTest {
tableMapping.put("mapped_src", "mapped_TGT");
TableNameConverter caseConverter =
new TableNameConverter(true, true, "pre_", "_pos",
tableMapping);
- Assert.assertEquals(caseConverter.convert("mapped_SRC"), "mapped_TGT");
+ Assert.assertEquals(caseConverter.convert("", "mapped_SRC"),
"mapped_TGT");
- Assert.assertEquals(caseConverter.convert("unmapped_src"),
"pre_unmapped_src_pos");
+ Assert.assertEquals(caseConverter.convert("", "unmapped_src"),
"pre_unmapped_src_pos");
TableNameConverter noCaseConverter =
new TableNameConverter(false, true, "pre_", "_pos",
tableMapping);
- Assert.assertEquals(noCaseConverter.convert("mapped_src"),
"mapped_tgt");
- Assert.assertEquals(noCaseConverter.convert("unmapped_src"),
"pre_unmapped_src_pos");
+ Assert.assertEquals(noCaseConverter.convert("", "mapped_src"),
"mapped_tgt");
+ Assert.assertEquals(noCaseConverter.convert("", "unmapped_src"),
"pre_unmapped_src_pos");
+ }
+
+ @Test
+ public void testConvertTableNameByDBPrefix_Suffix() {
+ Map<String, String> dbPrefix = new HashMap<>(2);
+ dbPrefix.put("db_with_prefix", "db_pref_");
+ dbPrefix.put("db_with_prefix_suffix", "db_pref_");
+
+ Map<String, String> dbSuffix = new HashMap<>(2);
+ dbSuffix.put("db_with_suffix", "_db_suff");
+ dbSuffix.put("db_with_prefix_suffix", "_db_suff");
+
+ TableNameConverter tblNameConverter =
+ new TableNameConverter(false, true, dbPrefix, dbSuffix,
"pre_", "_suf", null);
+
+ // Tables in the specified db should have the specified prefix and
suffix.
+
+ // db prefix + normal suffix
+ Assert.assertEquals(
+ "db_pref_table_name_suf",
tblNameConverter.convert("db_with_prefix", "table_name"));
+
+ // normal prefix + db suffix
+ Assert.assertEquals(
+ "pre_table_name_db_suff",
tblNameConverter.convert("db_with_suffix", "table_name"));
+
+ // db prefix + db suffix
+ Assert.assertEquals(
+ "db_pref_table_name_db_suff",
+ tblNameConverter.convert("db_with_prefix_suffix",
"table_name"));
+
+ // only normal prefix and suffix
+ Assert.assertEquals(
+ "pre_table_name_suf",
+ tblNameConverter.convert("db_without_prefix_suffix",
"table_name"));
}
}
diff --git a/pom.xml b/pom.xml
index 904b1c73c7..dbef98af06 100644
--- a/pom.xml
+++ b/pom.xml
@@ -529,6 +529,7 @@ under the License.
<exclude>release/**</exclude>
<!-- antlr grammar files -->
<exclude>paimon-common/src/main/antlr4/**</exclude>
+
<exclude>paimon-core/src/test/resources/compatibility/**</exclude>
</excludes>
</configuration>
</plugin>