This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b378dcf7 [cdc] When the cdc source modifies field comments, all field comments in the existing paimon table will be set to empty. (#3088)
7b378dcf7 is described below

commit 7b378dcf7da08d84aa0f9b88eb6c98a20cc985e7
Author: Kerwin <[email protected]>
AuthorDate: Tue Apr 30 17:02:22 2024 +0800

    [cdc] When the cdc source modifies field comments, all field comments in the existing paimon table will be set to empty. (#3088)
---
 .../flink/action/cdc/CdcActionCommonUtils.java     | 20 ++++++++
 .../flink/action/cdc/format/RecordParser.java      | 55 ++++++++++------------
 .../action/cdc/format/canal/CanalRecordParser.java | 19 ++++----
 .../cdc/format/debezium/DebeziumRecordParser.java  | 11 ++---
 .../mongodb/strategy/Mongo4VersionStrategy.java    | 18 +++----
 .../cdc/mongodb/strategy/MongoVersionStrategy.java | 27 +++++------
 .../flink/action/cdc/mysql/MySqlRecordParser.java  | 20 ++++----
 .../action/cdc/postgres/PostgresRecordParser.java  | 18 +++----
 .../cdc/CdcDynamicTableParsingProcessFunction.java |  2 +-
 .../flink/sink/cdc/NewTableSchemaBuilder.java      |  9 ++--
 .../flink/sink/cdc/RichCdcMultiplexRecord.java     | 23 +++++----
 .../cdc/RichCdcMultiplexRecordEventParser.java     |  2 +-
 .../paimon/flink/sink/cdc/RichCdcRecord.java       | 40 ++++++++++------
 .../paimon/flink/sink/cdc/RichEventParser.java     | 15 +++---
 .../cdc/UpdatedDataFieldsProcessFunctionBase.java  |  8 +++-
 .../kafka/KafkaCanalSyncDatabaseActionITCase.java  |  2 +-
 .../cdc/mysql/MySqlSyncTableActionITCase.java      | 40 ++++++++++++++++
 .../src/test/resources/mysql/sync_table_setup.sql  |  7 +++
 18 files changed, 202 insertions(+), 134 deletions(-)

diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
index bc41683fd..20740d1a7 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
@@ -100,6 +100,26 @@ public class CdcActionCommonUtils {
         return true;
     }
 
+    public static List<DataField> fieldNameCaseConvert(
+            List<DataField> origin, boolean caseSensitive, String tableName) {
+        Set<String> existedFields = new HashSet<>();
+        Function<String, String> columnDuplicateErrMsg =
+                columnDuplicateErrMsg(tableName == null ? "UNKNOWN" : 
tableName);
+        return origin.stream()
+                .map(
+                        field -> {
+                            if (caseSensitive) {
+                                return field;
+                            }
+                            String columnLowerCase = 
field.name().toLowerCase();
+                            checkArgument(
+                                    existedFields.add(columnLowerCase),
+                                    columnDuplicateErrMsg.apply(field.name()));
+                            return field.newName(columnLowerCase);
+                        })
+                .collect(Collectors.toList());
+    }
+
     public static <T> LinkedHashMap<String, T> mapKeyCaseConvert(
             LinkedHashMap<String, T> origin,
             boolean caseSensitive,
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
index f4543759a..38076428f 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
@@ -24,9 +24,10 @@ import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.sink.cdc.CdcRecord;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
 import org.apache.paimon.schema.Schema;
-import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.TypeUtils;
 
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
@@ -43,7 +44,6 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.util.Collections;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -51,7 +51,7 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
-import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.fieldNameCaseConvert;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.listCaseConvert;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.recordKeyDuplicateErrMsg;
@@ -102,7 +102,13 @@ public abstract class RecordParser
             }
 
             Schema.Builder builder = Schema.newBuilder();
-            recordOpt.get().fieldTypes().forEach(builder::column);
+            recordOpt
+                    .get()
+                    .fields()
+                    .forEach(
+                            field ->
+                                    builder.column(
+                                            field.name(), field.type(), 
field.description()));
             builder.primaryKey(extractPrimaryKeys());
             return builder.build();
         } catch (Exception e) {
@@ -122,10 +128,9 @@ public abstract class RecordParser
     }
 
     // use STRING type in default when we cannot get origin data types (most 
cases)
-    protected LinkedHashMap<String, DataType> fillDefaultTypes(JsonNode 
record) {
-        LinkedHashMap<String, DataType> fieldTypes = new LinkedHashMap<>();
-        record.fieldNames().forEachRemaining(name -> fieldTypes.put(name, 
DataTypes.STRING()));
-        return fieldTypes;
+    protected void fillDefaultTypes(JsonNode record, RowType.Builder 
rowTypeBuilder) {
+        record.fieldNames()
+                .forEachRemaining(name -> rowTypeBuilder.field(name, 
DataTypes.STRING()));
     }
 
