This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/cdc-multiple-table by this 
push:
     new 7c38feeed Add redshift datatype convertor (#4245)
7c38feeed is described below

commit 7c38feeedba852c5423182a2633776b25cb3d527
Author: Wenjun Ruan <[email protected]>
AuthorDate: Thu Mar 2 12:08:04 2023 +0800

    Add redshift datatype convertor (#4245)
---
 .../cdc/mysql/source/MySqlIncrementalSource.java   |   2 +-
 .../file/s3/catalog/S3DataTypeConvertor.java       |   2 +-
 .../jdbc/catalog/{ => mysql}/MySqlCatalog.java     |   4 +-
 .../catalog/{ => mysql}/MySqlCatalogFactory.java   |   3 +-
 .../{sql => mysql}/MysqlCreateTableSqlBuilder.java |   3 +-
 .../{ => mysql}/MysqlDataTypeConvertor.java        |   2 +-
 .../redshift/RedshiftDataTypeConvertor.java        | 179 +++++++++++++++++++++
 .../jdbc/catalog/MysqlDataTypeConvertorTest.java   |   1 +
 .../sql/MysqlCreateTableSqlBuilderTest.java        |   1 +
 9 files changed, 189 insertions(+), 8 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
index 81d6581e4..805ef934c 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
@@ -39,7 +39,7 @@ import 
org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDese
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfigFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffsetFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
-import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.MySqlCatalogFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalogFactory;
 
 import com.google.auto.service.AutoService;
 import lombok.NoArgsConstructor;
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3DataTypeConvertor.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3DataTypeConvertor.java
index f4d7a257e..4c245d161 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3DataTypeConvertor.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3DataTypeConvertor.java
@@ -55,6 +55,6 @@ public class S3DataTypeConvertor implements 
DataTypeConvertor<SeaTunnelRowType>
 
     @Override
     public String getIdentity() {
-        return FileSystemType.S3.getFileSystemPluginName();
+        return "S3";
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
similarity index 98%
rename from 
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
rename to 
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
index c8ac92f6e..d61a0a529 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql;
 
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.ConstraintKey;
@@ -29,7 +29,7 @@ import 
org.apache.seatunnel.api.table.catalog.exception.CatalogException;
 import 
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
 import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sql.MysqlCreateTableSqlBuilder;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
 
 import com.mysql.cj.MysqlType;
 import com.mysql.cj.jdbc.result.ResultSetImpl;
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogFactory.java
similarity index 93%
rename from 
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalogFactory.java
rename to 
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogFactory.java
index 1dcf02382..d24e254a0 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalogFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogFactory.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.factory.CatalogFactory;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
 
 import com.google.auto.service.AutoService;
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilder.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
similarity index 97%
rename from 
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilder.java
rename to 
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
index 333508fe2..7a9b57d89 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilder.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sql;
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -26,7 +26,6 @@ import org.apache.seatunnel.api.table.catalog.ConstraintKey;
 import org.apache.seatunnel.api.table.catalog.PrimaryKey;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
-import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.MysqlDataTypeConvertor;
 
 import org.apache.commons.collections4.CollectionUtils;
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertor.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlDataTypeConvertor.java
similarity index 99%
rename from 
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertor.java
rename to 
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlDataTypeConvertor.java
index 584437390..130fea1fc 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertor.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlDataTypeConvertor.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftDataTypeConvertor.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftDataTypeConvertor.java
new file mode 100644
index 000000000..88a1065d2
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftDataTypeConvertor.java
@@ -0,0 +1,179 @@
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.redshift;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SqlType;
+
+import com.google.auto.service.AutoService;
+import org.apache.commons.collections4.MapUtils;
+
+import java.util.Collections;
+import java.util.Map;
+
+@AutoService(DataTypeConvertor.class)
+public class RedshiftDataTypeConvertor implements DataTypeConvertor<String> {
+
+    public static final String PRECISION = "precision";
+    public static final String SCALE = "scale";
+
+    public static final Integer DEFAULT_PRECISION = 10;
+
+    public static final Integer DEFAULT_SCALE = 0;
+
+    /* ============================ data types ===================== */
+    private static final String REDSHIFT_SMALLINT = "SMALLINT";
+    private static final String REDSHIFT_INT2 = "INT2";
+    private static final String REDSHIFT_INTEGER = "INTEGER";
+    private static final String REDSHIFT_INT = "INT";
+    private static final String REDSHIFT_INT4 = "INT4";
+    private static final String REDSHIFT_BIGINT = "BIGINT";
+    private static final String REDSHIFT_INT8 = "INT8";
+
+    private static final String REDSHIFT_DECIMAL = "DECIMAL";
+    private static final String REDSHIFT_NUMERIC = "NUMERIC";
+    private static final String REDSHIFT_REAL = "REAL";
+    private static final String REDSHIFT_FLOAT4 = "FLOAT4";
+    private static final String REDSHIFT_DOUBLE_PRECISION = "DOUBLE PRECISION";
+    private static final String REDSHIFT_FLOAT8 = "FLOAT8";
+    private static final String REDSHIFT_FLOAT = "FLOAT";
+
+    private static final String REDSHIFT_BOOLEAN = "BOOLEAN";
+    private static final String REDSHIFT_BOOL = "BOOL";
+
+    private static final String REDSHIFT_CHAR = "CHAR";
+    private static final String REDSHIFT_CHARACTER = "CHARACTER";
+    private static final String REDSHIFT_NCHAR = "NCHAR";
+    private static final String REDSHIFT_BPCHAR = "BPCHAR";
+
+    private static final String REDSHIFT_VARCHAR = "VARCHAR";
+    private static final String REDSHIFT_CHARACTER_VARYING = "CHARACTER 
VARYING";
+    private static final String REDSHIFT_NVARCHAR = "NVARCHAR";
+    private static final String REDSHIFT_TEXT = "TEXT";
+
+    private static final String REDSHIFT_DATE = "DATE";
+    /*FIXME*/
+
+    private static final String REDSHIFT_GEOMETRY = "GEOMETRY";
+    private static final String REDSHIFT_OID = "OID";
+    private static final String REDSHIFT_SUPER = "SUPER";
+
+    private static final String REDSHIFT_TIME = "TIME";
+    private static final String REDSHIFT_TIME_WITH_TIME_ZONE = "TIME WITH TIME 
ZONE";
+
+    private static final String REDSHIFT_TIMETZ = "TIMETZ";
+    private static final String REDSHIFT_TIMESTAMP = "TIMESTAMP";
+    private static final String REDSHIFT_TIMESTAMP_WITH_OUT_TIME_ZONE = 
"TIMESTAMP WITHOUT TIME ZONE";
+
+    private static final String REDSHIFT_TIMESTAMPTZ = "TIMESTAMPTZ";
+
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
+        return toSeaTunnelType(connectorDataType, Collections.emptyMap());
+    }
+
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType,
+                                                Map<String, Object> 
dataTypeProperties) throws DataTypeConvertException {
+        checkNotNull(connectorDataType, "redshiftType cannot be null");
+        switch (connectorDataType) {
+            case REDSHIFT_SMALLINT:
+            case REDSHIFT_INT2:
+                return BasicType.SHORT_TYPE;
+            case REDSHIFT_INTEGER:
+            case REDSHIFT_INT:
+            case REDSHIFT_INT4:
+                return BasicType.INT_TYPE;
+            case REDSHIFT_BIGINT:
+            case REDSHIFT_INT8:
+            case REDSHIFT_OID:
+                return BasicType.LONG_TYPE;
+            case REDSHIFT_DECIMAL:
+            case REDSHIFT_NUMERIC:
+                Integer precision = MapUtils.getInteger(dataTypeProperties, 
PRECISION, DEFAULT_PRECISION);
+                Integer scale = MapUtils.getInteger(dataTypeProperties, SCALE, 
DEFAULT_SCALE);
+                return new DecimalType(precision, scale);
+            case REDSHIFT_REAL:
+            case REDSHIFT_FLOAT4:
+                return BasicType.FLOAT_TYPE;
+            case REDSHIFT_DOUBLE_PRECISION:
+            case REDSHIFT_FLOAT8:
+            case REDSHIFT_FLOAT:
+                return BasicType.DOUBLE_TYPE;
+            case REDSHIFT_BOOLEAN:
+            case REDSHIFT_BOOL:
+                return BasicType.BOOLEAN_TYPE;
+            case REDSHIFT_CHAR:
+            case REDSHIFT_CHARACTER:
+            case REDSHIFT_NCHAR:
+            case REDSHIFT_BPCHAR:
+            case REDSHIFT_VARCHAR:
+            case REDSHIFT_CHARACTER_VARYING:
+            case REDSHIFT_NVARCHAR:
+            case REDSHIFT_TEXT:
+            case REDSHIFT_SUPER:
+                return BasicType.STRING_TYPE;
+            case REDSHIFT_DATE:
+                return LocalTimeType.LOCAL_DATE_TYPE;
+            case REDSHIFT_GEOMETRY:
+                return PrimitiveByteArrayType.INSTANCE;
+            case REDSHIFT_TIME:
+            case REDSHIFT_TIME_WITH_TIME_ZONE:
+            case REDSHIFT_TIMETZ:
+                return LocalTimeType.LOCAL_TIME_TYPE;
+            case REDSHIFT_TIMESTAMP:
+            case REDSHIFT_TIMESTAMP_WITH_OUT_TIME_ZONE:
+            case REDSHIFT_TIMESTAMPTZ:
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+            default:
+                throw new UnsupportedOperationException(String.format("Doesn't 
support REDSHIFT type '%s''  yet.", connectorDataType));
+        }
+    }
+
+    @Override
+    public String toConnectorType(SeaTunnelDataType<?> seaTunnelDataType,
+                                  Map<String, Object> dataTypeProperties) 
throws DataTypeConvertException {
+        checkNotNull(seaTunnelDataType, "seaTunnelDataType cannot be null");
+        SqlType sqlType = seaTunnelDataType.getSqlType();
+        switch (sqlType) {
+            case TINYINT:
+            case SMALLINT:
+                return REDSHIFT_SMALLINT;
+            case INT:
+                return REDSHIFT_INTEGER;
+            case BIGINT:
+                return REDSHIFT_BIGINT;
+            case DECIMAL:
+                return REDSHIFT_DECIMAL;
+            case FLOAT:
+                return REDSHIFT_FLOAT4;
+            case DOUBLE:
+                return REDSHIFT_DOUBLE_PRECISION;
+            case BOOLEAN:
+                return REDSHIFT_BOOLEAN;
+            case STRING:
+                return REDSHIFT_TEXT;
+            case DATE:
+                return REDSHIFT_DATE;
+            case BYTES:
+                return REDSHIFT_GEOMETRY;
+            case TIME:
+                return REDSHIFT_TIME;
+            case TIMESTAMP:
+                return REDSHIFT_TIMESTAMP;
+            default:
+                throw new UnsupportedOperationException(String.format("Doesn't 
support SeaTunnel type '%s''  yet.", seaTunnelDataType));
+        }
+    }
+
+    @Override
+    public String getIdentity() {
+        return "REDSHIFT";
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
index b7a05ac84..29fff3e6d 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
@@ -19,6 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
 
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.DecimalType;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MysqlDataTypeConvertor;
 
 import com.mysql.cj.MysqlType;
 import org.junit.jupiter.api.Assertions;
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java
index dca2304a7..d686a78bc 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.LocalTimeType;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MysqlCreateTableSqlBuilder;
 
 import com.google.common.collect.Lists;
 import org.junit.jupiter.api.Assertions;

Reply via email to