This is an automated email from the ASF dual-hosted git repository.

zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1a8da1c843 [Improve] Support `int identity` type in sql server (#6186)
1a8da1c843 is described below

commit 1a8da1c8437a6f5c419bdb7220bd2600cf9425ec
Author: Jia Fan <[email protected]>
AuthorDate: Thu Jan 18 13:46:50 2024 +0800

    [Improve] Support `int identity` type in sql server (#6186)
    
    * [Improve] Support `int identity` type in sql server
    
    * update
    
    * update
    
    * update
    
    * update
---
 .../sqlserver/source/utils/SqlServerTypeUtils.java |  2 ++
 .../sqlserver/SqlServerDataTypeConvertor.java      |  1 +
 .../jdbc/catalog/sqlserver/SqlServerType.java      |  1 +
 .../connectors/seatunnel/jdbc/AbstractJdbcIT.java  |  3 ++
 .../connectors/seatunnel/jdbc/JdbcCase.java        |  1 +
 .../connectors/seatunnel/jdbc/JdbcSqlServerIT.java | 42 +++++++++++++++++++++-
 6 files changed, 49 insertions(+), 1 deletion(-)

diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerTypeUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerTypeUtils.java
index ec617b710d..72b6318ed1 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerTypeUtils.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerTypeUtils.java
@@ -49,6 +49,7 @@ public class SqlServerTypeUtils {
 
     // ------------------------------number-------------------------
     private static final String SQLSERVER_INTEGER = "INT";
+    private static final String SQLSERVER_INT_IDENTITY = "INT IDENTITY";
     private static final String SQLSERVER_SMALLINT = "SMALLINT";
     private static final String SQLSERVER_TINYINT = "TINYINT";
     private static final String SQLSERVER_BIGINT = "BIGINT";
@@ -91,6 +92,7 @@ public class SqlServerTypeUtils {
             case SQLSERVER_VARBINARY:
                 return PrimitiveByteArrayType.INSTANCE;
             case SQLSERVER_INTEGER:
+            case SQLSERVER_INT_IDENTITY:
                 return BasicType.INT_TYPE;
             case SQLSERVER_SMALLINT:
             case SQLSERVER_TINYINT:
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java
index d874c52345..9e41d85952 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java
@@ -60,6 +60,7 @@ public class SqlServerDataTypeConvertor implements DataTypeConvertor<SqlServerTy
             case SMALLINT:
                 return BasicType.SHORT_TYPE;
             case INTEGER:
+            case INT_IDENTITY:
                 return BasicType.INT_TYPE;
             case BIGINT:
                 return BasicType.LONG_TYPE;
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java
index e848498c93..eee16cfa61 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java
@@ -34,6 +34,7 @@ public enum SqlServerType implements SQLType {
     BIT("bit", java.sql.Types.BIT, Boolean.class),
     SMALLINT("smallint", java.sql.Types.SMALLINT, Short.class),
     INTEGER("int", java.sql.Types.INTEGER, Integer.class),
+    INT_IDENTITY("int identity", java.sql.Types.INTEGER, Integer.class),
     BIGINT("bigint", java.sql.Types.BIGINT, Long.class),
     FLOAT("float", java.sql.Types.DOUBLE, Double.class),
     REAL("real", java.sql.Types.REAL, Float.class),
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
index f61381d24b..08a9868ab6 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
@@ -193,6 +193,9 @@ public abstract class AbstractJdbcIT extends TestSuiteBase implements TestResour
                                     jdbcCase.getDatabase(),
                                     jdbcCase.getSchema(),
                                     jdbcCase.getSourceTable()));
+            if (jdbcCase.getSinkCreateSql() != null) {
+                createTemplate = jdbcCase.getSinkCreateSql();
+            }
             String createSink =
                     String.format(
                             createTemplate,
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
index 5f17eacc51..92ee490a20 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
@@ -47,6 +47,7 @@ public class JdbcCase {
     private String jdbcTemplate;
     private String jdbcUrl;
     private String createSql;
+    private String sinkCreateSql;
     private String insertSql;
     private List<String> configFile;
     private Pair<String[], List<SeaTunnelRow>> testData;
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
index 93bcda6acd..56076dc4da 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
@@ -71,6 +71,44 @@ public class JdbcSqlServerIT extends AbstractJdbcIT {
             Lists.newArrayList("/jdbc_sqlserver_source_to_sink.conf");
     private static final String CREATE_SQL =
             "CREATE TABLE %s (\n"
+                    + "\tINT_IDENTITY_TEST int identity,\n"
+                    + "\tBIGINT_TEST bigint NOT NULL,\n"
+                    + "\tBINARY_TEST binary(255) NULL,\n"
+                    + "\tBIT_TEST bit NULL,\n"
+                    + "\tCHAR_TEST char(255) COLLATE Chinese_PRC_CS_AS NULL,\n"
+                    + "\tDATE_TEST date NULL,\n"
+                    + "\tDATETIME_TEST datetime NULL,\n"
+                    + "\tDATETIME2_TEST datetime2 NULL,\n"
+                    + "\tDATETIMEOFFSET_TEST datetimeoffset NULL,\n"
+                    + "\tDECIMAL_TEST decimal(18,2) NULL,\n"
+                    + "\tFLOAT_TEST float NULL,\n"
+                    + "\tIMAGE_TEST image NULL,\n"
+                    + "\tINT_TEST int NULL,\n"
+                    + "\tMONEY_TEST money NULL,\n"
+                    + "\tNCHAR_TEST nchar(1) COLLATE Chinese_PRC_CS_AS NULL,\n"
+                    + "\tNTEXT_TEST ntext COLLATE Chinese_PRC_CS_AS NULL,\n"
+                    + "\tNUMERIC_TEST numeric(18,2) NULL,\n"
+                    + "\tNVARCHAR_TEST nvarchar(16) COLLATE Chinese_PRC_CS_AS NULL,\n"
+                    + "\tNVARCHAR_MAX_TEST nvarchar(MAX) COLLATE Chinese_PRC_CS_AS NULL,\n"
+                    + "\tREAL_TEST real NULL,\n"
+                    + "\tSMALLDATETIME_TEST smalldatetime NULL,\n"
+                    + "\tSMALLINT_TEST smallint NULL,\n"
+                    + "\tSMALLMONEY_TEST smallmoney NULL,\n"
+                    + "\tSQL_VARIANT_TEST sql_variant NULL,\n"
+                    + "\tTEXT_TEST text COLLATE Chinese_PRC_CS_AS NULL,\n"
+                    + "\tTIME_TEST time NULL,\n"
+                    + "\tTINYINT_TEST tinyint NULL,\n"
+                    + "\tUNIQUEIDENTIFIER_TEST uniqueidentifier NULL,\n"
+                    + "\tVARBINARY_TEST varbinary(255) NULL,\n"
+                    + "\tVARBINARY_MAX_TEST varbinary(MAX) NULL,\n"
+                    + "\tVARCHAR_TEST varchar(16) COLLATE Chinese_PRC_CS_AS NULL,\n"
+                    + "\tVARCHAR_MAX_TEST varchar(MAX) COLLATE Chinese_PRC_CS_AS DEFAULT NULL NULL,\n"
+                    + "\tXML_TEST xml NULL\n"
+                    + ");";
+
+    private static final String SINK_CREATE_SQL =
+            "CREATE TABLE %s (\n"
+                    + "\tINT_IDENTITY_TEST int NULL,\n"
                     + "\tBIGINT_TEST bigint NOT NULL,\n"
                     + "\tBINARY_TEST binary(255) NULL,\n"
                     + "\tBIT_TEST bit NULL,\n"
@@ -138,6 +176,7 @@ public class JdbcSqlServerIT extends AbstractJdbcIT {
                 .catalogSchema(SQLSERVER_SCHEMA)
                 .catalogTable(SQLSERVER_SINK)
                 .createSql(CREATE_SQL)
+                .sinkCreateSql(SINK_CREATE_SQL)
                 .configFile(CONFIG_FILE)
                 .insertSql(insertSql)
                 .testData(testDataSet)
@@ -311,7 +350,8 @@ public class JdbcSqlServerIT extends AbstractJdbcIT {
         Assertions.assertFalse(existsDataBefore);
         // insert one data
         sqlServerCatalog.executeSql(
-                tablePathSqlserver_Sink, "insert into sink_lw(BIGINT_TEST) values(12)");
+                tablePathSqlserver_Sink,
+                "insert into sink_lw(INT_IDENTITY_TEST, BIGINT_TEST) values(1, 12)");
         boolean existsDataAfter = sqlServerCatalog.isExistsData(tablePathSqlserver_Sink);
         Assertions.assertTrue(existsDataAfter);
         // truncateTable

Reply via email to