This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/cdc-multiple-table by this
push:
new 840c3e5eb Add DataTypeConvertor in Catalog (#4094)
840c3e5eb is described below
commit 840c3e5eb47d222c82dfe054182790b47c4fca30
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri Feb 10 22:14:41 2023 +0800
Add DataTypeConvertor in Catalog (#4094)
---
.../seatunnel/api/table/catalog/Catalog.java | 5 +++
.../table/catalog/DataTypeConvertException.java | 52 ++++++++++++++++++++++
.../api/table/catalog/DataTypeConvertor.java | 50 +++++++++++++++++++++
.../seatunnel/jdbc/catalog/MySqlCatalog.java | 15 +++++--
.../MysqlDataTypeConvertor.java} | 41 +++++++++++++----
.../catalog/sql/MysqlCreateTableSqlBuilder.java | 4 +-
.../jdbc/catalog/MysqlDataTypeConvertorTest.java | 37 +++++++++++++++
7 files changed, 189 insertions(+), 15 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
index 90a443d03..42e74ff8d 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/Catalog.java
@@ -146,4 +146,9 @@ public interface Catalog {
// todo: Support for update table metadata
+
+ /**
+ * Return a {@link DataTypeConvertor} used to convert the data type
between SeaTunnel and the connector.
+ */
+ DataTypeConvertor<?> getDataTypeConvertor();
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertException.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertException.java
new file mode 100644
index 000000000..a4e26dd06
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertException.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.catalog;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class DataTypeConvertException extends SeaTunnelRuntimeException {
+ private static final String CONVERT_TO_SEA_TUNNEL_ERROR_MSG = "Convert
type: %s to SeaTunnel data type error.";
+
+ private static final String CONVERT_TO_CONNECTOR_DATA_TYPE_ERROR_MSG =
"Convert SeaTunnel data type: %s to connector data type error.";
+
+ public DataTypeConvertException(String message) {
+ this(message, null);
+ }
+
+ public DataTypeConvertException(String message, Throwable cause) {
+ super(CommonErrorCode.UNSUPPORTED_DATA_TYPE, message, cause);
+ }
+
+ public static DataTypeConvertException
convertToSeaTunnelDataTypeException(Object dataType) {
+ return new
DataTypeConvertException(String.format(CONVERT_TO_SEA_TUNNEL_ERROR_MSG,
dataType));
+ }
+
+ public static DataTypeConvertException
convertToSeaTunnelDataTypeException(Object dataType, Throwable cause) {
+ return new
DataTypeConvertException(String.format(CONVERT_TO_SEA_TUNNEL_ERROR_MSG,
dataType), cause);
+ }
+
+ public static DataTypeConvertException
convertToConnectorDataTypeException(SeaTunnelDataType<?> seaTunnelDataType) {
+ return new
DataTypeConvertException(String.format(CONVERT_TO_CONNECTOR_DATA_TYPE_ERROR_MSG,
seaTunnelDataType));
+ }
+
+ public static DataTypeConvertException
convertToConnectorDataTypeException(SeaTunnelDataType<?> seaTunnelDataType,
Throwable cause) {
+ return new
DataTypeConvertException(String.format(CONVERT_TO_CONNECTOR_DATA_TYPE_ERROR_MSG,
seaTunnelDataType), cause);
+ }
+}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertor.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertor.java
new file mode 100644
index 000000000..fd51ce001
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/DataTypeConvertor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.catalog;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+
+import java.util.Map;
+
+/**
+ * DataTypeConvertor is used to convert the data type between connector and
SeaTunnel.
+ */
+public interface DataTypeConvertor<T> {
+
+ /**
+ * Transfer the data type from connector to SeaTunnel.
+ *
+ * @param t origin data type
+ * @param dataTypeProperties origin data type properties, e.g. precision,
scale, length
+ * @return SeaTunnel data type
+ */
+ // todo: If the origin data type contains the properties, we can remove
the dataTypeProperties.
+ SeaTunnelDataType<?> toSeaTunnelType(T t, Map<String, Object>
dataTypeProperties) throws DataTypeConvertException;
+
+ /**
+ * Transfer the data type from SeaTunnel to connector.
+ *
+ * @param seaTunnelDataType seaTunnel data type
+ * @param dataTypeProperties seaTunnel data type properties, e.g.
precision, scale, length
+ * @return origin data type
+ */
+ // todo: If the SeaTunnel data type contains the properties, we can remove
the dataTypeProperties.
+ T toConnectorType(SeaTunnelDataType<?> seaTunnelDataType, Map<String,
Object> dataTypeProperties) throws DataTypeConvertException;
+
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
index 8683d571f..7456f5f9c 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MySqlCatalog.java
@@ -20,6 +20,7 @@ package
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
@@ -30,7 +31,6 @@ import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistExceptio
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sql.MysqlCreateTableSqlBuilder;
-import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.DataTypeUtils;
import com.mysql.cj.MysqlType;
import com.mysql.cj.jdbc.result.ResultSetImpl;
@@ -156,6 +156,11 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
}
}
+ @Override
+ public DataTypeConvertor<MysqlType> getDataTypeConvertor() {
+ return MysqlDataTypeConvertor.getInstance();
+ }
+
// todo: If the origin source is mysql, we can directly use create table
like to create the target table?
@Override
protected boolean createTableInternal(TablePath tablePath, CatalogTable
table) throws CatalogException {
@@ -208,11 +213,13 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
* @see com.mysql.cj.MysqlType
* @see ResultSetImpl#getObjectStoredProc(int, int)
*/
+ @SuppressWarnings("unchecked")
private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int
colIndex) throws SQLException {
- int precision = metadata.getPrecision(colIndex);
- int scale = metadata.getScale(colIndex);
MysqlType mysqlType =
MysqlType.getByName(metadata.getColumnTypeName(colIndex));
- return DataTypeUtils.toSeaTunnelDataType(mysqlType, precision, scale);
+ Map<String, Object> dataTypeProperties = new HashMap<>();
+ dataTypeProperties.put(MysqlDataTypeConvertor.PRECISION,
metadata.getPrecision(colIndex));
+ dataTypeProperties.put(MysqlDataTypeConvertor.SCALE,
metadata.getScale(colIndex));
+ return getDataTypeConvertor().toSeaTunnelType(mysqlType,
dataTypeProperties);
}
@SuppressWarnings("MagicNumber")
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/DataTypeUtils.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertor.java
similarity index 73%
rename from
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/DataTypeUtils.java
rename to
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertor.java
index 83af004f5..247177033 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/DataTypeUtils.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertor.java
@@ -15,8 +15,10 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.jdbc.utils;
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
@@ -27,14 +29,24 @@ import
org.apache.seatunnel.common.exception.CommonErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import com.mysql.cj.MysqlType;
-import lombok.experimental.UtilityClass;
-import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.MapUtils;
-@Slf4j
-@UtilityClass
-public class DataTypeUtils {
+import java.util.Map;
- public static SeaTunnelDataType<?> toSeaTunnelDataType(MysqlType
mysqlType, int precision, int scale) {
+public class MysqlDataTypeConvertor implements DataTypeConvertor<MysqlType> {
+
+ private MysqlDataTypeConvertor() {
+
+ }
+
+ private static final MysqlDataTypeConvertor INSTANCE = new
MysqlDataTypeConvertor();
+
+ public static final String PRECISION = "precision";
+ public static final String SCALE = "scale";
+
+ // todo: It's better to wrapper MysqlType to a pojo in ST, since MysqlType
doesn't contains properties.
+ @Override
+ public SeaTunnelDataType<?> toSeaTunnelType(MysqlType mysqlType,
Map<String, Object> dataTypeProperties) throws DataTypeConvertException {
switch (mysqlType) {
case NULL:
return BasicType.VOID_TYPE;
@@ -88,14 +100,21 @@ public class DataTypeUtils {
case BIGINT_UNSIGNED:
case DECIMAL:
case DECIMAL_UNSIGNED:
+ Integer precision = MapUtils.getInteger(dataTypeProperties,
PRECISION);
+ Integer scale = MapUtils.getInteger(dataTypeProperties, SCALE);
+ if (precision == null || scale == null) {
+ throw
DataTypeConvertException.convertToSeaTunnelDataTypeException(mysqlType,
+ new IllegalArgumentException("Decimal type must have
precision and scale"));
+ }
return new DecimalType(precision, scale);
// TODO: support 'SET' & 'YEAR' type
default:
- throw new
JdbcConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
String.format("Doesn't support MySQL type '%s' yet", mysqlType.getName()));
+ throw
DataTypeConvertException.convertToSeaTunnelDataTypeException(mysqlType);
}
}
- public MysqlType toMysqlType(SeaTunnelDataType<?> seaTunnelDataType) {
+ @Override
+ public MysqlType toConnectorType(SeaTunnelDataType<?> seaTunnelDataType,
Map<String, Object> dataTypeProperties) throws DataTypeConvertException {
SqlType sqlType = seaTunnelDataType.getSqlType();
// todo: verify
switch (sqlType) {
@@ -135,4 +154,8 @@ public class DataTypeUtils {
}
}
+
+ public static MysqlDataTypeConvertor getInstance() {
+ return INSTANCE;
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilder.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilder.java
index c06b93160..af54696dc 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilder.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilder.java
@@ -26,7 +26,7 @@ import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
-import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.DataTypeUtils;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.MysqlDataTypeConvertor;
import org.apache.commons.collections4.CollectionUtils;
@@ -147,7 +147,7 @@ public class MysqlCreateTableSqlBuilder {
// Column name
columnSqls.add(column.getName());
// Column type
-
columnSqls.add(DataTypeUtils.toMysqlType(column.getDataType()).getName());
+
columnSqls.add(MysqlDataTypeConvertor.getInstance().toConnectorType(column.getDataType(),
null).getName());
// Column length
if (column.getColumnLength() != null) {
columnSqls.add("(" + column.getColumnLength() + ")");
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
new file mode 100644
index 000000000..936e7ea12
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/MysqlDataTypeConvertorTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+
+import com.mysql.cj.MysqlType;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+public class MysqlDataTypeConvertorTest {
+
+ private MysqlDataTypeConvertor mysqlDataTypeConvertor =
MysqlDataTypeConvertor.getInstance();
+
+ @Test
+ public void from() {
+ Assertions.assertEquals(BasicType.VOID_TYPE,
mysqlDataTypeConvertor.toSeaTunnelType(MysqlType.NULL, Collections.emptyMap()));
+ Assertions.assertEquals(BasicType.STRING_TYPE,
mysqlDataTypeConvertor.toSeaTunnelType(MysqlType.VARCHAR,
Collections.emptyMap()));
+ }
+}