This is an automated email from the ASF dual-hosted git repository.
kerwin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 77f10a04a [cdc] Refactor mysql debezium json event parser (#2053)
77f10a04a is described below
commit 77f10a04af71449aecb1e4f76acdf6d559b159da
Author: Kerwin <[email protected]>
AuthorDate: Tue Sep 26 15:02:03 2023 +0800
[cdc] Refactor mysql debezium json event parser (#2053)
---
.../flink/action/cdc/kafka/KafkaSchemaUtils.java | 6 +-
.../action/cdc/kafka/KafkaSyncDatabaseAction.java | 4 +-
.../action/cdc/kafka/KafkaSyncTableAction.java | 4 +-
.../cdc/kafka/{formats => format}/DataFormat.java | 8 +-
.../kafka/{formats => format}/RecordParser.java | 2 +-
.../{formats => format}/RecordParserFactory.java | 2 +-
.../canal/CanalFieldParser.java | 2 +-
.../canal/CanalRecordParser.java | 4 +-
.../maxwell/MaxwellRecordParser.java | 4 +-
.../{formats => format}/ogg/OggRecordParser.java | 4 +-
.../cdc/mysql/MySqlDebeziumJsonEventParser.java | 179 ++++++---------
.../action/cdc/mysql/MySqlTableSchemaBuilder.java | 70 +++---
.../flink/action/cdc/mysql/MySqlTypeUtils.java | 4 +-
.../action/cdc/mysql/format/DebeziumEvent.java | 245 +++++++++++++++++++++
.../cdc/mysql/format/DebeziumEventUtils.java | 45 ++++
.../flink/action/cdc/mysql/DebeziumEventTest.java | 71 ++++++
.../resources/mysql/debezium-event-change.json | 141 ++++++++++++
17 files changed, 626 insertions(+), 169 deletions(-)
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaUtils.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaUtils.java
index 330df8e79..7ed9a4aab 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaUtils.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSchemaUtils.java
@@ -19,8 +19,8 @@
package org.apache.paimon.flink.action.cdc.kafka;
import org.apache.paimon.flink.action.cdc.TypeMapping;
-import org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat;
-import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
+import org.apache.paimon.flink.action.cdc.kafka.format.DataFormat;
+import org.apache.paimon.flink.action.cdc.kafka.format.RecordParser;
import org.apache.paimon.schema.Schema;
import org.apache.flink.configuration.Configuration;
@@ -44,7 +44,7 @@ import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static
org.apache.paimon.flink.action.cdc.kafka.KafkaActionUtils.kafkaPropertiesGroupId;
-import static
org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat.getDataFormat;
+import static
org.apache.paimon.flink.action.cdc.kafka.format.DataFormat.getDataFormat;
/** Utility class to load kafka schema. */
public class KafkaSchemaUtils {
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
index 61aa45108..e8df881fb 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
@@ -24,8 +24,8 @@ import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.TypeMapping;
-import org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat;
-import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
+import org.apache.paimon.flink.action.cdc.kafka.format.DataFormat;
+import org.apache.paimon.flink.action.cdc.kafka.format.RecordParser;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
index 87d3f0ddb..b00fa38b0 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
@@ -26,8 +26,8 @@ import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
-import org.apache.paimon.flink.action.cdc.kafka.formats.DataFormat;
-import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
+import org.apache.paimon.flink.action.cdc.kafka.format.DataFormat;
+import org.apache.paimon.flink.action.cdc.kafka.format.RecordParser;
import org.apache.paimon.flink.sink.cdc.CdcSinkBuilder;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/DataFormat.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/DataFormat.java
similarity index 91%
rename from
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/DataFormat.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/DataFormat.java
index 71c4af8f4..e178fec40 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/DataFormat.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/DataFormat.java
@@ -16,13 +16,13 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.action.cdc.kafka.formats;
+package org.apache.paimon.flink.action.cdc.kafka.format;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
-import
org.apache.paimon.flink.action.cdc.kafka.formats.canal.CanalRecordParser;
-import
org.apache.paimon.flink.action.cdc.kafka.formats.maxwell.MaxwellRecordParser;
-import org.apache.paimon.flink.action.cdc.kafka.formats.ogg.OggRecordParser;
+import org.apache.paimon.flink.action.cdc.kafka.format.canal.CanalRecordParser;
+import
org.apache.paimon.flink.action.cdc.kafka.format.maxwell.MaxwellRecordParser;
+import org.apache.paimon.flink.action.cdc.kafka.format.ogg.OggRecordParser;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/RecordParser.java
similarity index 99%
rename from
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/RecordParser.java
index bf6be1b7d..2c0ffb19a 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/RecordParser.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.action.cdc.kafka.formats;
+package org.apache.paimon.flink.action.cdc.kafka.format;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParserFactory.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/RecordParserFactory.java
similarity index 96%
rename from
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParserFactory.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/RecordParserFactory.java
index a47bca23f..e83d4b0d7 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/RecordParserFactory.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/RecordParserFactory.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.action.cdc.kafka.formats;
+package org.apache.paimon.flink.action.cdc.kafka.format;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalFieldParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/canal/CanalFieldParser.java
similarity index 98%
rename from
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalFieldParser.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/canal/CanalFieldParser.java
index cb77fe8e7..619da6b79 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalFieldParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/canal/CanalFieldParser.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.action.cdc.kafka.formats.canal;
+package org.apache.paimon.flink.action.cdc.kafka.format.canal;
/** Converts some special types such as enum, set, and geometry. */
public class CanalFieldParser {
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/canal/CanalRecordParser.java
similarity index 98%
rename from
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/canal/CanalRecordParser.java
index e0f2ec748..62ca27a4f 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/canal/CanalRecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/canal/CanalRecordParser.java
@@ -16,11 +16,11 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.action.cdc.kafka.formats.canal;
+package org.apache.paimon.flink.action.cdc.kafka.format.canal;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
-import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
+import org.apache.paimon.flink.action.cdc.kafka.format.RecordParser;
import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.types.DataType;
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/maxwell/MaxwellRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/maxwell/MaxwellRecordParser.java
similarity index 96%
rename from
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/maxwell/MaxwellRecordParser.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/maxwell/MaxwellRecordParser.java
index 8b51edad9..f116d5d27 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/maxwell/MaxwellRecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/maxwell/MaxwellRecordParser.java
@@ -16,11 +16,11 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.action.cdc.kafka.formats.maxwell;
+package org.apache.paimon.flink.action.cdc.kafka.format.maxwell;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
-import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
+import org.apache.paimon.flink.action.cdc.kafka.format.RecordParser;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.types.RowKind;
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/ogg/OggRecordParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/ogg/OggRecordParser.java
similarity index 97%
rename from
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/ogg/OggRecordParser.java
rename to
paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/ogg/OggRecordParser.java
index 7f4fcc76a..0c389d48d 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/formats/ogg/OggRecordParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/format/ogg/OggRecordParser.java
@@ -16,11 +16,11 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.action.cdc.kafka.formats.ogg;
+package org.apache.paimon.flink.action.cdc.kafka.format.ogg;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
-import org.apache.paimon.flink.action.cdc.kafka.formats.RecordParser;
+import org.apache.paimon.flink.action.cdc.kafka.format.RecordParser;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.types.RowKind;
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index fd6c23e0d..e18cc1931 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -27,18 +27,17 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.action.cdc.TypeMapping;
+import org.apache.paimon.flink.action.cdc.mysql.format.DebeziumEvent;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.NewTableSchemaBuilder;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
-import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -69,6 +68,8 @@ import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -77,7 +78,6 @@ import java.util.regex.Pattern;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.recordKeyDuplicateErrMsg;
-import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
/** {@link EventParser} for MySQL Debezium JSON. */
@@ -90,15 +90,15 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
private final boolean caseSensitive;
private final TableNameConverter tableNameConverter;
private final List<ComputedColumn> computedColumns;
- private final NewTableSchemaBuilder<JsonNode> schemaBuilder;
+ private final NewTableSchemaBuilder<TableChanges.TableChange>
schemaBuilder;
@Nullable private final Pattern includingPattern;
@Nullable private final Pattern excludingPattern;
private final Set<String> includedTables = new HashSet<>();
private final Set<String> excludedTables = new HashSet<>();
private final TypeMapping typeMapping;
- private JsonNode root;
- private JsonNode payload;
+ private DebeziumEvent root;
+
// NOTE: current table name is not converted by tableNameConverter
private String currentTable;
private boolean shouldSynchronizeCurrentTable;
@@ -113,7 +113,7 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
caseSensitive,
computedColumns,
new TableNameConverter(caseSensitive),
- ddl -> Optional.empty(),
+ new MySqlTableSchemaBuilder(new HashMap<>(), caseSensitive,
typeMapping),
null,
null,
typeMapping);
@@ -123,7 +123,7 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
ZoneId serverTimeZone,
boolean caseSensitive,
TableNameConverter tableNameConverter,
- NewTableSchemaBuilder<JsonNode> schemaBuilder,
+ NewTableSchemaBuilder<TableChanges.TableChange> schemaBuilder,
@Nullable Pattern includingPattern,
@Nullable Pattern excludingPattern,
TypeMapping typeMapping) {
@@ -143,7 +143,7 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
boolean caseSensitive,
List<ComputedColumn> computedColumns,
TableNameConverter tableNameConverter,
- NewTableSchemaBuilder<JsonNode> schemaBuilder,
+ NewTableSchemaBuilder<TableChanges.TableChange> schemaBuilder,
@Nullable Pattern includingPattern,
@Nullable Pattern excludingPattern,
TypeMapping typeMapping) {
@@ -160,10 +160,11 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
@Override
public void setRawEvent(String rawEvent) {
try {
-
objectMapper.configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true);
- root = objectMapper.readValue(rawEvent, JsonNode.class);
- payload = root.get("payload");
- currentTable = payload.get("source").get("table").asText();
+ objectMapper
+
.configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true)
+
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ root = objectMapper.readValue(rawEvent, DebeziumEvent.class);
+ currentTable = root.payload().source().table();
shouldSynchronizeCurrentTable = shouldSynchronizeCurrentTable();
} catch (Exception e) {
throw new RuntimeException(e);
@@ -175,63 +176,37 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
return tableNameConverter.convert(Identifier.create(getDatabaseName(),
currentTable));
}
- private boolean isSchemaChange() {
- return payload.get("op") == null;
- }
-
@Override
public List<DataField> parseSchemaChange() {
- if (!shouldSynchronizeCurrentTable || !isSchemaChange()) {
+ if (!shouldSynchronizeCurrentTable ||
!root.payload().isSchemaChange()) {
return Collections.emptyList();
}
- JsonNode historyRecord = payload.get("historyRecord");
- if (historyRecord == null) {
+ DebeziumEvent.Payload payload = root.payload();
+ if (!payload.hasHistoryRecord()) {
return Collections.emptyList();
}
- JsonNode columns;
+ TableChanges.TableChange tableChange = null;
try {
- String historyRecordString = historyRecord.asText();
- JsonNode tableChanges =
objectMapper.readTree(historyRecordString).get("tableChanges");
- if (tableChanges.size() != 1) {
+ Iterator<TableChanges.TableChange> tableChanges =
payload.getTableChanges();
+ long count;
+ for (count = 0L; tableChanges.hasNext(); ++count) {
+ tableChange = tableChanges.next();
+ }
+ if (count != 1) {
LOG.error(
"Invalid historyRecord, because tableChanges should
contain exactly 1 item.\n"
- + historyRecord.asText());
+ + payload.historyRecord());
return Collections.emptyList();
}
- columns = tableChanges.get(0).get("table").get("columns");
} catch (Exception e) {
LOG.info("Failed to parse history record for schema changes", e);
return Collections.emptyList();
}
- if (columns == null) {
- return Collections.emptyList();
- }
- List<DataField> result = new ArrayList<>();
- for (int i = 0; i < columns.size(); i++) {
- JsonNode column = columns.get(i);
- JsonNode length = column.get("length");
- JsonNode scale = column.get("scale");
- DataType dataType =
- MySqlTypeUtils.toDataType(
- column.get("typeName").asText(),
- length == null ? null : length.asInt(),
- scale == null ? null : scale.asInt(),
- typeMapping);
-
- dataType =
- dataType.copy(
- typeMapping.containsMode(TO_NULLABLE)
- || column.get("optional").asBoolean());
-
- String fieldName = column.get("name").asText();
- result.add(
- new DataField(
- i, caseSensitive ? fieldName :
fieldName.toLowerCase(), dataType));
- }
- return result;
+ Optional<Schema> schema = schemaBuilder.build(tableChange);
+ return schema.get().fields();
}
@Override
@@ -240,31 +215,31 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
return Optional.empty();
}
- JsonNode historyRecord = payload.get("historyRecord");
- if (historyRecord == null) {
+ DebeziumEvent.Payload payload = root.payload();
+ if (!payload.hasHistoryRecord()) {
return Optional.empty();
}
try {
- String historyRecordString = historyRecord.asText();
- JsonNode tableChanges =
objectMapper.readTree(historyRecordString).get("tableChanges");
- if (tableChanges.size() != 1) {
+ TableChanges.TableChange tableChange = null;
+ Iterator<TableChanges.TableChange> tableChanges =
payload.getTableChanges();
+ long count;
+ for (count = 0L; tableChanges.hasNext(); ++count) {
+ tableChange = tableChanges.next();
+ }
+ if (count != 1) {
LOG.error(
"Invalid historyRecord, because tableChanges should
contain exactly 1 item.\n"
- + historyRecord.asText());
+ + payload.historyRecord());
return Optional.empty();
}
- JsonNode tableChange = tableChanges.get(0);
- if (!tableChange
- .get("type")
- .asText()
- .equals(TableChanges.TableChangeType.CREATE.name())) {
+ if (TableChanges.TableChangeType.CREATE != tableChange.getType()) {
return Optional.empty();
}
- JsonNode primaryKeyColumnNames =
tableChange.get("table").get("primaryKeyColumnNames");
- if (primaryKeyColumnNames.size() == 0) {
+ List<String> primaryKeyColumnNames =
tableChange.getTable().primaryKeyColumnNames();
+ if (primaryKeyColumnNames.isEmpty()) {
LOG.debug(
"Didn't find primary keys from MySQL DDL for table
'{}'. "
+ "This table won't be synchronized.",
@@ -283,19 +258,19 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
@Override
public List<CdcRecord> parseRecords() {
- if (!shouldSynchronizeCurrentTable || isSchemaChange()) {
+ if (!shouldSynchronizeCurrentTable || root.payload().isSchemaChange())
{
return Collections.emptyList();
}
List<CdcRecord> records = new ArrayList<>();
- Map<String, String> before = extractRow(payload.get("before"));
- if (before.size() > 0) {
+ Map<String, String> before = extractRow(root.payload().before());
+ if (!before.isEmpty()) {
before = mapKeyCaseConvert(before, caseSensitive,
recordKeyDuplicateErrMsg(before));
records.add(new CdcRecord(RowKind.DELETE, before));
}
- Map<String, String> after = extractRow(payload.get("after"));
- if (after.size() > 0) {
+ Map<String, String> after = extractRow(root.payload().after());
+ if (!after.isEmpty()) {
after = mapKeyCaseConvert(after, caseSensitive,
recordKeyDuplicateErrMsg(after));
records.add(new CdcRecord(RowKind.INSERT, after));
}
@@ -304,57 +279,34 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
}
private String getDatabaseName() {
- return payload.get("source").get("db").asText();
+ return root.payload().source().db();
}
private Map<String, String> extractRow(JsonNode recordRow) {
- JsonNode schema =
+ if (recordRow == null) {
+ return new HashMap<>();
+ }
+
+ DebeziumEvent.Field schema =
Preconditions.checkNotNull(
- root.get("schema"),
+ root.schema(),
"MySqlDebeziumJsonEventParser only supports debezium
JSON with schema. "
+ "Please make sure that `includeSchema` is
true "
+ "in the JsonDebeziumDeserializationSchema
you created");
- Map<String, String> mySqlFieldTypes = new HashMap<>();
- Map<String, String> fieldClassNames = new HashMap<>();
- JsonNode arrayNode = schema.get("fields");
- for (int i = 0; i < arrayNode.size(); i++) {
- JsonNode elementNode = arrayNode.get(i);
- String field = elementNode.get("field").asText();
- if ("before".equals(field) || "after".equals(field)) {
- JsonNode innerArrayNode = elementNode.get("fields");
- for (int j = 0; j < innerArrayNode.size(); j++) {
- JsonNode innerElementNode = innerArrayNode.get(j);
- String fieldName = innerElementNode.get("field").asText();
- String fieldType = innerElementNode.get("type").asText();
- mySqlFieldTypes.put(fieldName, fieldType);
- if (innerElementNode.get("name") != null) {
- String className =
innerElementNode.get("name").asText();
- fieldClassNames.put(fieldName, className);
- }
- }
- }
- }
-
- // the geometry, point type can not be converted to string, so we
convert it to Object
- // first.
- Map<String, Object> jsonMap =
- objectMapper.convertValue(recordRow, new
TypeReference<Map<String, Object>>() {});
- if (jsonMap == null) {
- return new HashMap<>();
- }
+ Map<String, DebeziumEvent.Field> fields =
schema.beforeAndAfterFields();
- Map<String, String> resultMap = new HashMap<>();
- for (Map.Entry<String, String> field : mySqlFieldTypes.entrySet()) {
+ LinkedHashMap<String, String> resultMap = new LinkedHashMap<>();
+ for (Map.Entry<String, DebeziumEvent.Field> field : fields.entrySet())
{
String fieldName = field.getKey();
- String mySqlType = field.getValue();
- Object objectValue = jsonMap.get(fieldName);
- if (objectValue == null) {
+ String mySqlType = field.getValue().type();
+ JsonNode objectValue = recordRow.get(fieldName);
+ if (objectValue == null || objectValue.isNull()) {
continue;
}
- String className = fieldClassNames.get(fieldName);
- String oldValue = objectValue.toString();
+ String className = field.getValue().name();
+ String oldValue = objectValue.asText();
String newValue = oldValue;
if (Bits.LOGICAL_NAME.equals(className)) {
@@ -444,13 +396,13 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
.toString();
} else if (Point.LOGICAL_NAME.equals(className)
|| Geometry.LOGICAL_NAME.equals(className)) {
- JsonNode jsonNode = recordRow.get(fieldName);
try {
- byte[] wkb = jsonNode.get("wkb").binaryValue();
+ byte[] wkb =
objectValue.get(Geometry.WKB_FIELD).binaryValue();
newValue = MySqlTypeUtils.convertWkbArray(wkb);
} catch (Exception e) {
throw new IllegalArgumentException(
- String.format("Failed to convert %s to geometry
JSON.", jsonNode), e);
+ String.format("Failed to convert %s to geometry
JSON.", objectValue),
+ e);
}
}
@@ -468,6 +420,11 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
}
private boolean shouldSynchronizeCurrentTable() {
+ // For a database-level DDL operation, the current table is null.
+ if (currentTable == null) {
+ return false;
+ }
+
if (excludedTables.contains(currentTable)) {
return false;
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
index aec731fb2..24edbc66f 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTableSchemaBuilder.java
@@ -23,22 +23,22 @@ import
org.apache.paimon.flink.sink.cdc.NewTableSchemaBuilder;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataType;
-import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import io.debezium.relational.Column;
+import io.debezium.relational.Table;
+import io.debezium.relational.history.TableChanges;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.listCaseConvert;
-import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
import static
org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Schema builder for MySQL cdc. */
-public class MySqlTableSchemaBuilder implements
NewTableSchemaBuilder<JsonNode> {
+public class MySqlTableSchemaBuilder implements
NewTableSchemaBuilder<TableChanges.TableChange> {
private final Map<String, String> tableConfig;
private final boolean caseSensitive;
@@ -52,47 +52,45 @@ public class MySqlTableSchemaBuilder implements
NewTableSchemaBuilder<JsonNode>
}
@Override
- public Optional<Schema> build(JsonNode tableChange) {
- JsonNode jsonTable = tableChange.get("table");
- String tableName = tableChange.get("id").asText();
- ArrayNode columns = (ArrayNode) jsonTable.get("columns");
- LinkedHashMap<String, DataType> fields = new LinkedHashMap<>();
-
- for (JsonNode element : columns) {
- JsonNode length = element.get("length");
- JsonNode scale = element.get("scale");
+ public Optional<Schema> build(TableChanges.TableChange tableChange) {
+ Table table = tableChange.getTable();
+ String tableName = tableChange.getId().toString();
+ List<Column> columns = table.columns();
+
+ Schema.Builder builder = Schema.newBuilder();
+ Map<String, Integer> duplicateFields = new HashMap<>();
+
+ // column
+ for (Column column : columns) {
DataType dataType =
MySqlTypeUtils.toDataType(
- element.get("typeExpression").asText(),
- length == null ? null : length.asInt(),
- scale == null ? null : scale.asInt(),
+ column.typeExpression(),
+ column.length(),
+ column.scale().orElse(null),
typeMapping);
- dataType =
- dataType.copy(
- typeMapping.containsMode(TO_NULLABLE)
- || element.get("optional").asBoolean());
+ dataType = dataType.copy(typeMapping.containsMode(TO_NULLABLE) ||
column.isOptional());
+ String columnName = column.name();
+ if (!caseSensitive) {
+ checkArgument(
+ !duplicateFields.containsKey(columnName.toLowerCase()),
+ columnDuplicateErrMsg(tableName).apply(columnName));
+ columnName = columnName.toLowerCase();
+ }
// TODO : add table comment and column comment when we upgrade
flink cdc to 2.4
- fields.put(element.get("name").asText(), dataType);
+ builder.column(columnName, dataType, null);
+ duplicateFields.put(columnName, 1);
}
- ArrayNode arrayNode = (ArrayNode)
jsonTable.get("primaryKeyColumnNames");
- List<String> primaryKeys = new ArrayList<>();
- for (JsonNode primary : arrayNode) {
- primaryKeys.add(primary.asText());
- }
-
- fields = mapKeyCaseConvert(fields, caseSensitive,
columnDuplicateErrMsg(tableName));
+ // primaryKey
+ List<String> primaryKeys = table.primaryKeyColumnNames();
primaryKeys = listCaseConvert(primaryKeys, caseSensitive);
+ builder.primaryKey(primaryKeys);
- Schema.Builder builder = Schema.newBuilder();
+ // options
builder.options(tableConfig);
- for (Map.Entry<String, DataType> entry : fields.entrySet()) {
- builder.column(entry.getKey(), entry.getValue());
- }
- Schema schema = builder.primaryKey(primaryKeys).build();
- return Optional.of(schema);
+ return Optional.of(builder.build());
}
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
index a9a3c9630..616ba5726 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlTypeUtils.java
@@ -228,7 +228,7 @@ public class MySqlTypeUtils {
return DataTypes.TIME();
case DATETIME:
case TIMESTAMP:
- if (length == null) {
+ if (length == null || length <= 0) {
// default precision is 0
// see
https://dev.mysql.com/doc/refman/8.0/en/date-and-time-type-syntax.html
return DataTypes.TIMESTAMP(0);
@@ -241,7 +241,7 @@ public class MySqlTypeUtils {
} else {
return DataTypes.TIMESTAMP(0);
}
- } else if (length >= 0 && length <=
TimestampType.MAX_PRECISION) {
+ } else if (length <= TimestampType.MAX_PRECISION) {
return DataTypes.TIMESTAMP(length);
} else {
throw new UnsupportedOperationException(
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEvent.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEvent.java
new file mode 100644
index 000000000..48ab55ba4
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEvent.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql.format;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+
+import io.debezium.relational.history.TableChanges;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** Debezium Event Records Entity. */
+public class DebeziumEvent {
+
+ private static final String FIELD_PAYLOAD = "payload";
+ private static final String FIELD_SCHEMA = "schema";
+ private static final String FIELD_SOURCE = "source";
+ private static final String FIELD_BEFORE = "before";
+ private static final String FIELD_AFTER = "after";
+ private static final String FIELD_HISTORY_RECORD = "historyRecord";
+ private static final String FIELD_OP = "op";
+ private static final String FIELD_DB = "db";
+ private static final String FIELD_TABLE = "table";
+ private static final String FIELD_FIELDS = "fields";
+ private static final String FIELD_NAME = "name";
+ private static final String FIELD_TYPE = "type";
+ private static final String FIELD_FIELD = "field";
+ private static final String FIELD_OPTIONAL = "optional";
+
+ @JsonProperty(FIELD_PAYLOAD)
+ private final Payload payload;
+
+ @JsonProperty(FIELD_SCHEMA)
+ private final Field schema;
+
+ @JsonCreator
+ public DebeziumEvent(
+ @JsonProperty(FIELD_PAYLOAD) Payload payload,
+ @JsonProperty(FIELD_SCHEMA) Field schema) {
+ this.payload = payload;
+ this.schema = schema;
+ }
+
+ @JsonGetter(FIELD_PAYLOAD)
+ public Payload payload() {
+ return payload;
+ }
+
+ @JsonGetter(FIELD_SCHEMA)
+ public Field schema() {
+ return schema;
+ }
+
+ /** Payload elements in Debezium event record. */
+ public static class Payload {
+ @JsonProperty(FIELD_SOURCE)
+ private final Source source;
+
+ @JsonProperty(FIELD_BEFORE)
+ private final JsonNode before;
+
+ @JsonProperty(FIELD_AFTER)
+ private final JsonNode after;
+
+ @JsonProperty(FIELD_HISTORY_RECORD)
+ private final String historyRecord;
+
+ @JsonProperty(FIELD_OP)
+ private final String op;
+
+ @JsonCreator
+ public Payload(
+ @JsonProperty(FIELD_SOURCE) Source source,
+ @JsonProperty(FIELD_BEFORE) JsonNode before,
+ @JsonProperty(FIELD_AFTER) JsonNode after,
+ @JsonProperty(FIELD_HISTORY_RECORD) String historyRecord,
+ @JsonProperty(FIELD_OP) String op) {
+ this.source = source;
+ this.before = before;
+ this.after = after;
+ this.historyRecord = historyRecord;
+ this.op = op;
+ }
+
+ @JsonGetter(FIELD_SOURCE)
+ public Source source() {
+ return source;
+ }
+
+ @JsonGetter(FIELD_BEFORE)
+ public JsonNode before() {
+ return before;
+ }
+
+ @JsonGetter(FIELD_AFTER)
+ public JsonNode after() {
+ return after;
+ }
+
+ @JsonGetter(FIELD_HISTORY_RECORD)
+ public String historyRecord() {
+ return historyRecord;
+ }
+
+ @JsonGetter(FIELD_OP)
+ public String op() {
+ return op;
+ }
+
+ public boolean isSchemaChange() {
+ return op() == null;
+ }
+
+ public boolean hasHistoryRecord() {
+ return historyRecord != null;
+ }
+
+ /** Get table changes in history record. */
+ public Iterator<TableChanges.TableChange> getTableChanges() throws
IOException {
+ return
DebeziumEventUtils.getTableChanges(historyRecord).iterator();
+ }
+ }
+
+ /** Schema field elements in Debezium event record. */
+ public static class Field {
+
+ @JsonProperty(FIELD_FIELD)
+ private final String field;
+
+ @JsonProperty(FIELD_TYPE)
+ private final String type;
+
+ @JsonProperty(FIELD_NAME)
+ private final String name;
+
+ @JsonProperty(FIELD_OPTIONAL)
+ private final Boolean optional;
+
+ @JsonProperty(FIELD_FIELDS)
+ private final List<Field> fields;
+
+ @JsonCreator
+ public Field(
+ @JsonProperty(FIELD_FIELD) String field,
+ @JsonProperty(FIELD_TYPE) String type,
+ @JsonProperty(FIELD_NAME) String name,
+ @JsonProperty(FIELD_OPTIONAL) Boolean optional,
+ @JsonProperty(FIELD_FIELDS) List<Field> fields) {
+ this.field = field;
+ this.type = type;
+ this.name = name;
+ this.optional = optional;
+ this.fields = fields;
+ }
+
+ @JsonGetter(FIELD_FIELD)
+ public String field() {
+ return field;
+ }
+
+ @JsonGetter(FIELD_TYPE)
+ public String type() {
+ return type;
+ }
+
+ @JsonGetter(FIELD_NAME)
+ public String name() {
+ return name;
+ }
+
+ @JsonGetter(FIELD_OPTIONAL)
+ public Boolean optional() {
+ return optional;
+ }
+
+ @JsonGetter(FIELD_FIELDS)
+ public List<Field> fields() {
+ return fields;
+ }
+
+ public Map<String, Field> beforeAndAfterFields() {
+ return fields.stream()
+ .filter(
+ item ->
+ FIELD_BEFORE.equals(item.field)
+ || FIELD_AFTER.equals(item.field))
+ .flatMap(item -> item.fields.stream())
+ .collect(
+ Collectors.toMap(
+ Field::field,
+ Function.identity(),
+ (v1, v2) -> v2,
+ LinkedHashMap::new));
+ }
+ }
+
+ /** Source element of payload in Debezium event record. */
+ public static class Source {
+ @JsonProperty(FIELD_DB)
+ private final String db;
+
+ @JsonProperty(FIELD_TABLE)
+ private final String table;
+
+ @JsonCreator
+ public Source(@JsonProperty(FIELD_DB) String db,
@JsonProperty(FIELD_TABLE) String table) {
+ this.db = db;
+ this.table = table;
+ }
+
+ @JsonGetter(FIELD_DB)
+ public String db() {
+ return db;
+ }
+
+ @JsonGetter(FIELD_TABLE)
+ public String table() {
+ return table;
+ }
+ }
+}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEventUtils.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEventUtils.java
new file mode 100644
index 000000000..562d138ad
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEventUtils.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql.format;
+
+import com.ververica.cdc.debezium.history.FlinkJsonTableChangeSerializer;
+import io.debezium.document.Array;
+import io.debezium.document.DocumentReader;
+import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.TableChanges;
+
+import java.io.IOException;
+
+/** A utility class that provides abilities for Debezium event {@link
DebeziumEvent}. */
+public class DebeziumEventUtils {
+
+ private static final DocumentReader DOCUMENT_READER =
DocumentReader.defaultReader();
+ private static final FlinkJsonTableChangeSerializer
TABLE_CHANGE_SERIALIZER =
+ new FlinkJsonTableChangeSerializer();
+
+ public static HistoryRecord getHistoryRecord(String historyRecordStr)
throws IOException {
+ return new HistoryRecord(DOCUMENT_READER.read(historyRecordStr));
+ }
+
+ public static TableChanges getTableChanges(String historyRecordStr) throws
IOException {
+ HistoryRecord historyRecord = getHistoryRecord(historyRecordStr);
+ Array tableChanges =
historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES);
+ return TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/DebeziumEventTest.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/DebeziumEventTest.java
new file mode 100644
index 000000000..0e27d0b11
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/DebeziumEventTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.mysql;
+
+import org.apache.paimon.flink.action.cdc.mysql.format.DebeziumEvent;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonParser;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import io.debezium.relational.history.TableChanges;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.Iterator;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DebeziumEvent}. */
+public class DebeziumEventTest {
+
+ private ObjectMapper objectMapper;
+
+ @BeforeEach
+ public void before() {
+ objectMapper = new ObjectMapper();
+ objectMapper
+ .configure(JsonParser.Feature.ALLOW_COMMENTS, true)
+ .configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS,
true)
+ .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
+ }
+
+    /** Deserializes the schema-change fixture and checks its single ALTER table change. */
+    @Test
+    public void testDeserialize() throws IOException {
+        final URL url =
+                DebeziumEventTest.class
+                        .getClassLoader()
+                        .getResource("mysql/debezium-event-change.json");
+        assertThat(url).isNotNull();
+        DebeziumEvent debeziumEvent = objectMapper.readValue(url, DebeziumEvent.class);
+        assertThat(debeziumEvent).isNotNull();
+        assertThat(debeziumEvent.payload().hasHistoryRecord()).isTrue();
+        Iterator<TableChanges.TableChange> tableChanges = debeziumEvent.payload().getTableChanges();
+        // Iterators.size() would exhaust the iterator and leave nothing to inspect afterwards;
+        // getOnlyElement() fails unless there is exactly one change and returns it for checking.
+        TableChanges.TableChange tableChange = Iterators.getOnlyElement(tableChanges);
+        assertThat(tableChange.getType()).isEqualTo(TableChanges.TableChangeType.ALTER);
+        assertThat(tableChange.getTable().id().toString())
+                .isEqualTo("tinyint1_not_bool_test.t1");
+    }
+}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/debezium-event-change.json
b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/debezium-event-change.json
new file mode 100644
index 000000000..d3c11acef
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/debezium-event-change.json
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "schema":{
+ "type":"struct",
+ "fields":[
+ {
+ "type":"struct",
+ "fields":[
+ {
+ "type":"string",
+ "optional":false,
+ "field":"version"
+ },
+ {
+ "type":"string",
+ "optional":false,
+ "field":"connector"
+ },
+ {
+ "type":"string",
+ "optional":false,
+ "field":"name"
+ },
+ {
+ "type":"int64",
+ "optional":false,
+ "field":"ts_ms"
+ },
+ {
+ "type":"string",
+ "optional":true,
+ "name":"io.debezium.data.Enum",
+ "version":1,
+ "parameters":{
+ "allowed":"true,last,false"
+ },
+ "default":"false",
+ "field":"snapshot"
+ },
+ {
+ "type":"string",
+ "optional":false,
+ "field":"db"
+ },
+ {
+ "type":"string",
+ "optional":true,
+ "field":"sequence"
+ },
+ {
+ "type":"string",
+ "optional":true,
+ "field":"table"
+ },
+ {
+ "type":"int64",
+ "optional":false,
+ "field":"server_id"
+ },
+ {
+ "type":"string",
+ "optional":true,
+ "field":"gtid"
+ },
+ {
+ "type":"string",
+ "optional":false,
+ "field":"file"
+ },
+ {
+ "type":"int64",
+ "optional":false,
+ "field":"pos"
+ },
+ {
+ "type":"int32",
+ "optional":false,
+ "field":"row"
+ },
+ {
+ "type":"int64",
+ "optional":true,
+ "field":"thread"
+ },
+ {
+ "type":"string",
+ "optional":true,
+ "field":"query"
+ }
+ ],
+ "optional":false,
+ "name":"io.debezium.connector.mysql.Source",
+ "field":"source"
+ },
+ {
+ "type":"string",
+ "optional":true,
+ "field":"historyRecord"
+ }
+ ],
+ "optional":false,
+ "name":"io.debezium.connector.mysql.SchemaChangeValue"
+ },
+ "payload":{
+ "source":{
+ "version":"1.6.4.Final",
+ "connector":"mysql",
+ "name":"mysql_binlog_source",
+ "ts_ms":1695203563233,
+ "snapshot":"false",
+ "db":"tinyint1_not_bool_test",
+ "sequence":null,
+ "table":"t1",
+ "server_id":223344,
+ "gtid":null,
+ "file":"mysql-bin.000003",
+ "pos":219,
+ "row":0,
+ "thread":null,
+ "query":null
+ },
+
"historyRecord":"{\"source\":{\"file\":\"mysql-bin.000003\",\"pos\":219,\"server_id\":223344},\"position\":{\"transaction_id\":null,\"ts_sec\":1695203563,\"file\":\"mysql-bin.000003\",\"pos\":379,\"server_id\":223344},\"databaseName\":\"tinyint1_not_bool_test\",\"ddl\":\"ALTER
TABLE t1 ADD COLUMN _new_tinyint1
TINYINT(1)\",\"tableChanges\":[{\"type\":\"ALTER\",\"id\":\"\\\"tinyint1_not_bool_test\\\".\\\"t1\\\"\",\"table\":{\"defaultCharsetName\":\"latin1\",\"primaryKeyColumnNames
[...]
+ }
+}