This is an automated email from the ASF dual-hosted git repository.
zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a2e075aa44 [Feature][Core] Support define any nested array and map
type in zeta (#9881)
a2e075aa44 is described below
commit a2e075aa446bc92e01a45b329a4032a01d2d1904
Author: dy102 <[email protected]>
AuthorDate: Tue Oct 14 10:59:09 2025 +0900
[Feature][Core] Support define any nested array and map type in zeta (#9881)
Co-authored-by: Jia Fan <[email protected]>
---
.../apache/seatunnel/api/table/type/ArrayType.java | 17 +++
.../seatunnel/api/table/type/SeaTunnelRow.java | 13 ++
.../apache/seatunnel/e2e/transform/TestSQLIT.java | 11 ++
.../test/resources/sql_transform/nested_type.conf | 57 ++++++++
.../transform/sql/zeta/ZetaSQLFunction.java | 4 +
.../seatunnel/transform/sql/zeta/ZetaSQLType.java | 16 +-
.../sql/zeta/functions/ArrayFunction.java | 105 +++----------
.../sql/zeta/functions/CommonFunction.java | 162 +++++++++++++++++++++
.../transform/sql/zeta/functions/MapFunction.java | 79 ++++++++++
.../sql/zeta/ZetaSQLTypeNestedTypeTest.java | 108 ++++++++++++++
.../sql/zeta/functions/ArrayFunctionTest.java | 75 ++++++++++
.../sql/zeta/functions/MapFunctionTest.java | 126 ++++++++++++++++
12 files changed, 683 insertions(+), 90 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java
index 65f7651e79..63e6a7bcf0 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java
@@ -17,6 +17,9 @@
package org.apache.seatunnel.api.table.type;
+import org.apache.seatunnel.common.exception.CommonError;
+
+import java.lang.reflect.Array;
import java.util.Objects;
public class ArrayType<T, E> implements SeaTunnelDataType<T> {
@@ -61,6 +64,20 @@ public class ArrayType<T, E> implements SeaTunnelDataType<T>
{
this.elementType = elementType;
}
+ @SuppressWarnings("unchecked")
+ public static <E> ArrayType<E[], E> of(SeaTunnelDataType<E> elementType) {
+ if (elementType == null) {
+ throw CommonError.illegalArgument("elementType is null", "create
ArrayType");
+ }
+ Class<E[]> arrayClass = (Class<E[]>) toArrayClass(elementType);
+ return new ArrayType<>(arrayClass, elementType);
+ }
+
+ private static Class<?> toArrayClass(SeaTunnelDataType<?> elementType) {
+ Class<?> elementClass = elementType.getTypeClass();
+ return Array.newInstance(elementClass, 0).getClass();
+ }
+
public SeaTunnelDataType<E> getElementType() {
return elementType;
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
index ed44437b82..64a8ee94f7 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
@@ -233,6 +233,12 @@ public final class SeaTunnelRow implements Serializable {
return getArrayNotNullSize((Long[]) v) * 8;
case DOUBLE:
return getArrayNotNullSize((Double[]) v) * 8;
+ case ARRAY:
+ int total = 0;
+ for (Object elem : (Object[]) v) {
+ total += getBytesForValue(elem, dataType);
+ }
+ return total;
case MAP:
return getArrayMapNotNullSize(v);
case NULL:
@@ -342,6 +348,13 @@ public final class SeaTunnelRow implements Serializable {
}
return rowSize;
default:
+ if (v.getClass().isArray() && v instanceof Object[]) {
+ int sum = 0;
+ for (Object o : (Object[]) v) {
+ sum += getBytesForValue(o);
+ }
+ return sum;
+ }
if (v instanceof Map) {
int mapSize = 0;
for (Map.Entry<?, ?> entry : ((Map<?, ?>) v).entrySet()) {
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
index 6d9dd11856..5b04022a67 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
@@ -115,4 +115,15 @@ public class TestSQLIT extends TestSuiteBase {
container.executeJob("/sql_transform/inner_query.conf");
Assertions.assertEquals(0, innerQuerySql.getExitCode());
}
+
+ @TestTemplate
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason = "Spark and Flink translation has some issue on
nested type")
+ public void testNestedType(TestContainer container) throws IOException,
InterruptedException {
+ Container.ExecResult nestedTypeSql =
+ container.executeJob("/sql_transform/nested_type.conf");
+ Assertions.assertEquals(0, nestedTypeSql.getExitCode());
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/nested_type.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/nested_type.conf
new file mode 100644
index 0000000000..7db99cd4c5
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/nested_type.conf
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ plugin_output = "fake"
+ row.num = 1
+ string.template = ["nestedType"]
+ schema = {
+ fields {
+ name = "string"
+ }
+ }
+ }
+}
+
+transform {
+ Sql {
+ plugin_input = "fake"
+ plugin_output = "tmp_nested"
+ query = """
+ select
+ ARRAY(ARRAY(ARRAY(1,2,3), ARRAY(4,5,6), ARRAY(ARRAY(1,2,3),
ARRAY(4,5,6)), ARRAY(3, 4))) as arr_of_arr,
+ MAP('k', MAP('k', MAP('k', 1))) as map_of_map,
+ ARRAY(MAP('k', 1), MAP('k2', ARRAY(1, 2))) as arr_of_map,
+ MAP('k', ARRAY(1, 2)) as map_of_arr
+ from dual
+ """
+ }
+}
+
+sink {
+ Console {
+ plugin_input = "tmp_nested"
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
index 1f2334ac6a..79e5c3b1ec 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
@@ -33,6 +33,7 @@ import
org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.transform.exception.TransformException;
import org.apache.seatunnel.transform.sql.zeta.functions.ArrayFunction;
import org.apache.seatunnel.transform.sql.zeta.functions.DateTimeFunction;
+import org.apache.seatunnel.transform.sql.zeta.functions.MapFunction;
import org.apache.seatunnel.transform.sql.zeta.functions.NumericFunction;
import org.apache.seatunnel.transform.sql.zeta.functions.StringFunction;
import org.apache.seatunnel.transform.sql.zeta.functions.SystemFunction;
@@ -195,6 +196,7 @@ public class ZetaSQLFunction {
// -------------------------lateralView
functions----------------------------
public static final String EXPLODE = "EXPLODE";
public static final String ARRAY = "ARRAY";
+ public static final String MAP = "MAP";
// -------------------------system functions----------------------------
public static final String COALESCE = "COALESCE";
@@ -619,6 +621,8 @@ public class ZetaSQLFunction {
return ArrayFunction.arrayMax(args);
case ARRAY_MIN:
return ArrayFunction.arrayMin(args);
+ case MAP:
+ return MapFunction.map(args);
case UUID:
return randomUUID().toString();
case COSINE_DISTANCE:
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
index 5300e62e08..73c92a17fe 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
@@ -30,6 +30,7 @@ import
org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.transform.exception.TransformException;
import org.apache.seatunnel.transform.sql.zeta.functions.ArrayFunction;
import org.apache.seatunnel.transform.sql.zeta.functions.CastFunction;
+import org.apache.seatunnel.transform.sql.zeta.functions.MapFunction;
import org.apache.commons.collections4.CollectionUtils;
@@ -416,6 +417,8 @@ public class ZetaSQLType {
return BasicType.DOUBLE_TYPE;
case ZetaSQLFunction.ARRAY:
return ArrayFunction.castArrayTypeMapping(function,
inputRowType);
+ case ZetaSQLFunction.MAP:
+ return MapFunction.castMapTypeMapping(function, inputRowType);
case ZetaSQLFunction.ARRAY_MAX:
case ZetaSQLFunction.ARRAY_MIN:
return ArrayFunction.getElementType(function, inputRowType);
@@ -521,15 +524,22 @@ public class ZetaSQLType {
}
private static List<Expression> getExpressions(Function function) {
- ExpressionList parameters = function.getParameters();
+ ExpressionList<Expression> parameters =
+ (ExpressionList<Expression>) function.getParameters();
if (parameters == null) {
throw new TransformException(
CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
function.getName() + " function requires at least one
parameter");
}
- List<Expression> expressions = parameters.getExpressions();
- if (expressions == null || expressions.isEmpty()) {
+ List<Expression> expressions = new ArrayList<>();
+ if (parameters != null) {
+ for (Expression expression : parameters) {
+ expressions.add(expression);
+ }
+ }
+
+ if (expressions.isEmpty()) {
throw new TransformException(
CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
function.getName() + " function requires at least one
parameter");
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/ArrayFunction.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/ArrayFunction.java
index f901bd79ad..e95e7e4cc1 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/ArrayFunction.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/ArrayFunction.java
@@ -17,22 +17,16 @@
package org.apache.seatunnel.transform.sql.zeta.functions;
import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.transform.exception.TransformException;
-import net.sf.jsqlparser.expression.DoubleValue;
import net.sf.jsqlparser.expression.Expression;
import net.sf.jsqlparser.expression.Function;
-import net.sf.jsqlparser.expression.LongValue;
-import net.sf.jsqlparser.expression.NullValue;
-import net.sf.jsqlparser.expression.StringValue;
-import net.sf.jsqlparser.expression.operators.relational.ExpressionList;
-import net.sf.jsqlparser.schema.Column;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
@@ -101,36 +95,27 @@ public class ArrayFunction {
}
public static ArrayType castArrayTypeMapping(Function function,
SeaTunnelRowType inputRowType) {
- return castArrayTypeMapping(getFunctionArgs(function, inputRowType));
- }
+ List<Expression> expressions = CommonFunction.getExpressions(function);
- public static ArrayType castArrayTypeMapping(List<Class<?>> args) {
- if (args == null || args.isEmpty()) {
+ if (expressions.isEmpty()) {
return ArrayType.STRING_ARRAY_TYPE;
}
- Class<?> arrayType = getClassType(args);
- return getSeaTunnelDataType(arrayType);
+ SeaTunnelDataType<?> elementType = null;
+ for (Expression expression : expressions) {
+ SeaTunnelDataType<?> t =
CommonFunction.resolveExpressionType(expression, inputRowType);
+ elementType = CommonFunction.unifyCollectionType(elementType, t);
+ }
+ if (elementType == null) {
+ elementType = BasicType.STRING_TYPE;
+ }
+ return createArrayType(elementType);
}
- private static ArrayType getSeaTunnelDataType(Class<?> clazz) {
- String className = clazz.getSimpleName();
- switch (className) {
- case "Integer":
- return ArrayType.INT_ARRAY_TYPE;
- case "Double":
- return ArrayType.DOUBLE_ARRAY_TYPE;
- case "Boolean":
- return ArrayType.BOOLEAN_ARRAY_TYPE;
- case "Long":
- return ArrayType.LONG_ARRAY_TYPE;
- case "float":
- return ArrayType.FLOAT_ARRAY_TYPE;
- case "short":
- return ArrayType.SHORT_ARRAY_TYPE;
- default:
- return ArrayType.STRING_ARRAY_TYPE;
- }
+ static ArrayType createArrayType(SeaTunnelDataType<?> elementType) {
+ if (elementType == BasicType.BYTE_TYPE || elementType ==
BasicType.VOID_TYPE)
+ return ArrayType.STRING_ARRAY_TYPE;
+ return ArrayType.of(elementType);
}
private static Class<?> getArrayType(Class<?> type1, Class<?> type2) {
@@ -173,21 +158,6 @@ public class ArrayFunction {
return String.class;
}
- private static Class<?> getClassType(List<Class<?>> args) {
- Class<?> arrayType = null;
- for (Class<?> obj : args) {
- if (obj == null) {
- continue;
- }
- if (arrayType == null) {
- arrayType = obj;
- } else {
- arrayType = getArrayType(arrayType, obj);
- }
- }
- return arrayType == null ? String.class : arrayType;
- }
-
private static Class<?> getDataClassType(List<Object> args) {
Class<?> arrayType = null;
for (Object obj : args) {
@@ -205,52 +175,13 @@ public class ArrayFunction {
public static SeaTunnelDataType<?> getElementType(
Function function, SeaTunnelRowType inputRowType) {
- String columnName =
function.getParameters().getExpressions().get(0).toString();
+ List<Expression> expressions = CommonFunction.getExpressions(function);
+ String columnName = expressions.get(0).toString();
int columnIndex = inputRowType.indexOf(columnName);
ArrayType arrayType = (ArrayType)
inputRowType.getFieldType(columnIndex);
return arrayType.getElementType();
}
- private static List<Class<?>> getFunctionArgs(
- Function function, SeaTunnelRowType inputRowType) {
- ExpressionList<Expression> expressionList =
- (ExpressionList<Expression>) function.getParameters();
- List<Class<?>> functionArgs = new ArrayList<>();
- if (expressionList != null) {
- for (Expression expression : expressionList.getExpressions()) {
- if (expression instanceof NullValue) {
- functionArgs.add(null);
- continue;
- }
- if (expression instanceof DoubleValue) {
- functionArgs.add(Double.class);
- continue;
- }
- if (expression instanceof Column) {
- int columnIndex = inputRowType.indexOf(((Column)
expression).getColumnName());
-
functionArgs.add(inputRowType.getFieldType(columnIndex).getTypeClass());
- continue;
- }
-
- if (expression instanceof LongValue) {
- long longVal = ((LongValue) expression).getValue();
- if (longVal <= Integer.MAX_VALUE && longVal >=
Integer.MIN_VALUE) {
- functionArgs.add(Integer.class);
- } else {
- functionArgs.add(Long.class);
- }
- continue;
- }
- if (expression instanceof StringValue) {
- functionArgs.add(String.class);
- continue;
- }
- throw new SeaTunnelException("unSupport expression: " +
expression.toString());
- }
- }
- return functionArgs;
- }
-
private static Object convertToType(Object obj, Class<?> targetType) {
if (obj == null || targetType.isInstance(obj)) {
return obj;
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/CommonFunction.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/CommonFunction.java
new file mode 100644
index 0000000000..b0d867a652
--- /dev/null
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/CommonFunction.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.sql.zeta.functions;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
+
+import net.sf.jsqlparser.expression.DoubleValue;
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+import net.sf.jsqlparser.expression.LongValue;
+import net.sf.jsqlparser.expression.NullValue;
+import net.sf.jsqlparser.expression.StringValue;
+import net.sf.jsqlparser.expression.operators.relational.ExpressionList;
+import net.sf.jsqlparser.schema.Column;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class CommonFunction {
+ private CommonFunction() {}
+
+ public static SeaTunnelDataType resolveExpressionType(
+ Expression expression, SeaTunnelRowType rowType) {
+ if (expression instanceof NullValue) {
+ return null;
+ }
+ if (expression instanceof DoubleValue) {
+ return BasicType.DOUBLE_TYPE;
+ }
+ if (expression instanceof LongValue) {
+ long v = ((LongValue) expression).getValue();
+ if (v <= Integer.MAX_VALUE && v >= Integer.MIN_VALUE) {
+ return BasicType.INT_TYPE;
+ }
+ return BasicType.LONG_TYPE;
+ }
+ if (expression instanceof StringValue) {
+ return BasicType.STRING_TYPE;
+ }
+ if (expression instanceof Column) {
+ Column c = (Column) expression;
+ int idx = rowType.indexOf(c.getColumnName());
+ if (idx < 0) {
+ throw CommonError.illegalArgument(
+ "column not found: " + c.getColumnName(), "derive
expression type");
+ }
+ return rowType.getFieldType(idx);
+ }
+ if (expression instanceof Function) {
+ Function function = (Function) expression;
+ String name = function.getName();
+ if (name != null && "ARRAY".equalsIgnoreCase(name)) {
+ return ArrayFunction.castArrayTypeMapping(function, rowType);
+ }
+ if (name != null && "MAP".equalsIgnoreCase(name)) {
+ return MapFunction.castMapTypeMapping(function, rowType);
+ }
+ }
+ throw CommonError.unsupportedDataType(
+ "SeaTunnel", expression.getClass().getTypeName(),
expression.toString());
+ }
+
+ public static SeaTunnelDataType unifyCollectionType(
+ SeaTunnelDataType type1, SeaTunnelDataType type2) {
+ if (type1 == null || BasicType.VOID_TYPE.equals(type1)) return type2;
+ if (type2 == null || BasicType.VOID_TYPE.equals(type2)) return type1;
+
+ if (type1.equals(type2)) return type1;
+
+ if (isNumeric(type1) && isNumeric(type2)) {
+ return widenNumeric(type1, type2);
+ }
+
+ if (type1 instanceof ArrayType && type2 instanceof ArrayType) {
+ ArrayType at = (ArrayType) type1;
+ ArrayType bt = (ArrayType) type2;
+ SeaTunnelDataType ae = at.getElementType();
+ SeaTunnelDataType be = bt.getElementType();
+ SeaTunnelDataType ue = unifyCollectionType(ae, be);
+ return ArrayFunction.createArrayType(ue);
+ }
+
+ if (type1 instanceof MapType && type2 instanceof MapType) {
+ MapType map1 = (MapType) type1;
+ MapType map2 = (MapType) type2;
+ SeaTunnelDataType uk = unifyCollectionType(map1.getKeyType(),
map2.getKeyType());
+ SeaTunnelDataType uv = unifyCollectionType(map1.getValueType(),
map2.getValueType());
+ return new MapType<>(uk, uv);
+ }
+
+ return BasicType.STRING_TYPE;
+ }
+
+ public static boolean isNumeric(SeaTunnelDataType<?> type) {
+ return type == BasicType.BYTE_TYPE
+ || type == BasicType.SHORT_TYPE
+ || type == BasicType.INT_TYPE
+ || type == BasicType.LONG_TYPE
+ || type == BasicType.FLOAT_TYPE
+ || type == BasicType.DOUBLE_TYPE;
+ }
+
+ public static SeaTunnelDataType widenNumeric(SeaTunnelDataType type1,
SeaTunnelDataType type2) {
+ int rank1 = numericRank(type1);
+ int rank2 = numericRank(type2);
+ int max = Math.max(rank1, rank2);
+ switch (max) {
+ case 5:
+ return BasicType.DOUBLE_TYPE;
+ case 4:
+ return BasicType.FLOAT_TYPE;
+ case 3:
+ return BasicType.LONG_TYPE;
+ case 2:
+ return BasicType.INT_TYPE;
+ case 1:
+ return BasicType.SHORT_TYPE;
+ default:
+ return BasicType.BYTE_TYPE;
+ }
+ }
+
+ private static int numericRank(SeaTunnelDataType<?> type) {
+ if (type == BasicType.DOUBLE_TYPE) return 5;
+ if (type == BasicType.FLOAT_TYPE) return 4;
+ if (type == BasicType.LONG_TYPE) return 3;
+ if (type == BasicType.INT_TYPE) return 2;
+ if (type == BasicType.SHORT_TYPE) return 1;
+ return 0; // BYTE
+ }
+
+ public static List<Expression> getExpressions(Function function) {
+ ExpressionList<Expression> params = (ExpressionList<Expression>)
function.getParameters();
+ List<Expression> expressions = new ArrayList<>();
+ if (params != null) {
+ for (Expression expression : params) {
+ expressions.add(expression);
+ }
+ }
+ return expressions;
+ }
+}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/MapFunction.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/MapFunction.java
new file mode 100644
index 0000000000..8dfd75ed30
--- /dev/null
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/MapFunction.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.sql.zeta.functions;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
+
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class MapFunction {
+ private MapFunction() {}
+
+ public static Map<String, Object> map(List<Object> args) {
+ if (args == null || args.isEmpty()) {
+ return new LinkedHashMap<>();
+ }
+ if (args.size() % 2 != 0) {
+ throw CommonError.illegalArgument(
+ args.toString(), "MAP requires even number of arguments");
+ }
+ Map<String, Object> result = new LinkedHashMap<>(args.size() / 2);
+ for (int i = 0; i < args.size(); i += 2) {
+ Object keyObj = args.get(i);
+ Object val = args.get(i + 1);
+ if (keyObj == null) {
+ throw CommonError.illegalArgument(args.toString(), "MAP key
cannot be null");
+ }
+ String key = (keyObj instanceof String) ? (String) keyObj :
String.valueOf(keyObj);
+ result.put(key, val);
+ }
+ return result;
+ }
+
+ public static MapType castMapTypeMapping(Function function,
SeaTunnelRowType rowType) {
+ List<Expression> expressions = CommonFunction.getExpressions(function);
+ if (expressions.size() < 2 || (expressions.size() % 2 != 0)) {
+ throw CommonError.illegalArgument(
+ String.valueOf(expressions.size()),
+ "MAP requires even number of arguments >= 2");
+ }
+
+ SeaTunnelDataType keyType = null;
+ SeaTunnelDataType valType = null;
+ for (int i = 0; i < expressions.size(); i += 2) {
+ SeaTunnelDataType kt =
+ CommonFunction.resolveExpressionType(expressions.get(i),
rowType);
+ SeaTunnelDataType vt =
+ CommonFunction.resolveExpressionType(expressions.get(i +
1), rowType);
+ keyType = CommonFunction.unifyCollectionType(keyType, kt);
+ valType = CommonFunction.unifyCollectionType(valType, vt);
+ }
+ if (keyType == null) keyType = BasicType.STRING_TYPE;
+ if (valType == null) valType = BasicType.STRING_TYPE;
+ return new MapType<>(keyType, valType);
+ }
+}
diff --git
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLTypeNestedTypeTest.java
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLTypeNestedTypeTest.java
new file mode 100644
index 0000000000..99d9c96647
--- /dev/null
+++
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLTypeNestedTypeTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.sql.zeta;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+import net.sf.jsqlparser.expression.LongValue;
+import net.sf.jsqlparser.expression.StringValue;
+import net.sf.jsqlparser.expression.operators.relational.ExpressionList;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+class ZetaSQLTypeNestedTypeTest {
+
+ private static Function arr(Expression... expressions) {
+ Function function = new Function();
+ function.setName("ARRAY");
+ function.setParameters(new ExpressionList(Arrays.asList(expressions)));
+ return function;
+ }
+
+ private static Function map(Expression key, Expression value) {
+ Function function = new Function();
+ function.setName("MAP");
+ function.setParameters(new ExpressionList(Arrays.asList(key, value)));
+ return function;
+ }
+
+ private ZetaSQLType zeta() {
+ SeaTunnelRowType rowType =
+ new SeaTunnelRowType(
+ new String[] {"col"}, new SeaTunnelDataType[]
{BasicType.STRING_TYPE});
+ return new ZetaSQLType(rowType, Collections.emptyList());
+ }
+
+ @Test
+ void testArrayOfArrayTypePreserved() {
+ // ARRAY(ARRAY(1,2), ARRAY(3,4))
+ Function inner1 = arr(new LongValue(1), new LongValue(2));
+ Function inner2 = arr(new LongValue(3), new LongValue(4));
+ Function outer = arr(inner1, inner2);
+
+ SeaTunnelDataType type = zeta().getExpressionType(outer);
+ Assertions.assertEquals(ArrayType.of(ArrayType.INT_ARRAY_TYPE), type);
+ }
+
+ @Test
+ void testArrayOfMapTypePreserved() {
+ // ARRAY(MAP('k',1), MAP('k2',2))
+ Function map1 = map(new StringValue("k"), new LongValue(1));
+ Function map2 = map(new StringValue("k2"), new LongValue(2));
+ Function outer = arr(map1, map2);
+
+ SeaTunnelDataType type = zeta().getExpressionType(outer);
+ Assertions.assertEquals(
+ ArrayType.of(new MapType<>(BasicType.STRING_TYPE,
BasicType.INT_TYPE)), type);
+ }
+
+ @Test
+ void testMapOfArrayTypePreserved() {
+ // MAP('k', ARRAY(1,2))
+ Function valueArr = arr(new LongValue(1), new LongValue(2));
+ Function map = map(new StringValue("k"), valueArr);
+
+ SeaTunnelDataType type = zeta().getExpressionType(map);
+ Assertions.assertEquals(
+ new MapType<>(BasicType.STRING_TYPE,
ArrayType.INT_ARRAY_TYPE), type);
+ }
+
+ @Test
+ void testMapOfMapTypePreserved() {
+ // MAP('k', Map('k',2))
+ Function innerMap = map(new StringValue("k"), new LongValue(2));
+ Function outerMap = map(new StringValue("k"), innerMap);
+
+ SeaTunnelDataType type = zeta().getExpressionType(outerMap);
+ Assertions.assertEquals(
+ new MapType<>(
+ BasicType.STRING_TYPE,
+ new MapType<>(BasicType.STRING_TYPE,
BasicType.INT_TYPE)),
+ type);
+ }
+}
diff --git
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/functions/ArrayFunctionTest.java
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/functions/ArrayFunctionTest.java
new file mode 100644
index 0000000000..a36f7d5eac
--- /dev/null
+++
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/functions/ArrayFunctionTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.sql.zeta.functions;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.transform.sql.SQLEngine;
+import org.apache.seatunnel.transform.sql.SQLEngineFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+class ArrayFunctionTest {
+ private SQLEngine zeta() {
+ return SQLEngineFactory.getSQLEngine(SQLEngineFactory.EngineType.ZETA);
+ }
+
+ private SeaTunnelRowType dummyInputType() {
+ return new SeaTunnelRowType(
+ new String[] {"dummy"}, new SeaTunnelDataType[]
{BasicType.INT_TYPE});
+ }
+
+ private SeaTunnelRow dummyRow() {
+ return new SeaTunnelRow(new Object[] {1});
+ }
+
+ @Test
+ void testNestedArrayEvaluateWithSQLEngine() {
+ SQLEngine sql = zeta();
+ SeaTunnelRowType inType = dummyInputType();
+
+ String sqlText = "select ARRAY(ARRAY(1,2), ARRAY(3,4)) as a from test";
+ sql.init("test", null, inType, sqlText);
+
+ List<SeaTunnelRow> out = sql.transformBySQL(dummyRow(), inType);
+ Assertions.assertEquals(1, out.size());
+
+ Object field0 = out.get(0).getField(0);
+ Assertions.assertTrue(field0 instanceof Object[], "outer should be
array");
+ Object[] outer = (Object[]) field0;
+ Assertions.assertEquals(2, outer.length);
+
+ Assertions.assertTrue(outer[0] instanceof Object[], "inner[0] should
be array");
+ Assertions.assertTrue(outer[1] instanceof Object[], "inner[1] should
be array");
+
+ Object[] inner1 = (Object[]) outer[0];
+ Object[] inner2 = (Object[]) outer[1];
+ Assertions.assertEquals(2, inner1.length);
+ Assertions.assertEquals(2, inner2.length);
+
+ Assertions.assertEquals(1, ((Number) inner1[0]).intValue());
+ Assertions.assertEquals(2, ((Number) inner1[1]).intValue());
+ Assertions.assertEquals(3, ((Number) inner2[0]).intValue());
+ Assertions.assertEquals(4, ((Number) inner2[1]).intValue());
+ }
+}
diff --git
a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/functions/MapFunctionTest.java
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/functions/MapFunctionTest.java
new file mode 100644
index 0000000000..805cf7e8fe
--- /dev/null
+++
b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/functions/MapFunctionTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.sql.zeta.functions;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.transform.sql.SQLEngine;
+import org.apache.seatunnel.transform.sql.SQLEngineFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+
+class MapFunctionTest {
+ private SQLEngine zeta() {
+ return SQLEngineFactory.getSQLEngine(SQLEngineFactory.EngineType.ZETA);
+ }
+
+ private SeaTunnelRowType dummyInputType() {
+ return new SeaTunnelRowType(
+ new String[] {"dummy"}, new SeaTunnelDataType[]
{BasicType.INT_TYPE});
+ }
+
+ private SeaTunnelRow dummyRow() {
+ return new SeaTunnelRow(new Object[] {1});
+ }
+
+ @Test
+ void testNestedMapLiteralEvaluation() {
+ SQLEngine sql = zeta();
+ SeaTunnelRowType inType = dummyInputType();
+
+ String sqlText =
+ "select "
+ + " MAP('k1', MAP('a', 1, 'b', 2), 'k2', MAP('c', 3,
'd', 4)) as m1 "
+ + "from test";
+
+ sql.init("test", null, inType, sqlText);
+
+ List<SeaTunnelRow> out = sql.transformBySQL(dummyRow(), inType);
+ Assertions.assertEquals(1, out.size());
+
+ Map m1 = (Map) out.get(0).getField(0);
+ Assertions.assertNotNull(m1);
+
+ Map k1 = (Map) m1.get("k1");
+ Map k2 = (Map) m1.get("k2");
+ Assertions.assertNotNull(k1);
+ Assertions.assertNotNull(k2);
+
+ Assertions.assertEquals(1, ((Number) k1.get("a")).intValue());
+ Assertions.assertEquals(2, ((Number) k1.get("b")).intValue());
+ Assertions.assertEquals(3, ((Number) k2.get("c")).intValue());
+ Assertions.assertEquals(4, ((Number) k2.get("d")).intValue());
+ }
+
+ @Test
+ void testMapWithArrayValues() {
+ SQLEngine sql = zeta();
+ SeaTunnelRowType inType = dummyInputType();
+
+ String sqlText =
+ "select " + " MAP('x', ARRAY(1,2,3), 'y', ARRAY(4,5)) as m2 "
+ "from test";
+
+ sql.init("test", null, inType, sqlText);
+
+ List<SeaTunnelRow> out = sql.transformBySQL(dummyRow(), inType);
+ Assertions.assertEquals(1, out.size());
+
+ Map m2 = (Map) out.get(0).getField(0);
+ Assertions.assertNotNull(m2);
+
+ Object[] x = (Object[]) m2.get("x");
+ Object[] y = (Object[]) m2.get("y");
+ Assertions.assertArrayEquals(
+ new int[] {1, 2, 3},
+ new int[] {
+ ((Number) x[0]).intValue(),
+ ((Number) x[1]).intValue(),
+ ((Number) x[2]).intValue()
+ });
+ Assertions.assertArrayEquals(
+ new int[] {4, 5},
+ new int[] {((Number) y[0]).intValue(), ((Number)
y[1]).intValue()});
+ }
+
+ @Test
+ void testArrayOfMapLiterals() {
+ SQLEngine sql = zeta();
+ SeaTunnelRowType inType = dummyInputType();
+
+ String sqlText = "select " + " ARRAY(MAP('aa', 10), MAP('bb', 20)) as
a1 " + "from test";
+
+ sql.init("test", null, inType, sqlText);
+
+ List<SeaTunnelRow> out = sql.transformBySQL(dummyRow(), inType);
+ Assertions.assertEquals(1, out.size());
+
+ Object[] a1 = (Object[]) out.get(0).getField(0);
+ Assertions.assertEquals(2, a1.length);
+
+ Map m0 = (Map) a1[0];
+ Map m1 = (Map) a1[1];
+ Assertions.assertEquals(10, ((Number) m0.get("aa")).intValue());
+ Assertions.assertEquals(20, ((Number) m1.get("bb")).intValue());
+ }
+}