This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0537dbe66 [INLONG-7553][Sort] Mysql CDC support output DDL in all migrate mode (#7579)
0537dbe66 is described below

commit 0537dbe66f4f433c2edfcb160e9d2d5117cc7045
Author: Schnapps <[email protected]>
AuthorDate: Thu Mar 30 20:09:03 2023 +0800

    [INLONG-7553][Sort] Mysql CDC support output DDL in all migrate mode (#7579)
---
 .../org/apache/inlong/sort/base/Constants.java     |  2 +
 .../table/RowDataDebeziumDeserializeSchema.java    | 79 ++++++++++++++++------
 .../inlong/sort/cdc/base/util/RecordUtils.java     |  7 ++
 .../mysql/source/config/MySqlSourceOptions.java    |  6 ++
 .../mysql/source/reader/MySqlRecordEmitter.java    | 20 ++++--
 .../cdc/mysql/table/MySqlReadableMetadata.java     | 23 +++++--
 .../mysql/table/MySqlTableInlongSourceFactory.java |  4 ++
 .../sort/cdc/mysql/table/MySqlTableSource.java     | 68 ++-----------------
 .../apache/inlong/sort/parser/AllMigrateTest.java  |  1 +
 .../inlong/sort/formats/json/canal/CanalJson.java  |  2 +
 10 files changed, 119 insertions(+), 93 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index ac2a7c3a9..702329e6c 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -134,6 +134,8 @@ public final class Constants {
 
     public static final String AUTO_DESERIALIZE_FALSE = 
"autoDeserialize=false";
 
+    public static final String DDL_FIELD_NAME = "ddl";
+
     public static final ConfigOption<String> INLONG_METRIC =
             ConfigOptions.key("inlong.metric.labels")
                     .stringType()
diff --git 
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/debezium/table/RowDataDebeziumDeserializeSchema.java
 
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/debezium/table/RowDataDebeziumDeserializeSchema.java
index 49f9f630a..536ffb57e 100644
--- 
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/debezium/table/RowDataDebeziumDeserializeSchema.java
+++ 
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/debezium/table/RowDataDebeziumDeserializeSchema.java
@@ -18,6 +18,8 @@
 package org.apache.inlong.sort.cdc.base.debezium.table;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.inlong.sort.base.Constants.DDL_FIELD_NAME;
+import static 
org.apache.inlong.sort.cdc.base.relational.JdbcSourceEventDispatcher.HISTORY_RECORD_FIELD;
 
 import io.debezium.data.Envelope;
 import io.debezium.data.SpecialValueDecimal;
@@ -43,6 +45,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
@@ -55,6 +58,7 @@ import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
 import org.apache.inlong.sort.base.filter.RowValidator;
 import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
+import org.apache.inlong.sort.cdc.base.util.RecordUtils;
 import org.apache.inlong.sort.cdc.base.util.TemporalConversions;
 import org.apache.kafka.connect.data.ConnectSchema;
 import org.apache.kafka.connect.data.Decimal;
@@ -110,6 +114,8 @@ public final class RowDataDebeziumDeserializeSchema 
implements DebeziumDeseriali
 
     private ZoneId serverTimeZone;
 
+    public final ObjectMapper objectMapper = new ObjectMapper();
+
     RowDataDebeziumDeserializeSchema(
             RowType physicalDataType,
             MetadataConverter[] metadataConverters,
@@ -582,31 +588,44 @@ public final class RowDataDebeziumDeserializeSchema 
implements DebeziumDeseriali
             @Override
             public Object convert(Object dbzObj, Schema schema) {
 
-                ConnectSchema connectSchema = (ConnectSchema) schema;
-                List<Field> fields = connectSchema.fields();
+                if (dbzObj instanceof Struct) {
+                    ConnectSchema connectSchema = (ConnectSchema) schema;
+                    List<Field> fields = connectSchema.fields();
 
-                Map<String, Object> data = new HashMap<>();
-                Struct struct = (Struct) dbzObj;
-
-                for (Field field : fields) {
-                    String fieldName = field.name();
-                    Object fieldValue = struct.getWithoutDefault(fieldName);
-                    Schema fieldSchema = schema.field(fieldName).schema();
-                    String schemaName = fieldSchema.name();
-                    if (schemaName != null) {
-                        fieldValue = getValueWithSchema(fieldValue, 
schemaName);
-                    }
-                    if (fieldValue instanceof ByteBuffer) {
-                        // binary data (blob or varbinary in mysql) are stored 
in bytebuffer
-                        // use utf-8 to decode as a string by default
-                        fieldValue = new String(((ByteBuffer) 
fieldValue).array());
+                    Map<String, Object> data = new HashMap<>();
+                    Struct struct = (Struct) dbzObj;
+
+                    for (Field field : fields) {
+                        String fieldName = field.name();
+                        Object fieldValue = 
struct.getWithoutDefault(fieldName);
+                        Schema fieldSchema = schema.field(fieldName).schema();
+                        String schemaName = fieldSchema.name();
+                        if (schemaName != null) {
+                            fieldValue = getValueWithSchema(fieldValue, 
schemaName);
+                        }
+                        if (fieldValue instanceof ByteBuffer) {
+                            // binary data (blob or varbinary in mysql) are 
stored in bytebuffer
+                            // use utf-8 to decode as a string by default
+                            fieldValue = new String(((ByteBuffer) 
fieldValue).array());
+                        }
+                        data.put(fieldName, fieldValue);
                     }
-                    data.put(fieldName, fieldValue);
+
+                    GenericRowData row = new GenericRowData(1);
+                    row.setField(0, data);
+                    return row;
                 }
 
+                return constructDdlRow(dbzObj);
+
+            }
+
+            private GenericRowData constructDdlRow(Object ddl) {
+                Map<String, Object> data = new HashMap<>();
                 GenericRowData row = new GenericRowData(1);
                 row.setField(0, data);
-
+                data.put(DDL_FIELD_NAME, ddl);
+                row.setField(0, data);
                 return row;
             }
         };
@@ -688,6 +707,12 @@ public final class RowDataDebeziumDeserializeSchema 
implements DebeziumDeseriali
         Envelope.Operation op = Envelope.operationFor(record);
         Struct value = (Struct) record.value();
         Schema valueSchema = record.valueSchema();
+
+        if (RecordUtils.isDdlRecord(value)) {
+            extractDdlRecord(record, out, tableSchema, value);
+            return;
+        }
+
         if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
             GenericRowData insert = extractAfterRow(value, valueSchema);
             insert.setRowKind(RowKind.INSERT);
@@ -704,13 +729,27 @@ public final class RowDataDebeziumDeserializeSchema 
implements DebeziumDeseriali
                     emit(record, before, tableSchema, out);
                 }
             }
-
             GenericRowData after = extractAfterRow(value, valueSchema);
             after.setRowKind(RowKind.UPDATE_AFTER);
             emit(record, after, tableSchema, out);
         }
     }
 
+    private void extractDdlRecord(SourceRecord record, Collector<RowData> out, 
TableChange tableSchema,
+            Struct value) {
+
+        try {
+            GenericRowData insert = (GenericRowData) physicalConverter.convert(
+                    
objectMapper.readTree(value.get(HISTORY_RECORD_FIELD).toString()).get(DDL_FIELD_NAME).asText(),
+                    null);
+            insert.setRowKind(RowKind.INSERT);
+            emit(record, insert, tableSchema, out);
+        } catch (Exception e) {
+            LOG.error("Failed to extract DDL record {}", record, e);
+        }
+
+    }
+
     @Override
     public void deserialize(SourceRecord record, Collector<RowData> out, 
Boolean isStreamingPhase) throws Exception {
         this.deserialize(record, out);
diff --git 
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/RecordUtils.java
 
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/RecordUtils.java
index 91ecd992e..663df4783 100644
--- 
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/RecordUtils.java
+++ 
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/RecordUtils.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sort.cdc.base.util;
 
+import static 
org.apache.inlong.sort.cdc.base.relational.JdbcSourceEventDispatcher.HISTORY_RECORD_FIELD;
+
 import io.debezium.connector.AbstractSourceInfo;
 import io.debezium.data.Envelope;
 import io.debezium.relational.Column;
@@ -155,4 +157,9 @@ public class RecordUtils {
         String connector = source.getString(CONNECTOR);
         return MYSQL_CONNECTOR.equalsIgnoreCase(connector);
     }
+
+    public static boolean isDdlRecord(Struct value) {
+        return value.schema().field(HISTORY_RECORD_FIELD) != null;
+    }
+
 }
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
index 0a09dd400..9ad1279e4 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
@@ -213,6 +213,12 @@ public class MySqlSourceOptions {
                     .withDescription("Whether include a incremental flag in 
data "
                             + "when migrating all databases");
 
+    public static final ConfigOption<Boolean> INCLUDE_SCHEMA_CHANGE =
+            ConfigOptions.key("include-schema-change")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether include schema change in cdc 
connector");
+
     // 
----------------------------------------------------------------------------
     // experimental options, won't add them to documentation
     // 
----------------------------------------------------------------------------
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
index e36b0d80f..417cd6026 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
@@ -111,12 +111,17 @@ public final class MySqlRecordEmitter<T>
             TableChanges changes = 
TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
             for (TableChange tableChange : changes) {
                 
splitState.asBinlogSplitState().recordSchema(tableChange.getId(), tableChange);
+                if (includeSchemaChanges) {
+                    outputDdlElement(element, output, splitState, tableChange);
+                }
             }
-            if (includeSchemaChanges) {
-                BinlogOffset position = getBinlogPosition(element);
-                splitState.asBinlogSplitState().setStartingOffset(position);
-                emitElement(element, output, null);
+
+            // for create table ddl, there's no table change events
+            // still we need to generate a ddl message
+            if (tableChanges.isEmpty()) {
+                outputDdlElement(element, output, splitState, null);
             }
+
         } else if (isDataChangeRecord(element)) {
             if (splitState.isBinlogSplitState()) {
                 BinlogOffset position = getBinlogPosition(element);
@@ -175,6 +180,13 @@ public final class MySqlRecordEmitter<T>
         }
     }
 
+    private void outputDdlElement(SourceRecord element, SourceOutput<T> 
output, MySqlSplitState splitState,
+            TableChange tableChange) throws Exception {
+        BinlogOffset position = getBinlogPosition(element);
+        splitState.asBinlogSplitState().setStartingOffset(position);
+        emitElement(element, output, tableChange);
+    }
+
     private void updateStartingOffsetForSplit(MySqlSplitState splitState, 
SourceRecord element) {
         if (splitState.isBinlogSplitState()) {
             // record the time metric to enter the incremental phase
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
index 60c2c7e32..a44993891 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
@@ -19,6 +19,8 @@ package org.apache.inlong.sort.cdc.mysql.table;
 
 import static 
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.isSnapshotRecord;
 
+import static org.apache.inlong.sort.base.Constants.DDL_FIELD_NAME;
+
 import io.debezium.connector.AbstractSourceInfo;
 import io.debezium.data.Envelope;
 import io.debezium.data.Envelope.FieldName;
@@ -36,6 +38,7 @@ import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.DataType;
 import org.apache.inlong.sort.cdc.base.debezium.table.MetadataConverter;
+import org.apache.inlong.sort.cdc.base.util.RecordUtils;
 import org.apache.inlong.sort.formats.json.canal.CanalJson;
 import org.apache.inlong.sort.formats.json.debezium.DebeziumJson;
 import org.apache.inlong.sort.formats.json.debezium.DebeziumJson.Source;
@@ -443,26 +446,34 @@ public enum MySqlReadableMetadata {
         String databaseName = getMetaData(record, 
AbstractSourceInfo.DATABASE_NAME_KEY);
         // opTs
         long opTs = (Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY);
-        // ts
-        long ts = (Long) messageStruct.get(FieldName.TIMESTAMP);
         // actual data
         GenericRowData data = rowData;
         Map<String, Object> field = (Map<String, Object>) data.getField(0);
         List<Map<String, Object>> dataList = new ArrayList<>();
-        dataList.add(field);
 
         CanalJson canalJson = CanalJson.builder()
-                .data(dataList).database(databaseName)
-                .sql("").es(opTs).isDdl(false).pkNames(getPkNames(tableSchema))
-                .mysqlType(getMysqlType(tableSchema)).table(tableName).ts(ts)
+                .database(databaseName)
+                .es(opTs).pkNames(getPkNames(tableSchema))
+                .mysqlType(getMysqlType(tableSchema)).table(tableName)
                 .type(getCanalOpType(rowData)).sqlType(getSqlType(tableSchema))
                 .incremental(isSnapshotRecord(sourceStruct)).build();
 
         try {
+            if (RecordUtils.isDdlRecord(messageStruct)) {
+                canalJson.setSql((String) field.get(DDL_FIELD_NAME));
+                canalJson.setDdl(true);
+                canalJson.setData(dataList);
+            } else {
+                canalJson.setDdl(false);
+                canalJson.setTs((Long) messageStruct.get(FieldName.TIMESTAMP));
+                dataList.add(field);
+                canalJson.setData(dataList);
+            }
             return 
StringData.fromString(OBJECT_MAPPER.writeValueAsString(canalJson));
         } catch (Exception e) {
             throw new IllegalStateException("exception occurs when get meta 
data", e);
         }
+
     }
 
     private final String key;
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
index f9f78e2bb..cd65fcb05 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
@@ -49,6 +49,7 @@ import static 
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.
 import static 
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.HEARTBEAT_INTERVAL;
 import static 
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.HOSTNAME;
 import static 
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.INCLUDE_INCREMENTAL;
+import static 
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.INCLUDE_SCHEMA_CHANGE;
 import static 
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.MIGRATE_ALL;
 import static 
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.PASSWORD;
 import static 
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.PORT;
@@ -155,6 +156,7 @@ public class MySqlTableInlongSourceFactory implements 
DynamicTableSourceFactory
                 ? ROW_KINDS_FILTERED.defaultValue()
                 : config.get(ROW_KINDS_FILTERED);
         boolean enableParallelRead = 
config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
+        final boolean includeSchemaChange = config.get(INCLUDE_SCHEMA_CHANGE);
         if (enableParallelRead) {
             validateStartupOptionIfEnableParallel(startupOptions);
             validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 
splitSize, 1);
@@ -195,6 +197,7 @@ public class MySqlTableInlongSourceFactory implements 
DynamicTableSourceFactory
                 inlongMetric,
                 inlongAudit,
                 rowKindFiltered,
+                includeSchemaChange,
                 includeIncremental);
     }
 
@@ -242,6 +245,7 @@ public class MySqlTableInlongSourceFactory implements 
DynamicTableSourceFactory
         options.add(ROW_KINDS_FILTERED);
         options.add(AUDIT_KEYS);
         options.add(INCLUDE_INCREMENTAL);
+        options.add(INCLUDE_SCHEMA_CHANGE);
         return options;
     }
 
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
index fe283a04a..bd1c06e85 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
@@ -85,6 +85,7 @@ public class MySqlTableSource implements ScanTableSource, 
SupportsReadingMetadat
     private final String inlongMetric;
     private final String inlongAudit;
     private final boolean includeIncremental;
+    private final boolean includeSchemaChange;
     // 
--------------------------------------------------------------------------------------------
     // Mutable attributes
     // 
--------------------------------------------------------------------------------------------
@@ -99,69 +100,6 @@ public class MySqlTableSource implements ScanTableSource, 
SupportsReadingMetadat
      */
     protected List<String> metadataKeys;
 
-    /**
-     * Constructor of MySqlTableSource.
-     */
-    public MySqlTableSource(
-            ResolvedSchema physicalSchema,
-            int port,
-            String hostname,
-            String database,
-            String tableName,
-            String username,
-            String password,
-            ZoneId serverTimeZone,
-            Properties dbzProperties,
-            @Nullable String serverId,
-            boolean enableParallelRead,
-            int splitSize,
-            int splitMetaGroupSize,
-            int fetchSize,
-            Duration connectTimeout,
-            int connectMaxRetries,
-            int connectionPoolSize,
-            double distributionFactorUpper,
-            double distributionFactorLower,
-            boolean appendSource,
-            StartupOptions startupOptions,
-            Duration heartbeatInterval,
-            boolean migrateAll,
-            String inlongMetric,
-            String inlongAudit,
-            String rowKindsFiltered,
-            boolean includeIncremental) {
-        this(
-                physicalSchema,
-                port,
-                hostname,
-                database,
-                tableName,
-                username,
-                password,
-                serverTimeZone,
-                dbzProperties,
-                serverId,
-                enableParallelRead,
-                splitSize,
-                splitMetaGroupSize,
-                fetchSize,
-                connectTimeout,
-                connectMaxRetries,
-                connectionPoolSize,
-                distributionFactorUpper,
-                distributionFactorLower,
-                appendSource,
-                startupOptions,
-                false,
-                new Properties(),
-                heartbeatInterval,
-                migrateAll,
-                inlongMetric,
-                inlongAudit,
-                rowKindsFiltered,
-                includeIncremental);
-    }
-
     /**
      * Constructor of MySqlTableSource.
      */
@@ -194,6 +132,7 @@ public class MySqlTableSource implements ScanTableSource, 
SupportsReadingMetadat
             String inlongMetric,
             String inlongAudit,
             String rowKindsFiltered,
+            boolean includeSchemaChange,
             boolean includeIncremental) {
         this.physicalSchema = physicalSchema;
         this.port = port;
@@ -227,6 +166,7 @@ public class MySqlTableSource implements ScanTableSource, 
SupportsReadingMetadat
         this.inlongAudit = inlongAudit;
         this.rowKindsFiltered = rowKindsFiltered;
         this.includeIncremental = includeIncremental;
+        this.includeSchemaChange = includeSchemaChange;
     }
 
     @Override
@@ -284,6 +224,7 @@ public class MySqlTableSource implements ScanTableSource, 
SupportsReadingMetadat
                             .startupOptions(startupOptions)
                             .deserializer(deserializer)
                             
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
+                            .includeSchemaChanges(includeSchemaChange)
                             .jdbcProperties(jdbcProperties)
                             .heartbeatInterval(heartbeatInterval)
                             .inlongMetric(inlongMetric)
@@ -379,6 +320,7 @@ public class MySqlTableSource implements ScanTableSource, 
SupportsReadingMetadat
                         inlongMetric,
                         inlongAudit,
                         rowKindsFiltered,
+                        includeSchemaChange,
                         includeIncremental);
         source.metadataKeys = metadataKeys;
         source.producedDataType = producedDataType;
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
index e131971f3..43598bf93 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
@@ -55,6 +55,7 @@ public class AllMigrateTest {
         option.put("append-mode", "true");
         option.put("migrate-all", "true");
         option.put("include-incremental", "true");
+        option.put("include-schema-change", "true");
         List<String> tables = new ArrayList(10);
         tables.add("test.*");
         List<FieldInfo> fields = Collections.singletonList(
diff --git 
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
 
b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
index 7ad548aa6..c3cafc2b5 100644
--- 
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
+++ 
b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
@@ -20,12 +20,14 @@ package org.apache.inlong.sort.formats.json.canal;
 import java.util.List;
 import java.util.Map;
 import lombok.Builder;
+import lombok.Data;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
 @Builder
 @JsonInclude(Include.NON_NULL)
+@Data
 public class CanalJson {
 
     @JsonProperty("data")

Reply via email to