This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new ff44db116 [feature][connector][cdc] add 
SeaTunnelRowDebeziumDeserializeSchema (#3499)
ff44db116 is described below

commit ff44db116eef6e099a3031dbd44d81460b50e5fe
Author: Zongwen Li <[email protected]>
AuthorDate: Wed Nov 23 16:59:47 2022 +0800

    [feature][connector][cdc] add SeaTunnelRowDebeziumDeserializeSchema (#3499)
    
    * [feature][connector][cdc] add SeaTunnel row Deserialization Converters
    
    * [feature][connector][mysql-cdc] add createDebeziumDeserializationSchema
    
    * [chore] fix checkstyle
    
    * [feature] fill config factory
---
 .../seatunnel/api/table/catalog/TableSchema.java   |  13 +
 .../cdc/base/config/JdbcSourceConfigFactory.java   |  22 +
 .../cdc/base/option/JdbcSourceOptions.java         |  22 +
 .../connectors/cdc/base/option/SourceOptions.java  |   8 +
 .../debezium/DebeziumDeserializationConverter.java |  30 ++
 .../DebeziumDeserializationConverterFactory.java   |  48 ++
 .../connectors/cdc/debezium/MetadataConverter.java |  30 ++
 ...TunnelRowDebeziumDeserializationConverters.java | 538 +++++++++++++++++++++
 .../row/SeaTunnelRowDebeziumDeserializeSchema.java | 212 ++++++++
 .../cdc/debezium/utils/TemporalConversions.java    | 205 ++++++++
 .../connector-cdc/connector-cdc-mysql/pom.xml      |   5 +
 .../cdc/mysql/config/MySqlSourceOptions.java       |  36 --
 .../cdc/mysql/source/MySqlIncrementalSource.java   |  29 +-
 .../seatunnel/jdbc/catalog/JdbcCatalogOptions.java |  29 ++
 14 files changed, 1187 insertions(+), 40 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
index 61d99ef52..0d076943e 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.api.table.catalog;
 
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -49,6 +50,18 @@ public final class TableSchema implements Serializable {
         return columns;
     }
 
+    public SeaTunnelRowType toPhysicalRowDataType() {
+        SeaTunnelDataType<?>[] fieldTypes = columns.stream()
+            .filter(Column::isPhysical)
+            .map(Column::getDataType)
+            .toArray(SeaTunnelDataType[]::new);
+        String[] fields = columns.stream()
+            .filter(Column::isPhysical)
+            .map(Column::getName)
+            .toArray(String[]::new);
+        return new SeaTunnelRowType(fields, fieldTypes);
+    }
+
     public static final class Builder {
         private final List<Column> columns = new ArrayList<>();
 
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
index ccbb6de23..0f98969ce 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
@@ -17,11 +17,13 @@
 
 package org.apache.seatunnel.connectors.cdc.base.config;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
 import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
 
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 
@@ -177,6 +179,26 @@ public abstract class JdbcSourceConfigFactory implements 
SourceConfig.Factory<Jd
         return this;
     }
 
+    /**
+     * Fills this factory from the given {@link ReadonlyConfig} and returns it for chaining.
+     *
+     * <p>Connection, chunk-splitting, snapshot and pool settings are read through the
+     * corresponding {@link JdbcSourceOptions} / {@link SourceOptions} entries. Debezium
+     * properties are copied into a fresh {@link Properties} only when the option is present.
+     *
+     * @param config the configuration to read option values from
+     * @return this factory
+     */
+    public JdbcSourceConfigFactory fromReadonlyConfig(ReadonlyConfig config) {
+        this.port = config.get(JdbcSourceOptions.PORT);
+        this.hostname = config.get(JdbcSourceOptions.HOSTNAME);
+        // NOTE(review): no username is read from the config in this method; presumably it has
+        // to be supplied through another setter - confirm this is intended.
+        this.password = config.get(JdbcSourceOptions.PASSWORD);
+        // TODO: support multi-table
+        // Only a single database and a single table are taken from the config for now.
+        this.databaseList = 
Collections.singletonList(config.get(JdbcSourceOptions.DATABASE_NAME));
+        this.tableList = 
Collections.singletonList(config.get(JdbcSourceOptions.TABLE_NAME));
+        this.distributionFactorUpper = 
config.get(JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
+        this.distributionFactorLower = 
config.get(JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
+        this.splitSize = config.get(SourceOptions.SNAPSHOT_SPLIT_SIZE);
+        this.fetchSize = config.get(SourceOptions.SNAPSHOT_FETCH_SIZE);
+        this.serverTimeZone = config.get(JdbcSourceOptions.SERVER_TIME_ZONE);
+        this.connectTimeout = config.get(JdbcSourceOptions.CONNECT_TIMEOUT);
+        this.connectMaxRetries = 
config.get(JdbcSourceOptions.CONNECT_MAX_RETRIES);
+        this.connectionPoolSize = 
config.get(JdbcSourceOptions.CONNECTION_POOL_SIZE);
+        // Start from an empty Properties so earlier Debezium settings on this factory are dropped.
+        this.dbzProperties = new Properties();
+        config.getOptional(SourceOptions.DEBEZIUM_PROPERTIES).ifPresent(map -> 
dbzProperties.putAll(map));
+        return this;
+    }
+
     @Override
     public abstract JdbcSourceConfig create(int subtask);
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
index 3052b40fb..942497051 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
@@ -103,4 +103,26 @@ public class JdbcSourceOptions extends SourceOptions {
                     .defaultValue(3)
                     .withDescription(
                             "The max retry times that the connector should 
retry to build database server connection.");
+
+    /**
+     * Option {@code chunk-key.even-distribution.factor.upper-bound}: a double that defaults
+     * to {@code 1000.0}. See the option description for how the factor is computed.
+     */
+    public static final Option<Double> 
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND =
+        Options.key("chunk-key.even-distribution.factor.upper-bound")
+            .doubleType()
+            .defaultValue(1000.0d)
+            .withDescription(
+                "The upper bound of chunk key distribution factor. The 
distribution factor is used to determine whether the"
+                    + " table is evenly distribution or not."
+                    + " The table chunks would use evenly calculation 
optimization when the data distribution is even,"
+                    + " and the query for splitting would happen when it is 
uneven."
+                    + " The distribution factor could be calculated by 
(MAX(id) - MIN(id) + 1) / rowCount.");
+
+    /**
+     * Option {@code chunk-key.even-distribution.factor.lower-bound}: a double that defaults
+     * to {@code 0.05}. See the option description for how the factor is computed.
+     */
+    public static final Option<Double> 
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND =
+        Options.key("chunk-key.even-distribution.factor.lower-bound")
+            .doubleType()
+            .defaultValue(0.05d)
+            .withDescription(
+                "The lower bound of chunk key distribution factor. The 
distribution factor is used to determine whether the"
+                    + " table is evenly distribution or not."
+                    + " The table chunks would use evenly calculation 
optimization when the data distribution is even,"
+                    + " and the query for splitting would happen when it is 
uneven."
+                    + " The distribution factor could be calculated by 
(MAX(id) - MIN(id) + 1) / rowCount.");
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
index 16f12079c..3558f11bd 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
@@ -21,6 +21,8 @@ import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 
+import java.util.Map;
+
 @SuppressWarnings("MagicNumber")
 public class SourceOptions {
 
@@ -83,10 +85,16 @@ public class SourceOptions {
         .noDefaultValue()
         .withDescription("Optional offsets used in case of \"specific\" stop 
mode");
 
+    /**
+     * Map-typed option registered under the key {@code debezium}; it has no default value.
+     */
+    public static final Option<Map<String, String>> DEBEZIUM_PROPERTIES = 
Options.key("debezium")
+        .mapType()
+        .noDefaultValue()
+        .withDescription("Decides if the table options contains Debezium 
client properties that start with prefix 'debezium'.");
+
     public static final OptionRule.Builder BASE_RULE = OptionRule.builder()
         .optional(SNAPSHOT_SPLIT_SIZE, SNAPSHOT_FETCH_SIZE)
         .optional(INCREMENTAL_PARALLELISM)
         .optional(STARTUP_MODE, STOP_MODE)
+        .optional(DEBEZIUM_PROPERTIES)
         .conditional(STARTUP_MODE, StartupMode.TIMESTAMP, STARTUP_TIMESTAMP)
         .conditional(STARTUP_MODE, StartupMode.SPECIFIC, 
STARTUP_SPECIFIC_OFFSET_FILE, STARTUP_SPECIFIC_OFFSET_POS)
         .conditional(STOP_MODE, StopMode.TIMESTAMP, STOP_TIMESTAMP)
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/DebeziumDeserializationConverter.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/DebeziumDeserializationConverter.java
new file mode 100644
index 000000000..859ef27cf
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/DebeziumDeserializationConverter.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.debezium;
+
+import org.apache.kafka.connect.data.Schema;
+
+import java.io.Serializable;
+
+/**
+ * Runtime converter that converts objects of Debezium into objects of internal data
+ * structures. Implementations are {@link Serializable}.
+ */
+@FunctionalInterface
+public interface DebeziumDeserializationConverter extends Serializable {
+    /**
+     * Converts a single Debezium value.
+     *
+     * @param dbzObj the value produced by Debezium
+     * @param schema the Kafka Connect schema handed over together with the value
+     *               (presumably the schema describing {@code dbzObj} - confirm with callers)
+     * @return the converted value
+     * @throws Exception if the value cannot be converted
+     */
+    Object convert(Object dbzObj, Schema schema) throws Exception;
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/DebeziumDeserializationConverterFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/DebeziumDeserializationConverterFactory.java
new file mode 100644
index 000000000..c1dfe19a1
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/DebeziumDeserializationConverterFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.debezium;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import java.io.Serializable;
+import java.time.ZoneId;
+import java.util.Optional;
+
+/**
+ * Factory to create {@link DebeziumDeserializationConverter} according to
+ * {@link SeaTunnelDataType}. It's usually used to create a user-defined
+ * {@link DebeziumDeserializationConverter} which has a higher resolve order than the
+ * default converter.
+ */
+public interface DebeziumDeserializationConverterFactory extends Serializable {
+
+    /**
+     * A factory that never supplies a converter: it returns {@link Optional#empty()} for
+     * every type, so callers always fall back to the default converters.
+     */
+    DebeziumDeserializationConverterFactory DEFAULT =
+        (logicalType, serverTimeZone) -> Optional.empty();
+
+    /**
+     * Returns an optional {@link DebeziumDeserializationConverter}. Returns
+     * {@link Optional#empty()} if the caller should fall back to the default converter.
+     *
+     * @param type the SeaTunnel datatype to be converted from objects of Debezium
+     * @param serverTimeZone TimeZone used to convert data with timestamp type
+     * @return the user-defined converter for {@code type}, or empty when there is none
+     */
+    Optional<DebeziumDeserializationConverter> createUserDefinedConverter(
+        SeaTunnelDataType<?> type, ZoneId serverTimeZone);
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/MetadataConverter.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/MetadataConverter.java
new file mode 100644
index 000000000..519aa4efe
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/MetadataConverter.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.debezium;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.io.Serializable;
+
+/**
+ * {@link SourceRecord} metadata info converter. Implementations are {@link Serializable}.
+ */
+@FunctionalInterface
+public interface MetadataConverter extends Serializable {
+    /**
+     * Reads one metadata value from the given record.
+     *
+     * @param record the source record to read from
+     * @return the metadata value read from {@code record}
+     */
+    Object read(SourceRecord record);
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConverters.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConverters.java
new file mode 100644
index 000000000..529206727
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConverters.java
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.debezium.row;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationConverter;
+import 
org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationConverterFactory;
+import org.apache.seatunnel.connectors.cdc.debezium.MetadataConverter;
+import org.apache.seatunnel.connectors.cdc.debezium.utils.TemporalConversions;
+
+import io.debezium.data.SpecialValueDecimal;
+import io.debezium.data.VariableScaleDecimal;
+import io.debezium.time.MicroTime;
+import io.debezium.time.MicroTimestamp;
+import io.debezium.time.NanoTime;
+import io.debezium.time.NanoTimestamp;
+import io.debezium.time.Timestamp;
+import org.apache.kafka.connect.data.Decimal;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.Optional;
+
+/**
+ * Deserialization schema from Debezium object to {@link SeaTunnelRow}
+ */
+public class SeaTunnelRowDebeziumDeserializationConverters implements 
Serializable {
+    private static final long serialVersionUID = -897499476343410567L;
+    protected final DebeziumDeserializationConverter[] physicalConverters;
+    protected final MetadataConverter[] metadataConverters;
+    protected final String[] fieldNames;
+
+    /**
+     * Prepares one converter per physical field of {@code physicalDataType}.
+     *
+     * @param physicalDataType row type whose field names and field types drive the conversion
+     * @param metadataConverters converters for the metadata columns appended after the physical ones
+     * @param serverTimeZone time zone handed to every created converter
+     * @param userDefinedConverterFactory factory consulted before the built-in converters
+     */
+    public SeaTunnelRowDebeziumDeserializationConverters(
+        SeaTunnelRowType physicalDataType,
+        MetadataConverter[] metadataConverters,
+        ZoneId serverTimeZone,
+        DebeziumDeserializationConverterFactory userDefinedConverterFactory) {
+        SeaTunnelDataType<?>[] physicalTypes = physicalDataType.getFieldTypes();
+        DebeziumDeserializationConverter[] converters =
+            new DebeziumDeserializationConverter[physicalTypes.length];
+        for (int i = 0; i < physicalTypes.length; i++) {
+            converters[i] = createConverter(physicalTypes[i], serverTimeZone, userDefinedConverterFactory);
+        }
+
+        this.metadataConverters = metadataConverters;
+        this.physicalConverters = converters;
+        this.fieldNames = physicalDataType.getFieldNames();
+    }
+
+    /**
+     * Converts one Debezium record into a {@link SeaTunnelRow}.
+     *
+     * <p>The row holds the physical columns first (in {@code fieldNames} order) followed by
+     * the metadata columns. A physical column whose name is not present in {@code schema}
+     * is set to {@code null}.
+     *
+     * @param record the source record handed to the metadata converters
+     * @param struct the struct the physical field values are read from
+     * @param schema the schema used to look up each physical field
+     * @return the converted row
+     * @throws Exception if a field converter fails
+     */
+    public SeaTunnelRow convert(SourceRecord record, Struct struct, Schema schema) throws Exception {
+        int arity = physicalConverters.length + metadataConverters.length;
+        SeaTunnelRow row = new SeaTunnelRow(arity);
+        // physical column
+        for (int i = 0; i < physicalConverters.length; i++) {
+            String fieldName = fieldNames[i];
+            Field field = schema.field(fieldName);
+            if (field == null) {
+                // Struct#get(String) throws for a name its schema does not know, so the value
+                // must only be read after the field is known to exist.
+                row.setField(i, null);
+                continue;
+            }
+            Object fieldValue = struct.get(fieldName);
+            Object convertedField = SeaTunnelRowDebeziumDeserializationConverters.convertField(
+                physicalConverters[i], fieldValue, field.schema());
+            row.setField(i, convertedField);
+        }
+        // metadata column
+        for (int i = 0; i < metadataConverters.length; i++) {
+            row.setField(i + physicalConverters.length, metadataConverters[i].read(record));
+        }
+        return row;
+    }
+
+    // 
-------------------------------------------------------------------------------------
+    // Runtime Converters
+    // 
-------------------------------------------------------------------------------------
+
+    /**
+     * Creates a runtime converter which is null safe: the converter for {@code type} is built
+     * first and then passed through {@code wrapIntoNullableConverter}.
+     */
+    private static DebeziumDeserializationConverter createConverter(
+        SeaTunnelDataType<?> type,
+        ZoneId serverTimeZone,
+        DebeziumDeserializationConverterFactory userDefinedConverterFactory) {
+        DebeziumDeserializationConverter notNullConverter =
+            createNotNullConverter(type, serverTimeZone, userDefinedConverterFactory);
+        return wrapIntoNullableConverter(notNullConverter);
+    }
+
+    // 
--------------------------------------------------------------------------------
+    // IMPORTANT! We use anonymous classes instead of lambdas for a reason 
here. It is
+    // necessary because the maven shade plugin cannot relocate classes in
+    // SerializedLambdas (MSHADE-260).
+    // 
--------------------------------------------------------------------------------
+
+    /**
+     * Picks the converter for {@code type}; the returned converter assumes its input is not null.
+     *
+     * <p>A converter supplied by {@code userDefinedConverterFactory} wins; otherwise a built-in
+     * converter is selected by the SQL type of {@code type}.
+     *
+     * @throws UnsupportedOperationException if there is no built-in converter for the SQL type
+     */
+    private static DebeziumDeserializationConverter createNotNullConverter(
+        SeaTunnelDataType<?> type,
+        ZoneId serverTimeZone,
+        DebeziumDeserializationConverterFactory userDefinedConverterFactory) {
+
+        // user defined converter has a higher resolve order
+        Optional<DebeziumDeserializationConverter> userDefined =
+            userDefinedConverterFactory.createUserDefinedConverter(type, serverTimeZone);
+        if (userDefined.isPresent()) {
+            return userDefined.get();
+        }
+
+        // no user defined converter matched: use the built-in one for the SQL type
+        switch (type.getSqlType()) {
+            case NULL:
+                return new DebeziumDeserializationConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(Object dbzObj, Schema schema) throws Exception {
+                        return null;
+                    }
+                };
+
+            // converters that go through wrapNumericConverter
+            case BOOLEAN:
+                return wrapNumericConverter(convertToBoolean());
+            case TINYINT:
+                return wrapNumericConverter(convertToByte());
+            case SMALLINT:
+                return wrapNumericConverter(convertToShort());
+            case INT:
+                return wrapNumericConverter(convertToInt());
+            case BIGINT:
+                return wrapNumericConverter(convertToLong());
+            case FLOAT:
+                return wrapNumericConverter(convertToFloat());
+            case DOUBLE:
+                return wrapNumericConverter(convertToDouble());
+            case DECIMAL:
+                return wrapNumericConverter(createDecimalConverter());
+
+            // temporal converters
+            case DATE:
+                return convertToDate();
+            case TIME:
+                return convertToTime();
+            case TIMESTAMP:
+                return convertToTimestamp(serverTimeZone);
+
+            // text, binary and nested rows
+            case STRING:
+                return convertToString();
+            case BYTES:
+                return convertToBinary();
+            case ROW:
+                return createRowConverter((SeaTunnelRowType) type, serverTimeZone, userDefinedConverterFactory);
+
+            // ARRAY and MAP have no converter and end up here as well
+            case ARRAY:
+            case MAP:
+            default:
+                throw new UnsupportedOperationException("Unsupported type: " + type);
+        }
+    }
+
+    /**
+     * Returns a converter producing a {@link Boolean}.
+     *
+     * <p>A {@code Boolean} passes through. {@code Byte}, {@code Short}, {@code Integer} and
+     * {@code Long} are true when non-zero. A {@code BigDecimal} is true when its
+     * {@code shortValue()} is non-zero. Anything else goes through
+     * {@link Boolean#parseBoolean(String)} on its string form.
+     */
+    private static DebeziumDeserializationConverter convertToBoolean() {
+        return new DebeziumDeserializationConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Boolean) {
+                    return dbzObj;
+                } else if (dbzObj instanceof Byte) {
+                    return (byte) dbzObj != 0;
+                } else if (dbzObj instanceof Short) {
+                    return (short) dbzObj != 0;
+                } else if (dbzObj instanceof Integer) {
+                    // without this branch "1" would reach parseBoolean and come out as false
+                    return (int) dbzObj != 0;
+                } else if (dbzObj instanceof Long) {
+                    return (long) dbzObj != 0L;
+                } else if (dbzObj instanceof BigDecimal) {
+                    return ((BigDecimal) dbzObj).shortValue() != 0;
+                } else {
+                    return Boolean.parseBoolean(dbzObj.toString());
+                }
+            }
+        };
+    }
+
+    private static DebeziumDeserializationConverter convertToByte() {
+        return new DebeziumDeserializationConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Byte) {
+                    return dbzObj;
+                } else if (dbzObj instanceof BigDecimal) {
+                    return ((BigDecimal) dbzObj).byteValue();
+                } else {
+                    return Byte.parseByte(dbzObj.toString());
+                }
+            }
+        };
+    }
+
+    /**
+     * Returns a converter producing a {@link Short}.
+     *
+     * <p>A {@code Byte} is widened, a {@code Short} passes through, a {@code BigDecimal} is
+     * narrowed with {@code shortValue()}, and any other value is parsed from its string form.
+     * Every branch yields a {@code Short}.
+     */
+    private static DebeziumDeserializationConverter convertToShort() {
+        return new DebeziumDeserializationConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Byte) {
+                    // widen so the result has the same boxed type as every other branch
+                    return ((Byte) dbzObj).shortValue();
+                } else if (dbzObj instanceof Short) {
+                    return dbzObj;
+                } else if (dbzObj instanceof BigDecimal) {
+                    return ((BigDecimal) dbzObj).shortValue();
+                } else {
+                    return Short.parseShort(dbzObj.toString());
+                }
+            }
+        };
+    }
+
+    private static DebeziumDeserializationConverter convertToInt() {
+        return new DebeziumDeserializationConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Integer) {
+                    return dbzObj;
+                } else if (dbzObj instanceof Long) {
+                    return ((Long) dbzObj).intValue();
+                } else if (dbzObj instanceof BigDecimal) {
+                    return ((BigDecimal) dbzObj).intValue();
+                } else {
+                    return Integer.parseInt(dbzObj.toString());
+                }
+            }
+        };
+    }
+
+    /**
+     * Returns a converter producing a {@link Long}.
+     *
+     * <p>An {@code Integer} is widened, a {@code Long} passes through, a {@code BigDecimal} is
+     * narrowed with {@code longValue()}, and any other value is parsed from its string form.
+     * Every branch yields a {@code Long}.
+     */
+    private static DebeziumDeserializationConverter convertToLong() {
+        return new DebeziumDeserializationConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Integer) {
+                    // widen so the result has the same boxed type as every other branch
+                    return ((Integer) dbzObj).longValue();
+                } else if (dbzObj instanceof Long) {
+                    return dbzObj;
+                } else if (dbzObj instanceof BigDecimal) {
+                    return ((BigDecimal) dbzObj).longValue();
+                } else {
+                    return Long.parseLong(dbzObj.toString());
+                }
+            }
+        };
+    }
+
+    /**
+     * Returns a converter producing a {@link Double}.
+     *
+     * <p>A {@code Float} is widened, a {@code Double} passes through, a {@code BigDecimal} is
+     * converted with {@code doubleValue()}, and any other value is parsed from its string form.
+     * Every branch yields a {@code Double}.
+     */
+    private static DebeziumDeserializationConverter convertToDouble() {
+        return new DebeziumDeserializationConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Float) {
+                    // widen so the result has the same boxed type as every other branch
+                    return ((Float) dbzObj).doubleValue();
+                } else if (dbzObj instanceof Double) {
+                    return dbzObj;
+                } else if (dbzObj instanceof BigDecimal) {
+                    return ((BigDecimal) dbzObj).doubleValue();
+                } else {
+                    return Double.parseDouble(dbzObj.toString());
+                }
+            }
+        };
+    }
+
+    private static DebeziumDeserializationConverter convertToFloat() {
+        return new DebeziumDeserializationConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Float) {
+                    return dbzObj;
+                } else if (dbzObj instanceof Double) {
+                    return ((Double) dbzObj).floatValue();
+                } else if (dbzObj instanceof BigDecimal) {
+                    return ((BigDecimal) dbzObj).floatValue();
+                } else {
+                    return Float.parseFloat(dbzObj.toString());
+                }
+            }
+        };
+    }
+
+    private static DebeziumDeserializationConverter convertToDate() {
+        return new DebeziumDeserializationConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                return TemporalConversions.toLocalDate(dbzObj);
+            }
+        };
+    }
+
+    private static DebeziumDeserializationConverter convertToTime() {
+        return new DebeziumDeserializationConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @SuppressWarnings("MagicNumber")
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Long) {
+                    switch (schema.name()) {
+                        case MicroTime.SCHEMA_NAME:
+                            return LocalTime.ofNanoOfDay((long) dbzObj * 
1000L);
+                        case NanoTime.SCHEMA_NAME:
+                            return LocalTime.ofNanoOfDay((long) dbzObj);
+                        default:
+                    }
+                } else if (dbzObj instanceof Integer) {
+                    return LocalTime.ofNanoOfDay((long) dbzObj * 1000_000L);
+                }
+                // get number of milliseconds of the day
+                return TemporalConversions.toLocalTime(dbzObj);
+            }
+        };
+    }
+
+    /**
+     * Builds the converter for TIMESTAMP columns, producing a {@link LocalDateTime}.
+     *
+     * <p>A {@code Long} is read as epoch milliseconds, microseconds or nanoseconds depending on
+     * whether the schema name is {@code Timestamp}, {@code MicroTimestamp} or
+     * {@code NanoTimestamp}. Every other value is handed to
+     * {@link TemporalConversions#toLocalDateTime(Object, ZoneId)}.
+     *
+     * @param serverTimeZone zone passed on to the fallback conversion
+     */
+    private static DebeziumDeserializationConverter convertToTimestamp(ZoneId serverTimeZone) {
+        return new DebeziumDeserializationConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @SuppressWarnings("MagicNumber")
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Long) {
+                    long value = (Long) dbzObj;
+                    // floorDiv/floorMod keep the sub-millisecond part non-negative, so values
+                    // before the epoch never yield a negative nano-of-day.
+                    switch (schema.name()) {
+                        case Timestamp.SCHEMA_NAME:
+                            return toLocalDateTime(value, 0);
+                        case MicroTimestamp.SCHEMA_NAME:
+                            return toLocalDateTime(
+                                Math.floorDiv(value, 1000L),
+                                (int) (Math.floorMod(value, 1000L) * 1000));
+                        case NanoTimestamp.SCHEMA_NAME:
+                            return toLocalDateTime(
+                                Math.floorDiv(value, 1000_000L),
+                                (int) Math.floorMod(value, 1000_000L));
+                        default:
+                            // unknown schema name: use the generic conversion below
+                            break;
+                    }
+                }
+                return TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
+            }
+        };
+    }
+
+    /**
+     * Converts an epoch-millisecond value plus an additional nanosecond part into a
+     * {@link LocalDateTime}.
+     *
+     * @param millisecond       milliseconds since 1970-01-01T00:00:00, may be negative
+     * @param nanoOfMillisecond nanoseconds added on top of the millisecond value
+     * @return the date-time built from the epoch day and the nano-of-day
+     */
+    @SuppressWarnings("MagicNumber")
+    public static LocalDateTime toLocalDateTime(long millisecond, int nanoOfMillisecond) {
+        // 24 * 60 * 60 * 1000
+        final int millisPerDay = 86400000;
+        int epochDay = (int) (millisecond / millisPerDay);
+        int millisOfDay = (int) (millisecond % millisPerDay);
+        if (millisOfDay < 0) {
+            // shift a negative remainder into the previous day
+            epochDay -= 1;
+            millisOfDay += millisPerDay;
+        }
+        final long nanoOfDay = millisOfDay * 1_000_000L + nanoOfMillisecond;
+        return LocalDateTime.of(LocalDate.ofEpochDay(epochDay), LocalTime.ofNanoOfDay(nanoOfDay));
+    }
+
+    /**
+     * Builds the converter for timestamps that arrive as ISO-8601 instant strings; the parsed
+     * instant is rendered as a {@link LocalDateTime} in the given zone.
+     *
+     * @param serverTimeZone zone used to turn the parsed instant into a local date-time
+     */
+    private static DebeziumDeserializationConverter convertToLocalTimeZoneTimestamp(ZoneId serverTimeZone) {
+        return new DebeziumDeserializationConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (!(dbzObj instanceof String)) {
+                    throw new IllegalArgumentException(
+                        "Unable to convert to LocalDateTime from unexpected value '"
+                            + dbzObj
+                            + "' of type "
+                            + dbzObj.getClass().getName());
+                }
+                // TIMESTAMP type is encoded in string type
+                final Instant parsed = Instant.parse((String) dbzObj);
+                return LocalDateTime.ofInstant(parsed, serverTimeZone);
+            }
+        };
+    }
+
+    private static DebeziumDeserializationConverter convertToString() {
+        return new DebeziumDeserializationConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                return dbzObj.toString();
+            }
+        };
+    }
+
+    /**
+     * Builds the converter for binary columns. A {@code byte[]} is returned as is, the remaining
+     * content of a {@link ByteBuffer} is copied into a new array, and any other type is rejected
+     * with an {@link UnsupportedOperationException}.
+     */
+    private static DebeziumDeserializationConverter convertToBinary() {
+        return new DebeziumDeserializationConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) throws Exception {
+                if (dbzObj instanceof byte[]) {
+                    return dbzObj;
+                }
+                if (!(dbzObj instanceof ByteBuffer)) {
+                    throw new UnsupportedOperationException(
+                        "Unsupported BYTES value type: " + dbzObj.getClass().getSimpleName());
+                }
+                final ByteBuffer buffer = (ByteBuffer) dbzObj;
+                final byte[] content = new byte[buffer.remaining()];
+                buffer.get(content);
+                return content;
+            }
+        };
+    }
+
+    /**
+     * Builds the converter for DECIMAL columns, producing a {@link BigDecimal} from the
+     * representation used by the record: logical bytes, string, double, an existing
+     * {@link BigDecimal}, or otherwise the value's string form.
+     */
+    private static DebeziumDeserializationConverter createDecimalConverter() {
+        return new DebeziumDeserializationConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) throws Exception {
+                if (dbzObj instanceof byte[]) {
+                    // decimal.handling.mode=precise
+                    return Decimal.toLogical(schema, (byte[]) dbzObj);
+                }
+                if (dbzObj instanceof String) {
+                    // decimal.handling.mode=string
+                    return new BigDecimal((String) dbzObj);
+                }
+                if (dbzObj instanceof Double) {
+                    // decimal.handling.mode=double
+                    return BigDecimal.valueOf((Double) dbzObj);
+                }
+                if (dbzObj instanceof BigDecimal) {
+                    return (BigDecimal) dbzObj;
+                }
+                // fallback to string
+                return new BigDecimal(dbzObj.toString());
+            }
+        };
+    }
+
+    /**
+     * Builds the converter that turns a Debezium {@link Struct} into a {@link SeaTunnelRow}.
+     *
+     * <p>One field converter is created per field of {@code rowType}. At conversion time each
+     * field is looked up by name in the supplied schema: a field the schema does not contain
+     * yields {@code null}, otherwise the struct value is converted with the matching field
+     * converter ({@code null} values stay {@code null}).
+     *
+     * @param rowType                     row type describing the names and types of the fields
+     * @param serverTimeZone              zone passed to the field converters
+     * @param userDefinedConverterFactory factory passed to the field converters
+     */
+    private static DebeziumDeserializationConverter createRowConverter(
+        SeaTunnelRowType rowType,
+        ZoneId serverTimeZone,
+        DebeziumDeserializationConverterFactory userDefinedConverterFactory) {
+        final DebeziumDeserializationConverter[] fieldConverters =
+            Arrays.stream(rowType.getFieldTypes())
+                .map(type -> createConverter(type, serverTimeZone, userDefinedConverterFactory))
+                .toArray(DebeziumDeserializationConverter[]::new);
+        final String[] fieldNames = rowType.getFieldNames();
+
+        return new DebeziumDeserializationConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) throws Exception {
+                Struct struct = (Struct) dbzObj;
+                int arity = fieldNames.length;
+                SeaTunnelRow row = new SeaTunnelRow(arity);
+                for (int i = 0; i < arity; i++) {
+                    String fieldName = fieldNames[i];
+                    Field field = schema.field(fieldName);
+                    if (field == null) {
+                        row.setField(i, null);
+                        continue;
+                    }
+                    // Read the value only after the field is known to exist: Struct.get
+                    // rejects unknown field names, which would bypass the null branch above.
+                    Object fieldValue = struct.get(fieldName);
+                    Object convertedField = SeaTunnelRowDebeziumDeserializationConverters.convertField(
+                        fieldConverters[i], fieldValue, field.schema());
+                    row.setField(i, convertedField);
+                }
+                return row;
+            }
+        };
+    }
+
+    /**
+     * Applies {@code fieldConverter} to a single field value, passing {@code null} through
+     * without calling the converter.
+     */
+    private static Object convertField(
+        DebeziumDeserializationConverter fieldConverter, Object fieldValue, Schema fieldSchema)
+        throws Exception {
+        return fieldValue == null ? null : fieldConverter.convert(fieldValue, fieldSchema);
+    }
+
+    /**
+     * Wraps {@code converter} so that a {@code null} input yields {@code null} instead of being
+     * handed to the wrapped converter.
+     */
+    private static DebeziumDeserializationConverter wrapIntoNullableConverter(
+        DebeziumDeserializationConverter converter) {
+        return new DebeziumDeserializationConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object value, Schema valueSchema) throws Exception {
+                return value == null ? null : converter.convert(value, valueSchema);
+            }
+        };
+    }
+
+    /**
+     * Wraps {@code converter} so that values whose schema name is
+     * {@code VariableScaleDecimal.LOGICAL_NAME} are first decoded into a {@link BigDecimal}
+     * ({@code BigDecimal.ZERO} when the decoded value carries no decimal) before being converted.
+     * All other values are passed to {@code converter} unchanged.
+     */
+    private static DebeziumDeserializationConverter wrapNumericConverter(
+        DebeziumDeserializationConverter converter) {
+        return new DebeziumDeserializationConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) throws Exception {
+                final boolean variableScale = VariableScaleDecimal.LOGICAL_NAME.equals(schema.name());
+                if (!variableScale) {
+                    return converter.convert(dbzObj, schema);
+                }
+                final SpecialValueDecimal decoded = VariableScaleDecimal.toLogical((Struct) dbzObj);
+                final BigDecimal number = decoded.getDecimalValue().orElse(BigDecimal.ZERO);
+                return converter.convert(number, schema);
+            }
+        };
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
new file mode 100644
index 000000000..8bcee31a5
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.debezium.row;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationConverterFactory;
+import 
org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.seatunnel.connectors.cdc.debezium.MetadataConverter;
+
+import io.debezium.connector.AbstractSourceInfo;
+import io.debezium.data.Envelope;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.io.Serializable;
+import java.time.ZoneId;
+
+/**
+ * Deserialization schema from Debezium object to {@link SeaTunnelRow}.
+ */
+public final class SeaTunnelRowDebeziumDeserializeSchema
+    implements DebeziumDeserializationSchema<SeaTunnelRow> {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * TypeInformation of the produced {@link SeaTunnelRow}. *
+     */
+    private final SeaTunnelDataType<SeaTunnelRow> resultTypeInfo;
+
+    /**
+     * Runtime converter that converts Kafka {@link SourceRecord}s into {@link 
SeaTunnelRow} consisted of
+     */
+    private final SeaTunnelRowDebeziumDeserializationConverters converters;
+
+    /**
+     * Validator to validate the row value.
+     */
+    private final ValueValidator validator;
+
+    /**
+     * Returns a builder to build {@link 
SeaTunnelRowDebeziumDeserializeSchema}.
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    SeaTunnelRowDebeziumDeserializeSchema(
+        SeaTunnelRowType physicalDataType,
+        MetadataConverter[] metadataConverters,
+        SeaTunnelRowType resultType,
+        ValueValidator validator,
+        ZoneId serverTimeZone,
+        DebeziumDeserializationConverterFactory userDefinedConverterFactory) {
+        this.converters = new SeaTunnelRowDebeziumDeserializationConverters(
+            physicalDataType,
+            metadataConverters,
+            serverTimeZone,
+            userDefinedConverterFactory
+        );
+        this.resultTypeInfo = checkNotNull(resultType);
+        this.validator = checkNotNull(validator);
+    }
+
+    @Override
+    public void deserialize(SourceRecord record, Collector<SeaTunnelRow> 
collector) throws Exception {
+        Envelope.Operation operation = Envelope.operationFor(record);
+        Struct messageStruct = (Struct) record.value();
+        Schema valueSchema = record.valueSchema();
+
+        Struct sourceStruct = 
messageStruct.getStruct(Envelope.FieldName.SOURCE);
+        // TODO: multi-table
+        String tableName = 
sourceStruct.getString(AbstractSourceInfo.TABLE_NAME_KEY);
+
+        if (operation == Envelope.Operation.CREATE || operation == 
Envelope.Operation.READ) {
+            SeaTunnelRow insert = extractAfterRow(converters, record, 
messageStruct, valueSchema);
+            insert.setRowKind(RowKind.INSERT);
+            validator.validate(insert, RowKind.INSERT);
+            collector.collect(insert);
+        } else if (operation == Envelope.Operation.DELETE) {
+            SeaTunnelRow delete = extractBeforeRow(converters, record, 
messageStruct, valueSchema);
+            validator.validate(delete, RowKind.DELETE);
+            delete.setRowKind(RowKind.DELETE);
+            collector.collect(delete);
+        } else {
+            SeaTunnelRow before = extractBeforeRow(converters, record, 
messageStruct, valueSchema);
+            validator.validate(before, RowKind.UPDATE_BEFORE);
+            before.setRowKind(RowKind.UPDATE_BEFORE);
+            collector.collect(before);
+
+            SeaTunnelRow after = extractAfterRow(converters, record, 
messageStruct, valueSchema);
+            validator.validate(after, RowKind.UPDATE_AFTER);
+            after.setRowKind(RowKind.UPDATE_AFTER);
+            collector.collect(after);
+        }
+    }
+
+    private SeaTunnelRow extractAfterRow(
+        SeaTunnelRowDebeziumDeserializationConverters runtimeConverter,
+        SourceRecord record,
+        Struct value,
+        Schema valueSchema) throws Exception {
+
+        Schema afterSchema = 
valueSchema.field(Envelope.FieldName.AFTER).schema();
+        Struct after = value.getStruct(Envelope.FieldName.AFTER);
+        return runtimeConverter.convert(record, after, afterSchema);
+    }
+
+    private SeaTunnelRow extractBeforeRow(
+        SeaTunnelRowDebeziumDeserializationConverters runtimeConverter,
+        SourceRecord record,
+        Struct value,
+        Schema valueSchema)
+        throws Exception {
+
+        Schema beforeSchema = 
valueSchema.field(Envelope.FieldName.BEFORE).schema();
+        Struct before = value.getStruct(Envelope.FieldName.BEFORE);
+        return runtimeConverter.convert(record, before, beforeSchema);
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+        return resultTypeInfo;
+    }
+
+    // 
-------------------------------------------------------------------------------------
+    // Builder
+    // 
-------------------------------------------------------------------------------------
+
+    /**
+     * Custom validator to validate the row value.
+     */
+    public interface ValueValidator extends Serializable {
+        void validate(SeaTunnelRow rowData, RowKind rowKind) throws Exception;
+    }
+
+    /**
+     * Builder of {@link SeaTunnelRowDebeziumDeserializeSchema}.
+     */
+    public static class Builder {
+        private SeaTunnelRowType physicalRowType;
+        private SeaTunnelRowType resultTypeInfo;
+        private MetadataConverter[] metadataConverters = new 
MetadataConverter[0];
+        private ValueValidator validator = (rowData, rowKind) -> {
+        };
+        private ZoneId serverTimeZone = ZoneId.of("UTC");
+        private DebeziumDeserializationConverterFactory 
userDefinedConverterFactory =
+            DebeziumDeserializationConverterFactory.DEFAULT;
+
+        public Builder setPhysicalRowType(SeaTunnelRowType physicalRowType) {
+            this.physicalRowType = physicalRowType;
+            return this;
+        }
+
+        public Builder setMetadataConverters(MetadataConverter[] 
metadataConverters) {
+            this.metadataConverters = metadataConverters;
+            return this;
+        }
+
+        public Builder setResultTypeInfo(SeaTunnelRowType resultTypeInfo) {
+            this.resultTypeInfo = resultTypeInfo;
+            return this;
+        }
+
+        public Builder setValueValidator(ValueValidator validator) {
+            this.validator = validator;
+            return this;
+        }
+
+        public Builder setServerTimeZone(ZoneId serverTimeZone) {
+            this.serverTimeZone = serverTimeZone;
+            return this;
+        }
+
+        public Builder setUserDefinedConverterFactory(
+            DebeziumDeserializationConverterFactory 
userDefinedConverterFactory) {
+            this.userDefinedConverterFactory = userDefinedConverterFactory;
+            return this;
+        }
+
+        public SeaTunnelRowDebeziumDeserializeSchema build() {
+            return new SeaTunnelRowDebeziumDeserializeSchema(
+                physicalRowType,
+                metadataConverters,
+                resultTypeInfo,
+                validator,
+                serverTimeZone,
+                userDefinedConverterFactory);
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/utils/TemporalConversions.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/utils/TemporalConversions.java
new file mode 100644
index 000000000..a05856f27
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/utils/TemporalConversions.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.debezium.utils;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Static helpers that convert temporal values of various Java types into {@code java.time}
+ * local types.
+ */
+public final class TemporalConversions {
+
+    static final long MILLISECONDS_PER_SECOND = TimeUnit.SECONDS.toMillis(1);
+    static final long MICROSECONDS_PER_SECOND = TimeUnit.SECONDS.toMicros(1);
+    static final long MICROSECONDS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toMicros(1);
+    static final long NANOSECONDS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1);
+    static final long NANOSECONDS_PER_MICROSECOND = TimeUnit.MICROSECONDS.toNanos(1);
+    static final long NANOSECONDS_PER_SECOND = TimeUnit.SECONDS.toNanos(1);
+    static final long NANOSECONDS_PER_DAY = TimeUnit.DAYS.toNanos(1);
+    static final long SECONDS_PER_DAY = TimeUnit.DAYS.toSeconds(1);
+    static final long MICROSECONDS_PER_DAY = TimeUnit.DAYS.toMicros(1);
+    static final LocalDate EPOCH = LocalDate.ofEpochDay(0);
+
+    private TemporalConversions() {}
+
+    /**
+     * Converts the value into a {@link LocalDate}.
+     *
+     * <p>{@code Long} and {@code Integer} values are read as the epoch day number.
+     *
+     * @param obj the value to convert, may be {@code null}
+     * @return the local date, or {@code null} if {@code obj} is {@code null}
+     * @throws IllegalArgumentException if the value is a {@code java.sql.Time} or of an
+     *                                  unsupported type
+     */
+    @SuppressWarnings("MagicNumber")
+    public static LocalDate toLocalDate(Object obj) {
+        if (obj == null) {
+            return null;
+        }
+        if (obj instanceof LocalDate) {
+            return (LocalDate) obj;
+        }
+        if (obj instanceof LocalDateTime) {
+            return ((LocalDateTime) obj).toLocalDate();
+        }
+        if (obj instanceof java.sql.Date) {
+            return ((java.sql.Date) obj).toLocalDate();
+        }
+        if (obj instanceof java.sql.Time) {
+            throw new IllegalArgumentException(
+                    "Unable to convert to LocalDate from a java.sql.Time value '" + obj + "'");
+        }
+        if (obj instanceof java.util.Date) {
+            java.util.Date date = (java.util.Date) obj;
+            return LocalDate.of(date.getYear() + 1900, date.getMonth() + 1, date.getDate());
+        }
+        if (obj instanceof Long) {
+            // Assume the value is the epoch day number
+            return LocalDate.ofEpochDay((Long) obj);
+        }
+        if (obj instanceof Integer) {
+            // Assume the value is the epoch day number
+            return LocalDate.ofEpochDay((Integer) obj);
+        }
+        throw new IllegalArgumentException(
+                "Unable to convert to LocalDate from unexpected value '"
+                        + obj
+                        + "' of type "
+                        + obj.getClass().getName());
+    }
+
+    /**
+     * Converts the value into a {@link LocalTime}.
+     *
+     * <p>A {@link Duration} is read as the time elapsed since midnight and must be at least
+     * zero and shorter than one day.
+     *
+     * @param obj the value to convert, may be {@code null}
+     * @return the local time, or {@code null} if {@code obj} is {@code null}
+     * @throws IllegalArgumentException if the value is a {@code java.sql.Date}, an out-of-range
+     *                                  {@link Duration} or of an unsupported type
+     */
+    public static LocalTime toLocalTime(Object obj) {
+        if (obj == null) {
+            return null;
+        }
+        if (obj instanceof LocalTime) {
+            return (LocalTime) obj;
+        }
+        if (obj instanceof LocalDateTime) {
+            return ((LocalDateTime) obj).toLocalTime();
+        }
+        if (obj instanceof java.sql.Date) {
+            throw new IllegalArgumentException(
+                    "Unable to convert to LocalTime from a java.sql.Date value '" + obj + "'");
+        }
+        if (obj instanceof java.sql.Time) {
+            java.sql.Time time = (java.sql.Time) obj;
+            return LocalTime.of(
+                    time.getHours(), time.getMinutes(), time.getSeconds(), nanosOfSecond(time.getTime()));
+        }
+        if (obj instanceof java.sql.Timestamp) {
+            java.sql.Timestamp timestamp = (java.sql.Timestamp) obj;
+            return LocalTime.of(
+                    timestamp.getHours(),
+                    timestamp.getMinutes(),
+                    timestamp.getSeconds(),
+                    timestamp.getNanos());
+        }
+        if (obj instanceof java.util.Date) {
+            java.util.Date date = (java.util.Date) obj;
+            return LocalTime.of(
+                    date.getHours(), date.getMinutes(), date.getSeconds(), nanosOfSecond(date.getTime()));
+        }
+        if (obj instanceof Duration) {
+            long nanos = ((Duration) obj).toNanos();
+            // LocalTime.ofNanoOfDay accepts 0 .. NANOSECONDS_PER_DAY - 1, so the bound is exclusive
+            if (nanos >= 0 && nanos < NANOSECONDS_PER_DAY) {
+                return LocalTime.ofNanoOfDay(nanos);
+            }
+            throw new IllegalArgumentException(
+                    "Time values must use number of nanoseconds greater than or equal to 0 and less than "
+                            + NANOSECONDS_PER_DAY);
+        }
+        throw new IllegalArgumentException(
+                "Unable to convert to LocalTime from unexpected value '"
+                        + obj
+                        + "' of type "
+                        + obj.getClass().getName());
+    }
+
+    /**
+     * Converts the value into a {@link LocalDateTime}.
+     *
+     * <p>An {@link Instant} is rendered at UTC, whereas a {@code String} is parsed as an instant
+     * and rendered in {@code serverTimeZone}. A bare date is placed at midnight and a bare time
+     * on the epoch day.
+     *
+     * @param obj            the value to convert, may be {@code null}
+     * @param serverTimeZone zone used only for {@code String} values
+     * @return the local date-time, or {@code null} if {@code obj} is {@code null}
+     * @throws IllegalArgumentException if the value is of an unsupported type
+     */
+    @SuppressWarnings("MagicNumber")
+    public static LocalDateTime toLocalDateTime(Object obj, ZoneId serverTimeZone) {
+        if (obj == null) {
+            return null;
+        }
+        if (obj instanceof OffsetDateTime) {
+            return ((OffsetDateTime) obj).toLocalDateTime();
+        }
+        if (obj instanceof Instant) {
+            return ((Instant) obj).atOffset(ZoneOffset.UTC).toLocalDateTime();
+        }
+        if (obj instanceof LocalDateTime) {
+            return (LocalDateTime) obj;
+        }
+        if (obj instanceof LocalDate) {
+            LocalDate date = (LocalDate) obj;
+            return LocalDateTime.of(date, LocalTime.MIDNIGHT);
+        }
+        if (obj instanceof LocalTime) {
+            LocalTime time = (LocalTime) obj;
+            return LocalDateTime.of(EPOCH, time);
+        }
+        if (obj instanceof java.sql.Date) {
+            java.sql.Date sqlDate = (java.sql.Date) obj;
+            LocalDate date = sqlDate.toLocalDate();
+            return LocalDateTime.of(date, LocalTime.MIDNIGHT);
+        }
+        if (obj instanceof java.sql.Time) {
+            LocalTime localTime = toLocalTime(obj);
+            return LocalDateTime.of(EPOCH, localTime);
+        }
+        if (obj instanceof java.sql.Timestamp) {
+            java.sql.Timestamp timestamp = (java.sql.Timestamp) obj;
+            return LocalDateTime.of(
+                    timestamp.getYear() + 1900,
+                    timestamp.getMonth() + 1,
+                    timestamp.getDate(),
+                    timestamp.getHours(),
+                    timestamp.getMinutes(),
+                    timestamp.getSeconds(),
+                    timestamp.getNanos());
+        }
+        if (obj instanceof java.util.Date) {
+            java.util.Date date = (java.util.Date) obj;
+            return LocalDateTime.of(
+                    date.getYear() + 1900,
+                    date.getMonth() + 1,
+                    date.getDate(),
+                    date.getHours(),
+                    date.getMinutes(),
+                    date.getSeconds(),
+                    nanosOfSecond(date.getTime()));
+        }
+        if (obj instanceof String) {
+            String str = (String) obj;
+            // TIMESTAMP type is encoded in string type
+            Instant instant = Instant.parse(str);
+            return LocalDateTime.ofInstant(instant, serverTimeZone);
+        }
+        throw new IllegalArgumentException(
+                "Unable to convert to LocalDateTime from unexpected value '"
+                        + obj
+                        + "' of type "
+                        + obj.getClass().getName());
+    }
+
+    /**
+     * Returns the nanosecond-of-second part of an epoch-millisecond value. The result is never
+     * negative, also for values before the epoch.
+     */
+    private static int nanosOfSecond(long epochMillis) {
+        long millisOfSecond = Math.floorMod(epochMillis, MILLISECONDS_PER_SECOND);
+        return (int) (millisOfSecond * NANOSECONDS_PER_MILLISECOND);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/pom.xml 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/pom.xml
index e5ec3a4b4..6f83f8107 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/pom.xml
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/pom.xml
@@ -39,6 +39,11 @@
             <groupId>io.debezium</groupId>
             <artifactId>debezium-connector-mysql</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-jdbc</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <dependencyManagement>
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceOptions.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceOptions.java
deleted file mode 100644
index 7801cbe79..000000000
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceOptions.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config;
-
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
-
-public class MySqlSourceOptions {
-    public static final Option<String> SERVER_ID =
-        Options.key("server-id")
-            .stringType()
-            .noDefaultValue()
-            .withDescription("A numeric ID or a numeric ID range of this 
database client, "
-                + "The numeric ID syntax is like '5400', the numeric ID range 
syntax "
-                + "is like '5400-5408', The numeric ID range syntax is 
recommended when "
-                + "'scan.incremental.snapshot.enabled' enabled. Every ID must 
be unique across all "
-                + "currently-running database processes in the MySQL cluster. 
This connector"
-                + " joins the MySQL  cluster as another server (with this 
unique ID) "
-                + "so it can read the binlog. By default, a random number is 
generated between"
-                + " 5400 and 6400, though we recommend setting an explicit 
value.");
-}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
index 7f01380fc..be3cfbc4f 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
@@ -19,19 +19,27 @@ package 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
 import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
 import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
 import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
+import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
 import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
 import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
 import 
org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
+import 
org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema;
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfigFactory;
-import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffsetFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.MySqlCatalog;
 
 import com.google.auto.service.AutoService;
 
+import java.time.ZoneId;
+
 @AutoService(SeaTunnelSource.class)
 public class MySqlIncrementalSource<T> extends IncrementalSource<T, 
JdbcSourceConfig> {
     @Override
@@ -42,14 +50,27 @@ public class MySqlIncrementalSource<T> extends 
IncrementalSource<T, JdbcSourceCo
     @Override
     public SourceConfig.Factory<JdbcSourceConfig> 
createSourceConfigFactory(ReadonlyConfig config) {
+        // Builds a MySQL-specific factory, sets its server id from the
+        // options, then lets the factory read the remaining settings.
         MySqlSourceConfigFactory configFactory = new 
MySqlSourceConfigFactory();
-        configFactory.serverId(config.get(MySqlSourceOptions.SERVER_ID));
+        configFactory.serverId(config.get(JdbcSourceOptions.SERVER_ID));
+        // NOTE(review): this passes readonlyConfig, not the config
+        // parameter used on the line above. readonlyConfig is not declared
+        // in this method; presumably an inherited field holding the same
+        // options - confirm against IncrementalSource.
+        configFactory.fromReadonlyConfig(readonlyConfig);
         return configFactory;
     }
 
+    /**
+     * Builds a {@link SeaTunnelRowDebeziumDeserializeSchema} for the first
+     * configured table.
+     *
+     * <p>The table is looked up through a {@link MySqlCatalog}; its
+     * physical row type is used both as the physical row type and as the
+     * result type of the schema.
+     *
+     * @param config options supplying the catalog base URL and the server
+     *     time zone
+     * @return the built schema, cast to the generic element type
+     */
+    @SuppressWarnings("unchecked")
     @Override
     public DebeziumDeserializationSchema<T> 
createDebeziumDeserializationSchema(ReadonlyConfig config) {
-        // TODO: seatunnel row
-        return null;
+        // NOTE(review): configFactory is not declared in this method;
+        // presumably an inherited field. The meaning of the 0 argument is
+        // not visible here - confirm.
+        JdbcSourceConfig jdbcSourceConfig = configFactory.create(0);
+        // BASE_URL is declared without a default value.
+        // NOTE(review): confirm what config.get returns when it is unset.
+        String baseUrl = config.get(JdbcCatalogOptions.BASE_URL);
+        // TODO: support multi-table
+        // TODO: support metadata keys
+        // Only element 0 of the database and table lists is used below.
+        // NOTE(review): assuming these are java.util.List, get(0) throws
+        // when the list is empty - confirm the lists are validated earlier.
+        MySqlCatalog mySqlCatalog = new MySqlCatalog("mysql", 
jdbcSourceConfig.getDatabaseList().get(0), jdbcSourceConfig.getUsername(), 
jdbcSourceConfig.getPassword(), baseUrl);
+        CatalogTable table = 
mySqlCatalog.getTable(TablePath.of(jdbcSourceConfig.getDatabaseList().get(0), 
jdbcSourceConfig.getTableList().get(0)));
+        SeaTunnelRowType physicalRowType = 
table.getTableSchema().toPhysicalRowDataType();
+        // The zone id string is parsed with ZoneId.of in the builder call.
+        String zoneId = config.get(JdbcSourceOptions.SERVER_TIME_ZONE);
+        // Unchecked cast to the generic T; this is what the
+        // @SuppressWarnings annotation on the method covers.
+        return (DebeziumDeserializationSchema<T>) 
SeaTunnelRowDebeziumDeserializeSchema.builder()
+            .setPhysicalRowType(physicalRowType)
+            .setResultTypeInfo(physicalRowType)
+            .setServerTimeZone(ZoneId.of(zoneId))
+            .build();
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java
new file mode 100644
index 000000000..93ea1e609
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+/**
+ * Declares the configuration options read when creating a JDBC catalog.
+ */
+public class JdbcCatalogOptions {
+    /**
+     * JDBC URL of the server without a database name. The option is a
+     * string keyed by {@code base-url} and has no default value.
+     */
+    public static final Option<String> BASE_URL = Options.key("base-url")
+        .stringType()
+        .noDefaultValue()
+        // Each fragment ends with a space so the joined text stays readable.
+        .withDescription("URL has to be without database, like " +
+            "\"jdbc:mysql://localhost:5432/\" or " +
+            "\"jdbc:mysql://localhost:5432\" rather than " +
+            "\"jdbc:mysql://localhost:5432/db\"");
+}

Reply via email to