     @Override
@@ -139,9 +144,8 @@ public abstract class RecordParser
         }
     }
 
-    protected Map<String, String> extractRowData(
-            JsonNode record, LinkedHashMap<String, DataType> paimonFieldTypes) 
{
-        paimonFieldTypes.putAll(fillDefaultTypes(record));
+    protected Map<String, String> extractRowData(JsonNode record, 
RowType.Builder rowTypeBuilder) {
+        fillDefaultTypes(record, rowTypeBuilder);
         Map<String, Object> recordMap =
                 convertValue(record, new TypeReference<Map<String, Object>>() 
{});
         Map<String, String> rowData =
@@ -162,19 +166,19 @@ public abstract class RecordParser
                                             }
                                             return 
Objects.toString(entry.getValue());
                                         }));
-        evalComputedColumns(rowData, paimonFieldTypes);
+        evalComputedColumns(rowData, rowTypeBuilder);
         return rowData;
     }
 
     // generate values for computed columns
     protected void evalComputedColumns(
-            Map<String, String> rowData, LinkedHashMap<String, DataType> 
paimonFieldTypes) {
+            Map<String, String> rowData, RowType.Builder rowTypeBuilder) {
         computedColumns.forEach(
                 computedColumn -> {
                     rowData.put(
                             computedColumn.columnName(),
                             
computedColumn.eval(rowData.get(computedColumn.fieldReference())));
-                    paimonFieldTypes.put(computedColumn.columnName(), 
computedColumn.columnType());
+                    rowTypeBuilder.field(computedColumn.columnName(), 
computedColumn.columnType());
                 });
     }
 
@@ -191,32 +195,23 @@ public abstract class RecordParser
 
     protected void processRecord(
             JsonNode jsonNode, RowKind rowKind, List<RichCdcMultiplexRecord> 
records) {
-        LinkedHashMap<String, DataType> paimonFieldTypes = new 
LinkedHashMap<>(jsonNode.size());
-        Map<String, String> rowData = this.extractRowData(jsonNode, 
paimonFieldTypes);
-        records.add(createRecord(rowKind, rowData, paimonFieldTypes));
+        RowType.Builder rowTypeBuilder = RowType.builder();
+        Map<String, String> rowData = this.extractRowData(jsonNode, 
rowTypeBuilder);
+        records.add(createRecord(rowKind, rowData, 
rowTypeBuilder.build().getFields()));
     }
 
     /** Handle case sensitivity here. */
     private RichCdcMultiplexRecord createRecord(
-            RowKind rowKind,
-            Map<String, String> data,
-            LinkedHashMap<String, DataType> paimonFieldTypes) {
+            RowKind rowKind, Map<String, String> data, List<DataField> 
paimonFields) {
         String databaseName = getDatabaseName();
         String tableName = getTableName();
-        paimonFieldTypes =
-                mapKeyCaseConvert(
-                        paimonFieldTypes,
-                        caseSensitive,
-                        columnDuplicateErrMsg(tableName == null ? "UNKNOWN" : 
tableName));
+        paimonFields = fieldNameCaseConvert(paimonFields, caseSensitive, 
tableName);
+
         data = mapKeyCaseConvert(data, caseSensitive, 
recordKeyDuplicateErrMsg(data));
         List<String> primaryKeys = listCaseConvert(extractPrimaryKeys(), 
caseSensitive);
 
         return new RichCdcMultiplexRecord(
-                databaseName,
-                tableName,
-                paimonFieldTypes,
-                primaryKeys,
-                new CdcRecord(rowKind, data));
+                databaseName, tableName, paimonFields, primaryKeys, new 
CdcRecord(rowKind, data));
     }
 
     protected void setRoot(CdcSourceRecord record) {
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
index c7cd06c18..642dc4241 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
@@ -23,8 +23,8 @@ import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.format.RecordParser;
 import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
-import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.JsonSerdeUtil;
 
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
@@ -140,13 +140,11 @@ public class CanalRecordParser extends RecordParser {
                 schema, new TypeReference<LinkedHashMap<String, String>>() {});
     }
 
-    private LinkedHashMap<String, DataType> toPaimonFieldTypes(
-            LinkedHashMap<String, String> originalFieldTypes) {
-        LinkedHashMap<String, DataType> paimonFieldTypes = new 
LinkedHashMap<>();
+    private void toPaimonFieldTypes(
+            LinkedHashMap<String, String> originalFieldTypes, RowType.Builder 
rowTypeBuilder) {
         originalFieldTypes.forEach(
                 (name, type) ->
-                        paimonFieldTypes.put(name, 
MySqlTypeUtils.toDataType(type, typeMapping)));
-        return paimonFieldTypes;
+                        rowTypeBuilder.field(name, 
MySqlTypeUtils.toDataType(type, typeMapping)));
     }
 
     @Override
