This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 75b814bc3d [improve][connector-v2][sqlserver-cdc]Unified sqlserver 
TypeUtils type conversion mode (#5668)
75b814bc3d is described below

commit 75b814bc3df5fb3d335625ea3f8534e7d6729d51
Author: ZhilinLi <[email protected]>
AuthorDate: Wed Oct 25 11:29:57 2023 +0800

    [improve][connector-v2][sqlserver-cdc]Unified sqlserver TypeUtils type 
conversion mode (#5668)
---
 .../sqlserver/source/utils/SqlServerTypeUtils.java | 109 +++++++++++++++------
 1 file changed, 77 insertions(+), 32 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerTypeUtils.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerTypeUtils.java
index b8463407f1..179fcfbcf9 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerTypeUtils.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerTypeUtils.java
@@ -27,56 +27,101 @@ import 
org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import io.debezium.relational.Column;
 import io.debezium.relational.Table;
 
-import java.sql.Types;
 import java.util.List;
 
 /** Utilities for converting from SqlServer types to SeaTunnel types. */
 public class SqlServerTypeUtils {
-    private static final String DATETIME_OFFSET = "datetimeoffset";
 
-    public static SeaTunnelDataType<?> convertFromColumn(Column column) {
+    // ============================data types=====================
 
-        switch (column.typeName().toLowerCase()) {
-            case DATETIME_OFFSET:
-                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
-        }
+    // -------------------------string----------------------------
+    private static final String SQLSERVER_CHAR = "CHAR";
+    private static final String SQLSERVER_VARCHAR = "VARCHAR";
+    private static final String SQLSERVER_NCHAR = "NCHAR";
+    private static final String SQLSERVER_NVARCHAR = "NVARCHAR";
+    private static final String SQLSERVER_STRUCT = "STRUCT";
+    private static final String SQLSERVER_CLOB = "CLOB";
+    private static final String SQLSERVER_LONGVARCHAR = "LONGVARCHAR";
+    private static final String SQLSERVER_LONGNVARCHAR = "LONGNVARCHAR";
+    private static final String SQLSERVER_TEXT = "TEXT";
+    private static final String SQLSERVER_NTEXT = "NTEXT";
+    private static final String SQLSERVER_XML = "XML";
+
+    // ------------------------------blob-------------------------
+    private static final String SQLSERVER_BLOB = "BLOB";
+    private static final String SQLSERVER_VARBINARY = "VARBINARY";
 
-        switch (column.jdbcType()) {
-            case Types.CHAR:
-            case Types.VARCHAR:
-            case Types.NCHAR:
-            case Types.NVARCHAR:
-            case Types.STRUCT:
-            case Types.CLOB:
-            case Types.LONGVARCHAR:
-            case Types.LONGNVARCHAR:
+    // ------------------------------number-------------------------
+    private static final String SQLSERVER_INTEGER = "INT";
+    private static final String SQLSERVER_SMALLINT = "SMALLINT";
+    private static final String SQLSERVER_TINYINT = "TINYINT";
+    private static final String SQLSERVER_BIGINT = "BIGINT";
+    private static final String SQLSERVER_FLOAT = "FLOAT";
+    private static final String SQLSERVER_REAL = "REAL";
+    private static final String SQLSERVER_DOUBLE = "DOUBLE";
+    private static final String SQLSERVER_NUMERIC = "NUMERIC";
+    private static final String SQLSERVER_DECIMAL = "DECIMAL";
+    private static final String SQLSERVER_SMALLMONEY = "SMALLMONEY";
+    private static final String SQLSERVER_MONEY = "MONEY";
+
+    // ------------------------------date-------------------------
+    private static final String SQLSERVER_TIMESTAMP = "TIMESTAMP";
+    private static final String SQLSERVER_DATE = "DATE";
+    private static final String SQLSERVER_TIME = "TIME";
+    private static final String SQLSERVER_DATETIMEOFFSET = "DATETIMEOFFSET";
+    private static final String SQLSERVER_DATETIME2 = "DATETIME2";
+    private static final String SQLSERVER_DATETIME = "DATETIME";
+    private static final String SQLSERVER_SMALLDATETIME = "SMALLDATETIME";
+
+    // ------------------------------bool-------------------------
+    private static final String SQLSERVER_BIT = "BIT";
+
+    public static SeaTunnelDataType<?> convertFromColumn(Column column) {
+        String typeName = column.typeName().toUpperCase();
+        switch (typeName) {
+            case SQLSERVER_CHAR:
+            case SQLSERVER_VARCHAR:
+            case SQLSERVER_NCHAR:
+            case SQLSERVER_NVARCHAR:
+            case SQLSERVER_STRUCT:
+            case SQLSERVER_CLOB:
+            case SQLSERVER_LONGVARCHAR:
+            case SQLSERVER_LONGNVARCHAR:
+            case SQLSERVER_TEXT:
+            case SQLSERVER_NTEXT:
+            case SQLSERVER_XML:
                 return BasicType.STRING_TYPE;
-            case Types.BLOB:
-            case Types.VARBINARY:
+            case SQLSERVER_BLOB:
+            case SQLSERVER_VARBINARY:
                 return PrimitiveByteArrayType.INSTANCE;
-            case Types.INTEGER:
+            case SQLSERVER_INTEGER:
                 return BasicType.INT_TYPE;
-            case Types.SMALLINT:
-            case Types.TINYINT:
+            case SQLSERVER_SMALLINT:
+            case SQLSERVER_TINYINT:
                 return BasicType.SHORT_TYPE;
-            case Types.BIGINT:
+            case SQLSERVER_BIGINT:
                 return BasicType.LONG_TYPE;
-            case Types.FLOAT:
-            case Types.REAL:
+            case SQLSERVER_REAL:
                 return BasicType.FLOAT_TYPE;
-            case Types.DOUBLE:
+            case SQLSERVER_DOUBLE:
+            case SQLSERVER_FLOAT:
                 return BasicType.DOUBLE_TYPE;
-            case Types.NUMERIC:
-            case Types.DECIMAL:
+            case SQLSERVER_NUMERIC:
+            case SQLSERVER_DECIMAL:
+            case SQLSERVER_SMALLMONEY:
+            case SQLSERVER_MONEY:
                 return new DecimalType(column.length(), 
column.scale().orElse(0));
-            case Types.TIMESTAMP:
+            case SQLSERVER_TIMESTAMP:
+            case SQLSERVER_DATETIMEOFFSET:
+            case SQLSERVER_DATETIME2:
+            case SQLSERVER_DATETIME:
+            case SQLSERVER_SMALLDATETIME:
                 return LocalTimeType.LOCAL_DATE_TIME_TYPE;
-            case Types.DATE:
+            case SQLSERVER_DATE:
                 return LocalTimeType.LOCAL_DATE_TYPE;
-            case Types.TIME:
+            case SQLSERVER_TIME:
                 return LocalTimeType.LOCAL_TIME_TYPE;
-            case Types.BOOLEAN:
-            case Types.BIT:
+            case SQLSERVER_BIT:
                 return BasicType.BOOLEAN_TYPE;
             default:
                 throw new UnsupportedOperationException(

Reply via email to