This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 75b814bc3d [improve][connector-v2][sqlserver-cdc]Unified sqlserver
TypeUtils type conversion mode (#5668)
75b814bc3d is described below
commit 75b814bc3df5fb3d335625ea3f8534e7d6729d51
Author: ZhilinLi <[email protected]>
AuthorDate: Wed Oct 25 11:29:57 2023 +0800
[improve][connector-v2][sqlserver-cdc]Unified sqlserver TypeUtils type
conversion mode (#5668)
---
.../sqlserver/source/utils/SqlServerTypeUtils.java | 109 +++++++++++++++------
1 file changed, 77 insertions(+), 32 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerTypeUtils.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerTypeUtils.java
index b8463407f1..179fcfbcf9 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerTypeUtils.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerTypeUtils.java
@@ -27,56 +27,101 @@ import
org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
-import java.sql.Types;
import java.util.List;
/** Utilities for converting from SqlServer types to SeaTunnel types. */
public class SqlServerTypeUtils {
- private static final String DATETIME_OFFSET = "datetimeoffset";
- public static SeaTunnelDataType<?> convertFromColumn(Column column) {
+ // ============================data types=====================
- switch (column.typeName().toLowerCase()) {
- case DATETIME_OFFSET:
- return LocalTimeType.LOCAL_DATE_TIME_TYPE;
- }
+ // -------------------------string----------------------------
+ private static final String SQLSERVER_CHAR = "CHAR";
+ private static final String SQLSERVER_VARCHAR = "VARCHAR";
+ private static final String SQLSERVER_NCHAR = "NCHAR";
+ private static final String SQLSERVER_NVARCHAR = "NVARCHAR";
+ private static final String SQLSERVER_STRUCT = "STRUCT";
+ private static final String SQLSERVER_CLOB = "CLOB";
+ private static final String SQLSERVER_LONGVARCHAR = "LONGVARCHAR";
+ private static final String SQLSERVER_LONGNVARCHAR = "LONGNVARCHAR";
+ private static final String SQLSERVER_TEXT = "TEXT";
+ private static final String SQLSERVER_NTEXT = "NTEXT";
+ private static final String SQLSERVER_XML = "XML";
+
+ // ------------------------------blob-------------------------
+ private static final String SQLSERVER_BLOB = "BLOB";
+ private static final String SQLSERVER_VARBINARY = "VARBINARY";
- switch (column.jdbcType()) {
- case Types.CHAR:
- case Types.VARCHAR:
- case Types.NCHAR:
- case Types.NVARCHAR:
- case Types.STRUCT:
- case Types.CLOB:
- case Types.LONGVARCHAR:
- case Types.LONGNVARCHAR:
+ // ------------------------------number-------------------------
+ private static final String SQLSERVER_INTEGER = "INT";
+ private static final String SQLSERVER_SMALLINT = "SMALLINT";
+ private static final String SQLSERVER_TINYINT = "TINYINT";
+ private static final String SQLSERVER_BIGINT = "BIGINT";
+ private static final String SQLSERVER_FLOAT = "FLOAT";
+ private static final String SQLSERVER_REAL = "REAL";
+ private static final String SQLSERVER_DOUBLE = "DOUBLE";
+ private static final String SQLSERVER_NUMERIC = "NUMERIC";
+ private static final String SQLSERVER_DECIMAL = "DECIMAL";
+ private static final String SQLSERVER_SMALLMONEY = "SMALLMONEY";
+ private static final String SQLSERVER_MONEY = "MONEY";
+
+ // ------------------------------date-------------------------
+ private static final String SQLSERVER_TIMESTAMP = "TIMESTAMP";
+ private static final String SQLSERVER_DATE = "DATE";
+ private static final String SQLSERVER_TIME = "TIME";
+ private static final String SQLSERVER_DATETIMEOFFSET = "DATETIMEOFFSET";
+ private static final String SQLSERVER_DATETIME2 = "DATETIME2";
+ private static final String SQLSERVER_DATETIME = "DATETIME";
+ private static final String SQLSERVER_SMALLDATETIME = "SMALLDATETIME";
+
+ // ------------------------------bool-------------------------
+ private static final String SQLSERVER_BIT = "BIT";
+
+ public static SeaTunnelDataType<?> convertFromColumn(Column column) {
+ String typeName = column.typeName().toUpperCase();
+ switch (typeName) {
+ case SQLSERVER_CHAR:
+ case SQLSERVER_VARCHAR:
+ case SQLSERVER_NCHAR:
+ case SQLSERVER_NVARCHAR:
+ case SQLSERVER_STRUCT:
+ case SQLSERVER_CLOB:
+ case SQLSERVER_LONGVARCHAR:
+ case SQLSERVER_LONGNVARCHAR:
+ case SQLSERVER_TEXT:
+ case SQLSERVER_NTEXT:
+ case SQLSERVER_XML:
return BasicType.STRING_TYPE;
- case Types.BLOB:
- case Types.VARBINARY:
+ case SQLSERVER_BLOB:
+ case SQLSERVER_VARBINARY:
return PrimitiveByteArrayType.INSTANCE;
- case Types.INTEGER:
+ case SQLSERVER_INTEGER:
return BasicType.INT_TYPE;
- case Types.SMALLINT:
- case Types.TINYINT:
+ case SQLSERVER_SMALLINT:
+ case SQLSERVER_TINYINT:
return BasicType.SHORT_TYPE;
- case Types.BIGINT:
+ case SQLSERVER_BIGINT:
return BasicType.LONG_TYPE;
- case Types.FLOAT:
- case Types.REAL:
+ case SQLSERVER_REAL:
return BasicType.FLOAT_TYPE;
- case Types.DOUBLE:
+ case SQLSERVER_DOUBLE:
+ case SQLSERVER_FLOAT:
return BasicType.DOUBLE_TYPE;
- case Types.NUMERIC:
- case Types.DECIMAL:
+ case SQLSERVER_NUMERIC:
+ case SQLSERVER_DECIMAL:
+ case SQLSERVER_SMALLMONEY:
+ case SQLSERVER_MONEY:
return new DecimalType(column.length(),
column.scale().orElse(0));
- case Types.TIMESTAMP:
+ case SQLSERVER_TIMESTAMP:
+ case SQLSERVER_DATETIMEOFFSET:
+ case SQLSERVER_DATETIME2:
+ case SQLSERVER_DATETIME:
+ case SQLSERVER_SMALLDATETIME:
return LocalTimeType.LOCAL_DATE_TIME_TYPE;
- case Types.DATE:
+ case SQLSERVER_DATE:
return LocalTimeType.LOCAL_DATE_TYPE;
- case Types.TIME:
+ case SQLSERVER_TIME:
return LocalTimeType.LOCAL_TIME_TYPE;
- case Types.BOOLEAN:
- case Types.BIT:
+ case SQLSERVER_BIT:
return BasicType.BOOLEAN_TYPE;
default:
throw new UnsupportedOperationException(