This is an automated email from the ASF dual-hosted git repository.
zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 1a8da1c843 [Improve] Support `int identity` type in sql server (#6186)
1a8da1c843 is described below
commit 1a8da1c8437a6f5c419bdb7220bd2600cf9425ec
Author: Jia Fan <[email protected]>
AuthorDate: Thu Jan 18 13:46:50 2024 +0800
[Improve] Support `int identity` type in sql server (#6186)
* [Improve] Support `int identity` type in sql server
* update
* update
* update
* update
---
.../sqlserver/source/utils/SqlServerTypeUtils.java | 2 ++
.../sqlserver/SqlServerDataTypeConvertor.java | 1 +
.../jdbc/catalog/sqlserver/SqlServerType.java | 1 +
.../connectors/seatunnel/jdbc/AbstractJdbcIT.java | 3 ++
.../connectors/seatunnel/jdbc/JdbcCase.java | 1 +
.../connectors/seatunnel/jdbc/JdbcSqlServerIT.java | 42 +++++++++++++++++++++-
6 files changed, 49 insertions(+), 1 deletion(-)
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerTypeUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerTypeUtils.java
index ec617b710d..72b6318ed1 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerTypeUtils.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerTypeUtils.java
@@ -49,6 +49,7 @@ public class SqlServerTypeUtils {
// ------------------------------number-------------------------
private static final String SQLSERVER_INTEGER = "INT";
+ private static final String SQLSERVER_INT_IDENTITY = "INT IDENTITY";
private static final String SQLSERVER_SMALLINT = "SMALLINT";
private static final String SQLSERVER_TINYINT = "TINYINT";
private static final String SQLSERVER_BIGINT = "BIGINT";
@@ -91,6 +92,7 @@ public class SqlServerTypeUtils {
case SQLSERVER_VARBINARY:
return PrimitiveByteArrayType.INSTANCE;
case SQLSERVER_INTEGER:
+ case SQLSERVER_INT_IDENTITY:
return BasicType.INT_TYPE;
case SQLSERVER_SMALLINT:
case SQLSERVER_TINYINT:
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java
index d874c52345..9e41d85952 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java
@@ -60,6 +60,7 @@ public class SqlServerDataTypeConvertor implements DataTypeConvertor<SqlServerTy
case SMALLINT:
return BasicType.SHORT_TYPE;
case INTEGER:
+ case INT_IDENTITY:
return BasicType.INT_TYPE;
case BIGINT:
return BasicType.LONG_TYPE;
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java
index e848498c93..eee16cfa61 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java
@@ -34,6 +34,7 @@ public enum SqlServerType implements SQLType {
BIT("bit", java.sql.Types.BIT, Boolean.class),
SMALLINT("smallint", java.sql.Types.SMALLINT, Short.class),
INTEGER("int", java.sql.Types.INTEGER, Integer.class),
+ INT_IDENTITY("int identity", java.sql.Types.INTEGER, Integer.class),
BIGINT("bigint", java.sql.Types.BIGINT, Long.class),
FLOAT("float", java.sql.Types.DOUBLE, Double.class),
REAL("real", java.sql.Types.REAL, Float.class),
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
index f61381d24b..08a9868ab6 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java
@@ -193,6 +193,9 @@ public abstract class AbstractJdbcIT extends TestSuiteBase implements TestResour
jdbcCase.getDatabase(),
jdbcCase.getSchema(),
jdbcCase.getSourceTable()));
+ if (jdbcCase.getSinkCreateSql() != null) {
+ createTemplate = jdbcCase.getSinkCreateSql();
+ }
String createSink =
String.format(
createTemplate,
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
index 5f17eacc51..92ee490a20 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java
@@ -47,6 +47,7 @@ public class JdbcCase {
private String jdbcTemplate;
private String jdbcUrl;
private String createSql;
+ private String sinkCreateSql;
private String insertSql;
private List<String> configFile;
private Pair<String[], List<SeaTunnelRow>> testData;
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
index 93bcda6acd..56076dc4da 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
@@ -71,6 +71,44 @@ public class JdbcSqlServerIT extends AbstractJdbcIT {
Lists.newArrayList("/jdbc_sqlserver_source_to_sink.conf");
private static final String CREATE_SQL =
"CREATE TABLE %s (\n"
+ + "\tINT_IDENTITY_TEST int identity,\n"
+ + "\tBIGINT_TEST bigint NOT NULL,\n"
+ + "\tBINARY_TEST binary(255) NULL,\n"
+ + "\tBIT_TEST bit NULL,\n"
+ + "\tCHAR_TEST char(255) COLLATE Chinese_PRC_CS_AS NULL,\n"
+ + "\tDATE_TEST date NULL,\n"
+ + "\tDATETIME_TEST datetime NULL,\n"
+ + "\tDATETIME2_TEST datetime2 NULL,\n"
+ + "\tDATETIMEOFFSET_TEST datetimeoffset NULL,\n"
+ + "\tDECIMAL_TEST decimal(18,2) NULL,\n"
+ + "\tFLOAT_TEST float NULL,\n"
+ + "\tIMAGE_TEST image NULL,\n"
+ + "\tINT_TEST int NULL,\n"
+ + "\tMONEY_TEST money NULL,\n"
+ + "\tNCHAR_TEST nchar(1) COLLATE Chinese_PRC_CS_AS NULL,\n"
+ + "\tNTEXT_TEST ntext COLLATE Chinese_PRC_CS_AS NULL,\n"
+ + "\tNUMERIC_TEST numeric(18,2) NULL,\n"
+ + "\tNVARCHAR_TEST nvarchar(16) COLLATE Chinese_PRC_CS_AS NULL,\n"
+ + "\tNVARCHAR_MAX_TEST nvarchar(MAX) COLLATE Chinese_PRC_CS_AS NULL,\n"
+ + "\tREAL_TEST real NULL,\n"
+ + "\tSMALLDATETIME_TEST smalldatetime NULL,\n"
+ + "\tSMALLINT_TEST smallint NULL,\n"
+ + "\tSMALLMONEY_TEST smallmoney NULL,\n"
+ + "\tSQL_VARIANT_TEST sql_variant NULL,\n"
+ + "\tTEXT_TEST text COLLATE Chinese_PRC_CS_AS NULL,\n"
+ + "\tTIME_TEST time NULL,\n"
+ + "\tTINYINT_TEST tinyint NULL,\n"
+ + "\tUNIQUEIDENTIFIER_TEST uniqueidentifier NULL,\n"
+ + "\tVARBINARY_TEST varbinary(255) NULL,\n"
+ + "\tVARBINARY_MAX_TEST varbinary(MAX) NULL,\n"
+ + "\tVARCHAR_TEST varchar(16) COLLATE Chinese_PRC_CS_AS NULL,\n"
+ + "\tVARCHAR_MAX_TEST varchar(MAX) COLLATE Chinese_PRC_CS_AS DEFAULT NULL NULL,\n"
+ + "\tXML_TEST xml NULL\n"
+ + ");";
+
+ private static final String SINK_CREATE_SQL =
+ "CREATE TABLE %s (\n"
+ + "\tINT_IDENTITY_TEST int NULL,\n"
+ "\tBIGINT_TEST bigint NOT NULL,\n"
+ "\tBINARY_TEST binary(255) NULL,\n"
+ "\tBIT_TEST bit NULL,\n"
@@ -138,6 +176,7 @@ public class JdbcSqlServerIT extends AbstractJdbcIT {
.catalogSchema(SQLSERVER_SCHEMA)
.catalogTable(SQLSERVER_SINK)
.createSql(CREATE_SQL)
+ .sinkCreateSql(SINK_CREATE_SQL)
.configFile(CONFIG_FILE)
.insertSql(insertSql)
.testData(testDataSet)
@@ -311,7 +350,8 @@ public class JdbcSqlServerIT extends AbstractJdbcIT {
Assertions.assertFalse(existsDataBefore);
// insert one data
sqlServerCatalog.executeSql(
- tablePathSqlserver_Sink, "insert into sink_lw(BIGINT_TEST) values(12)");
+ tablePathSqlserver_Sink,
+ "insert into sink_lw(INT_IDENTITY_TEST, BIGINT_TEST) values(1, 12)");
boolean existsDataAfter =
sqlServerCatalog.isExistsData(tablePathSqlserver_Sink);
Assertions.assertTrue(existsDataAfter);
// truncateTable