@@ -160,15 +158,14 @@ public class CanalRecordParser extends RecordParser {
     }
 
     @Override
-    protected Map<String, String> extractRowData(
-            JsonNode record, LinkedHashMap<String, DataType> paimonFieldTypes) 
{
+    protected Map<String, String> extractRowData(JsonNode record, 
RowType.Builder rowTypeBuilder) {
         LinkedHashMap<String, String> originalFieldTypes = 
tryExtractOriginalFieldTypes();
         Map<String, Object> recordMap =
                 JsonSerdeUtil.convertValue(record, new 
TypeReference<Map<String, Object>>() {});
         Map<String, String> rowData = new HashMap<>();
 
         if (originalFieldTypes != null) {
-            paimonFieldTypes.putAll(toPaimonFieldTypes(originalFieldTypes));
+            toPaimonFieldTypes(originalFieldTypes, rowTypeBuilder);
             for (Map.Entry<String, Object> entry : recordMap.entrySet()) {
                 String fieldName = entry.getKey();
                 String originalType = originalFieldTypes.get(fieldName);
@@ -177,13 +174,13 @@ public class CanalRecordParser extends RecordParser {
                 rowData.put(fieldName, newValue);
             }
         } else {
-            paimonFieldTypes.putAll(fillDefaultTypes(record));
+            fillDefaultTypes(record, rowTypeBuilder);
             for (Map.Entry<String, Object> entry : recordMap.entrySet()) {
                 rowData.put(entry.getKey(), Objects.toString(entry.getValue(), 
null));
             }
         }
 
-        evalComputedColumns(rowData, paimonFieldTypes);
+        evalComputedColumns(rowData, rowTypeBuilder);
         return rowData;
     }
 
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
index 471856462..ec23132ff 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
@@ -23,8 +23,8 @@ import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.format.RecordParser;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
-import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.JsonSerdeUtil;
 import org.apache.paimon.utils.Preconditions;
 
@@ -183,10 +183,9 @@ public class DebeziumRecordParser extends RecordParser {
     }
 
     @Override
-    protected Map<String, String> extractRowData(
-            JsonNode record, LinkedHashMap<String, DataType> paimonFieldTypes) 
{
+    protected Map<String, String> extractRowData(JsonNode record, 
RowType.Builder rowTypeBuilder) {
         if (!hasSchema) {
-            return super.extractRowData(record, paimonFieldTypes);
+            return super.extractRowData(record, rowTypeBuilder);
         }
 
         Map<String, Object> recordMap =
@@ -208,13 +207,13 @@ public class DebeziumRecordParser extends RecordParser {
                             ZoneOffset.UTC);
             resultMap.put(fieldName, transformed);
 
-            paimonFieldTypes.put(
+            rowTypeBuilder.field(
                     fieldName,
                     DebeziumSchemaUtils.toDataType(
                             debeziumType, className, 
parameters.get(fieldName)));
         }
 
-        evalComputedColumns(resultMap, paimonFieldTypes);
+        evalComputedColumns(resultMap, rowTypeBuilder);
 
         return resultMap;
     }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
index 395ec939e..e64cf3e8a 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
@@ -21,8 +21,9 @@ package org.apache.paimon.flink.action.cdc.mongodb.strategy;
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.sink.cdc.CdcRecord;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
-import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
 
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -30,11 +31,10 @@ import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.configuration.Configuration;
 
 import java.util.ArrayList;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
-import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
+import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.fieldNameCaseConvert;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.recordKeyDuplicateErrMsg;
 
@@ -130,19 +130,19 @@ public class Mongo4VersionStrategy implements 
MongoVersionStrategy {
      */
     private RichCdcMultiplexRecord processRecord(JsonNode fullDocument, 
RowKind rowKind)
             throws JsonProcessingException {
-        LinkedHashMap<String, DataType> paimonFieldTypes = new 
LinkedHashMap<>();
+        RowType.Builder rowTypeBuilder = RowType.builder();
         Map<String, String> record =
-                getExtractRow(fullDocument, paimonFieldTypes, computedColumns, 
mongodbConfig);
+                getExtractRow(fullDocument, rowTypeBuilder, computedColumns, 
mongodbConfig);
 
         record = mapKeyCaseConvert(record, caseSensitive, 
recordKeyDuplicateErrMsg(record));
-        paimonFieldTypes =
-                mapKeyCaseConvert(
-                        paimonFieldTypes, caseSensitive, 
columnDuplicateErrMsg(collection));
+
+        List<DataField> fields = rowTypeBuilder.build().getFields();
+        fields = fieldNameCaseConvert(fields, caseSensitive, collection);
 
         return new RichCdcMultiplexRecord(
                 databaseName,
                 collection,
-                paimonFieldTypes,
+                fields,
                 extractPrimaryKeys(),
                 new CdcRecord(rowKind, record));
     }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
index 3908c0e68..64f127571 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
@@ -21,8 +21,8 @@ package org.apache.paimon.flink.action.cdc.mongodb.strategy;
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.mongodb.SchemaAcquisitionMode;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
-import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.JsonSerdeUtil;
 
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
@@ -34,7 +34,6 @@ import org.apache.flink.configuration.Configuration;
 
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -72,14 +71,14 @@ public interface MongoVersionStrategy {
      * Determines the extraction mode and retrieves the row accordingly.
      *
      * @param jsonNode The JsonNode representing the MongoDB document.
-     * @param paimonFieldTypes A map to store the field types.
+     * @param rowTypeBuilder row type builder.
      * @param mongodbConfig Configuration for the MongoDB connection.
      * @return A map representing the extracted row.
      * @throws JsonProcessingException If there's an error during JSON 
processing.
      */
     default Map<String, String> getExtractRow(
             JsonNode jsonNode,
-            LinkedHashMap<String, DataType> paimonFieldTypes,
+            RowType.Builder rowTypeBuilder,
             List<ComputedColumn> computedColumns,
             Configuration mongodbConfig)
             throws JsonProcessingException {
@@ -105,9 +104,9 @@ public interface MongoVersionStrategy {
                         mongodbConfig.getString(PARSER_PATH),
                         mongodbConfig.getString(FIELD_NAME),
                         computedColumns,
-                        paimonFieldTypes);
+                        rowTypeBuilder);
             case DYNAMIC:
-                return parseAndTypeJsonRow(document.toString(), 
paimonFieldTypes, computedColumns);
+                return parseAndTypeJsonRow(document.toString(), 
rowTypeBuilder, computedColumns);
             default:
                 throw new RuntimeException("Unsupported extraction mode: " + 
mode);
         }
@@ -115,11 +114,9 @@ public interface MongoVersionStrategy {
 
     /** Parses and types a JSON row based on the given parameters. */
     default Map<String, String> parseAndTypeJsonRow(
-            String evaluate,
-            LinkedHashMap<String, DataType> paimonFieldTypes,
-            List<ComputedColumn> computedColumns) {
+            String evaluate, RowType.Builder rowTypeBuilder, 
List<ComputedColumn> computedColumns) {
         Map<String, String> parsedRow = JsonSerdeUtil.parseJsonMap(evaluate, 
String.class);
-        return processParsedData(parsedRow, paimonFieldTypes, computedColumns);
+        return processParsedData(parsedRow, rowTypeBuilder, computedColumns);
     }
 
     /** Parses fields from a JSON record based on the given parameters. */
@@ -128,7 +125,7 @@ public interface MongoVersionStrategy {
             String fieldPaths,
             String fieldNames,
             List<ComputedColumn> computedColumns,
-            LinkedHashMap<String, DataType> fieldTypes) {
+            RowType.Builder rowTypeBuilder) {
         String[] columnNames = fieldNames.split(",");
         String[] parseNames = fieldPaths.split(",");
         Map<String, String> parsedRow = new HashMap<>();
@@ -138,20 +135,20 @@ public interface MongoVersionStrategy {
             parsedRow.put(columnNames[i], 
Optional.ofNullable(evaluate).orElse("{}"));
         }
 
-        return processParsedData(parsedRow, fieldTypes, computedColumns);
+        return processParsedData(parsedRow, rowTypeBuilder, computedColumns);
     }
 
     /** Processes the parsed data to generate the result map and update field 
types. */
     static Map<String, String> processParsedData(
             Map<String, String> parsedRow,
-            LinkedHashMap<String, DataType> fieldTypes,
+            RowType.Builder rowTypeBuilder,
             List<ComputedColumn> computedColumns) {
         int initialCapacity = parsedRow.size() + computedColumns.size();
         Map<String, String> resultMap = new HashMap<>(initialCapacity);
 
         parsedRow.forEach(
                 (column, value) -> {
-                    fieldTypes.put(column, DataTypes.STRING());
+                    rowTypeBuilder.field(column, DataTypes.STRING());
                     resultMap.put(column, value);
                 });
         computedColumns.forEach(
@@ -161,7 +158,7 @@ public interface MongoVersionStrategy {
                     String computedValue = 
computedColumn.eval(parsedRow.get(fieldReference));
 
                     resultMap.put(columnName, computedValue);
-                    fieldTypes.put(columnName, computedColumn.columnType());
+                    rowTypeBuilder.field(columnName, 
computedColumn.columnType());
                 });
         return resultMap;
     }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
index 45dcaa3c2..745a7932d 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
@@ -26,8 +26,10 @@ import 
org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils;
 import org.apache.paimon.flink.action.cdc.mysql.format.DebeziumEvent;
 import org.apache.paimon.flink.sink.cdc.CdcRecord;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.JsonSerdeUtil;
 import org.apache.paimon.utils.Preconditions;
 
@@ -159,22 +161,18 @@ public class MySqlRecordParser implements 
FlatMapFunction<CdcSourceRecord, RichC
 
         Table table = tableChange.getTable();
 
-        LinkedHashMap<String, DataType> fieldTypes = extractFieldTypes(table);
+        List<DataField> fields = extractFields(table);
         List<String> primaryKeys = 
listCaseConvert(table.primaryKeyColumnNames(), caseSensitive);
 
         // TODO : add table comment and column comment when we upgrade flink 
cdc to 2.4
         return Collections.singletonList(
                 new RichCdcMultiplexRecord(
-                        databaseName,
-                        currentTable,
-                        fieldTypes,
-                        primaryKeys,
-                        CdcRecord.emptyRecord()));
+                        databaseName, currentTable, fields, primaryKeys, 
CdcRecord.emptyRecord()));
     }
 
-    private LinkedHashMap<String, DataType> extractFieldTypes(Table table) {
+    private List<DataField> extractFields(Table table) {
+        RowType.Builder rowType = RowType.builder();
         List<Column> columns = table.columns();
-        LinkedHashMap<String, DataType> fieldTypes = new 
LinkedHashMap<>(columns.size());
         Set<String> existedFields = new HashSet<>();
         Function<String, String> columnDuplicateErrMsg =
                 columnDuplicateErrMsg(table.id().toString());
@@ -192,9 +190,9 @@ public class MySqlRecordParser implements 
FlatMapFunction<CdcSourceRecord, RichC
                             typeMapping);
             dataType = dataType.copy(typeMapping.containsMode(TO_NULLABLE) || 
column.isOptional());
 
-            fieldTypes.put(columnName, dataType);
+            rowType.field(columnName, dataType);
         }
-        return fieldTypes;
+        return rowType.build().getFields();
     }
 
     private List<RichCdcMultiplexRecord> extractRecords() {
@@ -274,7 +272,7 @@ public class MySqlRecordParser implements 
FlatMapFunction<CdcSourceRecord, RichC
         return new RichCdcMultiplexRecord(
                 databaseName,
                 currentTable,
-                new LinkedHashMap<>(0),
+                Collections.emptyList(),
                 Collections.emptyList(),
                 new CdcRecord(rowKind, data));
     }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
index de54b97ae..0299d57f7 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
@@ -25,9 +25,11 @@ import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.mysql.format.DebeziumEvent;
 import org.apache.paimon.flink.sink.cdc.CdcRecord;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.DateTimeUtils;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.StringUtils;
@@ -144,7 +146,7 @@ public class PostgresRecordParser
         extractRecords().forEach(out::collect);
     }
 
