This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2c70bd346d37f96f01007c89e3eb66e919c0c0a8
Author: Hanyu Zheng <[email protected]>
AuthorDate: Fri Jun 9 20:09:52 2023 -0700

    [FLINK-32256] Add ARRAY_MIN function
    
    Find the minimum among all elements in the array for which ordering is 
supported.
---
 docs/data/sql_functions.yml                        |   3 +
 .../docs/reference/pyflink.table/expressions.rst   |   1 +
 flink-python/pyflink/table/expression.py           |   7 +
 .../flink/table/api/internal/BaseExpressions.java  |  10 ++
 .../functions/BuiltInFunctionDefinitions.java      |  10 ++
 .../table/types/inference/TypeInferenceUtil.java   |   5 +
 .../ArrayComparableElementTypeStrategy.java        |   5 +
 .../functions/CollectionFunctionsITCase.java       | 167 ++++++++++++++++++++-
 .../runtime/functions/scalar/ArrayMinFunction.java |  89 +++++++++++
 9 files changed, 295 insertions(+), 2 deletions(-)

diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index 09a5c9543e2..dd95bed7778 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -658,6 +658,9 @@ collection:
   - sql: ARRAY_JOIN(array, delimiter[, nullReplacement])
     table: array.arrayJoin(delimiter[, nullReplacement])
     description: Returns a string that represents the concatenation of the 
elements in the given array and the elements' data type in the given array is 
string. The delimiter is a string that separates each pair of consecutive 
elements of the array. The optional nullReplacement is a string that replaces 
null elements in the array. If nullReplacement is not specified, null elements 
in the array will be omitted from the resulting string. Returns null if input 
array or delimiter or nullRepl [...]
+  - sql: ARRAY_MIN(array)
+    table: array.arrayMin()
+    description: Returns the minimum value among the non-null elements of the array. If the array itself is 
null or empty, or all of its elements are null, the function returns null.
   - sql: MAP_KEYS(map)
     table: MAP.mapKeys()
     description: Returns the keys of the map as array. No order guaranteed.
diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst 
b/flink-python/docs/reference/pyflink.table/expressions.rst
index 93541c53983..908a6ceda5a 100644
--- a/flink-python/docs/reference/pyflink.table/expressions.rst
+++ b/flink-python/docs/reference/pyflink.table/expressions.rst
@@ -234,6 +234,7 @@ advanced type helper functions
     Expression.array_reverse
     Expression.array_max
     Expression.array_slice
+    Expression.array_min
     Expression.array_union
     Expression.map_entries
     Expression.map_keys
diff --git a/flink-python/pyflink/table/expression.py 
b/flink-python/pyflink/table/expression.py
index 3f55b762292..cb72ba40b21 100644
--- a/flink-python/pyflink/table/expression.py
+++ b/flink-python/pyflink/table/expression.py
@@ -1564,6 +1564,13 @@ class Expression(Generic[T]):
         else:
             return _ternary_op("array_join")(self, delimiter, null_replacement)
 
