This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new fdfa6d99d [flink][cdc] Extract common util methods (#1926)
fdfa6d99d is described below
commit fdfa6d99d7d4ec3d9eb948589da338f0e16d8050
Author: yuzelin <[email protected]>
AuthorDate: Thu Aug 31 18:55:38 2023 +0800
[flink][cdc] Extract common util methods (#1926)
---
.../flink/action/cdc/CdcActionCommonUtils.java | 253 +++++++++++++++++++++
.../flink/action/cdc/kafka/KafkaActionUtils.java | 109 ++-------
.../paimon/flink/action/cdc/kafka/KafkaSchema.java | 8 +-
.../action/cdc/kafka/KafkaSyncTableAction.java | 3 +-
.../action/cdc/kafka/formats/RecordParser.java | 21 --
.../cdc/kafka/formats/canal/CanalRecordParser.java | 8 +-
.../cdc/kafka/formats/ogg/OggRecordParser.java | 4 +-
.../action/cdc/mongodb/MongoDBActionUtils.java | 50 ++--
.../cdc/mongodb/MongoDBSyncDatabaseAction.java | 50 +---
.../flink/action/cdc/mongodb/MongodbSchema.java | 20 +-
.../cdc/mongodb/strategy/MongoVersionStrategy.java | 27 +--
.../flink/action/cdc/mysql/MySqlActionUtils.java | 121 ++--------
.../cdc/mysql/MySqlDebeziumJsonEventParser.java | 20 +-
.../action/cdc/mysql/MySqlSyncDatabaseAction.java | 67 +-----
.../action/cdc/mysql/MySqlSyncTableAction.java | 3 +-
.../action/cdc/mysql/MySqlTableSchemaBuilder.java | 23 +-
.../flink/action/cdc/mysql/schema/MySqlSchema.java | 11 +-
.../cdc/kafka/KafkaCanalSyncTableActionITCase.java | 4 +-
.../cdc/kafka/KafkaOggSyncTableActionITCase.java | 4 +-
.../cdc/mysql/MySqlSyncTableActionITCase.java | 10 +-
20 files changed, 391 insertions(+), 425 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
new file mode 100644
index 000000000..4d0af4d37
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.MultiTablesSinkMode;
+import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;
+import static org.apache.paimon.flink.action.MultiTablesSinkMode.DIVIDED;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Common utils for CDC Action. */
+public class CdcActionCommonUtils {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(CdcActionCommonUtils.class);
+
+    /**
+     * Fails fast when the source table fields cannot be written into the given Paimon schema.
+     *
+     * @param paimonSchema schema of the Paimon table
+     * @param sourceTableFields fields of the source table
+     * @throws IllegalArgumentException if {@link #schemaCompatible} returns {@code false}; the
+     *     message lists the Paimon fields and the source table fields
+     */
+    public static void assertSchemaCompatible(
+            TableSchema paimonSchema, List<DataField> sourceTableFields) {
+        if (schemaCompatible(paimonSchema, sourceTableFields)) {
+            return;
+        }
+
+        String message =
+                "Paimon schema and source table schema are not compatible.\n"
+                        + "Paimon fields are: "
+                        + paimonSchema.fields()
+                        + ".\nSource table fields are: "
+                        + sourceTableFields;
+        throw new IllegalArgumentException(message);
+    }
+
+    /**
+     * Checks whether every source table field has a same-named field in the Paimon schema whose
+     * type the source type can be converted to.
+     *
+     * <p>The reason for the first incompatibility found is logged at INFO level.
+     *
+     * @param paimonSchema schema of the Paimon table
+     * @param sourceTableFields fields of the source table
+     * @return {@code true} if all source fields are present and convertible, {@code false}
+     *     otherwise
+     */
+    public static boolean schemaCompatible(
+            TableSchema paimonSchema, List<DataField> sourceTableFields) {
+        // Loop-invariant lookups: fetch the Paimon names and fields once instead of per field.
+        List<String> paimonFieldNames = paimonSchema.fieldNames();
+        List<DataField> paimonFields = paimonSchema.fields();
+
+        for (DataField field : sourceTableFields) {
+            int idx = paimonFieldNames.indexOf(field.name());
+            if (idx < 0) {
+                LOG.info("Cannot find field '{}' in Paimon table.", field.name());
+                return false;
+            }
+
+            DataType type = paimonFields.get(idx).type();
+            if (UpdatedDataFieldsProcessFunction.canConvert(field.type(), type)
+                    != UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT) {
+                LOG.info(
+                        "Cannot convert field '{}' from source table type '{}' to Paimon type '{}'.",
+                        field.name(),
+                        field.type(),
+                        type);
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Lower-cases the keys of an insertion-ordered map unless {@code caseSensitive} is set.
+     *
+     * <p>When a conversion happens the result is a new {@link LinkedHashMap}; when {@code
+     * caseSensitive} is {@code true} the {@code origin} instance itself is returned.
+     *
+     * @param origin map whose keys should be converted
+     * @param caseSensitive if {@code true}, no conversion is performed
+     * @param duplicateErrMsg builds the error message from the key that collides after
+     *     lower-casing
+     * @return the map with converted keys, or {@code origin} itself
+     */
+    public static <T> LinkedHashMap<String, T> mapKeyCaseConvert(
+            LinkedHashMap<String, T> origin,
+            boolean caseSensitive,
+            Function<String, String> duplicateErrMsg) {
+        Supplier<LinkedHashMap<String, T>> orderedMapFactory = LinkedHashMap::new;
+        return mapKeyCaseConvert(origin, caseSensitive, duplicateErrMsg, orderedMapFactory);
+    }
+
+    /**
+     * Lower-cases the keys of a map unless {@code caseSensitive} is set.
+     *
+     * <p>When a conversion happens the result is a new {@link HashMap}, so the iteration order of
+     * {@code origin} is not carried over; when {@code caseSensitive} is {@code true} the {@code
+     * origin} instance itself is returned.
+     *
+     * @param origin map whose keys should be converted
+     * @param caseSensitive if {@code true}, no conversion is performed
+     * @param duplicateErrMsg builds the error message from the key that collides after
+     *     lower-casing
+     * @return the map with converted keys, or {@code origin} itself
+     */
+    public static <T> Map<String, T> mapKeyCaseConvert(
+            Map<String, T> origin,
+            boolean caseSensitive,
+            Function<String, String> duplicateErrMsg) {
+        Supplier<Map<String, T>> hashMapFactory = HashMap::new;
+        return mapKeyCaseConvert(origin, caseSensitive, duplicateErrMsg, hashMapFactory);
+    }
+
+ private static <T, M extends Map<String, T>> M mapKeyCaseConvert(
+ M origin,
+ boolean caseSensitive,
+ Function<String, String> duplicateErrMsg,
+ Supplier<M> mapSupplier) {
+ if (caseSensitive) {
+ return origin;
+ } else {
+ M newMap = mapSupplier.get();
+ for (Map.Entry<String, T> entry : origin.entrySet()) {
+ String key = entry.getKey();
+ checkArgument(!newMap.containsKey(key.toLowerCase()),
duplicateErrMsg.apply(key));
+ newMap.put(key.toLowerCase(), entry.getValue());
+ }
+ return newMap;
+ }
+ }
+
+    /**
+     * Creates the duplicate-column message builder used when converting a table's columns to
+     * case-insensitive form.
+     *
+     * @param tableName name of the table, inserted into the message
+     * @return function that maps the offending column name to the error message
+     */
+    public static Function<String, String> columnDuplicateErrMsg(String tableName) {
+        String template =
+                "Failed to convert columns of table '%s' to case-insensitive form because duplicate column found: '%s'.";
+        return column -> String.format(template, tableName, column);
+    }
+
+    /**
+     * Creates the duplicate-key message builder used when converting a record map to
+     * case-insensitive form.
+     *
+     * <p>The returned function ignores its argument; the message contains the whole record map,
+     * which is stringified only when the function is applied.
+     *
+     * @param record the original record map, appended to the message
+     * @return function that produces the error message
+     */
+    public static Function<String, String> recordKeyDuplicateErrMsg(Map<String, String> record) {
+        return ignoredKey -> {
+            String prefix =
+                    "Failed to convert record map to case-insensitive form because duplicate column found. Original record map is:\n";
+            return prefix + record;
+        };
+    }
+
+    /**
+     * Lower-cases every element of the list unless {@code caseSensitive} is set.
+     *
+     * @param origin strings to convert
+     * @param caseSensitive if {@code true}, {@code origin} itself is returned
+     * @return {@code origin}, or a new list with all elements lower-cased
+     */
+    public static List<String> listCaseConvert(List<String> origin, boolean caseSensitive) {
+        if (caseSensitive) {
+            return origin;
+        }
+        return origin.stream().map(s -> s.toLowerCase()).collect(Collectors.toList());
+    }
+
+    /**
+     * Builds a Paimon {@link Schema} from source table columns plus user-specified settings.
+     *
+     * <p>Columns are added in the iteration order of {@code sourceColumns}, followed by the
+     * computed columns. User-specified primary keys take precedence over the source primary keys.
+     *
+     * @param specifiedPartitionKeys user-specified partition keys; ignored when empty
+     * @param specifiedPrimaryKeys user-specified primary keys; each must name a source column or a
+     *     computed column
+     * @param computedColumns computed columns appended after the source columns
+     * @param tableConfig options of the table
+     * @param sourceColumns source column names mapped to their types
+     * @param sourceColumnComments comments matching {@code sourceColumns} by position, or {@code
+     *     null} if there are none
+     * @param sourcePrimaryKeys primary keys of the source, used when none are specified
+     * @return the built schema
+     * @throws IllegalArgumentException if the comment count differs from the column count, if a
+     *     specified primary key is unknown, or if no primary keys are available at all
+     */
+    public static Schema buildPaimonSchema(
+            List<String> specifiedPartitionKeys,
+            List<String> specifiedPrimaryKeys,
+            List<ComputedColumn> computedColumns,
+            Map<String, String> tableConfig,
+            LinkedHashMap<String, DataType> sourceColumns,
+            @Nullable List<String> sourceColumnComments,
+            List<String> sourcePrimaryKeys) {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+
+        // table options
+        schemaBuilder.options(tableConfig);
+
+        // source columns, with comments when they are available
+        if (sourceColumnComments == null) {
+            sourceColumns.forEach((name, type) -> schemaBuilder.column(name, type));
+        } else {
+            checkArgument(
+                    sourceColumns.size() == sourceColumnComments.size(),
+                    "Source table columns count and column comments count should be equal.");
+
+            int position = 0;
+            for (Map.Entry<String, DataType> column : sourceColumns.entrySet()) {
+                String comment = sourceColumnComments.get(position);
+                schemaBuilder.column(column.getKey(), column.getValue(), comment);
+                position++;
+            }
+        }
+
+        // computed columns come after the source columns
+        for (ComputedColumn computed : computedColumns) {
+            schemaBuilder.column(computed.columnName(), computed.columnType());
+        }
+
+        // primary keys: specified ones win, otherwise fall back to the source ones
+        if (!specifiedPrimaryKeys.isEmpty()) {
+            for (String key : specifiedPrimaryKeys) {
+                if (sourceColumns.containsKey(key)) {
+                    continue;
+                }
+                boolean isComputedColumn =
+                        computedColumns.stream().anyMatch(c -> c.columnName().equals(key));
+                if (!isComputedColumn) {
+                    throw new IllegalArgumentException(
+                            "Specified primary key '"
+                                    + key
+                                    + "' does not exist in source tables or computed columns.");
+                }
+            }
+            schemaBuilder.primaryKey(specifiedPrimaryKeys);
+        } else if (!sourcePrimaryKeys.isEmpty()) {
+            schemaBuilder.primaryKey(sourcePrimaryKeys);
+        } else {
+            throw new IllegalArgumentException(
+                    "Primary keys are not specified. "
+                            + "Also, can't infer primary keys from source table schemas because "
+                            + "source tables have no primary keys or have different primary keys.");
+        }
+
+        // partition keys are optional
+        if (!specifiedPartitionKeys.isEmpty()) {
+            schemaBuilder.partitionKeys(specifiedPartitionKeys);
+        }
+
+        return schemaBuilder.build();
+    }
+
+    /**
+     * Builds the table-list pattern for the given sink mode.
+     *
+     * @param mode sink mode that decides which pattern is built
+     * @param databasePattern database pattern, used in COMBINED mode
+     * @param includingTablePattern pattern of tables to include, used in COMBINED mode
+     * @param monitoredTables tables to monitor, used in DIVIDED mode
+     * @param excludedTables tables to exclude, used in COMBINED mode
+     * @return the table-list pattern
+     * @throws UnsupportedOperationException if {@code mode} is neither DIVIDED nor COMBINED
+     */
+    public static String tableList(
+            MultiTablesSinkMode mode,
+            String databasePattern,
+            String includingTablePattern,
+            List<Identifier> monitoredTables,
+            List<Identifier> excludedTables) {
+        if (mode == COMBINED) {
+            return combinedModeTableList(databasePattern, includingTablePattern, excludedTables);
+        }
+        if (mode == DIVIDED) {
+            return dividedModeTableList(monitoredTables);
+        }
+
+        throw new UnsupportedOperationException("Unknown MultiTablesSinkMode: " + mode);
+    }
+
+    /**
+     * Builds the DIVIDED-mode table list: the monitored tables as {@code database\.table}
+     * alternatives joined by {@code |}.
+     *
+     * @param monitoredTables tables to monitor
+     * @return the joined pattern; empty if there are no monitored tables
+     */
+    private static String dividedModeTableList(List<Identifier> monitoredTables) {
+        // In DIVIDED mode, only the tables that already exist matter
+        List<String> qualifiedNames =
+                monitoredTables.stream()
+                        .map(table -> table.getDatabaseName() + "\\." + table.getObjectName())
+                        .collect(Collectors.toList());
+        return String.join("|", qualifiedNames);
+    }
+
+    /**
+     * Builds the COMBINED-mode table list as a regular expression.
+     *
+     * <p>The result follows the template
+     * {@code (?!(^db\.tbl$)|(^...$))((databasePattern)\.(includingTablePattern))}. Without
+     * excluded tables only the including part {@code (databasePattern)\.(includingTablePattern)}
+     * is returned.
+     *
+     * @param databasePattern pattern of the databases to monitor
+     * @param includingTablePattern pattern of the tables to monitor
+     * @param excludedTables tables whose exact qualified name must not be monitored
+     * @return the table-list pattern
+     */
+    public static String combinedModeTableList(
+            String databasePattern, String includingTablePattern, List<Identifier> excludedTables) {
+        // In COMBINED mode both existing tables and tables created later have to be covered, so
+        // a regular expression is used to match every valid table and to rule out the invalid
+        // ones.
+
+        // Including part: matches tables whose qualified name fits the database pattern and the
+        // including table pattern.
+        String includingPattern = "(" + databasePattern + ")\\.(" + includingTablePattern + ")";
+
+        if (excludedTables.isEmpty()) {
+            return includingPattern;
+        }
+
+        // Excluding part: one (^db\.tbl$) alternative per excluded table, wrapped in a negative
+        // lookahead. A table is monitored only if it matches the including part and is not
+        // rejected by the excluding part.
+        String excludedAlternatives =
+                excludedTables.stream()
+                        .map(
+                                table ->
+                                        "(^"
+                                                + table.getDatabaseName()
+                                                + "\\."
+                                                + table.getObjectName()
+                                                + "$)")
+                        .collect(Collectors.joining("|"));
+        return "(?!" + excludedAlternatives + ")(" + includingPattern + ")";
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
index 3fae54f01..02a3b0640 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
@@ -18,11 +18,9 @@
package org.apache.paimon.flink.action.cdc.kafka;
+import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
-import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.StringUtils;
@@ -52,6 +50,9 @@ import java.util.UUID;
import java.util.stream.Collectors;
import static
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.listCaseConvert;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
import static org.apache.paimon.utils.Preconditions.checkArgument;
class KafkaActionUtils {
@@ -61,98 +62,28 @@ class KafkaActionUtils {
private static final String PARTITION = "partition";
private static final String OFFSET = "offset";
- static void assertSchemaCompatible(TableSchema tableSchema, Schema
kafkaCanalSchema) {
- if (!schemaCompatible(tableSchema, kafkaCanalSchema)) {
- throw new IllegalArgumentException(
- "Paimon schema and Kafka schema are not compatible.\n"
- + "Paimon fields are: "
- + tableSchema.fields()
- + ".\nKafka fields are: "
- + kafkaCanalSchema.fields());
- }
- }
-
- static boolean schemaCompatible(TableSchema paimonSchema, Schema
kafkaCanalSchema) {
- for (DataField field : kafkaCanalSchema.fields()) {
- int idx = paimonSchema.fieldNames().indexOf(field.name());
- if (idx < 0) {
- return false;
- }
- DataType type = paimonSchema.fields().get(idx).type();
- if (UpdatedDataFieldsProcessFunction.canConvert(field.type(), type)
- != UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT)
{
- return false;
- }
- }
- return true;
- }
-
static Schema buildPaimonSchema(
KafkaSchema kafkaSchema,
List<String> specifiedPartitionKeys,
List<String> specifiedPrimaryKeys,
List<ComputedColumn> computedColumns,
- Map<String, String> paimonConfig,
+ Map<String, String> tableConfig,
boolean caseSensitive) {
- Schema.Builder builder = Schema.newBuilder();
- builder.options(paimonConfig);
-
- // build columns and primary keys from mySqlSchema
- Map<String, DataType> mySqlFields;
- List<String> mySqlPrimaryKeys;
- if (caseSensitive) {
- mySqlFields = kafkaSchema.fields();
- mySqlPrimaryKeys = kafkaSchema.primaryKeys();
- } else {
- mySqlFields = new LinkedHashMap<>();
- for (Map.Entry<String, DataType> entry :
kafkaSchema.fields().entrySet()) {
- String fieldName = entry.getKey();
- checkArgument(
- !mySqlFields.containsKey(fieldName.toLowerCase()),
- String.format(
- "Duplicate key '%s' in table '%s' appears when
converting fields map keys to case-insensitive form.",
- fieldName, kafkaSchema.tableName()));
- mySqlFields.put(fieldName.toLowerCase(), entry.getValue());
- }
- mySqlPrimaryKeys =
- kafkaSchema.primaryKeys().stream()
- .map(String::toLowerCase)
- .collect(Collectors.toList());
- }
-
- for (Map.Entry<String, DataType> entry : mySqlFields.entrySet()) {
- builder.column(entry.getKey(), entry.getValue(), null);
- }
-
- for (ComputedColumn computedColumn : computedColumns) {
- builder.column(computedColumn.columnName(),
computedColumn.columnType());
- }
-
- if (specifiedPrimaryKeys.size() > 0) {
- for (String key : specifiedPrimaryKeys) {
- if (!mySqlFields.containsKey(key)
- && computedColumns.stream().noneMatch(c ->
c.columnName().equals(key))) {
- throw new IllegalArgumentException(
- "Specified primary key "
- + key
- + " does not exist in kafka topic's table
or computed columns.");
- }
- }
- builder.primaryKey(specifiedPrimaryKeys);
- } else if (mySqlPrimaryKeys.size() > 0) {
- builder.primaryKey(mySqlPrimaryKeys);
- } else {
- throw new IllegalArgumentException(
- "Primary keys are not specified. "
- + "Also, can't infer primary keys from kafka
topic's table schemas because "
- + "Kafka topic's table have no primary keys or
have different primary keys.");
- }
-
- if (specifiedPartitionKeys.size() > 0) {
- builder.partitionKeys(specifiedPartitionKeys);
- }
-
- return builder.build();
+ LinkedHashMap<String, DataType> sourceColumns =
+ mapKeyCaseConvert(
+ kafkaSchema.fields(),
+ caseSensitive,
+ columnDuplicateErrMsg(kafkaSchema.tableName()));
+ List<String> primaryKeys = listCaseConvert(kafkaSchema.primaryKeys(),
caseSensitive);
+
+ return CdcActionCommonUtils.buildPaimonSchema(
+ specifiedPartitionKeys,
+ specifiedPrimaryKeys,
+ computedColumns,
+ tableConfig,
+ sourceColumns,
+ null,
+ primaryKeys);
}
static KafkaSource<String> buildKafkaSource(Configuration kafkaConfig) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
index 0a5931955..58ff74d84 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchema.java
@@ -37,8 +37,8 @@ import
org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
@@ -55,13 +55,13 @@ public class KafkaSchema {
private static final int POLL_TIMEOUT_MILLIS = 1000;
private final String databaseName;
private final String tableName;
- private final Map<String, DataType> fields;
+ private final LinkedHashMap<String, DataType> fields;
private final List<String> primaryKeys;
public KafkaSchema(
String databaseName,
String tableName,
- Map<String, DataType> fields,
+ LinkedHashMap<String, DataType> fields,
List<String> primaryKeys) {
this.databaseName = databaseName;
this.tableName = tableName;
@@ -77,7 +77,7 @@ public class KafkaSchema {
return databaseName;
}
- public Map<String, DataType> fields() {
+ public LinkedHashMap<String, DataType> fields() {
return fields;
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
index ef3dee8da..d8669eba3 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
@@ -24,6 +24,7 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionBase;
+import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.TypeMapping;
@@ -163,7 +164,7 @@ public class KafkaSyncTableAction extends ActionBase {
caseSensitive);
try {
table = (FileStoreTable) catalog.getTable(identifier);
- KafkaActionUtils.assertSchemaCompatible(table.schema(), fromCanal);
+ CdcActionCommonUtils.assertSchemaCompatible(table.schema(),
fromCanal.fields());
} catch (Catalog.TableNotExistException e) {
catalog.createTable(identifier, fromCanal, false);
table = (FileStoreTable) catalog.getTable(identifier);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
index 8527d93b6..01a744e96 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
@@ -31,14 +31,10 @@ import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMap
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
/**
* Provides a base implementation for parsing messages of various formats into
{@link
* RichCdcMultiplexRecord} objects.
@@ -92,19 +88,6 @@ public abstract class RecordParser implements
FlatMapFunction<String, RichCdcMul
extractRecords().forEach(out::collect);
}
- protected Map<String, String> keyCaseInsensitive(Map<String, String>
origin) {
- Map<String, String> keyCaseInsensitive = new HashMap<>();
- for (Map.Entry<String, String> entry : origin.entrySet()) {
- String fieldName = entry.getKey().toLowerCase();
- checkArgument(
- !keyCaseInsensitive.containsKey(fieldName),
- "Duplicate key appears when converting map keys to
case-insensitive form. Original map is:\n%s",
- origin);
- keyCaseInsensitive.put(fieldName, entry.getValue());
- }
- return keyCaseInsensitive;
- }
-
protected List<String> extractPrimaryKeys(String primaryKeys) {
return StreamSupport.stream(root.get(primaryKeys).spliterator(), false)
.map(pk -> toFieldName(pk.asText()))
@@ -114,8 +97,4 @@ public abstract class RecordParser implements
FlatMapFunction<String, RichCdcMul
protected String toFieldName(String rawName) {
return StringUtils.caseSensitiveConversion(rawName, caseSensitive);
}
-
- protected Map<String, String> adjustCase(Map<String, String> map) {
- return caseSensitive ? map : keyCaseInsensitive(map);
- }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
index 5f3b82c40..2cc3dd43e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
@@ -45,6 +45,8 @@ import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.recordKeyDuplicateErrMsg;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
/**
@@ -256,7 +258,7 @@ public class CanalRecordParser extends RecordParser {
for (JsonNode datum : data) {
Map<String, String> rowData =
extractRowFromJson(datum, mySqlFieldTypes,
paimonFieldTypes);
- rowData = adjustCase(rowData);
+ rowData = mapKeyCaseConvert(rowData, caseSensitive,
recordKeyDuplicateErrMsg(rowData));
records.add(
new RichCdcMultiplexRecord(
@@ -291,7 +293,7 @@ public class CanalRecordParser extends RecordParser {
before.putIfAbsent(entry.getKey(), entry.getValue());
}
- before = adjustCase(before);
+ before = mapKeyCaseConvert(before, caseSensitive,
recordKeyDuplicateErrMsg(before));
records.add(
new RichCdcMultiplexRecord(
databaseName,
@@ -301,7 +303,7 @@ public class CanalRecordParser extends RecordParser {
new CdcRecord(RowKind.DELETE, before)));
}
- after = adjustCase(after);
+ after = mapKeyCaseConvert(after, caseSensitive,
recordKeyDuplicateErrMsg(after));
records.add(
new RichCdcMultiplexRecord(
databaseName,
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/ogg/OggRecordParser.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/ogg/OggRecordParser.java
index 8d735b504..4704ee66e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/ogg/OggRecordParser.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/ogg/OggRecordParser.java
@@ -41,6 +41,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.recordKeyDuplicateErrMsg;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
/**
@@ -93,7 +95,7 @@ public class OggRecordParser extends RecordParser {
JsonNode jsonNode, RowKind rowKind, List<RichCdcMultiplexRecord>
records) {
LinkedHashMap<String, DataType> paimonFieldTypes = new
LinkedHashMap<>();
Map<String, String> rowData = extractRow(jsonNode, paimonFieldTypes);
- rowData = adjustCase(rowData);
+ rowData = mapKeyCaseConvert(rowData, caseSensitive,
recordKeyDuplicateErrMsg(rowData));
records.add(createRecord(rowKind, rowData, paimonFieldTypes));
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
index 5870db179..4b18bc832 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBActionUtils.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.action.cdc.mongodb;
+import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataType;
@@ -34,13 +35,15 @@ import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.kafka.connect.json.JsonConverterConfig;
+import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.stream.Collectors;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/**
@@ -158,36 +161,21 @@ public class MongoDBActionUtils {
static Schema buildPaimonSchema(
MongodbSchema mongodbSchema,
List<String> specifiedPartitionKeys,
- Map<String, String> paimonConfig,
+ Map<String, String> tableConfig,
boolean caseSensitive) {
-
- Schema.Builder builder = Schema.newBuilder().options(paimonConfig);
-
- Map<String, DataType> mongodbFields =
- caseSensitive
- ? mongodbSchema.fields()
- : mongodbSchema.fields().entrySet().stream()
- .collect(
- Collectors.toMap(
- entry ->
entry.getKey().toLowerCase(),
- Map.Entry::getValue,
- (existing, replacement) -> {
- throw new
IllegalArgumentException(
- String.format(
- "Duplicate
key '%s' in table '%s' appears when converting fields map keys to
case-insensitive form.",
- existing,
-
mongodbSchema.tableName()));
- },
- LinkedHashMap::new));
-
- mongodbFields.forEach(builder::column);
-
- builder.primaryKey(Lists.newArrayList(PRIMARY_KEY));
-
- if (!specifiedPartitionKeys.isEmpty()) {
- builder.partitionKeys(specifiedPartitionKeys);
- }
-
- return builder.build();
+ LinkedHashMap<String, DataType> sourceColumns =
+ mapKeyCaseConvert(
+ mongodbSchema.fields(),
+ caseSensitive,
+ columnDuplicateErrMsg(mongodbSchema.tableName()));
+
+ return CdcActionCommonUtils.buildPaimonSchema(
+ specifiedPartitionKeys,
+ Lists.newArrayList(PRIMARY_KEY),
+ Collections.emptyList(),
+ tableConfig,
+ sourceColumns,
+ null,
+ Collections.emptyList());
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
index 75a687387..8ec535f19 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
@@ -23,6 +23,7 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
+import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder;
@@ -42,7 +43,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
-import java.util.stream.Collectors;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -74,7 +74,7 @@ public class MongoDBSyncDatabaseAction extends ActionBase {
private final Map<String, String> tableConfig;
@Nullable private final Pattern includingPattern;
@Nullable private final Pattern excludingPattern;
- @Nullable private final String includingTables;
+ private final String includingTables;
public MongoDBSyncDatabaseAction(
Map<String, String> mongodbConfig,
@@ -112,7 +112,11 @@ public class MongoDBSyncDatabaseAction extends ActionBase {
MongoDBSource<String> source =
MongoDBActionUtils.buildMongodbSource(
- mongodbConfig, buildTableList(excludedTables));
+ mongodbConfig,
+ CdcActionCommonUtils.combinedModeTableList(
+
mongodbConfig.get(MongoDBSourceOptions.DATABASE),
+ includingTables,
+ excludedTables));
EventParser.Factory<RichCdcMultiplexRecord> parserFactory;
RichCdcMultiplexRecordSchemaBuilder schemaBuilder =
@@ -164,46 +168,6 @@ public class MongoDBSyncDatabaseAction extends ActionBase {
tableSuffix));
}
- private String buildTableList(List<Identifier> excludedTables) {
- String separatorRex = "\\.";
- // In COMBINED mode, we should consider both existed tables and
possible newly added
- // tables, so we should use regular expression to monitor all valid
tables and exclude
- // certain invalid tables
-
- // The table list is built by template:
- // (?!(^db\\.tbl$)|(^...$))(databasePattern\\.(including_pattern1|...))
-
- // The excluding pattern ?!(^db\\.tbl$)|(^...$) can exclude tables
whose qualified name
- // is exactly equal to 'db.tbl'
- // The including pattern databasePattern\\.(including_pattern1|...)
can include tables
- // whose qualified name matches one of the patterns
-
- // a table can be monitored only when its name meets the including
pattern and doesn't
- // be excluded by excluding pattern at the same time
- String includingPattern =
- String.format(
- "%s%s(%s)",
- mongodbConfig.get(MongoDBSourceOptions.DATABASE),
- separatorRex,
- includingTables);
- if (excludedTables.isEmpty()) {
- return includingPattern;
- }
-
- String excludingPattern =
- excludedTables.stream()
- .map(
- t ->
- String.format(
- "(^%s$)",
- t.getDatabaseName()
- + separatorRex
- + t.getObjectName()))
- .collect(Collectors.joining("|"));
- excludingPattern = "?!" + excludingPattern;
- return String.format("(%s)(%s)", excludingPattern, includingPattern);
- }
-
@VisibleForTesting
public Map<String, String> tableConfig() {
return tableConfig;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchema.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchema.java
index 8c70ba4c1..c266c163f 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchema.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongodbSchema.java
@@ -34,7 +34,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
import static
org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.FIELD_NAME;
@@ -57,13 +56,13 @@ public class MongodbSchema {
private static final String ID_FIELD = "_id";
private final String databaseName;
private final String tableName;
- private final Map<String, DataType> fields;
+ private final LinkedHashMap<String, DataType> fields;
private final List<String> primaryKeys;
public MongodbSchema(
String databaseName,
String tableName,
- Map<String, DataType> fields,
+ LinkedHashMap<String, DataType> fields,
List<String> primaryKeys) {
this.databaseName = databaseName;
this.tableName = tableName;
@@ -79,7 +78,7 @@ public class MongodbSchema {
return databaseName;
}
- public Map<String, DataType> fields() {
+ public LinkedHashMap<String, DataType> fields() {
return fields;
}
@@ -105,7 +104,8 @@ public class MongodbSchema {
private static MongodbSchema createSchemaFromSpecifiedConfig(Configuration
mongodbConfig) {
String[] columnNames = mongodbConfig.get(FIELD_NAME).split(",");
- Map<String, DataType> schemaFields =
generateSchemaFields(Arrays.asList(columnNames));
+ LinkedHashMap<String, DataType> schemaFields =
+ generateSchemaFields(Arrays.asList(columnNames));
String databaseName = mongodbConfig.get(MongoDBSourceOptions.DATABASE);
String collectionName =
mongodbConfig.get(MongoDBSourceOptions.COLLECTION);
return new MongodbSchema(
@@ -129,8 +129,8 @@ public class MongodbSchema {
return document != null ? new ArrayList<>(document.keySet()) :
Collections.emptyList();
}
- private static Map<String, DataType> generateSchemaFields(List<String>
columnNames) {
- Map<String, DataType> schemaFields = new LinkedHashMap<>();
+ private static LinkedHashMap<String, DataType>
generateSchemaFields(List<String> columnNames) {
+ LinkedHashMap<String, DataType> schemaFields = new LinkedHashMap<>();
for (String columnName : columnNames) {
schemaFields.put(columnName, DataTypes.STRING());
}
@@ -139,9 +139,11 @@ public class MongodbSchema {
private static MongodbSchema createMongodbSchema(
String databaseName, String collectionName, List<String>
columnNames) {
- Map<String, DataType> schemaFields = generateSchemaFields(columnNames);
return new MongodbSchema(
- databaseName, collectionName, schemaFields,
Collections.singletonList(ID_FIELD));
+ databaseName,
+ collectionName,
+ generateSchemaFields(columnNames),
+ Collections.singletonList(ID_FIELD));
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
index ffda81304..609e698c8 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
@@ -39,10 +39,11 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.recordKeyDuplicateErrMsg;
import static
org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.FIELD_NAME;
import static
org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.PARSER_PATH;
import static
org.apache.paimon.flink.action.cdc.mongodb.MongoDBActionUtils.START_MODE;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Interface for processing strategies tailored for different MongoDB versions. */
public interface MongoVersionStrategy {
@@ -71,19 +72,6 @@ public interface MongoVersionStrategy {
return JsonSerdeUtil.parseJsonMap(record, String.class);
}
- default Map<String, String> keyCaseInsensitive(Map<String, String> origin)
{
- Map<String, String> keyCaseInsensitive = new HashMap<>();
- for (Map.Entry<String, String> entry : origin.entrySet()) {
- String fieldName = entry.getKey().toLowerCase();
- checkArgument(
- !keyCaseInsensitive.containsKey(fieldName),
- "Duplicate key appears when converting map keys to
case-insensitive form. Original map is:\n%s",
- origin);
- keyCaseInsensitive.put(fieldName, entry.getValue());
- }
- return keyCaseInsensitive;
- }
-
/**
* Determines the extraction mode and retrieves the row accordingly.
*
@@ -104,22 +92,23 @@ public interface MongoVersionStrategy {
SchemaAcquisitionMode.valueOf(mongodbConfig.getString(START_MODE).toUpperCase());
ObjectNode objectNode = (ObjectNode)
OBJECT_MAPPER.readTree(jsonNode.asText());
JsonNode document = objectNode.set("_id",
objectNode.get("_id").get("$oid"));
+ Map<String, String> row;
switch (mode) {
case SPECIFIED:
- Map<String, String> specifiedRow =
+ row =
parseFieldsFromJsonRecord(
document.toString(),
mongodbConfig.getString(PARSER_PATH),
mongodbConfig.getString(FIELD_NAME),
paimonFieldTypes);
- return caseSensitive ? specifiedRow :
keyCaseInsensitive(specifiedRow);
+ break;
case DYNAMIC:
- Map<String, String> dynamicRow =
- parseAndTypeJsonRow(document.toString(),
paimonFieldTypes);
- return caseSensitive ? dynamicRow :
keyCaseInsensitive(dynamicRow);
+ row = parseAndTypeJsonRow(document.toString(),
paimonFieldTypes);
+ break;
default:
throw new RuntimeException("Unsupported extraction mode: " +
mode);
}
+ return mapKeyCaseConvert(row, caseSensitive,
recordKeyDuplicateErrMsg(row));
}
/**
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
index 73e83a253..e8f3e7c3f 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
@@ -19,17 +19,14 @@
package org.apache.paimon.flink.action.cdc.mysql;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlSchema;
import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlSchemasInfo;
import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlTableInfo;
-import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
-import org.apache.paimon.utils.Pair;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
@@ -44,8 +41,6 @@ import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.kafka.connect.json.JsonConverterConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
@@ -59,15 +54,16 @@ import java.util.Properties;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import java.util.stream.Collectors;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.listCaseConvert;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TINYINT1_NOT_BOOL;
import static org.apache.paimon.utils.Preconditions.checkArgument;
-/** Utils for MySQL Action. * */
+/** Utils for MySQL Action. */
public class MySqlActionUtils {
- private static final Logger LOG =
LoggerFactory.getLogger(MySqlActionUtils.class);
public static final ConfigOption<Boolean> SCAN_NEWLY_ADDED_TABLE_ENABLED =
ConfigOptions.key("scan.newly-added-table.enabled")
.booleanType()
@@ -132,106 +128,29 @@ public class MySqlActionUtils {
return mySqlSchemasInfo;
}
- static void assertSchemaCompatible(TableSchema paimonSchema, Schema
mySqlSchema) {
- if (!schemaCompatible(paimonSchema, mySqlSchema)) {
- throw new IllegalArgumentException(
- "Paimon schema and MySQL schema are not compatible.\n"
- + "Paimon fields are: "
- + paimonSchema.fields()
- + ".\nMySQL fields are: "
- + mySqlSchema.fields());
- }
- }
-
- static boolean schemaCompatible(TableSchema paimonSchema, Schema
mySqlSchema) {
- for (DataField field : mySqlSchema.fields()) {
- int idx = paimonSchema.fieldNames().indexOf(field.name());
- if (idx < 0) {
- LOG.info("Cannot find field '{}' in Paimon table.",
field.name());
- return false;
- }
- DataType type = paimonSchema.fields().get(idx).type();
- if (UpdatedDataFieldsProcessFunction.canConvert(field.type(), type)
- != UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT)
{
- LOG.info(
- "Cannot convert field '{}' from MySQL type '{}' to
Paimon type '{}'.",
- field.name(),
- field.type(),
- type);
- return false;
- }
- }
- return true;
- }
-
static Schema buildPaimonSchema(
MySqlTableInfo mySqlTableInfo,
List<String> specifiedPartitionKeys,
List<String> specifiedPrimaryKeys,
List<ComputedColumn> computedColumns,
- Map<String, String> paimonConfig,
+ Map<String, String> tableConfig,
boolean caseSensitive) {
- Schema.Builder builder = Schema.newBuilder();
- builder.options(paimonConfig);
MySqlSchema mySqlSchema = mySqlTableInfo.schema();
+ LinkedHashMap<String, DataType> sourceColumns =
+ mapKeyCaseConvert(
+ mySqlSchema.typeMapping(),
+ caseSensitive,
+ columnDuplicateErrMsg(mySqlTableInfo.location()));
+ List<String> primaryKeys = listCaseConvert(mySqlSchema.primaryKeys(),
caseSensitive);
- // build columns and primary keys from mySqlSchema
- LinkedHashMap<String, Pair<DataType, String>> mySqlFields;
- List<String> mySqlPrimaryKeys;
- if (caseSensitive) {
- mySqlFields = mySqlSchema.fields();
- mySqlPrimaryKeys = mySqlSchema.primaryKeys();
- } else {
- mySqlFields = new LinkedHashMap<>();
- for (Map.Entry<String, Pair<DataType, String>> entry :
- mySqlSchema.fields().entrySet()) {
- String fieldName = entry.getKey();
- checkArgument(
- !mySqlFields.containsKey(fieldName.toLowerCase()),
- String.format(
- "Duplicate key '%s' in table '%s' appears when
converting fields map keys to case-insensitive form.",
- fieldName, mySqlTableInfo.location()));
- mySqlFields.put(fieldName.toLowerCase(), entry.getValue());
- }
- mySqlPrimaryKeys =
- mySqlSchema.primaryKeys().stream()
- .map(String::toLowerCase)
- .collect(Collectors.toList());
- }
-
- for (Map.Entry<String, Pair<DataType, String>> entry :
mySqlFields.entrySet()) {
- builder.column(entry.getKey(), entry.getValue().getLeft(),
entry.getValue().getRight());
- }
-
- for (ComputedColumn computedColumn : computedColumns) {
- builder.column(computedColumn.columnName(),
computedColumn.columnType());
- }
-
- if (specifiedPrimaryKeys.size() > 0) {
- for (String key : specifiedPrimaryKeys) {
- if (!mySqlFields.containsKey(key)
- && computedColumns.stream().noneMatch(c ->
c.columnName().equals(key))) {
- throw new IllegalArgumentException(
- "Specified primary key "
- + key
- + " does not exist in MySQL tables or
computed columns.");
- }
- }
- builder.primaryKey(specifiedPrimaryKeys);
- } else if (mySqlPrimaryKeys.size() > 0) {
- builder.primaryKey(mySqlPrimaryKeys);
- } else {
- throw new IllegalArgumentException(
- "Primary keys are not specified. "
- + "Also, can't infer primary keys from MySQL table
schemas because "
- + "MySQL tables have no primary keys or have
different primary keys.");
- }
-
- if (specifiedPartitionKeys.size() > 0) {
- builder.partitionKeys(specifiedPartitionKeys);
- }
-
- return builder.build();
+ return CdcActionCommonUtils.buildPaimonSchema(
+ specifiedPartitionKeys,
+ specifiedPrimaryKeys,
+ computedColumns,
+ tableConfig,
+ sourceColumns,
+ mySqlSchema.comments(),
+ primaryKeys);
}
static MySqlSource<String> buildMySqlSource(Configuration mySqlConfig,
String tableList) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index 498e5f099..4bdfc3671 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -74,9 +74,10 @@ import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.recordKeyDuplicateErrMsg;
import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
/** {@link EventParser} for MySQL Debezium JSON. */
public class MySqlDebeziumJsonEventParser implements EventParser<String> {
@@ -284,13 +285,13 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
Map<String, String> before = extractRow(payload.get("before"));
if (before.size() > 0) {
- before = caseSensitive ? before : keyCaseInsensitive(before);
+ before = mapKeyCaseConvert(before, caseSensitive,
recordKeyDuplicateErrMsg(before));
records.add(new CdcRecord(RowKind.DELETE, before));
}
Map<String, String> after = extractRow(payload.get("after"));
if (after.size() > 0) {
- after = caseSensitive ? after : keyCaseInsensitive(after);
+ after = mapKeyCaseConvert(after, caseSensitive,
recordKeyDuplicateErrMsg(after));
records.add(new CdcRecord(RowKind.INSERT, after));
}
@@ -461,19 +462,6 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
return resultMap;
}
- private Map<String, String> keyCaseInsensitive(Map<String, String> origin)
{
- Map<String, String> keyCaseInsensitive = new HashMap<>();
- for (Map.Entry<String, String> entry : origin.entrySet()) {
- String fieldName = entry.getKey().toLowerCase();
- checkArgument(
- !keyCaseInsensitive.containsKey(fieldName),
- "Duplicate key appears when converting map keys to
case-insensitive form. Original map is:\n%s",
- origin);
- keyCaseInsensitive.put(fieldName, entry.getValue());
- }
- return keyCaseInsensitive;
- }
-
private boolean shouldSynchronizeCurrentTable() {
if (excludedTables.contains(currentTable)) {
return false;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index d18755dd3..66eaa6614 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -56,8 +56,9 @@ import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;
import static org.apache.paimon.flink.action.MultiTablesSinkMode.DIVIDED;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.schemaCompatible;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.tableList;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/**
@@ -254,7 +255,14 @@ public class MySqlSyncDatabaseAction extends ActionBase {
+ "MySQL database are not compatible with those of
existed Paimon tables. Please check the log.");
MySqlSource<String> source =
- MySqlActionUtils.buildMySqlSource(mySqlConfig,
buildTableList());
+ MySqlActionUtils.buildMySqlSource(
+ mySqlConfig,
+ tableList(
+ mode,
+
mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME),
+ includingTables,
+ monitoredTables,
+ excludedTables));
String serverTimeZone =
mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
ZoneId zoneId = serverTimeZone == null ? ZoneId.systemDefault() :
ZoneId.of(serverTimeZone);
@@ -338,7 +346,7 @@ public class MySqlSyncDatabaseAction extends ActionBase {
private boolean shouldMonitorTable(
TableSchema tableSchema, Schema mySqlSchema, Supplier<String>
errMsg) {
- if (MySqlActionUtils.schemaCompatible(tableSchema, mySqlSchema)) {
+ if (schemaCompatible(tableSchema, mySqlSchema.fields())) {
return true;
} else if (ignoreIncompatible) {
LOG.warn(errMsg.get() + "This table will be ignored.");
@@ -383,59 +391,6 @@ public class MySqlSyncDatabaseAction extends ActionBase {
return tableConfig;
}
- /**
- * See {@link com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils#discoverCapturedTables}
- * and {@code MySqlSyncDatabaseTableListITCase}.
- */
- private String buildTableList() {
- String separatorRex = "\\.";
- if (mode == DIVIDED) {
- // In DIVIDED mode, we only concern about existed tables
- return monitoredTables.stream()
- .map(t -> t.getDatabaseName() + separatorRex +
t.getObjectName())
- .collect(Collectors.joining("|"));
- } else if (mode == COMBINED) {
- // In COMBINED mode, we should consider both existed tables and possible newly created
- // tables, so we should use regular expression to monitor all valid tables and exclude
- // certain invalid tables
-
- // The table list is built by template:
- // (?!(^db\\.tbl$)|(^...$))((databasePattern)\\.(including_pattern1|...))
-
- // The excluding pattern ?!(^db\\.tbl$)|(^...$) can exclude tables whose qualified name
- // is exactly equal to 'db.tbl'
- // The including pattern (databasePattern)\\.(including_pattern1|...) can include tables
- // whose qualified name matches one of the patterns
-
- // a table can be monitored only when its name meets the including pattern and doesn't
- // be excluded by excluding pattern at the same time
- String includingPattern =
- String.format(
- "(%s)%s(%s)",
- mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME),
- separatorRex,
- includingTables);
- if (excludedTables.isEmpty()) {
- return includingPattern;
- }
-
- String excludingPattern =
- excludedTables.stream()
- .map(
- t ->
- String.format(
- "(^%s$)",
- t.getDatabaseName()
- + separatorRex
- +
t.getObjectName()))
- .collect(Collectors.joining("|"));
- excludingPattern = "?!" + excludingPattern;
- return String.format("(%s)(%s)", excludingPattern,
includingPattern);
- }
-
- throw new UnsupportedOperationException("Unknown DatabaseSyncMode: " +
mode);
- }
-
// ------------------------------------------------------------------------
// Flink run methods
// ------------------------------------------------------------------------
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index aef0597ec..3fca52255 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -24,6 +24,7 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionBase;
+import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.mysql.schema.MySqlSchemasInfo;
@@ -189,7 +190,7 @@ public class MySqlSyncTableAction extends ActionBase {
computedFields,
fieldNames);
}
- MySqlActionUtils.assertSchemaCompatible(table.schema(), fromMySql);
+ CdcActionCommonUtils.assertSchemaCompatible(table.schema(),
fromMySql.fields());
} catch (Catalog.TableNotExistException e) {
catalog.createTable(identifier, fromMySql, false);
table = (FileStoreTable) catalog.getTable(identifier);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
index 3779d1101..888bbaf73 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
@@ -31,10 +31,11 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.stream.Collectors;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.listCaseConvert;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Schema builder for MySQL cdc. */
public class MySqlTableSchemaBuilder implements
NewTableSchemaBuilder<JsonNode> {
@@ -81,22 +82,8 @@ public class MySqlTableSchemaBuilder implements
NewTableSchemaBuilder<JsonNode>
primaryKeys.add(primary.asText());
}
- if (!caseSensitive) {
- LinkedHashMap<String, DataType> tmp = new LinkedHashMap<>();
- for (Map.Entry<String, DataType> entry : fields.entrySet()) {
- String fieldName = entry.getKey();
- checkArgument(
- !tmp.containsKey(fieldName.toLowerCase()),
- "Duplicate key '%s' in table '%s' appears when
converting fields map keys to case-insensitive form.",
- fieldName,
- tableName);
- tmp.put(fieldName.toLowerCase(), entry.getValue());
- }
- fields = tmp;
-
- primaryKeys =
-
primaryKeys.stream().map(String::toLowerCase).collect(Collectors.toList());
- }
+ fields = mapKeyCaseConvert(fields, caseSensitive,
columnDuplicateErrMsg(tableName));
+ primaryKeys = listCaseConvert(primaryKeys, caseSensitive);
Schema.Builder builder = Schema.newBuilder();
builder.options(tableConfig);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchema.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchema.java
index 5ca2d877b..310fe53ed 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchema.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/schema/MySqlSchema.java
@@ -28,7 +28,6 @@ import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -94,12 +93,18 @@ public class MySqlSchema {
return primaryKeys;
}
- public Map<String, DataType> typeMapping() {
- Map<String, DataType> typeMapping = new HashMap<>();
+ public LinkedHashMap<String, DataType> typeMapping() {
+ LinkedHashMap<String, DataType> typeMapping = new LinkedHashMap<>();
fields.forEach((name, pair) -> typeMapping.put(name, pair.getLeft()));
return typeMapping;
}
+ public List<String> comments() {
+ List<String> comments = new ArrayList<>();
+ fields.forEach((name, pair) -> comments.add(pair.getRight()));
+ return comments;
+ }
+
public MySqlSchema merge(String currentTable, String otherTable,
MySqlSchema other) {
for (Map.Entry<String, Pair<DataType, String>> entry :
other.fields.entrySet()) {
String fieldName = entry.getKey();
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
index 520917dc1..6b50274e9 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
@@ -643,9 +643,9 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaActionITCaseBase {
.satisfies(
anyCauseMatches(
IllegalArgumentException.class,
- "Paimon schema and Kafka schema are not
compatible.\n"
+ "Paimon schema and source table schema are not
compatible.\n"
+ "Paimon fields are: [`k` STRING NOT
NULL, `v1` STRING].\n"
- + "Kafka fields are: [`pt` INT NOT
NULL, `_id` INT NOT NULL, `v1` VARCHAR(10)]"));
+ + "Source table fields are: [`pt` INT
NOT NULL, `_id` INT NOT NULL, `v1` VARCHAR(10)]"));
}
@Test
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncTableActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncTableActionITCase.java
index 380582c1b..3bebaddd3 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaOggSyncTableActionITCase.java
@@ -202,9 +202,9 @@ public class KafkaOggSyncTableActionITCase extends
KafkaActionITCaseBase {
.satisfies(
anyCauseMatches(
IllegalArgumentException.class,
- "Paimon schema and Kafka schema are not
compatible.\n"
+ "Paimon schema and source table schema are not
compatible.\n"
+ "Paimon fields are: [`k` STRING NOT
NULL, `v1` STRING].\n"
- + "Kafka fields are: [`id` STRING NOT
NULL, `name` STRING, `description` STRING, `weight` STRING]"));
+ + "Source table fields are: [`id`
STRING NOT NULL, `name` STRING, `description` STRING, `weight` STRING]"));
}
@Test
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 99c6b91d5..a22b568f0 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -644,11 +644,11 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
.satisfies(
anyCauseMatches(
IllegalArgumentException.class,
- "Paimon schema and MySQL schema are not
compatible."));
+ "Paimon schema and source table schema are not
compatible."));
}
@Test
- public void testInvalidPrimaryKey() {
+ public void testInvalidPrimaryKey() throws Exception {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
mySqlConfig.put("database-name", DATABASE_NAME);
mySqlConfig.put("table-name", "schema_evolution_\\d+");
@@ -660,7 +660,7 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
.satisfies(
anyCauseMatches(
IllegalArgumentException.class,
- "Specified primary key pk does not exist in
MySQL tables or computed columns."));
+ "Specified primary key 'pk' does not exist in
source tables or computed columns."));
}
@Test
@@ -676,8 +676,8 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
anyCauseMatches(
IllegalArgumentException.class,
"Primary keys are not specified. "
- + "Also, can't infer primary keys from
MySQL table schemas because "
- + "MySQL tables have no primary keys
or have different primary keys."));
+ + "Also, can't infer primary keys from
source table schemas because "
+ + "source tables have no primary keys
or have different primary keys."));
}
@Test