This is an automated email from the ASF dual-hosted git repository.

zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a2e075aa44 [Feature][Core] Support define any nested array and map type in zeta (#9881)
a2e075aa44 is described below

commit a2e075aa446bc92e01a45b329a4032a01d2d1904
Author: dy102 <[email protected]>
AuthorDate: Tue Oct 14 10:59:09 2025 +0900

    [Feature][Core] Support define any nested array and map type in zeta (#9881)
    
    Co-authored-by: Jia Fan <[email protected]>
---
 .../apache/seatunnel/api/table/type/ArrayType.java |  17 +++
 .../seatunnel/api/table/type/SeaTunnelRow.java     |  13 ++
 .../apache/seatunnel/e2e/transform/TestSQLIT.java  |  11 ++
 .../test/resources/sql_transform/nested_type.conf  |  57 ++++++++
 .../transform/sql/zeta/ZetaSQLFunction.java        |   4 +
 .../seatunnel/transform/sql/zeta/ZetaSQLType.java  |  16 +-
 .../sql/zeta/functions/ArrayFunction.java          | 105 +++----------
 .../sql/zeta/functions/CommonFunction.java         | 162 +++++++++++++++++++++
 .../transform/sql/zeta/functions/MapFunction.java  |  79 ++++++++++
 .../sql/zeta/ZetaSQLTypeNestedTypeTest.java        | 108 ++++++++++++++
 .../sql/zeta/functions/ArrayFunctionTest.java      |  75 ++++++++++
 .../sql/zeta/functions/MapFunctionTest.java        | 126 ++++++++++++++++
 12 files changed, 683 insertions(+), 90 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java
index 65f7651e79..63e6a7bcf0 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/ArrayType.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.api.table.type;
 
+import org.apache.seatunnel.common.exception.CommonError;
+
+import java.lang.reflect.Array;
 import java.util.Objects;
 
 public class ArrayType<T, E> implements SeaTunnelDataType<T> {
@@ -61,6 +64,20 @@ public class ArrayType<T, E> implements SeaTunnelDataType<T> {
         this.elementType = elementType;
     }
 
+    @SuppressWarnings("unchecked")
+    public static <E> ArrayType<E[], E> of(SeaTunnelDataType<E> elementType) {
+        if (elementType == null) {
+            throw CommonError.illegalArgument("elementType is null", "create 
ArrayType");
+        }
+        Class<E[]> arrayClass = (Class<E[]>) toArrayClass(elementType);
+        return new ArrayType<>(arrayClass, elementType);
+    }
+
+    private static Class<?> toArrayClass(SeaTunnelDataType<?> elementType) {
+        Class<?> elementClass = elementType.getTypeClass();
+        return Array.newInstance(elementClass, 0).getClass();
+    }
+
     public SeaTunnelDataType<E> getElementType() {
         return elementType;
     }
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
index ed44437b82..64a8ee94f7 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
@@ -233,6 +233,12 @@ public final class SeaTunnelRow implements Serializable {
                 return getArrayNotNullSize((Long[]) v) * 8;
             case DOUBLE:
                 return getArrayNotNullSize((Double[]) v) * 8;
+            case ARRAY:
+                int total = 0;
+                for (Object elem : (Object[]) v) {
+                    total += getBytesForValue(elem, dataType);
+                }
+                return total;
             case MAP:
                 return getArrayMapNotNullSize(v);
             case NULL:
@@ -342,6 +348,13 @@ public final class SeaTunnelRow implements Serializable {
                 }
                 return rowSize;
             default:
+                if (v.getClass().isArray() && v instanceof Object[]) {
+                    int sum = 0;
+                    for (Object o : (Object[]) v) {
+                        sum += getBytesForValue(o);
+                    }
+                    return sum;
+                }
                 if (v instanceof Map) {
                     int mapSize = 0;
                     for (Map.Entry<?, ?> entry : ((Map<?, ?>) v).entrySet()) {
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
index 6d9dd11856..5b04022a67 100644
--- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
@@ -115,4 +115,15 @@ public class TestSQLIT extends TestSuiteBase {
                 container.executeJob("/sql_transform/inner_query.conf");
         Assertions.assertEquals(0, innerQuerySql.getExitCode());
     }
+
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason = "Spark and Flink translation has some issue on 
nested type")
+    public void testNestedType(TestContainer container) throws IOException, 
InterruptedException {
+        Container.ExecResult nestedTypeSql =
+                container.executeJob("/sql_transform/nested_type.conf");
+        Assertions.assertEquals(0, nestedTypeSql.getExitCode());
+    }
 }
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/nested_type.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/nested_type.conf
new file mode 100644
index 0000000000..7db99cd4c5
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/nested_type.conf
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    plugin_output = "fake"
+    row.num = 1
+    string.template = ["nestedType"]
+    schema = {
+      fields {
+        name = "string"
+      }
+    }
+  }
+}
+
+transform {
+  Sql {
+    plugin_input = "fake"
+    plugin_output = "tmp_nested"
+    query = """
+      select
+        ARRAY(ARRAY(ARRAY(1,2,3), ARRAY(4,5,6), ARRAY(ARRAY(1,2,3), 
ARRAY(4,5,6)), ARRAY(3, 4))) as arr_of_arr,
+        MAP('k', MAP('k', MAP('k', 1))) as map_of_map,
+        ARRAY(MAP('k', 1), MAP('k2', ARRAY(1, 2))) as arr_of_map,
+        MAP('k', ARRAY(1, 2)) as map_of_arr
+      from dual
+    """
+  }
+}
+
+sink {
+  Console {
+    plugin_input = "tmp_nested"
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
index 1f2334ac6a..79e5c3b1ec 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
@@ -33,6 +33,7 @@ import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import org.apache.seatunnel.transform.exception.TransformException;
 import org.apache.seatunnel.transform.sql.zeta.functions.ArrayFunction;
 import org.apache.seatunnel.transform.sql.zeta.functions.DateTimeFunction;
+import org.apache.seatunnel.transform.sql.zeta.functions.MapFunction;
 import org.apache.seatunnel.transform.sql.zeta.functions.NumericFunction;
 import org.apache.seatunnel.transform.sql.zeta.functions.StringFunction;
 import org.apache.seatunnel.transform.sql.zeta.functions.SystemFunction;
@@ -195,6 +196,7 @@ public class ZetaSQLFunction {
     // -------------------------lateralView functions----------------------------
     public static final String EXPLODE = "EXPLODE";
     public static final String ARRAY = "ARRAY";
+    public static final String MAP = "MAP";
 
     // -------------------------system functions----------------------------
     public static final String COALESCE = "COALESCE";
@@ -619,6 +621,8 @@ public class ZetaSQLFunction {
                 return ArrayFunction.arrayMax(args);
             case ARRAY_MIN:
                 return ArrayFunction.arrayMin(args);
+            case MAP:
+                return MapFunction.map(args);
             case UUID:
                 return randomUUID().toString();
             case COSINE_DISTANCE:
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
index 5300e62e08..73c92a17fe 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
@@ -30,6 +30,7 @@ import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.transform.exception.TransformException;
 import org.apache.seatunnel.transform.sql.zeta.functions.ArrayFunction;
 import org.apache.seatunnel.transform.sql.zeta.functions.CastFunction;
+import org.apache.seatunnel.transform.sql.zeta.functions.MapFunction;
 
 import org.apache.commons.collections4.CollectionUtils;
 
@@ -416,6 +417,8 @@ public class ZetaSQLType {
                 return BasicType.DOUBLE_TYPE;
             case ZetaSQLFunction.ARRAY:
                 return ArrayFunction.castArrayTypeMapping(function, 
inputRowType);
+            case ZetaSQLFunction.MAP:
+                return MapFunction.castMapTypeMapping(function, inputRowType);
             case ZetaSQLFunction.ARRAY_MAX:
             case ZetaSQLFunction.ARRAY_MIN:
                 return ArrayFunction.getElementType(function, inputRowType);
@@ -521,15 +524,22 @@ public class ZetaSQLType {
     }
 
     private static List<Expression> getExpressions(Function function) {
-        ExpressionList parameters = function.getParameters();
+        ExpressionList<Expression> parameters =
+                (ExpressionList<Expression>) function.getParameters();
         if (parameters == null) {
             throw new TransformException(
                     CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
                     function.getName() + " function requires at least one 
parameter");
         }
 
-        List<Expression> expressions = parameters.getExpressions();
-        if (expressions == null || expressions.isEmpty()) {
+        List<Expression> expressions = new ArrayList<>();
+        if (parameters != null) {
+            for (Expression expression : parameters) {
+                expressions.add(expression);
+            }
+        }
+
+        if (expressions.isEmpty()) {
             throw new TransformException(
                     CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
                     function.getName() + " function requires at least one 
parameter");
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/ArrayFunction.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/ArrayFunction.java
index f901bd79ad..e95e7e4cc1 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/ArrayFunction.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/ArrayFunction.java
@@ -17,22 +17,16 @@
 package org.apache.seatunnel.transform.sql.zeta.functions;
 
 import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.transform.exception.TransformException;
 
-import net.sf.jsqlparser.expression.DoubleValue;
 import net.sf.jsqlparser.expression.Expression;
 import net.sf.jsqlparser.expression.Function;
-import net.sf.jsqlparser.expression.LongValue;
-import net.sf.jsqlparser.expression.NullValue;
-import net.sf.jsqlparser.expression.StringValue;
-import net.sf.jsqlparser.expression.operators.relational.ExpressionList;
-import net.sf.jsqlparser.schema.Column;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
@@ -101,36 +95,27 @@ public class ArrayFunction {
     }
 
     public static ArrayType castArrayTypeMapping(Function function, 
SeaTunnelRowType inputRowType) {
-        return castArrayTypeMapping(getFunctionArgs(function, inputRowType));
-    }
+        List<Expression> expressions = CommonFunction.getExpressions(function);
 
-    public static ArrayType castArrayTypeMapping(List<Class<?>> args) {
-        if (args == null || args.isEmpty()) {
+        if (expressions.isEmpty()) {
             return ArrayType.STRING_ARRAY_TYPE;
         }
 
-        Class<?> arrayType = getClassType(args);
-        return getSeaTunnelDataType(arrayType);
+        SeaTunnelDataType<?> elementType = null;
+        for (Expression expression : expressions) {
+            SeaTunnelDataType<?> t = 
CommonFunction.resolveExpressionType(expression, inputRowType);
+            elementType = CommonFunction.unifyCollectionType(elementType, t);
+        }
+        if (elementType == null) {
+            elementType = BasicType.STRING_TYPE;
+        }
+        return createArrayType(elementType);
     }
 
-    private static ArrayType getSeaTunnelDataType(Class<?> clazz) {
-        String className = clazz.getSimpleName();
-        switch (className) {
-            case "Integer":
-                return ArrayType.INT_ARRAY_TYPE;
-            case "Double":
-                return ArrayType.DOUBLE_ARRAY_TYPE;
-            case "Boolean":
-                return ArrayType.BOOLEAN_ARRAY_TYPE;
-            case "Long":
-                return ArrayType.LONG_ARRAY_TYPE;
-            case "float":
-                return ArrayType.FLOAT_ARRAY_TYPE;
-            case "short":
-                return ArrayType.SHORT_ARRAY_TYPE;
-            default:
-                return ArrayType.STRING_ARRAY_TYPE;
-        }
+    static ArrayType createArrayType(SeaTunnelDataType<?> elementType) {
+        if (elementType == BasicType.BYTE_TYPE || elementType == 
BasicType.VOID_TYPE)
+            return ArrayType.STRING_ARRAY_TYPE;
+        return ArrayType.of(elementType);
     }
 
     private static Class<?> getArrayType(Class<?> type1, Class<?> type2) {
@@ -173,21 +158,6 @@ public class ArrayFunction {
         return String.class;
     }
 
-    private static Class<?> getClassType(List<Class<?>> args) {
-        Class<?> arrayType = null;
-        for (Class<?> obj : args) {
-            if (obj == null) {
-                continue;
-            }
-            if (arrayType == null) {
-                arrayType = obj;
-            } else {
-                arrayType = getArrayType(arrayType, obj);
-            }
-        }
-        return arrayType == null ? String.class : arrayType;
-    }
-
     private static Class<?> getDataClassType(List<Object> args) {
         Class<?> arrayType = null;
         for (Object obj : args) {
@@ -205,52 +175,13 @@ public class ArrayFunction {
 
     public static SeaTunnelDataType<?> getElementType(
             Function function, SeaTunnelRowType inputRowType) {
-        String columnName = 
function.getParameters().getExpressions().get(0).toString();
+        List<Expression> expressions = CommonFunction.getExpressions(function);
+        String columnName = expressions.get(0).toString();
         int columnIndex = inputRowType.indexOf(columnName);
         ArrayType arrayType = (ArrayType) 
inputRowType.getFieldType(columnIndex);
         return arrayType.getElementType();
     }
 
-    private static List<Class<?>> getFunctionArgs(
-            Function function, SeaTunnelRowType inputRowType) {
-        ExpressionList<Expression> expressionList =
-                (ExpressionList<Expression>) function.getParameters();
-        List<Class<?>> functionArgs = new ArrayList<>();
-        if (expressionList != null) {
-            for (Expression expression : expressionList.getExpressions()) {
-                if (expression instanceof NullValue) {
-                    functionArgs.add(null);
-                    continue;
-                }
-                if (expression instanceof DoubleValue) {
-                    functionArgs.add(Double.class);
-                    continue;
-                }
-                if (expression instanceof Column) {
-                    int columnIndex = inputRowType.indexOf(((Column) 
expression).getColumnName());
-                    
functionArgs.add(inputRowType.getFieldType(columnIndex).getTypeClass());
-                    continue;
-                }
-
-                if (expression instanceof LongValue) {
-                    long longVal = ((LongValue) expression).getValue();
-                    if (longVal <= Integer.MAX_VALUE && longVal >= 
Integer.MIN_VALUE) {
-                        functionArgs.add(Integer.class);
-                    } else {
-                        functionArgs.add(Long.class);
-                    }
-                    continue;
-                }
-                if (expression instanceof StringValue) {
-                    functionArgs.add(String.class);
-                    continue;
-                }
-                throw new SeaTunnelException("unSupport expression: " + 
expression.toString());
-            }
-        }
-        return functionArgs;
-    }
-
     private static Object convertToType(Object obj, Class<?> targetType) {
         if (obj == null || targetType.isInstance(obj)) {
             return obj;
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/CommonFunction.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/CommonFunction.java
new file mode 100644
index 0000000000..b0d867a652
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/CommonFunction.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.sql.zeta.functions;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
+
+import net.sf.jsqlparser.expression.DoubleValue;
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+import net.sf.jsqlparser.expression.LongValue;
+import net.sf.jsqlparser.expression.NullValue;
+import net.sf.jsqlparser.expression.StringValue;
+import net.sf.jsqlparser.expression.operators.relational.ExpressionList;
+import net.sf.jsqlparser.schema.Column;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class CommonFunction {
+    private CommonFunction() {}
+
+    public static SeaTunnelDataType resolveExpressionType(
+            Expression expression, SeaTunnelRowType rowType) {
+        if (expression instanceof NullValue) {
+            return null;
+        }
+        if (expression instanceof DoubleValue) {
+            return BasicType.DOUBLE_TYPE;
+        }
+        if (expression instanceof LongValue) {
+            long v = ((LongValue) expression).getValue();
+            if (v <= Integer.MAX_VALUE && v >= Integer.MIN_VALUE) {
+                return BasicType.INT_TYPE;
+            }
+            return BasicType.LONG_TYPE;
+        }
+        if (expression instanceof StringValue) {
+            return BasicType.STRING_TYPE;
+        }
+        if (expression instanceof Column) {
+            Column c = (Column) expression;
+            int idx = rowType.indexOf(c.getColumnName());
+            if (idx < 0) {
+                throw CommonError.illegalArgument(
+                        "column not found: " + c.getColumnName(), "derive 
expression type");
+            }
+            return rowType.getFieldType(idx);
+        }
+        if (expression instanceof Function) {
+            Function function = (Function) expression;
+            String name = function.getName();
+            if (name != null && "ARRAY".equalsIgnoreCase(name)) {
+                return ArrayFunction.castArrayTypeMapping(function, rowType);
+            }
+            if (name != null && "MAP".equalsIgnoreCase(name)) {
+                return MapFunction.castMapTypeMapping(function, rowType);
+            }
+        }
+        throw CommonError.unsupportedDataType(
+                "SeaTunnel", expression.getClass().getTypeName(), 
expression.toString());
+    }
+
+    public static SeaTunnelDataType unifyCollectionType(
+            SeaTunnelDataType type1, SeaTunnelDataType type2) {
+        if (type1 == null || BasicType.VOID_TYPE.equals(type1)) return type2;
+        if (type2 == null || BasicType.VOID_TYPE.equals(type2)) return type1;
+
+        if (type1.equals(type2)) return type1;
+
+        if (isNumeric(type1) && isNumeric(type2)) {
+            return widenNumeric(type1, type2);
+        }
+
+        if (type1 instanceof ArrayType && type2 instanceof ArrayType) {
+            ArrayType at = (ArrayType) type1;
+            ArrayType bt = (ArrayType) type2;
+            SeaTunnelDataType ae = at.getElementType();
+            SeaTunnelDataType be = bt.getElementType();
+            SeaTunnelDataType ue = unifyCollectionType(ae, be);
+            return ArrayFunction.createArrayType(ue);
+        }
+
+        if (type1 instanceof MapType && type2 instanceof MapType) {
+            MapType map1 = (MapType) type1;
+            MapType map2 = (MapType) type2;
+            SeaTunnelDataType uk = unifyCollectionType(map1.getKeyType(), 
map2.getKeyType());
+            SeaTunnelDataType uv = unifyCollectionType(map1.getValueType(), 
map2.getValueType());
+            return new MapType<>(uk, uv);
+        }
+
+        return BasicType.STRING_TYPE;
+    }
+
+    public static boolean isNumeric(SeaTunnelDataType<?> type) {
+        return type == BasicType.BYTE_TYPE
+                || type == BasicType.SHORT_TYPE
+                || type == BasicType.INT_TYPE
+                || type == BasicType.LONG_TYPE
+                || type == BasicType.FLOAT_TYPE
+                || type == BasicType.DOUBLE_TYPE;
+    }
+
+    public static SeaTunnelDataType widenNumeric(SeaTunnelDataType type1, 
SeaTunnelDataType type2) {
+        int rank1 = numericRank(type1);
+        int rank2 = numericRank(type2);
+        int max = Math.max(rank1, rank2);
+        switch (max) {
+            case 5:
+                return BasicType.DOUBLE_TYPE;
+            case 4:
+                return BasicType.FLOAT_TYPE;
+            case 3:
+                return BasicType.LONG_TYPE;
+            case 2:
+                return BasicType.INT_TYPE;
+            case 1:
+                return BasicType.SHORT_TYPE;
+            default:
+                return BasicType.BYTE_TYPE;
+        }
+    }
+
+    private static int numericRank(SeaTunnelDataType<?> type) {
+        if (type == BasicType.DOUBLE_TYPE) return 5;
+        if (type == BasicType.FLOAT_TYPE) return 4;
+        if (type == BasicType.LONG_TYPE) return 3;
+        if (type == BasicType.INT_TYPE) return 2;
+        if (type == BasicType.SHORT_TYPE) return 1;
+        return 0; // BYTE
+    }
+
+    public static List<Expression> getExpressions(Function function) {
+        ExpressionList<Expression> params = (ExpressionList<Expression>) 
function.getParameters();
+        List<Expression> expressions = new ArrayList<>();
+        if (params != null) {
+            for (Expression expression : params) {
+                expressions.add(expression);
+            }
+        }
+        return expressions;
+    }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/MapFunction.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/MapFunction.java
new file mode 100644
index 0000000000..8dfd75ed30
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/MapFunction.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.sql.zeta.functions;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
+
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class MapFunction {
+    private MapFunction() {}
+
+    public static Map<String, Object> map(List<Object> args) {
+        if (args == null || args.isEmpty()) {
+            return new LinkedHashMap<>();
+        }
+        if (args.size() % 2 != 0) {
+            throw CommonError.illegalArgument(
+                    args.toString(), "MAP requires even number of arguments");
+        }
+        Map<String, Object> result = new LinkedHashMap<>(args.size() / 2);
+        for (int i = 0; i < args.size(); i += 2) {
+            Object keyObj = args.get(i);
+            Object val = args.get(i + 1);
+            if (keyObj == null) {
+                throw CommonError.illegalArgument(args.toString(), "MAP key 
cannot be null");
+            }
+            String key = (keyObj instanceof String) ? (String) keyObj : 
String.valueOf(keyObj);
+            result.put(key, val);
+        }
+        return result;
+    }
+
+    public static MapType castMapTypeMapping(Function function, 
SeaTunnelRowType rowType) {
+        List<Expression> expressions = CommonFunction.getExpressions(function);
+        if (expressions.size() < 2 || (expressions.size() % 2 != 0)) {
+            throw CommonError.illegalArgument(
+                    String.valueOf(expressions.size()),
+                    "MAP requires even number of arguments >= 2");
+        }
+
+        SeaTunnelDataType keyType = null;
+        SeaTunnelDataType valType = null;
+        for (int i = 0; i < expressions.size(); i += 2) {
+            SeaTunnelDataType kt =
+                    CommonFunction.resolveExpressionType(expressions.get(i), 
rowType);
+            SeaTunnelDataType vt =
+                    CommonFunction.resolveExpressionType(expressions.get(i + 
1), rowType);
+            keyType = CommonFunction.unifyCollectionType(keyType, kt);
+            valType = CommonFunction.unifyCollectionType(valType, vt);
+        }
+        if (keyType == null) keyType = BasicType.STRING_TYPE;
+        if (valType == null) valType = BasicType.STRING_TYPE;
+        return new MapType<>(keyType, valType);
+    }
+}
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLTypeNestedTypeTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLTypeNestedTypeTest.java
new file mode 100644
index 0000000000..99d9c96647
--- /dev/null
+++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLTypeNestedTypeTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.sql.zeta;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import net.sf.jsqlparser.expression.Expression;
+import net.sf.jsqlparser.expression.Function;
+import net.sf.jsqlparser.expression.LongValue;
+import net.sf.jsqlparser.expression.StringValue;
+import net.sf.jsqlparser.expression.operators.relational.ExpressionList;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+class ZetaSQLTypeNestedTypeTest {
+
+    private static Function arr(Expression... expressions) {
+        Function function = new Function();
+        function.setName("ARRAY");
+        function.setParameters(new ExpressionList(Arrays.asList(expressions)));
+        return function;
+    }
+
+    private static Function map(Expression key, Expression value) {
+        Function function = new Function();
+        function.setName("MAP");
+        function.setParameters(new ExpressionList(Arrays.asList(key, value)));
+        return function;
+    }
+
+    private ZetaSQLType zeta() {
+        SeaTunnelRowType rowType =
+                new SeaTunnelRowType(
+                        new String[] {"col"}, new SeaTunnelDataType[] 
{BasicType.STRING_TYPE});
+        return new ZetaSQLType(rowType, Collections.emptyList());
+    }
+
+    @Test
+    void testArrayOfArrayTypePreserved() {
+        // ARRAY(ARRAY(1,2), ARRAY(3,4))
+        Function inner1 = arr(new LongValue(1), new LongValue(2));
+        Function inner2 = arr(new LongValue(3), new LongValue(4));
+        Function outer = arr(inner1, inner2);
+
+        SeaTunnelDataType type = zeta().getExpressionType(outer);
+        Assertions.assertEquals(ArrayType.of(ArrayType.INT_ARRAY_TYPE), type);
+    }
+
+    @Test
+    void testArrayOfMapTypePreserved() {
+        // ARRAY(MAP('k',1), MAP('k2',2))
+        Function map1 = map(new StringValue("k"), new LongValue(1));
+        Function map2 = map(new StringValue("k2"), new LongValue(2));
+        Function outer = arr(map1, map2);
+
+        SeaTunnelDataType type = zeta().getExpressionType(outer);
+        Assertions.assertEquals(
+                ArrayType.of(new MapType<>(BasicType.STRING_TYPE, 
BasicType.INT_TYPE)), type);
+    }
+
+    @Test
+    void testMapOfArrayTypePreserved() {
+        // MAP('k', ARRAY(1,2))
+        Function valueArr = arr(new LongValue(1), new LongValue(2));
+        Function map = map(new StringValue("k"), valueArr);
+
+        SeaTunnelDataType type = zeta().getExpressionType(map);
+        Assertions.assertEquals(
+                new MapType<>(BasicType.STRING_TYPE, 
ArrayType.INT_ARRAY_TYPE), type);
+    }
+
+    @Test
+    void testMapOfMapTypePreserved() {
+        // MAP('k', Map('k',2))
+        Function innerMap = map(new StringValue("k"), new LongValue(2));
+        Function outerMap = map(new StringValue("k"), innerMap);
+
+        SeaTunnelDataType type = zeta().getExpressionType(outerMap);
+        Assertions.assertEquals(
+                new MapType<>(
+                        BasicType.STRING_TYPE,
+                        new MapType<>(BasicType.STRING_TYPE, 
BasicType.INT_TYPE)),
+                type);
+    }
+}
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/functions/ArrayFunctionTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/functions/ArrayFunctionTest.java
new file mode 100644
index 0000000000..a36f7d5eac
--- /dev/null
+++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/functions/ArrayFunctionTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.sql.zeta.functions;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.transform.sql.SQLEngine;
+import org.apache.seatunnel.transform.sql.SQLEngineFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+/** Evaluates a nested ARRAY literal end to end through the Zeta SQL engine. */
+class ArrayFunctionTest {
+    /** Obtains a ZETA engine from the factory. */
+    private SQLEngine zeta() {
+        return SQLEngineFactory.getSQLEngine(SQLEngineFactory.EngineType.ZETA);
+    }
+
+    /** Row type with a single INT column named {@code dummy}. */
+    private SeaTunnelRowType dummyInputType() {
+        String[] fieldNames = {"dummy"};
+        SeaTunnelDataType[] fieldTypes = {BasicType.INT_TYPE};
+        return new SeaTunnelRowType(fieldNames, fieldTypes);
+    }
+
+    /** Row holding the single value {@code 1}. */
+    private SeaTunnelRow dummyRow() {
+        return new SeaTunnelRow(new Object[] {1});
+    }
+
+    @Test
+    void testNestedArrayEvaluateWithSQLEngine() {
+        SQLEngine engine = zeta();
+        SeaTunnelRowType inputType = dummyInputType();
+
+        engine.init(
+                "test", null, inputType, "select ARRAY(ARRAY(1,2), ARRAY(3,4)) as a from test");
+
+        List<SeaTunnelRow> rows = engine.transformBySQL(dummyRow(), inputType);
+        Assertions.assertEquals(1, rows.size());
+
+        Object result = rows.get(0).getField(0);
+        Assertions.assertTrue(result instanceof Object[], "outer should be array");
+        Object[] outer = (Object[]) result;
+        Assertions.assertEquals(2, outer.length);
+
+        // Both elements must be arrays before any of them is inspected further.
+        for (int i = 0; i < 2; i++) {
+            Assertions.assertTrue(
+                    outer[i] instanceof Object[], "inner[" + i + "] should be array");
+        }
+
+        Object[][] inner = {(Object[]) outer[0], (Object[]) outer[1]};
+        for (Object[] elements : inner) {
+            Assertions.assertEquals(2, elements.length);
+        }
+
+        int[][] expected = {{1, 2}, {3, 4}};
+        for (int i = 0; i < expected.length; i++) {
+            for (int j = 0; j < expected[i].length; j++) {
+                Assertions.assertEquals(expected[i][j], ((Number) inner[i][j]).intValue());
+            }
+        }
+    }
+}
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/functions/MapFunctionTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/functions/MapFunctionTest.java
new file mode 100644
index 0000000000..805cf7e8fe
--- /dev/null
+++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/functions/MapFunctionTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.sql.zeta.functions;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.transform.sql.SQLEngine;
+import org.apache.seatunnel.transform.sql.SQLEngineFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+
+/** Evaluates MAP literals (nested, array-valued, and inside ARRAY) through the Zeta SQL engine. */
+class MapFunctionTest {
+    /** Obtains a ZETA engine from the factory. */
+    private SQLEngine zeta() {
+        return SQLEngineFactory.getSQLEngine(SQLEngineFactory.EngineType.ZETA);
+    }
+
+    /** Row type with a single INT column named {@code dummy}. */
+    private SeaTunnelRowType dummyInputType() {
+        String[] fieldNames = {"dummy"};
+        SeaTunnelDataType[] fieldTypes = {BasicType.INT_TYPE};
+        return new SeaTunnelRowType(fieldNames, fieldTypes);
+    }
+
+    /** Row holding the single value {@code 1}. */
+    private SeaTunnelRow dummyRow() {
+        return new SeaTunnelRow(new Object[] {1});
+    }
+
+    /**
+     * Runs {@code sqlText} against the dummy row, asserts exactly one output row, and returns that
+     * row's first field.
+     */
+    private Object evaluateFirstField(String sqlText) {
+        SQLEngine engine = zeta();
+        SeaTunnelRowType inputType = dummyInputType();
+        engine.init("test", null, inputType, sqlText);
+
+        List<SeaTunnelRow> rows = engine.transformBySQL(dummyRow(), inputType);
+        Assertions.assertEquals(1, rows.size());
+        return rows.get(0).getField(0);
+    }
+
+    /** Converts the first {@code count} elements of {@code values} to ints via {@link Number}. */
+    private static int[] leadingInts(Object[] values, int count) {
+        int[] ints = new int[count];
+        for (int i = 0; i < count; i++) {
+            ints[i] = ((Number) values[i]).intValue();
+        }
+        return ints;
+    }
+
+    @Test
+    void testNestedMapLiteralEvaluation() {
+        String sqlText =
+                "select "
+                        + "  MAP('k1', MAP('a', 1, 'b', 2), 'k2', MAP('c', 3, 'd', 4)) as m1 "
+                        + "from test";
+
+        Map<?, ?> outer = (Map<?, ?>) evaluateFirstField(sqlText);
+        Assertions.assertNotNull(outer);
+
+        Map<?, ?> first = (Map<?, ?>) outer.get("k1");
+        Map<?, ?> second = (Map<?, ?>) outer.get("k2");
+        Assertions.assertNotNull(first);
+        Assertions.assertNotNull(second);
+
+        Assertions.assertEquals(1, ((Number) first.get("a")).intValue());
+        Assertions.assertEquals(2, ((Number) first.get("b")).intValue());
+        Assertions.assertEquals(3, ((Number) second.get("c")).intValue());
+        Assertions.assertEquals(4, ((Number) second.get("d")).intValue());
+    }
+
+    @Test
+    void testMapWithArrayValues() {
+        String sqlText =
+                "select " + "  MAP('x', ARRAY(1,2,3), 'y', ARRAY(4,5)) as m2 " + "from test";
+
+        Map<?, ?> result = (Map<?, ?>) evaluateFirstField(sqlText);
+        Assertions.assertNotNull(result);
+
+        Object[] x = (Object[]) result.get("x");
+        Object[] y = (Object[]) result.get("y");
+        // Only the leading elements are compared; extra trailing elements are not inspected.
+        Assertions.assertArrayEquals(new int[] {1, 2, 3}, leadingInts(x, 3));
+        Assertions.assertArrayEquals(new int[] {4, 5}, leadingInts(y, 2));
+    }
+
+    @Test
+    void testArrayOfMapLiterals() {
+        String sqlText =
+                "select " + "  ARRAY(MAP('aa', 10), MAP('bb', 20)) as a1 " + "from test";
+
+        Object[] elements = (Object[]) evaluateFirstField(sqlText);
+        Assertions.assertEquals(2, elements.length);
+
+        Map<?, ?> first = (Map<?, ?>) elements[0];
+        Map<?, ?> second = (Map<?, ?>) elements[1];
+        Assertions.assertEquals(10, ((Number) first.get("aa")).intValue());
+        Assertions.assertEquals(20, ((Number) second.get("bb")).intValue());
+    }
+}


Reply via email to