This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6a1093123 [cdc] Handle case-insensitive in separate operator instead
of record parsers (#3551)
6a1093123 is described below
commit 6a1093123ad3457ca34205352b3362367a9ab3f9
Author: yuzelin <[email protected]>
AuthorDate: Thu Jun 20 11:22:42 2024 +0800
[cdc] Handle case-insensitive in separate operator instead of record
parsers (#3551)
---
.../flink/action/cdc/CdcActionCommonUtils.java | 131 +++++----------------
.../flink/action/cdc/MessageQueueSchemaUtils.java | 3 +-
.../flink/action/cdc/SyncDatabaseActionBase.java | 2 +-
.../paimon/flink/action/cdc/SyncJobHandler.java | 17 +--
.../flink/action/cdc/SyncTableActionBase.java | 3 +-
.../paimon/flink/action/cdc/format/DataFormat.java | 5 +-
.../flink/action/cdc/format/RecordParser.java | 22 +---
.../action/cdc/format/RecordParserFactory.java | 4 +-
.../action/cdc/format/canal/CanalRecordParser.java | 5 +-
.../cdc/format/debezium/DebeziumRecordParser.java | 5 +-
.../action/cdc/format/json/JsonRecordParser.java | 5 +-
.../cdc/format/maxwell/MaxwellRecordParser.java | 5 +-
.../action/cdc/format/ogg/OggRecordParser.java | 5 +-
.../action/cdc/mongodb/MongoDBRecordParser.java | 12 +-
.../mongodb/strategy/Mongo4VersionStrategy.java | 11 --
.../flink/action/cdc/mysql/MySqlRecordParser.java | 22 +---
.../action/cdc/postgres/PostgresRecordParser.java | 33 +-----
.../paimon/flink/sink/cdc/CaseSensitiveUtils.java | 75 ++++++++++++
.../paimon/flink/sink/cdc/CdcMultiplexRecord.java | 4 +
.../apache/paimon/flink/sink/cdc/CdcRecord.java | 14 ++-
.../paimon/flink/sink/cdc/CdcSinkBuilder.java | 13 +-
.../sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java | 18 ++-
.../flink/sink/cdc/NewTableSchemaBuilder.java | 29 ++---
.../cdc/UpdatedDataFieldsProcessFunctionBase.java | 20 ++--
.../kafka/KafkaCanalSyncDatabaseActionITCase.java | 12 +-
.../cdc/mysql/MySqlSyncDatabaseActionITCase.java | 119 +++++++++++--------
.../database/case-insensitive/canal-data-1.txt | 1 +
.../test/resources/mysql/sync_database_setup.sql | 19 ++-
28 files changed, 293 insertions(+), 321 deletions(-)
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
index 3c5d586db..bc307cca4 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
@@ -31,20 +31,19 @@ import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.function.Function;
-import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;
import static org.apache.paimon.flink.action.MultiTablesSinkMode.DIVIDED;
import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkState;
+import static org.apache.paimon.utils.StringUtils.caseSensitiveConversion;
/** Common utils for CDC Action. */
public class CdcActionCommonUtils {
@@ -100,95 +99,12 @@ public class CdcActionCommonUtils {
return true;
}
- public static List<DataField> fieldNameCaseConvert(
- List<DataField> origin, boolean caseSensitive, String tableName) {
- Set<String> existedFields = new HashSet<>();
- Function<String, String> columnDuplicateErrMsg =
- columnDuplicateErrMsg(tableName == null ? "UNKNOWN" :
tableName);
- return origin.stream()
- .map(
- field -> {
- if (caseSensitive) {
- return field;
- }
- String columnLowerCase =
field.name().toLowerCase();
- if (!existedFields.add(columnLowerCase)) {
- throw new IllegalArgumentException(
-
columnDuplicateErrMsg.apply(field.name()));
- }
- return field.newName(columnLowerCase);
- })
- .collect(Collectors.toList());
- }
-
- public static <T> LinkedHashMap<String, T> mapKeyCaseConvert(
- LinkedHashMap<String, T> origin,
- boolean caseSensitive,
- Function<String, String> duplicateErrMsg) {
- return mapKeyCaseConvert(origin, caseSensitive, duplicateErrMsg,
LinkedHashMap::new);
- }
-
- public static <T> Map<String, T> mapKeyCaseConvert(
- Map<String, T> origin,
- boolean caseSensitive,
- Function<String, String> duplicateErrMsg) {
- return mapKeyCaseConvert(origin, caseSensitive, duplicateErrMsg,
HashMap::new);
- }
-
- private static <T, M extends Map<String, T>> M mapKeyCaseConvert(
- M origin,
- boolean caseSensitive,
- Function<String, String> duplicateErrMsg,
- Supplier<M> mapSupplier) {
- if (caseSensitive) {
- return origin;
- } else {
- M newMap = mapSupplier.get();
- for (Map.Entry<String, T> entry : origin.entrySet()) {
- String key = entry.getKey();
- if (newMap.containsKey(key.toLowerCase())) {
- throw new
IllegalArgumentException(duplicateErrMsg.apply(key));
- }
- newMap.put(key.toLowerCase(), entry.getValue());
- }
- return newMap;
- }
- }
-
- public static Function<String, String> columnDuplicateErrMsg(String
tableName) {
- return column ->
- String.format(
- "Failed to convert columns of table '%s' to
case-insensitive form because duplicate column found: '%s'.",
- tableName, column);
- }
-
- public static Function<String, String>
recordKeyDuplicateErrMsg(Map<String, String> record) {
- return column ->
- "Failed to convert record map to case-insensitive form because
duplicate column found. Original record map is:\n"
- + record;
- }
-
public static List<String> listCaseConvert(List<String> origin, boolean
caseSensitive) {
return caseSensitive
? origin
:
origin.stream().map(String::toLowerCase).collect(Collectors.toList());
}
- public static String columnCaseConvertAndDuplicateCheck(
- String column,
- Set<String> existedFields,
- boolean caseSensitive,
- Function<String, String> columnDuplicateErrMsg) {
- if (caseSensitive) {
- return column;
- }
- String columnLowerCase = column.toLowerCase();
- if (!existedFields.add(columnLowerCase)) {
- throw new
IllegalArgumentException(columnDuplicateErrMsg.apply(column));
- }
- return columnLowerCase;
- }
-
public static Schema buildPaimonSchema(
String tableName,
List<String> specifiedPartitionKeys,
@@ -206,36 +122,30 @@ public class CdcActionCommonUtils {
builder.options(sourceSchema.options());
// fields
- Set<String> existedFields = new HashSet<>();
- Function<String, String> columnDuplicateErrMsg =
columnDuplicateErrMsg(tableName);
+ List<String> allFieldNames = new ArrayList<>();
for (DataField field : sourceSchema.fields()) {
- String fieldName =
- columnCaseConvertAndDuplicateCheck(
- field.name(), existedFields, caseSensitive,
columnDuplicateErrMsg);
+ String fieldName = caseSensitiveConversion(field.name(),
caseSensitive);
+ allFieldNames.add(fieldName);
builder.column(fieldName, field.type(), field.description());
}
for (ComputedColumn computedColumn : computedColumns) {
String computedColumnName =
- columnCaseConvertAndDuplicateCheck(
- computedColumn.columnName(),
- existedFields,
- caseSensitive,
- columnDuplicateErrMsg);
+ caseSensitiveConversion(computedColumn.columnName(),
caseSensitive);
+ allFieldNames.add(computedColumnName);
builder.column(computedColumnName, computedColumn.columnType());
}
for (CdcMetadataConverter metadataConverter : metadataConverters) {
String metadataColumnName =
- columnCaseConvertAndDuplicateCheck(
- metadataConverter.columnName(),
- existedFields,
- caseSensitive,
- columnDuplicateErrMsg);
+ caseSensitiveConversion(metadataConverter.columnName(),
caseSensitive);
+ allFieldNames.add(metadataColumnName);
builder.column(metadataColumnName, metadataConverter.dataType());
}
+ checkDuplicateFields(tableName, allFieldNames);
+
// primary keys
if (!specifiedPrimaryKeys.isEmpty()) {
Set<String> sourceColumns =
@@ -272,6 +182,21 @@ public class CdcActionCommonUtils {
return builder.build();
}
+ public static void checkDuplicateFields(String tableName, List<String>
fieldNames) {
+ List<String> duplicates =
+ fieldNames.stream()
+ .filter(name -> Collections.frequency(fieldNames,
name) > 1)
+ .collect(Collectors.toList());
+ checkState(
+ duplicates.isEmpty(),
+ "Table %s contains duplicate columns: %s.\n"
+ + "Possible causes are: "
+ + "1. computed columns or metadata columns contain
duplicate fields; "
+ + "2. the catalog is case-insensitive and the table
columns duplicate after they are all converted to lower-case.",
+ tableName,
+ duplicates);
+ }
+
public static String tableList(
MultiTablesSinkMode mode,
String databasePattern,
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSchemaUtils.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSchemaUtils.java
index 73e63bcd0..c517670e4 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSchemaUtils.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSchemaUtils.java
@@ -49,8 +49,7 @@ public class MessageQueueSchemaUtils {
int retry = 0;
int retryInterval = 1000;
- RecordParser recordParser =
- dataFormat.createParser(true, typeMapping,
Collections.emptyList());
+ RecordParser recordParser = dataFormat.createParser(typeMapping,
Collections.emptyList());
while (true) {
Optional<Schema> schema =
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
index 4fa6ca76c..4c7f8ea27 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
@@ -112,7 +112,7 @@ public abstract class SyncDatabaseActionBase extends
SynchronizationActionBase {
@Override
protected FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord>
recordParse() {
return syncJobHandler.provideRecordParser(
- caseSensitive, Collections.emptyList(), typeMapping,
metadataConverters);
+ Collections.emptyList(), typeMapping, metadataConverters);
}
@Override
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java
index c674e560b..6116d8ba5 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncJobHandler.java
@@ -193,31 +193,22 @@ public class SyncJobHandler {
}
public FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord>
provideRecordParser(
- boolean caseSensitive,
List<ComputedColumn> computedColumns,
TypeMapping typeMapping,
CdcMetadataConverter[] metadataConverters) {
switch (sourceType) {
case MYSQL:
return new MySqlRecordParser(
- cdcSourceConfig,
- caseSensitive,
- computedColumns,
- typeMapping,
- metadataConverters);
+ cdcSourceConfig, computedColumns, typeMapping,
metadataConverters);
case POSTGRES:
return new PostgresRecordParser(
- cdcSourceConfig,
- caseSensitive,
- computedColumns,
- typeMapping,
- metadataConverters);
+ cdcSourceConfig, computedColumns, typeMapping,
metadataConverters);
case KAFKA:
case PULSAR:
DataFormat dataFormat = provideDataFormat();
- return dataFormat.createParser(caseSensitive, typeMapping,
computedColumns);
+ return dataFormat.createParser(typeMapping, computedColumns);
case MONGODB:
- return new MongoDBRecordParser(caseSensitive, computedColumns,
cdcSourceConfig);
+ return new MongoDBRecordParser(computedColumns,
cdcSourceConfig);
default:
throw new UnsupportedOperationException("Unknown source type "
+ sourceType);
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
index f4e5bfe6a..1a847d3a0 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
@@ -156,8 +156,7 @@ public abstract class SyncTableActionBase extends
SynchronizationActionBase {
@Override
protected FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord>
recordParse() {
- return syncJobHandler.provideRecordParser(
- caseSensitive, computedColumns, typeMapping,
metadataConverters);
+ return syncJobHandler.provideRecordParser(computedColumns,
typeMapping, metadataConverters);
}
@Override
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java
index e7e588e4e..2af44c902 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java
@@ -53,13 +53,12 @@ public enum DataFormat {
* Creates a new instance of {@link RecordParser} for this data format
with the specified
* configurations.
*
- * @param caseSensitive Indicates whether the parser should be
case-sensitive.
* @param computedColumns List of computed columns to be considered by the
parser.
* @return A new instance of {@link RecordParser}.
*/
public RecordParser createParser(
- boolean caseSensitive, TypeMapping typeMapping,
List<ComputedColumn> computedColumns) {
- return parser.createParser(caseSensitive, typeMapping,
computedColumns);
+ TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
+ return parser.createParser(typeMapping, computedColumns);
}
public static DataFormat fromConfigString(String format) {
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
index 38076428f..ea9fa5492 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
@@ -51,10 +51,6 @@ import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
-import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.fieldNameCaseConvert;
-import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.listCaseConvert;
-import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
-import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.recordKeyDuplicateErrMsg;
import static org.apache.paimon.utils.JsonSerdeUtil.convertValue;
import static org.apache.paimon.utils.JsonSerdeUtil.getNodeAs;
import static org.apache.paimon.utils.JsonSerdeUtil.isNull;
@@ -75,15 +71,12 @@ public abstract class RecordParser
protected static final String FIELD_TABLE = "table";
protected static final String FIELD_DATABASE = "database";
- private final boolean caseSensitive;
protected final TypeMapping typeMapping;
protected final List<ComputedColumn> computedColumns;
protected JsonNode root;
- public RecordParser(
- boolean caseSensitive, TypeMapping typeMapping,
List<ComputedColumn> computedColumns) {
- this.caseSensitive = caseSensitive;
+ public RecordParser(TypeMapping typeMapping, List<ComputedColumn>
computedColumns) {
this.typeMapping = typeMapping;
this.computedColumns = computedColumns;
}
@@ -203,15 +196,12 @@ public abstract class RecordParser
/** Handle case sensitivity here. */
private RichCdcMultiplexRecord createRecord(
RowKind rowKind, Map<String, String> data, List<DataField>
paimonFields) {
- String databaseName = getDatabaseName();
- String tableName = getTableName();
- paimonFields = fieldNameCaseConvert(paimonFields, caseSensitive,
tableName);
-
- data = mapKeyCaseConvert(data, caseSensitive,
recordKeyDuplicateErrMsg(data));
- List<String> primaryKeys = listCaseConvert(extractPrimaryKeys(),
caseSensitive);
-
return new RichCdcMultiplexRecord(
- databaseName, tableName, paimonFields, primaryKeys, new
CdcRecord(rowKind, data));
+ getDatabaseName(),
+ getTableName(),
+ paimonFields,
+ extractPrimaryKeys(),
+ new CdcRecord(rowKind, data));
}
protected void setRoot(CdcSourceRecord record) {
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParserFactory.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParserFactory.java
index 29c3aafb2..612dfefd6 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParserFactory.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParserFactory.java
@@ -37,11 +37,9 @@ public interface RecordParserFactory {
/**
* Creates a new instance of {@link RecordParser} with the specified
configurations.
*
- * @param caseSensitive Indicates whether the parser should be
case-sensitive.
* @param typeMapping Data type mapping options.
* @param computedColumns List of computed columns to be considered by the
parser.
* @return A new instance of {@link RecordParser}.
*/
- RecordParser createParser(
- boolean caseSensitive, TypeMapping typeMapping,
List<ComputedColumn> computedColumns);
+ RecordParser createParser(TypeMapping typeMapping, List<ComputedColumn>
computedColumns);
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
index 642dc4241..76671739a 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
@@ -82,9 +82,8 @@ public class CanalRecordParser extends RecordParser {
return !isNull(node) && node.asBoolean();
}
- public CanalRecordParser(
- boolean caseSensitive, TypeMapping typeMapping,
List<ComputedColumn> computedColumns) {
- super(caseSensitive, typeMapping, computedColumns);
+ public CanalRecordParser(TypeMapping typeMapping, List<ComputedColumn>
computedColumns) {
+ super(typeMapping, computedColumns);
}
@Override
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
index ec23132ff..c2b658754 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
@@ -83,9 +83,8 @@ public class DebeziumRecordParser extends RecordParser {
private final Map<String, String> classNames = new HashMap<>();
private final Map<String, Map<String, String>> parameters = new
HashMap<>();
- public DebeziumRecordParser(
- boolean caseSensitive, TypeMapping typeMapping,
List<ComputedColumn> computedColumns) {
- super(caseSensitive, typeMapping, computedColumns);
+ public DebeziumRecordParser(TypeMapping typeMapping, List<ComputedColumn>
computedColumns) {
+ super(typeMapping, computedColumns);
}
@Override
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/json/JsonRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/json/JsonRecordParser.java
index e7f30c0b0..1a67f82f4 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/json/JsonRecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/json/JsonRecordParser.java
@@ -36,9 +36,8 @@ import java.util.List;
*/
public class JsonRecordParser extends RecordParser {
- public JsonRecordParser(
- boolean caseSensitive, TypeMapping typeMapping,
List<ComputedColumn> computedColumns) {
- super(caseSensitive, typeMapping, computedColumns);
+ public JsonRecordParser(TypeMapping typeMapping, List<ComputedColumn>
computedColumns) {
+ super(typeMapping, computedColumns);
}
@Override
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/maxwell/MaxwellRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/maxwell/MaxwellRecordParser.java
index 7379f81a5..0c9393ee6 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/maxwell/MaxwellRecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/maxwell/MaxwellRecordParser.java
@@ -50,9 +50,8 @@ public class MaxwellRecordParser extends RecordParser {
private static final String OP_UPDATE = "update";
private static final String OP_DELETE = "delete";
- public MaxwellRecordParser(
- boolean caseSensitive, TypeMapping typeMapping,
List<ComputedColumn> computedColumns) {
- super(caseSensitive, typeMapping, computedColumns);
+ public MaxwellRecordParser(TypeMapping typeMapping, List<ComputedColumn>
computedColumns) {
+ super(typeMapping, computedColumns);
}
@Override
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/ogg/OggRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/ogg/OggRecordParser.java
index 019ae1562..bc1efdf7c 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/ogg/OggRecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/ogg/OggRecordParser.java
@@ -58,9 +58,8 @@ public class OggRecordParser extends RecordParser {
private static final String OP_INSERT = "I";
private static final String OP_DELETE = "D";
- public OggRecordParser(
- boolean caseSensitive, TypeMapping typeMapping,
List<ComputedColumn> computedColumns) {
- super(caseSensitive, typeMapping, computedColumns);
+ public OggRecordParser(TypeMapping typeMapping, List<ComputedColumn>
computedColumns) {
+ super(typeMapping, computedColumns);
}
@Override
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
index 9264e409f..251fd93bb 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBRecordParser.java
@@ -60,15 +60,10 @@ public class MongoDBRecordParser
private static final String FIELD_NAMESPACE = "ns";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final List<ComputedColumn> computedColumns;
- private final boolean caseSensitive;
private final Configuration mongodbConfig;
private JsonNode root;
- public MongoDBRecordParser(
- boolean caseSensitive,
- List<ComputedColumn> computedColumns,
- Configuration mongodbConfig) {
- this.caseSensitive = caseSensitive;
+ public MongoDBRecordParser(List<ComputedColumn> computedColumns,
Configuration mongodbConfig) {
this.computedColumns = computedColumns;
this.mongodbConfig = mongodbConfig;
}
@@ -81,7 +76,7 @@ public class MongoDBRecordParser
String collection = extractString(FIELD_TABLE);
MongoVersionStrategy versionStrategy =
VersionStrategyFactory.create(
- databaseName, collection, caseSensitive,
computedColumns, mongodbConfig);
+ databaseName, collection, computedColumns,
mongodbConfig);
versionStrategy.extractRecords(root).forEach(out::collect);
}
@@ -93,7 +88,6 @@ public class MongoDBRecordParser
static MongoVersionStrategy create(
String databaseName,
String collection,
- boolean caseSensitive,
List<ComputedColumn> computedColumns,
Configuration mongodbConfig) {
// TODO: When MongoDB CDC is upgraded to 2.5, uncomment the
version check logic
@@ -101,7 +95,7 @@ public class MongoDBRecordParser
// return new Mongo6VersionStrategy(databaseName, collection,
caseSensitive);
// }
return new Mongo4VersionStrategy(
- databaseName, collection, caseSensitive, computedColumns,
mongodbConfig);
+ databaseName, collection, computedColumns, mongodbConfig);
}
}
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
index e64cf3e8a..450eacaef 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
@@ -34,10 +34,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.fieldNameCaseConvert;
-import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
-import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.recordKeyDuplicateErrMsg;
-
/**
* Implementation class for extracting records from MongoDB versions greater
than 4.x and less than
* 6.x.
@@ -53,19 +49,16 @@ public class Mongo4VersionStrategy implements
MongoVersionStrategy {
private static final String OP_DELETE = "delete";
private final String databaseName;
private final String collection;
- private final boolean caseSensitive;
private final Configuration mongodbConfig;
private final List<ComputedColumn> computedColumns;
public Mongo4VersionStrategy(
String databaseName,
String collection,
- boolean caseSensitive,
List<ComputedColumn> computedColumns,
Configuration mongodbConfig) {
this.databaseName = databaseName;
this.collection = collection;
- this.caseSensitive = caseSensitive;
this.mongodbConfig = mongodbConfig;
this.computedColumns = computedColumns;
}
@@ -133,11 +126,7 @@ public class Mongo4VersionStrategy implements
MongoVersionStrategy {
RowType.Builder rowTypeBuilder = RowType.builder();
Map<String, String> record =
getExtractRow(fullDocument, rowTypeBuilder, computedColumns,
mongodbConfig);
-
- record = mapKeyCaseConvert(record, caseSensitive,
recordKeyDuplicateErrMsg(record));
-
List<DataField> fields = rowTypeBuilder.build().getFields();
- fields = fieldNameCaseConvert(fields, caseSensitive, collection);
return new RichCdcMultiplexRecord(
databaseName,
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
index 745a7932d..3f54db541 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
@@ -58,13 +58,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.function.Function;
-import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnCaseConvertAndDuplicateCheck;
-import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
-import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.listCaseConvert;
-import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
-import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.recordKeyDuplicateErrMsg;
import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
import static org.apache.paimon.utils.JsonSerdeUtil.isNull;
@@ -78,7 +72,6 @@ public class MySqlRecordParser implements
FlatMapFunction<CdcSourceRecord, RichC
private final ObjectMapper objectMapper = new ObjectMapper();
private final ZoneId serverTimeZone;
- private final boolean caseSensitive;
private final List<ComputedColumn> computedColumns;
private final TypeMapping typeMapping;
@@ -93,11 +86,9 @@ public class MySqlRecordParser implements
FlatMapFunction<CdcSourceRecord, RichC
public MySqlRecordParser(
Configuration mySqlConfig,
- boolean caseSensitive,
List<ComputedColumn> computedColumns,
TypeMapping typeMapping,
CdcMetadataConverter[] metadataConverters) {
- this.caseSensitive = caseSensitive;
this.computedColumns = computedColumns;
this.typeMapping = typeMapping;
this.metadataConverters = metadataConverters;
@@ -162,7 +153,7 @@ public class MySqlRecordParser implements
FlatMapFunction<CdcSourceRecord, RichC
Table table = tableChange.getTable();
List<DataField> fields = extractFields(table);
- List<String> primaryKeys =
listCaseConvert(table.primaryKeyColumnNames(), caseSensitive);
+ List<String> primaryKeys = table.primaryKeyColumnNames();
// TODO : add table comment and column comment when we upgrade flink
cdc to 2.4
return Collections.singletonList(
@@ -173,15 +164,8 @@ public class MySqlRecordParser implements
FlatMapFunction<CdcSourceRecord, RichC
private List<DataField> extractFields(Table table) {
RowType.Builder rowType = RowType.builder();
List<Column> columns = table.columns();
- Set<String> existedFields = new HashSet<>();
- Function<String, String> columnDuplicateErrMsg =
- columnDuplicateErrMsg(table.id().toString());
for (Column column : columns) {
- String columnName =
- columnCaseConvertAndDuplicateCheck(
- column.name(), existedFields, caseSensitive,
columnDuplicateErrMsg);
-
DataType dataType =
MySqlTypeUtils.toDataType(
column.typeExpression(),
@@ -190,7 +174,7 @@ public class MySqlRecordParser implements
FlatMapFunction<CdcSourceRecord, RichC
typeMapping);
dataType = dataType.copy(typeMapping.containsMode(TO_NULLABLE) ||
column.isOptional());
- rowType.field(columnName, dataType);
+ rowType.field(column.name(), dataType);
}
return rowType.build().getFields();
}
@@ -203,13 +187,11 @@ public class MySqlRecordParser implements
FlatMapFunction<CdcSourceRecord, RichC
Map<String, String> before = extractRow(root.payload().before());
if (!before.isEmpty()) {
- before = mapKeyCaseConvert(before, caseSensitive,
recordKeyDuplicateErrMsg(before));
records.add(createRecord(RowKind.DELETE, before));
}
Map<String, String> after = extractRow(root.payload().after());
if (!after.isEmpty()) {
- after = mapKeyCaseConvert(after, caseSensitive,
recordKeyDuplicateErrMsg(after));
records.add(createRecord(RowKind.INSERT, after));
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
index 0299d57f7..0d672e85d 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
@@ -64,17 +64,10 @@ import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.function.Function;
-import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnCaseConvertAndDuplicateCheck;
-import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
-import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
-import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.recordKeyDuplicateErrMsg;
import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
import static
org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.decimalLogicalName;
@@ -91,7 +84,6 @@ public class PostgresRecordParser
private final ObjectMapper objectMapper = new ObjectMapper();
private final ZoneId serverTimeZone;
- private final boolean caseSensitive;
private final List<ComputedColumn> computedColumns;
private final TypeMapping typeMapping;
@@ -104,24 +96,9 @@ public class PostgresRecordParser
public PostgresRecordParser(
Configuration postgresConfig,
- boolean caseSensitive,
- TypeMapping typeMapping,
- CdcMetadataConverter[] metadataConverters) {
- this(
- postgresConfig,
- caseSensitive,
- Collections.emptyList(),
- typeMapping,
- metadataConverters);
- }
-
- public PostgresRecordParser(
- Configuration postgresConfig,
- boolean caseSensitive,
List<ComputedColumn> computedColumns,
TypeMapping typeMapping,
CdcMetadataConverter[] metadataConverters) {
- this.caseSensitive = caseSensitive;
this.computedColumns = computedColumns;
this.typeMapping = typeMapping;
this.metadataConverters = metadataConverters;
@@ -155,20 +132,14 @@ public class PostgresRecordParser
+ "in the JsonDebeziumDeserializationSchema you
created");
RowType.Builder rowType = RowType.builder();
- Set<String> existedFields = new HashSet<>();
- Function<String, String> columnDuplicateErrMsg =
columnDuplicateErrMsg(currentTable);
afterFields.forEach(
(key, value) -> {
- String columnName =
- columnCaseConvertAndDuplicateCheck(
- key, existedFields, caseSensitive,
columnDuplicateErrMsg);
-
DataType dataType = extractFieldType(value);
dataType =
dataType.copy(
typeMapping.containsMode(TO_NULLABLE) ||
value.optional());
- rowType.field(columnName, dataType);
+ rowType.field(key, dataType);
});
return rowType.build().getFields();
}
@@ -241,13 +212,11 @@ public class PostgresRecordParser
Map<String, String> before = extractRow(root.payload().before());
if (!before.isEmpty()) {
- before = mapKeyCaseConvert(before, caseSensitive,
recordKeyDuplicateErrMsg(before));
records.add(createRecord(RowKind.DELETE, before));
}
Map<String, String> after = extractRow(root.payload().after());
if (!after.isEmpty()) {
- after = mapKeyCaseConvert(after, caseSensitive,
recordKeyDuplicateErrMsg(after));
List<DataField> fields = extractFields(root.schema());
records.add(
new RichCdcMultiplexRecord(
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CaseSensitiveUtils.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CaseSensitiveUtils.java
new file mode 100644
index 000000000..49890a11c
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CaseSensitiveUtils.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.catalog.Catalog;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+
+/** Add convert operator if the catalog is case-insensitive. */
+public class CaseSensitiveUtils {
+
+ public static DataStream<CdcRecord> cdcRecordConvert(
+ Catalog.Loader catalogLoader, DataStream<CdcRecord> input) {
+ if (caseSensitive(catalogLoader)) {
+ return input;
+ }
+
+ return input.forward()
+ .process(
+ new ProcessFunction<CdcRecord, CdcRecord>() {
+ @Override
+ public void processElement(
+ CdcRecord record, Context ctx,
Collector<CdcRecord> out) {
+ out.collect(record.fieldNameLowerCase());
+ }
+ })
+ .name("Case-insensitive Convert");
+ }
+
+ public static DataStream<CdcMultiplexRecord> cdcMultiplexRecordConvert(
+ Catalog.Loader catalogLoader, DataStream<CdcMultiplexRecord>
input) {
+ if (caseSensitive(catalogLoader)) {
+ return input;
+ }
+
+ return input.forward()
+ .process(
+ new ProcessFunction<CdcMultiplexRecord,
CdcMultiplexRecord>() {
+ @Override
+ public void processElement(
+ CdcMultiplexRecord record,
+ Context ctx,
+ Collector<CdcMultiplexRecord> out) {
+ out.collect(record.fieldNameLowerCase());
+ }
+ })
+ .name("Case-insensitive Convert");
+ }
+
+ private static boolean caseSensitive(Catalog.Loader catalogLoader) {
+ try (Catalog catalog = catalogLoader.load()) {
+ return catalog.caseSensitive();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecord.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecord.java
index ed97907ce..ce68f300a 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecord.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecord.java
@@ -55,6 +55,10 @@ public class CdcMultiplexRecord implements Serializable {
return record;
}
+ public CdcMultiplexRecord fieldNameLowerCase() {
+ return new CdcMultiplexRecord(databaseName, tableName,
record.fieldNameLowerCase());
+ }
+
@Override
public boolean equals(Object o) {
if (!(o instanceof CdcMultiplexRecord)) {
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
index 61d4ec7a1..9adca753d 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecord.java
@@ -23,6 +23,7 @@ import org.apache.paimon.types.RowKind;
import java.io.Serializable;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@@ -41,11 +42,6 @@ public class CdcRecord implements Serializable {
this.fields = fields;
}
- public CdcRecord setRowKind(RowKind kind) {
- this.kind = kind;
- return this;
- }
-
public static CdcRecord emptyRecord() {
return new CdcRecord(RowKind.INSERT, Collections.emptyMap());
}
@@ -58,6 +54,14 @@ public class CdcRecord implements Serializable {
return fields;
}
+ public CdcRecord fieldNameLowerCase() {
+ Map<String, String> newFields = new HashMap<>();
+ for (Map.Entry<String, String> entry : fields.entrySet()) {
+ newFields.put(entry.getKey().toLowerCase(), entry.getValue());
+ }
+ return new CdcRecord(kind, newFields);
+ }
+
@Override
public boolean equals(Object o) {
if (!(o instanceof CdcRecord)) {
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
index 87d42105a..f45b4f96d 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
@@ -99,6 +99,7 @@ public class CdcSinkBuilder<T> {
SingleOutputStreamOperator<CdcRecord> parsed =
input.forward()
.process(new
CdcParsingProcessFunction<>(parserFactory))
+ .name("Side Output")
.setParallelism(input.getParallelism());
DataStream<Void> schemaChangeProcessFunction =
@@ -108,18 +109,22 @@ public class CdcSinkBuilder<T> {
new UpdatedDataFieldsProcessFunction(
new SchemaManager(dataTable.fileIO(),
dataTable.location()),
identifier,
- catalogLoader));
+ catalogLoader))
+ .name("Schema Evolution");
schemaChangeProcessFunction.getTransformation().setParallelism(1);
schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
+ DataStream<CdcRecord> converted =
+ CaseSensitiveUtils.cdcRecordConvert(catalogLoader, parsed);
BucketMode bucketMode = dataTable.bucketMode();
switch (bucketMode) {
case HASH_FIXED:
- return buildForFixedBucket(parsed);
+ return buildForFixedBucket(converted);
case HASH_DYNAMIC:
- return new CdcDynamicBucketSink((FileStoreTable)
table).build(parsed, parallelism);
+ return new CdcDynamicBucketSink((FileStoreTable) table)
+ .build(converted, parallelism);
case BUCKET_UNAWARE:
- return buildForUnawareBucket(parsed);
+ return buildForUnawareBucket(converted);
default:
throw new UnsupportedOperationException("Unsupported bucket
mode: " + bucketMode);
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
index be13e7f67..629b7845d 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -158,9 +158,12 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
.process(new
MultiTableUpdatedDataFieldsProcessFunction(catalogLoader))
.name("Schema Evolution");
+ DataStream<CdcMultiplexRecord> converted =
+ CaseSensitiveUtils.cdcMultiplexRecordConvert(catalogLoader,
newlyAddedTableStream);
+
DataStream<CdcMultiplexRecord> partitioned =
partition(
- newlyAddedTableStream,
+ converted,
new CdcMultiplexRecordChannelComputer(catalogLoader),
parallelism);
@@ -186,6 +189,7 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
SingleOutputStreamOperator<Void> parsed =
input.forward()
.process(new
CdcMultiTableParsingProcessFunction<>(parserFactory))
+ .name("Side Output")
.setParallelism(input.getParallelism());
for (FileStoreTable table : tables) {
@@ -198,7 +202,8 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
new UpdatedDataFieldsProcessFunction(
new SchemaManager(table.fileIO(),
table.location()),
Identifier.create(database,
table.name()),
- catalogLoader));
+ catalogLoader))
+ .name("Schema Evolution");
schemaChangeProcessFunction.getTransformation().setParallelism(1);
schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
@@ -208,16 +213,19 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
CdcMultiTableParsingProcessFunction.createRecordOutputTag(
table.name()));
+ DataStream<CdcRecord> converted =
+ CaseSensitiveUtils.cdcRecordConvert(catalogLoader,
parsedForTable);
+
BucketMode bucketMode = table.bucketMode();
switch (bucketMode) {
case HASH_FIXED:
- buildForFixedBucket(table, parsedForTable);
+ buildForFixedBucket(table, converted);
break;
case HASH_DYNAMIC:
- new CdcDynamicBucketSink(table).build(parsedForTable,
parallelism);
+ new CdcDynamicBucketSink(table).build(converted,
parallelism);
break;
case BUCKET_UNAWARE:
- buildForUnawareBucket(table, parsedForTable);
+ buildForUnawareBucket(table, converted);
break;
case CROSS_PARTITION:
default:
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
index 795fdb0f9..cef7f011a 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
@@ -23,15 +23,14 @@ import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataField;
import java.io.Serializable;
-import java.util.HashSet;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Set;
-import java.util.function.Function;
-import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnCaseConvertAndDuplicateCheck;
-import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.checkDuplicateFields;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.listCaseConvert;
+import static org.apache.paimon.utils.StringUtils.caseSensitiveConversion;
/** Build schema for new table found in database synchronization. */
public class NewTableSchemaBuilder implements Serializable {
@@ -53,30 +52,24 @@ public class NewTableSchemaBuilder implements Serializable {
Schema.Builder builder = Schema.newBuilder();
builder.options(tableConfig);
- String tableName = record.tableName();
- tableName = tableName == null ? "UNKNOWN" : tableName;
-
// fields
- Set<String> existedFields = new HashSet<>();
- Function<String, String> columnDuplicateErrMsg =
columnDuplicateErrMsg(tableName);
+ List<String> allFieldNames = new ArrayList<>();
for (DataField dataField : record.fields()) {
- String fieldName =
- columnCaseConvertAndDuplicateCheck(
- dataField.name(), existedFields, caseSensitive,
columnDuplicateErrMsg);
+ String fieldName = caseSensitiveConversion(dataField.name(),
caseSensitive);
+ allFieldNames.add(fieldName);
builder.column(fieldName, dataField.type(),
dataField.description());
}
for (CdcMetadataConverter metadataConverter : metadataConverters) {
String metadataColumnName =
- columnCaseConvertAndDuplicateCheck(
- metadataConverter.columnName(),
- existedFields,
- caseSensitive,
- columnDuplicateErrMsg);
+ caseSensitiveConversion(metadataConverter.columnName(),
caseSensitive);
+ allFieldNames.add(metadataColumnName);
builder.column(metadataColumnName, metadataConverter.dataType());
}
+ checkDuplicateFields(record.tableName(), allFieldNames);
+
builder.primaryKey(listCaseConvert(record.primaryKeys(),
caseSensitive));
return Optional.of(builder.build());
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
index 8fefcab63..3d832d339 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
@@ -29,6 +29,7 @@ import org.apache.paimon.types.DataTypeChecks;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
@@ -46,8 +47,10 @@ public abstract class
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
private static final Logger LOG =
LoggerFactory.getLogger(UpdatedDataFieldsProcessFunctionBase.class);
- protected Catalog catalog;
protected final Catalog.Loader catalogLoader;
+ protected Catalog catalog;
+ private boolean caseSensitive;
+
private static final List<DataTypeRoot> STRING_TYPES =
Arrays.asList(DataTypeRoot.CHAR, DataTypeRoot.VARCHAR);
private static final List<DataTypeRoot> BINARY_TYPES =
@@ -73,6 +76,7 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I,
O> extends Process
@Override
public void open(Configuration parameters) {
this.catalog = catalogLoader.load();
+ this.caseSensitive = this.catalog.caseSensitive();
}
protected void applySchemaChange(
@@ -198,8 +202,10 @@ public abstract class
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
List<SchemaChange> result = new ArrayList<>();
for (DataField newField : updatedDataFields) {
- if (oldFields.containsKey(newField.name())) {
- DataField oldField = oldFields.get(newField.name());
+ String newFieldName =
+ StringUtils.caseSensitiveConversion(newField.name(),
caseSensitive);
+ if (oldFields.containsKey(newFieldName)) {
+ DataField oldField = oldFields.get(newFieldName);
// we compare by ignoring nullable, because partition keys and
primary keys might be
// nullable in source database, but they can't be null in
Paimon
if (oldField.type().equalsIgnoreNullable(newField.type())) {
@@ -208,23 +214,23 @@ public abstract class
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
&&
!newField.description().equals(oldField.description())) {
result.add(
SchemaChange.updateColumnComment(
- new String[] {newField.name()},
newField.description()));
+ new String[] {newFieldName},
newField.description()));
}
} else {
// update column type
- result.add(SchemaChange.updateColumnType(newField.name(),
newField.type()));
+ result.add(SchemaChange.updateColumnType(newFieldName,
newField.type()));
// update column comment
if (newField.description() != null) {
result.add(
SchemaChange.updateColumnComment(
- new String[] {newField.name()},
newField.description()));
+ new String[] {newFieldName},
newField.description()));
}
}
} else {
// add column
result.add(
SchemaChange.addColumn(
- newField.name(), newField.type(),
newField.description(), null));
+ newFieldName, newField.type(),
newField.description(), null));
}
}
return result;
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
index 9ed85c71d..01cb78dae 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
@@ -542,11 +542,17 @@ public class KafkaCanalSyncDatabaseActionITCase extends
KafkaActionITCaseBase {
RowType rowType =
RowType.of(
new DataType[] {
- DataTypes.INT().notNull(), DataTypes.VARCHAR(10),
DataTypes.INT()
+ DataTypes.INT().notNull(),
+ DataTypes.VARCHAR(10),
+ DataTypes.INT(),
+ DataTypes.VARCHAR(10)
},
- new String[] {"k1", "v0", "v1"});
+ new String[] {"k1", "v0", "v1", "v2"});
waitForResult(
- Arrays.asList("+I[5, five, 50]", "+I[7, seven, 70]"),
+ Arrays.asList(
+ "+I[5, five, 50, NULL]",
+ "+I[7, seven, 70, NULL]",
+ "+I[8, eight, 80, added]"),
table,
rowType,
Collections.singletonList("k1"));
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index 5d458ae60..b1dda4736 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -27,7 +27,6 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.execution.JobClient;
@@ -468,9 +467,9 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
@Test
@Timeout(60)
- public void testIgnoreCase() throws Exception {
+ public void testIgnoreCaseDivided() throws Exception {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
- mySqlConfig.put("database-name", "paimon_ignore_CASE");
+ mySqlConfig.put("database-name", "paimon_ignore_CASE_divided");
MySqlSyncDatabaseAction action =
syncDatabaseActionBuilder(mySqlConfig)
@@ -481,55 +480,83 @@ public class MySqlSyncDatabaseActionITCase extends
MySqlActionITCaseBase {
.build();
runActionWithDefaultEnv(action);
- // check table schema
- FileStoreTable table = getFileStoreTable("t");
- assertThat(JsonSerdeUtil.toFlatJson(table.schema().fields()))
- .isEqualTo(
- "[{\"id\":0,\"name\":\"k\",\"type\":\"INT NOT
NULL\",\"description\":\"\"},"
- +
"{\"id\":1,\"name\":\"uppercase_v0\",\"type\":\"VARCHAR(20)\",\"description\":\"\"}]");
-
- // check sync schema changes and records
try (Statement statement = getStatement()) {
- statement.executeUpdate("USE paimon_ignore_CASE");
- statement.executeUpdate("INSERT INTO T VALUES (1, 'Hi')");
- RowType rowType1 =
- RowType.of(
- new DataType[] {DataTypes.INT().notNull(),
DataTypes.VARCHAR(20)},
- new String[] {"k", "uppercase_v0"});
- waitForResult(
- Collections.singletonList("+I[1, Hi]"),
- table,
- rowType1,
- Collections.singletonList("k"));
+ statement.executeUpdate("USE paimon_ignore_CASE_divided");
+ ignoreCaseTableCheck(statement, "T");
+ }
+ }
- statement.executeUpdate("ALTER TABLE T MODIFY COLUMN UPPERCASE_V0
VARCHAR(30)");
- statement.executeUpdate("INSERT INTO T VALUES (2, 'Paimon')");
- RowType rowType2 =
- RowType.of(
- new DataType[] {DataTypes.INT().notNull(),
DataTypes.VARCHAR(30)},
- new String[] {"k", "uppercase_v0"});
- waitForResult(
- Arrays.asList("+I[1, Hi]", "+I[2, Paimon]"),
- table,
- rowType2,
- Collections.singletonList("k"));
+ @Test
+ @Timeout(60)
+ public void testIgnoreCaseCombined() throws Exception {
+ Map<String, String> mySqlConfig = getBasicMySqlConfig();
+ mySqlConfig.put("database-name", "paimon_ignore_CASE_combined");
- statement.executeUpdate("ALTER TABLE T ADD COLUMN UPPERCASE_V1
DOUBLE");
- statement.executeUpdate("INSERT INTO T VALUES (3, 'Test', 0.5)");
- RowType rowType3 =
- RowType.of(
- new DataType[] {
- DataTypes.INT().notNull(),
DataTypes.VARCHAR(30), DataTypes.DOUBLE()
- },
- new String[] {"k", "uppercase_v0",
"uppercase_v1"});
- waitForResult(
- Arrays.asList("+I[1, Hi, NULL]", "+I[2, Paimon, NULL]",
"+I[3, Test, 0.5]"),
- table,
- rowType3,
- Collections.singletonList("k"));
+ MySqlSyncDatabaseAction action =
+ syncDatabaseActionBuilder(mySqlConfig)
+ .withCatalogConfig(
+ Collections.singletonMap(
+
FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"))
+ .withMode(COMBINED.configString())
+ .withTableConfig(getBasicTableConfig())
+ .build();
+ runActionWithDefaultEnv(action);
+
+ try (Statement statement = getStatement()) {
+ statement.executeUpdate("USE paimon_ignore_CASE_combined");
+ ignoreCaseTableCheck(statement, "T");
+
+ // check new table
+ statement.executeUpdate(
+ "CREATE TABLE T1 (k INT, UPPERCASE_V0 VARCHAR(20), PRIMARY
KEY (k))");
+ waitingTables("t1");
+ ignoreCaseTableCheck(statement, "T1");
}
}
+ private void ignoreCaseTableCheck(Statement statement, String tableName)
throws Exception {
+ FileStoreTable table = getFileStoreTable(tableName.toLowerCase());
+
+ // check sync schema changes and records
+ statement.executeUpdate("INSERT INTO " + tableName + " VALUES (1,
'Hi')");
+ RowType rowType1 =
+ RowType.of(
+ new DataType[] {DataTypes.INT().notNull(),
DataTypes.VARCHAR(20)},
+ new String[] {"k", "uppercase_v0"});
+ waitForResult(
+ Collections.singletonList("+I[1, Hi]"),
+ table,
+ rowType1,
+ Collections.singletonList("k"));
+
+ statement.executeUpdate(
+ "ALTER TABLE " + tableName + " MODIFY COLUMN UPPERCASE_V0
VARCHAR(30)");
+ statement.executeUpdate("INSERT INTO " + tableName + " VALUES (2,
'Paimon')");
+ RowType rowType2 =
+ RowType.of(
+ new DataType[] {DataTypes.INT().notNull(),
DataTypes.VARCHAR(30)},
+ new String[] {"k", "uppercase_v0"});
+ waitForResult(
+ Arrays.asList("+I[1, Hi]", "+I[2, Paimon]"),
+ table,
+ rowType2,
+ Collections.singletonList("k"));
+
+ statement.executeUpdate("ALTER TABLE " + tableName + " ADD COLUMN
UPPERCASE_V1 DOUBLE");
+ statement.executeUpdate("INSERT INTO " + tableName + " VALUES (3,
'Test', 0.5)");
+ RowType rowType3 =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(), DataTypes.VARCHAR(30),
DataTypes.DOUBLE()
+ },
+ new String[] {"k", "uppercase_v0", "uppercase_v1"});
+ waitForResult(
+ Arrays.asList("+I[1, Hi, NULL]", "+I[2, Paimon, NULL]", "+I[3,
Test, 0.5]"),
+ table,
+ rowType3,
+ Collections.singletonList("k"));
+ }
+
@Test
@Timeout(600)
public void testNewlyAddedTables() throws Exception {
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/database/case-insensitive/canal-data-1.txt
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/database/case-insensitive/canal-data-1.txt
index ebf431813..998ba5141 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/database/case-insensitive/canal-data-1.txt
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/database/case-insensitive/canal-data-1.txt
@@ -18,3 +18,4 @@
{"data":[{"k1":"5","v0":"five","V1":"50"}],"database":"paimon_sync_database_affix","es":1684770072000,"id":81,"isDdl":false,"mysqlType":{"k1":"INT","v0":"VARCHAR(10)","V1":"INT"},"old":null,"pkNames":["k1"],"sql":"","sqlType":{"k1":4,"v0":12,"v1":4},"table":"t1","ts":1684770072286,"type":"INSERT"}
{"data":[{"K1":"7","v0":"seven","V1":"70"}],"database":"paimon_sync_database_affix","es":1684770073000,"id":84,"isDdl":false,"mysqlType":{"K1":"INT","v0":"VARCHAR(10)","V1":"INT"},"old":null,"pkNames":["k1"],"sql":"","sqlType":{"k1":4,"v0":12,"v1":4},"table":"t1","ts":1684770073254,"type":"INSERT"}
+{"data":[{"K1":"8","v0":"eight","V1":"80",
"V2":"added"}],"database":"paimon_sync_database_affix","es":1684770074000,"id":84,"isDdl":false,"mysqlType":{"K1":"INT","v0":"VARCHAR(10)","V1":"INT","V2":"VARCHAR(10)"},"old":null,"pkNames":["k1"],"sql":"","sqlType":{"k1":4,"v0":12,"v1":4,"v2":12},"table":"t1","ts":1684770073254,"type":"INSERT"}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_database_setup.sql
b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_database_setup.sql
index bd70146be..b882e6d0a 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_database_setup.sql
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_database_setup.sql
@@ -191,11 +191,24 @@ CREATE TABLE test (
);
--
################################################################################
--- MySqlSyncDatabaseActionITCase#testIgnoreCase
+-- MySqlSyncDatabaseActionITCase#testIgnoreCaseDivided
--
################################################################################
-CREATE DATABASE paimon_ignore_CASE;
-USE paimon_ignore_CASE;
+CREATE DATABASE paimon_ignore_CASE_divided;
+USE paimon_ignore_CASE_divided;
+
+CREATE TABLE T (
+ k INT,
+ UPPERCASE_V0 VARCHAR(20),
+ PRIMARY KEY (k)
+);
+
+--
################################################################################
+-- MySqlSyncDatabaseActionITCase#testIgnoreCaseCombined
+--
################################################################################
+
+CREATE DATABASE paimon_ignore_CASE_combined;
+USE paimon_ignore_CASE_combined;
CREATE TABLE T (
k INT,