This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 52f201946 [FLINK-35056][cdc-connector/sqlserver] Fix scale mapping
from SQL Server TIMESTAMP to Flink SQL timestamp
52f201946 is described below
commit 52f2019469cdeb63246317b6cb8c6b825233c27c
Author: Sergei Morozov <[email protected]>
AuthorDate: Thu Aug 22 10:09:16 2024 -0700
[FLINK-35056][cdc-connector/sqlserver] Fix scale mapping from SQL Server
TIMESTAMP to Flink SQL timestamp
This closes #3561.
---
.../sqlserver/source/utils/SqlServerTypeUtils.java | 4 ++--
.../read/fetch/SqlServerScanFetchTaskTest.java | 26 ++++++++++++++++++++++
.../src/test/resources/ddl/pk.sql | 26 ++++++++++++++++++++++
3 files changed, 54 insertions(+), 2 deletions(-)
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerTypeUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerTypeUtils.java
index 8f1853837..e5373e36e 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerTypeUtils.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerTypeUtils.java
@@ -71,8 +71,8 @@ public class SqlServerTypeUtils {
return DataTypes.DATE();
case Types.TIMESTAMP:
case Types.TIMESTAMP_WITH_TIMEZONE:
- return column.length() >= 0
- ? DataTypes.TIMESTAMP(column.length())
+ return column.scale().isPresent()
+ ? DataTypes.TIMESTAMP(column.scale().get())
: DataTypes.TIMESTAMP();
case Types.BOOLEAN:
return DataTypes.BOOLEAN();
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/read/fetch/SqlServerScanFetchTaskTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/read/fetch/SqlServerScanFetchTaskTest.java
index 73b99ef31..13c7a68e3 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/read/fetch/SqlServerScanFetchTaskTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/read/fetch/SqlServerScanFetchTaskTest.java
@@ -35,6 +35,7 @@ import
org.apache.flink.cdc.connectors.sqlserver.source.reader.fetch.SqlServerSo
import org.apache.flink.cdc.connectors.sqlserver.testutils.RecordsFormatter;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
@@ -50,6 +51,8 @@ import java.util.List;
import java.util.stream.Collectors;
import static
org.apache.flink.cdc.connectors.sqlserver.source.utils.SqlServerConnectionUtils.createSqlServerConnection;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static
org.testcontainers.containers.MSSQLServerContainer.MS_SQL_SERVER_PORT;
@@ -189,6 +192,29 @@ public class SqlServerScanFetchTaskTest extends
SqlServerSourceTestBase {
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
}
+ @Test
+ public void testDateTimePrimaryKey() throws Exception {
+ String databaseName = "pk";
+ String tableName = "dbo.dt_pk";
+
+ initializeSqlServerTable(databaseName);
+
+ SqlServerSourceConfigFactory sourceConfigFactory =
+ getConfigFactory(databaseName, new String[] {tableName}, 8096);
+ SqlServerSourceConfig sourceConfig = sourceConfigFactory.create(0);
+ SqlServerDialect sqlServerDialect = new SqlServerDialect(sourceConfig);
+
+ List<SnapshotSplit> snapshotSplits = getSnapshotSplits(sourceConfig, sqlServerDialect);
+ assertFalse(snapshotSplits.isEmpty());
+
+ RowType expectedType =
+ (RowType)
+ DataTypes.ROW(DataTypes.FIELD("dt", DataTypes.TIMESTAMP(3).notNull()))
+ .getLogicalType();
+
+ snapshotSplits.forEach(s -> assertEquals(expectedType, s.getSplitKeyType()));
+ }
+
@Test
public void testDeleteDataInSnapshotScan() throws Exception {
String databaseName = "customer";
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/resources/ddl/pk.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/resources/ddl/pk.sql
new file mode 100644
index 000000000..3986a75be
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/resources/ddl/pk.sql
@@ -0,0 +1,26 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+CREATE DATABASE pk;
+
+USE pk;
+EXEC sys.sp_cdc_enable_db;
+
+CREATE TABLE dt_pk (
+ dt datetime NOT NULL PRIMARY KEY,
+ val INT
+);
+
+EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'dt_pk', @role_name = NULL, @supports_net_changes = 0;