+    def array_min(self) -> 'Expression':
+        """
+        Returns the minimum value among the non-null elements of the array.
+        If the array itself is null or empty, or all of its elements are null, returns null.
+        """
+        return _unary_op("arrayMin")(self)
+
     @property
     def map_keys(self) -> 'Expression':
         """
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
index c3b356d67bf..cdc108d3672 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
@@ -58,6 +58,7 @@ import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_DISTINCT;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_ELEMENT;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_MAX;
+import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_MIN;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_POSITION;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_REMOVE;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_REVERSE;
@@ -1477,6 +1478,15 @@ public abstract class BaseExpressions<InType, OutType> {
         return toApiSpecificExpression(unresolvedCall(ARRAY_MAX, toExpr()));
     }
 
+    /**
+     * Returns the minimum value among the non-null elements of the array.
+     *
+     * <p>If the array itself is null or empty, or all of its elements are null, returns null.
+     */
+    public OutType arrayMin() {
+        return toApiSpecificExpression(unresolvedCall(ARRAY_MIN, toExpr()));
+    }
+
     /** Returns the keys of the map as an array. */
     public OutType mapKeys() {
         return toApiSpecificExpression(unresolvedCall(MAP_KEYS, toExpr()));
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 669f4012003..dedeb6860c7 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -351,6 +351,16 @@ public final class BuiltInFunctionDefinitions {
                             
"org.apache.flink.table.runtime.functions.scalar.ArrayJoinFunction")
                     .build();
 
+    public static final BuiltInFunctionDefinition ARRAY_MIN = // smallest non-null element or NULL
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("ARRAY_MIN")
+                    .kind(SCALAR)
+                    .inputTypeStrategy(arrayFullyComparableElementType())
+                    
.outputTypeStrategy(forceNullable(SpecificTypeStrategies.ARRAY_ELEMENT))
+                    .runtimeClass( // eval logic lives in ArrayMinFunction
+                            
"org.apache.flink.table.runtime.functions.scalar.ArrayMinFunction")
+                    .build();
+
     public static final BuiltInFunctionDefinition INTERNAL_REPLICATE_ROWS =
             BuiltInFunctionDefinition.newBuilder()
                     .name("$REPLICATE_ROWS$1")
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInferenceUtil.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInferenceUtil.java
index 8247ccad1b0..2e69044d7cf 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInferenceUtil.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInferenceUtil.java
@@ -344,6 +344,11 @@ public final class TypeInferenceUtil {
 
     // 
--------------------------------------------------------------------------------------------
 
+    public static boolean checkInputArgumentNumber( // public wrapper of validateArgumentCount
+            ArgumentCount argumentCount, int actualCount, boolean 
throwOnFailure) {
+        return validateArgumentCount(argumentCount, actualCount, 
throwOnFailure);
+    }
+
     private static Result runTypeInferenceInternal(
             TypeInference typeInference,
             CallContext callContext,
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategy.java
index 175a41d45dd..c3e40e26647 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategy.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategy.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.types.inference.CallContext;
 import org.apache.flink.table.types.inference.ConstantArgumentCount;
 import org.apache.flink.table.types.inference.InputTypeStrategy;
 import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.inference.TypeInferenceUtil;
 import org.apache.flink.table.types.logical.LegacyTypeInformationType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
@@ -67,6 +68,10 @@ public final class ArrayComparableElementTypeStrategy 
implements InputTypeStrate
     public Optional<List<DataType>> inferInputTypes(
             CallContext callContext, boolean throwOnFailure) {
         final List<DataType> argumentDataTypes = 
callContext.getArgumentDataTypes();
+        if (!TypeInferenceUtil.checkInputArgumentNumber(
+                argumentCount, argumentDataTypes.size(), throwOnFailure)) {
+            return callContext.fail(throwOnFailure, "the input argument number 
should be one");
+        }
         final DataType argumentType = argumentDataTypes.get(0);
         if (!argumentType.getLogicalType().is(LogicalTypeRoot.ARRAY)) {
             return callContext.fail(throwOnFailure, "All arguments requires to 
be an ARRAY type");
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java
index a758d7cdf3b..1bd9a0e8cab 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java
@@ -50,7 +50,8 @@ class CollectionFunctionsITCase extends 
BuiltInFunctionTestBase {
                         arrayConcatTestCases(),
                         arrayMaxTestCases(),
                         arrayJoinTestCases(),
-                        arraySliceTestCases())
+                        arraySliceTestCases(),
+                        arrayMinTestCases())
                 .flatMap(s -> s);
     }
 
@@ -738,7 +739,169 @@ class CollectionFunctionsITCase extends 
BuiltInFunctionTestBase {
                         .testSqlValidationError(
                                 "ARRAY_MAX(f12)",
                                 "SQL validation failed. Invalid function 
call:\n"
-                                        + "ARRAY_MAX(ARRAY<ARRAY<INT>>)"));
+                                        + "ARRAY_MAX(ARRAY<ARRAY<INT>>)")
+                        .testSqlValidationError(
+                                "ARRAY_MAX()", "No match found for function 
signature ARRAY_MAX()")
+                        .testSqlValidationError(
+                                "ARRAY_MAX(ARRAY[1], ARRAY[2])",
+                                "No match found for function signature 
ARRAY_MAX(<INTEGER ARRAY>, <INTEGER ARRAY>)"));
+    }
+
+    private Stream<TestSetSpec> arrayMinTestCases() {
+        return Stream.of(
+                TestSetSpec.forFunction(BuiltInFunctionDefinitions.ARRAY_MIN)
+                        .onFieldsWithData( // values bound to f0..f14 in order
+                                new Integer[] {1, 2, null}, // f0
+                                null, // f1
+                                new Double[] {1.2, null, 3.4, 8.0}, // f2
+                                new String[] {"a", null, "bc", "d", "def"}, // f3
+                                new Row[] { // f4
+                                    Row.of(true, LocalDate.of(2022, 4, 20)),
+                                    Row.of(true, LocalDate.of(1990, 10, 14)),
+                                    null
+                                },
+                                new Map[] { // f5
+                                    CollectionUtil.map(entry(1, "a"), entry(2, 
"b")),
+                                    CollectionUtil.map(entry(3, "c"), entry(4, 
"d")),
+                                    null
+                                },
+                                new Integer[][] {{1, 2, 3}, {4, 5, 6}, {7, 8, 
9}, null}, // f6
+                                new Row[] { // f7
+                                    Row.of(LocalDate.of(2022, 4, 20)),
+                                    Row.of(LocalDate.of(1990, 10, 14)),
+                                    Row.of(LocalDate.of(2022, 4, 20)),
+                                    Row.of(LocalDate.of(1990, 10, 14)),
+                                    Row.of(LocalDate.of(2022, 4, 20)),
+                                    Row.of(LocalDate.of(1990, 10, 14)),
+                                    null
+                                },
+                                new Boolean[] {true, false, true, false, true, 
null}, // f8
+                                new Row[] { // f9
+                                    Row.of(true),
+                                    Row.of(false),
+                                    Row.of(true),
+                                    Row.of(false),
+                                    Row.of(true),
+                                    Row.of(false),
+                                    null
+                                },
+                                new Row[] { // f10
+                                    Row.of(1), Row.of(2), Row.of(8), 
Row.of(4), Row.of(5),
+                                    Row.of(8), null
+                                },
+                                1, // f11
+                                new Integer[][] {{1, 2}, {2, 3}, null}, // f12
+                                new LocalDate[] { // f13
+                                    LocalDate.of(2022, 1, 2),
+                                    LocalDate.of(2023, 4, 21),
+                                    LocalDate.of(2022, 12, 24),
+                                    LocalDate.of(2026, 2, 10),
+                                    LocalDate.of(2012, 5, 16),
+                                    LocalDate.of(2092, 7, 19)
+                                },
+                                null) // f14
+                        .andDataTypes( // declared types of f0..f14, same order
+                                DataTypes.ARRAY(DataTypes.INT()),
+                                DataTypes.ARRAY(DataTypes.INT()),
+                                DataTypes.ARRAY(DataTypes.DOUBLE()),
+                                DataTypes.ARRAY(DataTypes.STRING()),
+                                DataTypes.ARRAY(
+                                        DataTypes.ROW(DataTypes.BOOLEAN(), 
DataTypes.DATE())),
+                                DataTypes.ARRAY(DataTypes.MAP(DataTypes.INT(), 
DataTypes.STRING())),
+                                
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())),
+                                
DataTypes.ARRAY(DataTypes.ROW(DataTypes.DATE())),
+                                DataTypes.ARRAY(DataTypes.BOOLEAN()),
+                                
DataTypes.ARRAY(DataTypes.ROW(DataTypes.BOOLEAN())),
+                                
DataTypes.ARRAY(DataTypes.ROW(DataTypes.INT())),
+                                DataTypes.INT().notNull(),
+                                
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())),
+                                DataTypes.ARRAY(DataTypes.DATE()),
+                                DataTypes.ARRAY(DataTypes.INT().notNull()))
+                        .testResult($("f0").arrayMin(), "ARRAY_MIN(f0)", 1, 
DataTypes.INT())
+                        .testResult($("f1").arrayMin(), "ARRAY_MIN(f1)", null, 
DataTypes.INT())
+                        .testResult($("f2").arrayMin(), "ARRAY_MIN(f2)", 1.2, 
DataTypes.DOUBLE())
+                        .testResult($("f3").arrayMin(), "ARRAY_MIN(f3)", "a", 
DataTypes.STRING())
+                        .testResult($("f14").arrayMin(), "ARRAY_MIN(f14)", 
null, DataTypes.INT())
+                        .testResult(
+                                $("f13").arrayMin(),
+                                "ARRAY_MIN(f13)",
+                                LocalDate.of(2012, 5, 16),
+                                DataTypes.DATE())
+                        .testSqlValidationError( // element types rejected at validation
+                                "ARRAY_MIN(f4)",
+                                "SQL validation failed. Invalid function 
call:\n"
+                                        + "ARRAY_MIN(ARRAY<ROW<`f0` BOOLEAN, 
`f1` DATE>>")
+                        .testTableApiValidationError(
+                                $("f4").arrayMin(),
+                                "Invalid function call:\n"
+                                        + "ARRAY_MIN(ARRAY<ROW<`f0` BOOLEAN, 
`f1` DATE>>")
+                        .testSqlValidationError(
+                                "ARRAY_MIN(f5)",
+                                "SQL validation failed. Invalid function 
call:\n"
+                                        + "ARRAY_MIN(ARRAY<MAP<INT, STRING>>")
+                        .testTableApiValidationError(
+                                $("f5").arrayMin(),
+                                "Invalid function call:\n" + 
"ARRAY_MIN(ARRAY<MAP<INT, STRING>>)")
+                        .testSqlValidationError(
+                                "ARRAY_MIN(f6)",
+                                "SQL validation failed. Invalid function 
call:\n"
+                                        + "ARRAY_MIN(ARRAY<ARRAY<INT>>)")
+                        .testTableApiValidationError(
+                                $("f6").arrayMin(),
+                                "Invalid function call:\n" + 
"ARRAY_MIN(ARRAY<ARRAY<INT>>)")
+                        .testSqlValidationError(
+                                "ARRAY_MIN(f7)",
+                                "SQL validation failed. Invalid function 
call:\n"
+                                        + "ARRAY_MIN(ARRAY<ROW<`f0` DATE>>)")
+                        .testTableApiValidationError(
+                                $("f7").arrayMin(),
+                                "Invalid function call:\n" + 
"ARRAY_MIN(ARRAY<ROW<`f0` DATE>>)")
+                        .testSqlValidationError(
+                                "ARRAY_MIN(f8)",
+                                "SQL validation failed. Invalid function 
call:\n"
+                                        + "ARRAY_MIN(ARRAY<BOOLEAN>)")
+                        .testTableApiValidationError(
+                                $("f8").arrayMin(),
+                                "Invalid function call:\n" + 
"ARRAY_MIN(ARRAY<BOOLEAN>)")
+                        .testSqlValidationError(
+                                "ARRAY_MIN(f9)",
+                                "SQL validation failed. Invalid function 
call:\n"
+                                        + "ARRAY_MIN(ARRAY<ROW<`f0` 
BOOLEAN>>)")
+                        .testTableApiValidationError(
+                                $("f9").arrayMin(),
+                                "Invalid function call:\n" + 
"ARRAY_MIN(ARRAY<ROW<`f0` BOOLEAN>>)")
+                        .testSqlValidationError(
+                                "ARRAY_MIN(f10)",
+                                "SQL validation failed. Invalid function 
call:\n"
+                                        + "ARRAY_MIN(ARRAY<ROW<`f0` INT>>)")
+                        .testTableApiValidationError(
+                                $("f10").arrayMin(),
+                                "Invalid function call:\n" + 
"ARRAY_MIN(ARRAY<ROW<`f0` INT>>)")
+                        .testTableApiValidationError(
+                                $("f11").arrayMin(),
+                                "Invalid function call:\n" + "ARRAY_MIN(INT 
NOT NULL)")
+                        .testSqlValidationError(
+                                "ARRAY_MIN(f11)",
+                                "SQL validation failed. Invalid function 
call:\n"
+                                        + "ARRAY_MIN(INT NOT NULL)")
+                        .testTableApiValidationError(
+                                $("f12").arrayMin(),
+                                "Invalid function call:\n" + 
"ARRAY_MIN(ARRAY<ARRAY<INT>>)")
+                        .testSqlValidationError(
+                                "ARRAY_MIN(f12)",
+                                "SQL validation failed. Invalid function 
call:\n"
+                                        + "ARRAY_MIN(ARRAY<ARRAY<INT>>)")
+                        .testSqlValidationError( // wrong number of arguments
+                                "ARRAY_MIN()", "No match found for function 
signature ARRAY_MIN()")
+                        .testSqlValidationError(
+                                "ARRAY_MIN(ARRAY[1], ARRAY[2])",
+                                "No match found for function signature 
ARRAY_MIN(<INTEGER ARRAY>, <INTEGER ARRAY>)")
+                        .withFunction(CreateEmptyArray.class) // presumably yields an empty array
+                        .testResult(
+                                call("CreateEmptyArray").arrayMin(),
+                                "ARRAY_MIN(CreateEmptyArray())",
+                                null,
+                                DataTypes.INT()));
     }
 
     private Stream<TestSetSpec> arrayJoinTestCases() {
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayMinFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayMinFunction.java
new file mode 100644
index 00000000000..92fe74aa68a
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayMinFunction.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.lang.invoke.MethodHandle;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_MIN}. */
+@Internal
+public class ArrayMinFunction extends BuiltInScalarFunction {
+    private final ArrayData.ElementGetter elementGetter; // reads elements via getElementOrNull
+    private final SpecializedFunction.ExpressionEvaluator compareEvaluator; // element1 < element2
+    private transient MethodHandle compareHandle; // obtained in open()
+
+    public ArrayMinFunction(SpecializedFunction.SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.ARRAY_MIN, context);
+
+        final DataType dataType =
+                ((CollectionDataType) 
context.getCallContext().getArgumentDataTypes().get(0))
+                        .getElementDataType(); // element type of the ARRAY argument
+        elementGetter = 
ArrayData.createElementGetter(dataType.getLogicalType());
+        compareEvaluator = // evaluates element1 < element2 on non-null internal values
+                context.createEvaluator(
+                        $("element1").isLess($("element2")),
+                        DataTypes.BOOLEAN().notNull(),
+                        DataTypes.FIELD("element1", 
dataType.notNull().toInternal()),
+                        DataTypes.FIELD("element2", 
dataType.notNull().toInternal()));
+    }
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        compareHandle = compareEvaluator.open(context);
+    }
+
+    public @Nullable Object eval(ArrayData array) { // null if no non-null element
+        try {
+            if (array == null || array.size() == 0) { // NULL or empty input array
+                return null;
+            }
+
+            Object minElement = null; // stays null if every element is null
+            for (int i = 0; i < array.size(); i++) {
+                Object element = elementGetter.getElementOrNull(array, i);
+                if (element != null) { // null elements are ignored
+                    if (minElement == null || (boolean) 
compareHandle.invoke(element, minElement)) {
+                        minElement = element;
+                    }
+                }
+            }
+            return minElement;
+        } catch (Throwable t) { // MethodHandle.invoke declares Throwable
+            throw new FlinkRuntimeException(t);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        compareEvaluator.close();
+    }
+}

Reply via email to