This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 77f10a04a [cdc] Refactor mysql debezium json event parser (#2053)
77f10a04a is described below

commit 77f10a04af71449aecb1e4f76acdf6d559b159da
Author: Kerwin <[email protected]>
AuthorDate: Tue Sep 26 15:02:03 2023 +0800

    [cdc] Refactor mysql debezium json event parser (#2053)
---
 .../flink/action/cdc/kafka/KafkaSchemaUtils.java   |   6 +-
 .../action/cdc/kafka/KafkaSyncDatabaseAction.java  |   4 +-
 .../action/cdc/kafka/KafkaSyncTableAction.java     |   4 +-
 .../cdc/kafka/{formats => format}/DataFormat.java  |   8 +-
 .../kafka/{formats => format}/RecordParser.java    |   2 +-
 .../{formats => format}/RecordParserFactory.java   |   2 +-
 .../canal/CanalFieldParser.java                    |   2 +-
 .../canal/CanalRecordParser.java                   |   4 +-
 .../maxwell/MaxwellRecordParser.java               |   4 +-
 .../{formats => format}/ogg/OggRecordParser.java   |   4 +-
 .../cdc/mysql/MySqlDebeziumJsonEventParser.java    | 179 ++++++---------
 .../action/cdc/mysql/MySqlTableSchemaBuilder.java  |  70 +++---
 .../flink/action/cdc/mysql/MySqlTypeUtils.java     |   4 +-
 .../action/cdc/mysql/format/DebeziumEvent.java     | 245 +++++++++++++++++++++
 .../cdc/mysql/format/DebeziumEventUtils.java       |  45 ++++
 .../flink/action/cdc/mysql/DebeziumEventTest.java  |  71 ++++++
 .../resources/mysql/debezium-event-change.json     | 141 ++++++++++++
 17 files changed, 626 insertions(+), 169 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaUtils.java
index 330df8e79..7ed9a4aab 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaUtils.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaUtils.java
@@ -19,8 +19,8 @@
 package org.apache.paimon.flink.action.cdc.kafka;
 
 import org.apache.paimon.flink.action.cdc.TypeMapping;
-import org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat;
-import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
+import org.apache.paimon.flink.action.cdc.kafka.format.DataFormat;
+import org.apache.paimon.flink.action.cdc.kafka.format.RecordParser;
 import org.apache.paimon.schema.Schema;
 
 import org.apache.flink.configuration.Configuration;
@@ -44,7 +44,7 @@ import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
 import static 
org.apache.paimon.flink.action.cdc.kafka.KafkaActionUtils.kafkaPropertiesGroupId;
-import static 
org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat.getDataFormat;
+import static 
org.apache.paimon.flink.action.cdc.kafka.format.DataFormat.getDataFormat;
 
 /** Utility class to load kafka schema. */
 public class KafkaSchemaUtils {
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
index 61aa45108..e8df881fb 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
@@ -24,8 +24,8 @@ import org.apache.paimon.flink.action.ActionBase;
 import org.apache.paimon.flink.action.MultiTablesSinkMode;
 import org.apache.paimon.flink.action.cdc.TableNameConverter;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
-import org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat;
-import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
+import org.apache.paimon.flink.action.cdc.kafka.format.DataFormat;
+import org.apache.paimon.flink.action.cdc.kafka.format.RecordParser;
 import org.apache.paimon.flink.sink.cdc.EventParser;
 import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
index 87d3f0ddb..b00fa38b0 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
@@ -26,8 +26,8 @@ import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.ActionBase;
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
-import org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat;
-import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
+import org.apache.paimon.flink.action.cdc.kafka.format.DataFormat;
+import org.apache.paimon.flink.action.cdc.kafka.format.RecordParser;
 import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder;
 import org.apache.paimon.flink.sink.cdc.EventParser;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/DataFormat.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/DataFormat.java
similarity index 91%
rename from 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/DataFormat.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/DataFormat.java
index 71c4af8f4..e178fec40 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/DataFormat.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/DataFormat.java
@@ -16,13 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.action.cdc.kafka.formats;
+package org.apache.paimon.flink.action.cdc.kafka.format;
 
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
-import 
org.apache.paimon.flink.action.cdc.kafka.formats.canal.CanalRecordParser;
-import 
org.apache.paimon.flink.action.cdc.kafka.formats.maxwell.MaxwellRecordParser;
-import org.apache.paimon.flink.action.cdc.kafka.formats.ogg.OggRecordParser;
+import org.apache.paimon.flink.action.cdc.kafka.format.canal.CanalRecordParser;
+import 
org.apache.paimon.flink.action.cdc.kafka.format.maxwell.MaxwellRecordParser;
+import org.apache.paimon.flink.action.cdc.kafka.format.ogg.OggRecordParser;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/RecordParser.java
similarity index 99%
rename from 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/RecordParser.java
index bf6be1b7d..2c0ffb19a 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/RecordParser.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.action.cdc.kafka.formats;
+package org.apache.paimon.flink.action.cdc.kafka.format;
 
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParserFactory.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/RecordParserFactory.java
similarity index 96%
rename from 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParserFactory.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/RecordParserFactory.java
index a47bca23f..e83d4b0d7 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParserFactory.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/RecordParserFactory.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.action.cdc.kafka.formats;
+package org.apache.paimon.flink.action.cdc.kafka.format;
 
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalFieldParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/canal/CanalFieldParser.java
similarity index 98%
rename from 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalFieldParser.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/canal/CanalFieldParser.java
index cb77fe8e7..619da6b79 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalFieldParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/canal/CanalFieldParser.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.action.cdc.kafka.formats.canal;
+package org.apache.paimon.flink.action.cdc.kafka.format.canal;
 
 /** Converts some special types such as enum、set、geometry. */
 public class CanalFieldParser {
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/canal/CanalRecordParser.java
similarity index 98%
rename from 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/canal/CanalRecordParser.java
index e0f2ec748..62ca27a4f 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/canal/CanalRecordParser.java
@@ -16,11 +16,11 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.action.cdc.kafka.formats.canal;
+package org.apache.paimon.flink.action.cdc.kafka.format.canal;
 
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
-import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
+import org.apache.paimon.flink.action.cdc.kafka.format.RecordParser;
 import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
 import org.apache.paimon.types.DataType;
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/maxwell/MaxwellRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/maxwell/MaxwellRecordParser.java
similarity index 96%
rename from 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/maxwell/MaxwellRecordParser.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/maxwell/MaxwellRecordParser.java
index 8b51edad9..f116d5d27 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/maxwell/MaxwellRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/maxwell/MaxwellRecordParser.java
@@ -16,11 +16,11 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.action.cdc.kafka.formats.maxwell;
+package org.apache.paimon.flink.action.cdc.kafka.format.maxwell;
 
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
-import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
+import org.apache.paimon.flink.action.cdc.kafka.format.RecordParser;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
 import org.apache.paimon.types.RowKind;
 
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/ogg/OggRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/ogg/OggRecordParser.java
similarity index 97%
rename from 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/ogg/OggRecordParser.java
rename to 
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/ogg/OggRecordParser.java
index 7f4fcc76a..0c389d48d 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/ogg/OggRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/ogg/OggRecordParser.java
@@ -16,11 +16,11 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.action.cdc.kafka.formats.ogg;
+package org.apache.paimon.flink.action.cdc.kafka.format.ogg;
 
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
-import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
+import org.apache.paimon.flink.action.cdc.kafka.format.RecordParser;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
 import org.apache.paimon.types.RowKind;
 
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index fd6c23e0d..e18cc1931 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -27,18 +27,17 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TableNameConverter;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
+import org.apache.paimon.flink.action.cdc.mysql.format.DebeziumEvent;
 import org.apache.paimon.flink.sink.cdc.CdcRecord;
 import org.apache.paimon.flink.sink.cdc.EventParser;
 import org.apache.paimon.flink.sink.cdc.NewTableSchemaBuilder;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.utils.DateTimeUtils;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.StringUtils;
 
-import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -69,6 +68,8 @@ import java.util.Base64;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -77,7 +78,6 @@ import java.util.regex.Pattern;
 
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.recordKeyDuplicateErrMsg;
-import static 
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
 import static 
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
 
 /** {@link EventParser} for MySQL Debezium JSON. */
@@ -90,15 +90,15 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
     private final boolean caseSensitive;
     private final TableNameConverter tableNameConverter;
     private final List<ComputedColumn> computedColumns;
-    private final NewTableSchemaBuilder<JsonNode> schemaBuilder;
+    private final NewTableSchemaBuilder<TableChanges.TableChange> 
schemaBuilder;
     @Nullable private final Pattern includingPattern;
     @Nullable private final Pattern excludingPattern;
     private final Set<String> includedTables = new HashSet<>();
     private final Set<String> excludedTables = new HashSet<>();
     private final TypeMapping typeMapping;
 
-    private JsonNode root;
-    private JsonNode payload;
+    private DebeziumEvent root;
+
     // NOTE: current table name is not converted by tableNameConverter
     private String currentTable;
     private boolean shouldSynchronizeCurrentTable;
@@ -113,7 +113,7 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
                 caseSensitive,
                 computedColumns,
                 new TableNameConverter(caseSensitive),
-                ddl -> Optional.empty(),
+                new MySqlTableSchemaBuilder(new HashMap<>(), caseSensitive, 
typeMapping),
                 null,
                 null,
                 typeMapping);
@@ -123,7 +123,7 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
             ZoneId serverTimeZone,
             boolean caseSensitive,
             TableNameConverter tableNameConverter,
-            NewTableSchemaBuilder<JsonNode> schemaBuilder,
+            NewTableSchemaBuilder<TableChanges.TableChange> schemaBuilder,
             @Nullable Pattern includingPattern,
             @Nullable Pattern excludingPattern,
             TypeMapping typeMapping) {
@@ -143,7 +143,7 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
             boolean caseSensitive,
             List<ComputedColumn> computedColumns,
             TableNameConverter tableNameConverter,
-            NewTableSchemaBuilder<JsonNode> schemaBuilder,
+            NewTableSchemaBuilder<TableChanges.TableChange> schemaBuilder,
             @Nullable Pattern includingPattern,
             @Nullable Pattern excludingPattern,
             TypeMapping typeMapping) {
@@ -160,10 +160,11 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
     @Override
     public void setRawEvent(String rawEvent) {
         try {
-            
objectMapper.configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true);
-            root = objectMapper.readValue(rawEvent, JsonNode.class);
-            payload = root.get("payload");
-            currentTable = payload.get("source").get("table").asText();
+            objectMapper
+                    
.configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true)
+                    
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+            root = objectMapper.readValue(rawEvent, DebeziumEvent.class);
+            currentTable = root.payload().source().table();
             shouldSynchronizeCurrentTable = shouldSynchronizeCurrentTable();
         } catch (Exception e) {
             throw new RuntimeException(e);
@@ -175,63 +176,37 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
         return tableNameConverter.convert(Identifier.create(getDatabaseName(), 
currentTable));
     }
 
-    private boolean isSchemaChange() {
-        return payload.get("op") == null;
-    }
-
     @Override
     public List<DataField> parseSchemaChange() {
-        if (!shouldSynchronizeCurrentTable || !isSchemaChange()) {
+        if (!shouldSynchronizeCurrentTable || 
!root.payload().isSchemaChange()) {
             return Collections.emptyList();
         }
 
-        JsonNode historyRecord = payload.get("historyRecord");
-        if (historyRecord == null) {
+        DebeziumEvent.Payload payload = root.payload();
+        if (!payload.hasHistoryRecord()) {
             return Collections.emptyList();
         }
 
-        JsonNode columns;
+        TableChanges.TableChange tableChange = null;
         try {
-            String historyRecordString = historyRecord.asText();
-            JsonNode tableChanges = 
objectMapper.readTree(historyRecordString).get("tableChanges");
-            if (tableChanges.size() != 1) {
+            Iterator<TableChanges.TableChange> tableChanges = 
payload.getTableChanges();
+            long count;
+            for (count = 0L; tableChanges.hasNext(); ++count) {
+                tableChange = tableChanges.next();
+            }
+            if (count != 1) {
                 LOG.error(
                         "Invalid historyRecord, because tableChanges should 
contain exactly 1 item.\n"
-                                + historyRecord.asText());
+                                + payload.historyRecord());
                 return Collections.emptyList();
             }
-            columns = tableChanges.get(0).get("table").get("columns");
         } catch (Exception e) {
             LOG.info("Failed to parse history record for schema changes", e);
             return Collections.emptyList();
         }
-        if (columns == null) {
-            return Collections.emptyList();
-        }
 
-        List<DataField> result = new ArrayList<>();
-        for (int i = 0; i < columns.size(); i++) {
-            JsonNode column = columns.get(i);
-            JsonNode length = column.get("length");
-            JsonNode scale = column.get("scale");
-            DataType dataType =
-                    MySqlTypeUtils.toDataType(
-                            column.get("typeName").asText(),
-                            length == null ? null : length.asInt(),
-                            scale == null ? null : scale.asInt(),
-                            typeMapping);
-
-            dataType =
-                    dataType.copy(
-                            typeMapping.containsMode(TO_NULLABLE)
-                                    || column.get("optional").asBoolean());
-
-            String fieldName = column.get("name").asText();
-            result.add(
-                    new DataField(
-                            i, caseSensitive ? fieldName : 
fieldName.toLowerCase(), dataType));
-        }
-        return result;
+        Optional<Schema> schema = schemaBuilder.build(tableChange);
+        return schema.get().fields();
     }
 
     @Override
@@ -240,31 +215,31 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
             return Optional.empty();
         }
 
-        JsonNode historyRecord = payload.get("historyRecord");
-        if (historyRecord == null) {
+        DebeziumEvent.Payload payload = root.payload();
+        if (!payload.hasHistoryRecord()) {
             return Optional.empty();
         }
 
         try {
-            String historyRecordString = historyRecord.asText();
-            JsonNode tableChanges = 
objectMapper.readTree(historyRecordString).get("tableChanges");
-            if (tableChanges.size() != 1) {
+            TableChanges.TableChange tableChange = null;
+            Iterator<TableChanges.TableChange> tableChanges = 
payload.getTableChanges();
+            long count;
+            for (count = 0L; tableChanges.hasNext(); ++count) {
+                tableChange = tableChanges.next();
+            }
+            if (count != 1) {
                 LOG.error(
                         "Invalid historyRecord, because tableChanges should 
contain exactly 1 item.\n"
-                                + historyRecord.asText());
+                                + payload.historyRecord());
                 return Optional.empty();
             }
 
-            JsonNode tableChange = tableChanges.get(0);
-            if (!tableChange
-                    .get("type")
-                    .asText()
-                    .equals(TableChanges.TableChangeType.CREATE.name())) {
+            if (TableChanges.TableChangeType.CREATE != tableChange.getType()) {
                 return Optional.empty();
             }
 
-            JsonNode primaryKeyColumnNames = 
tableChange.get("table").get("primaryKeyColumnNames");
-            if (primaryKeyColumnNames.size() == 0) {
+            List<String> primaryKeyColumnNames = 
tableChange.getTable().primaryKeyColumnNames();
+            if (primaryKeyColumnNames.isEmpty()) {
                 LOG.debug(
                         "Didn't find primary keys from MySQL DDL for table 
'{}'. "
                                 + "This table won't be synchronized.",
@@ -283,19 +258,19 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
 
     @Override
     public List<CdcRecord> parseRecords() {
-        if (!shouldSynchronizeCurrentTable || isSchemaChange()) {
+        if (!shouldSynchronizeCurrentTable || root.payload().isSchemaChange()) 
{
             return Collections.emptyList();
         }
         List<CdcRecord> records = new ArrayList<>();
 
-        Map<String, String> before = extractRow(payload.get("before"));
-        if (before.size() > 0) {
+        Map<String, String> before = extractRow(root.payload().before());
+        if (!before.isEmpty()) {
             before = mapKeyCaseConvert(before, caseSensitive, 
recordKeyDuplicateErrMsg(before));
             records.add(new CdcRecord(RowKind.DELETE, before));
         }
 
-        Map<String, String> after = extractRow(payload.get("after"));
-        if (after.size() > 0) {
+        Map<String, String> after = extractRow(root.payload().after());
+        if (!after.isEmpty()) {
             after = mapKeyCaseConvert(after, caseSensitive, 
recordKeyDuplicateErrMsg(after));
             records.add(new CdcRecord(RowKind.INSERT, after));
         }
@@ -304,57 +279,34 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
     }
 
     private String getDatabaseName() {
-        return payload.get("source").get("db").asText();
+        return root.payload().source().db();
     }
 
     private Map<String, String> extractRow(JsonNode recordRow) {
-        JsonNode schema =
+        if (recordRow == null) {
+            return new HashMap<>();
+        }
+
+        DebeziumEvent.Field schema =
                 Preconditions.checkNotNull(
-                        root.get("schema"),
+                        root.schema(),
                         "MySqlDebeziumJsonEventParser only supports debezium 
JSON with schema. "
                                 + "Please make sure that `includeSchema` is 
true "
                                 + "in the JsonDebeziumDeserializationSchema 
you created");
 
-        Map<String, String> mySqlFieldTypes = new HashMap<>();
-        Map<String, String> fieldClassNames = new HashMap<>();
-        JsonNode arrayNode = schema.get("fields");
-        for (int i = 0; i < arrayNode.size(); i++) {
-            JsonNode elementNode = arrayNode.get(i);
-            String field = elementNode.get("field").asText();
-            if ("before".equals(field) || "after".equals(field)) {
-                JsonNode innerArrayNode = elementNode.get("fields");
-                for (int j = 0; j < innerArrayNode.size(); j++) {
-                    JsonNode innerElementNode = innerArrayNode.get(j);
-                    String fieldName = innerElementNode.get("field").asText();
-                    String fieldType = innerElementNode.get("type").asText();
-                    mySqlFieldTypes.put(fieldName, fieldType);
-                    if (innerElementNode.get("name") != null) {
-                        String className = 
innerElementNode.get("name").asText();
-                        fieldClassNames.put(fieldName, className);
-                    }
-                }
-            }
-        }
-
-        // the geometry, point type can not be converted to string, so we 
convert it to Object
-        // first.
-        Map<String, Object> jsonMap =
-                objectMapper.convertValue(recordRow, new 
TypeReference<Map<String, Object>>() {});
-        if (jsonMap == null) {
-            return new HashMap<>();
-        }
+        Map<String, DebeziumEvent.Field> fields = 
schema.beforeAndAfterFields();
 
-        Map<String, String> resultMap = new HashMap<>();
-        for (Map.Entry<String, String> field : mySqlFieldTypes.entrySet()) {
+        LinkedHashMap<String, String> resultMap = new LinkedHashMap<>();
+        for (Map.Entry<String, DebeziumEvent.Field> field : fields.entrySet()) 
{
             String fieldName = field.getKey();
-            String mySqlType = field.getValue();
-            Object objectValue = jsonMap.get(fieldName);
-            if (objectValue == null) {
+            String mySqlType = field.getValue().type();
+            JsonNode objectValue = recordRow.get(fieldName);
+            if (objectValue == null || objectValue.isNull()) {
                 continue;
             }
 
-            String className = fieldClassNames.get(fieldName);
-            String oldValue = objectValue.toString();
+            String className = field.getValue().name();
+            String oldValue = objectValue.asText();
             String newValue = oldValue;
 
             if (Bits.LOGICAL_NAME.equals(className)) {
@@ -444,13 +396,13 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
                                 .toString();
             } else if (Point.LOGICAL_NAME.equals(className)
                     || Geometry.LOGICAL_NAME.equals(className)) {
-                JsonNode jsonNode = recordRow.get(fieldName);
                 try {
-                    byte[] wkb = jsonNode.get("wkb").binaryValue();
+                    byte[] wkb = 
objectValue.get(Geometry.WKB_FIELD).binaryValue();
                     newValue = MySqlTypeUtils.convertWkbArray(wkb);
                 } catch (Exception e) {
                     throw new IllegalArgumentException(
-                            String.format("Failed to convert %s to geometry 
JSON.", jsonNode), e);
+                            String.format("Failed to convert %s to geometry 
JSON.", objectValue),
+                            e);
                 }
             }
 
@@ -468,6 +420,11 @@ public class MySqlDebeziumJsonEventParser implements 
EventParser<String> {
     }
 
     private boolean shouldSynchronizeCurrentTable() {
+        // When database DDL operation, the current table is null.
+        if (currentTable == null) {
+            return false;
+        }
+
         if (excludedTables.contains(currentTable)) {
             return false;
         }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
index aec731fb2..24edbc66f 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
@@ -23,22 +23,22 @@ import 
org.apache.paimon.flink.sink.cdc.NewTableSchemaBuilder;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.types.DataType;
 
-import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import io.debezium.relational.Column;
+import io.debezium.relational.Table;
+import io.debezium.relational.history.TableChanges;
 
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
 import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.listCaseConvert;
-import static 
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
 import static 
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Schema builder for MySQL cdc. */
-public class MySqlTableSchemaBuilder implements 
NewTableSchemaBuilder<JsonNode> {
+public class MySqlTableSchemaBuilder implements 
NewTableSchemaBuilder<TableChanges.TableChange> {
 
     private final Map<String, String> tableConfig;
     private final boolean caseSensitive;
@@ -52,47 +52,45 @@ public class MySqlTableSchemaBuilder implements 
NewTableSchemaBuilder<JsonNode>
     }
 
     @Override
-    public Optional<Schema> build(JsonNode tableChange) {
-        JsonNode jsonTable = tableChange.get("table");
-        String tableName = tableChange.get("id").asText();
-        ArrayNode columns = (ArrayNode) jsonTable.get("columns");
-        LinkedHashMap<String, DataType> fields = new LinkedHashMap<>();
-
-        for (JsonNode element : columns) {
-            JsonNode length = element.get("length");
-            JsonNode scale = element.get("scale");
+    public Optional<Schema> build(TableChanges.TableChange tableChange) {
+        Table table = tableChange.getTable();
+        String tableName = tableChange.getId().toString();
+        List<Column> columns = table.columns();
+
+        Schema.Builder builder = Schema.newBuilder();
+        Map<String, Integer> duplicateFields = new HashMap<>();
+
+        // column
+        for (Column column : columns) {
             DataType dataType =
                     MySqlTypeUtils.toDataType(
-                            element.get("typeExpression").asText(),
-                            length == null ? null : length.asInt(),
-                            scale == null ? null : scale.asInt(),
+                            column.typeExpression(),
+                            column.length(),
+                            column.scale().orElse(null),
                             typeMapping);
 
-            dataType =
-                    dataType.copy(
-                            typeMapping.containsMode(TO_NULLABLE)
-                                    || element.get("optional").asBoolean());
+            dataType = dataType.copy(typeMapping.containsMode(TO_NULLABLE) || 
column.isOptional());
 
+            String columnName = column.name();
+            if (!caseSensitive) {
+                checkArgument(
+                        !duplicateFields.containsKey(columnName.toLowerCase()),
+                        columnDuplicateErrMsg(tableName).apply(columnName));
+                columnName = columnName.toLowerCase();
+            }
             // TODO : add table comment and column comment when we upgrade 
flink cdc to 2.4
-            fields.put(element.get("name").asText(), dataType);
+            builder.column(columnName, dataType, null);
+            duplicateFields.put(columnName, 1);
         }
 
-        ArrayNode arrayNode = (ArrayNode) 
jsonTable.get("primaryKeyColumnNames");
-        List<String> primaryKeys = new ArrayList<>();
-        for (JsonNode primary : arrayNode) {
-            primaryKeys.add(primary.asText());
-        }
-
-        fields = mapKeyCaseConvert(fields, caseSensitive, 
columnDuplicateErrMsg(tableName));
+        // primaryKey
+        List<String> primaryKeys = table.primaryKeyColumnNames();
         primaryKeys = listCaseConvert(primaryKeys, caseSensitive);
+        builder.primaryKey(primaryKeys);
 
-        Schema.Builder builder = Schema.newBuilder();
+        // options
         builder.options(tableConfig);
-        for (Map.Entry<String, DataType> entry : fields.entrySet()) {
-            builder.column(entry.getKey(), entry.getValue());
-        }
-        Schema schema = builder.primaryKey(primaryKeys).build();
 
-        return Optional.of(schema);
+        return Optional.of(builder.build());
     }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
index a9a3c9630..616ba5726 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
@@ -228,7 +228,7 @@ public class MySqlTypeUtils {
                 return DataTypes.TIME();
             case DATETIME:
             case TIMESTAMP:
-                if (length == null) {
+                if (length == null || length <= 0) {
                     // default precision is 0
                     // see 
https://dev.mysql.com/doc/refman/8.0/en/date-and-time-type-syntax.html
                     return DataTypes.TIMESTAMP(0);
@@ -241,7 +241,7 @@ public class MySqlTypeUtils {
                     } else {
                         return DataTypes.TIMESTAMP(0);
                     }
-                } else if (length >= 0 && length <= 
TimestampType.MAX_PRECISION) {
+                } else if (length <= TimestampType.MAX_PRECISION) {
                     return DataTypes.TIMESTAMP(length);
                 } else {
                     throw new UnsupportedOperationException(
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEvent.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEvent.java
new file mode 100644
index 000000000..48ab55ba4
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEvent.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql.format;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+
+import io.debezium.relational.history.TableChanges;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** Debezium Event Records Entity. */
+public class DebeziumEvent {
+
+    private static final String FIELD_PAYLOAD = "payload";
+    private static final String FIELD_SCHEMA = "schema";
+    private static final String FIELD_SOURCE = "source";
+    private static final String FIELD_BEFORE = "before";
+    private static final String FIELD_AFTER = "after";
+    private static final String FIELD_HISTORY_RECORD = "historyRecord";
+    private static final String FIELD_OP = "op";
+    private static final String FIELD_DB = "db";
+    private static final String FIELD_TABLE = "table";
+    private static final String FIELD_FIELDS = "fields";
+    private static final String FIELD_NAME = "name";
+    private static final String FIELD_TYPE = "type";
+    private static final String FIELD_FIELD = "field";
+    private static final String FIELD_OPTIONAL = "optional";
+
+    @JsonProperty(FIELD_PAYLOAD)
+    private final Payload payload;
+
+    @JsonProperty(FIELD_SCHEMA)
+    private final Field schema;
+
+    @JsonCreator
+    public DebeziumEvent(
+            @JsonProperty(FIELD_PAYLOAD) Payload payload,
+            @JsonProperty(FIELD_SCHEMA) Field schema) {
+        this.payload = payload;
+        this.schema = schema;
+    }
+
+    @JsonGetter(FIELD_PAYLOAD)
+    public Payload payload() {
+        return payload;
+    }
+
+    @JsonGetter(FIELD_SCHEMA)
+    public Field schema() {
+        return schema;
+    }
+
+    /** Payload elements in Debezium event record. */
+    public static class Payload {
+        @JsonProperty(FIELD_SOURCE)
+        private final Source source;
+
+        @JsonProperty(FIELD_BEFORE)
+        private final JsonNode before;
+
+        @JsonProperty(FIELD_AFTER)
+        private final JsonNode after;
+
+        @JsonProperty(FIELD_HISTORY_RECORD)
+        private final String historyRecord;
+
+        @JsonProperty(FIELD_OP)
+        private final String op;
+
+        @JsonCreator
+        public Payload(
+                @JsonProperty(FIELD_SOURCE) Source source,
+                @JsonProperty(FIELD_BEFORE) JsonNode before,
+                @JsonProperty(FIELD_AFTER) JsonNode after,
+                @JsonProperty(FIELD_HISTORY_RECORD) String historyRecord,
+                @JsonProperty(FIELD_OP) String op) {
+            this.source = source;
+            this.before = before;
+            this.after = after;
+            this.historyRecord = historyRecord;
+            this.op = op;
+        }
+
+        @JsonGetter(FIELD_SOURCE)
+        public Source source() {
+            return source;
+        }
+
+        @JsonGetter(FIELD_BEFORE)
+        public JsonNode before() {
+            return before;
+        }
+
+        @JsonGetter(FIELD_AFTER)
+        public JsonNode after() {
+            return after;
+        }
+
+        @JsonGetter(FIELD_HISTORY_RECORD)
+        public String historyRecord() {
+            return historyRecord;
+        }
+
+        @JsonGetter(FIELD_OP)
+        public String op() {
+            return op;
+        }
+
+        public boolean isSchemaChange() {
+            return op() == null;
+        }
+
+        public boolean hasHistoryRecord() {
+            return historyRecord != null;
+        }
+
+        /** Get table changes in history record. */
+        public Iterator<TableChanges.TableChange> getTableChanges() throws 
IOException {
+            return 
DebeziumEventUtils.getTableChanges(historyRecord).iterator();
+        }
+    }
+
+    /** Payload elements in Debezium event record. */
+    public static class Field {
+
+        @JsonProperty(FIELD_FIELD)
+        private final String field;
+
+        @JsonProperty(FIELD_TYPE)
+        private final String type;
+
+        @JsonProperty(FIELD_NAME)
+        private final String name;
+
+        @JsonProperty(FIELD_OPTIONAL)
+        private final Boolean optional;
+
+        @JsonProperty(FIELD_FIELDS)
+        private final List<Field> fields;
+
+        @JsonCreator
+        public Field(
+                @JsonProperty(FIELD_FIELD) String field,
+                @JsonProperty(FIELD_TYPE) String type,
+                @JsonProperty(FIELD_NAME) String name,
+                @JsonProperty(FIELD_OPTIONAL) Boolean optional,
+                @JsonProperty(FIELD_FIELDS) List<Field> fields) {
+            this.field = field;
+            this.type = type;
+            this.name = name;
+            this.optional = optional;
+            this.fields = fields;
+        }
+
+        @JsonGetter(FIELD_FIELD)
+        public String field() {
+            return field;
+        }
+
+        @JsonGetter(FIELD_TYPE)
+        public String type() {
+            return type;
+        }
+
+        @JsonGetter(FIELD_NAME)
+        public String name() {
+            return name;
+        }
+
+        @JsonGetter(FIELD_OPTIONAL)
+        public Boolean optional() {
+            return optional;
+        }
+
+        @JsonGetter(FIELD_FIELDS)
+        public List<Field> fields() {
+            return fields;
+        }
+
+        public Map<String, Field> beforeAndAfterFields() {
+            return fields.stream()
+                    .filter(
+                            item ->
+                                    FIELD_BEFORE.equals(item.field)
+                                            || FIELD_AFTER.equals(item.field))
+                    .flatMap(item -> item.fields.stream())
+                    .collect(
+                            Collectors.toMap(
+                                    Field::field,
+                                    Function.identity(),
+                                    (v1, v2) -> v2,
+                                    LinkedHashMap::new));
+        }
+    }
+
+    /** Source element of payload in Debezium event record. */
+    public static class Source {
+        @JsonProperty(FIELD_DB)
+        private final String db;
+
+        @JsonProperty(FIELD_TABLE)
+        private final String table;
+
+        @JsonCreator
+        public Source(@JsonProperty(FIELD_DB) String db, 
@JsonProperty(FIELD_TABLE) String table) {
+            this.db = db;
+            this.table = table;
+        }
+
+        @JsonGetter(FIELD_DB)
+        public String db() {
+            return db;
+        }
+
+        @JsonGetter(FIELD_TABLE)
+        public String table() {
+            return table;
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEventUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEventUtils.java
new file mode 100644
index 000000000..562d138ad
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEventUtils.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql.format;
+
+import com.ververica.cdc.debezium.history.FlinkJsonTableChangeSerializer;
+import io.debezium.document.Array;
+import io.debezium.document.DocumentReader;
+import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.TableChanges;
+
+import java.io.IOException;
+
+/** A utility class that provide abilities for debezium event {@link 
DebeziumEvent}. */
+public class DebeziumEventUtils {
+
+    private static final DocumentReader DOCUMENT_READER = 
DocumentReader.defaultReader();
+    private static final FlinkJsonTableChangeSerializer 
TABLE_CHANGE_SERIALIZER =
+            new FlinkJsonTableChangeSerializer();
+
+    public static HistoryRecord getHistoryRecord(String historyRecordStr) 
throws IOException {
+        return new HistoryRecord(DOCUMENT_READER.read(historyRecordStr));
+    }
+
+    public static TableChanges getTableChanges(String historyRecordStr) throws 
IOException {
+        HistoryRecord historyRecord = getHistoryRecord(historyRecordStr);
+        Array tableChanges = 
historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES);
+        return TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/DebeziumEventTest.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/DebeziumEventTest.java
new file mode 100644
index 000000000..0e27d0b11
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/DebeziumEventTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql;
+
+import org.apache.paimon.flink.action.cdc.mysql.format.DebeziumEvent;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonParser;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import io.debezium.relational.history.TableChanges;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.Iterator;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DebeziumEvent}. */
+public class DebeziumEventTest {
+
+    private ObjectMapper objectMapper;
+
+    @BeforeEach
+    public void before() {
+        objectMapper = new ObjectMapper();
+        objectMapper
+                .configure(JsonParser.Feature.ALLOW_COMMENTS, true)
+                .configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, 
true)
+                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
+    }
+
+    @Test
+    public void testDeserialize() throws IOException {
+        final URL url =
+                DebeziumEventTest.class
+                        .getClassLoader()
+                        .getResource("mysql/debezium-event-change.json");
+        assertThat(url).isNotNull();
+        DebeziumEvent debeziumEvent = objectMapper.readValue(url, 
DebeziumEvent.class);
+        assertThat(debeziumEvent).isNotNull();
+        assertThat(debeziumEvent.payload().hasHistoryRecord()).isTrue();
+        Iterator<TableChanges.TableChange> tableChanges = 
debeziumEvent.payload().getTableChanges();
+        assertThat(Iterators.size(tableChanges)).isEqualTo(1);
+        tableChanges.forEachRemaining(
+                tableChange -> {
+                    
assertThat(tableChange.getType()).isEqualTo(TableChanges.TableChangeType.ALTER);
+                    assertThat(tableChange.getTable().id().toString())
+                            .isEqualTo("tinyint1_not_bool_test.t1");
+                });
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/debezium-event-change.json
 
b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/debezium-event-change.json
new file mode 100644
index 000000000..d3c11acef
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/debezium-event-change.json
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+    "schema":{
+        "type":"struct",
+        "fields":[
+            {
+                "type":"struct",
+                "fields":[
+                    {
+                        "type":"string",
+                        "optional":false,
+                        "field":"version"
+                    },
+                    {
+                        "type":"string",
+                        "optional":false,
+                        "field":"connector"
+                    },
+                    {
+                        "type":"string",
+                        "optional":false,
+                        "field":"name"
+                    },
+                    {
+                        "type":"int64",
+                        "optional":false,
+                        "field":"ts_ms"
+                    },
+                    {
+                        "type":"string",
+                        "optional":true,
+                        "name":"io.debezium.data.Enum",
+                        "version":1,
+                        "parameters":{
+                            "allowed":"true,last,false"
+                        },
+                        "default":"false",
+                        "field":"snapshot"
+                    },
+                    {
+                        "type":"string",
+                        "optional":false,
+                        "field":"db"
+                    },
+                    {
+                        "type":"string",
+                        "optional":true,
+                        "field":"sequence"
+                    },
+                    {
+                        "type":"string",
+                        "optional":true,
+                        "field":"table"
+                    },
+                    {
+                        "type":"int64",
+                        "optional":false,
+                        "field":"server_id"
+                    },
+                    {
+                        "type":"string",
+                        "optional":true,
+                        "field":"gtid"
+                    },
+                    {
+                        "type":"string",
+                        "optional":false,
+                        "field":"file"
+                    },
+                    {
+                        "type":"int64",
+                        "optional":false,
+                        "field":"pos"
+                    },
+                    {
+                        "type":"int32",
+                        "optional":false,
+                        "field":"row"
+                    },
+                    {
+                        "type":"int64",
+                        "optional":true,
+                        "field":"thread"
+                    },
+                    {
+                        "type":"string",
+                        "optional":true,
+                        "field":"query"
+                    }
+                ],
+                "optional":false,
+                "name":"io.debezium.connector.mysql.Source",
+                "field":"source"
+            },
+            {
+                "type":"string",
+                "optional":true,
+                "field":"historyRecord"
+            }
+        ],
+        "optional":false,
+        "name":"io.debezium.connector.mysql.SchemaChangeValue"
+    },
+    "payload":{
+        "source":{
+            "version":"1.6.4.Final",
+            "connector":"mysql",
+            "name":"mysql_binlog_source",
+            "ts_ms":1695203563233,
+            "snapshot":"false",
+            "db":"tinyint1_not_bool_test",
+            "sequence":null,
+            "table":"t1",
+            "server_id":223344,
+            "gtid":null,
+            "file":"mysql-bin.000003",
+            "pos":219,
+            "row":0,
+            "thread":null,
+            "query":null
+        },
+        
"historyRecord":"{\"source\":{\"file\":\"mysql-bin.000003\",\"pos\":219,\"server_id\":223344},\"position\":{\"transaction_id\":null,\"ts_sec\":1695203563,\"file\":\"mysql-bin.000003\",\"pos\":379,\"server_id\":223344},\"databaseName\":\"tinyint1_not_bool_test\",\"ddl\":\"ALTER
 TABLE t1 ADD COLUMN _new_tinyint1 
TINYINT(1)\",\"tableChanges\":[{\"type\":\"ALTER\",\"id\":\"\\\"tinyint1_not_bool_test\\\".\\\"t1\\\"\",\"table\":{\"defaultCharsetName\":\"latin1\",\"primaryKeyColumnNames
 [...]
+    }
+}

Reply via email to