This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2c70bd346d37f96f01007c89e3eb66e919c0c0a8 Author: Hanyu Zheng <[email protected]> AuthorDate: Fri Jun 9 20:09:52 2023 -0700 [FLINK-32256] Add ARRAY_MIN function Find the minimum among all elements in the array for which ordering is supported. --- docs/data/sql_functions.yml | 3 + .../docs/reference/pyflink.table/expressions.rst | 1 + flink-python/pyflink/table/expression.py | 7 + .../flink/table/api/internal/BaseExpressions.java | 10 ++ .../functions/BuiltInFunctionDefinitions.java | 10 ++ .../table/types/inference/TypeInferenceUtil.java | 5 + .../ArrayComparableElementTypeStrategy.java | 5 + .../functions/CollectionFunctionsITCase.java | 167 ++++++++++++++++++++- .../runtime/functions/scalar/ArrayMinFunction.java | 89 +++++++++++ 9 files changed, 295 insertions(+), 2 deletions(-) diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml index 09a5c9543e2..dd95bed7778 100644 --- a/docs/data/sql_functions.yml +++ b/docs/data/sql_functions.yml @@ -658,6 +658,9 @@ collection: - sql: ARRAY_JOIN(array, delimiter[, nullReplacement]) table: array.arrayJoin(delimiter[, nullReplacement]) description: Returns a string that represents the concatenation of the elements in the given array and the elements' data type in the given array is string. The delimiter is a string that separates each pair of consecutive elements of the array. The optional nullReplacement is a string that replaces null elements in the array. If nullReplacement is not specified, null elements in the array will be omitted from the resulting string. Returns null if input array or delimiter or nullRepl [...] + - sql: ARRAY_MIN(array) + table: array.arrayMin() + description: Returns the minimum value from the array, if array itself is null, the function returns null. - sql: MAP_KEYS(map) table: MAP.mapKeys() description: Returns the keys of the map as array. No order guaranteed. diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst b/flink-python/docs/reference/pyflink.table/expressions.rst index 93541c53983..908a6ceda5a 100644 --- a/flink-python/docs/reference/pyflink.table/expressions.rst +++ b/flink-python/docs/reference/pyflink.table/expressions.rst @@ -234,6 +234,7 @@ advanced type helper functions Expression.array_reverse Expression.array_max Expression.array_slice + Expression.array_min Expression.array_union Expression.map_entries Expression.map_keys diff --git a/flink-python/pyflink/table/expression.py b/flink-python/pyflink/table/expression.py index 3f55b762292..cb72ba40b21 100644 --- a/flink-python/pyflink/table/expression.py +++ b/flink-python/pyflink/table/expression.py @@ -1564,6 +1564,13 @@ class Expression(Generic[T]): else: return _ternary_op("array_join")(self, delimiter, null_replacement) + def array_min(self) -> 'Expression': + """ + Returns the minimum value from the array. + if array itself is null, the function returns null. + """ + return _unary_op("arrayMin")(self) + @property def map_keys(self) -> 'Expression': """ diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java index c3b356d67bf..cdc108d3672 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java @@ -58,6 +58,7 @@ import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_ import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_DISTINCT; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_ELEMENT; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_MAX; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_MIN; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_POSITION; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_REMOVE; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_REVERSE; @@ -1477,6 +1478,15 @@ public abstract class BaseExpressions<InType, OutType> { return toApiSpecificExpression(unresolvedCall(ARRAY_MAX, toExpr())); } + /** + * Returns the minimum value from the array. + * + * <p>if array itself is null, the function returns null. + */ + public OutType arrayMin() { + return toApiSpecificExpression(unresolvedCall(ARRAY_MIN, toExpr())); + } + /** Returns the keys of the map as an array. */ public OutType mapKeys() { return toApiSpecificExpression(unresolvedCall(MAP_KEYS, toExpr())); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index 669f4012003..dedeb6860c7 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -351,6 +351,16 @@ public final class BuiltInFunctionDefinitions { "org.apache.flink.table.runtime.functions.scalar.ArrayJoinFunction") .build(); + public static final BuiltInFunctionDefinition ARRAY_MIN = + BuiltInFunctionDefinition.newBuilder() + .name("ARRAY_MIN") + .kind(SCALAR) + .inputTypeStrategy(arrayFullyComparableElementType()) + .outputTypeStrategy(forceNullable(SpecificTypeStrategies.ARRAY_ELEMENT)) + .runtimeClass( + "org.apache.flink.table.runtime.functions.scalar.ArrayMinFunction") + .build(); + public static final BuiltInFunctionDefinition INTERNAL_REPLICATE_ROWS = BuiltInFunctionDefinition.newBuilder() .name("$REPLICATE_ROWS$1") diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInferenceUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInferenceUtil.java index 8247ccad1b0..2e69044d7cf 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInferenceUtil.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInferenceUtil.java @@ -344,6 +344,11 @@ public final class TypeInferenceUtil { // -------------------------------------------------------------------------------------------- + public static boolean checkInputArgumentNumber( + ArgumentCount argumentCount, int actualCount, boolean throwOnFailure) { + return validateArgumentCount(argumentCount, actualCount, throwOnFailure); + } + private static Result runTypeInferenceInternal( TypeInference typeInference, CallContext callContext, diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategy.java index 175a41d45dd..c3e40e26647 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategy.java @@ -27,6 +27,7 @@ import org.apache.flink.table.types.inference.CallContext; import org.apache.flink.table.types.inference.ConstantArgumentCount; import org.apache.flink.table.types.inference.InputTypeStrategy; import org.apache.flink.table.types.inference.Signature; +import org.apache.flink.table.types.inference.TypeInferenceUtil; import org.apache.flink.table.types.logical.LegacyTypeInformationType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalTypeFamily; @@ -67,6 +68,10 @@ public final class ArrayComparableElementTypeStrategy implements InputTypeStrate public Optional<List<DataType>> inferInputTypes( CallContext callContext, boolean throwOnFailure) { final List<DataType> argumentDataTypes = callContext.getArgumentDataTypes(); + if (!TypeInferenceUtil.checkInputArgumentNumber( + argumentCount, argumentDataTypes.size(), throwOnFailure)) { + return callContext.fail(throwOnFailure, "the input argument number should be one"); + } final DataType argumentType = argumentDataTypes.get(0); if (!argumentType.getLogicalType().is(LogicalTypeRoot.ARRAY)) { return callContext.fail(throwOnFailure, "All arguments requires to be an ARRAY type"); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java index a758d7cdf3b..1bd9a0e8cab 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java @@ -50,7 +50,8 @@ class CollectionFunctionsITCase extends BuiltInFunctionTestBase { arrayConcatTestCases(), arrayMaxTestCases(), arrayJoinTestCases(), - arraySliceTestCases()) + arraySliceTestCases(), + arrayMinTestCases()) .flatMap(s -> s); } @@ -738,7 +739,169 @@ class CollectionFunctionsITCase extends BuiltInFunctionTestBase { .testSqlValidationError( "ARRAY_MAX(f12)", "SQL validation failed. Invalid function call:\n" - + "ARRAY_MAX(ARRAY<ARRAY<INT>>)")); + + "ARRAY_MAX(ARRAY<ARRAY<INT>>)") + .testSqlValidationError( + "ARRAY_MAX()", "No match found for function signature ARRAY_MAX()") + .testSqlValidationError( + "ARRAY_MAX(ARRAY[1], ARRAY[2])", + "No match found for function signature ARRAY_MAX(<INTEGER ARRAY>, <INTEGER ARRAY>)")); + } + + private Stream<TestSetSpec> arrayMinTestCases() { + return Stream.of( + TestSetSpec.forFunction(BuiltInFunctionDefinitions.ARRAY_MIN) + .onFieldsWithData( + new Integer[] {1, 2, null}, + null, + new Double[] {1.2, null, 3.4, 8.0}, + new String[] {"a", null, "bc", "d", "def"}, + new Row[] { + Row.of(true, LocalDate.of(2022, 4, 20)), + Row.of(true, LocalDate.of(1990, 10, 14)), + null + }, + new Map[] { + CollectionUtil.map(entry(1, "a"), entry(2, "b")), + CollectionUtil.map(entry(3, "c"), entry(4, "d")), + null + }, + new Integer[][] {{1, 2, 3}, {4, 5, 6}, {7, 8, 9}, null}, + new Row[] { + Row.of(LocalDate.of(2022, 4, 20)), + Row.of(LocalDate.of(1990, 10, 14)), + Row.of(LocalDate.of(2022, 4, 20)), + Row.of(LocalDate.of(1990, 10, 14)), + Row.of(LocalDate.of(2022, 4, 20)), + Row.of(LocalDate.of(1990, 10, 14)), + null + }, + new Boolean[] {true, false, true, false, true, null}, + new Row[] { + Row.of(true), + Row.of(false), + Row.of(true), + Row.of(false), + Row.of(true), + Row.of(false), + null + }, + new Row[] { + Row.of(1), Row.of(2), Row.of(8), Row.of(4), Row.of(5), + Row.of(8), null + }, + 1, + new Integer[][] {{1, 2}, {2, 3}, null}, + new LocalDate[] { + LocalDate.of(2022, 1, 2), + LocalDate.of(2023, 4, 21), + LocalDate.of(2022, 12, 24), + LocalDate.of(2026, 2, 10), + LocalDate.of(2012, 5, 16), + LocalDate.of(2092, 7, 19) + }, + null) + .andDataTypes( + DataTypes.ARRAY(DataTypes.INT()), + DataTypes.ARRAY(DataTypes.INT()), + DataTypes.ARRAY(DataTypes.DOUBLE()), + DataTypes.ARRAY(DataTypes.STRING()), + DataTypes.ARRAY( + DataTypes.ROW(DataTypes.BOOLEAN(), DataTypes.DATE())), + DataTypes.ARRAY(DataTypes.MAP(DataTypes.INT(), DataTypes.STRING())), + DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())), + DataTypes.ARRAY(DataTypes.ROW(DataTypes.DATE())), + DataTypes.ARRAY(DataTypes.BOOLEAN()), + DataTypes.ARRAY(DataTypes.ROW(DataTypes.BOOLEAN())), + DataTypes.ARRAY(DataTypes.ROW(DataTypes.INT())), + DataTypes.INT().notNull(), + DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())), + DataTypes.ARRAY(DataTypes.DATE()), + DataTypes.ARRAY(DataTypes.INT().notNull())) + .testResult($("f0").arrayMin(), "ARRAY_MIN(f0)", 1, DataTypes.INT()) + .testResult($("f1").arrayMin(), "ARRAY_MIN(f1)", null, DataTypes.INT()) + .testResult($("f2").arrayMin(), "ARRAY_MIN(f2)", 1.2, DataTypes.DOUBLE()) + .testResult($("f3").arrayMin(), "ARRAY_MIN(f3)", "a", DataTypes.STRING()) + .testResult($("f14").arrayMin(), "ARRAY_MIN(f14)", null, DataTypes.INT()) + .testResult( + $("f13").arrayMin(), + "ARRAY_MIN(f13)", + LocalDate.of(2012, 5, 16), + DataTypes.DATE()) + .testSqlValidationError( + "ARRAY_MIN(f4)", + "SQL validation failed. Invalid function call:\n" + + "ARRAY_MIN(ARRAY<ROW<`f0` BOOLEAN, `f1` DATE>>") + .testTableApiValidationError( + $("f4").arrayMin(), + "Invalid function call:\n" + + "ARRAY_MIN(ARRAY<ROW<`f0` BOOLEAN, `f1` DATE>>") + .testSqlValidationError( + "ARRAY_MIN(f5)", + "SQL validation failed. Invalid function call:\n" + + "ARRAY_MIN(ARRAY<MAP<INT, STRING>>") + .testTableApiValidationError( + $("f5").arrayMin(), + "Invalid function call:\n" + "ARRAY_MIN(ARRAY<MAP<INT, STRING>>)") + .testSqlValidationError( + "ARRAY_MIN(f6)", + "SQL validation failed. Invalid function call:\n" + + "ARRAY_MIN(ARRAY<ARRAY<INT>>)") + .testTableApiValidationError( + $("f6").arrayMin(), + "Invalid function call:\n" + "ARRAY_MIN(ARRAY<ARRAY<INT>>)") + .testSqlValidationError( + "ARRAY_MIN(f7)", + "SQL validation failed. Invalid function call:\n" + + "ARRAY_MIN(ARRAY<ROW<`f0` DATE>>)") + .testTableApiValidationError( + $("f7").arrayMin(), + "Invalid function call:\n" + "ARRAY_MIN(ARRAY<ROW<`f0` DATE>>)") + .testSqlValidationError( + "ARRAY_MIN(f8)", + "SQL validation failed. Invalid function call:\n" + + "ARRAY_MIN(ARRAY<BOOLEAN>)") + .testTableApiValidationError( + $("f8").arrayMin(), + "Invalid function call:\n" + "ARRAY_MIN(ARRAY<BOOLEAN>)") + .testSqlValidationError( + "ARRAY_MIN(f9)", + "SQL validation failed. Invalid function call:\n" + + "ARRAY_MIN(ARRAY<ROW<`f0` BOOLEAN>>)") + .testTableApiValidationError( + $("f9").arrayMin(), + "Invalid function call:\n" + "ARRAY_MIN(ARRAY<ROW<`f0` BOOLEAN>>)") + .testSqlValidationError( + "ARRAY_MIN(f10)", + "SQL validation failed. Invalid function call:\n" + + "ARRAY_MIN(ARRAY<ROW<`f0` INT>>)") + .testTableApiValidationError( + $("f10").arrayMin(), + "Invalid function call:\n" + "ARRAY_MIN(ARRAY<ROW<`f0` INT>>)") + .testTableApiValidationError( + $("f11").arrayMin(), + "Invalid function call:\n" + "ARRAY_MIN(INT NOT NULL)") + .testSqlValidationError( + "ARRAY_MIN(f11)", + "SQL validation failed. Invalid function call:\n" + + "ARRAY_MIN(INT NOT NULL)") + .testTableApiValidationError( + $("f12").arrayMin(), + "Invalid function call:\n" + "ARRAY_MIN(ARRAY<ARRAY<INT>>)") + .testSqlValidationError( + "ARRAY_MIN(f12)", + "SQL validation failed. Invalid function call:\n" + + "ARRAY_MIN(ARRAY<ARRAY<INT>>)") + .testSqlValidationError( + "ARRAY_MIN()", "No match found for function signature ARRAY_MIN()") + .testSqlValidationError( + "ARRAY_MIN(ARRAY[1], ARRAY[2])", + "No match found for function signature ARRAY_MIN(<INTEGER ARRAY>, <INTEGER ARRAY>)") + .withFunction(CreateEmptyArray.class) + .testResult( + call("CreateEmptyArray").arrayMin(), + "ARRAY_MIN(CreateEmptyArray())", + null, + DataTypes.INT())); } private Stream<TestSetSpec> arrayJoinTestCases() { diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayMinFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayMinFunction.java new file mode 100644 index 00000000000..92fe74aa68a --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayMinFunction.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.types.CollectionDataType; +import org.apache.flink.table.types.DataType; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nullable; + +import java.lang.invoke.MethodHandle; + +import static org.apache.flink.table.api.Expressions.$; + +/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_MIN}. */ +@Internal +public class ArrayMinFunction extends BuiltInScalarFunction { + private final ArrayData.ElementGetter elementGetter; + private final SpecializedFunction.ExpressionEvaluator compareEvaluator; + private transient MethodHandle compareHandle; + + public ArrayMinFunction(SpecializedFunction.SpecializedContext context) { + super(BuiltInFunctionDefinitions.ARRAY_MIN, context); + + final DataType dataType = + ((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0)) + .getElementDataType(); + elementGetter = ArrayData.createElementGetter(dataType.getLogicalType()); + compareEvaluator = + context.createEvaluator( + $("element1").isLess($("element2")), + DataTypes.BOOLEAN().notNull(), + DataTypes.FIELD("element1", dataType.notNull().toInternal()), + DataTypes.FIELD("element2", dataType.notNull().toInternal())); + } + + @Override + public void open(FunctionContext context) throws Exception { + compareHandle = compareEvaluator.open(context); + } + + public @Nullable Object eval(ArrayData array) { + try { + if (array == null || array.size() == 0) { + return null; + } + + Object minElement = null; + for (int i = 0; i < array.size(); i++) { + Object element = elementGetter.getElementOrNull(array, i); + if (element != null) { + if (minElement == null || (boolean) compareHandle.invoke(element, minElement)) { + minElement = element; + } + } + } + return minElement; + } catch (Throwable t) { + throw new FlinkRuntimeException(t); + } + } + + @Override + public void close() throws Exception { + compareEvaluator.close(); + } +}
