This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f40ad5a28 [INLONG-8274][Sort] Fix mysql connector will throw exception when synchronizing incremental data (#8277)
1f40ad5a28 is described below

commit 1f40ad5a28ea4db883b0e55bcc6b28d0ddef6896
Author: Xin Gong <[email protected]>
AuthorDate: Mon Jun 19 17:11:05 2023 +0800

    [INLONG-8274][Sort] Fix mysql connector will throw exception when synchronizing incremental data (#8277)
---
 .../table/RowDataDebeziumDeserializeSchema.java         | 17 ++++++++++++++---
 .../cdc/mysql/source/reader/MySqlRecordEmitter.java     |  2 +-
 .../inlong/sort/cdc/mysql/table/MySqlTableSource.java   |  1 +
 3 files changed, 16 insertions(+), 4 deletions(-)

diff --git a/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/debezium/table/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/debezium/table/RowDataDebeziumDeserializeSchema.java
index 2a8691418d..a9c2a6380e 100644
--- a/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/debezium/table/RowDataDebeziumDeserializeSchema.java
+++ b/inlong-sort/sort-flink/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/debezium/table/RowDataDebeziumDeserializeSchema.java
@@ -103,6 +103,8 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali
      * Whether works append source.
      */
     private final boolean appendSource;
+
+    private final boolean schemaChange;
     /**
      * A wrapped output collector which is used to append metadata columns after physical columns.
      */
@@ -126,7 +128,8 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali
             ZoneId serverTimeZone,
             boolean appendSource,
             DeserializationRuntimeConverterFactory userDefinedConverterFactory,
-            boolean migrateAll) {
+            boolean migrateAll,
+            boolean schemaChange) {
         this.hasMetadata = checkNotNull(metadataConverters).length > 0;
         this.appendMetadataCollector = new AppendMetadataCollector(metadataConverters, migrateAll);
         this.migrateAll = migrateAll;
@@ -139,6 +142,7 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali
         this.resultTypeInfo = checkNotNull(resultTypeInfo);
         this.rowKindValidator = rowValidator;
         this.appendSource = checkNotNull(appendSource);
+        this.schemaChange = schemaChange;
     }
 
     /**
@@ -710,7 +714,7 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali
         Struct value = (Struct) record.value();
         Schema valueSchema = record.valueSchema();
 
-        if (RecordUtils.isDdlRecord(value)) {
+        if (schemaChange && RecordUtils.isDdlRecord(value)) {
             extractDdlRecord(record, out, tableSchema, value);
             return;
         }
@@ -807,6 +811,7 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali
         private boolean migrateAll = false;
         private DeserializationRuntimeConverterFactory userDefinedConverterFactory =
                 DeserializationRuntimeConverterFactory.DEFAULT;
+        private boolean schemaChange = false;
 
         public Builder setPhysicalRowType(RowType physicalRowType) {
             this.physicalRowType = physicalRowType;
@@ -843,6 +848,11 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali
             return this;
         }
 
+        public Builder setSchemaChange(boolean schemaChange) {
+            this.schemaChange = schemaChange;
+            return this;
+        }
+
         public Builder setUserDefinedConverterFactory(
                 DeserializationRuntimeConverterFactory userDefinedConverterFactory) {
             this.userDefinedConverterFactory = userDefinedConverterFactory;
@@ -858,7 +868,8 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali
                     serverTimeZone,
                     appendSource,
                     userDefinedConverterFactory,
-                    migrateAll);
+                    migrateAll,
+                    schemaChange);
         }
     }
 }
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
index c7eef03e69..ac37ddeb78 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
@@ -145,7 +145,7 @@ public final class MySqlRecordEmitter<T>
                 }
             }
 
-            if (tableChanges.isEmpty()) {
+            if (includeSchemaChanges && tableChanges.isEmpty()) {
                 TableId tableId = RecordUtils.getTableId(element);
                 // if this table is one of the captured tables, output the ddl element.
                 if (splitState.getMySQLSplit().getTableSchemas().containsKey(tableId)
diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
index c972fe5fd6..47b6af41c5 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
@@ -208,6 +208,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
                         .setUserDefinedConverterFactory(
                                 MySqlDeserializationConverterFactory.instance())
                         .setMigrateAll(migrateAll)
+                        .setSchemaChange(includeSchemaChange)
                         .build();
         if (enableParallelRead) {
             MySqlSource<RowData> parallelSource =

Reply via email to