This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 1f40ad5a28 [INLONG-8274][Sort] Fix mysql connector will throw
exception when synchronizing incremental data (#8277)
1f40ad5a28 is described below
commit 1f40ad5a28ea4db883b0e55bcc6b28d0ddef6896
Author: Xin Gong <[email protected]>
AuthorDate: Mon Jun 19 17:11:05 2023 +0800
[INLONG-8274][Sort] Fix mysql connector will throw exception when
synchronizing incremental data (#8277)
---
.../table/RowDataDebeziumDeserializeSchema.java | 17 ++++++++++++++---
.../cdc/mysql/source/reader/MySqlRecordEmitter.java | 2 +-
.../inlong/sort/cdc/mysql/table/MySqlTableSource.java | 1 +
3 files changed, 16 insertions(+), 4 deletions(-)
diff --git
a/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/debezium/table/RowDataDebeziumDeserializeSchema.java
b/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/debezium/table/RowDataDebeziumDeserializeSchema.java
index 2a8691418d..a9c2a6380e 100644
---
a/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/debezium/table/RowDataDebeziumDeserializeSchema.java
+++
b/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/debezium/table/RowDataDebeziumDeserializeSchema.java
@@ -103,6 +103,8 @@ public final class RowDataDebeziumDeserializeSchema
implements DebeziumDeseriali
* Whether works append source.
*/
private final boolean appendSource;
+
+ private final boolean schemaChange;
/**
* A wrapped output collector which is used to append metadata columns
after physical columns.
*/
@@ -126,7 +128,8 @@ public final class RowDataDebeziumDeserializeSchema
implements DebeziumDeseriali
ZoneId serverTimeZone,
boolean appendSource,
DeserializationRuntimeConverterFactory userDefinedConverterFactory,
- boolean migrateAll) {
+ boolean migrateAll,
+ boolean schemaChange) {
this.hasMetadata = checkNotNull(metadataConverters).length > 0;
this.appendMetadataCollector = new
AppendMetadataCollector(metadataConverters, migrateAll);
this.migrateAll = migrateAll;
@@ -139,6 +142,7 @@ public final class RowDataDebeziumDeserializeSchema
implements DebeziumDeseriali
this.resultTypeInfo = checkNotNull(resultTypeInfo);
this.rowKindValidator = rowValidator;
this.appendSource = checkNotNull(appendSource);
+ this.schemaChange = schemaChange;
}
/**
@@ -710,7 +714,7 @@ public final class RowDataDebeziumDeserializeSchema
implements DebeziumDeseriali
Struct value = (Struct) record.value();
Schema valueSchema = record.valueSchema();
- if (RecordUtils.isDdlRecord(value)) {
+ if (schemaChange && RecordUtils.isDdlRecord(value)) {
extractDdlRecord(record, out, tableSchema, value);
return;
}
@@ -807,6 +811,7 @@ public final class RowDataDebeziumDeserializeSchema
implements DebeziumDeseriali
private boolean migrateAll = false;
private DeserializationRuntimeConverterFactory
userDefinedConverterFactory =
DeserializationRuntimeConverterFactory.DEFAULT;
+ private boolean schemaChange = false;
public Builder setPhysicalRowType(RowType physicalRowType) {
this.physicalRowType = physicalRowType;
@@ -843,6 +848,11 @@ public final class RowDataDebeziumDeserializeSchema
implements DebeziumDeseriali
return this;
}
+ public Builder setSchemaChange(boolean schemaChange) {
+ this.schemaChange = schemaChange;
+ return this;
+ }
+
public Builder setUserDefinedConverterFactory(
DeserializationRuntimeConverterFactory
userDefinedConverterFactory) {
this.userDefinedConverterFactory = userDefinedConverterFactory;
@@ -858,7 +868,8 @@ public final class RowDataDebeziumDeserializeSchema
implements DebeziumDeseriali
serverTimeZone,
appendSource,
userDefinedConverterFactory,
- migrateAll);
+ migrateAll,
+ schemaChange);
}
}
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
index c7eef03e69..ac37ddeb78 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
@@ -145,7 +145,7 @@ public final class MySqlRecordEmitter<T>
}
}
- if (tableChanges.isEmpty()) {
+ if (includeSchemaChanges && tableChanges.isEmpty()) {
TableId tableId = RecordUtils.getTableId(element);
// if this table is one of the captured tables, output the ddl
element.
if
(splitState.getMySQLSplit().getTableSchemas().containsKey(tableId)
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
index c972fe5fd6..47b6af41c5 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
@@ -208,6 +208,7 @@ public class MySqlTableSource implements ScanTableSource,
SupportsReadingMetadat
.setUserDefinedConverterFactory(
MySqlDeserializationConverterFactory.instance())
.setMigrateAll(migrateAll)
+ .setSchemaChange(includeSchemaChange)
.build();
if (enableParallelRead) {
MySqlSource<RowData> parallelSource =