-    private LinkedHashMap<String, DataType> 
extractFieldTypes(DebeziumEvent.Field schema) {
+    private List<DataField> extractFields(DebeziumEvent.Field schema) {
         Map<String, DebeziumEvent.Field> afterFields = schema.afterFields();
         Preconditions.checkArgument(
                 !afterFields.isEmpty(),
@@ -152,7 +154,7 @@ public class PostgresRecordParser
                         + "Please make sure that `includeSchema` is true "
                         + "in the JsonDebeziumDeserializationSchema you 
created");
 
-        LinkedHashMap<String, DataType> fieldTypes = new 
LinkedHashMap<>(afterFields.size());
+        RowType.Builder rowType = RowType.builder();
         Set<String> existedFields = new HashSet<>();
         Function<String, String> columnDuplicateErrMsg = 
columnDuplicateErrMsg(currentTable);
         afterFields.forEach(
@@ -166,13 +168,13 @@ public class PostgresRecordParser
                             dataType.copy(
                                     typeMapping.containsMode(TO_NULLABLE) || 
value.optional());
 
-                    fieldTypes.put(columnName, dataType);
+                    rowType.field(columnName, dataType);
                 });
-        return fieldTypes;
+        return rowType.build().getFields();
     }
 
     /**
-     * Extract field types from json records, see <a
+     * Extract fields from json records, see <a
      * 
href="https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-data-types";>postgresql-data-types</a>.
      */
     private DataType extractFieldType(DebeziumEvent.Field field) {
@@ -246,12 +248,12 @@ public class PostgresRecordParser
         Map<String, String> after = extractRow(root.payload().after());
         if (!after.isEmpty()) {
             after = mapKeyCaseConvert(after, caseSensitive, 
recordKeyDuplicateErrMsg(after));
-            LinkedHashMap<String, DataType> fieldTypes = 
extractFieldTypes(root.schema());
+            List<DataField> fields = extractFields(root.schema());
             records.add(
                     new RichCdcMultiplexRecord(
                             databaseName,
                             currentTable,
-                            fieldTypes,
+                            fields,
                             Collections.emptyList(),
                             new CdcRecord(RowKind.INSERT, after)));
         }
@@ -398,7 +400,7 @@ public class PostgresRecordParser
         return new RichCdcMultiplexRecord(
                 databaseName,
                 currentTable,
-                new LinkedHashMap<>(0),
+                Collections.emptyList(),
                 Collections.emptyList(),
                 new CdcRecord(rowKind, data));
     }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
index 777f82a95..87aefeb58 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
@@ -107,7 +107,7 @@ public class CdcDynamicTableParsingProcessFunction<T> 
extends ProcessFunction<T,
                         });
 
         List<DataField> schemaChange = parser.parseSchemaChange();
