leonardBang commented on code in PR #4320:
URL: https://github.com/apache/flink-cdc/pull/4320#discussion_r3396731836


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/reader/SqlServerPipelineRecordEmitter.java:
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.sqlserver.source.reader;
+
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.connectors.base.options.StartupOptions;
+import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
+import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitState;
+import org.apache.flink.cdc.connectors.base.source.metrics.SourceReaderMetrics;
+import 
org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter;
+import 
org.apache.flink.cdc.connectors.sqlserver.source.SqlServerEventDeserializer;
+import 
org.apache.flink.cdc.connectors.sqlserver.source.config.SqlServerSourceConfig;
+import 
org.apache.flink.cdc.connectors.sqlserver.source.dialect.SqlServerDialect;
+import 
org.apache.flink.cdc.connectors.sqlserver.source.utils.SqlServerConnectionUtils;
+import org.apache.flink.cdc.connectors.sqlserver.utils.SqlServerSchemaUtils;
+import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+
+import io.debezium.connector.sqlserver.SqlServerConnection;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isLowWatermarkEvent;
+import static 
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.getTableId;
+import static 
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isDataChangeRecord;
+import static 
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isSchemaChangeEvent;
+import static 
org.apache.flink.cdc.connectors.sqlserver.source.utils.SqlServerConnectionUtils.createSqlServerConnection;
+
+/** The {@link RecordEmitter} implementation for SQL Server pipeline 
connector. */
+public class SqlServerPipelineRecordEmitter<T> extends 
IncrementalSourceRecordEmitter<T> {
+    private final SqlServerSourceConfig sourceConfig;
+    private final SqlServerDialect sqlServerDialect;
+
+    // Track tables that have already sent CreateTableEvent
+    private final Set<io.debezium.relational.TableId> 
alreadySendCreateTableTables;
+
+    // Used when startup mode is snapshot (bounded mode)
+    private boolean shouldEmitAllCreateTableEventsInSnapshotMode = true;
+    private final boolean isBounded;
+
+    // Cache for CreateTableEvent, using Map for O(1) lookup
+    private final Map<io.debezium.relational.TableId, CreateTableEvent> 
createTableEventCache;
+
+    public SqlServerPipelineRecordEmitter(
+            DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
+            SourceReaderMetrics sourceReaderMetrics,
+            SqlServerSourceConfig sourceConfig,
+            OffsetFactory offsetFactory,
+            SqlServerDialect sqlServerDialect) {
+        super(
+                debeziumDeserializationSchema,
+                sourceReaderMetrics,
+                sourceConfig.isIncludeSchemaChanges(),
+                offsetFactory);
+        this.sourceConfig = sourceConfig;
+        this.sqlServerDialect = sqlServerDialect;
+        this.alreadySendCreateTableTables = new HashSet<>();
+        this.createTableEventCache = new HashMap<>();
+        this.isBounded = 
StartupOptions.snapshot().equals(sourceConfig.getStartupOptions());
+        generateCreateTableEvents();

Review Comment:
    Similar concern as in the deserializer: this introduces another 
connector-local create-table cache instead of reusing the framework cache / 
split-restoration mechanism already used by other pipeline connectors.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/utils/SqlServerTypeUtils.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.sqlserver.utils;
+
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.table.types.logical.DecimalType;
+
+import io.debezium.relational.Column;
+
+import java.sql.Types;
+
+/** A utility class for converting SQL Server types to Flink CDC types. */
+public class SqlServerTypeUtils {
+
+    // SQL Server specific type names
+    static final String UNIQUE_IDENTIFIER = "uniqueidentifier";
+    static final String XML = "xml";
+    static final String SQL_VARIANT = "sql_variant";
+    static final String HIERARCHY_ID = "hierarchyid";
+    static final String GEOMETRY = "geometry";
+    static final String GEOGRAPHY = "geography";
+    static final String MONEY = "money";
+    static final String SMALL_MONEY = "smallmoney";
+    static final String DATETIME_OFFSET = "datetimeoffset";
+    static final String DATETIME2 = "datetime2";
+    static final String DATETIME = "datetime";
+    static final String SMALL_DATETIME = "smalldatetime";
+    static final String IMAGE = "image";
+    static final String TIMESTAMP = "timestamp";
+    static final String ROW_VERSION = "rowversion";
+    static final String TEXT = "text";
+    static final String N_TEXT = "ntext";
+
+    /** Returns a corresponding Flink CDC data type from a debezium {@link 
Column}. */
+    public static DataType fromDbzColumn(Column column) {
+        DataType dataType = convertFromColumn(column);
+        if (column.isOptional()) {
+            return dataType;
+        } else {
+            return dataType.notNull();
+        }
+    }
+
+    /**
+     * Returns a corresponding Flink CDC data type from a debezium {@link 
Column} with nullable
+     * always be true.
+     */
+    private static DataType convertFromColumn(Column column) {
+        int precision = column.length();
+        int scale = column.scale().orElse(0);
+
+        switch (column.jdbcType()) {
+            case Types.BIT:
+            case Types.BOOLEAN:
+                return DataTypes.BOOLEAN();
+            case Types.TINYINT:
+                // SQL Server TINYINT is unsigned 0-255, maps to SMALLINT
+                return DataTypes.SMALLINT();
+            case Types.SMALLINT:
+                return DataTypes.SMALLINT();
+            case Types.INTEGER:
+                return DataTypes.INT();
+            case Types.BIGINT:
+                return DataTypes.BIGINT();
+            case Types.REAL:
+                return DataTypes.FLOAT();
+            case Types.FLOAT:
+                return DataTypes.DOUBLE();
+            case Types.DOUBLE:
+                return DataTypes.DOUBLE();
+            case Types.NUMERIC:
+            case Types.DECIMAL:
+                if (precision > 0 && precision <= DecimalType.MAX_PRECISION) {
+                    return DataTypes.DECIMAL(precision, scale);
+                }
+                return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 
DecimalType.DEFAULT_SCALE);
+            case Types.CHAR:
+            case Types.NCHAR:
+                return DataTypes.CHAR(precision);
+            case Types.VARCHAR:
+            case Types.NVARCHAR:
+            case Types.LONGVARCHAR:
+            case Types.LONGNVARCHAR:
+                if (precision > 0) {
+                    return DataTypes.VARCHAR(precision);
+                }
+                return DataTypes.STRING();
+            case Types.CLOB:
+            case Types.NCLOB:
+                return DataTypes.STRING();
+            case Types.BINARY:
+            case Types.VARBINARY:
+            case Types.LONGVARBINARY:
+            case Types.BLOB:
+                return DataTypes.BYTES();
+            case Types.DATE:
+                return DataTypes.DATE();
+            case Types.TIME:
+            case Types.TIME_WITH_TIMEZONE:
+                return DataTypes.TIME(Math.max(scale, 0));
+            case Types.TIMESTAMP:
+                return DataTypes.TIMESTAMP(scale > 0 ? scale : 6);
+            case Types.TIMESTAMP_WITH_TIMEZONE:
+                return DataTypes.TIMESTAMP_LTZ(scale > 0 ? scale : 6);
+            case Types.STRUCT:
+                // SQL Server specific types like unique identifier, xml, etc.
+                String typeName = column.typeName();
+                if (UNIQUE_IDENTIFIER.equalsIgnoreCase(typeName)) {
+                    return DataTypes.STRING();
+                }
+                return DataTypes.STRING();
+            default:
+                // For unknown types, try to handle them as STRING
+                String unknownTypeName = column.typeName();
+                if (unknownTypeName != null) {
+                    // Handle SQL Server specific types
+                    switch (unknownTypeName.toLowerCase()) {
+                        case UNIQUE_IDENTIFIER:
+                        case XML:
+                        case SQL_VARIANT:
+                        case HIERARCHY_ID:
+                        case GEOMETRY:
+                        case GEOGRAPHY:
+                            return DataTypes.STRING();
+                        case MONEY:
+                        case SMALL_MONEY:
+                            return DataTypes.DECIMAL(10, 4);

Review Comment:
   
    money and smallmoney have different ranges and must not share the same 
mapping. SQL Server money is an 8-byte type with range 
±922,337,203,685,477.5807, i.e. DECIMAL(19, 4);
   only smallmoney fits in DECIMAL(10, 4). Mapping money to DECIMAL(10, 4) will 
overflow/truncate any value above ~10 digits and silently corrupt data. Please 
split them:
     > case MONEY:
     >     return DataTypes.DECIMAL(19, 4);
     > case SMALL_MONEY:
     >     return DataTypes.DECIMAL(10, 4);
     Could you also add these two columns (with large money values) to 
`SqlServerFullTypesITCase` so the mapping is covered?



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerEventDeserializer.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.sqlserver.source;
+
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.utils.SchemaMergingUtils;
+import 
org.apache.flink.cdc.connectors.sqlserver.table.SqlServerReadableMetadata;
+import org.apache.flink.cdc.connectors.sqlserver.utils.SqlServerSchemaUtils;
+import org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema;
+import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer;
+import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;
+import org.apache.flink.table.data.TimestampData;
+
+import io.debezium.data.Envelope;
+import io.debezium.relational.history.TableChanges;
+import io.debezium.relational.history.TableChanges.TableChange;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.getHistoryRecord;
+import static 
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.isSchemaChangeEvent;
+
+/** Event deserializer for {@link SqlServerDataSource}. */
+@Internal
+public class SqlServerEventDeserializer extends 
DebeziumEventDeserializationSchema {
+
+    private static final long serialVersionUID = 1L;
+    private final boolean includeSchemaChanges;
+    private final List<SqlServerReadableMetadata> readableMetadataList;
+
+    /**
+     * Cache to compute schema differences for ALTER events.
+     *
+     * <p>This cache is runtime-only and will be reconstructed from 
checkpointed split state (see
+     * {@link #initializeTableSchemaCacheFromSplitSchemas(Map)}). It must not 
be {@code final}
+     * because Java deserialization bypasses field initializers for {@code 
transient} fields.
+     */
+    private transient Map<TableId, Schema> tableSchemaCache;

Review Comment:
    We already have framework-level schema cache support in 
DebeziumEventDeserializationSchema via getCreateTableEventCache() / 
applyCreateTableEvent(). Re-introducing a connector-local tableSchemaCache here 
makes SQL Server diverge from the pattern used by Postgres and creates another 
state recovery path to maintain. Could we reuse the framework cache instead?



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-sqlserver/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/SqlServerDataSourceOptions.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.sqlserver.source;
+
+import org.apache.flink.cdc.common.annotation.Experimental;
+import org.apache.flink.cdc.common.annotation.PublicEvolving;
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+
+import java.time.Duration;
+
+/** Configurations for {@link SqlServerDataSource}. */
+@PublicEvolving
+public class SqlServerDataSourceOptions {
+
+    public static final ConfigOption<String> HOSTNAME =
+            ConfigOptions.key("hostname")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("IP address or hostname of the SQL Server 
database server.");
+
+    public static final ConfigOption<Integer> PORT =
+            ConfigOptions.key("port")
+                    .intType()
+                    .defaultValue(1433)
+                    .withDescription("Integer port number of the SQL Server 
database server.");
+
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Name of the SQL Server user to use when 
connecting to the SQL Server database server.");
+
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Password to use when connecting to the SQL Server 
database server.");
+
+    public static final ConfigOption<String> TABLES =
+            ConfigOptions.key("tables")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Table names of the SQL Server tables to monitor. 
Regular expressions are supported. "
+                                    + "It is important to note that the dot 
(.) is treated as a delimiter for database, schema and table names. "
+                                    + "If there is a need to use a dot (.) in 
a regular expression to match any character, "
+                                    + "it is necessary to escape the dot with 
a backslash."
+                                    + "eg. db0.dbo.\\.*, 
db1.dbo.user_table_[0-9]+, db[1-2].dbo.[app|web]_order_\\.*");
+
+    public static final ConfigOption<String> SERVER_TIME_ZONE =
+            ConfigOptions.key("server-time-zone")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The session time zone in database server. If not 
set, then "
+                                    + "ZoneId.systemDefault() is used to 
determine the server time zone.");
+
+    public static final ConfigOption<String> 
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN =
+            ConfigOptions.key("scan.incremental.snapshot.chunk.key-column")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The chunk key of table snapshot, captured tables 
are split into multiple chunks by a chunk key when read the snapshot of table."
+                                    + "By default, the chunk key is the first 
column of the primary key."
+                                    + "This column must be a column of the 
primary key.");
+
+    public static final ConfigOption<Integer> 
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE =
+            ConfigOptions.key("scan.incremental.snapshot.chunk.size")
+                    .intType()
+                    .defaultValue(8096)
+                    .withDescription(
+                            "The chunk size (number of rows) of table 
snapshot, captured tables are split into multiple chunks when read the snapshot 
of table.");
+
+    public static final ConfigOption<Integer> SCAN_SNAPSHOT_FETCH_SIZE =
+            ConfigOptions.key("scan.snapshot.fetch.size")
+                    .intType()
+                    .defaultValue(1024)
+                    .withDescription(
+                            "The maximum fetch size for per poll when read 
table snapshot.");
+
+    public static final ConfigOption<Duration> CONNECT_TIMEOUT =
+            ConfigOptions.key("connect.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription(
+                            "The maximum time that the connector should wait 
after trying to connect to the SQL Server database server before timing out.");
+
+    public static final ConfigOption<Integer> CONNECTION_POOL_SIZE =
+            ConfigOptions.key("connection.pool.size")
+                    .intType()
+                    .defaultValue(20)
+                    .withDescription("The connection pool size.");
+
+    public static final ConfigOption<Integer> CONNECT_MAX_RETRIES =
+            ConfigOptions.key("connect.max-retries")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription(
+                            "The max retry times that the connector should 
retry to build SQL Server database server connection.");
+
+    public static final ConfigOption<String> SCAN_STARTUP_MODE =
+            ConfigOptions.key("scan.startup.mode")
+                    .stringType()
+                    .defaultValue("initial")
+                    .withDescription(
+                            "Optional startup mode for SQL Server CDC 
consumer, valid enumerations are "
+                                    + "\"initial\", \"latest-offset\", 
\"snapshot\" or \"timestamp\"");
+
+    public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS =
+            ConfigOptions.key("scan.startup.timestamp-millis")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional timestamp used in case of \"timestamp\" 
startup mode");
+
+    public static final ConfigOption<Boolean> 
SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP =
+            ConfigOptions.key("scan.incremental.snapshot.backfill.skip")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Whether to skip backfill in snapshot reading 
phase. If backfill is skipped, changes on captured tables during snapshot phase 
will be consumed later in change log reading phase instead of being merged into 
the snapshot.WARNING: Skipping backfill might lead to data inconsistency 
because some change log events happened within the snapshot phase might be 
replayed (only at-least-once semantic is promised). For example updating an 
already updated value in snapshot, or deleting an already deleted entry in 
snapshot. These replayed change log events should be handled specially.");
+
+    // 
----------------------------------------------------------------------------
+    // experimental options, won't add them to documentation
+    // 
----------------------------------------------------------------------------
+    @Experimental
+    public static final ConfigOption<Integer> CHUNK_META_GROUP_SIZE =
+            ConfigOptions.key("chunk-meta.group.size")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "The group size of chunk meta, if the meta size 
exceeds the group size, the meta will be divided into multiple groups.");
+
+    @Experimental
+    public static final ConfigOption<Double> 
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND =
+            ConfigOptions.key("chunk-key.even-distribution.factor.upper-bound")
+                    .doubleType()
+                    .defaultValue(1000.0d)
+                    
.withFallbackKeys("split-key.even-distribution.factor.upper-bound")
+                    .withDescription(
+                            "The upper bound of chunk key distribution factor. 
The distribution factor is used to determine whether the"
+                                    + " table is evenly distribution or not."
+                                    + " The table chunks would use evenly 
calculation optimization when the data distribution is even,"
+                                    + " and the query SQL Server for splitting 
would happen when it is uneven."
+                                    + " The distribution factor could be 
calculated by (MAX(id) - MIN(id) + 1) / rowCount.");
+
+    @Experimental
+    public static final ConfigOption<Double> 
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND =
+            ConfigOptions.key("chunk-key.even-distribution.factor.lower-bound")
+                    .doubleType()
+                    .defaultValue(0.05d)
+                    
.withFallbackKeys("split-key.even-distribution.factor.lower-bound")
+                    .withDescription(
+                            "The lower bound of chunk key distribution factor. 
The distribution factor is used to determine whether the"
+                                    + " table is evenly distribution or not."
+                                    + " The table chunks would use evenly 
calculation optimization when the data distribution is even,"
+                                    + " and the query SQL Server for splitting 
would happen when it is uneven."
+                                    + " The distribution factor could be 
calculated by (MAX(id) - MIN(id) + 1) / rowCount.");
+
+    @Experimental
+    public static final ConfigOption<Boolean> 
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED =
+            ConfigOptions.key("scan.incremental.close-idle-reader.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to close idle readers at the end of the 
snapshot phase. This feature depends on "
+                                    + "FLIP-147: Support Checkpoints After 
Tasks Finished. The flink version is required to be "
+                                    + "greater than or equal to 1.14 when 
enabling this feature.");
+
+    @Experimental
+    public static final ConfigOption<Boolean> SCHEMA_CHANGE_ENABLED =
+            ConfigOptions.key("schema-change.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Whether send schema change events, by default is 
true. If set to false, the schema changes will not be sent.");
+
+    @Experimental
+    public static final ConfigOption<String> TABLES_EXCLUDE =
+            ConfigOptions.key("tables.exclude")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Table names of the SQL Server tables to Exclude. 
Regular expressions are supported. "
+                                    + "It is important to note that the dot 
(.) is treated as a delimiter for database, schema and table names. "
+                                    + "If there is a need to use a dot (.) in 
a regular expression to match any character, "
+                                    + "it is necessary to escape the dot with 
a backslash."
+                                    + "eg. db0.dbo.\\.*, 
db1.dbo.user_table_[0-9]+, db[1-2].dbo.[app|web]_order_\\.*");

Review Comment:
    Same comment as tables: this example appears to advertise cross-database 
matching, but the current factory logic does not support that.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to