This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9d201bb103 [Fix][transform-v2]SQL transform support max/min function 
(#8625)
9d201bb103 is described below

commit 9d201bb103d9e4525a1a1ee1ce71aff51b855bab
Author: CosmosNi <[email protected]>
AuthorDate: Mon Feb 10 21:52:10 2025 +0800

    [Fix][transform-v2]SQL transform support max/min function (#8625)
---
 docs/en/transform-v2/sql-functions.md              |  20 ++++
 docs/zh/transform-v2/sql-functions.md              |  21 ++++
 .../apache/seatunnel/e2e/transform/TestSQLIT.java  |   4 +
 .../sql_transform/func_array_max_min.conf          | 113 +++++++++++++++++++++
 .../transform/sql/zeta/ZetaSQLFunction.java        |   6 ++
 .../seatunnel/transform/sql/zeta/ZetaSQLType.java  |   3 +
 .../sql/zeta/functions/ArrayFunction.java          |  61 +++++++++++
 7 files changed, 228 insertions(+)

diff --git a/docs/en/transform-v2/sql-functions.md 
b/docs/en/transform-v2/sql-functions.md
index 870471c866..48a3112e4b 100644
--- a/docs/en/transform-v2/sql-functions.md
+++ b/docs/en/transform-v2/sql-functions.md
@@ -385,6 +385,26 @@ Example:
 
 ACOS(D)
 
+### ARRAY_MAX
+
+```ARRAY_MAX(ARRAY)```
+
+The MAX function returns the maximum value of the expression.
+
+Example:
+
+ARRAY_MAX(I)
+
+### ARRAY_MIN
+
+```ARRAY_MIN(ARRAY)```
+
+The MIN function returns the minimum value of the expression.
+
+Example:
+
+ARRAY_MIN(I)
+
 ### ASIN
 
 ```ASIN(numeric)```
diff --git a/docs/zh/transform-v2/sql-functions.md 
b/docs/zh/transform-v2/sql-functions.md
index ae14cdf204..b4b879d27f 100644
--- a/docs/zh/transform-v2/sql-functions.md
+++ b/docs/zh/transform-v2/sql-functions.md
@@ -386,6 +386,27 @@ ABS(I)
 
 ACOS(D)
 
+### ARRAY_MAX
+
+```ARRAY_MAX(ARRAY)```
+
+MAX 函数返回表达式的最大值。
+
+示例:
+
+ARRAY_MAX(I)
+
+### ARRAY_MIN
+
+```ARRAY_MIN(ARRAY)```
+
+MIN 函数返回表达式的最小值。
+
+示例:
+
+ARRAY_MIN(I)
+
+
 ### ASIN
 
 ```ASIN(numeric)```
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
index 042fcaf15f..d47a5c9972 100644
--- 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
@@ -77,6 +77,10 @@ public class TestSQLIT extends TestSuiteBase {
 
         Container.ExecResult splitSql = 
container.executeJob("/sql_transform/func_split.conf");
         Assertions.assertEquals(0, splitSql.getExitCode());
+
+        Container.ExecResult maxMinSql =
+                container.executeJob("/sql_transform/func_array_max_min.conf");
+        Assertions.assertEquals(0, maxMinSql.getExitCode());
     }
 
     @TestTemplate
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_array_max_min.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_array_max_min.conf
new file mode 100644
index 0000000000..1c7b66bb88
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/func_array_max_min.conf
@@ -0,0 +1,113 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+
+env {
+   job.mode = "BATCH"
+   parallelism = 1
+ }
+
+source {
+  FakeSource {
+    plugin_output = "fake"
+    schema = {
+      fields {
+        c_string = string
+        c_num_array = "array<int>"
+        c_string_array = "array<string>"
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = ["c_string",[1,2,3], ["a","b","c"]]
+      }
+    ]
+  }
+}
+
+ transform {
+   Sql {
+     plugin_input = "fake"
+     plugin_output = "fake1"
+     query = """select c_string,
+                     ARRAY_MAX(c_num_array) as c_num_max_array,
+                     ARRAY_MIN(c_num_array) as c_num_min_array,
+                     ARRAY_MAX(c_string_array) as c_string_max_array,
+                     ARRAY_MIN(c_string_array) as c_string_min_array
+                     from fake1"""
+   }
+ }
+
+sink {
+  Assert {
+    plugin_input = "fake1"
+    rules =
+      {
+        row_rules = [
+          {
+            rule_type = MIN_ROW
+            rule_value = 1
+          },
+          {
+            rule_type = MAX_ROW
+            rule_value = 1
+          }
+        ],
+        field_rules = [
+          {
+            field_name = "c_string"
+            field_type = "string"
+            field_value = [
+              {equals_to = "c_string"}
+            ]
+          },
+         {
+           field_name = "c_num_max_array"
+           field_type = "int"
+           field_value = [
+             {equals_to = 3}
+           ]
+         },
+         {
+           field_name = "c_num_min_array"
+           field_type = "int"
+           field_value = [
+             {equals_to = 1}
+           ]
+         },
+         {
+           field_name = "c_string_max_array"
+           field_type = "string"
+           field_value = [
+             {equals_to = "c"}
+           ]
+         },
+         {
+           field_name = "c_string_min_array"
+           field_type = "string"
+           field_value = [
+             {equals_to = "a"}
+           ]
+         }
+        ]
+      }
+  }
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
index 8a69142c1d..ae26f12335 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
@@ -148,6 +148,8 @@ public class ZetaSQLFunction {
     public static final String SIGN = "SIGN";
     public static final String TRUNC = "TRUNC";
     public static final String TRUNCATE = "TRUNCATE";
+    public static final String ARRAY_MAX = "ARRAY_MAX";
+    public static final String ARRAY_MIN = "ARRAY_MIN";
 
     // -------------------------time and date 
functions----------------------------
     public static final String CURRENT_DATE = "CURRENT_DATE";
@@ -554,6 +556,10 @@ public class ZetaSQLFunction {
                 return SystemFunction.nullif(args);
             case ARRAY:
                 return ArrayFunction.array(args);
+            case ARRAY_MAX:
+                return ArrayFunction.arrayMax(args);
+            case ARRAY_MIN:
+                return ArrayFunction.arrayMin(args);
             case UUID:
                 return randomUUID().toString();
             default:
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
index fda9105179..5d5a26bf40 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java
@@ -450,6 +450,9 @@ public class ZetaSQLType {
                 return BasicType.DOUBLE_TYPE;
             case ZetaSQLFunction.ARRAY:
                 return ArrayFunction.castArrayTypeMapping(function, 
inputRowType);
+            case ZetaSQLFunction.ARRAY_MAX:
+            case ZetaSQLFunction.ARRAY_MIN:
+                return ArrayFunction.getElementType(function, inputRowType);
             case ZetaSQLFunction.SPLIT:
                 return ArrayType.STRING_ARRAY_TYPE;
             case ZetaSQLFunction.NOW:
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/ArrayFunction.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/ArrayFunction.java
index dcbe814d6b..f901bd79ad 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/ArrayFunction.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/ArrayFunction.java
@@ -17,8 +17,11 @@
 package org.apache.seatunnel.transform.sql.zeta.functions;
 
 import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.common.utils.SeaTunnelException;
+import org.apache.seatunnel.transform.exception.TransformException;
 
 import net.sf.jsqlparser.expression.DoubleValue;
 import net.sf.jsqlparser.expression.Expression;
@@ -30,10 +33,60 @@ import 
net.sf.jsqlparser.expression.operators.relational.ExpressionList;
 import net.sf.jsqlparser.schema.Column;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.List;
 
 public class ArrayFunction {
 
+    public static Object arrayMax(List<Object> args) {
+        if (args == null || args.isEmpty()) {
+            return null;
+        }
+        Object[] dataList = (Object[]) args.get(0);
+        if (dataList == null || dataList.length == 0) {
+            return null;
+        }
+        if (dataList[0] instanceof String) {
+            return Arrays.stream(dataList)
+                    .map(String.class::cast)
+                    .max(String::compareTo)
+                    .orElse(null);
+        } else if (dataList[0] instanceof Number) {
+            return Arrays.stream(dataList)
+                    .map(Number.class::cast)
+                    .max(Comparator.comparingDouble(Number::doubleValue))
+                    .orElse(null);
+        }
+        throw new TransformException(
+                CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                String.format("Unsupported function max() arguments: %s", 
args));
+    }
+
+    public static Object arrayMin(List<Object> args) {
+        if (args == null || args.isEmpty()) {
+            return null;
+        }
+        Object[] dataList = (Object[]) args.get(0);
+        if (dataList == null || dataList.length == 0) {
+            return null;
+        }
+        if (dataList[0] instanceof String) {
+            return Arrays.stream(dataList)
+                    .map(String.class::cast)
+                    .min(String::compareTo)
+                    .orElse(null);
+        } else if (dataList[0] instanceof Number) {
+            return Arrays.stream(dataList)
+                    .map(Number.class::cast)
+                    .min(Comparator.comparingDouble(Number::doubleValue))
+                    .orElse(null);
+        }
+        throw new TransformException(
+                CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                String.format("Unsupported function max() arguments: %s", 
args));
+    }
+
     public static Object[] array(List<Object> args) {
         if (args == null || args.isEmpty()) {
             return new Object[0];
@@ -150,6 +203,14 @@ public class ArrayFunction {
         return arrayType == null ? String.class : arrayType;
     }
 
+    public static SeaTunnelDataType<?> getElementType(
+            Function function, SeaTunnelRowType inputRowType) {
+        String columnName = 
function.getParameters().getExpressions().get(0).toString();
+        int columnIndex = inputRowType.indexOf(columnName);
+        ArrayType arrayType = (ArrayType) 
inputRowType.getFieldType(columnIndex);
+        return arrayType.getElementType();
+    }
+
     private static List<Class<?>> getFunctionArgs(
             Function function, SeaTunnelRowType inputRowType) {
         ExpressionList<Expression> expressionList =

Reply via email to