This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new de458dcf9 [flink] MySql CDC can now deal with multiple changes in one ALTER TABLE statement (#912)
de458dcf9 is described below

commit de458dcf99a31b98ace66fc0b95c6ea75f0fed39
Author: tsreaper <[email protected]>
AuthorDate: Mon Apr 17 19:20:26 2023 +0800

    [flink] MySql CDC can now deal with multiple changes in one ALTER TABLE statement (#912)
---
 .../flink/action/cdc/mysql/MySqlActionUtils.java   |   5 +-
 .../cdc/mysql/MySqlDebeziumJsonEventParser.java    |  71 +++++++------
 .../paimon/flink/action/cdc/mysql/MySqlSchema.java |  28 ++---
 .../flink/action/cdc/mysql/MySqlTypeUtils.java     |  16 ---
 .../cdc/CdcMultiTableParsingProcessFunction.java   |  29 +++---
 .../flink/sink/cdc/CdcParsingProcessFunction.java  |  21 ++--
 .../apache/paimon/flink/sink/cdc/EventParser.java  |   9 +-
 .../sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java  |   6 +-
 .../sink/cdc/FlinkCdcSyncTableSinkBuilder.java     |   4 +-
 ....java => UpdatedDataFieldsProcessFunction.java} | 116 +++++++++++++++++----
 .../cdc/mysql/MySqlSyncTableActionITCase.java      |  93 +++++++++++++++++
 .../sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java   |   2 +-
 .../apache/paimon/flink/sink/cdc/TestCdcEvent.java |  18 ++--
 .../paimon/flink/sink/cdc/TestCdcEventParser.java  |  12 +--
 .../apache/paimon/flink/sink/cdc/TestTable.java    |  30 ++++--
 .../src/test/resources/mysql/setup.sql             |   9 +-
 16 files changed, 324 insertions(+), 145 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
index 11aca6b06..bbbaaeb26 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlActionUtils.java
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.flink.action.cdc.mysql;
 
-import org.apache.paimon.flink.sink.cdc.SchemaChangeProcessFunction;
+import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.types.DataType;
@@ -72,7 +72,8 @@ class MySqlActionUtils {
                 return false;
             }
             DataType type = tableSchema.fields().get(idx).type();
-            if (!SchemaChangeProcessFunction.canConvert(entry.getValue(), 
type)) {
+            if (UpdatedDataFieldsProcessFunction.canConvert(entry.getValue(), 
type)
+                    != UpdatedDataFieldsProcessFunction.ConvertAction.CONVERT) 
{
                 return false;
             }
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index 4855eda4d..83f98b0e3 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -20,10 +20,11 @@ package org.apache.paimon.flink.action.cdc.mysql;
 
 import org.apache.paimon.flink.sink.cdc.CdcRecord;
 import org.apache.paimon.flink.sink.cdc.EventParser;
-import org.apache.paimon.schema.SchemaChange;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.utils.DateTimeUtils;
 import org.apache.paimon.utils.Preconditions;
@@ -38,12 +39,10 @@ import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Base64;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import java.util.Optional;
 
 /**
  * {@link EventParser} for MySQL Debezium JSON.
@@ -55,13 +54,8 @@ import java.util.regex.Pattern;
 public class MySqlDebeziumJsonEventParser implements EventParser<String> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(MySqlDebeziumJsonEventParser.class);
-    private static final String SCHEMA_CHANGE_REGEX =
-            
"ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP|MODIFY)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s\\(]+))?\\s*(\\((.*?)\\))?.*";
 
     private final ObjectMapper objectMapper = new ObjectMapper();
-    private final Pattern schemaChangePattern =
-            Pattern.compile(SCHEMA_CHANGE_REGEX, Pattern.CASE_INSENSITIVE);
-
     private final ZoneId serverTimeZone;
 
     private JsonNode payload;
@@ -88,7 +82,7 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
                                     + "in the 
JsonDebeziumDeserializationSchema you created");
             payload = root.get("payload");
 
-            if (!isSchemaChange()) {
+            if (!isUpdatedDataFields()) {
                 updateFieldTypes(schema);
             }
         } catch (Exception e) {
@@ -125,45 +119,54 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
     }
 
     @Override
-    public boolean isSchemaChange() {
+    public boolean isUpdatedDataFields() {
         return payload.get("op") == null;
     }
 
     @Override
-    public List<SchemaChange> getSchemaChanges() {
+    public Optional<List<DataField>> getUpdatedDataFields() {
         JsonNode historyRecord = payload.get("historyRecord");
         if (historyRecord == null) {
-            return Collections.emptyList();
+            return Optional.empty();
         }
 
-        JsonNode ddlNode;
+        JsonNode columns;
         try {
-            ddlNode = objectMapper.readTree(historyRecord.asText()).get("ddl");
+            String historyRecordString = historyRecord.asText();
+            JsonNode tableChanges = 
objectMapper.readTree(historyRecordString).get("tableChanges");
+            if (tableChanges.size() != 1) {
+                throw new IllegalArgumentException(
+                        "Invalid historyRecord, because tableChanges should 
contain exactly 1 item.\n"
+                                + historyRecordString);
+            }
+            columns = tableChanges.get(0).get("table").get("columns");
         } catch (Exception e) {
-            LOG.debug("Failed to parse history record for schema changes", e);
-            return Collections.emptyList();
+            LOG.info("Failed to parse history record for schema changes", e);
+            return Optional.empty();
         }
-        if (ddlNode == null) {
-            return Collections.emptyList();
+        if (columns == null) {
+            return Optional.empty();
         }
-        String ddl = ddlNode.asText();
 
-        Matcher matcher = schemaChangePattern.matcher(ddl);
-        if (matcher.find()) {
-            String op = matcher.group(1);
-            String column = matcher.group(3);
-            String type = matcher.group(5);
-            String len = matcher.group(7);
-            if ("add".equalsIgnoreCase(op)) {
-                return Collections.singletonList(
-                        SchemaChange.addColumn(column, 
MySqlTypeUtils.toDataType(type, len)));
-            } else if ("modify".equalsIgnoreCase(op)) {
-                return Collections.singletonList(
-                        SchemaChange.updateColumnType(
-                                column, MySqlTypeUtils.toDataType(type, len)));
+        List<DataField> result = new ArrayList<>();
+        for (int i = 0; i < columns.size(); i++) {
+            JsonNode column = columns.get(i);
+            JsonNode length = column.get("length");
+            JsonNode scale = column.get("scale");
+            DataType type =
+                    MySqlTypeUtils.toDataType(
+                            column.get("typeName").asText(),
+                            length == null ? null : length.asInt(),
+                            scale == null ? null : scale.asInt());
+            if (column.get("optional").asBoolean()) {
+                type = type.nullable();
+            } else {
+                type = type.notNull();
             }
+
+            result.add(new DataField(i, column.get("name").asText(), type));
         }
-        return Collections.emptyList();
+        return Optional.of(result);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
index fc06a1efb..a9b5b6a44 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSchema.java
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.flink.action.cdc.mysql;
 
-import org.apache.paimon.flink.sink.cdc.SchemaChangeProcessFunction;
+import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
 import org.apache.paimon.types.DataType;
 
 import java.sql.DatabaseMetaData;
@@ -86,19 +86,19 @@ public class MySqlSchema {
             DataType newType = entry.getValue();
             if (fields.containsKey(fieldName)) {
                 DataType oldType = fields.get(fieldName);
-                if (SchemaChangeProcessFunction.canConvert(oldType, newType)) {
-                    fields.put(fieldName, newType);
-                } else if (SchemaChangeProcessFunction.canConvert(newType, 
oldType)) {
-                    // nothing to do
-                } else {
-                    throw new IllegalArgumentException(
-                            String.format(
-                                    "Column %s have different types in table 
%s.%s and table %s.%s",
-                                    fieldName,
-                                    databaseName,
-                                    tableName,
-                                    other.databaseName,
-                                    other.tableName));
+                switch (UpdatedDataFieldsProcessFunction.canConvert(oldType, 
newType)) {
+                    case CONVERT:
+                        fields.put(fieldName, newType);
+                        break;
+                    case EXCEPTION:
+                        throw new IllegalArgumentException(
+                                String.format(
+                                        "Column %s have different types in 
table %s.%s and table %s.%s",
+                                        fieldName,
+                                        databaseName,
+                                        tableName,
+                                        other.databaseName,
+                                        other.tableName));
                 }
             } else {
                 fields.put(fieldName, newType);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
index d508852af..1c1f813db 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
@@ -25,9 +25,6 @@ import org.apache.paimon.utils.Preconditions;
 
 import javax.annotation.Nullable;
 
-import java.util.ArrayList;
-import java.util.List;
-
 /**
  * Converts from MySQL type to {@link DataType}.
  *
@@ -106,19 +103,6 @@ public class MySqlTypeUtils {
     // The base length of a timestamp is 19, for example "2023-03-23 17:20:00".
     private static final int JDBC_TIMESTAMP_BASE_LENGTH = 19;
 
-    public static DataType toDataType(String type, String params) {
-        List<Integer> paramList = new ArrayList<>();
-        if (params != null) {
-            for (String s : params.split(",")) {
-                paramList.add(Integer.parseInt(s.trim()));
-            }
-        }
-        return toDataType(
-                type,
-                paramList.size() > 0 ? paramList.get(0) : null,
-                paramList.size() > 1 ? paramList.get(1) : null);
-    }
-
     public static DataType toDataType(
             String type, @Nullable Integer length, @Nullable Integer scale) {
         switch (type.toUpperCase()) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
index 36e61b443..815513ac2 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
@@ -18,20 +18,22 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
-import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.types.DataField;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
- * A {@link ProcessFunction} to parse CDC change event to either {@link 
SchemaChange} or {@link
- * CdcRecord} and send them to different side outputs according to table name.
+ * A {@link ProcessFunction} to parse CDC change event to either a list of 
{@link DataField}s or
+ * {@link CdcRecord} and send them to different side outputs according to 
table name.
  *
  * <p>This {@link ProcessFunction} can handle records for different tables at 
the same time.
  *
@@ -42,7 +44,7 @@ public class CdcMultiTableParsingProcessFunction<T> extends 
ProcessFunction<T, V
     private final EventParser.Factory<T> parserFactory;
 
     private transient EventParser<T> parser;
-    private transient Map<String, OutputTag<SchemaChange>> 
schemaChangeOutputTags;
+    private transient Map<String, OutputTag<List<DataField>>> 
updatedDataFieldsOutputTags;
     private transient Map<String, OutputTag<CdcRecord>> recordOutputTags;
 
     public CdcMultiTableParsingProcessFunction(EventParser.Factory<T> 
parserFactory) {
@@ -52,7 +54,7 @@ public class CdcMultiTableParsingProcessFunction<T> extends 
ProcessFunction<T, V
     @Override
     public void open(Configuration parameters) throws Exception {
         parser = parserFactory.create();
-        schemaChangeOutputTags = new HashMap<>();
+        updatedDataFieldsOutputTags = new HashMap<>();
         recordOutputTags = new HashMap<>();
     }
 
@@ -61,10 +63,9 @@ public class CdcMultiTableParsingProcessFunction<T> extends 
ProcessFunction<T, V
         parser.setRawEvent(raw);
         String tableName = parser.tableName();
 
-        if (parser.isSchemaChange()) {
-            for (SchemaChange schemaChange : parser.getSchemaChanges()) {
-                context.output(getSchemaChangeOutputTag(tableName), 
schemaChange);
-            }
+        if (parser.isUpdatedDataFields()) {
+            parser.getUpdatedDataFields()
+                    .ifPresent(t -> 
context.output(getUpdatedDataFieldsOutputTag(tableName), t));
         } else {
             for (CdcRecord record : parser.getRecords()) {
                 context.output(getRecordOutputTag(tableName), record);
@@ -72,14 +73,14 @@ public class CdcMultiTableParsingProcessFunction<T> extends 
ProcessFunction<T, V
         }
     }
 
-    private OutputTag<SchemaChange> getSchemaChangeOutputTag(String tableName) 
{
-        return schemaChangeOutputTags.computeIfAbsent(
-                tableName, 
CdcMultiTableParsingProcessFunction::createSchemaChangeOutputTag);
+    private OutputTag<List<DataField>> getUpdatedDataFieldsOutputTag(String 
tableName) {
+        return updatedDataFieldsOutputTags.computeIfAbsent(
+                tableName, 
CdcMultiTableParsingProcessFunction::createUpdatedDataFieldsOutputTag);
     }
 
-    public static OutputTag<SchemaChange> createSchemaChangeOutputTag(String 
tableName) {
+    public static OutputTag<List<DataField>> 
createUpdatedDataFieldsOutputTag(String tableName) {
         return new OutputTag<>(
-                "schema-change-" + tableName, 
TypeInformation.of(SchemaChange.class));
+                "new-data-field-list-" + tableName, new 
ListTypeInfo<>(DataField.class));
     }
 
     private OutputTag<CdcRecord> getRecordOutputTag(String tableName) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
index ba3f97cad..b40c160dc 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
@@ -18,17 +18,19 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
-import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.types.DataField;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 
+import java.util.List;
+
 /**
- * A {@link ProcessFunction} to parse CDC change event to either {@link 
SchemaChange} or {@link
- * CdcRecord} and send them to different downstreams.
+ * A {@link ProcessFunction} to parse CDC change event to either a list of 
{@link DataField}s or
+ * {@link CdcRecord} and send them to different downstreams.
  *
  * <p>This {@link ProcessFunction} can only handle records for a single 
constant table. To handle
  * records for different tables, see {@link 
CdcMultiTableParsingProcessFunction}.
@@ -37,8 +39,8 @@ import org.apache.flink.util.OutputTag;
  */
 public class CdcParsingProcessFunction<T> extends ProcessFunction<T, 
CdcRecord> {
 
-    public static final OutputTag<SchemaChange> SCHEMA_CHANGE_OUTPUT_TAG =
-            new OutputTag<>("schema-change", 
TypeInformation.of(SchemaChange.class));
+    public static final OutputTag<List<DataField>> 
NEW_DATA_FIELD_LIST_OUTPUT_TAG =
+            new OutputTag<>("new-data-field-list", new 
ListTypeInfo<>(DataField.class));
 
     private final EventParser.Factory<T> parserFactory;
 
@@ -57,10 +59,9 @@ public class CdcParsingProcessFunction<T> extends 
ProcessFunction<T, CdcRecord>
     public void processElement(T raw, Context context, Collector<CdcRecord> 
collector)
             throws Exception {
         parser.setRawEvent(raw);
-        if (parser.isSchemaChange()) {
-            for (SchemaChange schemaChange : parser.getSchemaChanges()) {
-                context.output(SCHEMA_CHANGE_OUTPUT_TAG, schemaChange);
-            }
+        if (parser.isUpdatedDataFields()) {
+            parser.getUpdatedDataFields()
+                    .ifPresent(t -> 
context.output(NEW_DATA_FIELD_LIST_OUTPUT_TAG, t));
         } else {
             for (CdcRecord record : parser.getRecords()) {
                 collector.collect(record);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
index e4f192d1b..161eb5d1d 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
@@ -18,13 +18,14 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
-import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.types.DataField;
 
 import java.io.Serializable;
 import java.util.List;
+import java.util.Optional;
 
 /**
- * Parse a CDC change event to a list of {@link SchemaChange} or {@link 
CdcRecord}.
+ * Parse a CDC change event to a list of {@link DataField}s or {@link 
CdcRecord}.
  *
  * @param <T> CDC change event type
  */
@@ -34,9 +35,9 @@ public interface EventParser<T> {
 
     String tableName();
 
-    boolean isSchemaChange();
+    boolean isUpdatedDataFields();
 
-    List<SchemaChange> getSchemaChanges();
+    Optional<List<DataField>> getUpdatedDataFields();
 
     List<CdcRecord> getRecords();
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
index e48f929bc..b62992d08 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -94,10 +94,10 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
             DataStream<Void> schemaChangeProcessFunction =
                     SingleOutputStreamOperatorUtils.getSideOutput(
                                     parsed,
-                                    
CdcMultiTableParsingProcessFunction.createSchemaChangeOutputTag(
-                                            table.name()))
+                                    CdcMultiTableParsingProcessFunction
+                                            
.createUpdatedDataFieldsOutputTag(table.name()))
                             .process(
-                                    new SchemaChangeProcessFunction(
+                                    new UpdatedDataFieldsProcessFunction(
                                             new SchemaManager(table.fileIO(), 
table.location())));
             schemaChangeProcessFunction.getTransformation().setParallelism(1);
             
schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkBuilder.java
index 7c770b045..6847f5405 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkBuilder.java
@@ -84,9 +84,9 @@ public class FlinkCdcSyncTableSinkBuilder<T> {
 
         DataStream<Void> schemaChangeProcessFunction =
                 SingleOutputStreamOperatorUtils.getSideOutput(
-                                parsed, 
CdcParsingProcessFunction.SCHEMA_CHANGE_OUTPUT_TAG)
+                                parsed, 
CdcParsingProcessFunction.NEW_DATA_FIELD_LIST_OUTPUT_TAG)
                         .process(
-                                new SchemaChangeProcessFunction(
+                                new UpdatedDataFieldsProcessFunction(
                                         new SchemaManager(table.fileIO(), 
table.location())));
         schemaChangeProcessFunction.getTransformation().setParallelism(1);
         schemaChangeProcessFunction.getTransformation().setMaxParallelism(1);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
similarity index 55%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java
rename to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
index 4c94444f0..cce1ee7e1 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/SchemaChangeProcessFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
@@ -22,9 +22,11 @@ import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypeChecks;
 import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Preconditions;
 
 import org.apache.flink.streaming.api.functions.ProcessFunction;
@@ -32,29 +34,77 @@ import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 
 /**
- * A {@link ProcessFunction} to handle {@link SchemaChange}.
+ * A {@link ProcessFunction} to handle schema changes. New schema is 
represented by a list of {@link
+ * DataField}s.
  *
  * <p>NOTE: To avoid concurrent schema changes, the parallelism of this {@link 
ProcessFunction} must
  * be 1.
  */
-public class SchemaChangeProcessFunction extends ProcessFunction<SchemaChange, 
Void> {
+public class UpdatedDataFieldsProcessFunction extends 
ProcessFunction<List<DataField>, Void> {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(SchemaChangeProcessFunction.class);
+    private static final Logger LOG =
+            LoggerFactory.getLogger(UpdatedDataFieldsProcessFunction.class);
 
     private final SchemaManager schemaManager;
 
-    public SchemaChangeProcessFunction(SchemaManager schemaManager) {
+    public UpdatedDataFieldsProcessFunction(SchemaManager schemaManager) {
         this.schemaManager = schemaManager;
     }
 
     @Override
     public void processElement(
-            SchemaChange schemaChange, Context context, Collector<Void> 
collector)
+            List<DataField> updatedDataFields, Context context, 
Collector<Void> collector)
             throws Exception {
+        for (SchemaChange schemaChange : 
extractSchemaChanges(updatedDataFields)) {
+            applySchemaChange(schemaChange);
+        }
+    }
+
+    private List<SchemaChange> extractSchemaChanges(List<DataField> 
updatedDataFields) {
+        RowType oldRowType = schemaManager.latest().get().logicalRowType();
+        Map<String, DataField> oldFields = new HashMap<>();
+        for (DataField oldField : oldRowType.getFields()) {
+            oldFields.put(oldField.name(), oldField);
+        }
+
+        List<SchemaChange> result = new ArrayList<>();
+        for (DataField newField : updatedDataFields) {
+            if (oldFields.containsKey(newField.name())) {
+                DataField oldField = oldFields.get(newField.name());
+                // we compare by ignoring nullable, because partition keys and 
primary keys might be
+                // nullable in source database, but they can't be null in 
Paimon
+                if (oldField.type().equalsIgnoreNullable(newField.type())) {
+                    if (!Objects.equals(oldField.description(), 
newField.description())) {
+                        result.add(
+                                SchemaChange.updateColumnComment(
+                                        new String[] {newField.name()}, 
newField.description()));
+                    }
+                } else {
+                    result.add(SchemaChange.updateColumnType(newField.name(), 
newField.type()));
+                    if (newField.description() != null) {
+                        result.add(
+                                SchemaChange.updateColumnComment(
+                                        new String[] {newField.name()}, 
newField.description()));
+                    }
+                }
+            } else {
+                result.add(
+                        SchemaChange.addColumn(
+                                newField.name(), newField.type(), 
newField.description(), null));
+            }
+        }
+        return result;
+    }
+
+    private void applySchemaChange(SchemaChange schemaChange) throws Exception 
{
         if (schemaChange instanceof SchemaChange.AddColumn) {
             try {
                 schemaManager.commitChanges(schemaChange);
@@ -88,18 +138,18 @@ public class SchemaChangeProcessFunction extends 
ProcessFunction<SchemaChange, V
                             + " does not exist in table. This is unexpected.");
             DataType oldType = schema.fields().get(idx).type();
             DataType newType = updateColumnType.newDataType();
-            if (canConvert(oldType, newType)) {
-                schemaManager.commitChanges(schemaChange);
-            } else {
-                throw new UnsupportedOperationException(
-                        String.format(
-                                "Cannot convert field %s from type %s to %s",
-                                updateColumnType.fieldName(), oldType, 
newType));
+            switch (canConvert(oldType, newType)) {
+                case CONVERT:
+                    schemaManager.commitChanges(schemaChange);
+                    break;
+                case EXCEPTION:
+                    throw new UnsupportedOperationException(
+                            String.format(
+                                    "Cannot convert field %s from type %s to 
%s",
+                                    updateColumnType.fieldName(), oldType, 
newType));
             }
         } else if (schemaChange instanceof SchemaChange.UpdateColumnComment) {
             schemaManager.commitChanges(schemaChange);
-        } else if (schemaChange instanceof 
SchemaChange.UpdateColumnNullability) {
-            schemaManager.commitChanges(schemaChange);
         } else {
             throw new UnsupportedOperationException(
                     "Unsupported schema change class "
@@ -122,31 +172,57 @@ public class SchemaChangeProcessFunction extends 
ProcessFunction<SchemaChange, V
     private static final List<DataTypeRoot> FLOATING_POINT_TYPES =
             Arrays.asList(DataTypeRoot.FLOAT, DataTypeRoot.DOUBLE);
 
-    public static boolean canConvert(DataType oldType, DataType newType) {
+    public static ConvertAction canConvert(DataType oldType, DataType newType) 
{
         int oldIdx = STRING_TYPES.indexOf(oldType.getTypeRoot());
         int newIdx = STRING_TYPES.indexOf(newType.getTypeRoot());
         if (oldIdx >= 0 && newIdx >= 0) {
-            return DataTypeChecks.getLength(oldType) <= 
DataTypeChecks.getLength(newType);
+            return DataTypeChecks.getLength(oldType) <= 
DataTypeChecks.getLength(newType)
+                    ? ConvertAction.CONVERT
+                    : ConvertAction.IGNORE;
         }
 
         oldIdx = BINARY_TYPES.indexOf(oldType.getTypeRoot());
         newIdx = BINARY_TYPES.indexOf(newType.getTypeRoot());
         if (oldIdx >= 0 && newIdx >= 0) {
-            return DataTypeChecks.getLength(oldType) <= 
DataTypeChecks.getLength(newType);
+            return DataTypeChecks.getLength(oldType) <= 
DataTypeChecks.getLength(newType)
+                    ? ConvertAction.CONVERT
+                    : ConvertAction.IGNORE;
         }
 
         oldIdx = INTEGER_TYPES.indexOf(oldType.getTypeRoot());
         newIdx = INTEGER_TYPES.indexOf(newType.getTypeRoot());
         if (oldIdx >= 0 && newIdx >= 0) {
-            return oldIdx <= newIdx;
+            return oldIdx <= newIdx ? ConvertAction.CONVERT : 
ConvertAction.IGNORE;
         }
 
         oldIdx = FLOATING_POINT_TYPES.indexOf(oldType.getTypeRoot());
         newIdx = FLOATING_POINT_TYPES.indexOf(newType.getTypeRoot());
         if (oldIdx >= 0 && newIdx >= 0) {
-            return oldIdx <= newIdx;
+            return oldIdx <= newIdx ? ConvertAction.CONVERT : 
ConvertAction.IGNORE;
         }
 
-        return false;
+        return ConvertAction.EXCEPTION;
+    }
+
+    /**
+     * Return type of {@link 
UpdatedDataFieldsProcessFunction#canConvert(DataType, DataType)}. This
+     * enum indicates the action to perform.
+     */
+    public enum ConvertAction {
+
+        /** {@code oldType} can be converted to {@code newType}. */
+        CONVERT,
+
+        /**
+         * {@code oldType} and {@code newType} belong to the same type 
family, but old type has
+         * higher precision than new type. Ignore this convert request.
+         */
+        IGNORE,
+
+        /**
+         * {@code oldType} and {@code newType} belong to different type 
families. Throw an exception
+         * indicating that this convert request cannot be handled.
+         */
+        EXCEPTION
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index ed0e9fba7..44f6a5f83 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -243,6 +243,99 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
         waitForResult(expected, table, rowType, primaryKeys);
     }
 
+    @Test
+    @Timeout(60)
+    public void testMultipleSchemaEvolutions() throws Exception {
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", DATABASE_NAME);
+        mySqlConfig.put("table-name", "schema_evolution_multiple");
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(1000);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        MySqlSyncTableAction action =
+                new MySqlSyncTableAction(
+                        mySqlConfig,
+                        warehouse,
+                        database,
+                        tableName,
+                        Collections.emptyList(),
+                        Collections.singletonList("_id"),
+                        new HashMap<>());
+        action.build(env);
+        JobClient client = env.executeAsync();
+
+        while (true) {
+            JobStatus status = client.getJobStatus().get();
+            if (status == JobStatus.RUNNING) {
+                break;
+            }
+            Thread.sleep(1000);
+        }
+
+        try (Connection conn =
+                DriverManager.getConnection(
+                        MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME),
+                        MYSQL_CONTAINER.getUsername(),
+                        MYSQL_CONTAINER.getPassword())) {
+            try (Statement statement = conn.createStatement()) {
+                testSchemaEvolutionMultipleImpl(statement);
+            }
+        }
+    }
+
+    private void testSchemaEvolutionMultipleImpl(Statement statement) throws 
Exception {
+        FileStoreTable table = getFileStoreTable();
+        statement.executeUpdate("USE paimon_sync_table");
+
+        statement.executeUpdate(
+                "INSERT INTO schema_evolution_multiple VALUES (1, 'one', 10, 
'string_1')");
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.VARCHAR(10),
+                            DataTypes.INT(),
+                            DataTypes.VARCHAR(10)
+                        },
+                        new String[] {"_id", "v1", "v2", "v3"});
+        List<String> primaryKeys = Collections.singletonList("_id");
+        List<String> expected = Collections.singletonList("+I[1, one, 10, 
string_1]");
+        waitForResult(expected, table, rowType, primaryKeys);
+
+        statement.executeUpdate(
+                "ALTER TABLE schema_evolution_multiple "
+                        + "ADD v4 INT, "
+                        + "MODIFY COLUMN v1 VARCHAR(20), "
+                        // I'd love to change COMMENT to DEFAULT
+                        // however debezium parser seems to have a bug here
+                        + "ADD COLUMN (v5 DOUBLE, v6 DECIMAL(5, 3), `$% ^,& 
*(` VARCHAR(10) COMMENT 'Hi, v700 DOUBLE \\', v701 INT a test'), "
+                        + "MODIFY v2 BIGINT");
+        statement.executeUpdate(
+                "INSERT INTO schema_evolution_multiple VALUES "
+                        + "(2, 'long_string_two', 2000000000000, 'string_2', 
20, 20.5, 20.002, 'test_2')");
+        rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.VARCHAR(20),
+                            DataTypes.BIGINT(),
+                            DataTypes.VARCHAR(10),
+                            DataTypes.INT(),
+                            DataTypes.DOUBLE(),
+                            DataTypes.DECIMAL(5, 3),
+                            DataTypes.VARCHAR(10)
+                        },
+                        new String[] {"_id", "v1", "v2", "v3", "v4", "v5", 
"v6", "$% ^,& *("});
+        expected =
+                Arrays.asList(
+                        "+I[1, one, 10, string_1, NULL, NULL, NULL, NULL]",
+                        "+I[2, long_string_two, 2000000000000, string_2, 20, 
20.5, 20.002, test_2]");
+        waitForResult(expected, table, rowType, primaryKeys);
+    }
+
     @Test
     @Timeout(30)
     public void testAllTypes() throws Exception {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
index 5eab8d1e5..6d8e93daf 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkITCase.java
@@ -69,7 +69,7 @@ public class FlinkCdcSyncDatabaseSinkITCase extends 
AbstractTestBase {
         int numTables = random.nextInt(3) + 1;
         boolean enableFailure = random.nextBoolean();
 
-        int maxEvents = 1500;
+        int maxEvents = 1000;
         int maxSchemaChanges = 10;
         int maxPartitions = 3;
         int maxKeys = 150;
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
index f7a933951..01d766edf 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
-import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.types.DataField;
 
 import java.io.Serializable;
 import java.util.List;
@@ -29,20 +29,20 @@ public class TestCdcEvent implements Serializable {
     private static final long serialVersionUID = 1L;
 
     private final String tableName;
-    private final SchemaChange schemaChange;
+    private final List<DataField> updatedDataFields;
     private final List<CdcRecord> records;
     private final int keyHash;
 
-    public TestCdcEvent(String tableName, SchemaChange schemaChange) {
+    public TestCdcEvent(String tableName, List<DataField> updatedDataFields) {
         this.tableName = tableName;
-        this.schemaChange = schemaChange;
+        this.updatedDataFields = updatedDataFields;
         this.records = null;
         this.keyHash = 0;
     }
 
     public TestCdcEvent(String tableName, List<CdcRecord> records, int 
keyHash) {
         this.tableName = tableName;
-        this.schemaChange = null;
+        this.updatedDataFields = null;
         this.records = records;
         this.keyHash = keyHash;
     }
@@ -51,8 +51,8 @@ public class TestCdcEvent implements Serializable {
         return tableName;
     }
 
-    public SchemaChange schemaChange() {
-        return schemaChange;
+    public List<DataField> updatedDataFields() {
+        return updatedDataFields;
     }
 
     public List<CdcRecord> records() {
@@ -67,7 +67,7 @@ public class TestCdcEvent implements Serializable {
     @Override
     public String toString() {
         return String.format(
-                "{tableName = %s, schemChange = %s, records = %s}",
-                tableName, schemaChange, records);
+                "{tableName = %s, updatedDataFields = %s, records = %s}",
+                tableName, updatedDataFields, records);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
index 9772d3292..ab03ea1f5 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
@@ -18,10 +18,10 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
-import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.types.DataField;
 
-import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 
 /** Testing {@link EventParser} for {@link TestCdcEvent}. */
 public class TestCdcEventParser implements EventParser<TestCdcEvent> {
@@ -39,13 +39,13 @@ public class TestCdcEventParser implements 
EventParser<TestCdcEvent> {
     }
 
     @Override
-    public boolean isSchemaChange() {
-        return raw.schemaChange() != null;
+    public boolean isUpdatedDataFields() {
+        return raw.updatedDataFields() != null;
     }
 
     @Override
-    public List<SchemaChange> getSchemaChanges() {
-        return Collections.singletonList(raw.schemaChange());
+    public Optional<List<DataField>> getUpdatedDataFields() {
+        return Optional.ofNullable(raw.updatedDataFields());
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
index f5be3dca3..650cf3ccf 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
@@ -19,8 +19,8 @@
 package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
@@ -102,19 +102,12 @@ public class TestTable {
                 if (random.nextBoolean()) {
                     int idx = random.nextInt(fieldNames.size());
                     isBigInt.set(idx, true);
-                    events.add(
-                            new TestCdcEvent(
-                                    tableName,
-                                    SchemaChange.updateColumnType(
-                                            fieldNames.get(idx), 
DataTypes.BIGINT())));
                 } else {
                     String newName = "v" + fieldNames.size();
                     fieldNames.add(newName);
                     isBigInt.add(false);
-                    events.add(
-                            new TestCdcEvent(
-                                    tableName, SchemaChange.addColumn(newName, 
DataTypes.INT())));
                 }
+                events.add(new TestCdcEvent(tableName, 
currentDataFieldList(fieldNames, isBigInt)));
             } else {
                 Map<String, String> fields = new HashMap<>();
                 int key = random.nextInt(numKeys);
@@ -142,6 +135,25 @@ public class TestTable {
         }
     }
 
+    private List<DataField> currentDataFieldList(List<String> fieldNames, 
List<Boolean> isBigInt) {
+        List<DataField> fields = new ArrayList<>();
+
+        // pt
+        fields.add(initialRowType.getFields().get(0));
+        // k
+        fields.add(initialRowType.getFields().get(1));
+
+        for (int i = 0; i < fieldNames.size(); i++) {
+            fields.add(
+                    new DataField(
+                            2 + i,
+                            fieldNames.get(i),
+                            isBigInt.get(i) ? DataTypes.BIGINT() : 
DataTypes.INT()));
+        }
+
+        return fields;
+    }
+
     public RowType initialRowType() {
         return initialRowType;
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql 
b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
index 902dd9d4c..f2e125c89 100644
--- a/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
+++ b/paimon-flink/paimon-flink-common/src/test/resources/mysql/setup.sql
@@ -41,7 +41,14 @@ CREATE TABLE schema_evolution_2 (
     PRIMARY KEY (_id)
 );
 
--- add comment lines for the convenience of reading
+CREATE TABLE schema_evolution_multiple (
+    _id INT,
+    v1 VARCHAR(10),
+    v2 INT,
+    v3 VARCHAR(10),
+    PRIMARY KEY (_id)
+);
+
 CREATE TABLE all_types_table (
     _id INT,
     -- TINYINT


Reply via email to