This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 0537dbe66 [INLONG-7553][Sort] Mysql CDC support output DDL in all
migrate mode (#7579)
0537dbe66 is described below
commit 0537dbe66f4f433c2edfcb160e9d2d5117cc7045
Author: Schnapps <[email protected]>
AuthorDate: Thu Mar 30 20:09:03 2023 +0800
[INLONG-7553][Sort] Mysql CDC support output DDL in all migrate mode (#7579)
---
.../org/apache/inlong/sort/base/Constants.java | 2 +
.../table/RowDataDebeziumDeserializeSchema.java | 79 ++++++++++++++++------
.../inlong/sort/cdc/base/util/RecordUtils.java | 7 ++
.../mysql/source/config/MySqlSourceOptions.java | 6 ++
.../mysql/source/reader/MySqlRecordEmitter.java | 20 ++++--
.../cdc/mysql/table/MySqlReadableMetadata.java | 23 +++++--
.../mysql/table/MySqlTableInlongSourceFactory.java | 4 ++
.../sort/cdc/mysql/table/MySqlTableSource.java | 68 ++-----------------
.../apache/inlong/sort/parser/AllMigrateTest.java | 1 +
.../inlong/sort/formats/json/canal/CanalJson.java | 2 +
10 files changed, 119 insertions(+), 93 deletions(-)
diff --git
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index ac2a7c3a9..702329e6c 100644
---
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -134,6 +134,8 @@ public final class Constants {
public static final String AUTO_DESERIALIZE_FALSE =
"autoDeserialize=false";
+ public static final String DDL_FIELD_NAME = "ddl";
+
public static final ConfigOption<String> INLONG_METRIC =
ConfigOptions.key("inlong.metric.labels")
.stringType()
diff --git
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/debezium/table/RowDataDebeziumDeserializeSchema.java
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/debezium/table/RowDataDebeziumDeserializeSchema.java
index 49f9f630a..536ffb57e 100644
---
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/debezium/table/RowDataDebeziumDeserializeSchema.java
+++
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/debezium/table/RowDataDebeziumDeserializeSchema.java
@@ -18,6 +18,8 @@
package org.apache.inlong.sort.cdc.base.debezium.table;
import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.inlong.sort.base.Constants.DDL_FIELD_NAME;
+import static
org.apache.inlong.sort.cdc.base.relational.JdbcSourceEventDispatcher.HISTORY_RECORD_FIELD;
import io.debezium.data.Envelope;
import io.debezium.data.SpecialValueDecimal;
@@ -43,6 +45,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
@@ -55,6 +58,7 @@ import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.inlong.sort.base.filter.RowValidator;
import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
+import org.apache.inlong.sort.cdc.base.util.RecordUtils;
import org.apache.inlong.sort.cdc.base.util.TemporalConversions;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Decimal;
@@ -110,6 +114,8 @@ public final class RowDataDebeziumDeserializeSchema
implements DebeziumDeseriali
private ZoneId serverTimeZone;
+ public final ObjectMapper objectMapper = new ObjectMapper();
+
RowDataDebeziumDeserializeSchema(
RowType physicalDataType,
MetadataConverter[] metadataConverters,
@@ -582,31 +588,44 @@ public final class RowDataDebeziumDeserializeSchema
implements DebeziumDeseriali
@Override
public Object convert(Object dbzObj, Schema schema) { // Struct input -> one-field row holding a fieldName->value map; any other input is handed to constructDdlRow
- ConnectSchema connectSchema = (ConnectSchema) schema;
- List<Field> fields = connectSchema.fields();
+ if (dbzObj instanceof Struct) {
+ ConnectSchema connectSchema = (ConnectSchema) schema;
+ List<Field> fields = connectSchema.fields();
- Map<String, Object> data = new HashMap<>();
- Struct struct = (Struct) dbzObj;
-
- for (Field field : fields) {
- String fieldName = field.name();
- Object fieldValue = struct.getWithoutDefault(fieldName);
- Schema fieldSchema = schema.field(fieldName).schema();
- String schemaName = fieldSchema.name();
- if (schemaName != null) {
- fieldValue = getValueWithSchema(fieldValue,
schemaName);
- }
- if (fieldValue instanceof ByteBuffer) {
- // binary data (blob or varbinary in mysql) are stored
in bytebuffer
- // use utf-8 to decode as a string by default
- fieldValue = new String(((ByteBuffer)
fieldValue).array());
+ Map<String, Object> data = new HashMap<>();
+ Struct struct = (Struct) dbzObj;
+
+ for (Field field : fields) {
+ String fieldName = field.name();
+ Object fieldValue =
struct.getWithoutDefault(fieldName);
+ Schema fieldSchema = schema.field(fieldName).schema();
+ String schemaName = fieldSchema.name();
+ if (schemaName != null) {
+ fieldValue = getValueWithSchema(fieldValue,
schemaName);
+ }
+ if (fieldValue instanceof ByteBuffer) {
+ // binary data (blob or varbinary in mysql) are
stored in bytebuffer
+ // use utf-8 to decode as a string by default
+ fieldValue = new String(((ByteBuffer)
fieldValue).array()); // NOTE(review): no charset argument is passed, so decoding uses the platform default charset, not necessarily UTF-8 as stated above; array() also ignores the buffer's position/limit - confirm this is intended
+ }
+ data.put(fieldName, fieldValue);
}
- data.put(fieldName, fieldValue);
+
+ GenericRowData row = new GenericRowData(1);
+ row.setField(0, data);
+ return row;
}
+ return constructDdlRow(dbzObj); // reached only when dbzObj is not a Struct; the value is wrapped as a DDL row
+
+ }
+
+ private GenericRowData constructDdlRow(Object ddl) { // builds a single-field row whose field 0 is a map with one entry: DDL_FIELD_NAME -> ddl
+ Map<String, Object> data = new HashMap<>();
GenericRowData row = new GenericRowData(1);
row.setField(0, data);
-
+ data.put(DDL_FIELD_NAME, ddl);
+ row.setField(0, data); // NOTE(review): field 0 was already set to this same map two statements earlier, so this second call looks redundant
return row;
}
};
@@ -688,6 +707,12 @@ public final class RowDataDebeziumDeserializeSchema
implements DebeziumDeseriali
Envelope.Operation op = Envelope.operationFor(record);
Struct value = (Struct) record.value();
Schema valueSchema = record.valueSchema();
+
+ if (RecordUtils.isDdlRecord(value)) {
+ extractDdlRecord(record, out, tableSchema, value);
+ return;
+ }
+
if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
GenericRowData insert = extractAfterRow(value, valueSchema);
insert.setRowKind(RowKind.INSERT);
@@ -704,13 +729,27 @@ public final class RowDataDebeziumDeserializeSchema
implements DebeziumDeseriali
emit(record, before, tableSchema, out);
}
}
-
GenericRowData after = extractAfterRow(value, valueSchema);
after.setRowKind(RowKind.UPDATE_AFTER);
emit(record, after, tableSchema, out);
}
}
+ private void extractDdlRecord(SourceRecord record, Collector<RowData> out,
TableChange tableSchema,
+ Struct value) {
+ // Parses the HISTORY_RECORD_FIELD value as JSON, takes its DDL_FIELD_NAME text, converts it to a row and emits it as an INSERT.
+ try {
+ GenericRowData insert = (GenericRowData) physicalConverter.convert(
+
objectMapper.readTree(value.get(HISTORY_RECORD_FIELD).toString()).get(DDL_FIELD_NAME).asText(),
+ null); // the DDL text is passed with a null schema
+ insert.setRowKind(RowKind.INSERT);
+ emit(record, insert, tableSchema, out);
+ } catch (Exception e) { // any failure here (parsing, conversion or emit) is only logged; nothing is rethrown to the caller
+ LOG.error("Failed to extract DDL record {}", record, e);
+ }
+
+ }
+
@Override
public void deserialize(SourceRecord record, Collector<RowData> out,
Boolean isStreamingPhase) throws Exception {
this.deserialize(record, out);
diff --git
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/RecordUtils.java
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/RecordUtils.java
index 91ecd992e..663df4783 100644
---
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/RecordUtils.java
+++
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/RecordUtils.java
@@ -17,6 +17,8 @@
package org.apache.inlong.sort.cdc.base.util;
+import static
org.apache.inlong.sort.cdc.base.relational.JdbcSourceEventDispatcher.HISTORY_RECORD_FIELD;
+
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.data.Envelope;
import io.debezium.relational.Column;
@@ -155,4 +157,9 @@ public class RecordUtils {
String connector = source.getString(CONNECTOR);
return MYSQL_CONNECTOR.equalsIgnoreCase(connector);
}
+
+ public static boolean isDdlRecord(Struct value) { // true when the schema of value declares a field named HISTORY_RECORD_FIELD
+ return value.schema().field(HISTORY_RECORD_FIELD) != null; // checks the schema only, not whether the field's value is set
+ }
+
}
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
index 0a09dd400..9ad1279e4 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
@@ -213,6 +213,12 @@ public class MySqlSourceOptions {
.withDescription("Whether include a incremental flag in
data "
+ "when migrating all databases");
+ public static final ConfigOption<Boolean> INCLUDE_SCHEMA_CHANGE =
+ ConfigOptions.key("include-schema-change")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether include schema change in cdc
connector");
+
//
----------------------------------------------------------------------------
// experimental options, won't add them to documentation
//
----------------------------------------------------------------------------
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
index e36b0d80f..417cd6026 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
@@ -111,12 +111,17 @@ public final class MySqlRecordEmitter<T>
TableChanges changes =
TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
for (TableChange tableChange : changes) {
splitState.asBinlogSplitState().recordSchema(tableChange.getId(), tableChange);
+ if (includeSchemaChanges) {
+ outputDdlElement(element, output, splitState, tableChange);
+ }
}
- if (includeSchemaChanges) {
- BinlogOffset position = getBinlogPosition(element);
- splitState.asBinlogSplitState().setStartingOffset(position);
- emitElement(element, output, null);
+
+ // for create table ddl, there's no table change events
+ // still we need to generate a ddl message
+ if (tableChanges.isEmpty()) {
+ outputDdlElement(element, output, splitState, null);
}
+
} else if (isDataChangeRecord(element)) {
if (splitState.isBinlogSplitState()) {
BinlogOffset position = getBinlogPosition(element);
@@ -175,6 +180,13 @@ public final class MySqlRecordEmitter<T>
}
}
+ private void outputDdlElement(SourceRecord element, SourceOutput<T>
output, MySqlSplitState splitState,
+ TableChange tableChange) throws Exception { // records this element's binlog position as the split's starting offset, then emits the element
+ BinlogOffset position = getBinlogPosition(element);
+ splitState.asBinlogSplitState().setStartingOffset(position); // NOTE(review): no isBinlogSplitState() check in this method - presumably callers guarantee a binlog split; confirm
+ emitElement(element, output, tableChange); // tableChange is forwarded unchanged and may be null (one call site in this diff passes null)
+ }
+
private void updateStartingOffsetForSplit(MySqlSplitState splitState,
SourceRecord element) {
if (splitState.isBinlogSplitState()) {
// record the time metric to enter the incremental phase
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
index 60c2c7e32..a44993891 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java
@@ -19,6 +19,8 @@ package org.apache.inlong.sort.cdc.mysql.table;
import static
org.apache.inlong.sort.cdc.mysql.source.utils.RecordUtils.isSnapshotRecord;
+import static org.apache.inlong.sort.base.Constants.DDL_FIELD_NAME;
+
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.data.Envelope;
import io.debezium.data.Envelope.FieldName;
@@ -36,6 +38,7 @@ import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.apache.inlong.sort.cdc.base.debezium.table.MetadataConverter;
+import org.apache.inlong.sort.cdc.base.util.RecordUtils;
import org.apache.inlong.sort.formats.json.canal.CanalJson;
import org.apache.inlong.sort.formats.json.debezium.DebeziumJson;
import org.apache.inlong.sort.formats.json.debezium.DebeziumJson.Source;
@@ -443,26 +446,34 @@ public enum MySqlReadableMetadata {
String databaseName = getMetaData(record,
AbstractSourceInfo.DATABASE_NAME_KEY);
// opTs
long opTs = (Long) sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY);
- // ts
- long ts = (Long) messageStruct.get(FieldName.TIMESTAMP);
// actual data
GenericRowData data = rowData;
Map<String, Object> field = (Map<String, Object>) data.getField(0);
List<Map<String, Object>> dataList = new ArrayList<>();
- dataList.add(field);
CanalJson canalJson = CanalJson.builder()
- .data(dataList).database(databaseName)
- .sql("").es(opTs).isDdl(false).pkNames(getPkNames(tableSchema))
- .mysqlType(getMysqlType(tableSchema)).table(tableName).ts(ts)
+ .database(databaseName)
+ .es(opTs).pkNames(getPkNames(tableSchema))
+ .mysqlType(getMysqlType(tableSchema)).table(tableName)
.type(getCanalOpType(rowData)).sqlType(getSqlType(tableSchema))
.incremental(isSnapshotRecord(sourceStruct)).build();
try {
+ if (RecordUtils.isDdlRecord(messageStruct)) {
+ canalJson.setSql((String) field.get(DDL_FIELD_NAME));
+ canalJson.setDdl(true);
+ canalJson.setData(dataList);
+ } else {
+ canalJson.setDdl(false);
+ canalJson.setTs((Long) messageStruct.get(FieldName.TIMESTAMP));
+ dataList.add(field);
+ canalJson.setData(dataList);
+ }
return
StringData.fromString(OBJECT_MAPPER.writeValueAsString(canalJson));
} catch (Exception e) {
throw new IllegalStateException("exception occurs when get meta
data", e);
}
+
}
private final String key;
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
index f9f78e2bb..cd65fcb05 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
@@ -49,6 +49,7 @@ import static
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.
import static
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.HEARTBEAT_INTERVAL;
import static
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.HOSTNAME;
import static
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.INCLUDE_INCREMENTAL;
+import static
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.INCLUDE_SCHEMA_CHANGE;
import static
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.MIGRATE_ALL;
import static
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.PASSWORD;
import static
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.PORT;
@@ -155,6 +156,7 @@ public class MySqlTableInlongSourceFactory implements
DynamicTableSourceFactory
? ROW_KINDS_FILTERED.defaultValue()
: config.get(ROW_KINDS_FILTERED);
boolean enableParallelRead =
config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
+ final boolean includeSchemaChange = config.get(INCLUDE_SCHEMA_CHANGE);
if (enableParallelRead) {
validateStartupOptionIfEnableParallel(startupOptions);
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE,
splitSize, 1);
@@ -195,6 +197,7 @@ public class MySqlTableInlongSourceFactory implements
DynamicTableSourceFactory
inlongMetric,
inlongAudit,
rowKindFiltered,
+ includeSchemaChange,
includeIncremental);
}
@@ -242,6 +245,7 @@ public class MySqlTableInlongSourceFactory implements
DynamicTableSourceFactory
options.add(ROW_KINDS_FILTERED);
options.add(AUDIT_KEYS);
options.add(INCLUDE_INCREMENTAL);
+ options.add(INCLUDE_SCHEMA_CHANGE);
return options;
}
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
index fe283a04a..bd1c06e85 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
@@ -85,6 +85,7 @@ public class MySqlTableSource implements ScanTableSource,
SupportsReadingMetadat
private final String inlongMetric;
private final String inlongAudit;
private final boolean includeIncremental;
+ private final boolean includeSchemaChange;
//
--------------------------------------------------------------------------------------------
// Mutable attributes
//
--------------------------------------------------------------------------------------------
@@ -99,69 +100,6 @@ public class MySqlTableSource implements ScanTableSource,
SupportsReadingMetadat
*/
protected List<String> metadataKeys;
- /**
- * Constructor of MySqlTableSource.
- */
- public MySqlTableSource(
- ResolvedSchema physicalSchema,
- int port,
- String hostname,
- String database,
- String tableName,
- String username,
- String password,
- ZoneId serverTimeZone,
- Properties dbzProperties,
- @Nullable String serverId,
- boolean enableParallelRead,
- int splitSize,
- int splitMetaGroupSize,
- int fetchSize,
- Duration connectTimeout,
- int connectMaxRetries,
- int connectionPoolSize,
- double distributionFactorUpper,
- double distributionFactorLower,
- boolean appendSource,
- StartupOptions startupOptions,
- Duration heartbeatInterval,
- boolean migrateAll,
- String inlongMetric,
- String inlongAudit,
- String rowKindsFiltered,
- boolean includeIncremental) {
- this(
- physicalSchema,
- port,
- hostname,
- database,
- tableName,
- username,
- password,
- serverTimeZone,
- dbzProperties,
- serverId,
- enableParallelRead,
- splitSize,
- splitMetaGroupSize,
- fetchSize,
- connectTimeout,
- connectMaxRetries,
- connectionPoolSize,
- distributionFactorUpper,
- distributionFactorLower,
- appendSource,
- startupOptions,
- false,
- new Properties(),
- heartbeatInterval,
- migrateAll,
- inlongMetric,
- inlongAudit,
- rowKindsFiltered,
- includeIncremental);
- }
-
/**
* Constructor of MySqlTableSource.
*/
@@ -194,6 +132,7 @@ public class MySqlTableSource implements ScanTableSource,
SupportsReadingMetadat
String inlongMetric,
String inlongAudit,
String rowKindsFiltered,
+ boolean includeSchemaChange,
boolean includeIncremental) {
this.physicalSchema = physicalSchema;
this.port = port;
@@ -227,6 +166,7 @@ public class MySqlTableSource implements ScanTableSource,
SupportsReadingMetadat
this.inlongAudit = inlongAudit;
this.rowKindsFiltered = rowKindsFiltered;
this.includeIncremental = includeIncremental;
+ this.includeSchemaChange = includeSchemaChange;
}
@Override
@@ -284,6 +224,7 @@ public class MySqlTableSource implements ScanTableSource,
SupportsReadingMetadat
.startupOptions(startupOptions)
.deserializer(deserializer)
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
+ .includeSchemaChanges(includeSchemaChange)
.jdbcProperties(jdbcProperties)
.heartbeatInterval(heartbeatInterval)
.inlongMetric(inlongMetric)
@@ -379,6 +320,7 @@ public class MySqlTableSource implements ScanTableSource,
SupportsReadingMetadat
inlongMetric,
inlongAudit,
rowKindsFiltered,
+ includeSchemaChange,
includeIncremental);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
index e131971f3..43598bf93 100644
---
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java
@@ -55,6 +55,7 @@ public class AllMigrateTest {
option.put("append-mode", "true");
option.put("migrate-all", "true");
option.put("include-incremental", "true");
+ option.put("include-schema-change", "true");
List<String> tables = new ArrayList(10);
tables.add("test.*");
List<FieldInfo> fields = Collections.singletonList(
diff --git
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
index 7ad548aa6..c3cafc2b5 100644
---
a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
+++
b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJson.java
@@ -20,12 +20,14 @@ package org.apache.inlong.sort.formats.json.canal;
import java.util.List;
import java.util.Map;
import lombok.Builder;
+import lombok.Data;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
@Builder
@JsonInclude(Include.NON_NULL)
+@Data
public class CanalJson {
@JsonProperty("data")