This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/cdc-multiple-table by this
push:
new 7c38feeed Add redshift datatype convertor (#4245)
7c38feeed is described below
commit 7c38feeedba852c5423182a2633776b25cb3d527
Author: Wenjun Ruan <[email protected]>
AuthorDate: Thu Mar 2 12:08:04 2023 +0800
Add redshift datatype convertor (#4245)
---
.../cdc/mysql/source/MySqlIncrementalSource.java | 2 +-
.../file/s3/catalog/S3DataTypeConvertor.java | 2 +-
.../jdbc/catalog/{ => mysql}/MySqlCatalog.java | 4 +-
.../catalog/{ => mysql}/MySqlCatalogFactory.java | 3 +-
.../{sql => mysql}/MysqlCreateTableSqlBuilder.java | 3 +-
.../{ => mysql}/MysqlDataTypeConvertor.java | 2 +-
.../redshift/RedshiftDataTypeConvertor.java | 179 +++++++++++++++++++++
.../jdbc/catalog/MysqlDataTypeConvertorTest.java | 1 +
.../sql/MysqlCreateTableSqlBuilderTest.java | 1 +
9 files changed, 189 insertions(+), 8 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
index 81d6581e4..805ef934c 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
@@ -39,7 +39,7 @@ import
org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDese
import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config.MySqlSourceConfigFactory;
import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffsetFactory;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
-import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.MySqlCatalogFactory;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MySqlCatalogFactory;
import com.google.auto.service.AutoService;
import lombok.NoArgsConstructor;
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3DataTypeConvertor.java
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3DataTypeConvertor.java
index f4d7a257e..4c245d161 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3DataTypeConvertor.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3DataTypeConvertor.java
@@ -55,6 +55,6 @@ public class S3DataTypeConvertor implements
DataTypeConvertor<SeaTunnelRowType>
@Override
public String getIdentity() {
- return FileSystemType.S3.getFileSystemPluginName();
+ return "S3";
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
similarity index 98%
rename from
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
rename to
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
index c8ac92f6e..d61a0a529 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
@@ -29,7 +29,7 @@ import
org.apache.seatunnel.api.table.catalog.exception.CatalogException;
import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sql.MysqlCreateTableSqlBuilder;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
import com.mysql.cj.MysqlType;
import com.mysql.cj.jdbc.result.ResultSetImpl;
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalogFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogFactory.java
similarity index 93%
rename from
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalogFactory.java
rename to
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogFactory.java
index 1dcf02382..d24e254a0 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalogFactory.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalogFactory.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
import com.google.auto.service.AutoService;
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilder.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
similarity index 97%
rename from
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilder.java
rename to
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
index 333508fe2..7a9b57d89 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilder.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sql;
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -26,7 +26,6 @@ import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
-import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.MysqlDataTypeConvertor;
import org.apache.commons.collections4.CollectionUtils;
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertor.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlDataTypeConvertor.java
similarity index 99%
rename from
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertor.java
rename to
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlDataTypeConvertor.java
index 584437390..130fea1fc 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertor.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlDataTypeConvertor.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql;
import static com.google.common.base.Preconditions.checkNotNull;
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftDataTypeConvertor.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftDataTypeConvertor.java
new file mode 100644
index 000000000..88a1065d2
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftDataTypeConvertor.java
@@ -0,0 +1,179 @@
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.redshift;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SqlType;
+
+import com.google.auto.service.AutoService;
+import org.apache.commons.collections4.MapUtils;
+
+import java.util.Collections;
+import java.util.Map;
+
+@AutoService(DataTypeConvertor.class)
+public class RedshiftDataTypeConvertor implements DataTypeConvertor<String> {
+    // Redshift <-> SeaTunnel type mapping. PRECISION and SCALE are the dataTypeProperties keys read for DECIMAL/NUMERIC.
+    public static final String PRECISION = "precision";
+    public static final String SCALE = "scale";
+    // Fallback precision/scale used when dataTypeProperties does not supply a value.
+    public static final Integer DEFAULT_PRECISION = 10;
+
+    public static final Integer DEFAULT_SCALE = 0;
+
+    /* ============================ data types ===================== */
+    private static final String REDSHIFT_SMALLINT = "SMALLINT";
+    private static final String REDSHIFT_INT2 = "INT2";
+    private static final String REDSHIFT_INTEGER = "INTEGER";
+    private static final String REDSHIFT_INT = "INT";
+    private static final String REDSHIFT_INT4 = "INT4";
+    private static final String REDSHIFT_BIGINT = "BIGINT";
+    private static final String REDSHIFT_INT8 = "INT8";
+
+    private static final String REDSHIFT_DECIMAL = "DECIMAL";
+    private static final String REDSHIFT_NUMERIC = "NUMERIC";
+    private static final String REDSHIFT_REAL = "REAL";
+    private static final String REDSHIFT_FLOAT4 = "FLOAT4";
+    private static final String REDSHIFT_DOUBLE_PRECISION = "DOUBLE PRECISION";
+    private static final String REDSHIFT_FLOAT8 = "FLOAT8";
+    private static final String REDSHIFT_FLOAT = "FLOAT";
+
+    private static final String REDSHIFT_BOOLEAN = "BOOLEAN";
+    private static final String REDSHIFT_BOOL = "BOOL";
+
+    private static final String REDSHIFT_CHAR = "CHAR";
+    private static final String REDSHIFT_CHARACTER = "CHARACTER";
+    private static final String REDSHIFT_NCHAR = "NCHAR";
+    private static final String REDSHIFT_BPCHAR = "BPCHAR";
+
+    private static final String REDSHIFT_VARCHAR = "VARCHAR";
+    private static final String REDSHIFT_CHARACTER_VARYING = "CHARACTER VARYING";
+    private static final String REDSHIFT_NVARCHAR = "NVARCHAR";
+    private static final String REDSHIFT_TEXT = "TEXT";
+
+    private static final String REDSHIFT_DATE = "DATE";
+    /*FIXME*/
+
+    private static final String REDSHIFT_GEOMETRY = "GEOMETRY";
+    private static final String REDSHIFT_OID = "OID";
+    private static final String REDSHIFT_SUPER = "SUPER";
+
+    private static final String REDSHIFT_TIME = "TIME";
+    private static final String REDSHIFT_TIME_WITH_TIME_ZONE = "TIME WITH TIME ZONE";
+
+    private static final String REDSHIFT_TIMETZ = "TIMETZ";
+    private static final String REDSHIFT_TIMESTAMP = "TIMESTAMP";
+    private static final String REDSHIFT_TIMESTAMP_WITH_OUT_TIME_ZONE = "TIMESTAMP WITHOUT TIME ZONE";
+
+    private static final String REDSHIFT_TIMESTAMPTZ = "TIMESTAMPTZ";
+    /** Converts a Redshift type name using an empty property map, so DECIMAL/NUMERIC get the default precision and scale. */
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
+        return toSeaTunnelType(connectorDataType, Collections.emptyMap());
+    }
+    /** Maps a Redshift type name (matched case-insensitively) to a SeaTunnel type; throws UnsupportedOperationException for unknown names. */
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType,
+                                                Map<String, Object> dataTypeProperties) throws DataTypeConvertException {
+        checkNotNull(connectorDataType, "redshiftType cannot be null");
+        switch (connectorDataType.toUpperCase(java.util.Locale.ROOT)) {
+            case REDSHIFT_SMALLINT:
+            case REDSHIFT_INT2:
+                return BasicType.SHORT_TYPE;
+            case REDSHIFT_INTEGER:
+            case REDSHIFT_INT:
+            case REDSHIFT_INT4:
+                return BasicType.INT_TYPE;
+            case REDSHIFT_BIGINT:
+            case REDSHIFT_INT8:
+            case REDSHIFT_OID:
+                return BasicType.LONG_TYPE;
+            case REDSHIFT_DECIMAL:
+            case REDSHIFT_NUMERIC:
+                Integer precision = MapUtils.getInteger(dataTypeProperties, PRECISION, DEFAULT_PRECISION);
+                Integer scale = MapUtils.getInteger(dataTypeProperties, SCALE, DEFAULT_SCALE);
+                return new DecimalType(precision, scale);
+            case REDSHIFT_REAL:
+            case REDSHIFT_FLOAT4:
+                return BasicType.FLOAT_TYPE;
+            case REDSHIFT_DOUBLE_PRECISION:
+            case REDSHIFT_FLOAT8:
+            case REDSHIFT_FLOAT:
+                return BasicType.DOUBLE_TYPE;
+            case REDSHIFT_BOOLEAN:
+            case REDSHIFT_BOOL:
+                return BasicType.BOOLEAN_TYPE;
+            case REDSHIFT_CHAR:
+            case REDSHIFT_CHARACTER:
+            case REDSHIFT_NCHAR:
+            case REDSHIFT_BPCHAR:
+            case REDSHIFT_VARCHAR:
+            case REDSHIFT_CHARACTER_VARYING:
+            case REDSHIFT_NVARCHAR:
+            case REDSHIFT_TEXT:
+            case REDSHIFT_SUPER:
+                return BasicType.STRING_TYPE;
+            case REDSHIFT_DATE:
+                return LocalTimeType.LOCAL_DATE_TYPE;
+            case REDSHIFT_GEOMETRY:
+                return PrimitiveByteArrayType.INSTANCE;
+            case REDSHIFT_TIME:
+            case REDSHIFT_TIME_WITH_TIME_ZONE:
+            case REDSHIFT_TIMETZ:
+                return LocalTimeType.LOCAL_TIME_TYPE;
+            case REDSHIFT_TIMESTAMP:
+            case REDSHIFT_TIMESTAMP_WITH_OUT_TIME_ZONE:
+            case REDSHIFT_TIMESTAMPTZ:
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+            default:
+                throw new UnsupportedOperationException(String.format("Doesn't support REDSHIFT type '%s' yet.", connectorDataType));
+        }
+    }
+    /** Maps a SeaTunnel type to a Redshift type name by its SqlType; dataTypeProperties is not read. Throws UnsupportedOperationException if unmapped. */
+    @Override
+    public String toConnectorType(SeaTunnelDataType<?> seaTunnelDataType,
+                                  Map<String, Object> dataTypeProperties) throws DataTypeConvertException {
+        checkNotNull(seaTunnelDataType, "seaTunnelDataType cannot be null");
+        SqlType sqlType = seaTunnelDataType.getSqlType();
+        switch (sqlType) {
+            case TINYINT:
+            case SMALLINT:
+                return REDSHIFT_SMALLINT;
+            case INT:
+                return REDSHIFT_INTEGER;
+            case BIGINT:
+                return REDSHIFT_BIGINT;
+            case DECIMAL:
+                return REDSHIFT_DECIMAL;
+            case FLOAT:
+                return REDSHIFT_FLOAT4;
+            case DOUBLE:
+                return REDSHIFT_DOUBLE_PRECISION;
+            case BOOLEAN:
+                return REDSHIFT_BOOLEAN;
+            case STRING:
+                return REDSHIFT_TEXT;
+            case DATE:
+                return REDSHIFT_DATE;
+            case BYTES:
+                return REDSHIFT_GEOMETRY;
+            case TIME:
+                return REDSHIFT_TIME;
+            case TIMESTAMP:
+                return REDSHIFT_TIMESTAMP;
+            default:
+                throw new UnsupportedOperationException(String.format("Doesn't support SeaTunnel type '%s' yet.", seaTunnelDataType));
+        }
+    }
+    /** Returns the identity string of this convertor, "REDSHIFT". */
+    @Override
+    public String getIdentity() {
+        return "REDSHIFT";
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
index b7a05ac84..29fff3e6d 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
@@ -19,6 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MysqlDataTypeConvertor;
import com.mysql.cj.MysqlType;
import org.junit.jupiter.api.Assertions;
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java
index dca2304a7..d686a78bc 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MysqlCreateTableSqlBuilder;
import com.google.common.collect.Lists;
import org.junit.jupiter.api.Assertions;