This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 044bd3cfaf [Feature][SQL Transform] Check SQL Cast will fail or not
during the SQL parsing stage (#9600)
044bd3cfaf is described below
commit 044bd3cfafae08ec9eb2391f1915184b5adbecbe
Author: liuwei178 <[email protected]>
AuthorDate: Mon Jul 28 13:48:18 2025 +0800
[Feature][SQL Transform] Check SQL Cast will fail or not during the SQL
parsing stage (#9600)
Co-authored-by: liuwei <liuwei258258!>
---
.../seatunnel/transform/sql/zeta/ZetaSQLType.java | 70 +--------
.../transform/sql/zeta/functions/CastFunction.java | 161 +++++++++++++++++++++
.../sql/zeta/functions/SystemFunction.java | 9 --
.../seatunnel/transform/sql/SQLTransformTest.java | 53 +++++++
4 files changed, 219 insertions(+), 74 deletions(-)
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
index 1fb8dd0626..bf40565aa8 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
@@ -22,13 +22,13 @@ import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.MapType;
-import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.transform.exception.TransformException;
import org.apache.seatunnel.transform.sql.zeta.functions.ArrayFunction;
+import org.apache.seatunnel.transform.sql.zeta.functions.CastFunction;
import org.apache.commons.collections4.CollectionUtils;
@@ -65,25 +65,6 @@ import java.util.List;
import java.util.stream.Collectors;
public class ZetaSQLType {
- public static final String DECIMAL = "DECIMAL";
- public static final String VARCHAR = "VARCHAR";
- public static final String STRING = "STRING";
- public static final String TINYINT = "TINYINT";
- public static final String SMALLINT = "SMALLINT";
- public static final String INT = "INT";
- public static final String INTEGER = "INTEGER";
- public static final String BIGINT = "BIGINT";
- public static final String LONG = "LONG";
- public static final String BYTE = "BYTE";
- public static final String BYTES = "BYTES";
- public static final String BINARY = "BINARY";
- public static final String DOUBLE = "DOUBLE";
- public static final String FLOAT = "FLOAT";
- public static final String TIMESTAMP = "TIMESTAMP";
- public static final String DATETIME = "DATETIME";
- public static final String DATE = "DATE";
- public static final String TIME = "TIME";
- public static final String BOOLEAN = "BOOLEAN";
private final SeaTunnelRowType inputRowType;
@@ -203,7 +184,10 @@ public class ZetaSQLType {
}
if (expression instanceof CastExpression) {
- return getCastType((CastExpression) expression);
+ CastExpression castExpression = (CastExpression) expression;
+ Expression leftExpression = castExpression.getLeftExpression();
+ SqlType originType =
getExpressionType(leftExpression).getSqlType();
+ return CastFunction.getCastType(originType,
castExpression.getColDataType());
}
if (expression instanceof BinaryExpression) {
@@ -328,50 +312,6 @@ public class ZetaSQLType {
return getMaxType(types);
}
- private SeaTunnelDataType<?> getCastType(CastExpression castExpression) {
- String dataType = castExpression.getColDataType().getDataType();
- switch (dataType.toUpperCase()) {
- case DECIMAL:
- List<String> ps =
castExpression.getColDataType().getArgumentsStringList();
- return new DecimalType(Integer.parseInt(ps.get(0)),
Integer.parseInt(ps.get(1)));
- case VARCHAR:
- case STRING:
- return BasicType.STRING_TYPE;
- case TINYINT:
- return BasicType.BYTE_TYPE;
- case SMALLINT:
- return BasicType.SHORT_TYPE;
- case INT:
- case INTEGER:
- return BasicType.INT_TYPE;
- case BIGINT:
- case LONG:
- return BasicType.LONG_TYPE;
- case BYTE:
- return BasicType.BYTE_TYPE;
- case BYTES:
- case BINARY:
- return PrimitiveByteArrayType.INSTANCE;
- case DOUBLE:
- return BasicType.DOUBLE_TYPE;
- case FLOAT:
- return BasicType.FLOAT_TYPE;
- case TIMESTAMP:
- case DATETIME:
- return LocalTimeType.LOCAL_DATE_TIME_TYPE;
- case DATE:
- return LocalTimeType.LOCAL_DATE_TYPE;
- case TIME:
- return LocalTimeType.LOCAL_TIME_TYPE;
- case BOOLEAN:
- return BasicType.BOOLEAN_TYPE;
- default:
- throw new TransformException(
- CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
- String.format("Unsupported CAST AS type: %s",
dataType));
- }
- }
-
private SeaTunnelDataType<?> getFunctionType(Function function) {
switch (function.getName().toUpperCase()) {
case ZetaSQLFunction.CHAR:
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/CastFunction.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/CastFunction.java
new file mode 100644
index 0000000000..9950515cd7
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/CastFunction.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.sql.zeta.functions;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.transform.exception.TransformException;
+
+import net.sf.jsqlparser.statement.create.table.ColDataType;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * Result-type resolution and validation for {@code CAST(expr AS type)} in the Zeta SQL transform.
+ *
+ * <p>{@link #getCastType(SqlType, ColDataType)} is called while the query's output types are being
+ * derived, so an unsupported source/target combination is reported when the SQL is parsed rather
+ * than when the first row is converted. Each {@code *_CAST_TYPE(S)} list enumerates the source
+ * types accepted for one target type; the lists are unmodifiable.
+ */
+public class CastFunction {
+
+    public static final String DECIMAL = "DECIMAL";
+    public static final String VARCHAR = "VARCHAR";
+    public static final String STRING = "STRING";
+    public static final String TINYINT = "TINYINT";
+    public static final String SMALLINT = "SMALLINT";
+    public static final String INT = "INT";
+    public static final String INTEGER = "INTEGER";
+    public static final String BIGINT = "BIGINT";
+    public static final String LONG = "LONG";
+    public static final String BYTE = "BYTE";
+    public static final String BYTES = "BYTES";
+    public static final String BINARY = "BINARY";
+    public static final String DOUBLE = "DOUBLE";
+    public static final String FLOAT = "FLOAT";
+    public static final String TIMESTAMP = "TIMESTAMP";
+    public static final String DATETIME = "DATETIME";
+    public static final String DATE = "DATE";
+    public static final String TIME = "TIME";
+    public static final String BOOLEAN = "BOOLEAN";
+
+    /** Source types accepted by {@code CAST(... AS TINYINT)} and {@code CAST(... AS BYTE)}. */
+    public static final List<SqlType> TINYINT_CAST_TYPES =
+            Collections.unmodifiableList(Arrays.asList(SqlType.TINYINT, SqlType.STRING));
+
+    /** Source types accepted by {@code CAST(... AS SMALLINT)}. */
+    public static final List<SqlType> SMALLINT_CAST_TYPES =
+            Collections.unmodifiableList(
+                    Arrays.asList(SqlType.TINYINT, SqlType.SMALLINT, SqlType.STRING));
+
+    /**
+     * Source types accepted by {@code CAST(... AS INT)} and {@code CAST(... AS INTEGER)}. The
+     * singular name is kept for compatibility with existing references.
+     */
+    public static final List<SqlType> INT_CAST_TYPE =
+            Collections.unmodifiableList(
+                    Arrays.asList(
+                            SqlType.TINYINT,
+                            SqlType.SMALLINT,
+                            SqlType.INT,
+                            SqlType.BIGINT,
+                            SqlType.STRING));
+
+    /** Source types accepted by {@code CAST(... AS BIGINT)} and {@code CAST(... AS LONG)}. */
+    public static final List<SqlType> LONG_CAST_TYPES =
+            Collections.unmodifiableList(
+                    Arrays.asList(
+                            SqlType.TINYINT,
+                            SqlType.SMALLINT,
+                            SqlType.INT,
+                            SqlType.BIGINT,
+                            SqlType.STRING));
+
+    /** Source types accepted by {@code CAST(... AS FLOAT)} and {@code CAST(... AS DOUBLE)}. */
+    public static final List<SqlType> FLOAT_CAST_TYPES =
+            Collections.unmodifiableList(
+                    Arrays.asList(
+                            SqlType.TINYINT,
+                            SqlType.SMALLINT,
+                            SqlType.INT,
+                            SqlType.BIGINT,
+                            SqlType.FLOAT,
+                            SqlType.DOUBLE,
+                            SqlType.STRING));
+
+    /** Source types accepted by {@code CAST(... AS BOOLEAN)}. */
+    public static final List<SqlType> BOOLEAN_CAST_TYPES =
+            Collections.unmodifiableList(
+                    Arrays.asList(
+                            SqlType.BOOLEAN,
+                            SqlType.STRING,
+                            SqlType.BIGINT,
+                            SqlType.INT,
+                            SqlType.SMALLINT,
+                            SqlType.TINYINT,
+                            SqlType.FLOAT,
+                            SqlType.DOUBLE));
+
+    /** Source types accepted by {@code CAST(... AS TIMESTAMP)} and {@code CAST(... AS DATETIME)}. */
+    public static final List<SqlType> DATETIME_CAST_TYPES =
+            Collections.unmodifiableList(
+                    Arrays.asList(SqlType.TIMESTAMP, SqlType.TIMESTAMP_TZ, SqlType.BIGINT));
+
+    /** Source types accepted by {@code CAST(... AS DATE)}. */
+    public static final List<SqlType> DATE_CAST_TYPES =
+            Collections.unmodifiableList(
+                    Arrays.asList(
+                            SqlType.TIMESTAMP, SqlType.TIMESTAMP_TZ, SqlType.DATE, SqlType.INT));
+
+    /** Source types accepted by {@code CAST(... AS TIME)}. */
+    public static final List<SqlType> TIME_CAST_TYPES =
+            Collections.unmodifiableList(
+                    Arrays.asList(
+                            SqlType.TIMESTAMP, SqlType.TIMESTAMP_TZ, SqlType.TIME, SqlType.INT));
+
+    /**
+     * Resolves the SeaTunnel type produced by casting an expression of {@code originType} to the
+     * target type named in {@code colDataType}.
+     *
+     * <p>The target type name is matched case-insensitively and independently of the JVM's default
+     * locale. DECIMAL, VARCHAR/STRING and BYTES/BINARY targets accept any source type. Every other
+     * target accepts the source types in its allow-list, plus {@link SqlType#NULL}.
+     *
+     * @param originType SQL type of the expression being cast
+     * @param colDataType target type as parsed from the {@code AS ...} clause
+     * @return the SeaTunnel data type of the cast result
+     * @throws TransformException if the target type is unknown, if a DECIMAL target lacks
+     *     precision and scale, or if {@code originType} may not be cast to the target type
+     */
+    public static SeaTunnelDataType<?> getCastType(SqlType originType, ColDataType colDataType) {
+        String dataType = colDataType.getDataType();
+        // Locale.ROOT keeps the match locale-independent: under e.g. a Turkish default locale,
+        // "int".toUpperCase() returns "İNT", which matches none of the constants above.
+        switch (dataType.toUpperCase(Locale.ROOT)) {
+            case DECIMAL:
+                return getDecimalType(colDataType, dataType);
+            case VARCHAR:
+            case STRING:
+                return BasicType.STRING_TYPE;
+            case BYTES:
+            case BINARY:
+                return PrimitiveByteArrayType.INSTANCE;
+            case BYTE:
+            case TINYINT:
+                if (isCastAllowed(originType, TINYINT_CAST_TYPES)) {
+                    return BasicType.BYTE_TYPE;
+                }
+                break;
+            case SMALLINT:
+                if (isCastAllowed(originType, SMALLINT_CAST_TYPES)) {
+                    return BasicType.SHORT_TYPE;
+                }
+                break;
+            case INT:
+            case INTEGER:
+                if (isCastAllowed(originType, INT_CAST_TYPE)) {
+                    return BasicType.INT_TYPE;
+                }
+                break;
+            case BIGINT:
+            case LONG:
+                if (isCastAllowed(originType, LONG_CAST_TYPES)) {
+                    return BasicType.LONG_TYPE;
+                }
+                break;
+            case FLOAT:
+                if (isCastAllowed(originType, FLOAT_CAST_TYPES)) {
+                    return BasicType.FLOAT_TYPE;
+                }
+                break;
+            case DOUBLE:
+                if (isCastAllowed(originType, FLOAT_CAST_TYPES)) {
+                    return BasicType.DOUBLE_TYPE;
+                }
+                break;
+            case TIMESTAMP:
+            case DATETIME:
+                if (isCastAllowed(originType, DATETIME_CAST_TYPES)) {
+                    return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+                }
+                break;
+            case DATE:
+                if (isCastAllowed(originType, DATE_CAST_TYPES)) {
+                    return LocalTimeType.LOCAL_DATE_TYPE;
+                }
+                break;
+            case TIME:
+                if (isCastAllowed(originType, TIME_CAST_TYPES)) {
+                    return LocalTimeType.LOCAL_TIME_TYPE;
+                }
+                break;
+            case BOOLEAN:
+                if (isCastAllowed(originType, BOOLEAN_CAST_TYPES)) {
+                    return BasicType.BOOLEAN_TYPE;
+                }
+                break;
+            default:
+                // Unknown target type: reported by the exception below.
+                break;
+        }
+        throw new TransformException(
+                CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
+                String.format(
+                        "Unsupported CAST FROM %s AS type: %s", originType.name(), dataType));
+    }
+
+    /** Returns whether {@code originType} is an accepted source for a target with {@code allowed}. */
+    private static boolean isCastAllowed(SqlType originType, List<SqlType> allowed) {
+        // A source typed SqlType.NULL has no value to convert, so CAST(NULL AS ...) is accepted for
+        // every target, as it was before source types were validated.
+        return originType == SqlType.NULL || allowed.contains(originType);
+    }
+
+    /** Builds the {@code DECIMAL(precision, scale)} type from the cast's type arguments. */
+    private static SeaTunnelDataType<?> getDecimalType(ColDataType colDataType, String dataType) {
+        List<String> args = colDataType.getArgumentsStringList();
+        // A missing argument list or a missing scale would otherwise surface as a
+        // NullPointerException / IndexOutOfBoundsException instead of a descriptive error.
+        if (args == null || args.size() < 2) {
+            throw new TransformException(
+                    CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
+                    String.format(
+                            "CAST AS %s requires precision and scale, e.g. DECIMAL(10, 2)",
+                            dataType));
+        }
+        return new DecimalType(Integer.parseInt(args.get(0)), Integer.parseInt(args.get(1)));
+    }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/SystemFunction.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/SystemFunction.java
index c7b79b9baf..5a4b54bc0e 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/SystemFunction.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/SystemFunction.java
@@ -127,12 +127,6 @@ public class SystemFunction {
if (v1 instanceof LocalDateTime) {
return v1;
}
- if (v1 instanceof LocalDate) {
- return LocalDateTime.of((LocalDate) v1, LocalTime.of(0, 0,
0));
- }
- if (v1 instanceof LocalTime) {
- return LocalDateTime.of(LocalDate.now(), (LocalTime) v1);
- }
if (v1 instanceof Long) {
Instant instant = Instant.ofEpochMilli(((Long)
v1).longValue());
ZoneId zone = ZoneId.systemDefault();
@@ -162,9 +156,6 @@ public class SystemFunction {
if (v1 instanceof LocalDateTime) {
return ((LocalDateTime) v1).toLocalTime();
}
- if (v1 instanceof LocalDate) {
- return LocalTime.of(0, 0, 0);
- }
if (v1 instanceof LocalTime) {
return v1;
}
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLTransformTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLTransformTest.java
index 0d88aace0f..d2b214ec79 100644
--- a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLTransformTest.java
+++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLTransformTest.java
@@ -498,4 +498,57 @@ public class SQLTransformTest {
}
});
}
+
+    /**
+     * CAST of the INT column {@code id} to TIMESTAMP is not an allowed conversion, so deriving the
+     * output schema must fail with an "Unsupported CAST" error before any row is processed.
+     */
+    @Test
+    public void testCastTimestampValidate() {
+        String querySql = "select CAST(`id` AS TIMESTAMP) AS idStr, name AS name from dual";
+        HashMap<String, Object> config = new HashMap<>();
+        config.put("query", querySql);
+        SQLTransform sqlTransform =
+                new SQLTransform(ReadonlyConfig.fromMap(config), getCatalogTable());
+
+        // assertThrows hands back the thrown exception, so its message can be checked directly
+        // instead of catching and re-throwing inside the executable.
+        TransformException exception =
+                Assertions.assertThrows(
+                        TransformException.class, sqlTransform::transformTableSchema);
+        Assertions.assertEquals(
+                "ErrorCode:[COMMON-05], ErrorDescription:[Unsupported operation] - "
+                        + "Unsupported CAST FROM INT AS type: TIMESTAMP",
+                exception.getMessage());
+    }
+
+    /**
+     * CAST of the TIMESTAMP column {@code create_time} to INT is not an allowed conversion, so
+     * deriving the output schema must fail with an "Unsupported CAST" error.
+     */
+    @Test
+    public void testCastIntValidate() {
+        String querySql =
+                "select id AS id, name AS name, CAST(create_time AS INT) AS timeInt from dual";
+        HashMap<String, Object> config = new HashMap<>();
+        config.put("query", querySql);
+        SQLTransform sqlTransform =
+                new SQLTransform(ReadonlyConfig.fromMap(config), getCatalogTable());
+
+        // assertThrows hands back the thrown exception, so its message can be checked directly
+        // instead of catching and re-throwing inside the executable.
+        TransformException exception =
+                Assertions.assertThrows(
+                        TransformException.class, sqlTransform::transformTableSchema);
+        Assertions.assertEquals(
+                "ErrorCode:[COMMON-05], ErrorDescription:[Unsupported operation] - "
+                        + "Unsupported CAST FROM TIMESTAMP AS type: INT",
+                exception.getMessage());
+    }
}