This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 6673f6f771 [Feature][Connectors-V2][Jdbc] Supports Sqlserver Niche Data Types (#6122)
6673f6f771 is described below
commit 6673f6f7711ba199765f489c4b798fd3a9a5cefa
Author: 丑西蒙 <[email protected]>
AuthorDate: Wed Jan 3 19:55:00 2024 +0800
[Feature][Connectors-V2][Jdbc] Supports Sqlserver Niche Data Types (#6122)
---
.../sqlserver/SqlServerDataTypeConvertor.java | 2 +
.../dialect/sqlserver/SqlserverTypeMapper.java | 6 ++
.../connectors/seatunnel/jdbc/JdbcSqlServerIT.java | 108 ++++++++++++++++++++-
.../resources/jdbc_sqlserver_source_to_sink.conf | 6 +-
4 files changed, 115 insertions(+), 7 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java
index 5f88492643..d874c52345 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java
@@ -81,6 +81,8 @@ public class SqlServerDataTypeConvertor implements
DataTypeConvertor<SqlServerTy
case NVARCHAR:
case TEXT:
case XML:
+ case GUID:
+ case SQL_VARIANT:
return BasicType.STRING_TYPE;
case DATE:
return LocalTimeType.LOCAL_DATE_TYPE;
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverTypeMapper.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverTypeMapper.java
index 2836441927..d727d71b7f 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverTypeMapper.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverTypeMapper.java
@@ -58,6 +58,9 @@ public class SqlserverTypeMapper implements
JdbcDialectTypeMapper {
private static final String SQLSERVER_NCHAR = "NCHAR";
private static final String SQLSERVER_NVARCHAR = "NVARCHAR";
private static final String SQLSERVER_TEXT = "TEXT";
+ private static final String SQLSERVER_XML = "XML";
+ private static final String SQLSERVER_UNIQUEIDENTIFIER =
"UNIQUEIDENTIFIER";
+ private static final String SQLSERVER_SQLVARIANT = "SQL_VARIANT";
// ------------------------------time-------------------------
private static final String SQLSERVER_DATE = "DATE";
@@ -105,6 +108,9 @@ public class SqlserverTypeMapper implements
JdbcDialectTypeMapper {
case SQLSERVER_NTEXT:
case SQLSERVER_NVARCHAR:
case SQLSERVER_TEXT:
+ case SQLSERVER_XML:
+ case SQLSERVER_UNIQUEIDENTIFIER:
+ case SQLSERVER_SQLVARIANT:
return BasicType.STRING_TYPE;
case SQLSERVER_DATE:
return LocalTimeType.LOCAL_DATE_TYPE;
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
index e871d81fd1..59e5ff0189 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerIT.java
@@ -39,11 +39,17 @@ import org.testcontainers.utility.DockerLoggerFactory;
import com.google.common.collect.Lists;
import java.io.IOException;
+import java.math.BigDecimal;
import java.sql.SQLException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
public class JdbcSqlServerIT extends AbstractJdbcIT {
@@ -66,9 +72,39 @@ public class JdbcSqlServerIT extends AbstractJdbcIT {
Lists.newArrayList("/jdbc_sqlserver_source_to_sink.conf");
private static final String CREATE_SQL =
"CREATE TABLE %s (\n"
- + " [age] bigint NOT NULL,\n"
- + " [name] varchar(255) COLLATE Chinese_PRC_CI_AS NULL\n"
- + ")";
+ + "\tBIGINT_TEST bigint NOT NULL,\n"
+ + "\tBINARY_TEST binary(255) NULL,\n"
+ + "\tBIT_TEST bit NULL,\n"
+ + "\tCHAR_TEST char(255) COLLATE Chinese_PRC_CS_AS NULL,\n"
+ + "\tDATE_TEST date NULL,\n"
+ + "\tDATETIME_TEST datetime NULL,\n"
+ + "\tDATETIME2_TEST datetime2 NULL,\n"
+ + "\tDATETIMEOFFSET_TEST datetimeoffset NULL,\n"
+ + "\tDECIMAL_TEST decimal(18,2) NULL,\n"
+ + "\tFLOAT_TEST float NULL,\n"
+ + "\tIMAGE_TEST image NULL,\n"
+ + "\tINT_TEST int NULL,\n"
+ + "\tMONEY_TEST money NULL,\n"
+ + "\tNCHAR_TEST nchar(1) COLLATE Chinese_PRC_CS_AS NULL,\n"
+ + "\tNTEXT_TEST ntext COLLATE Chinese_PRC_CS_AS NULL,\n"
+ + "\tNUMERIC_TEST numeric(18,2) NULL,\n"
+ + "\tNVARCHAR_TEST nvarchar(16) COLLATE Chinese_PRC_CS_AS
NULL,\n"
+ + "\tNVARCHAR_MAX_TEST nvarchar(MAX) COLLATE
Chinese_PRC_CS_AS NULL,\n"
+ + "\tREAL_TEST real NULL,\n"
+ + "\tSMALLDATETIME_TEST smalldatetime NULL,\n"
+ + "\tSMALLINT_TEST smallint NULL,\n"
+ + "\tSMALLMONEY_TEST smallmoney NULL,\n"
+ + "\tSQL_VARIANT_TEST sql_variant NULL,\n"
+ + "\tTEXT_TEST text COLLATE Chinese_PRC_CS_AS NULL,\n"
+ + "\tTIME_TEST time NULL,\n"
+ + "\tTINYINT_TEST tinyint NULL,\n"
+ + "\tUNIQUEIDENTIFIER_TEST uniqueidentifier NULL,\n"
+ + "\tVARBINARY_TEST varbinary(255) NULL,\n"
+ + "\tVARBINARY_MAX_TEST varbinary(MAX) NULL,\n"
+ + "\tVARCHAR_TEST varchar(16) COLLATE Chinese_PRC_CS_AS
NULL,\n"
+ + "\tVARCHAR_MAX_TEST varchar(MAX) COLLATE
Chinese_PRC_CS_AS DEFAULT NULL NULL,\n"
+ + "\tXML_TEST xml NULL\n"
+ + ");";
private String username;
@@ -121,7 +157,38 @@ public class JdbcSqlServerIT extends AbstractJdbcIT {
Pair<String[], List<SeaTunnelRow>> initTestData() {
String[] fieldNames =
new String[] {
- "age", "name",
+ "BIGINT_TEST",
+ "BINARY_TEST",
+ "BIT_TEST",
+ "CHAR_TEST",
+ "DATE_TEST",
+ "DATETIME_TEST",
+ "DATETIME2_TEST",
+ "DATETIMEOFFSET_TEST",
+ "DECIMAL_TEST",
+ "FLOAT_TEST",
+ "IMAGE_TEST",
+ "INT_TEST",
+ "MONEY_TEST",
+ "NCHAR_TEST",
+ "NTEXT_TEST",
+ "NUMERIC_TEST",
+ "NVARCHAR_TEST",
+ "NVARCHAR_MAX_TEST",
+ "REAL_TEST",
+ "SMALLDATETIME_TEST",
+ "SMALLINT_TEST",
+ "SMALLMONEY_TEST",
+ "SQL_VARIANT_TEST",
+ "TEXT_TEST",
+ "TIME_TEST",
+ "TINYINT_TEST",
+ "UNIQUEIDENTIFIER_TEST",
+ "VARBINARY_TEST",
+ "VARBINARY_MAX_TEST",
+ "VARCHAR_TEST",
+ "VARCHAR_MAX_TEST",
+ "XML_TEST",
};
List<SeaTunnelRow> rows = new ArrayList<>();
@@ -129,7 +196,38 @@ public class JdbcSqlServerIT extends AbstractJdbcIT {
SeaTunnelRow row =
new SeaTunnelRow(
new Object[] {
- i, "f_" + i,
+ (long) i, // BIGINT_TEST
+ new byte[255], // BINARY_TEST
+ i % 2 == 0, // BIT_TEST
+ "CharValue" + i, // CHAR_TEST
+ LocalDate.now(), // DATE_TEST
+ LocalDateTime.now(), // DATETIME_TEST
+ LocalDateTime.now(), // DATETIME2_TEST
+ OffsetDateTime.now(), // DATETIMEOFFSET_TEST
+ new BigDecimal("123.45"), // DECIMAL_TEST
+ 3.14f, // FLOAT_TEST
+ new byte[255], // IMAGE_TEST
+ 42, // INT_TEST
+ new BigDecimal("567.89"), // MONEY_TEST
+ "N", // NCHAR_TEST
+ "NTextValue" + i, // NTEXT_TEST
+ new BigDecimal("987.65"), // NUMERIC_TEST
+ "NVarCharValue" + i, // NVARCHAR_TEST
+ "NVarCharMaxValue" + i, // NVARCHAR_MAX_TEST
+ 2.71f, // REAL_TEST
+ LocalDateTime.now(), // SMALLDATETIME_TEST
+ (short) 123, // SMALLINT_TEST
+ new BigDecimal("456.78"), // SMALLMONEY_TEST
+ "SQL Variant Value" + i, // SQL_VARIANT_TEST
+ "TextValue" + i, // TEXT_TEST
+ LocalTime.now(), // TIME_TEST
+ (short) 5, // TINYINT_TEST
+ UUID.randomUUID(), // UNIQUEIDENTIFIER_TEST
+ new byte[255], // VARBINARY_TEST
+ new byte[8000], // VARBINARY_MAX_TEST
+ "VarCharValue" + i, // VARCHAR_TEST
+ "VarCharMaxValue" + i, // VARCHAR_MAX_TEST
+ "<xml>Test" + i + "</xml>", // XML_TEST
});
rows.add(row);
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_sqlserver_source_to_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_sqlserver_source_to_sink.conf
index 0046cdbcec..47eb53ffce 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_sqlserver_source_to_sink.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_sqlserver_source_to_sink.conf
@@ -30,7 +30,7 @@ source {
url = "jdbc:sqlserver://sqlserver;encrypt=false;"
user = SA
password = "A_Str0ng_Required_Password"
- query = "select age, name from source"
+ query = "select * from dbo.source"
}
# If you would like to get more information about how to configure seatunnel and see full list of source plugins,
@@ -49,7 +49,9 @@ sink {
url = "jdbc:sqlserver://sqlserver;encrypt=false;"
user = SA
password = "A_Str0ng_Required_Password"
- query = "insert into sink(age, name) values(?,?)"
+ database = "master"
+ table = "dbo.sink"
+ generate_sink_sql = true
}
# If you would like to get more information about how to configure seatunnel and see full list of sink plugins,