This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 52f201946 [FLINK-35056][cdc-connector/sqlserver] Fix scale mapping from SQL Server TIMESTAMP to Flink SQL timestamp
52f201946 is described below

commit 52f2019469cdeb63246317b6cb8c6b825233c27c
Author: Sergei Morozov <[email protected]>
AuthorDate: Thu Aug 22 10:09:16 2024 -0700

    [FLINK-35056][cdc-connector/sqlserver] Fix scale mapping from SQL Server TIMESTAMP to Flink SQL timestamp
    
    This closes #3561.
---
 .../sqlserver/source/utils/SqlServerTypeUtils.java |  4 ++--
 .../read/fetch/SqlServerScanFetchTaskTest.java     | 26 ++++++++++++++++++++++
 .../src/test/resources/ddl/pk.sql                  | 26 ++++++++++++++++++++++
 3 files changed, 54 insertions(+), 2 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerTypeUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerTypeUtils.java
index 8f1853837..e5373e36e 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerTypeUtils.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerTypeUtils.java
@@ -71,8 +71,8 @@ public class SqlServerTypeUtils {
                 return DataTypes.DATE();
             case Types.TIMESTAMP:
             case Types.TIMESTAMP_WITH_TIMEZONE:
-                return column.length() >= 0
-                        ? DataTypes.TIMESTAMP(column.length())
+                return column.scale().isPresent()
+                        ? DataTypes.TIMESTAMP(column.scale().get())
                         : DataTypes.TIMESTAMP();
             case Types.BOOLEAN:
                 return DataTypes.BOOLEAN();
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/read/fetch/SqlServerScanFetchTaskTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/read/fetch/SqlServerScanFetchTaskTest.java
index 73b99ef31..13c7a68e3 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/read/fetch/SqlServerScanFetchTaskTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/read/fetch/SqlServerScanFetchTaskTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.cdc.connectors.sqlserver.source.reader.fetch.SqlServerSo
 import org.apache.flink.cdc.connectors.sqlserver.testutils.RecordsFormatter;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
 
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.TableId;
@@ -50,6 +51,8 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.cdc.connectors.sqlserver.source.utils.SqlServerConnectionUtils.createSqlServerConnection;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.testcontainers.containers.MSSQLServerContainer.MS_SQL_SERVER_PORT;
@@ -189,6 +192,29 @@ public class SqlServerScanFetchTaskTest extends SqlServerSourceTestBase {
         assertEqualsInAnyOrder(Arrays.asList(expected), actual);
     }
 
+    @Test
+    public void testDateTimePrimaryKey() throws Exception {
+        String databaseName = "pk";
+        String tableName = "dbo.dt_pk";
+
+        initializeSqlServerTable(databaseName);
+
+        SqlServerSourceConfigFactory sourceConfigFactory =
+                getConfigFactory(databaseName, new String[] {tableName}, 8096);
+        SqlServerSourceConfig sourceConfig = sourceConfigFactory.create(0);
+        SqlServerDialect sqlServerDialect = new SqlServerDialect(sourceConfig);
+
+        List<SnapshotSplit> snapshotSplits = getSnapshotSplits(sourceConfig, sqlServerDialect);
+        assertFalse(snapshotSplits.isEmpty());
+
+        RowType expectedType =
+                (RowType)
+                        DataTypes.ROW(DataTypes.FIELD("dt", DataTypes.TIMESTAMP(3).notNull()))
+                                .getLogicalType();
+
+        snapshotSplits.forEach(s -> assertEquals(expectedType, s.getSplitKeyType()));
+    }
+
     @Test
     public void testDeleteDataInSnapshotScan() throws Exception {
         String databaseName = "customer";
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/resources/ddl/pk.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/resources/ddl/pk.sql
new file mode 100644
index 000000000..3986a75be
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/resources/ddl/pk.sql
@@ -0,0 +1,26 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+-- 
+--      http://www.apache.org/licenses/LICENSE-2.0
+-- 
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+CREATE DATABASE pk;
+
+USE pk;
+EXEC sys.sp_cdc_enable_db;
+
+CREATE TABLE dt_pk (
+    dt  datetime NOT NULL PRIMARY KEY,
+    val INT
+);
+
+EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'dt_pk', @role_name = NULL, @supports_net_changes = 0;

Reply via email to