This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7b378dcf7 [cdc] When the cdc source modifies field comments, all field
comments in the existing paimon table will be set to empty. (#3088)
7b378dcf7 is described below
commit 7b378dcf7da08d84aa0f9b88eb6c98a20cc985e7
Author: Kerwin <[email protected]>
AuthorDate: Tue Apr 30 17:02:22 2024 +0800
[cdc] When the cdc source modifies field comments, all field comments in
the existing paimon table will be set to empty. (#3088)
---
.../flink/action/cdc/CdcActionCommonUtils.java | 20 ++++++++
.../flink/action/cdc/format/RecordParser.java | 55 ++++++++++------------
.../action/cdc/format/canal/CanalRecordParser.java | 19 ++++----
.../cdc/format/debezium/DebeziumRecordParser.java | 11 ++---
.../mongodb/strategy/Mongo4VersionStrategy.java | 18 +++----
.../cdc/mongodb/strategy/MongoVersionStrategy.java | 27 +++++------
.../flink/action/cdc/mysql/MySqlRecordParser.java | 20 ++++----
.../action/cdc/postgres/PostgresRecordParser.java | 18 +++----
.../cdc/CdcDynamicTableParsingProcessFunction.java | 2 +-
.../flink/sink/cdc/NewTableSchemaBuilder.java | 9 ++--
.../flink/sink/cdc/RichCdcMultiplexRecord.java | 23 +++++----
.../cdc/RichCdcMultiplexRecordEventParser.java | 2 +-
.../paimon/flink/sink/cdc/RichCdcRecord.java | 40 ++++++++++------
.../paimon/flink/sink/cdc/RichEventParser.java | 15 +++---
.../cdc/UpdatedDataFieldsProcessFunctionBase.java | 8 +++-
.../kafka/KafkaCanalSyncDatabaseActionITCase.java | 2 +-
.../cdc/mysql/MySqlSyncTableActionITCase.java | 40 ++++++++++++++++
.../src/test/resources/mysql/sync_table_setup.sql | 7 +++
18 files changed, 202 insertions(+), 134 deletions(-)
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
index bc41683fd..20740d1a7 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
@@ -100,6 +100,26 @@ public class CdcActionCommonUtils {
return true;
}
+ public static List<DataField> fieldNameCaseConvert(
+ List<DataField> origin, boolean caseSensitive, String tableName) {
+ Set<String> existedFields = new HashSet<>();
+ Function<String, String> columnDuplicateErrMsg =
+ columnDuplicateErrMsg(tableName == null ? "UNKNOWN" :
tableName);
+ return origin.stream()
+ .map(
+ field -> {
+ if (caseSensitive) {
+ return field;
+ }
+ String columnLowerCase =
field.name().toLowerCase();
+ checkArgument(
+ existedFields.add(columnLowerCase),
+ columnDuplicateErrMsg.apply(field.name()));
+ return field.newName(columnLowerCase);
+ })
+ .collect(Collectors.toList());
+ }
+
public static <T> LinkedHashMap<String, T> mapKeyCaseConvert(
LinkedHashMap<String, T> origin,
boolean caseSensitive,
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
index f4543759a..38076428f 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
@@ -24,9 +24,10 @@ import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.schema.Schema;
-import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.TypeUtils;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
@@ -43,7 +44,6 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.Collections;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -51,7 +51,7 @@ import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
-import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.fieldNameCaseConvert;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.listCaseConvert;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.recordKeyDuplicateErrMsg;
@@ -102,7 +102,13 @@ public abstract class RecordParser
}
Schema.Builder builder = Schema.newBuilder();
- recordOpt.get().fieldTypes().forEach(builder::column);
+ recordOpt
+ .get()
+ .fields()
+ .forEach(
+ field ->
+ builder.column(
+ field.name(), field.type(),
field.description()));
builder.primaryKey(extractPrimaryKeys());
return builder.build();
} catch (Exception e) {
@@ -122,10 +128,9 @@ public abstract class RecordParser
}
// use STRING type in default when we cannot get origin data types (most
cases)
- protected LinkedHashMap<String, DataType> fillDefaultTypes(JsonNode
record) {
- LinkedHashMap<String, DataType> fieldTypes = new LinkedHashMap<>();
- record.fieldNames().forEachRemaining(name -> fieldTypes.put(name,
DataTypes.STRING()));
- return fieldTypes;
+ protected void fillDefaultTypes(JsonNode record, RowType.Builder
rowTypeBuilder) {
+ record.fieldNames()
+ .forEachRemaining(name -> rowTypeBuilder.field(name,
DataTypes.STRING()));
}
@Override
@@ -139,9 +144,8 @@ public abstract class RecordParser
}
}
- protected Map<String, String> extractRowData(
- JsonNode record, LinkedHashMap<String, DataType> paimonFieldTypes)
{
- paimonFieldTypes.putAll(fillDefaultTypes(record));
+ protected Map<String, String> extractRowData(JsonNode record,
RowType.Builder rowTypeBuilder) {
+ fillDefaultTypes(record, rowTypeBuilder);
Map<String, Object> recordMap =
convertValue(record, new TypeReference<Map<String, Object>>()
{});
Map<String, String> rowData =
@@ -162,19 +166,19 @@ public abstract class RecordParser
}
return
Objects.toString(entry.getValue());
}));
- evalComputedColumns(rowData, paimonFieldTypes);
+ evalComputedColumns(rowData, rowTypeBuilder);
return rowData;
}
// generate values for computed columns
protected void evalComputedColumns(
- Map<String, String> rowData, LinkedHashMap<String, DataType>
paimonFieldTypes) {
+ Map<String, String> rowData, RowType.Builder rowTypeBuilder) {
computedColumns.forEach(
computedColumn -> {
rowData.put(
computedColumn.columnName(),
computedColumn.eval(rowData.get(computedColumn.fieldReference())));
- paimonFieldTypes.put(computedColumn.columnName(),
computedColumn.columnType());
+ rowTypeBuilder.field(computedColumn.columnName(),
computedColumn.columnType());
});
}
@@ -191,32 +195,23 @@ public abstract class RecordParser
protected void processRecord(
JsonNode jsonNode, RowKind rowKind, List<RichCdcMultiplexRecord>
records) {
- LinkedHashMap<String, DataType> paimonFieldTypes = new
LinkedHashMap<>(jsonNode.size());
- Map<String, String> rowData = this.extractRowData(jsonNode,
paimonFieldTypes);
- records.add(createRecord(rowKind, rowData, paimonFieldTypes));
+ RowType.Builder rowTypeBuilder = RowType.builder();
+ Map<String, String> rowData = this.extractRowData(jsonNode,
rowTypeBuilder);
+ records.add(createRecord(rowKind, rowData,
rowTypeBuilder.build().getFields()));
}
/** Handle case sensitivity here. */
private RichCdcMultiplexRecord createRecord(
- RowKind rowKind,
- Map<String, String> data,
- LinkedHashMap<String, DataType> paimonFieldTypes) {
+ RowKind rowKind, Map<String, String> data, List<DataField>
paimonFields) {
String databaseName = getDatabaseName();
String tableName = getTableName();
- paimonFieldTypes =
- mapKeyCaseConvert(
- paimonFieldTypes,
- caseSensitive,
- columnDuplicateErrMsg(tableName == null ? "UNKNOWN" :
tableName));
+ paimonFields = fieldNameCaseConvert(paimonFields, caseSensitive,
tableName);
+
data = mapKeyCaseConvert(data, caseSensitive,
recordKeyDuplicateErrMsg(data));
List<String> primaryKeys = listCaseConvert(extractPrimaryKeys(),
caseSensitive);
return new RichCdcMultiplexRecord(
- databaseName,
- tableName,
- paimonFieldTypes,
- primaryKeys,
- new CdcRecord(rowKind, data));
+ databaseName, tableName, paimonFields, primaryKeys, new
CdcRecord(rowKind, data));
}
protected void setRoot(CdcSourceRecord record) {
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
index c7cd06c18..642dc4241 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
@@ -23,8 +23,8 @@ import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.RecordParser;
import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
-import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.JsonSerdeUtil;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
@@ -140,13 +140,11 @@ public class CanalRecordParser extends RecordParser {
schema, new TypeReference<LinkedHashMap<String, String>>() {});
}
- private LinkedHashMap<String, DataType> toPaimonFieldTypes(
- LinkedHashMap<String, String> originalFieldTypes) {
- LinkedHashMap<String, DataType> paimonFieldTypes = new
LinkedHashMap<>();
+ private void toPaimonFieldTypes(
+ LinkedHashMap<String, String> originalFieldTypes, RowType.Builder
rowTypeBuilder) {
originalFieldTypes.forEach(
(name, type) ->
- paimonFieldTypes.put(name,
MySqlTypeUtils.toDataType(type, typeMapping)));
- return paimonFieldTypes;
+ rowTypeBuilder.field(name,
MySqlTypeUtils.toDataType(type, typeMapping)));
}
@Override
@@ -160,15 +158,14 @@ public class CanalRecordParser extends RecordParser {
}
@Override
- protected Map<String, String> extractRowData(
- JsonNode record, LinkedHashMap<String, DataType> paimonFieldTypes)
{
+ protected Map<String, String> extractRowData(JsonNode record,
RowType.Builder rowTypeBuilder) {
LinkedHashMap<String, String> originalFieldTypes =
tryExtractOriginalFieldTypes();
Map<String, Object> recordMap =
JsonSerdeUtil.convertValue(record, new
TypeReference<Map<String, Object>>() {});
Map<String, String> rowData = new HashMap<>();
if (originalFieldTypes != null) {
- paimonFieldTypes.putAll(toPaimonFieldTypes(originalFieldTypes));
+ toPaimonFieldTypes(originalFieldTypes, rowTypeBuilder);
for (Map.Entry<String, Object> entry : recordMap.entrySet()) {
String fieldName = entry.getKey();
String originalType = originalFieldTypes.get(fieldName);
@@ -177,13 +174,13 @@ public class CanalRecordParser extends RecordParser {
rowData.put(fieldName, newValue);
}
} else {
- paimonFieldTypes.putAll(fillDefaultTypes(record));
+ fillDefaultTypes(record, rowTypeBuilder);
for (Map.Entry<String, Object> entry : recordMap.entrySet()) {
rowData.put(entry.getKey(), Objects.toString(entry.getValue(),
null));
}
}
- evalComputedColumns(rowData, paimonFieldTypes);
+ evalComputedColumns(rowData, rowTypeBuilder);
return rowData;
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
index 471856462..ec23132ff 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
@@ -23,8 +23,8 @@ import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.RecordParser;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
-import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Preconditions;
@@ -183,10 +183,9 @@ public class DebeziumRecordParser extends RecordParser {
}
@Override
- protected Map<String, String> extractRowData(
- JsonNode record, LinkedHashMap<String, DataType> paimonFieldTypes)
{
+ protected Map<String, String> extractRowData(JsonNode record,
RowType.Builder rowTypeBuilder) {
if (!hasSchema) {
- return super.extractRowData(record, paimonFieldTypes);
+ return super.extractRowData(record, rowTypeBuilder);
}
Map<String, Object> recordMap =
@@ -208,13 +207,13 @@ public class DebeziumRecordParser extends RecordParser {
ZoneOffset.UTC);
resultMap.put(fieldName, transformed);
- paimonFieldTypes.put(
+ rowTypeBuilder.field(
fieldName,
DebeziumSchemaUtils.toDataType(
debeziumType, className,
parameters.get(fieldName)));
}
- evalComputedColumns(resultMap, paimonFieldTypes);
+ evalComputedColumns(resultMap, rowTypeBuilder);
return resultMap;
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
index 395ec939e..e64cf3e8a 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
@@ -21,8 +21,9 @@ package org.apache.paimon.flink.action.cdc.mongodb.strategy;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
-import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -30,11 +31,10 @@ import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.configuration.Configuration;
import java.util.ArrayList;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.fieldNameCaseConvert;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.recordKeyDuplicateErrMsg;
@@ -130,19 +130,19 @@ public class Mongo4VersionStrategy implements
MongoVersionStrategy {
*/
private RichCdcMultiplexRecord processRecord(JsonNode fullDocument,
RowKind rowKind)
throws JsonProcessingException {
- LinkedHashMap<String, DataType> paimonFieldTypes = new
LinkedHashMap<>();
+ RowType.Builder rowTypeBuilder = RowType.builder();
Map<String, String> record =
- getExtractRow(fullDocument, paimonFieldTypes, computedColumns,
mongodbConfig);
+ getExtractRow(fullDocument, rowTypeBuilder, computedColumns,
mongodbConfig);
record = mapKeyCaseConvert(record, caseSensitive,
recordKeyDuplicateErrMsg(record));
- paimonFieldTypes =
- mapKeyCaseConvert(
- paimonFieldTypes, caseSensitive,
columnDuplicateErrMsg(collection));
+
+ List<DataField> fields = rowTypeBuilder.build().getFields();
+ fields = fieldNameCaseConvert(fields, caseSensitive, collection);
return new RichCdcMultiplexRecord(
databaseName,
collection,
- paimonFieldTypes,
+ fields,
extractPrimaryKeys(),
new CdcRecord(rowKind, record));
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
index 3908c0e68..64f127571 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
@@ -21,8 +21,8 @@ package org.apache.paimon.flink.action.cdc.mongodb.strategy;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.mongodb.SchemaAcquisitionMode;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
-import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.JsonSerdeUtil;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
@@ -34,7 +34,6 @@ import org.apache.flink.configuration.Configuration;
import java.util.Collections;
import java.util.HashMap;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -72,14 +71,14 @@ public interface MongoVersionStrategy {
* Determines the extraction mode and retrieves the row accordingly.
*
* @param jsonNode The JsonNode representing the MongoDB document.
- * @param paimonFieldTypes A map to store the field types.
+ * @param rowTypeBuilder row type builder.
* @param mongodbConfig Configuration for the MongoDB connection.
* @return A map representing the extracted row.
* @throws JsonProcessingException If there's an error during JSON
processing.
*/
default Map<String, String> getExtractRow(
JsonNode jsonNode,
- LinkedHashMap<String, DataType> paimonFieldTypes,
+ RowType.Builder rowTypeBuilder,
List<ComputedColumn> computedColumns,
Configuration mongodbConfig)
throws JsonProcessingException {
@@ -105,9 +104,9 @@ public interface MongoVersionStrategy {
mongodbConfig.getString(PARSER_PATH),
mongodbConfig.getString(FIELD_NAME),
computedColumns,
- paimonFieldTypes);
+ rowTypeBuilder);
case DYNAMIC:
- return parseAndTypeJsonRow(document.toString(),
paimonFieldTypes, computedColumns);
+ return parseAndTypeJsonRow(document.toString(),
rowTypeBuilder, computedColumns);
default:
throw new RuntimeException("Unsupported extraction mode: " +
mode);
}
@@ -115,11 +114,9 @@ public interface MongoVersionStrategy {
/** Parses and types a JSON row based on the given parameters. */
default Map<String, String> parseAndTypeJsonRow(
- String evaluate,
- LinkedHashMap<String, DataType> paimonFieldTypes,
- List<ComputedColumn> computedColumns) {
+ String evaluate, RowType.Builder rowTypeBuilder,
List<ComputedColumn> computedColumns) {
Map<String, String> parsedRow = JsonSerdeUtil.parseJsonMap(evaluate,
String.class);
- return processParsedData(parsedRow, paimonFieldTypes, computedColumns);
+ return processParsedData(parsedRow, rowTypeBuilder, computedColumns);
}
/** Parses fields from a JSON record based on the given parameters. */
@@ -128,7 +125,7 @@ public interface MongoVersionStrategy {
String fieldPaths,
String fieldNames,
List<ComputedColumn> computedColumns,
- LinkedHashMap<String, DataType> fieldTypes) {
+ RowType.Builder rowTypeBuilder) {
String[] columnNames = fieldNames.split(",");
String[] parseNames = fieldPaths.split(",");
Map<String, String> parsedRow = new HashMap<>();
@@ -138,20 +135,20 @@ public interface MongoVersionStrategy {
parsedRow.put(columnNames[i],
Optional.ofNullable(evaluate).orElse("{}"));
}
- return processParsedData(parsedRow, fieldTypes, computedColumns);
+ return processParsedData(parsedRow, rowTypeBuilder, computedColumns);
}
/** Processes the parsed data to generate the result map and update field
types. */
static Map<String, String> processParsedData(
Map<String, String> parsedRow,
- LinkedHashMap<String, DataType> fieldTypes,
+ RowType.Builder rowTypeBuilder,
List<ComputedColumn> computedColumns) {
int initialCapacity = parsedRow.size() + computedColumns.size();
Map<String, String> resultMap = new HashMap<>(initialCapacity);
parsedRow.forEach(
(column, value) -> {
- fieldTypes.put(column, DataTypes.STRING());
+ rowTypeBuilder.field(column, DataTypes.STRING());
resultMap.put(column, value);
});
computedColumns.forEach(
@@ -161,7 +158,7 @@ public interface MongoVersionStrategy {
String computedValue =
computedColumn.eval(parsedRow.get(fieldReference));
resultMap.put(columnName, computedValue);
- fieldTypes.put(columnName, computedColumn.columnType());
+ rowTypeBuilder.field(columnName,
computedColumn.columnType());
});
return resultMap;
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
index 45dcaa3c2..745a7932d 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
@@ -26,8 +26,10 @@ import
org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils;
import org.apache.paimon.flink.action.cdc.mysql.format.DebeziumEvent;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Preconditions;
@@ -159,22 +161,18 @@ public class MySqlRecordParser implements
FlatMapFunction<CdcSourceRecord, RichC
Table table = tableChange.getTable();
- LinkedHashMap<String, DataType> fieldTypes = extractFieldTypes(table);
+ List<DataField> fields = extractFields(table);
List<String> primaryKeys =
listCaseConvert(table.primaryKeyColumnNames(), caseSensitive);
// TODO : add table comment and column comment when we upgrade flink
cdc to 2.4
return Collections.singletonList(
new RichCdcMultiplexRecord(
- databaseName,
- currentTable,
- fieldTypes,
- primaryKeys,
- CdcRecord.emptyRecord()));
+ databaseName, currentTable, fields, primaryKeys,
CdcRecord.emptyRecord()));
}
- private LinkedHashMap<String, DataType> extractFieldTypes(Table table) {
+ private List<DataField> extractFields(Table table) {
+ RowType.Builder rowType = RowType.builder();
List<Column> columns = table.columns();
- LinkedHashMap<String, DataType> fieldTypes = new
LinkedHashMap<>(columns.size());
Set<String> existedFields = new HashSet<>();
Function<String, String> columnDuplicateErrMsg =
columnDuplicateErrMsg(table.id().toString());
@@ -192,9 +190,9 @@ public class MySqlRecordParser implements
FlatMapFunction<CdcSourceRecord, RichC
typeMapping);
dataType = dataType.copy(typeMapping.containsMode(TO_NULLABLE) ||
column.isOptional());
- fieldTypes.put(columnName, dataType);
+ rowType.field(columnName, dataType);
}
- return fieldTypes;
+ return rowType.build().getFields();
}
private List<RichCdcMultiplexRecord> extractRecords() {
@@ -274,7 +272,7 @@ public class MySqlRecordParser implements
FlatMapFunction<CdcSourceRecord, RichC
return new RichCdcMultiplexRecord(
databaseName,
currentTable,
- new LinkedHashMap<>(0),
+ Collections.emptyList(),
Collections.emptyList(),
new CdcRecord(rowKind, data));
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
index de54b97ae..0299d57f7 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
@@ -25,9 +25,11 @@ import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.mysql.format.DebeziumEvent;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
@@ -144,7 +146,7 @@ public class PostgresRecordParser
extractRecords().forEach(out::collect);
}
- private LinkedHashMap<String, DataType>
extractFieldTypes(DebeziumEvent.Field schema) {
+ private List<DataField> extractFields(DebeziumEvent.Field schema) {
Map<String, DebeziumEvent.Field> afterFields = schema.afterFields();
Preconditions.checkArgument(
!afterFields.isEmpty(),
@@ -152,7 +154,7 @@ public class PostgresRecordParser
+ "Please make sure that `includeSchema` is true "
+ "in the JsonDebeziumDeserializationSchema you
created");
- LinkedHashMap<String, DataType> fieldTypes = new
LinkedHashMap<>(afterFields.size());
+ RowType.Builder rowType = RowType.builder();
Set<String> existedFields = new HashSet<>();
Function<String, String> columnDuplicateErrMsg =
columnDuplicateErrMsg(currentTable);
afterFields.forEach(
@@ -166,13 +168,13 @@ public class PostgresRecordParser
dataType.copy(
typeMapping.containsMode(TO_NULLABLE) ||
value.optional());
- fieldTypes.put(columnName, dataType);
+ rowType.field(columnName, dataType);
});
- return fieldTypes;
+ return rowType.build().getFields();
}
/**
- * Extract field types from json records, see <a
+ * Extract fields from json records, see <a
*
href="https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-data-types">postgresql-data-types</a>.
*/
private DataType extractFieldType(DebeziumEvent.Field field) {
@@ -246,12 +248,12 @@ public class PostgresRecordParser
Map<String, String> after = extractRow(root.payload().after());
if (!after.isEmpty()) {
after = mapKeyCaseConvert(after, caseSensitive,
recordKeyDuplicateErrMsg(after));
- LinkedHashMap<String, DataType> fieldTypes =
extractFieldTypes(root.schema());
+ List<DataField> fields = extractFields(root.schema());
records.add(
new RichCdcMultiplexRecord(
databaseName,
currentTable,
- fieldTypes,
+ fields,
Collections.emptyList(),
new CdcRecord(RowKind.INSERT, after)));
}
@@ -398,7 +400,7 @@ public class PostgresRecordParser
return new RichCdcMultiplexRecord(
databaseName,
currentTable,
- new LinkedHashMap<>(0),
+ Collections.emptyList(),
Collections.emptyList(),
new CdcRecord(rowKind, data));
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
index 777f82a95..87aefeb58 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
@@ -107,7 +107,7 @@ public class CdcDynamicTableParsingProcessFunction<T>
extends ProcessFunction<T,
});
List<DataField> schemaChange = parser.parseSchemaChange();
- if (schemaChange.size() > 0) {
+ if (!schemaChange.isEmpty()) {
context.output(
DYNAMIC_SCHEMA_CHANGE_OUTPUT_TAG,
Tuple2.of(Identifier.create(database, tableName),
schemaChange));
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
index 18324abed..795fdb0f9 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
@@ -20,7 +20,7 @@ package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
import org.apache.paimon.schema.Schema;
-import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataField;
import java.io.Serializable;
import java.util.HashSet;
@@ -60,12 +60,11 @@ public class NewTableSchemaBuilder implements Serializable {
Set<String> existedFields = new HashSet<>();
Function<String, String> columnDuplicateErrMsg =
columnDuplicateErrMsg(tableName);
- for (Map.Entry<String, DataType> entry :
record.fieldTypes().entrySet()) {
+ for (DataField dataField : record.fields()) {
String fieldName =
columnCaseConvertAndDuplicateCheck(
- entry.getKey(), existedFields, caseSensitive,
columnDuplicateErrMsg);
-
- builder.column(fieldName, entry.getValue());
+ dataField.name(), existedFields, caseSensitive,
columnDuplicateErrMsg);
+ builder.column(fieldName, dataField.type(),
dataField.description());
}
for (CdcMetadataConverter metadataConverter : metadataConverters) {
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java
index 0fe0c9eab..44a228e4c 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java
@@ -18,12 +18,11 @@
package org.apache.paimon.flink.sink.cdc;
-import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataField;
import javax.annotation.Nullable;
import java.io.Serializable;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Objects;
@@ -34,19 +33,19 @@ public class RichCdcMultiplexRecord implements Serializable
{
@Nullable private final String databaseName;
@Nullable private final String tableName;
- private final LinkedHashMap<String, DataType> fieldTypes;
+ private final List<DataField> fields;
private final List<String> primaryKeys;
private final CdcRecord cdcRecord;
public RichCdcMultiplexRecord(
@Nullable String databaseName,
@Nullable String tableName,
- LinkedHashMap<String, DataType> fieldTypes,
+ List<DataField> fields,
List<String> primaryKeys,
CdcRecord cdcRecord) {
this.databaseName = databaseName;
this.tableName = tableName;
- this.fieldTypes = fieldTypes;
+ this.fields = fields;
this.primaryKeys = primaryKeys;
this.cdcRecord = cdcRecord;
}
@@ -61,8 +60,8 @@ public class RichCdcMultiplexRecord implements Serializable {
return tableName;
}
- public LinkedHashMap<String, DataType> fieldTypes() {
- return fieldTypes;
+ public List<DataField> fields() {
+ return fields;
}
public List<String> primaryKeys() {
@@ -70,12 +69,12 @@ public class RichCdcMultiplexRecord implements Serializable
{
}
public RichCdcRecord toRichCdcRecord() {
- return new RichCdcRecord(cdcRecord, fieldTypes);
+ return new RichCdcRecord(cdcRecord, fields);
}
@Override
public int hashCode() {
- return Objects.hash(databaseName, tableName, fieldTypes, primaryKeys,
cdcRecord);
+ return Objects.hash(databaseName, tableName, fields, primaryKeys,
cdcRecord);
}
@Override
@@ -89,7 +88,7 @@ public class RichCdcMultiplexRecord implements Serializable {
RichCdcMultiplexRecord that = (RichCdcMultiplexRecord) o;
return Objects.equals(databaseName, that.databaseName)
&& Objects.equals(tableName, that.tableName)
- && Objects.equals(fieldTypes, that.fieldTypes)
+ && Objects.equals(fields, that.fields)
&& Objects.equals(primaryKeys, that.primaryKeys)
&& Objects.equals(cdcRecord, that.cdcRecord);
}
@@ -101,8 +100,8 @@ public class RichCdcMultiplexRecord implements Serializable
{
+ databaseName
+ ", tableName="
+ tableName
- + ", fieldTypes="
- + fieldTypes
+ + ", fields="
+ + fields
+ ", primaryKeys="
+ primaryKeys
+ ", cdcRecord="
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
index 23488de1e..d0298e2b9 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
@@ -157,7 +157,7 @@ public class RichCdcMultiplexRecordEventParser implements
EventParser<RichCdcMul
private boolean shouldCreateCurrentTable() {
return shouldSynchronizeCurrentTable
- && !record.fieldTypes().isEmpty()
+ && !record.fields().isEmpty()
&& createdTables.add(parseTableName());
}
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
index 37e9271d0..7fc0c3ff7 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
@@ -19,14 +19,19 @@
package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.annotation.Experimental;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
+import javax.annotation.Nullable;
+
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.HashMap;
-import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
/** A change message contains schema and data. */
@Experimental
@@ -35,11 +40,11 @@ public class RichCdcRecord implements Serializable {
private static final long serialVersionUID = 1L;
private final CdcRecord cdcRecord;
- private final LinkedHashMap<String, DataType> fieldTypes;
+ private final List<DataField> fields;
- public RichCdcRecord(CdcRecord cdcRecord, LinkedHashMap<String, DataType>
fieldTypes) {
+ public RichCdcRecord(CdcRecord cdcRecord, List<DataField> fields) {
this.cdcRecord = cdcRecord;
- this.fieldTypes = fieldTypes;
+ this.fields = fields;
}
public boolean hasPayload() {
@@ -50,8 +55,8 @@ public class RichCdcRecord implements Serializable {
return cdcRecord.kind();
}
- public LinkedHashMap<String, DataType> fieldTypes() {
- return fieldTypes;
+ public List<DataField> fields() {
+ return fields;
}
public CdcRecord toCdcRecord() {
@@ -67,42 +72,49 @@ public class RichCdcRecord implements Serializable {
return false;
}
RichCdcRecord that = (RichCdcRecord) o;
- return cdcRecord == that.cdcRecord && Objects.equals(fieldTypes,
that.fieldTypes);
+ return cdcRecord == that.cdcRecord && Objects.equals(fields,
that.fields);
}
@Override
public int hashCode() {
- return Objects.hash(cdcRecord, fieldTypes);
+ return Objects.hash(cdcRecord, fields);
}
@Override
public String toString() {
- return "{" + "cdcRecord=" + cdcRecord + ", fieldTypes=" + fieldTypes +
'}';
+ return "{" + "cdcRecord=" + cdcRecord + ", fields=" + fields + '}';
}
public static Builder builder(RowKind kind) {
- return new Builder(kind);
+ return new Builder(kind, new AtomicInteger(-1));
}
/** Builder for {@link RichCdcRecord}. */
public static class Builder {
private final RowKind kind;
- private final LinkedHashMap<String, DataType> fieldTypes = new
LinkedHashMap<>();
+ private final AtomicInteger fieldId;
+ private final List<DataField> fields = new ArrayList<>();
private final Map<String, String> fieldValues = new HashMap<>();
- public Builder(RowKind kind) {
+ public Builder(RowKind kind, AtomicInteger fieldId) {
this.kind = kind;
+ this.fieldId = fieldId;
}
public Builder field(String name, DataType type, String value) {
- fieldTypes.put(name, type);
+ return field(name, type, value, null);
+ }
+
+ public Builder field(
+ String name, DataType type, String value, @Nullable String
description) {
+ fields.add(new DataField(fieldId.incrementAndGet(), name, type,
description));
fieldValues.put(name, value);
return this;
}
public RichCdcRecord build() {
- return new RichCdcRecord(new CdcRecord(kind, fieldValues),
fieldTypes);
+ return new RichCdcRecord(new CdcRecord(kind, fieldValues), fields);
}
}
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java
index c98ead49f..756875781 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataType;
import java.util.ArrayList;
import java.util.Collections;
@@ -32,7 +31,7 @@ public class RichEventParser implements
EventParser<RichCdcRecord> {
private RichCdcRecord record;
- private final LinkedHashMap<String, DataType> previousDataFields = new
LinkedHashMap<>();
+ private final LinkedHashMap<String, DataField> previousDataFields = new
LinkedHashMap<>();
@Override
public void setRawEvent(RichCdcRecord rawEvent) {
@@ -42,13 +41,13 @@ public class RichEventParser implements
EventParser<RichCdcRecord> {
@Override
public List<DataField> parseSchemaChange() {
List<DataField> change = new ArrayList<>();
- record.fieldTypes()
+ record.fields()
.forEach(
- (field, type) -> {
- DataType previous = previousDataFields.get(field);
- if (!Objects.equals(previous, type)) {
- previousDataFields.put(field, type);
- change.add(new DataField(0, field, type));
+ dataField -> {
+ DataField previous =
previousDataFields.get(dataField.name());
+ if (!Objects.equals(previous, dataField)) {
+ previousDataFields.put(dataField.name(),
dataField);
+ change.add(dataField);
}
});
return change;
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
index 3c1d6e15b..bc31a05e2 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
@@ -40,7 +40,6 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
/** Base class for update data fields process function. */
public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends
ProcessFunction<I, O> {
@@ -193,13 +192,17 @@ public abstract class
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
// we compare by ignoring nullable, because partition keys and primary keys might be
// nullable in source database, but they can't be null in Paimon
if (oldField.type().equalsIgnoreNullable(newField.type())) {
- if (!Objects.equals(oldField.description(),
newField.description())) {
+ // update column comment
+ if (newField.description() != null
+ &&
!newField.description().equals(oldField.description())) {
result.add(
SchemaChange.updateColumnComment(
new String[] {newField.name()},
newField.description()));
}
} else {
+ // update column type
result.add(SchemaChange.updateColumnType(newField.name(),
newField.type()));
+ // update column comment
if (newField.description() != null) {
result.add(
SchemaChange.updateColumnComment(
@@ -207,6 +210,7 @@ public abstract class
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
}
}
} else {
+ // add column
result.add(
SchemaChange.addColumn(
newField.name(), newField.type(),
newField.description(), null));
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
index fa6bd9301..96070048b 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
@@ -573,7 +573,7 @@ public class KafkaCanalSyncDatabaseActionITCase extends
KafkaActionITCaseBase {
IllegalArgumentException.class,
"Cannot synchronize record when database name or table name is unknown. "
+ "Invalid record is:\n"
- + "{databaseName=null, tableName=null, fieldTypes={k=STRING, v0=STRING, v1=STRING}, "
+ + "{databaseName=null, tableName=null, fields=[`k` STRING, `v0` STRING, `v1` STRING], "
+ "primaryKeys=[], cdcRecord=+I {v0=five, k=5, v1=50}}"));
}
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 67358f848..58ed2098e 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -51,6 +51,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.BUCKET;
@@ -1324,4 +1325,43 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
FileStoreTable table = getFileStoreTable();
assertThat(table.options().get(BUCKET.key())).isEqualTo("1");
}
+
+ @Test
+ @Timeout(60)
+ public void testColumnCommentChangeInExistingTable() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put("bucket", "1");
+ options.put("sink.parallelism", "1");
+
+ RowType rowType =
+ RowType.builder()
+ .field("pk", DataTypes.INT().notNull(), "pk comment")
+ .field("c1", DataTypes.DATE(), "c1 comment")
+ .field("c2", DataTypes.VARCHAR(10).notNull(), "c2 comment")
+ .build();
+
+ createFileStoreTable(
+ rowType, Collections.emptyList(),
Collections.singletonList("pk"), options);
+
+ Map<String, String> mySqlConfig = getBasicMySqlConfig();
+ mySqlConfig.put("database-name", DATABASE_NAME);
+ mySqlConfig.put("table-name", "test_exist_column_comment_change");
+
+ // Flink cdc 2.3 does not support collecting field comments, and existing paimon table field
+ // comments will not be changed.
+ MySqlSyncTableAction action =
+ syncTableActionBuilder(mySqlConfig)
+ .withPrimaryKeys("pk")
+ .withTableConfig(getBasicTableConfig())
+ .build();
+ runActionWithDefaultEnv(action);
+
+ FileStoreTable table = getFileStoreTable();
+ Map<String, DataField> actual =
+ table.schema().fields().stream()
+ .collect(Collectors.toMap(DataField::name,
Function.identity()));
+ assertThat(actual.get("pk").description()).isEqualTo("pk comment");
+ assertThat(actual.get("c1").description()).isEqualTo("c1 comment");
+ assertThat(actual.get("c2").description()).isEqualTo("c2 comment");
+ }
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
index b69661b60..5cf9cc1d9 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
@@ -316,6 +316,13 @@ CREATE TABLE test_exist_options_change (
PRIMARY KEY (pk)
);
+CREATE TABLE test_exist_column_comment_change (
+ pk INT,
+ c1 DATE,
+ c2 VARCHAR(10) not null comment 'c2 comment',
+ PRIMARY KEY (pk)
+);
+
-- ################################################################################
-- testSyncShard
-- ################################################################################