-        if (schemaChange.size() > 0) {
+        if (!schemaChange.isEmpty()) {
             context.output(
                     DYNAMIC_SCHEMA_CHANGE_OUTPUT_TAG,
                     Tuple2.of(Identifier.create(database, tableName), 
schemaChange));
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
index 18324abed..795fdb0f9 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
@@ -20,7 +20,7 @@ package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.flink.action.cdc.CdcMetadataConverter;
 import org.apache.paimon.schema.Schema;
-import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataField;
 
 import java.io.Serializable;
 import java.util.HashSet;
@@ -60,12 +60,11 @@ public class NewTableSchemaBuilder implements Serializable {
         Set<String> existedFields = new HashSet<>();
         Function<String, String> columnDuplicateErrMsg = 
columnDuplicateErrMsg(tableName);
 
-        for (Map.Entry<String, DataType> entry : 
record.fieldTypes().entrySet()) {
+        for (DataField dataField : record.fields()) {
             String fieldName =
                     columnCaseConvertAndDuplicateCheck(
-                            entry.getKey(), existedFields, caseSensitive, 
columnDuplicateErrMsg);
-
-            builder.column(fieldName, entry.getValue());
+                            dataField.name(), existedFields, caseSensitive, 
columnDuplicateErrMsg);
+            builder.column(fieldName, dataField.type(), 
dataField.description());
         }
 
         for (CdcMetadataConverter metadataConverter : metadataConverters) {
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java
index 0fe0c9eab..44a228e4c 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java
@@ -18,12 +18,11 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
-import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataField;
 
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Objects;
 
@@ -34,19 +33,19 @@ public class RichCdcMultiplexRecord implements Serializable 
{
 
     @Nullable private final String databaseName;
     @Nullable private final String tableName;
-    private final LinkedHashMap<String, DataType> fieldTypes;
+    private final List<DataField> fields;
     private final List<String> primaryKeys;
     private final CdcRecord cdcRecord;
 
     public RichCdcMultiplexRecord(
             @Nullable String databaseName,
             @Nullable String tableName,
-            LinkedHashMap<String, DataType> fieldTypes,
+            List<DataField> fields,
             List<String> primaryKeys,
             CdcRecord cdcRecord) {
         this.databaseName = databaseName;
         this.tableName = tableName;
-        this.fieldTypes = fieldTypes;
+        this.fields = fields;
         this.primaryKeys = primaryKeys;
         this.cdcRecord = cdcRecord;
     }
@@ -61,8 +60,8 @@ public class RichCdcMultiplexRecord implements Serializable {
         return tableName;
     }
 
-    public LinkedHashMap<String, DataType> fieldTypes() {
-        return fieldTypes;
+    public List<DataField> fields() {
+        return fields;
     }
 
     public List<String> primaryKeys() {
@@ -70,12 +69,12 @@ public class RichCdcMultiplexRecord implements Serializable 
{
     }
 
     public RichCdcRecord toRichCdcRecord() {
-        return new RichCdcRecord(cdcRecord, fieldTypes);
+        return new RichCdcRecord(cdcRecord, fields);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(databaseName, tableName, fieldTypes, primaryKeys, 
cdcRecord);
+        return Objects.hash(databaseName, tableName, fields, primaryKeys, 
cdcRecord);
     }
 
     @Override
@@ -89,7 +88,7 @@ public class RichCdcMultiplexRecord implements Serializable {
         RichCdcMultiplexRecord that = (RichCdcMultiplexRecord) o;
         return Objects.equals(databaseName, that.databaseName)
                 && Objects.equals(tableName, that.tableName)
-                && Objects.equals(fieldTypes, that.fieldTypes)
+                && Objects.equals(fields, that.fields)
                 && Objects.equals(primaryKeys, that.primaryKeys)
                 && Objects.equals(cdcRecord, that.cdcRecord);
     }
@@ -101,8 +100,8 @@ public class RichCdcMultiplexRecord implements Serializable 
{
                 + databaseName
                 + ", tableName="
                 + tableName
-                + ", fieldTypes="
-                + fieldTypes
+                + ", fields="
+                + fields
                 + ", primaryKeys="
                 + primaryKeys
                 + ", cdcRecord="
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
index 23488de1e..d0298e2b9 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
@@ -157,7 +157,7 @@ public class RichCdcMultiplexRecordEventParser implements 
EventParser<RichCdcMul
 
     private boolean shouldCreateCurrentTable() {
         return shouldSynchronizeCurrentTable
-                && !record.fieldTypes().isEmpty()
+                && !record.fields().isEmpty()
                 && createdTables.add(parseTableName());
     }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
index 37e9271d0..7fc0c3ff7 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
@@ -19,14 +19,19 @@
 package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.annotation.Experimental;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /** A change message contains schema and data. */
 @Experimental
@@ -35,11 +40,11 @@ public class RichCdcRecord implements Serializable {
     private static final long serialVersionUID = 1L;
 
     private final CdcRecord cdcRecord;
-    private final LinkedHashMap<String, DataType> fieldTypes;
+    private final List<DataField> fields;
 
-    public RichCdcRecord(CdcRecord cdcRecord, LinkedHashMap<String, DataType> 
fieldTypes) {
+    public RichCdcRecord(CdcRecord cdcRecord, List<DataField> fields) {
         this.cdcRecord = cdcRecord;
-        this.fieldTypes = fieldTypes;
+        this.fields = fields;
     }
 
     public boolean hasPayload() {
@@ -50,8 +55,8 @@ public class RichCdcRecord implements Serializable {
         return cdcRecord.kind();
     }
 
-    public LinkedHashMap<String, DataType> fieldTypes() {
-        return fieldTypes;
+    public List<DataField> fields() {
+        return fields;
     }
 
     public CdcRecord toCdcRecord() {
@@ -67,42 +72,49 @@ public class RichCdcRecord implements Serializable {
             return false;
         }
         RichCdcRecord that = (RichCdcRecord) o;
-        return cdcRecord == that.cdcRecord && Objects.equals(fieldTypes, 
that.fieldTypes);
+        return cdcRecord == that.cdcRecord && Objects.equals(fields, 
that.fields);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(cdcRecord, fieldTypes);
+        return Objects.hash(cdcRecord, fields);
     }
 
     @Override
     public String toString() {
-        return "{" + "cdcRecord=" + cdcRecord + ", fieldTypes=" + fieldTypes + 
'}';
+        return "{" + "cdcRecord=" + cdcRecord + ", fields=" + fields + '}';
     }
 
     public static Builder builder(RowKind kind) {
-        return new Builder(kind);
+        return new Builder(kind, new AtomicInteger(-1));
     }
 
     /** Builder for {@link RichCdcRecord}. */
     public static class Builder {
 
         private final RowKind kind;
-        private final LinkedHashMap<String, DataType> fieldTypes = new 
LinkedHashMap<>();
+        private final AtomicInteger fieldId;
+        private final List<DataField> fields = new ArrayList<>();
         private final Map<String, String> fieldValues = new HashMap<>();
 
-        public Builder(RowKind kind) {
+        public Builder(RowKind kind, AtomicInteger fieldId) {
             this.kind = kind;
+            this.fieldId = fieldId;
         }
 
         public Builder field(String name, DataType type, String value) {
-            fieldTypes.put(name, type);
+            return field(name, type, value, null);
+        }
+
+        public Builder field(
+                String name, DataType type, String value, @Nullable String 
description) {
+            fields.add(new DataField(fieldId.incrementAndGet(), name, type, 
description));
             fieldValues.put(name, value);
             return this;
         }
 
         public RichCdcRecord build() {
-            return new RichCdcRecord(new CdcRecord(kind, fieldValues), 
fieldTypes);
+            return new RichCdcRecord(new CdcRecord(kind, fieldValues), fields);
         }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java
index c98ead49f..756875781 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataType;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -32,7 +31,7 @@ public class RichEventParser implements 
EventParser<RichCdcRecord> {
 
     private RichCdcRecord record;
 
-    private final LinkedHashMap<String, DataType> previousDataFields = new 
LinkedHashMap<>();
+    private final LinkedHashMap<String, DataField> previousDataFields = new 
LinkedHashMap<>();
 
     @Override
     public void setRawEvent(RichCdcRecord rawEvent) {
@@ -42,13 +41,13 @@ public class RichEventParser implements 
EventParser<RichCdcRecord> {
     @Override
     public List<DataField> parseSchemaChange() {
         List<DataField> change = new ArrayList<>();
-        record.fieldTypes()
+        record.fields()
                 .forEach(
-                        (field, type) -> {
-                            DataType previous = previousDataFields.get(field);
-                            if (!Objects.equals(previous, type)) {
-                                previousDataFields.put(field, type);
-                                change.add(new DataField(0, field, type));
+                        dataField -> {
+                            DataField previous = 
previousDataFields.get(dataField.name());
+                            if (!Objects.equals(previous, dataField)) {
+                                previousDataFields.put(dataField.name(), 
dataField);
+                                change.add(dataField);
                             }
                         });
         return change;
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
index 3c1d6e15b..bc31a05e2 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
@@ -40,7 +40,6 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 
 /** Base class for update data fields process function. */
 public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends 
ProcessFunction<I, O> {
@@ -193,13 +192,17 @@ public abstract class 
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
                 // we compare by ignoring nullable, because partition keys and 
primary keys might be
                 // nullable in source database, but they can't be null in 
Paimon
                 if (oldField.type().equalsIgnoreNullable(newField.type())) {
-                    if (!Objects.equals(oldField.description(), 
newField.description())) {
+                    // update column comment
+                    if (newField.description() != null
+                            && 
!newField.description().equals(oldField.description())) {
                         result.add(
                                 SchemaChange.updateColumnComment(
                                         new String[] {newField.name()}, 
newField.description()));
                     }
                 } else {
+                    // update column type
                     result.add(SchemaChange.updateColumnType(newField.name(), 
newField.type()));
+                    // update column comment
                     if (newField.description() != null) {
                         result.add(
                                 SchemaChange.updateColumnComment(
@@ -207,6 +210,7 @@ public abstract class 
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
                     }
                 }
             } else {
+                // add column
                 result.add(
                         SchemaChange.addColumn(
                                 newField.name(), newField.type(), 
newField.description(), null));
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
index fa6bd9301..96070048b 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
@@ -573,7 +573,7 @@ public class KafkaCanalSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
                                 IllegalArgumentException.class,
                                 "Cannot synchronize record when database name 
or table name is unknown. "
                                         + "Invalid record is:\n"
-                                        + "{databaseName=null, tableName=null, 
fieldTypes={k=STRING, v0=STRING, v1=STRING}, "
+                                        + "{databaseName=null, tableName=null, 
fields=[`k` STRING, `v0` STRING, `v1` STRING], "
                                         + "primaryKeys=[], cdcRecord=+I 
{v0=five, k=5, v1=50}}"));
     }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 67358f848..58ed2098e 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -51,6 +51,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.BUCKET;
@@ -1324,4 +1325,43 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
         FileStoreTable table = getFileStoreTable();
         assertThat(table.options().get(BUCKET.key())).isEqualTo("1");
     }
+
+    @Test
+    @Timeout(60)
+    public void testColumnCommentChangeInExistingTable() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put("bucket", "1");
+        options.put("sink.parallelism", "1");
+
+        RowType rowType =
+                RowType.builder()
+                        .field("pk", DataTypes.INT().notNull(), "pk comment")
+                        .field("c1", DataTypes.DATE(), "c1 comment")
+                        .field("c2", DataTypes.VARCHAR(10).notNull(), "c2 
comment")
+                        .build();
+
+        createFileStoreTable(
+                rowType, Collections.emptyList(), 
Collections.singletonList("pk"), options);
+
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", DATABASE_NAME);
+        mySqlConfig.put("table-name", "test_exist_column_comment_change");
+
+        // Flink cdc 2.3 does not support collecting field comments, and 
existing paimon table field
+        // comments will not be changed.
+        MySqlSyncTableAction action =
+                syncTableActionBuilder(mySqlConfig)
+                        .withPrimaryKeys("pk")
+                        .withTableConfig(getBasicTableConfig())
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        FileStoreTable table = getFileStoreTable();
+        Map<String, DataField> actual =
+                table.schema().fields().stream()
+                        .collect(Collectors.toMap(DataField::name, 
Function.identity()));
+        assertThat(actual.get("pk").description()).isEqualTo("pk comment");
+        assertThat(actual.get("c1").description()).isEqualTo("c1 comment");
+        assertThat(actual.get("c2").description()).isEqualTo("c2 comment");
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql 
b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
index b69661b60..5cf9cc1d9 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
@@ -316,6 +316,13 @@ CREATE TABLE test_exist_options_change (
     PRIMARY KEY (pk)
 );
 
+CREATE TABLE test_exist_column_comment_change (
+    pk INT,
+    c1 DATE,
+    c2 VARCHAR(10) not null comment 'c2 comment',
+    PRIMARY KEY (pk)
+);
+
 -- 
################################################################################
 --  testSyncShard
 -- 
################################################################################

Reply via email to