This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 2acf82d174 [INLONG-10238][Sort] MySQL connector support audit ID
(#10239)
2acf82d174 is described below
commit 2acf82d174c6358447422006f271785d9c59e2ab
Author: XiaoYou201 <[email protected]>
AuthorDate: Wed May 22 09:46:55 2024 +0800
[INLONG-10238][Sort] MySQL connector support audit ID (#10239)
---
.../apache/inlong/sort/mysql/MySqlTableSource.java | 372 +++++++++++
.../inlong/sort/mysql/MysqlTableFactory.java | 18 +-
.../mysql/RowDataDebeziumDeserializeSchema.java | 687 +++++++++++++++++++++
licenses/inlong-sort-connectors/LICENSE | 14 +-
4 files changed, 1082 insertions(+), 9 deletions(-)
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MySqlTableSource.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MySqlTableSource.java
new file mode 100644
index 0000000000..c244378aca
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MySqlTableSource.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.mysql;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+import
com.ververica.cdc.connectors.mysql.table.MySqlDeserializationConverterFactory;
+import com.ververica.cdc.connectors.mysql.table.MySqlReadableMetadata;
+import com.ververica.cdc.connectors.mysql.table.StartupOptions;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.DebeziumSourceFunction;
+import com.ververica.cdc.debezium.table.MetadataConverter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.connector.source.SourceProvider;
+import
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.time.ZoneId;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link DynamicTableSource} that describes how to create a MySQL binlog source from a logical
+ * description.
+ * <p>
+ * Copied from com.ververica:flink-connector-mysql-cdc-2.3.0.
+ */
+public class MySqlTableSource implements ScanTableSource,
SupportsReadingMetadata {
+
+ // Connection, scan and CDC options; every field in this group is final and set once by the
+ // constructor.
+ private final ResolvedSchema physicalSchema;
+ private final int port;
+ private final String hostname;
+ private final String database;
+ private final String username;
+ private final String password;
+ private final String serverId;
+ private final String tableName;
+ private final ZoneId serverTimeZone;
+ private final Properties dbzProperties;
+ private final boolean enableParallelRead;
+ private final int splitSize;
+ private final int splitMetaGroupSize;
+ private final int fetchSize;
+ private final Duration connectTimeout;
+ private final int connectionPoolSize;
+ private final int connectMaxRetries;
+ private final double distributionFactorUpper;
+ private final double distributionFactorLower;
+ private final StartupOptions startupOptions;
+ private final boolean scanNewlyAddedTableEnabled;
+ private final Properties jdbcProperties;
+ private final Duration heartbeatInterval;
+ private final String chunkKeyColumn;
+
+ // --------------------------------------------------------------------------------------------
+ // Mutable attributes
+ // --------------------------------------------------------------------------------------------
+
+ /** Data type that describes the final output of the source. */
+ protected DataType producedDataType;
+
+ /** Metadata that is appended at the end of a physical source row. */
+ protected List<String> metadataKeys;
+
+ /**
+ * Metric settings (final, unlike the two attributes above). When non-null,
+ * {@code getScanRuntimeProvider} wraps them in a {@link SourceMetricData} and passes that to the
+ * deserializer; when null, no metric data is passed.
+ */
+ private final MetricOption metricOption;
+
+    /**
+     * Creates a table source from already-resolved connector options.
+     *
+     * <p>{@code hostname}, {@code database}, {@code tableName}, {@code username} and
+     * {@code password} must be non-null; every other argument is stored as given. The produced
+     * data type starts out as the physical row type of {@code physicalSchema} and the metadata
+     * key list starts out empty.
+     *
+     * @throws NullPointerException if one of the mandatory string arguments is null
+     */
+    public MySqlTableSource(
+            ResolvedSchema physicalSchema,
+            int port,
+            String hostname,
+            String database,
+            String tableName,
+            String username,
+            String password,
+            ZoneId serverTimeZone,
+            Properties dbzProperties,
+            @Nullable String serverId,
+            boolean enableParallelRead,
+            int splitSize,
+            int splitMetaGroupSize,
+            int fetchSize,
+            Duration connectTimeout,
+            int connectMaxRetries,
+            int connectionPoolSize,
+            double distributionFactorUpper,
+            double distributionFactorLower,
+            StartupOptions startupOptions,
+            boolean scanNewlyAddedTableEnabled,
+            Properties jdbcProperties,
+            Duration heartbeatInterval,
+            @Nullable String chunkKeyColumn,
+            MetricOption metricOption) {
+        // Mandatory connection coordinates (validated in declaration order).
+        this.hostname = checkNotNull(hostname);
+        this.database = checkNotNull(database);
+        this.tableName = checkNotNull(tableName);
+        this.username = checkNotNull(username);
+        this.password = checkNotNull(password);
+
+        // Remaining connection settings.
+        this.port = port;
+        this.serverId = serverId;
+        this.serverTimeZone = serverTimeZone;
+        this.connectTimeout = connectTimeout;
+        this.connectMaxRetries = connectMaxRetries;
+        this.connectionPoolSize = connectionPoolSize;
+        this.jdbcProperties = jdbcProperties;
+        this.dbzProperties = dbzProperties;
+        this.heartbeatInterval = heartbeatInterval;
+
+        // Snapshot / scan tuning.
+        this.enableParallelRead = enableParallelRead;
+        this.splitSize = splitSize;
+        this.splitMetaGroupSize = splitMetaGroupSize;
+        this.fetchSize = fetchSize;
+        this.distributionFactorUpper = distributionFactorUpper;
+        this.distributionFactorLower = distributionFactorLower;
+        this.chunkKeyColumn = chunkKeyColumn;
+        this.startupOptions = startupOptions;
+        this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
+        this.metricOption = metricOption;
+
+        // Schema and the mutable attributes derived from it.
+        this.physicalSchema = physicalSchema;
+        this.producedDataType = physicalSchema.toPhysicalRowDataType();
+        this.metadataKeys = Collections.emptyList();
+    }
+
+    /** Declares that this source emits inserts, both halves of an update, and deletes. */
+    @Override
+    public ChangelogMode getChangelogMode() {
+        final ChangelogMode.Builder kinds = ChangelogMode.newBuilder();
+        kinds.addContainedKind(RowKind.INSERT);
+        kinds.addContainedKind(RowKind.UPDATE_BEFORE);
+        kinds.addContainedKind(RowKind.UPDATE_AFTER);
+        kinds.addContainedKind(RowKind.DELETE);
+        return kinds.build();
+    }
+
+    /**
+     * Builds the runtime source for this table.
+     *
+     * <p>A {@link RowDataDebeziumDeserializeSchema} is configured first (with a
+     * {@link SourceMetricData} when {@code metricOption} is non-null). Depending on
+     * {@code enableParallelRead}, either the {@link MySqlSource} or the
+     * {@link DebeziumSourceFunction}-based source is then created around it.
+     */
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+        final RowType physicalRowType =
+                (RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
+        final MetadataConverter[] converters = getMetadataConverters();
+        final TypeInformation<RowData> producedTypeInfo =
+                scanContext.createTypeInformation(producedDataType);
+
+        final RowDataDebeziumDeserializeSchema.Builder schemaBuilder =
+                RowDataDebeziumDeserializeSchema.newBuilder();
+        schemaBuilder.setPhysicalRowType(physicalRowType);
+        schemaBuilder.setMetadataConverters(converters);
+        schemaBuilder.setResultTypeInfo(producedTypeInfo);
+        schemaBuilder.setServerTimeZone(serverTimeZone);
+        schemaBuilder.setUserDefinedConverterFactory(MySqlDeserializationConverterFactory.instance());
+        schemaBuilder.setSourceMetricData(
+                metricOption == null ? null : new SourceMetricData(metricOption));
+        final DebeziumDeserializationSchema<RowData> deserializer = schemaBuilder.build();
+
+        if (!enableParallelRead) {
+            // Non-parallel path: SourceFunction-based reader.
+            final com.ververica.cdc.connectors.mysql.MySqlSource.Builder<RowData> legacyBuilder =
+                    com.ververica.cdc.connectors.mysql.MySqlSource.<RowData>builder()
+                            .hostname(hostname)
+                            .port(port)
+                            .databaseList(database)
+                            .tableList(database + "." + tableName)
+                            .username(username)
+                            .password(password)
+                            .serverTimeZone(serverTimeZone.toString())
+                            .debeziumProperties(dbzProperties)
+                            .startupOptions(startupOptions)
+                            .deserializer(deserializer);
+            // This builder takes a numeric server id, and only when one was configured.
+            if (serverId != null) {
+                legacyBuilder.serverId(Integer.parseInt(serverId));
+            }
+            final DebeziumSourceFunction<RowData> sourceFunction = legacyBuilder.build();
+            return SourceFunctionProvider.of(sourceFunction, false);
+        }
+
+        // Parallel path: this builder receives every split/pool/heartbeat option.
+        final MySqlSource<RowData> parallelSource =
+                MySqlSource.<RowData>builder()
+                        .hostname(hostname)
+                        .port(port)
+                        .databaseList(database)
+                        .tableList(database + "." + tableName)
+                        .username(username)
+                        .password(password)
+                        .serverTimeZone(serverTimeZone.toString())
+                        .serverId(serverId)
+                        .splitSize(splitSize)
+                        .splitMetaGroupSize(splitMetaGroupSize)
+                        .distributionFactorUpper(distributionFactorUpper)
+                        .distributionFactorLower(distributionFactorLower)
+                        .fetchSize(fetchSize)
+                        .connectTimeout(connectTimeout)
+                        .connectMaxRetries(connectMaxRetries)
+                        .connectionPoolSize(connectionPoolSize)
+                        .debeziumProperties(dbzProperties)
+                        .startupOptions(startupOptions)
+                        .deserializer(deserializer)
+                        .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
+                        .jdbcProperties(jdbcProperties)
+                        .heartbeatInterval(heartbeatInterval)
+                        .chunkKeyColumn(chunkKeyColumn)
+                        .build();
+        return SourceProvider.of(parallelSource);
+    }
+
+    /**
+     * Resolves the requested metadata keys to their converters, in the order of
+     * {@code metadataKeys}.
+     *
+     * @return one converter per requested key; an empty array when no metadata was requested
+     * @throws IllegalStateException if a requested key matches no {@link MySqlReadableMetadata}
+     */
+    protected MetadataConverter[] getMetadataConverters() {
+        if (metadataKeys.isEmpty()) {
+            return new MetadataConverter[0];
+        }
+
+        return metadataKeys.stream()
+                .map(
+                        key -> Stream.of(MySqlReadableMetadata.values())
+                                .filter(m -> m.getKey().equals(key))
+                                .findFirst()
+                                // Name the offending key so a bad metadata column can be diagnosed.
+                                .orElseThrow(
+                                        () -> new IllegalStateException(
+                                                "Unsupported metadata key: " + key)))
+                .map(MySqlReadableMetadata::getConverter)
+                .toArray(MetadataConverter[]::new);
+    }
+
+    /** Lists every {@link MySqlReadableMetadata} entry as a key-to-data-type mapping. */
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        final Stream<MySqlReadableMetadata> supported = Stream.of(MySqlReadableMetadata.values());
+        return supported.collect(
+                Collectors.toMap(meta -> meta.getKey(), meta -> meta.getDataType()));
+    }
+
+    /**
+     * Records the metadata columns selected for this source together with the resulting output
+     * type; both replace the values set by the constructor.
+     */
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+        this.producedDataType = producedDataType;
+        this.metadataKeys = metadataKeys;
+    }
+
+    /**
+     * Returns a new source with the same constructor arguments, then carries over the two mutable
+     * attributes ({@code metadataKeys} and {@code producedDataType}) that the constructor resets.
+     */
+    @Override
+    public DynamicTableSource copy() {
+        final MySqlTableSource duplicate = new MySqlTableSource(
+                physicalSchema,
+                port,
+                hostname,
+                database,
+                tableName,
+                username,
+                password,
+                serverTimeZone,
+                dbzProperties,
+                serverId,
+                enableParallelRead,
+                splitSize,
+                splitMetaGroupSize,
+                fetchSize,
+                connectTimeout,
+                connectMaxRetries,
+                connectionPoolSize,
+                distributionFactorUpper,
+                distributionFactorLower,
+                startupOptions,
+                scanNewlyAddedTableEnabled,
+                jdbcProperties,
+                heartbeatInterval,
+                chunkKeyColumn,
+                metricOption);
+        duplicate.producedDataType = this.producedDataType;
+        duplicate.metadataKeys = this.metadataKeys;
+        return duplicate;
+    }
+
+    /**
+     * Compares every configuration field plus the two mutable attributes.
+     *
+     * <p>The distribution factors are compared with {@link Double#compare} rather than
+     * {@code ==}. {@link #hashCode()} hashes them as boxed {@code Double}s, so this keeps the two
+     * methods consistent: {@code 0.0} and {@code -0.0} differ, and NaN equals NaN.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof MySqlTableSource)) {
+            return false;
+        }
+        MySqlTableSource that = (MySqlTableSource) o;
+        return port == that.port
+                && enableParallelRead == that.enableParallelRead
+                && splitSize == that.splitSize
+                && splitMetaGroupSize == that.splitMetaGroupSize
+                && fetchSize == that.fetchSize
+                && connectMaxRetries == that.connectMaxRetries
+                && connectionPoolSize == that.connectionPoolSize
+                && Double.compare(distributionFactorUpper, that.distributionFactorUpper) == 0
+                && Double.compare(distributionFactorLower, that.distributionFactorLower) == 0
+                && scanNewlyAddedTableEnabled == that.scanNewlyAddedTableEnabled
+                && Objects.equals(physicalSchema, that.physicalSchema)
+                && Objects.equals(hostname, that.hostname)
+                && Objects.equals(database, that.database)
+                && Objects.equals(username, that.username)
+                && Objects.equals(password, that.password)
+                && Objects.equals(serverId, that.serverId)
+                && Objects.equals(tableName, that.tableName)
+                && Objects.equals(serverTimeZone, that.serverTimeZone)
+                && Objects.equals(dbzProperties, that.dbzProperties)
+                && Objects.equals(connectTimeout, that.connectTimeout)
+                && Objects.equals(startupOptions, that.startupOptions)
+                && Objects.equals(producedDataType, that.producedDataType)
+                && Objects.equals(metadataKeys, that.metadataKeys)
+                && Objects.equals(jdbcProperties, that.jdbcProperties)
+                && Objects.equals(heartbeatInterval, that.heartbeatInterval)
+                && Objects.equals(chunkKeyColumn, that.chunkKeyColumn)
+                && Objects.equals(metricOption, that.metricOption);
+    }
+
+    /** Hashes the same fields that {@link #equals(Object)} compares. */
+    @Override
+    public int hashCode() {
+        // Order is part of the hash value; keep it stable.
+        final Object[] state = {
+                physicalSchema,
+                port,
+                hostname,
+                database,
+                username,
+                password,
+                serverId,
+                tableName,
+                serverTimeZone,
+                dbzProperties,
+                enableParallelRead,
+                splitSize,
+                splitMetaGroupSize,
+                fetchSize,
+                connectTimeout,
+                connectMaxRetries,
+                connectionPoolSize,
+                distributionFactorUpper,
+                distributionFactorLower,
+                startupOptions,
+                producedDataType,
+                metadataKeys,
+                scanNewlyAddedTableEnabled,
+                jdbcProperties,
+                heartbeatInterval,
+                chunkKeyColumn,
+                metricOption
+        };
+        return Objects.hash(state);
+    }
+
+ /** Returns the fixed summary label {@code "MySQL-CDC"} for this source. */
+ @Override
+ public String asSummaryString() {
+ return "MySQL-CDC";
+ }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java
index c684cc3ae4..f903780a36 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java
@@ -17,11 +17,12 @@
package org.apache.inlong.sort.mysql;
+import org.apache.inlong.sort.base.metric.MetricOption;
+
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import com.ververica.cdc.connectors.mysql.source.config.ServerIdRange;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder;
-import com.ververica.cdc.connectors.mysql.table.MySqlTableSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.configuration.ConfigOption;
@@ -48,7 +49,8 @@ import static
com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProper
import static
com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema;
import static org.apache.flink.util.Preconditions.checkState;
import static
org.apache.inlong.common.constant.Constants.METRICS_AUDIT_PROXY_HOSTS_KEY;
-import static org.apache.inlong.sort.base.Constants.*;
+import static org.apache.inlong.sort.base.Constants.GH_OST_DDL_CHANGE;
+import static org.apache.inlong.sort.base.Constants.GH_OST_TABLE_REGEX;
public class MysqlTableFactory implements DynamicTableSourceFactory {
@@ -97,6 +99,15 @@ public class MysqlTableFactory implements
DynamicTableSourceFactory {
validateDistributionFactorLower(distributionFactorLower);
}
+ String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
+ String auditHostAndPorts = config.get(INLONG_AUDIT);
+ String auditKeys = config.get(AUDIT_KEYS);
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(inlongMetric)
+ .withAuditAddress(auditHostAndPorts)
+ .withAuditKeys(auditKeys)
+ .build();
+
return new MySqlTableSource(physicalSchema,
port,
hostname,
@@ -120,7 +131,8 @@ public class MysqlTableFactory implements
DynamicTableSourceFactory {
scanNewlyAddedTableEnabled,
getJdbcProperties(context.getCatalogTable().getOptions()),
heartbeatInterval,
- chunkKeyColumn);
+ chunkKeyColumn,
+ metricOption);
}
@Override
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java
new file mode 100644
index 0000000000..e98a700b62
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java
@@ -0,0 +1,687 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.mysql;
+
+import org.apache.inlong.sort.base.metric.MetricsCollector;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.table.AppendMetadataCollector;
+import com.ververica.cdc.debezium.table.DebeziumChangelogMode;
+import com.ververica.cdc.debezium.table.DeserializationRuntimeConverter;
+import com.ververica.cdc.debezium.table.DeserializationRuntimeConverterFactory;
+import com.ververica.cdc.debezium.table.MetadataConverter;
+import com.ververica.cdc.debezium.utils.TemporalConversions;
+import io.debezium.data.Envelope;
+import io.debezium.data.SpecialValueDecimal;
+import io.debezium.data.VariableScaleDecimal;
+import io.debezium.time.MicroTime;
+import io.debezium.time.MicroTimestamp;
+import io.debezium.time.NanoTime;
+import io.debezium.time.NanoTimestamp;
+import io.debezium.time.Timestamp;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.connect.data.Decimal;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Deserialization schema from Debezium object to Flink Table/SQL internal data structure {@link
+ * RowData}.
+ * <p>
+ * Copied from com.ververica:flink-connector-mysql-cdc-2.3.0.
+ */
+public final class RowDataDebeziumDeserializeSchema implements
DebeziumDeserializationSchema<RowData> {
+
+ private final static Logger LOG =
LoggerFactory.getLogger(RowDataDebeziumDeserializeSchema.class);
+
+ private static final long serialVersionUID = 2L;
+
+ /** Custom validator to validate the row value. */
+ public interface ValueValidator extends Serializable {
+
+ /**
+ * Validates a converted row before it is emitted.
+ *
+ * @param rowData the converted row; {@code deserialize} calls this before setting its row kind
+ * @param rowKind the change kind the row is about to be tagged with
+ * @throws Exception if the row is rejected; {@code deserialize} lets it propagate
+ */
+ void validate(RowData rowData, RowKind rowKind) throws Exception;
+ }
+
+ /** TypeInformation of the produced {@link RowData}. */
+ private final TypeInformation<RowData> resultTypeInfo;
+
+ /**
+ * Runtime converter that converts Kafka {@link SourceRecord}s into {@link RowData} consisting
+ * of physical column values.
+ */
+ private final DeserializationRuntimeConverter physicalConverter;
+
+ /** Whether the deserializer needs to handle metadata columns. */
+ private final boolean hasMetadata;
+
+ /**
+ * A wrapped output collector which is used to append metadata columns after physical columns.
+ */
+ private final AppendMetadataCollector appendMetadataCollector;
+
+ /** Validator to validate the row value. */
+ private final ValueValidator validator;
+
+ /** Changelog Mode to use for encoding changes in Flink internal data structure. */
+ private final DebeziumChangelogMode changelogMode;
+
+ /**
+ * Optional metric data. When non-null, {@code deserialize} emits INSERT and UPDATE_AFTER rows
+ * through a {@link MetricsCollector} built around it; DELETE and UPDATE_BEFORE rows go to the
+ * plain collector.
+ */
+ private final SourceMetricData sourceMetricData;
+
+ /**
+ * Returns a new {@link Builder} for {@link RowDataDebeziumDeserializeSchema}, pre-populated with
+ * the builder's default values.
+ */
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ RowDataDebeziumDeserializeSchema(
+ RowType physicalDataType,
+ MetadataConverter[] metadataConverters,
+ TypeInformation<RowData> resultTypeInfo,
+ ValueValidator validator,
+ ZoneId serverTimeZone,
+ DeserializationRuntimeConverterFactory userDefinedConverterFactory,
+ DebeziumChangelogMode changelogMode,
+ SourceMetricData sourceMetricData) {
+ this.hasMetadata = checkNotNull(metadataConverters).length > 0;
+ this.appendMetadataCollector = new
AppendMetadataCollector(metadataConverters);
+ this.physicalConverter =
+ createConverter(
+ checkNotNull(physicalDataType),
+ serverTimeZone,
+ userDefinedConverterFactory);
+ this.resultTypeInfo = checkNotNull(resultTypeInfo);
+ this.validator = checkNotNull(validator);
+ this.changelogMode = checkNotNull(changelogMode);
+ this.sourceMetricData = sourceMetricData;
+ }
+
+    /**
+     * Converts one Debezium change record into one or two {@link RowData} rows.
+     *
+     * <ul>
+     *   <li>CREATE / READ: one INSERT row built from the "after" image.</li>
+     *   <li>DELETE: one DELETE row built from the "before" image.</li>
+     *   <li>Anything else: an UPDATE_BEFORE row (only when the changelog mode is
+     *       {@code ALL}) followed by an UPDATE_AFTER row.</li>
+     * </ul>
+     *
+     * <p>When {@code sourceMetricData} is set, INSERT and UPDATE_AFTER rows are emitted through a
+     * {@link MetricsCollector}; DELETE and UPDATE_BEFORE rows use {@code out} directly.
+     */
+    @Override
+    public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
+        final Envelope.Operation operation = Envelope.operationFor(record);
+        final Struct payload = (Struct) record.value();
+        final Schema payloadSchema = record.valueSchema();
+
+        if (operation == Envelope.Operation.CREATE || operation == Envelope.Operation.READ) {
+            final GenericRowData inserted = extractAfterRow(payload, payloadSchema);
+            validator.validate(inserted, RowKind.INSERT);
+            inserted.setRowKind(RowKind.INSERT);
+            Collector<RowData> target = out;
+            if (sourceMetricData != null) {
+                target = new MetricsCollector<>(out, sourceMetricData);
+            }
+            emit(record, inserted, target);
+            return;
+        }
+
+        if (operation == Envelope.Operation.DELETE) {
+            final GenericRowData deleted = extractBeforeRow(payload, payloadSchema);
+            validator.validate(deleted, RowKind.DELETE);
+            deleted.setRowKind(RowKind.DELETE);
+            emit(record, deleted, out);
+            return;
+        }
+
+        // Remaining operations are handled as updates.
+        if (changelogMode == DebeziumChangelogMode.ALL) {
+            final GenericRowData previous = extractBeforeRow(payload, payloadSchema);
+            validator.validate(previous, RowKind.UPDATE_BEFORE);
+            previous.setRowKind(RowKind.UPDATE_BEFORE);
+            emit(record, previous, out);
+        }
+
+        final GenericRowData current = extractAfterRow(payload, payloadSchema);
+        validator.validate(current, RowKind.UPDATE_AFTER);
+        current.setRowKind(RowKind.UPDATE_AFTER);
+        Collector<RowData> target = out;
+        if (sourceMetricData != null) {
+            target = new MetricsCollector<>(out, sourceMetricData);
+        }
+        emit(record, current, target);
+    }
+
+ private GenericRowData extractAfterRow(Struct value, Schema valueSchema)
throws Exception {
+ Schema afterSchema =
valueSchema.field(Envelope.FieldName.AFTER).schema();
+ Struct after = value.getStruct(Envelope.FieldName.AFTER);
+ return (GenericRowData) physicalConverter.convert(after, afterSchema);
+ }
+
+ private GenericRowData extractBeforeRow(Struct value, Schema valueSchema)
throws Exception {
+ Schema beforeSchema =
valueSchema.field(Envelope.FieldName.BEFORE).schema();
+ Struct before = value.getStruct(Envelope.FieldName.BEFORE);
+ return (GenericRowData) physicalConverter.convert(before,
beforeSchema);
+ }
+
+    /**
+     * Sends a physical row downstream. When metadata columns were requested, the row goes through
+     * the shared {@link AppendMetadataCollector}, pointed at the current record and collector;
+     * otherwise it is collected directly.
+     */
+    private void emit(SourceRecord inRecord, RowData physicalRow, Collector<RowData> collector) {
+        if (hasMetadata) {
+            appendMetadataCollector.inputRecord = inRecord;
+            appendMetadataCollector.outputCollector = collector;
+            appendMetadataCollector.collect(physicalRow);
+        } else {
+            collector.collect(physicalRow);
+        }
+    }
+
+ /** Returns the {@link TypeInformation} supplied at construction time. */
+ @Override
+ public TypeInformation<RowData> getProducedType() {
+ return resultTypeInfo;
+ }
+
+ //
-------------------------------------------------------------------------------------
+ // Builder
+ //
-------------------------------------------------------------------------------------
+
+ /**
+ * Builder of {@link RowDataDebeziumDeserializeSchema}.
+ *
+ * <p>{@code physicalRowType} and {@code resultTypeInfo} have no default and are null-checked by
+ * the schema constructor, so {@link #build()} fails if either was not set. All other options
+ * have the defaults declared on the fields below; {@code sourceMetricData} defaults to null.
+ */
+ public static class Builder {
+
+ private RowType physicalRowType;
+ private TypeInformation<RowData> resultTypeInfo;
+ // Default: no metadata columns.
+ private MetadataConverter[] metadataConverters = new
MetadataConverter[0];
+ // Default: accept every row.
+ private ValueValidator validator = (rowData, rowKind) -> {
+ };
+ private ZoneId serverTimeZone = ZoneId.of("UTC");
+ private DeserializationRuntimeConverterFactory
userDefinedConverterFactory =
+ DeserializationRuntimeConverterFactory.DEFAULT;
+ private DebeziumChangelogMode changelogMode =
DebeziumChangelogMode.ALL;
+ // Left null unless set; the schema then skips the metrics collector.
+ private SourceMetricData sourceMetricData;
+
+ /** Sets the row type of the physical columns (required). */
+ public Builder setPhysicalRowType(RowType physicalRowType) {
+ this.physicalRowType = physicalRowType;
+ return this;
+ }
+
+ /** Sets the converters for the metadata columns appended after the physical columns. */
+ public Builder setMetadataConverters(MetadataConverter[]
metadataConverters) {
+ this.metadataConverters = metadataConverters;
+ return this;
+ }
+
+ /** Sets the type information of the produced rows (required). */
+ public Builder setResultTypeInfo(TypeInformation<RowData>
resultTypeInfo) {
+ this.resultTypeInfo = resultTypeInfo;
+ return this;
+ }
+
+ /** Sets the validator that is run on every converted row before it is emitted. */
+ public Builder setValueValidator(ValueValidator validator) {
+ this.validator = validator;
+ return this;
+ }
+
+ /** Sets the time zone handed to the runtime converters. */
+ public Builder setServerTimeZone(ZoneId serverTimeZone) {
+ this.serverTimeZone = serverTimeZone;
+ return this;
+ }
+
+ /** Sets the factory whose converters take precedence over the built-in ones. */
+ public Builder setUserDefinedConverterFactory(
+ DeserializationRuntimeConverterFactory
userDefinedConverterFactory) {
+ this.userDefinedConverterFactory = userDefinedConverterFactory;
+ return this;
+ }
+
+ /** Sets the changelog mode; UPDATE_BEFORE rows are only produced in mode {@code ALL}. */
+ public Builder setChangelogMode(DebeziumChangelogMode changelogMode) {
+ this.changelogMode = changelogMode;
+ return this;
+ }
+
+ /** Sets the optional metric data; null disables the metrics collector. */
+ public Builder setSourceMetricData(SourceMetricData sourceMetricData) {
+ this.sourceMetricData = sourceMetricData;
+ return this;
+ }
+
+ /** Creates the schema from the current builder state. */
+ public RowDataDebeziumDeserializeSchema build() {
+ return new RowDataDebeziumDeserializeSchema(
+ physicalRowType,
+ metadataConverters,
+ resultTypeInfo,
+ validator,
+ serverTimeZone,
+ userDefinedConverterFactory,
+ changelogMode,
+ sourceMetricData);
+ }
+ }
+
+ //
-------------------------------------------------------------------------------------
+ // Runtime Converters
+ //
-------------------------------------------------------------------------------------
+
+ /** Creates a runtime converter which is null safe. */
+ private static DeserializationRuntimeConverter createConverter(
+ LogicalType type,
+ ZoneId serverTimeZone,
+ DeserializationRuntimeConverterFactory
userDefinedConverterFactory) {
+ return wrapIntoNullableConverter(
+ createNotNullConverter(type, serverTimeZone,
userDefinedConverterFactory));
+ }
+
+ //
--------------------------------------------------------------------------------
+ // IMPORTANT! We use anonymous classes instead of lambdas for a reason
here. It is
+ // necessary because the maven shade plugin cannot relocate classes in
+ // SerializedLambdas (MSHADE-260).
+ //
--------------------------------------------------------------------------------
+
+ /**
+ * Creates a runtime converter which assumes the input object is not null.
+ *
+ * <p>A converter offered by {@code userDefinedConverterFactory} for the given type wins;
+ * otherwise the built-in converter for the type's root is returned.
+ *
+ * @throws UnsupportedOperationException for ARRAY, MAP, MULTISET, RAW and any other type root
+ * without a built-in converter
+ */
+ public static DeserializationRuntimeConverter createNotNullConverter(
+ LogicalType type,
+ ZoneId serverTimeZone,
+ DeserializationRuntimeConverterFactory
userDefinedConverterFactory) {
+ // user defined converter has a higher resolve order
+ Optional<DeserializationRuntimeConverter> converter =
+ userDefinedConverterFactory.createUserDefinedConverter(type,
serverTimeZone);
+ if (converter.isPresent()) {
+ return converter.get();
+ }
+
+ // if no matched user defined converter, fallback to the default converter
+ switch (type.getTypeRoot()) {
+ case NULL:
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) {
+ return null;
+ }
+ };
+ case BOOLEAN:
+ return convertToBoolean();
+ case TINYINT:
+ // parsed from the value's string form
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) {
+ return Byte.parseByte(dbzObj.toString());
+ }
+ };
+ case SMALLINT:
+ // parsed from the value's string form
+ return new DeserializationRuntimeConverter() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Object dbzObj, Schema schema) {
+ return Short.parseShort(dbzObj.toString());
+ }
+ };
+ case INTEGER:
+ case INTERVAL_YEAR_MONTH:
+ return convertToInt();
+ case BIGINT:
+ case INTERVAL_DAY_TIME:
+ return convertToLong();
+ case DATE:
+ return convertToDate();
+ case TIME_WITHOUT_TIME_ZONE:
+ return convertToTime();
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return convertToTimestamp(serverTimeZone);
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return convertToLocalTimeZoneTimestamp(serverTimeZone);
+ case FLOAT:
+ return convertToFloat();
+ case DOUBLE:
+ return convertToDouble();
+ case CHAR:
+ case VARCHAR:
+ return convertToString();
+ case BINARY:
+ case VARBINARY:
+ return convertToBinary();
+ case DECIMAL:
+ return createDecimalConverter((DecimalType) type);
+ case ROW:
+ return createRowConverter(
+ (RowType) type, serverTimeZone,
userDefinedConverterFactory);
+ // the following type roots have no built-in converter
+ case ARRAY:
+ case MAP:
+ case MULTISET:
+ case RAW:
+ default:
+ throw new UnsupportedOperationException("Unsupported type: " +
type);
+ }
+ }
+
+    /**
+     * Converter for BOOLEAN: Boolean values pass through, Byte and Short values are true when
+     * equal to 1, and anything else is parsed from its string form.
+     */
+    private static DeserializationRuntimeConverter convertToBoolean() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Boolean) {
+                    return dbzObj;
+                }
+                if (dbzObj instanceof Byte) {
+                    return ((Byte) dbzObj).byteValue() == 1;
+                }
+                if (dbzObj instanceof Short) {
+                    return ((Short) dbzObj).shortValue() == 1;
+                }
+                return Boolean.parseBoolean(dbzObj.toString());
+            }
+        };
+    }
+
+    /**
+     * Converter for int-backed types: Integer values pass through, Long values are narrowed, and
+     * anything else is parsed from its string form.
+     */
+    private static DeserializationRuntimeConverter convertToInt() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Integer) {
+                    return dbzObj;
+                }
+                if (dbzObj instanceof Long) {
+                    return ((Long) dbzObj).intValue();
+                }
+                return Integer.parseInt(dbzObj.toString());
+            }
+        };
+    }
+
+    /**
+     * Converter for long-backed types: Long values pass through, Integer values are widened, and
+     * anything else is parsed from its string form.
+     */
+    private static DeserializationRuntimeConverter convertToLong() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Integer) {
+                    return ((Integer) dbzObj).longValue();
+                }
+                if (dbzObj instanceof Long) {
+                    return dbzObj;
+                }
+                return Long.parseLong(dbzObj.toString());
+            }
+        };
+    }
+
+    /**
+     * Converter for DOUBLE: Double values pass through, Float values are widened, and anything
+     * else is parsed from its string form.
+     */
+    private static DeserializationRuntimeConverter convertToDouble() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Float) {
+                    return ((Float) dbzObj).doubleValue();
+                }
+                if (dbzObj instanceof Double) {
+                    return dbzObj;
+                }
+                return Double.parseDouble(dbzObj.toString());
+            }
+        };
+    }
+
+ /**
+  * Creates a converter that turns a Debezium value into a Java {@code Float}.
+  *
+  * @return a converter accepting {@code Float}, {@code Double}, or any object whose
+  *         {@code toString()} is parseable by {@link Float#parseFloat(String)}
+  */
+ private static DeserializationRuntimeConverter convertToFloat() {
+     return new DeserializationRuntimeConverter() {
+
+         private static final long serialVersionUID = 1L;
+
+         @Override
+         public Object convert(Object dbzObj, Schema schema) {
+             if (dbzObj instanceof Float) {
+                 return dbzObj;
+             }
+             if (dbzObj instanceof Double) {
+                 // Narrow double precision to single precision (may lose precision).
+                 final Double wide = (Double) dbzObj;
+                 return wide.floatValue();
+             }
+             final String text = dbzObj.toString();
+             return Float.parseFloat(text);
+         }
+     };
+ }
+
+ /**
+  * Creates a converter that turns a Debezium date value into its epoch-day count.
+  *
+  * @return a converter whose {@code convert} result is the epoch day as an {@code Integer}
+  */
+ private static DeserializationRuntimeConverter convertToDate() {
+     return new DeserializationRuntimeConverter() {
+
+         private static final long serialVersionUID = 1L;
+
+         @Override
+         public Object convert(Object dbzObj, Schema schema) {
+             // Resolve the value to a local date first, then narrow its epoch-day count to int.
+             final long epochDay = TemporalConversions.toLocalDate(dbzObj).toEpochDay();
+             return (int) epochDay;
+         }
+     };
+ }
+
+ /**
+  * Creates a converter that turns a Debezium time value into the number of milliseconds of
+  * the day, returned as an {@code Integer}.
+  *
+  * <p>Handling by input type:
+  * <ul>
+  *   <li>{@code Integer}: returned unchanged.</li>
+  *   <li>{@code Long} with schema name {@code MicroTime.SCHEMA_NAME}: divided by 1,000.</li>
+  *   <li>{@code Long} with schema name {@code NanoTime.SCHEMA_NAME}: divided by 1,000,000.</li>
+  *   <li>anything else (including a {@code Long} with another or no schema name): resolved
+  *       through {@code TemporalConversions.toLocalTime} and reduced to milliseconds of the
+  *       day.</li>
+  * </ul>
+  *
+  * @return a converter whose {@code convert} result is an {@code Integer}
+  */
+ private static DeserializationRuntimeConverter convertToTime() {
+     return new DeserializationRuntimeConverter() {
+
+         private static final long serialVersionUID = 1L;
+
+         @Override
+         public Object convert(Object dbzObj, Schema schema) {
+             if (dbzObj instanceof Integer) {
+                 return dbzObj;
+             }
+             if (dbzObj instanceof Long) {
+                 // Compare constant-first so a schema without a name cannot trigger an NPE
+                 // (a switch on a null String would).
+                 final String schemaName = schema.name();
+                 final long raw = (Long) dbzObj;
+                 if (MicroTime.SCHEMA_NAME.equals(schemaName)) {
+                     return (int) (raw / 1000);
+                 }
+                 if (NanoTime.SCHEMA_NAME.equals(schemaName)) {
+                     return (int) (raw / 1000_000);
+                 }
+             }
+             // get number of milliseconds of the day; derive it from nano-of-day so the
+             // sub-second part is kept instead of being truncated to whole seconds.
+             return (int) (TemporalConversions.toLocalTime(dbzObj).toNanoOfDay() / 1_000_000L);
+         }
+     };
+ }
+
+ /**
+  * Creates a converter that turns a Debezium timestamp value into {@code TimestampData}.
+  *
+  * <p>{@code Long} inputs are interpreted according to the schema name: epoch milliseconds for
+  * {@code Timestamp.SCHEMA_NAME}, epoch microseconds for {@code MicroTimestamp.SCHEMA_NAME},
+  * and epoch nanoseconds for {@code NanoTimestamp.SCHEMA_NAME}. Every other input (including a
+  * {@code Long} with another or no schema name) is resolved through
+  * {@code TemporalConversions.toLocalDateTime} using {@code serverTimeZone}.
+  *
+  * @param serverTimeZone time zone handed to {@code TemporalConversions.toLocalDateTime}
+  * @return a converter whose {@code convert} result is a {@code TimestampData}
+  */
+ private static DeserializationRuntimeConverter convertToTimestamp(ZoneId serverTimeZone) {
+     return new DeserializationRuntimeConverter() {
+
+         private static final long serialVersionUID = 1L;
+
+         @Override
+         public Object convert(Object dbzObj, Schema schema) {
+             if (dbzObj instanceof Long) {
+                 final long value = (Long) dbzObj;
+                 // Compare constant-first so a schema without a name cannot trigger an NPE
+                 // (a switch on a null String would).
+                 final String schemaName = schema.name();
+                 if (Timestamp.SCHEMA_NAME.equals(schemaName)) {
+                     return TimestampData.fromEpochMillis(value);
+                 }
+                 // floorDiv/floorMod keep the sub-millisecond remainder non-negative for
+                 // pre-epoch (negative) values; '/' and '%' would yield a negative remainder
+                 // and round the millisecond towards zero instead of towards the past.
+                 if (MicroTimestamp.SCHEMA_NAME.equals(schemaName)) {
+                     return TimestampData.fromEpochMillis(
+                             Math.floorDiv(value, 1000L),
+                             (int) (Math.floorMod(value, 1000L) * 1000L));
+                 }
+                 if (NanoTimestamp.SCHEMA_NAME.equals(schemaName)) {
+                     return TimestampData.fromEpochMillis(
+                             Math.floorDiv(value, 1000_000L),
+                             (int) Math.floorMod(value, 1000_000L));
+                 }
+             }
+             LocalDateTime localDateTime =
+                     TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
+             return TimestampData.fromLocalDateTime(localDateTime);
+         }
+     };
+ }
+
+ /**
+  * Creates a converter for values that arrive as ISO-8601 instant strings and must become
+  * {@code TimestampData} expressed in the given zone.
+  *
+  * @param serverTimeZone zone used to turn the parsed instant into a local date-time
+  * @return a converter that accepts only {@code String} input and throws
+  *         {@link IllegalArgumentException} for any other type
+  */
+ private static DeserializationRuntimeConverter convertToLocalTimeZoneTimestamp(
+         ZoneId serverTimeZone) {
+     return new DeserializationRuntimeConverter() {
+
+         private static final long serialVersionUID = 1L;
+
+         @Override
+         public Object convert(Object dbzObj, Schema schema) {
+             if (!(dbzObj instanceof String)) {
+                 throw new IllegalArgumentException(
+                         "Unable to convert to TimestampData from unexpected value '"
+                                 + dbzObj
+                                 + "' of type "
+                                 + dbzObj.getClass().getName());
+             }
+             // TIMESTAMP_LTZ type is encoded in string type
+             final Instant parsed = Instant.parse((String) dbzObj);
+             final LocalDateTime zoned = LocalDateTime.ofInstant(parsed, serverTimeZone);
+             return TimestampData.fromLocalDateTime(zoned);
+         }
+     };
+ }
+
+ /**
+  * Creates a converter that wraps the {@code toString()} form of a Debezium value in
+  * {@code StringData}.
+  *
+  * @return a converter whose {@code convert} result is a {@code StringData}
+  */
+ private static DeserializationRuntimeConverter convertToString() {
+     return new DeserializationRuntimeConverter() {
+
+         private static final long serialVersionUID = 1L;
+
+         @Override
+         public Object convert(Object dbzObj, Schema schema) {
+             final String text = dbzObj.toString();
+             return StringData.fromString(text);
+         }
+     };
+ }
+
+ /**
+  * Creates a converter that turns a Debezium binary value into a {@code byte[]}.
+  *
+  * @return a converter accepting {@code byte[]} (returned as is) or {@code ByteBuffer}
+  *         (remaining bytes copied out); any other type raises
+  *         {@link UnsupportedOperationException}
+  */
+ private static DeserializationRuntimeConverter convertToBinary() {
+     return new DeserializationRuntimeConverter() {
+
+         private static final long serialVersionUID = 1L;
+
+         @Override
+         public Object convert(Object dbzObj, Schema schema) {
+             if (dbzObj instanceof byte[]) {
+                 return dbzObj;
+             }
+             if (!(dbzObj instanceof ByteBuffer)) {
+                 throw new UnsupportedOperationException(
+                         "Unsupported BYTES value type: " + dbzObj.getClass().getSimpleName());
+             }
+             // Drain whatever is left between position and limit into a fresh array.
+             final ByteBuffer buffer = (ByteBuffer) dbzObj;
+             final byte[] copied = new byte[buffer.remaining()];
+             buffer.get(copied);
+             return copied;
+         }
+     };
+ }
+
+ /**
+  * Creates a converter that turns a Debezium decimal value into {@code DecimalData} with the
+  * precision and scale of the given type.
+  *
+  * @param decimalType supplies the precision and scale passed to
+  *        {@code DecimalData.fromBigDecimal}
+  * @return a converter whose {@code convert} result comes from
+  *         {@code DecimalData.fromBigDecimal}
+  */
+ private static DeserializationRuntimeConverter createDecimalConverter(DecimalType decimalType) {
+     final int precision = decimalType.getPrecision();
+     final int scale = decimalType.getScale();
+     return new DeserializationRuntimeConverter() {
+
+         private static final long serialVersionUID = 1L;
+
+         @Override
+         public Object convert(Object dbzObj, Schema schema) {
+             return DecimalData.fromBigDecimal(toBigDecimal(dbzObj, schema), precision, scale);
+         }
+
+         /** Picks the decoding that matches the runtime type of the incoming value. */
+         private BigDecimal toBigDecimal(Object value, Schema schema) {
+             if (value instanceof byte[]) {
+                 // decimal.handling.mode=precise
+                 return Decimal.toLogical(schema, (byte[]) value);
+             }
+             if (value instanceof String) {
+                 // decimal.handling.mode=string
+                 return new BigDecimal((String) value);
+             }
+             if (value instanceof Double) {
+                 // decimal.handling.mode=double
+                 return BigDecimal.valueOf((Double) value);
+             }
+             if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) {
+                 // An absent decimal value is mapped to zero.
+                 final SpecialValueDecimal special = VariableScaleDecimal.toLogical((Struct) value);
+                 return special.getDecimalValue().orElse(BigDecimal.ZERO);
+             }
+             // fallback to string
+             return new BigDecimal(value.toString());
+         }
+     };
+ }
+
+ /**
+  * Creates a converter that turns a Debezium {@code Struct} into a {@code GenericRowData}
+  * laid out according to the given row type.
+  *
+  * <p>One nested converter is built per row field via {@code createConverter}. At conversion
+  * time a field that is missing from the incoming schema is set to {@code null}; otherwise
+  * the field value is read without applying defaults and passed through {@code convertField}.
+  *
+  * @param rowType row type whose field names and field types drive the conversion
+  * @param serverTimeZone forwarded to {@code createConverter} for every field
+  * @param userDefinedConverterFactory forwarded to {@code createConverter} for every field
+  * @return a converter whose {@code convert} result is a {@code GenericRowData}
+  */
+ private static DeserializationRuntimeConverter createRowConverter(
+         RowType rowType,
+         ZoneId serverTimeZone,
+         DeserializationRuntimeConverterFactory userDefinedConverterFactory) {
+     final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
+     final DeserializationRuntimeConverter[] fieldConverters =
+             new DeserializationRuntimeConverter[fieldNames.length];
+     for (int idx = 0; idx < fieldConverters.length; idx++) {
+         fieldConverters[idx] =
+                 createConverter(
+                         rowType.getFields().get(idx).getType(),
+                         serverTimeZone,
+                         userDefinedConverterFactory);
+     }
+
+     return new DeserializationRuntimeConverter() {
+
+         private static final long serialVersionUID = 1L;
+
+         @Override
+         public Object convert(Object dbzObj, Schema schema) throws Exception {
+             final Struct source = (Struct) dbzObj;
+             final GenericRowData target = new GenericRowData(fieldNames.length);
+             for (int pos = 0; pos < fieldNames.length; pos++) {
+                 final String name = fieldNames[pos];
+                 final Field field = schema.field(name);
+                 if (field == null) {
+                     // The incoming schema does not carry this column.
+                     target.setField(pos, null);
+                     continue;
+                 }
+                 final Object raw = source.getWithoutDefault(name);
+                 target.setField(pos, convertField(fieldConverters[pos], raw, field.schema()));
+             }
+             return target;
+         }
+     };
+ }
+
+ /**
+  * Applies a field converter to a value, short-circuiting on {@code null}.
+  *
+  * @param fieldConverter converter invoked for non-null values
+  * @param fieldValue value to convert; may be {@code null}
+  * @param fieldSchema schema passed through to the converter
+  * @return {@code null} when {@code fieldValue} is {@code null}, otherwise the converter result
+  * @throws Exception whatever the converter throws
+  */
+ private static Object convertField(
+         DeserializationRuntimeConverter fieldConverter, Object fieldValue, Schema fieldSchema)
+         throws Exception {
+     return fieldValue == null ? null : fieldConverter.convert(fieldValue, fieldSchema);
+ }
+
+ /**
+  * Decorates a converter so that {@code null} input yields {@code null} without invoking the
+  * wrapped converter.
+  *
+  * @param converter converter to delegate to for non-null input
+  * @return a null-tolerant converter delegating to {@code converter}
+  */
+ private static DeserializationRuntimeConverter wrapIntoNullableConverter(
+         DeserializationRuntimeConverter converter) {
+     return new DeserializationRuntimeConverter() {
+
+         private static final long serialVersionUID = 1L;
+
+         @Override
+         public Object convert(Object dbzObj, Schema schema) throws Exception {
+             return dbzObj == null ? null : converter.convert(dbzObj, schema);
+         }
+     };
+ }
+}
diff --git a/licenses/inlong-sort-connectors/LICENSE
b/licenses/inlong-sort-connectors/LICENSE
index 3546604c06..e3a995cc65 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -857,25 +857,27 @@
Source : org.apache.flink:flink-connector-pulsar:4.1.0-1.18 (Please note
that the software have been modified.)
License :
https://github.com/apache/flink-connector-pulsar/blob/main/LICENSE
-
1.3.23
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodbMongoDBTableSource.java
Source : com.ververica:flink-connector-mongodb-cdc:2.3.0 (Please note
that the software have been modified.)
License :
https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
-
1.3.24
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableFactory.java
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgresValueValidator.java
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java
-Source : com.ververica:flink-connector-mongodb-cdc:2.3.0 (Please note that
the software have been modified.)
+Source : com.ververica:flink-connector-postgres-cdc:2.3.0 (Please note that
the software have been modified.)
License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
-
-
1.3.25
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerTableSource.java
-Source : com.ververica:flink-connector-mongodb-cdc:2.3.0 (Please note that
the software have been modified.)
+Source : com.ververica:flink-connector-sqlserver-cdc:2.3.0 (Please note that
the software have been modified.)
+License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
+
+1.3.27
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java
+
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MySqlTableSource.java
+
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java
+Source : com.ververica:flink-connector-mysql-cdc:2.3.0 (Please note that the
software have been modified.)
License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE