This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new ff44db116 [feature][connector][cdc] add
SeaTunnelRowDebeziumDeserializeSchema (#3499)
ff44db116 is described below
commit ff44db116eef6e099a3031dbd44d81460b50e5fe
Author: Zongwen Li <[email protected]>
AuthorDate: Wed Nov 23 16:59:47 2022 +0800
[feature][connector][cdc] add SeaTunnelRowDebeziumDeserializeSchema (#3499)
* [feature][connector][cdc] add SeaTunnel row Deserialization Converters
* [feature][connector][mysql-cdc] add createDebeziumDeserializationSchema
* [chore] fix checkstyle
* [feature] fill config factory
---
.../seatunnel/api/table/catalog/TableSchema.java | 13 +
.../cdc/base/config/JdbcSourceConfigFactory.java | 22 +
.../cdc/base/option/JdbcSourceOptions.java | 22 +
.../connectors/cdc/base/option/SourceOptions.java | 8 +
.../debezium/DebeziumDeserializationConverter.java | 30 ++
.../DebeziumDeserializationConverterFactory.java | 48 ++
.../connectors/cdc/debezium/MetadataConverter.java | 30 ++
...TunnelRowDebeziumDeserializationConverters.java | 538 +++++++++++++++++++++
.../row/SeaTunnelRowDebeziumDeserializeSchema.java | 212 ++++++++
.../cdc/debezium/utils/TemporalConversions.java | 205 ++++++++
.../connector-cdc/connector-cdc-mysql/pom.xml | 5 +
.../cdc/mysql/config/MySqlSourceOptions.java | 36 --
.../cdc/mysql/source/MySqlIncrementalSource.java | 29 +-
.../seatunnel/jdbc/catalog/JdbcCatalogOptions.java | 29 ++
14 files changed, 1187 insertions(+), 40 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
index 61d99ef52..0d076943e 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.api.table.catalog;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import java.io.Serializable;
import java.util.ArrayList;
@@ -49,6 +50,18 @@ public final class TableSchema implements Serializable {
return columns;
}
+    /**
+     * Builds the row type made of the physical columns only.
+     *
+     * <p>Columns for which {@code isPhysical()} is false are left out; the remaining column
+     * names and data types keep the order of {@code columns}.
+     *
+     * @return a {@link SeaTunnelRowType} holding the physical column names and data types
+     */
+    public SeaTunnelRowType toPhysicalRowDataType() {
+        final List<String> physicalNames = new ArrayList<>();
+        final List<SeaTunnelDataType<?>> physicalTypes = new ArrayList<>();
+        for (Column column : columns) {
+            if (column.isPhysical()) {
+                physicalNames.add(column.getName());
+                physicalTypes.add(column.getDataType());
+            }
+        }
+        return new SeaTunnelRowType(
+            physicalNames.toArray(new String[0]),
+            physicalTypes.toArray(new SeaTunnelDataType<?>[0]));
+    }
+
public static final class Builder {
private final List<Column> columns = new ArrayList<>();
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
index ccbb6de23..0f98969ce 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java
@@ -17,11 +17,13 @@
package org.apache.seatunnel.connectors.cdc.base.config;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
import java.time.Duration;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Properties;
@@ -177,6 +179,26 @@ public abstract class JdbcSourceConfigFactory implements
SourceConfig.Factory<Jd
return this;
}
+ /**
+ * Populates this factory from a {@link ReadonlyConfig}, reading every setting from the
+ * matching {@link JdbcSourceOptions} / {@link SourceOptions} option.
+ *
+ * <p>The database list and the table list are each filled with a single entry, and
+ * {@code dbzProperties} is replaced by a fresh {@link Properties} instance before the optional
+ * Debezium map is copied in.
+ *
+ * <p>NOTE(review): the password is read here but no user name is - confirm whether a
+ * user name option should be applied as well.
+ *
+ * @param config configuration to read the option values from
+ * @return this factory, so calls can be chained
+ */
+ public JdbcSourceConfigFactory fromReadonlyConfig(ReadonlyConfig config) {
+ this.port = config.get(JdbcSourceOptions.PORT);
+ this.hostname = config.get(JdbcSourceOptions.HOSTNAME);
+ this.password = config.get(JdbcSourceOptions.PASSWORD);
+ // TODO: support multi-table
+ this.databaseList =
Collections.singletonList(config.get(JdbcSourceOptions.DATABASE_NAME));
+ this.tableList =
Collections.singletonList(config.get(JdbcSourceOptions.TABLE_NAME));
+ this.distributionFactorUpper =
config.get(JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
+ this.distributionFactorLower =
config.get(JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
+ this.splitSize = config.get(SourceOptions.SNAPSHOT_SPLIT_SIZE);
+ this.fetchSize = config.get(SourceOptions.SNAPSHOT_FETCH_SIZE);
+ this.serverTimeZone = config.get(JdbcSourceOptions.SERVER_TIME_ZONE);
+ this.connectTimeout = config.get(JdbcSourceOptions.CONNECT_TIMEOUT);
+ this.connectMaxRetries =
config.get(JdbcSourceOptions.CONNECT_MAX_RETRIES);
+ this.connectionPoolSize =
config.get(JdbcSourceOptions.CONNECTION_POOL_SIZE);
+ // start from an empty property set; the Debezium map is optional
+ this.dbzProperties = new Properties();
+ config.getOptional(SourceOptions.DEBEZIUM_PROPERTIES).ifPresent(map ->
dbzProperties.putAll(map));
+ return this;
+ }
+
@Override
public abstract JdbcSourceConfig create(int subtask);
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
index 3052b40fb..942497051 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java
@@ -103,4 +103,26 @@ public class JdbcSourceOptions extends SourceOptions {
.defaultValue(3)
.withDescription(
"The max retry times that the connector should
retry to build database server connection.");
+
+ /**
+ * Double-typed option {@code chunk-key.even-distribution.factor.upper-bound};
+ * defaults to {@code 1000.0}.
+ */
+ public static final Option<Double>
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND =
+ Options.key("chunk-key.even-distribution.factor.upper-bound")
+ .doubleType()
+ .defaultValue(1000.0d)
+ .withDescription(
+ "The upper bound of chunk key distribution factor. The
distribution factor is used to determine whether the"
+ + " table is evenly distribution or not."
+ + " The table chunks would use evenly calculation
optimization when the data distribution is even,"
+ + " and the query for splitting would happen when it is
uneven."
+ + " The distribution factor could be calculated by
(MAX(id) - MIN(id) + 1) / rowCount.");
+
+ /**
+ * Double-typed option {@code chunk-key.even-distribution.factor.lower-bound};
+ * defaults to {@code 0.05}.
+ */
+ public static final Option<Double>
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND =
+ Options.key("chunk-key.even-distribution.factor.lower-bound")
+ .doubleType()
+ .defaultValue(0.05d)
+ .withDescription(
+ "The lower bound of chunk key distribution factor. The
distribution factor is used to determine whether the"
+ + " table is evenly distribution or not."
+ + " The table chunks would use evenly calculation
optimization when the data distribution is even,"
+ + " and the query for splitting would happen when it is
uneven."
+ + " The distribution factor could be calculated by
(MAX(id) - MIN(id) + 1) / rowCount.");
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
index 16f12079c..3558f11bd 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java
@@ -21,6 +21,8 @@ import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import java.util.Map;
+
@SuppressWarnings("MagicNumber")
public class SourceOptions {
@@ -83,10 +85,16 @@ public class SourceOptions {
.noDefaultValue()
.withDescription("Optional offsets used in case of \"specific\" stop
mode");
+ /**
+ * Map-typed option registered under the key {@code debezium}; it has no default value.
+ */
+ public static final Option<Map<String, String>> DEBEZIUM_PROPERTIES =
Options.key("debezium")
+ .mapType()
+ .noDefaultValue()
+ .withDescription("Decides if the table options contains Debezium
client properties that start with prefix 'debezium'.");
+
public static final OptionRule.Builder BASE_RULE = OptionRule.builder()
.optional(SNAPSHOT_SPLIT_SIZE, SNAPSHOT_FETCH_SIZE)
.optional(INCREMENTAL_PARALLELISM)
.optional(STARTUP_MODE, STOP_MODE)
+ .optional(DEBEZIUM_PROPERTIES)
.conditional(STARTUP_MODE, StartupMode.TIMESTAMP, STARTUP_TIMESTAMP)
.conditional(STARTUP_MODE, StartupMode.SPECIFIC,
STARTUP_SPECIFIC_OFFSET_FILE, STARTUP_SPECIFIC_OFFSET_POS)
.conditional(STOP_MODE, StopMode.TIMESTAMP, STOP_TIMESTAMP)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/DebeziumDeserializationConverter.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/DebeziumDeserializationConverter.java
new file mode 100644
index 000000000..859ef27cf
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/DebeziumDeserializationConverter.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.debezium;
+
+import org.apache.kafka.connect.data.Schema;
+
+import java.io.Serializable;
+
+/**
+ * Runtime converter that converts objects of Debezium into objects of internal data structures.
+ *
+ * <p>The interface is {@link Serializable} and has a single abstract method.
+ */
+@FunctionalInterface
+public interface DebeziumDeserializationConverter extends Serializable {
+ /**
+ * Converts one Debezium value.
+ *
+ * @param dbzObj the Debezium-side value to convert
+ * @param schema the Kafka Connect {@link Schema} handed over together with the value
+ * @return the converted object
+ * @throws Exception if the value cannot be converted
+ */
+ Object convert(Object dbzObj, Schema schema) throws Exception;
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/DebeziumDeserializationConverterFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/DebeziumDeserializationConverterFactory.java
new file mode 100644
index 000000000..c1dfe19a1
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/DebeziumDeserializationConverterFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.debezium;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import java.io.Serializable;
+import java.time.ZoneId;
+import java.util.Optional;
+
+/**
+ * Factory to create a {@link DebeziumDeserializationConverter} according to a
+ * {@link SeaTunnelDataType}. It is usually used to create a user-defined
+ * {@link DebeziumDeserializationConverter}, which has a higher resolve order than the
+ * default converter.
+ */
+public interface DebeziumDeserializationConverterFactory extends Serializable {
+
+ /**
+ * A user-defined converter factory which always falls back to the default converters:
+ * it returns {@link Optional#empty()} for every type.
+ */
+ DebeziumDeserializationConverterFactory DEFAULT =
+ (logicalType, serverTimeZone) -> Optional.empty();
+
+ /**
+ * Returns an optional {@link DebeziumDeserializationConverter}. Returns
+ * {@link Optional#empty()} to fall back to the default converter.
+ *
+ * @param type the SeaTunnel data type to be converted from objects of Debezium
+ * @param serverTimeZone time zone used to convert data with timestamp type
+ * @return the user-defined converter, or {@link Optional#empty()} when there is none
+ */
+ Optional<DebeziumDeserializationConverter> createUserDefinedConverter(
+ SeaTunnelDataType<?> type, ZoneId serverTimeZone);
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/MetadataConverter.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/MetadataConverter.java
new file mode 100644
index 000000000..519aa4efe
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/MetadataConverter.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.debezium;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.io.Serializable;
+
+/**
+ * {@link SourceRecord} metadata info converter.
+ *
+ * <p>The interface is {@link Serializable} and has a single abstract method.
+ */
+@FunctionalInterface
+public interface MetadataConverter extends Serializable {
+ /**
+ * Reads one metadata value from the given record.
+ *
+ * @param record the source record to read from
+ * @return the metadata value read from {@code record}
+ */
+ Object read(SourceRecord record);
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConverters.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConverters.java
new file mode 100644
index 000000000..529206727
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConverters.java
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.debezium.row;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationConverter;
+import
org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationConverterFactory;
+import org.apache.seatunnel.connectors.cdc.debezium.MetadataConverter;
+import org.apache.seatunnel.connectors.cdc.debezium.utils.TemporalConversions;
+
+import io.debezium.data.SpecialValueDecimal;
+import io.debezium.data.VariableScaleDecimal;
+import io.debezium.time.MicroTime;
+import io.debezium.time.MicroTimestamp;
+import io.debezium.time.NanoTime;
+import io.debezium.time.NanoTimestamp;
+import io.debezium.time.Timestamp;
+import org.apache.kafka.connect.data.Decimal;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.Optional;
+
+/**
+ * Deserialization schema from Debezium object to {@link SeaTunnelRow}
+ */
+public class SeaTunnelRowDebeziumDeserializationConverters implements
Serializable {
+ private static final long serialVersionUID = -897499476343410567L;
+ protected final DebeziumDeserializationConverter[] physicalConverters;
+ protected final MetadataConverter[] metadataConverters;
+ protected final String[] fieldNames;
+
+    /**
+     * Creates the converter set for one row layout.
+     *
+     * @param physicalDataType row type whose field types and field names describe the physical columns
+     * @param metadataConverters converters producing the metadata columns appended after the physical ones
+     * @param serverTimeZone time zone handed to every converter that is created
+     * @param userDefinedConverterFactory factory consulted first for each field type
+     */
+    public SeaTunnelRowDebeziumDeserializationConverters(
+        SeaTunnelRowType physicalDataType,
+        MetadataConverter[] metadataConverters,
+        ZoneId serverTimeZone,
+        DebeziumDeserializationConverterFactory userDefinedConverterFactory) {
+        final SeaTunnelDataType<?>[] physicalTypes = physicalDataType.getFieldTypes();
+        final DebeziumDeserializationConverter[] converters =
+            new DebeziumDeserializationConverter[physicalTypes.length];
+        // one null-safe converter per physical column, in column order
+        for (int index = 0; index < physicalTypes.length; index++) {
+            converters[index] = createConverter(physicalTypes[index], serverTimeZone, userDefinedConverterFactory);
+        }
+
+        this.metadataConverters = metadataConverters;
+        this.physicalConverters = converters;
+        this.fieldNames = physicalDataType.getFieldNames();
+    }
+
+    /**
+     * Converts one Debezium record into a {@link SeaTunnelRow}.
+     *
+     * <p>The row holds the physical columns first, followed by one value per metadata
+     * converter. A physical column that is missing from {@code schema} is set to null.
+     *
+     * @param record the source record, passed to the metadata converters
+     * @param struct the struct holding the column values
+     * @param schema the schema used to look up each column's field schema
+     * @return the converted row
+     * @throws Exception if a field converter fails
+     */
+    public SeaTunnelRow convert(SourceRecord record, Struct struct, Schema schema) throws Exception {
+        int arity = physicalConverters.length + metadataConverters.length;
+        SeaTunnelRow row = new SeaTunnelRow(arity);
+        // physical column
+        for (int i = 0; i < physicalConverters.length; i++) {
+            String fieldName = fieldNames[i];
+            Field field = schema.field(fieldName);
+            if (field == null) {
+                // Check the schema first: Struct#get(String) throws for an unknown field name,
+                // so the value must only be read once the field is known to exist.
+                row.setField(i, null);
+            } else {
+                Object fieldValue = struct.get(fieldName);
+                Schema fieldSchema = field.schema();
+                Object convertedField =
+                    SeaTunnelRowDebeziumDeserializationConverters.convertField(physicalConverters[i], fieldValue, fieldSchema);
+                row.setField(i, convertedField);
+            }
+        }
+        // metadata column
+        for (int i = 0; i < metadataConverters.length; i++) {
+            row.setField(i + physicalConverters.length, metadataConverters[i].read(record));
+        }
+        return row;
+    }
+
+ //
-------------------------------------------------------------------------------------
+ // Runtime Converters
+ //
-------------------------------------------------------------------------------------
+
+    /**
+     * Creates a runtime converter which is null safe: the converter built for {@code type}
+     * is wrapped so that a null input yields null.
+     */
+    private static DebeziumDeserializationConverter createConverter(SeaTunnelDataType<?> type,
+                                                                    ZoneId serverTimeZone,
+                                                                    DebeziumDeserializationConverterFactory userDefinedConverterFactory) {
+        final DebeziumDeserializationConverter notNullConverter =
+            createNotNullConverter(type, serverTimeZone, userDefinedConverterFactory);
+        return wrapIntoNullableConverter(notNullConverter);
+    }
+
+ //
--------------------------------------------------------------------------------
+ // IMPORTANT! We use anonymous classes instead of lambdas for a reason
here. It is
+ // necessary because the maven shade plugin cannot relocate classes in
+ // SerializedLambdas (MSHADE-260).
+ //
--------------------------------------------------------------------------------
+
+    /**
+     * Creates a runtime converter which assumes the input object is not null.
+     *
+     * <p>A converter supplied by {@code userDefinedConverterFactory} wins when present;
+     * otherwise the default converter is chosen from the SQL type of {@code type}.
+     *
+     * @throws UnsupportedOperationException for ARRAY, MAP and any other unhandled SQL type
+     */
+    private static DebeziumDeserializationConverter createNotNullConverter(SeaTunnelDataType<?> type,
+                                                                           ZoneId serverTimeZone,
+                                                                           DebeziumDeserializationConverterFactory userDefinedConverterFactory) {
+
+        // user defined converter has a higher resolve order
+        final Optional<DebeziumDeserializationConverter> userDefined =
+            userDefinedConverterFactory.createUserDefinedConverter(type, serverTimeZone);
+        if (userDefined.isPresent()) {
+            return userDefined.get();
+        }
+
+        // no user defined converter matched: fall back to the default converter
+        final DebeziumDeserializationConverter fallback;
+        switch (type.getSqlType()) {
+            case NULL:
+                fallback = new DebeziumDeserializationConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(Object dbzObj, Schema schema) throws Exception {
+                        return null;
+                    }
+                };
+                break;
+            case BOOLEAN:
+                fallback = wrapNumericConverter(convertToBoolean());
+                break;
+            case TINYINT:
+                fallback = wrapNumericConverter(convertToByte());
+                break;
+            case SMALLINT:
+                fallback = wrapNumericConverter(convertToShort());
+                break;
+            case INT:
+                fallback = wrapNumericConverter(convertToInt());
+                break;
+            case BIGINT:
+                fallback = wrapNumericConverter(convertToLong());
+                break;
+            case DATE:
+                fallback = convertToDate();
+                break;
+            case TIME:
+                fallback = convertToTime();
+                break;
+            case TIMESTAMP:
+                fallback = convertToTimestamp(serverTimeZone);
+                break;
+            case FLOAT:
+                fallback = wrapNumericConverter(convertToFloat());
+                break;
+            case DOUBLE:
+                fallback = wrapNumericConverter(convertToDouble());
+                break;
+            case STRING:
+                fallback = convertToString();
+                break;
+            case BYTES:
+                fallback = convertToBinary();
+                break;
+            case DECIMAL:
+                fallback = wrapNumericConverter(createDecimalConverter());
+                break;
+            case ROW:
+                fallback = createRowConverter((SeaTunnelRowType) type, serverTimeZone, userDefinedConverterFactory);
+                break;
+            case ARRAY:
+            case MAP:
+            default:
+                throw new UnsupportedOperationException("Unsupported type: " + type);
+        }
+        return fallback;
+    }
+
+    /**
+     * Builds the converter used for BOOLEAN columns.
+     *
+     * <p>{@code Boolean} values pass through, {@code Byte}, {@code Short} and {@code BigDecimal}
+     * values are true when non-zero, and anything else is parsed from its string form.
+     */
+    private static DebeziumDeserializationConverter convertToBoolean() {
+        return new DebeziumDeserializationConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Boolean) {
+                    return dbzObj;
+                }
+                if (dbzObj instanceof Byte) {
+                    final byte asByte = (Byte) dbzObj;
+                    return asByte != 0;
+                }
+                if (dbzObj instanceof Short) {
+                    final short asShort = (Short) dbzObj;
+                    return asShort != 0;
+                }
+                if (dbzObj instanceof BigDecimal) {
+                    final short truncated = ((BigDecimal) dbzObj).shortValue();
+                    return truncated != 0;
+                }
+                return Boolean.parseBoolean(dbzObj.toString());
+            }
+        };
+    }
+
+    /**
+     * Builds the converter used for TINYINT columns.
+     *
+     * <p>{@code Byte} values pass through, {@code BigDecimal} values are narrowed with
+     * {@code byteValue()}, and anything else is parsed from its string form.
+     */
+    private static DebeziumDeserializationConverter convertToByte() {
+        return new DebeziumDeserializationConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Byte) {
+                    return dbzObj;
+                }
+                if (dbzObj instanceof BigDecimal) {
+                    final BigDecimal decimal = (BigDecimal) dbzObj;
+                    return decimal.byteValue();
+                }
+                final String text = dbzObj.toString();
+                return Byte.parseByte(text);
+            }
+        };
+    }
+
+    /**
+     * Builds the converter used for SMALLINT columns.
+     *
+     * <p>The result is always a {@code Short}: {@code Short} values pass through,
+     * {@code Byte} values are widened, {@code BigDecimal} values are narrowed with
+     * {@code shortValue()}, and anything else is parsed from its string form.
+     */
+    private static DebeziumDeserializationConverter convertToShort() {
+        return new DebeziumDeserializationConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Short) {
+                    return dbzObj;
+                } else if (dbzObj instanceof Byte) {
+                    // widen instead of handing back a Byte, so the field type is consistently Short
+                    return ((Byte) dbzObj).shortValue();
+                } else if (dbzObj instanceof BigDecimal) {
+                    return ((BigDecimal) dbzObj).shortValue();
+                } else {
+                    return Short.parseShort(dbzObj.toString());
+                }
+            }
+        };
+    }
+
+    /**
+     * Builds the converter used for INT columns.
+     *
+     * <p>{@code Integer} values pass through, {@code Long} and {@code BigDecimal} values are
+     * narrowed with {@code intValue()}, and anything else is parsed from its string form.
+     */
+    private static DebeziumDeserializationConverter convertToInt() {
+        return new DebeziumDeserializationConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Integer) {
+                    return dbzObj;
+                }
+                if (dbzObj instanceof Long) {
+                    final Long asLong = (Long) dbzObj;
+                    return asLong.intValue();
+                }
+                if (dbzObj instanceof BigDecimal) {
+                    final BigDecimal decimal = (BigDecimal) dbzObj;
+                    return decimal.intValue();
+                }
+                final String text = dbzObj.toString();
+                return Integer.parseInt(text);
+            }
+        };
+    }
+
+    /**
+     * Builds the converter used for BIGINT columns.
+     *
+     * <p>The result is always a {@code Long}: {@code Long} values pass through,
+     * {@code Integer} values are widened, {@code BigDecimal} values are narrowed with
+     * {@code longValue()}, and anything else is parsed from its string form.
+     */
+    private static DebeziumDeserializationConverter convertToLong() {
+        return new DebeziumDeserializationConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Long) {
+                    return dbzObj;
+                } else if (dbzObj instanceof Integer) {
+                    // widen instead of handing back an Integer, so the field type is consistently Long
+                    return ((Integer) dbzObj).longValue();
+                } else if (dbzObj instanceof BigDecimal) {
+                    return ((BigDecimal) dbzObj).longValue();
+                } else {
+                    return Long.parseLong(dbzObj.toString());
+                }
+            }
+        };
+    }
+
+    /**
+     * Builds the converter used for DOUBLE columns.
+     *
+     * <p>The result is always a {@code Double}: {@code Double} values pass through,
+     * {@code Float} values are widened, {@code BigDecimal} values are converted with
+     * {@code doubleValue()}, and anything else is parsed from its string form.
+     */
+    private static DebeziumDeserializationConverter convertToDouble() {
+        return new DebeziumDeserializationConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Double) {
+                    return dbzObj;
+                } else if (dbzObj instanceof Float) {
+                    // widen instead of handing back a Float, so the field type is consistently Double
+                    return ((Float) dbzObj).doubleValue();
+                } else if (dbzObj instanceof BigDecimal) {
+                    return ((BigDecimal) dbzObj).doubleValue();
+                } else {
+                    return Double.parseDouble(dbzObj.toString());
+                }
+            }
+        };
+    }
+
+    /**
+     * Builds the converter used for FLOAT columns.
+     *
+     * <p>{@code Float} values pass through, {@code Double} and {@code BigDecimal} values are
+     * narrowed with {@code floatValue()}, and anything else is parsed from its string form.
+     */
+    private static DebeziumDeserializationConverter convertToFloat() {
+        return new DebeziumDeserializationConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Float) {
+                    return dbzObj;
+                }
+                if (dbzObj instanceof Double) {
+                    final Double asDouble = (Double) dbzObj;
+                    return asDouble.floatValue();
+                }
+                if (dbzObj instanceof BigDecimal) {
+                    final BigDecimal decimal = (BigDecimal) dbzObj;
+                    return decimal.floatValue();
+                }
+                final String text = dbzObj.toString();
+                return Float.parseFloat(text);
+            }
+        };
+    }
+
+    /**
+     * Builds the converter used for DATE columns; the conversion itself is delegated to
+     * {@code TemporalConversions.toLocalDate}.
+     */
+    private static DebeziumDeserializationConverter convertToDate() {
+        final DebeziumDeserializationConverter dateConverter = new DebeziumDeserializationConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                return TemporalConversions.toLocalDate(dbzObj);
+            }
+        };
+        return dateConverter;
+    }
+
+    /**
+     * Builds the converter used for TIME columns.
+     *
+     * <p>{@code Long} values are read as microseconds or nanoseconds of the day when the schema
+     * is named {@code MicroTime.SCHEMA_NAME} or {@code NanoTime.SCHEMA_NAME}; {@code Integer}
+     * values are scaled by 1,000,000 to nanoseconds of the day. Every other value, including a
+     * {@code Long} with any other (or no) schema name, is handed to
+     * {@code TemporalConversions.toLocalTime}.
+     */
+    private static DebeziumDeserializationConverter convertToTime() {
+        return new DebeziumDeserializationConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @SuppressWarnings("MagicNumber")
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Long) {
+                    long value = (Long) dbzObj;
+                    // constant-first equals: an unnamed schema must fall through instead of
+                    // failing with a NullPointerException as a switch on a null string would
+                    String schemaName = schema.name();
+                    if (MicroTime.SCHEMA_NAME.equals(schemaName)) {
+                        return LocalTime.ofNanoOfDay(value * 1000L);
+                    }
+                    if (NanoTime.SCHEMA_NAME.equals(schemaName)) {
+                        return LocalTime.ofNanoOfDay(value);
+                    }
+                } else if (dbzObj instanceof Integer) {
+                    // unbox as Integer first: casting the Object straight to long would attempt a
+                    // cast to Long and throw ClassCastException
+                    return LocalTime.ofNanoOfDay(((Integer) dbzObj).longValue() * 1000_000L);
+                }
+                return TemporalConversions.toLocalTime(dbzObj);
+            }
+        };
+    }
+
+    /**
+     * Builds the converter used for TIMESTAMP columns.
+     *
+     * <p>{@code Long} values are read as epoch milliseconds, microseconds or nanoseconds when
+     * the schema is named {@code Timestamp.SCHEMA_NAME}, {@code MicroTimestamp.SCHEMA_NAME} or
+     * {@code NanoTimestamp.SCHEMA_NAME}. Every other value, including a {@code Long} with any
+     * other (or no) schema name, is handed to {@code TemporalConversions.toLocalDateTime}.
+     *
+     * @param serverTimeZone time zone passed to {@code TemporalConversions.toLocalDateTime}
+     */
+    private static DebeziumDeserializationConverter convertToTimestamp(ZoneId serverTimeZone) {
+        return new DebeziumDeserializationConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @SuppressWarnings("MagicNumber")
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Long) {
+                    long value = (Long) dbzObj;
+                    // constant-first equals: an unnamed schema must fall through instead of
+                    // failing with a NullPointerException as a switch on a null string would
+                    String schemaName = schema.name();
+                    if (Timestamp.SCHEMA_NAME.equals(schemaName)) {
+                        return toLocalDateTime(value, 0);
+                    }
+                    // floorDiv/floorMod keep the sub-millisecond part non-negative for values
+                    // before the epoch; plain / and % would produce a negative nano part
+                    if (MicroTimestamp.SCHEMA_NAME.equals(schemaName)) {
+                        return toLocalDateTime(Math.floorDiv(value, 1000L), (int) (Math.floorMod(value, 1000L) * 1000));
+                    }
+                    if (NanoTimestamp.SCHEMA_NAME.equals(schemaName)) {
+                        return toLocalDateTime(Math.floorDiv(value, 1000_000L), (int) Math.floorMod(value, 1000_000L));
+                    }
+                }
+                return TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
+            }
+        };
+    }
+
+    /**
+     * Converts epoch milliseconds plus a sub-millisecond nano part into a {@link LocalDateTime}.
+     *
+     * <p>No time zone is applied: the fields are derived directly from the epoch offset.
+     * Negative {@code millisecond} values (before 1970-01-01) are supported, and a negative
+     * {@code nanoOfMillisecond} borrows from the previous day when needed instead of failing.
+     *
+     * @param millisecond milliseconds since 1970-01-01T00:00:00
+     * @param nanoOfMillisecond nanoseconds to add on top of {@code millisecond}
+     * @return the matching local date-time
+     */
+    @SuppressWarnings("MagicNumber")
+    public static LocalDateTime toLocalDateTime(long millisecond, int nanoOfMillisecond) {
+        // 86400000 = 24 * 60 * 60 * 1000
+        final long millisPerDay = 86400000L;
+        final long nanosPerDay = millisPerDay * 1_000_000L;
+        // floor semantics keep the millisecond-of-day part non-negative for pre-epoch values
+        long epochDay = Math.floorDiv(millisecond, millisPerDay);
+        long nanoOfDay = Math.floorMod(millisecond, millisPerDay) * 1_000_000L + nanoOfMillisecond;
+        if (nanoOfDay < 0) {
+            // a negative nano part pushed the time before midnight: borrow one day
+            --epochDay;
+            nanoOfDay += nanosPerDay;
+        }
+        LocalDate localDate = LocalDate.ofEpochDay(epochDay);
+        LocalTime localTime = LocalTime.ofNanoOfDay(nanoOfDay);
+        return LocalDateTime.of(localDate, localTime);
+    }
+
+    /**
+     * Builds a converter that parses an ISO instant string and renders it as a
+     * {@link LocalDateTime} in {@code serverTimeZone}; any non-string input is rejected.
+     *
+     * @param serverTimeZone zone in which the parsed instant is expressed
+     */
+    private static DebeziumDeserializationConverter convertToLocalTimeZoneTimestamp(ZoneId serverTimeZone) {
+        return new DebeziumDeserializationConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (!(dbzObj instanceof String)) {
+                    throw new IllegalArgumentException(
+                        "Unable to convert to LocalDateTime from unexpected value '"
+                            + dbzObj
+                            + "' of type "
+                            + dbzObj.getClass().getName());
+                }
+                // TIMESTAMP type is encoded in string type
+                final Instant parsed = Instant.parse((String) dbzObj);
+                return LocalDateTime.ofInstant(parsed, serverTimeZone);
+            }
+        };
+    }
+
+    /**
+     * Builds the converter used for STRING columns; the result is the value's
+     * {@code toString()} form.
+     */
+    private static DebeziumDeserializationConverter convertToString() {
+        final DebeziumDeserializationConverter stringConverter = new DebeziumDeserializationConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                return dbzObj.toString();
+            }
+        };
+        return stringConverter;
+    }
+
+    /**
+     * Builds the converter used for BYTES columns.
+     *
+     * <p>A {@code byte[]} passes through; a {@link ByteBuffer} has its remaining bytes read
+     * into a new array; any other input type is rejected.
+     */
+    private static DebeziumDeserializationConverter convertToBinary() {
+        return new DebeziumDeserializationConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) throws Exception {
+                if (dbzObj instanceof byte[]) {
+                    return dbzObj;
+                }
+                if (dbzObj instanceof ByteBuffer) {
+                    final ByteBuffer source = (ByteBuffer) dbzObj;
+                    final byte[] copy = new byte[source.remaining()];
+                    source.get(copy);
+                    return copy;
+                }
+                throw new UnsupportedOperationException(
+                    "Unsupported BYTES value type: " + dbzObj.getClass().getSimpleName());
+            }
+        };
+    }
+
+    /**
+     * Builds the converter used for DECIMAL columns; the result is always a {@link BigDecimal}.
+     *
+     * <p>The accepted input depends on how the decimal was encoded: {@code byte[]},
+     * {@code String}, {@code Double} or {@code BigDecimal}; anything else is parsed from its
+     * string form.
+     */
+    private static DebeziumDeserializationConverter createDecimalConverter() {
+        return new DebeziumDeserializationConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) throws Exception {
+                if (dbzObj instanceof byte[]) {
+                    // decimal.handling.mode=precise
+                    return Decimal.toLogical(schema, (byte[]) dbzObj);
+                }
+                if (dbzObj instanceof String) {
+                    // decimal.handling.mode=string
+                    return new BigDecimal((String) dbzObj);
+                }
+                if (dbzObj instanceof Double) {
+                    // decimal.handling.mode=double
+                    return BigDecimal.valueOf((Double) dbzObj);
+                }
+                if (dbzObj instanceof BigDecimal) {
+                    return dbzObj;
+                }
+                // fallback to string
+                return new BigDecimal(dbzObj.toString());
+            }
+        };
+    }
+
+    /**
+     * Builds the converter used for ROW columns.
+     *
+     * <p>One null-safe converter is created per field of {@code rowType}. At conversion time
+     * the input is read as a {@link Struct}; a field that is missing from the supplied schema
+     * is set to null in the resulting {@link SeaTunnelRow}.
+     *
+     * @param rowType the nested row type to convert into
+     * @param serverTimeZone time zone handed to the field converters
+     * @param userDefinedConverterFactory factory consulted first for each field type
+     */
+    private static DebeziumDeserializationConverter createRowConverter(SeaTunnelRowType rowType,
+                                                                       ZoneId serverTimeZone,
+                                                                       DebeziumDeserializationConverterFactory userDefinedConverterFactory) {
+        final DebeziumDeserializationConverter[] fieldConverters =
+            Arrays.stream(rowType.getFieldTypes())
+                .map(type -> createConverter(type, serverTimeZone, userDefinedConverterFactory))
+                .toArray(DebeziumDeserializationConverter[]::new);
+        final String[] fieldNames = rowType.getFieldNames();
+
+        return new DebeziumDeserializationConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) throws Exception {
+                Struct struct = (Struct) dbzObj;
+                int arity = fieldNames.length;
+                SeaTunnelRow row = new SeaTunnelRow(arity);
+                for (int i = 0; i < arity; i++) {
+                    String fieldName = fieldNames[i];
+                    Field field = schema.field(fieldName);
+                    if (field == null) {
+                        // Check the schema first: Struct#get(String) throws for an unknown
+                        // field name, so the value is only read once the field exists.
+                        row.setField(i, null);
+                    } else {
+                        Object fieldValue = struct.get(fieldName);
+                        Schema fieldSchema = field.schema();
+                        Object convertedField =
+                            SeaTunnelRowDebeziumDeserializationConverters.convertField(fieldConverters[i], fieldValue, fieldSchema);
+                        row.setField(i, convertedField);
+                    }
+                }
+                return row;
+            }
+        };
+    }
+
+    /**
+     * Applies {@code fieldConverter} to a single field value; a null value is returned as
+     * null without calling the converter.
+     */
+    private static Object convertField(
+        DebeziumDeserializationConverter fieldConverter, Object fieldValue, Schema fieldSchema)
+        throws Exception {
+        return fieldValue == null ? null : fieldConverter.convert(fieldValue, fieldSchema);
+    }
+
+    /**
+     * Wraps {@code converter} so that a null input yields null instead of reaching the
+     * wrapped converter.
+     */
+    private static DebeziumDeserializationConverter wrapIntoNullableConverter(DebeziumDeserializationConverter converter) {
+        return new DebeziumDeserializationConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) throws Exception {
+                return dbzObj == null ? null : converter.convert(dbzObj, schema);
+            }
+        };
+    }
+
+    /**
+     * Wraps {@code converter} so that values whose schema is named
+     * {@code VariableScaleDecimal.LOGICAL_NAME} are first decoded to a {@link BigDecimal}
+     * (zero when the decoded value carries no decimal) before being converted; every other
+     * value is passed through unchanged.
+     */
+    private static DebeziumDeserializationConverter wrapNumericConverter(DebeziumDeserializationConverter converter) {
+        return new DebeziumDeserializationConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) throws Exception {
+                if (!VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) {
+                    return converter.convert(dbzObj, schema);
+                }
+                final SpecialValueDecimal decoded = VariableScaleDecimal.toLogical((Struct) dbzObj);
+                final BigDecimal numeric = decoded.getDecimalValue().orElse(BigDecimal.ZERO);
+                return converter.convert(numeric, schema);
+            }
+        };
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
new file mode 100644
index 000000000..8bcee31a5
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.debezium.row;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationConverterFactory;
+import
org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.seatunnel.connectors.cdc.debezium.MetadataConverter;
+
+import io.debezium.connector.AbstractSourceInfo;
+import io.debezium.data.Envelope;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.io.Serializable;
+import java.time.ZoneId;
+
+/**
+ * Deserialization schema that turns Debezium change events into {@link SeaTunnelRow}s.
+ *
+ * <p>Create and read events are emitted as one {@link RowKind#INSERT} row, delete events as one
+ * {@link RowKind#DELETE} row, and every other operation as an {@link RowKind#UPDATE_BEFORE} row
+ * followed by an {@link RowKind#UPDATE_AFTER} row.
+ */
+public final class SeaTunnelRowDebeziumDeserializeSchema
+    implements DebeziumDeserializationSchema<SeaTunnelRow> {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Type information of the produced {@link SeaTunnelRow}.
+     */
+    private final SeaTunnelDataType<SeaTunnelRow> resultTypeInfo;
+
+    /**
+     * Runtime converter that converts the payload of a Kafka Connect {@link SourceRecord} into a
+     * {@link SeaTunnelRow}.
+     */
+    private final SeaTunnelRowDebeziumDeserializationConverters converters;
+
+    /**
+     * Validator applied to every row before it is handed to the collector.
+     */
+    private final ValueValidator validator;
+
+    /**
+     * Returns a builder to build {@link SeaTunnelRowDebeziumDeserializeSchema}.
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Creates the schema; use {@link #builder()} from outside this package.
+     *
+     * @param physicalDataType            row type forwarded to the runtime converters
+     * @param metadataConverters          metadata converters forwarded to the runtime converters
+     * @param resultType                  type reported by {@link #getProducedType()}; must not be null
+     * @param validator                   validator run on every emitted row; must not be null
+     * @param serverTimeZone              time zone forwarded to the runtime converters
+     * @param userDefinedConverterFactory converter factory forwarded to the runtime converters
+     */
+    SeaTunnelRowDebeziumDeserializeSchema(
+        SeaTunnelRowType physicalDataType,
+        MetadataConverter[] metadataConverters,
+        SeaTunnelRowType resultType,
+        ValueValidator validator,
+        ZoneId serverTimeZone,
+        DebeziumDeserializationConverterFactory userDefinedConverterFactory) {
+        this.converters = new SeaTunnelRowDebeziumDeserializationConverters(
+            physicalDataType,
+            metadataConverters,
+            serverTimeZone,
+            userDefinedConverterFactory
+        );
+        this.resultTypeInfo = checkNotNull(resultType);
+        this.validator = checkNotNull(validator);
+    }
+
+    /**
+     * Converts one Debezium change event into one or two rows and emits them.
+     *
+     * @param record    change event to deserialize
+     * @param collector receives the validated rows, in emission order
+     * @throws Exception if conversion or validation fails
+     */
+    @Override
+    public void deserialize(SourceRecord record, Collector<SeaTunnelRow> collector) throws Exception {
+        Envelope.Operation operation = Envelope.operationFor(record);
+        Struct messageStruct = (Struct) record.value();
+        Schema valueSchema = record.valueSchema();
+
+        // Not consumed yet; read here ahead of the multi-table support named in the TODO below.
+        Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
+        // TODO: multi-table
+        String tableName = sourceStruct.getString(AbstractSourceInfo.TABLE_NAME_KEY);
+
+        if (operation == Envelope.Operation.CREATE || operation == Envelope.Operation.READ) {
+            emit(extractRow(record, messageStruct, valueSchema, Envelope.FieldName.AFTER),
+                RowKind.INSERT, collector);
+        } else if (operation == Envelope.Operation.DELETE) {
+            emit(extractRow(record, messageStruct, valueSchema, Envelope.FieldName.BEFORE),
+                RowKind.DELETE, collector);
+        } else {
+            // The before image is converted, validated and collected before the after image
+            // is even converted, so a failure on the after image leaves the before row emitted.
+            emit(extractRow(record, messageStruct, valueSchema, Envelope.FieldName.BEFORE),
+                RowKind.UPDATE_BEFORE, collector);
+            emit(extractRow(record, messageStruct, valueSchema, Envelope.FieldName.AFTER),
+                RowKind.UPDATE_AFTER, collector);
+        }
+    }
+
+    /**
+     * Tags the row with its kind, validates it and hands it to the collector.
+     *
+     * <p>The kind is set before validation for every operation, so the row the validator
+     * receives already carries the kind passed alongside it.
+     */
+    private void emit(SeaTunnelRow row, RowKind kind, Collector<SeaTunnelRow> collector)
+        throws Exception {
+        row.setRowKind(kind);
+        validator.validate(row, kind);
+        collector.collect(row);
+    }
+
+    /**
+     * Converts the envelope field {@code fieldName} (the before or after image) into a row.
+     */
+    private SeaTunnelRow extractRow(
+        SourceRecord record,
+        Struct value,
+        Schema valueSchema,
+        String fieldName) throws Exception {
+        Schema imageSchema = valueSchema.field(fieldName).schema();
+        Struct image = value.getStruct(fieldName);
+        return converters.convert(record, image, imageSchema);
+    }
+
+    /**
+     * Returns the result type supplied when this schema was built.
+     */
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+        return resultTypeInfo;
+    }
+
+    // -------------------------------------------------------------------------------------
+    // Builder
+    // -------------------------------------------------------------------------------------
+
+    /**
+     * Custom validator to validate the row value.
+     */
+    public interface ValueValidator extends Serializable {
+        /**
+         * Validates one row about to be emitted.
+         *
+         * @param rowData row to validate
+         * @param rowKind kind the row is emitted with
+         * @throws Exception to reject the row; the exception propagates out of
+         *                   {@link SeaTunnelRowDebeziumDeserializeSchema#deserialize}
+         */
+        void validate(SeaTunnelRow rowData, RowKind rowKind) throws Exception;
+    }
+
+    /**
+     * Builder of {@link SeaTunnelRowDebeziumDeserializeSchema}.
+     *
+     * <p>Defaults: no metadata converters, a validator that accepts every row, the {@code UTC}
+     * server time zone and {@link DebeziumDeserializationConverterFactory#DEFAULT}. The physical
+     * row type and the result type have no default.
+     */
+    public static class Builder {
+        private SeaTunnelRowType physicalRowType;
+        private SeaTunnelRowType resultTypeInfo;
+        private MetadataConverter[] metadataConverters = new MetadataConverter[0];
+        private ValueValidator validator = (rowData, rowKind) -> {
+        };
+        private ZoneId serverTimeZone = ZoneId.of("UTC");
+        private DebeziumDeserializationConverterFactory userDefinedConverterFactory =
+            DebeziumDeserializationConverterFactory.DEFAULT;
+
+        public Builder setPhysicalRowType(SeaTunnelRowType physicalRowType) {
+            this.physicalRowType = physicalRowType;
+            return this;
+        }
+
+        public Builder setMetadataConverters(MetadataConverter[] metadataConverters) {
+            this.metadataConverters = metadataConverters;
+            return this;
+        }
+
+        public Builder setResultTypeInfo(SeaTunnelRowType resultTypeInfo) {
+            this.resultTypeInfo = resultTypeInfo;
+            return this;
+        }
+
+        public Builder setValueValidator(ValueValidator validator) {
+            this.validator = validator;
+            return this;
+        }
+
+        public Builder setServerTimeZone(ZoneId serverTimeZone) {
+            this.serverTimeZone = serverTimeZone;
+            return this;
+        }
+
+        public Builder setUserDefinedConverterFactory(
+            DebeziumDeserializationConverterFactory userDefinedConverterFactory) {
+            this.userDefinedConverterFactory = userDefinedConverterFactory;
+            return this;
+        }
+
+        /**
+         * Builds the schema from the collected settings.
+         *
+         * @throws NullPointerException if the result type or the validator is null
+         */
+        public SeaTunnelRowDebeziumDeserializeSchema build() {
+            return new SeaTunnelRowDebeziumDeserializeSchema(
+                physicalRowType,
+                metadataConverters,
+                resultTypeInfo,
+                validator,
+                serverTimeZone,
+                userDefinedConverterFactory);
+        }
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/utils/TemporalConversions.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/utils/TemporalConversions.java
new file mode 100644
index 000000000..a05856f27
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/utils/TemporalConversions.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.cdc.debezium.utils;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Static helpers that convert assorted temporal representations ({@code java.time},
+ * {@code java.sql}, {@code java.util.Date}, epoch-day numbers, ISO-8601 instant strings) into
+ * {@link LocalDate}, {@link LocalTime} and {@link LocalDateTime}, plus the unit constants they use.
+ */
+public final class TemporalConversions {
+
+    static final long MILLISECONDS_PER_SECOND = TimeUnit.SECONDS.toMillis(1);
+    static final long MICROSECONDS_PER_SECOND = TimeUnit.SECONDS.toMicros(1);
+    static final long MICROSECONDS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toMicros(1);
+    static final long NANOSECONDS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1);
+    static final long NANOSECONDS_PER_MICROSECOND = TimeUnit.MICROSECONDS.toNanos(1);
+    static final long NANOSECONDS_PER_SECOND = TimeUnit.SECONDS.toNanos(1);
+    static final long NANOSECONDS_PER_DAY = TimeUnit.DAYS.toNanos(1);
+    static final long SECONDS_PER_DAY = TimeUnit.DAYS.toSeconds(1);
+    static final long MICROSECONDS_PER_DAY = TimeUnit.DAYS.toMicros(1);
+    static final LocalDate EPOCH = LocalDate.ofEpochDay(0);
+
+    private TemporalConversions() {}
+
+    /**
+     * Converts a value to a {@link LocalDate}.
+     *
+     * <p>{@link Long} and {@link Integer} inputs are taken as epoch-day numbers. Any
+     * {@code java.util.Date} other than {@code java.sql.Date} and {@code java.sql.Time} is
+     * converted through its year/month/date getters.
+     *
+     * @param obj value to convert, may be {@code null}
+     * @return the date, or {@code null} when {@code obj} is {@code null}
+     * @throws IllegalArgumentException for a {@code java.sql.Time} or an unsupported type
+     */
+    @SuppressWarnings("MagicNumber")
+    public static LocalDate toLocalDate(Object obj) {
+        if (obj == null) {
+            return null;
+        }
+        if (obj instanceof LocalDate) {
+            return (LocalDate) obj;
+        }
+        if (obj instanceof LocalDateTime) {
+            return ((LocalDateTime) obj).toLocalDate();
+        }
+        if (obj instanceof java.sql.Date) {
+            return ((java.sql.Date) obj).toLocalDate();
+        }
+        if (obj instanceof java.sql.Time) {
+            throw new IllegalArgumentException(
+                "Unable to convert to LocalDate from a java.sql.Time value '" + obj + "'");
+        }
+        if (obj instanceof java.util.Date) {
+            java.util.Date date = (java.util.Date) obj;
+            return LocalDate.of(date.getYear() + 1900, date.getMonth() + 1, date.getDate());
+        }
+        if (obj instanceof Long) {
+            // Assume the value is the epoch day number
+            return LocalDate.ofEpochDay((Long) obj);
+        }
+        if (obj instanceof Integer) {
+            // Assume the value is the epoch day number
+            return LocalDate.ofEpochDay((Integer) obj);
+        }
+        throw new IllegalArgumentException(
+            "Unable to convert to LocalDate from unexpected value '"
+                + obj
+                + "' of type "
+                + obj.getClass().getName());
+    }
+
+    /**
+     * Converts a value to a {@link LocalTime}.
+     *
+     * <p>A {@link Duration} is taken as the elapsed time since midnight and must lie in
+     * {@code [0, 86400000000000)} nanoseconds.
+     *
+     * @param obj value to convert, may be {@code null}
+     * @return the time, or {@code null} when {@code obj} is {@code null}
+     * @throws IllegalArgumentException for a {@code java.sql.Date}, an out-of-range
+     *                                  {@link Duration} or an unsupported type
+     */
+    public static LocalTime toLocalTime(Object obj) {
+        if (obj == null) {
+            return null;
+        }
+        if (obj instanceof LocalTime) {
+            return (LocalTime) obj;
+        }
+        if (obj instanceof LocalDateTime) {
+            return ((LocalDateTime) obj).toLocalTime();
+        }
+        if (obj instanceof java.sql.Date) {
+            throw new IllegalArgumentException(
+                "Unable to convert to LocalTime from a java.sql.Date value '" + obj + "'");
+        }
+        if (obj instanceof java.sql.Time) {
+            java.sql.Time time = (java.sql.Time) obj;
+            return LocalTime.of(
+                time.getHours(), time.getMinutes(), time.getSeconds(), nanosOfSecond(time.getTime()));
+        }
+        if (obj instanceof java.sql.Timestamp) {
+            java.sql.Timestamp timestamp = (java.sql.Timestamp) obj;
+            return LocalTime.of(
+                timestamp.getHours(),
+                timestamp.getMinutes(),
+                timestamp.getSeconds(),
+                timestamp.getNanos());
+        }
+        if (obj instanceof java.util.Date) {
+            java.util.Date date = (java.util.Date) obj;
+            return LocalTime.of(
+                date.getHours(), date.getMinutes(), date.getSeconds(), nanosOfSecond(date.getTime()));
+        }
+        if (obj instanceof Duration) {
+            long nanos = ((Duration) obj).toNanos();
+            // The upper bound is exclusive: LocalTime.ofNanoOfDay rejects a full day.
+            if (nanos >= 0 && nanos < NANOSECONDS_PER_DAY) {
+                return LocalTime.ofNanoOfDay(nanos);
+            }
+            throw new IllegalArgumentException(
+                "Time values must use number of nanoseconds greater than or equal to 0"
+                    + " and less than 86400000000000");
+        }
+        throw new IllegalArgumentException(
+            "Unable to convert to LocalTime from unexpected value '"
+                + obj
+                + "' of type "
+                + obj.getClass().getName());
+    }
+
+    /**
+     * Converts a value to a {@link LocalDateTime}.
+     *
+     * <p>An {@link OffsetDateTime} keeps its local fields, an {@link Instant} is rendered at UTC,
+     * a date-only value gets midnight as its time, and a time-only value gets 1970-01-01 as its
+     * date. A {@link String} is parsed with {@link Instant#parse(CharSequence)} and rendered in
+     * {@code serverTimeZone}.
+     *
+     * @param obj            value to convert, may be {@code null}
+     * @param serverTimeZone zone used for {@link String} input only
+     * @return the date-time, or {@code null} when {@code obj} is {@code null}
+     * @throws IllegalArgumentException for an unsupported type
+     */
+    @SuppressWarnings("MagicNumber")
+    public static LocalDateTime toLocalDateTime(Object obj, ZoneId serverTimeZone) {
+        if (obj == null) {
+            return null;
+        }
+        if (obj instanceof OffsetDateTime) {
+            return ((OffsetDateTime) obj).toLocalDateTime();
+        }
+        if (obj instanceof Instant) {
+            return ((Instant) obj).atOffset(ZoneOffset.UTC).toLocalDateTime();
+        }
+        if (obj instanceof LocalDateTime) {
+            return (LocalDateTime) obj;
+        }
+        if (obj instanceof LocalDate) {
+            LocalDate date = (LocalDate) obj;
+            return LocalDateTime.of(date, LocalTime.MIDNIGHT);
+        }
+        if (obj instanceof LocalTime) {
+            LocalTime time = (LocalTime) obj;
+            return LocalDateTime.of(EPOCH, time);
+        }
+        if (obj instanceof java.sql.Date) {
+            java.sql.Date sqlDate = (java.sql.Date) obj;
+            LocalDate date = sqlDate.toLocalDate();
+            return LocalDateTime.of(date, LocalTime.MIDNIGHT);
+        }
+        if (obj instanceof java.sql.Time) {
+            LocalTime localTime = toLocalTime(obj);
+            return LocalDateTime.of(EPOCH, localTime);
+        }
+        if (obj instanceof java.sql.Timestamp) {
+            java.sql.Timestamp timestamp = (java.sql.Timestamp) obj;
+            return LocalDateTime.of(
+                timestamp.getYear() + 1900,
+                timestamp.getMonth() + 1,
+                timestamp.getDate(),
+                timestamp.getHours(),
+                timestamp.getMinutes(),
+                timestamp.getSeconds(),
+                timestamp.getNanos());
+        }
+        if (obj instanceof java.util.Date) {
+            java.util.Date date = (java.util.Date) obj;
+            return LocalDateTime.of(
+                date.getYear() + 1900,
+                date.getMonth() + 1,
+                date.getDate(),
+                date.getHours(),
+                date.getMinutes(),
+                date.getSeconds(),
+                nanosOfSecond(date.getTime()));
+        }
+        if (obj instanceof String) {
+            String str = (String) obj;
+            // TIMESTAMP type is encoded in string type
+            Instant instant = Instant.parse(str);
+            return LocalDateTime.ofInstant(instant, serverTimeZone);
+        }
+        throw new IllegalArgumentException(
+            "Unable to convert to LocalDateTime from unexpected value '"
+                + obj
+                + "' of type "
+                + obj.getClass().getName());
+    }
+
+    /**
+     * Returns the sub-second part of an epoch-millisecond value as nanoseconds.
+     *
+     * <p>{@code floorMod} keeps the result in {@code [0, 999_000_000]} for instants before the
+     * epoch as well, where the {@code %} operator would yield a negative remainder.
+     */
+    private static int nanosOfSecond(long epochMillis) {
+        long millisOfSecond = Math.floorMod(epochMillis, MILLISECONDS_PER_SECOND);
+        return (int) (millisOfSecond * NANOSECONDS_PER_MILLISECOND);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/pom.xml
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/pom.xml
index e5ec3a4b4..6f83f8107 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/pom.xml
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/pom.xml
@@ -39,6 +39,11 @@
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-jdbc</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<dependencyManagement>
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceOptions.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceOptions.java
deleted file mode 100644
index 7801cbe79..000000000
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceOptions.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config;
-
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
-
-public class MySqlSourceOptions {
- public static final Option<String> SERVER_ID =
- Options.key("server-id")
- .stringType()
- .noDefaultValue()
- .withDescription("A numeric ID or a numeric ID range of this
database client, "
- + "The numeric ID syntax is like '5400', the numeric ID range
syntax "
- + "is like '5400-5408', The numeric ID range syntax is
recommended when "
- + "'scan.incremental.snapshot.enabled' enabled. Every ID must
be unique across all "
- + "currently-running database processes in the MySQL cluster.
This connector"
- + " joins the MySQL cluster as another server (with this
unique ID) "
- + "so it can read the binlog. By default, a random number is
generated between"
- + " 5400 and 6400, though we recommend setting an explicit
value.");
-}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
index 7f01380fc..be3cfbc4f 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
@@ -19,19 +19,27 @@ package
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
+import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
import
org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
+import
org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema;
import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfigFactory;
-import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceOptions;
import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffsetFactory;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.MySqlCatalog;
import com.google.auto.service.AutoService;
+import java.time.ZoneId;
+
@AutoService(SeaTunnelSource.class)
public class MySqlIncrementalSource<T> extends IncrementalSource<T,
JdbcSourceConfig> {
@Override
@@ -42,14 +50,27 @@ public class MySqlIncrementalSource<T> extends
IncrementalSource<T, JdbcSourceCo
@Override
public SourceConfig.Factory<JdbcSourceConfig>
createSourceConfigFactory(ReadonlyConfig config) {
MySqlSourceConfigFactory configFactory = new
MySqlSourceConfigFactory();
- configFactory.serverId(config.get(MySqlSourceOptions.SERVER_ID));
+ configFactory.serverId(config.get(JdbcSourceOptions.SERVER_ID));
+ configFactory.fromReadonlyConfig(readonlyConfig);
return configFactory;
}
+ @SuppressWarnings("unchecked")
@Override
public DebeziumDeserializationSchema<T>
createDebeziumDeserializationSchema(ReadonlyConfig config) {
- // TODO: seatunnel row
- return null;
+ JdbcSourceConfig jdbcSourceConfig = configFactory.create(0);
+ String baseUrl = config.get(JdbcCatalogOptions.BASE_URL);
+ // TODO: support multi-table
+ // TODO: support metadata keys
+ MySqlCatalog mySqlCatalog = new MySqlCatalog("mysql",
jdbcSourceConfig.getDatabaseList().get(0), jdbcSourceConfig.getUsername(),
jdbcSourceConfig.getPassword(), baseUrl);
+ CatalogTable table =
mySqlCatalog.getTable(TablePath.of(jdbcSourceConfig.getDatabaseList().get(0),
jdbcSourceConfig.getTableList().get(0)));
+ SeaTunnelRowType physicalRowType =
table.getTableSchema().toPhysicalRowDataType();
+ String zoneId = config.get(JdbcSourceOptions.SERVER_TIME_ZONE);
+ return (DebeziumDeserializationSchema<T>)
SeaTunnelRowDebeziumDeserializeSchema.builder()
+ .setPhysicalRowType(physicalRowType)
+ .setResultTypeInfo(physicalRowType)
+ .setServerTimeZone(ZoneId.of(zoneId))
+ .build();
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java
new file mode 100644
index 000000000..93ea1e609
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/JdbcCatalogOptions.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+/**
+ * Holds the {@link Option} definitions declared for JDBC catalogs.
+ */
+public class JdbcCatalogOptions {
+    /**
+     * JDBC URL without a database part; the option has no default value.
+     */
+    public static final Option<String> BASE_URL = Options.key("base-url")
+        .stringType()
+        .noDefaultValue()
+        // The separator after "or" belongs to the first literal so the two quoted URLs do not run together.
+        .withDescription("URL has to be without database, like \"jdbc:mysql://localhost:5432/\" or "
+            + "\"jdbc:mysql://localhost:5432\" rather than \"jdbc:mysql://localhost:5432/db\"");
+}