This is an automated email from the ASF dual-hosted git repository. leonard pushed a commit to branch release-3.2 in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
commit daf28ddc99ae5678f50e6f3c3de0d959567d3522 Author: Sergei Morozov <[email protected]> AuthorDate: Thu Aug 22 10:09:16 2024 -0700 [FLINK-35056][cdc-connector/sqlserver] Fix scale mapping from SQL Server TIMESTAMP to Flink SQL timestamp This closes #3561. (cherry picked from commit 52f2019469cdeb63246317b6cb8c6b825233c27c) --- .../sqlserver/source/utils/SqlServerTypeUtils.java | 4 ++-- .../read/fetch/SqlServerScanFetchTaskTest.java | 26 ++++++++++++++++++++++ .../src/test/resources/ddl/pk.sql | 26 ++++++++++++++++++++++ 3 files changed, 54 insertions(+), 2 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerTypeUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerTypeUtils.java index 8f1853837..e5373e36e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerTypeUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerTypeUtils.java @@ -71,8 +71,8 @@ public class SqlServerTypeUtils { return DataTypes.DATE(); case Types.TIMESTAMP: case Types.TIMESTAMP_WITH_TIMEZONE: - return column.length() >= 0 - ? DataTypes.TIMESTAMP(column.length()) + return column.scale().isPresent() + ? DataTypes.TIMESTAMP(column.scale().get()) : DataTypes.TIMESTAMP(); case Types.BOOLEAN: return DataTypes.BOOLEAN(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/read/fetch/SqlServerScanFetchTaskTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/read/fetch/SqlServerScanFetchTaskTest.java index 73b99ef31..13c7a68e3 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/read/fetch/SqlServerScanFetchTaskTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/read/fetch/SqlServerScanFetchTaskTest.java @@ -35,6 +35,7 @@ import org.apache.flink.cdc.connectors.sqlserver.source.reader.fetch.SqlServerSo import org.apache.flink.cdc.connectors.sqlserver.testutils.RecordsFormatter; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.TableId; @@ -50,6 +51,8 @@ import java.util.List; import java.util.stream.Collectors; import static org.apache.flink.cdc.connectors.sqlserver.source.utils.SqlServerConnectionUtils.createSqlServerConnection; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.testcontainers.containers.MSSQLServerContainer.MS_SQL_SERVER_PORT; @@ -189,6 +192,29 @@ public class SqlServerScanFetchTaskTest extends SqlServerSourceTestBase { assertEqualsInAnyOrder(Arrays.asList(expected), actual); } + @Test + public void testDateTimePrimaryKey() throws Exception { + String databaseName = "pk"; + String tableName = "dbo.dt_pk"; + + initializeSqlServerTable(databaseName); + + SqlServerSourceConfigFactory sourceConfigFactory = + getConfigFactory(databaseName, new String[] {tableName}, 8096); + SqlServerSourceConfig sourceConfig = sourceConfigFactory.create(0); + SqlServerDialect sqlServerDialect = new SqlServerDialect(sourceConfig); + + List<SnapshotSplit> snapshotSplits = getSnapshotSplits(sourceConfig, sqlServerDialect); + assertFalse(snapshotSplits.isEmpty()); + + RowType expectedType = + (RowType) + DataTypes.ROW(DataTypes.FIELD("dt", DataTypes.TIMESTAMP(3).notNull())) + .getLogicalType(); + + snapshotSplits.forEach(s -> assertEquals(expectedType, s.getSplitKeyType())); + } + @Test public void testDeleteDataInSnapshotScan() throws Exception { String databaseName = "customer"; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/resources/ddl/pk.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/resources/ddl/pk.sql new file mode 100644 index 000000000..3986a75be --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/resources/ddl/pk.sql @@ -0,0 +1,26 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +CREATE DATABASE pk; + +USE pk; +EXEC sys.sp_cdc_enable_db; + +CREATE TABLE dt_pk ( + dt datetime NOT NULL PRIMARY KEY, + val INT +); + +EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'dt_pk', @role_name = NULL, @supports_net_changes = 0;
