leonardBang commented on code in PR #4320: URL: https://github.com/apache/flink-cdc/pull/4320#discussion_r3396731836
########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/SqlServerPipelineRecordEmitter.java: ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.sqlserver.source.reader; + +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.flink.cdc.common.event.CreateTableEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.connectors.base.options.StartupOptions; +import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory; +import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState; +import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics; +import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter; +import org.apache.flink.cdc.connectors.sqlserver.source.SqlServerEventDeserializer; +import org.apache.flink.cdc.connectors.sqlserver.source.config.SqlServerSourceConfig; +import org.apache.flink.cdc.connectors.sqlserver.source.dialect.SqlServerDialect; +import org.apache.flink.cdc.connectors.sqlserver.source.utils.SqlServerConnectionUtils; +import org.apache.flink.cdc.connectors.sqlserver.utils.SqlServerSchemaUtils; +import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema; +import org.apache.flink.connector.base.source.reader.RecordEmitter; + +import io.debezium.connector.sqlserver.SqlServerConnection; +import org.apache.kafka.connect.source.SourceRecord; + +import java.sql.SQLException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isLowWatermarkEvent; +import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.getTableId; +import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isDataChangeRecord; +import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isSchemaChangeEvent; +import static 
org.apache.flink.cdc.connectors.sqlserver.source.utils.SqlServerConnectionUtils.createSqlServerConnection; + +/** The {@link RecordEmitter} implementation for SQL Server pipeline connector. */ +public class SqlServerPipelineRecordEmitter<T> extends IncrementalSourceRecordEmitter<T> { + private final SqlServerSourceConfig sourceConfig; + private final SqlServerDialect sqlServerDialect; + + // Track tables that have already sent CreateTableEvent + private final Set<io.debezium.relational.TableId> alreadySendCreateTableTables; + + // Used when startup mode is snapshot (bounded mode) + private boolean shouldEmitAllCreateTableEventsInSnapshotMode = true; + private final boolean isBounded; + + // Cache for CreateTableEvent, using Map for O(1) lookup + private final Map<io.debezium.relational.TableId, CreateTableEvent> createTableEventCache; + + public SqlServerPipelineRecordEmitter( + DebeziumDeserializationSchema<T> debeziumDeserializationSchema, + SourceReaderMetrics sourceReaderMetrics, + SqlServerSourceConfig sourceConfig, + OffsetFactory offsetFactory, + SqlServerDialect sqlServerDialect) { + super( + debeziumDeserializationSchema, + sourceReaderMetrics, + sourceConfig.isIncludeSchemaChanges(), + offsetFactory); + this.sourceConfig = sourceConfig; + this.sqlServerDialect = sqlServerDialect; + this.alreadySendCreateTableTables = new HashSet<>(); + this.createTableEventCache = new HashMap<>(); + this.isBounded = StartupOptions.snapshot().equals(sourceConfig.getStartupOptions()); + generateCreateTableEvents(); Review Comment: Similar concern as in the deserializer: this introduces another connector-local create-table cache instead of reusing the framework cache / split-restoration mechanism already used by other pipeline connectors. 
########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/utils/SqlServerTypeUtils.java: ########## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.sqlserver.utils; + +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.table.types.logical.DecimalType; + +import io.debezium.relational.Column; + +import java.sql.Types; + +/** A utility class for converting SQL Server types to Flink CDC types. 
*/ +public class SqlServerTypeUtils { + + // SQL Server specific type names + static final String UNIQUE_IDENTIFIER = "uniqueidentifier"; + static final String XML = "xml"; + static final String SQL_VARIANT = "sql_variant"; + static final String HIERARCHY_ID = "hierarchyid"; + static final String GEOMETRY = "geometry"; + static final String GEOGRAPHY = "geography"; + static final String MONEY = "money"; + static final String SMALL_MONEY = "smallmoney"; + static final String DATETIME_OFFSET = "datetimeoffset"; + static final String DATETIME2 = "datetime2"; + static final String DATETIME = "datetime"; + static final String SMALL_DATETIME = "smalldatetime"; + static final String IMAGE = "image"; + static final String TIMESTAMP = "timestamp"; + static final String ROW_VERSION = "rowversion"; + static final String TEXT = "text"; + static final String N_TEXT = "ntext"; + + /** Returns a corresponding Flink CDC data type from a debezium {@link Column}. */ + public static DataType fromDbzColumn(Column column) { + DataType dataType = convertFromColumn(column); + if (column.isOptional()) { + return dataType; + } else { + return dataType.notNull(); + } + } + + /** + * Returns a corresponding Flink CDC data type from a debezium {@link Column} with nullable + * always be true. 
+ */ + private static DataType convertFromColumn(Column column) { + int precision = column.length(); + int scale = column.scale().orElse(0); + + switch (column.jdbcType()) { + case Types.BIT: + case Types.BOOLEAN: + return DataTypes.BOOLEAN(); + case Types.TINYINT: + // SQL Server TINYINT is unsigned 0-255, maps to SMALLINT + return DataTypes.SMALLINT(); + case Types.SMALLINT: + return DataTypes.SMALLINT(); + case Types.INTEGER: + return DataTypes.INT(); + case Types.BIGINT: + return DataTypes.BIGINT(); + case Types.REAL: + return DataTypes.FLOAT(); + case Types.FLOAT: + return DataTypes.DOUBLE(); + case Types.DOUBLE: + return DataTypes.DOUBLE(); + case Types.NUMERIC: + case Types.DECIMAL: + if (precision > 0 && precision <= DecimalType.MAX_PRECISION) { + return DataTypes.DECIMAL(precision, scale); + } + return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, DecimalType.DEFAULT_SCALE); + case Types.CHAR: + case Types.NCHAR: + return DataTypes.CHAR(precision); + case Types.VARCHAR: + case Types.NVARCHAR: + case Types.LONGVARCHAR: + case Types.LONGNVARCHAR: + if (precision > 0) { + return DataTypes.VARCHAR(precision); + } + return DataTypes.STRING(); + case Types.CLOB: + case Types.NCLOB: + return DataTypes.STRING(); + case Types.BINARY: + case Types.VARBINARY: + case Types.LONGVARBINARY: + case Types.BLOB: + return DataTypes.BYTES(); + case Types.DATE: + return DataTypes.DATE(); + case Types.TIME: + case Types.TIME_WITH_TIMEZONE: + return DataTypes.TIME(Math.max(scale, 0)); + case Types.TIMESTAMP: + return DataTypes.TIMESTAMP(scale > 0 ? scale : 6); + case Types.TIMESTAMP_WITH_TIMEZONE: + return DataTypes.TIMESTAMP_LTZ(scale > 0 ? scale : 6); + case Types.STRUCT: + // SQL Server specific types like unique identifier, xml, etc. 
+ String typeName = column.typeName(); + if (UNIQUE_IDENTIFIER.equalsIgnoreCase(typeName)) { + return DataTypes.STRING(); + } + return DataTypes.STRING(); + default: + // For unknown types, try to handle them as STRING + String unknownTypeName = column.typeName(); + if (unknownTypeName != null) { + // Handle SQL Server specific types + switch (unknownTypeName.toLowerCase()) { + case UNIQUE_IDENTIFIER: + case XML: + case SQL_VARIANT: + case HIERARCHY_ID: + case GEOMETRY: + case GEOGRAPHY: + return DataTypes.STRING(); + case MONEY: + case SMALL_MONEY: + return DataTypes.DECIMAL(10, 4); Review Comment: money and smallmoney have different ranges and must not share the same mapping. SQL Server money is an 8-byte type with range ±922,337,203,685,477.5807, i.e. DECIMAL(19, 4); only smallmoney fits in DECIMAL(10, 4). Mapping money to DECIMAL(10, 4) will overflow/truncate any value with more than 6 integer digits (10 digits in total, 4 of them fractional) and silently corrupt data. Please split them: > case MONEY: > return DataTypes.DECIMAL(19, 4); > case SMALL_MONEY: > return DataTypes.DECIMAL(10, 4); Could you also add these two columns (with large money values) to `SqlServerFullTypesITCase` so the mapping is covered? ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerEventDeserializer.java: ########## @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.sqlserver.source; + +import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.utils.SchemaMergingUtils; +import org.apache.flink.cdc.connectors.sqlserver.table.SqlServerReadableMetadata; +import org.apache.flink.cdc.connectors.sqlserver.utils.SqlServerSchemaUtils; +import org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema; +import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer; +import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode; +import org.apache.flink.table.data.TimestampData; + +import io.debezium.data.Envelope; +import io.debezium.relational.history.TableChanges; +import io.debezium.relational.history.TableChanges.TableChange; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.getHistoryRecord; +import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isSchemaChangeEvent; + +/** Event deserializer for {@link SqlServerDataSource}. 
*/ +@Internal +public class SqlServerEventDeserializer extends DebeziumEventDeserializationSchema { + + private static final long serialVersionUID = 1L; + private final boolean includeSchemaChanges; + private final List<SqlServerReadableMetadata> readableMetadataList; + + /** + * Cache to compute schema differences for ALTER events. + * + * <p>This cache is runtime-only and will be reconstructed from checkpointed split state (see + * {@link #initializeTableSchemaCacheFromSplitSchemas(Map)}). It must not be {@code final} + * because Java deserialization bypasses field initializers for {@code transient} fields. + */ + private transient Map<TableId, Schema> tableSchemaCache; Review Comment: We already have framework-level schema cache support in DebeziumEventDeserializationSchema via getCreateTableEventCache() / applyCreateTableEvent(). Re-introducing a connector-local tableSchemaCache here makes SQL Server diverge from the pattern used by Postgres and creates another state recovery path to maintain. Could we reuse the framework cache instead? ########## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerDataSourceOptions.java: ########## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.sqlserver.source; + +import org.apache.flink.cdc.common.annotation.Experimental; +import org.apache.flink.cdc.common.annotation.PublicEvolving; +import org.apache.flink.cdc.common.configuration.ConfigOption; +import org.apache.flink.cdc.common.configuration.ConfigOptions; + +import java.time.Duration; + +/** Configurations for {@link SqlServerDataSource}. */ +@PublicEvolving +public class SqlServerDataSourceOptions { + + public static final ConfigOption<String> HOSTNAME = + ConfigOptions.key("hostname") + .stringType() + .noDefaultValue() + .withDescription("IP address or hostname of the SQL Server database server."); + + public static final ConfigOption<Integer> PORT = + ConfigOptions.key("port") + .intType() + .defaultValue(1433) + .withDescription("Integer port number of the SQL Server database server."); + + public static final ConfigOption<String> USERNAME = + ConfigOptions.key("username") + .stringType() + .noDefaultValue() + .withDescription( + "Name of the SQL Server user to use when connecting to the SQL Server database server."); + + public static final ConfigOption<String> PASSWORD = + ConfigOptions.key("password") + .stringType() + .noDefaultValue() + .withDescription( + "Password to use when connecting to the SQL Server database server."); + + public static final ConfigOption<String> TABLES = + ConfigOptions.key("tables") + .stringType() + .noDefaultValue() + .withDescription( + "Table names of the SQL Server tables to monitor. Regular expressions are supported. " + + "It is important to note that the dot (.) is treated as a delimiter for database, schema and table names. " + + "If there is a need to use a dot (.) in a regular expression to match any character, " + + "it is necessary to escape the dot with a backslash." + + "eg. 
db0.dbo.\\.*, db1.dbo.user_table_[0-9]+, db[1-2].dbo.[app|web]_order_\\.*"); + + public static final ConfigOption<String> SERVER_TIME_ZONE = + ConfigOptions.key("server-time-zone") + .stringType() + .noDefaultValue() + .withDescription( + "The session time zone in database server. If not set, then " + + "ZoneId.systemDefault() is used to determine the server time zone."); + + public static final ConfigOption<String> SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN = + ConfigOptions.key("scan.incremental.snapshot.chunk.key-column") + .stringType() + .noDefaultValue() + .withDescription( + "The chunk key of table snapshot, captured tables are split into multiple chunks by a chunk key when read the snapshot of table." + + "By default, the chunk key is the first column of the primary key." + + "This column must be a column of the primary key."); + + public static final ConfigOption<Integer> SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE = + ConfigOptions.key("scan.incremental.snapshot.chunk.size") + .intType() + .defaultValue(8096) + .withDescription( + "The chunk size (number of rows) of table snapshot, captured tables are split into multiple chunks when read the snapshot of table."); + + public static final ConfigOption<Integer> SCAN_SNAPSHOT_FETCH_SIZE = + ConfigOptions.key("scan.snapshot.fetch.size") + .intType() + .defaultValue(1024) + .withDescription( + "The maximum fetch size for per poll when read table snapshot."); + + public static final ConfigOption<Duration> CONNECT_TIMEOUT = + ConfigOptions.key("connect.timeout") + .durationType() + .defaultValue(Duration.ofSeconds(30)) + .withDescription( + "The maximum time that the connector should wait after trying to connect to the SQL Server database server before timing out."); + + public static final ConfigOption<Integer> CONNECTION_POOL_SIZE = + ConfigOptions.key("connection.pool.size") + .intType() + .defaultValue(20) + .withDescription("The connection pool size."); + + public static final ConfigOption<Integer> 
CONNECT_MAX_RETRIES = + ConfigOptions.key("connect.max-retries") + .intType() + .defaultValue(3) + .withDescription( + "The max retry times that the connector should retry to build SQL Server database server connection."); + + public static final ConfigOption<String> SCAN_STARTUP_MODE = + ConfigOptions.key("scan.startup.mode") + .stringType() + .defaultValue("initial") + .withDescription( + "Optional startup mode for SQL Server CDC consumer, valid enumerations are " + + "\"initial\", \"latest-offset\", \"snapshot\" or \"timestamp\""); + + public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS = + ConfigOptions.key("scan.startup.timestamp-millis") + .longType() + .noDefaultValue() + .withDescription( + "Optional timestamp used in case of \"timestamp\" startup mode"); + + public static final ConfigOption<Boolean> SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP = + ConfigOptions.key("scan.incremental.snapshot.backfill.skip") + .booleanType() + .defaultValue(true) + .withDescription( + "Whether to skip backfill in snapshot reading phase. If backfill is skipped, changes on captured tables during snapshot phase will be consumed later in change log reading phase instead of being merged into the snapshot.WARNING: Skipping backfill might lead to data inconsistency because some change log events happened within the snapshot phase might be replayed (only at-least-once semantic is promised). For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. 
These replayed change log events should be handled specially."); + + // ---------------------------------------------------------------------------- + // experimental options, won't add them to documentation + // ---------------------------------------------------------------------------- + @Experimental + public static final ConfigOption<Integer> CHUNK_META_GROUP_SIZE = + ConfigOptions.key("chunk-meta.group.size") + .intType() + .defaultValue(1000) + .withDescription( + "The group size of chunk meta, if the meta size exceeds the group size, the meta will be divided into multiple groups."); + + @Experimental + public static final ConfigOption<Double> CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND = + ConfigOptions.key("chunk-key.even-distribution.factor.upper-bound") + .doubleType() + .defaultValue(1000.0d) + .withFallbackKeys("split-key.even-distribution.factor.upper-bound") + .withDescription( + "The upper bound of chunk key distribution factor. The distribution factor is used to determine whether the" + + " table is evenly distribution or not." + + " The table chunks would use evenly calculation optimization when the data distribution is even," + + " and the query SQL Server for splitting would happen when it is uneven." + + " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount."); + + @Experimental + public static final ConfigOption<Double> CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND = + ConfigOptions.key("chunk-key.even-distribution.factor.lower-bound") + .doubleType() + .defaultValue(0.05d) + .withFallbackKeys("split-key.even-distribution.factor.lower-bound") + .withDescription( + "The lower bound of chunk key distribution factor. The distribution factor is used to determine whether the" + + " table is evenly distribution or not." + + " The table chunks would use evenly calculation optimization when the data distribution is even," + + " and the query SQL Server for splitting would happen when it is uneven." 
+ + " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount."); + + @Experimental + public static final ConfigOption<Boolean> SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED = + ConfigOptions.key("scan.incremental.close-idle-reader.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to close idle readers at the end of the snapshot phase. This feature depends on " + + "FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be " + + "greater than or equal to 1.14 when enabling this feature."); + + @Experimental + public static final ConfigOption<Boolean> SCHEMA_CHANGE_ENABLED = + ConfigOptions.key("schema-change.enabled") + .booleanType() + .defaultValue(true) + .withDescription( + "Whether send schema change events, by default is true. If set to false, the schema changes will not be sent."); + + @Experimental + public static final ConfigOption<String> TABLES_EXCLUDE = + ConfigOptions.key("tables.exclude") + .stringType() + .noDefaultValue() + .withDescription( + "Table names of the SQL Server tables to Exclude. Regular expressions are supported. " + + "It is important to note that the dot (.) is treated as a delimiter for database, schema and table names. " + + "If there is a need to use a dot (.) in a regular expression to match any character, " + + "it is necessary to escape the dot with a backslash." + + "eg. db0.dbo.\\.*, db1.dbo.user_table_[0-9]+, db[1-2].dbo.[app|web]_order_\\.*"); Review Comment: Same comment as tables: this example appears to advertise cross-database matching, but the current factory logic does not support that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
