This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 82776bffe7d [FLINK-32257][table] Add built-in ARRAY_MAX function.
82776bffe7d is described below
commit 82776bffe7d93b5eb26c82dcce00b60155a82450
Author: Hanyu Zheng <[email protected]>
AuthorDate: Wed Jun 7 16:45:39 2023 -0700
[FLINK-32257][table] Add built-in ARRAY_MAX function.
This closes #22909
---
docs/data/sql_functions.yml | 3 +
.../docs/reference/pyflink.table/expressions.rst | 1 +
flink-python/pyflink/table/expression.py | 7 +
.../flink/table/api/internal/BaseExpressions.java | 10 ++
.../functions/BuiltInFunctionDefinitions.java | 13 ++
.../table/types/inference/InputTypeStrategies.java | 5 +
.../ArrayComparableElementTypeStrategy.java | 138 +++++++++++++++++++
.../strategies/ArrayElementOutputTypeStrategy.java | 47 +++++++
.../strategies/SpecificTypeStrategies.java | 3 +
.../ArrayComparableElementTypeStrategyTest.java | 55 ++++++++
.../ArrayElementOutputTypeStrategyTest.java | 48 +++++++
.../functions/CollectionFunctionsITCase.java | 149 ++++++++++++++++++++-
.../runtime/functions/scalar/ArrayMaxFunction.java | 89 ++++++++++++
13 files changed, 567 insertions(+), 1 deletion(-)
diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index 87c8b0f9486..b64f03ad050 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -649,6 +649,9 @@ collection:
- sql: ARRAY_CONCAT(array1, ...)
table: array1.arrayConcat(...)
description: Returns an array that is the result of concatenating at least
one array. This array contains all the elements in the first array, followed by
all the elements in the second array, and so forth, up to the Nth array. If any
input array is NULL, the function returns NULL.
+ - sql: ARRAY_MAX(array)
+ table: array.arrayMax()
+ description: Returns the maximum value from the array, if array itself is
null, the function returns null.
- sql: MAP_KEYS(map)
table: MAP.mapKeys()
description: Returns the keys of the map as array. No order guaranteed.
diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst
b/flink-python/docs/reference/pyflink.table/expressions.rst
index 0f9ac9bdc1d..72df9e5377e 100644
--- a/flink-python/docs/reference/pyflink.table/expressions.rst
+++ b/flink-python/docs/reference/pyflink.table/expressions.rst
@@ -231,6 +231,7 @@ advanced type helper functions
Expression.array_position
Expression.array_remove
Expression.array_reverse
+ Expression.array_max
Expression.array_union
Expression.map_entries
Expression.map_keys
diff --git a/flink-python/pyflink/table/expression.py
b/flink-python/pyflink/table/expression.py
index 78984fd5d32..cba94230c4e 100644
--- a/flink-python/pyflink/table/expression.py
+++ b/flink-python/pyflink/table/expression.py
@@ -1528,6 +1528,13 @@ class Expression(Generic[T]):
"""
return _binary_op("arrayConcat")(self, *arrays)
+ def array_max(self) -> 'Expression':
+ """
+ Returns the maximum value from the array.
+ if array itself is null, the function returns null.
+ """
+ return _unary_op("arrayMax")(self)
+
@property
def map_keys(self) -> 'Expression':
"""
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
index 5caefd52b2d..5100caf136d 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
@@ -57,6 +57,7 @@ import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_
import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_CONTAINS;
import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_DISTINCT;
import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_ELEMENT;
+import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_MAX;
import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_POSITION;
import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_REMOVE;
import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_REVERSE;
@@ -1443,6 +1444,15 @@ public abstract class BaseExpressions<InType, OutType> {
}
}
+ /**
+ * Returns the maximum value from the array.
+ *
+ * <p>if array itself is null, the function returns null.
+ */
+ public OutType arrayMax() {
+ return toApiSpecificExpression(unresolvedCall(ARRAY_MAX, toExpr()));
+ }
+
/** Returns the keys of the map as an array. */
public OutType mapKeys() {
return toApiSpecificExpression(unresolvedCall(MAP_KEYS, toExpr()));
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index a694e2b56c9..bfddb328758 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -33,6 +33,7 @@ import
org.apache.flink.table.types.inference.ArgumentTypeStrategy;
import org.apache.flink.table.types.inference.ConstantArgumentCount;
import org.apache.flink.table.types.inference.InputTypeStrategies;
import org.apache.flink.table.types.inference.TypeStrategies;
+import
org.apache.flink.table.types.inference.strategies.ArrayElementOutputTypeStrategy;
import
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies;
import
org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies;
import org.apache.flink.table.types.logical.LogicalType;
@@ -69,6 +70,7 @@ import static
org.apache.flink.table.types.inference.InputTypeStrategies.NO_ARGS
import static
org.apache.flink.table.types.inference.InputTypeStrategies.OUTPUT_IF_NULL;
import static
org.apache.flink.table.types.inference.InputTypeStrategies.TYPE_LITERAL;
import static org.apache.flink.table.types.inference.InputTypeStrategies.and;
+import static
org.apache.flink.table.types.inference.InputTypeStrategies.arrayFullyComparableElementType;
import static
org.apache.flink.table.types.inference.InputTypeStrategies.commonArrayType;
import static
org.apache.flink.table.types.inference.InputTypeStrategies.commonMultipleArrayType;
import static
org.apache.flink.table.types.inference.InputTypeStrategies.commonType;
@@ -295,6 +297,17 @@ public final class BuiltInFunctionDefinitions {
.runtimeClass(
"org.apache.flink.table.runtime.functions.scalar.ArrayConcatFunction")
.build();
+
+ public static final BuiltInFunctionDefinition ARRAY_MAX =
+ BuiltInFunctionDefinition.newBuilder()
+ .name("ARRAY_MAX")
+ .kind(SCALAR)
+ .inputTypeStrategy(arrayFullyComparableElementType())
+ .outputTypeStrategy(new ArrayElementOutputTypeStrategy())
+ .runtimeClass(
+
"org.apache.flink.table.runtime.functions.scalar.ArrayMaxFunction")
+ .build();
+
public static final BuiltInFunctionDefinition INTERNAL_REPLICATE_ROWS =
BuiltInFunctionDefinition.newBuilder()
.name("$REPLICATE_ROWS$1")
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
index 8fb44053096..7ed5ba771f5 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.expressions.TableSymbol;
import org.apache.flink.table.types.DataType;
import
org.apache.flink.table.types.inference.strategies.AndArgumentTypeStrategy;
import
org.apache.flink.table.types.inference.strategies.AnyArgumentTypeStrategy;
+import
org.apache.flink.table.types.inference.strategies.ArrayComparableElementTypeStrategy;
import
org.apache.flink.table.types.inference.strategies.CommonArgumentTypeStrategy;
import
org.apache.flink.table.types.inference.strategies.CommonArrayInputTypeStrategy;
import
org.apache.flink.table.types.inference.strategies.CommonInputTypeStrategy;
@@ -364,6 +365,10 @@ public final class InputTypeStrategies {
return new
CommonArrayInputTypeStrategy(ConstantArgumentCount.from(minCount));
}
+ public static InputTypeStrategy arrayFullyComparableElementType() {
+ return new
ArrayComparableElementTypeStrategy(StructuredComparison.FULL);
+ }
+
//
--------------------------------------------------------------------------------------------
private InputTypeStrategies() {
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategy.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategy.java
new file mode 100644
index 00000000000..175a41d45dd
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategy.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import
org.apache.flink.table.types.logical.StructuredType.StructuredComparison;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * An {@link InputTypeStrategy} that checks if the input argument is an ARRAY
type and check whether
+ * its' elements are comparable.
+ *
+ * <p>It requires one argument.
+ *
+ * <p>For the rules which types are comparable with which types see {@link
+ * #areComparable(LogicalType, LogicalType)}.
+ */
+@Internal
+public final class ArrayComparableElementTypeStrategy implements
InputTypeStrategy {
+ private final StructuredComparison requiredComparison;
+ private final ConstantArgumentCount argumentCount;
+
+ public ArrayComparableElementTypeStrategy(StructuredComparison
requiredComparison) {
+ Preconditions.checkArgument(requiredComparison !=
StructuredComparison.NONE);
+ this.requiredComparison = requiredComparison;
+ this.argumentCount = ConstantArgumentCount.of(1);
+ }
+
+ @Override
+ public ArgumentCount getArgumentCount() {
+ return argumentCount;
+ }
+
+ @Override
+ public Optional<List<DataType>> inferInputTypes(
+ CallContext callContext, boolean throwOnFailure) {
+ final List<DataType> argumentDataTypes =
callContext.getArgumentDataTypes();
+ final DataType argumentType = argumentDataTypes.get(0);
+ if (!argumentType.getLogicalType().is(LogicalTypeRoot.ARRAY)) {
+ return callContext.fail(throwOnFailure, "All arguments requires to
be an ARRAY type");
+ }
+ final DataType elementDataType = ((CollectionDataType)
argumentType).getElementDataType();
+ final LogicalType elementLogicalDataType =
elementDataType.getLogicalType();
+ if (!areComparable(elementLogicalDataType, elementLogicalDataType)) {
+ return callContext.fail(
+ throwOnFailure,
+ "Type '%s' should support %s comparison with itself.",
+ elementLogicalDataType,
+ comparisonToString());
+ }
+ return Optional.of(argumentDataTypes);
+ }
+
+ private String comparisonToString() {
+ return requiredComparison == StructuredComparison.EQUALS
+ ? "'EQUALS'"
+ : "both 'EQUALS' and 'ORDER'";
+ }
+
+ private boolean areComparable(LogicalType firstType, LogicalType
secondType) {
+ return areComparableWithNormalizedNullability(firstType.copy(true),
secondType.copy(true));
+ }
+
+ private boolean areComparableWithNormalizedNullability(
+ LogicalType firstType, LogicalType secondType) {
+ // A hack to support legacy types. To be removed when we drop the
legacy types.
+ if (firstType instanceof LegacyTypeInformationType
+ || secondType instanceof LegacyTypeInformationType) {
+ return true;
+ }
+
+ // everything is comparable with null, it should return null in that
case
+ if (firstType.is(LogicalTypeRoot.NULL) ||
secondType.is(LogicalTypeRoot.NULL)) {
+ return true;
+ }
+
+ if (firstType.is(LogicalTypeFamily.NUMERIC) &&
secondType.is(LogicalTypeFamily.NUMERIC)) {
+ return true;
+ }
+
+ // DATE + ALL TIMESTAMPS
+ if (firstType.is(LogicalTypeFamily.DATETIME) &&
secondType.is(LogicalTypeFamily.DATETIME)) {
+ return true;
+ }
+
+ // VARCHAR + CHAR (we do not compare collations here)
+ if (firstType.is(LogicalTypeFamily.CHARACTER_STRING)
+ && secondType.is(LogicalTypeFamily.CHARACTER_STRING)) {
+ return true;
+ }
+
+ // VARBINARY + BINARY
+ if (firstType.is(LogicalTypeFamily.BINARY_STRING)
+ && secondType.is(LogicalTypeFamily.BINARY_STRING)) {
+ return true;
+ }
+
+ return false;
+ }
+
+ @Override
+ public List<Signature> getExpectedSignatures(FunctionDefinition
definition) {
+ return Collections.singletonList(
+ Signature.of(Signature.Argument.ofGroup("ARRAY<COMPARABLE>")));
+ }
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayElementOutputTypeStrategy.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayElementOutputTypeStrategy.java
new file mode 100644
index 00000000000..5c95292b3e2
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayElementOutputTypeStrategy.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.TypeStrategy;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import java.util.Optional;
+
+/** Specific {@link TypeStrategy} for {@link
BuiltInFunctionDefinitions#ARRAY_MAX}. */
+@Internal
+public class ArrayElementOutputTypeStrategy implements TypeStrategy {
+ @Override
+ public Optional<DataType> inferType(CallContext callContext) {
+ DataType inputDataType = callContext.getArgumentDataTypes().get(0);
+ if (inputDataType.getLogicalType().getTypeRoot() !=
LogicalTypeRoot.ARRAY) {
+ return Optional.empty();
+ }
+ final DataType elementDataType = ((CollectionDataType)
inputDataType).getElementDataType();
+ if (inputDataType.getLogicalType().isNullable()) {
+ return Optional.of(elementDataType.nullable());
+ } else {
+ return Optional.of(elementDataType);
+ }
+ }
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java
index ed725a4edb8..3514123f0aa 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java
@@ -58,6 +58,9 @@ public final class SpecificTypeStrategies {
/** See {@link ArrayTypeStrategy}. */
public static final TypeStrategy ARRAY = new ArrayTypeStrategy();
+ /** See {@link ArrayElementOutputTypeStrategy}. */
+ public static final TypeStrategy ARRAY_ELEMENT = new
ArrayElementOutputTypeStrategy();
+
/** See {@link GetTypeStrategy}. */
public static final TypeStrategy GET = new GetTypeStrategy();
diff --git
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategyTest.java
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategyTest.java
new file mode 100644
index 00000000000..3e9d298ebc7
--- /dev/null
+++
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategyTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.inference.InputTypeStrategies;
+import org.apache.flink.table.types.inference.InputTypeStrategiesTestBase;
+
+import java.util.stream.Stream;
+
+/** Tests for {@link ArrayComparableElementTypeStrategy}. */
+public class ArrayComparableElementTypeStrategyTest extends
InputTypeStrategiesTestBase {
+ @Override
+ protected Stream<TestSpec> testData() {
+ return Stream.of(
+
TestSpec.forStrategy(InputTypeStrategies.arrayFullyComparableElementType())
+ .expectSignature("f(<ARRAY<COMPARABLE>>)")
+
.calledWithArgumentTypes(DataTypes.ARRAY(DataTypes.ROW()))
+ .expectErrorMessage(
+ "Invalid input arguments. Expected signatures
are:\n"
+ + "f(<ARRAY<COMPARABLE>>)"),
+ TestSpec.forStrategy(
+ "Strategy fails if input argument type is not
ARRAY",
+
InputTypeStrategies.arrayFullyComparableElementType())
+ .calledWithArgumentTypes(DataTypes.INT())
+ .expectErrorMessage(
+ "Invalid input arguments. Expected signatures
are:\n"
+ + "f(<ARRAY<COMPARABLE>>)"),
+ TestSpec.forStrategy(
+ "Strategy fails if the number of input
arguments are not one",
+
InputTypeStrategies.arrayFullyComparableElementType())
+ .calledWithArgumentTypes(
+ DataTypes.ARRAY(DataTypes.INT()),
+ DataTypes.ARRAY(DataTypes.STRING()))
+ .expectErrorMessage(
+ "Invalid input arguments. Expected signatures
are:\n"
+ + "f(<ARRAY<COMPARABLE>>)"));
+ }
+}
diff --git
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ArrayElementOutputTypeStrategyTest.java
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ArrayElementOutputTypeStrategyTest.java
new file mode 100644
index 00000000000..420c5ad307e
--- /dev/null
+++
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ArrayElementOutputTypeStrategyTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.inference.TypeStrategiesTestBase;
+
+import java.util.stream.Stream;
+
+/** Tests for {@link ArrayElementOutputTypeStrategy}. */
+class ArrayElementOutputTypeStrategyTest extends TypeStrategiesTestBase {
+
+ @Override
+ protected Stream<TestSpec> testData() {
+ return Stream.of(
+ TestSpec.forStrategy(
+ "infer an array's element type",
+ SpecificTypeStrategies.ARRAY_ELEMENT)
+
.inputTypes(DataTypes.ARRAY(DataTypes.INT().notNull()).notNull())
+
.expectDataType(DataTypes.INT().notNull().bridgedTo(int.class)),
+ TestSpec.forStrategy(
+ "infer an array's element type",
+ SpecificTypeStrategies.ARRAY_ELEMENT)
+ .inputTypes(DataTypes.ARRAY(DataTypes.INT()))
+ .expectDataType(DataTypes.INT()),
+ TestSpec.forStrategy(
+ "infer an array's element type",
+ SpecificTypeStrategies.ARRAY_ELEMENT)
+ .inputTypes(DataTypes.ARRAY(DataTypes.INT().notNull()))
+ .expectDataType(DataTypes.INT().bridgedTo(int.class)));
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java
index 689c13d38ac..31131ec519a 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java
@@ -44,7 +44,8 @@ class CollectionFunctionsITCase extends
BuiltInFunctionTestBase {
arrayRemoveTestCases(),
arrayReverseTestCases(),
arrayUnionTestCases(),
- arrayConcatTestCases())
+ arrayConcatTestCases(),
+ arrayMaxTestCases())
.flatMap(s -> s);
}
@@ -578,4 +579,150 @@ class CollectionFunctionsITCase extends
BuiltInFunctionTestBase {
"Invalid function call:\n"
+ "ARRAY_CONCAT(ARRAY<STRING> NOT
NULL, ARRAY<INT NOT NULL> NOT NULL)"));
}
+
+ private Stream<TestSetSpec> arrayMaxTestCases() {
+ return Stream.of(
+ TestSetSpec.forFunction(BuiltInFunctionDefinitions.ARRAY_MAX)
+ .onFieldsWithData(
+ new Integer[] {1, 2, null},
+ null,
+ new Double[] {1.2, null, 3.4, 8.0},
+ new String[] {"a", null, "bc", "d", "def"},
+ new Row[] {
+ Row.of(true, LocalDate.of(2022, 4, 20)),
+ Row.of(true, LocalDate.of(1990, 10, 14)),
+ null
+ },
+ new Map[] {
+ CollectionUtil.map(entry(1, "a"), entry(2,
"b")),
+ CollectionUtil.map(entry(3, "c"), entry(4,
"d")),
+ null
+ },
+ new Integer[][] {{1, 2, 3}, {4, 5, 6}, {7, 8,
9}, null},
+ new Row[] {
+ Row.of(LocalDate.of(2022, 4, 20)),
+ Row.of(LocalDate.of(1990, 10, 14)),
+ Row.of(LocalDate.of(2022, 4, 20)),
+ Row.of(LocalDate.of(1990, 10, 14)),
+ Row.of(LocalDate.of(2022, 4, 20)),
+ Row.of(LocalDate.of(1990, 10, 14)),
+ null
+ },
+ new Boolean[] {true, false, true, false, true,
null},
+ new Row[] {
+ Row.of(true),
+ Row.of(false),
+ Row.of(true),
+ Row.of(false),
+ Row.of(true),
+ Row.of(false),
+ null
+ },
+ new Row[] {
+ Row.of(1), Row.of(2), Row.of(8),
Row.of(4), Row.of(5),
+ Row.of(8), null
+ },
+ 1,
+ new Integer[][] {{1, 2}, {2, 3}, null},
+ new LocalDate[] {
+ LocalDate.of(2022, 1, 2),
+ LocalDate.of(2023, 4, 21),
+ LocalDate.of(2022, 12, 24),
+ LocalDate.of(2026, 2, 10),
+ LocalDate.of(2012, 5, 16),
+ LocalDate.of(2092, 7, 19)
+ },
+ null)
+ .andDataTypes(
+ DataTypes.ARRAY(DataTypes.INT()),
+ DataTypes.ARRAY(DataTypes.INT()),
+ DataTypes.ARRAY(DataTypes.DOUBLE()),
+ DataTypes.ARRAY(DataTypes.STRING()),
+ DataTypes.ARRAY(
+ DataTypes.ROW(DataTypes.BOOLEAN(),
DataTypes.DATE())),
+ DataTypes.ARRAY(DataTypes.MAP(DataTypes.INT(),
DataTypes.STRING())),
+
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())),
+
DataTypes.ARRAY(DataTypes.ROW(DataTypes.DATE())),
+ DataTypes.ARRAY(DataTypes.BOOLEAN()),
+
DataTypes.ARRAY(DataTypes.ROW(DataTypes.BOOLEAN())),
+
DataTypes.ARRAY(DataTypes.ROW(DataTypes.INT())),
+ DataTypes.INT().notNull(),
+
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())),
+ DataTypes.ARRAY(DataTypes.DATE()),
+ DataTypes.ARRAY(DataTypes.INT().notNull()))
+ .testResult($("f0").arrayMax(), "ARRAY_MAX(f0)", 2,
DataTypes.INT())
+ .testResult($("f1").arrayMax(), "ARRAY_MAX(f1)", null,
DataTypes.INT())
+ .testResult($("f2").arrayMax(), "ARRAY_MAX(f2)", 8.0,
DataTypes.DOUBLE())
+ .testResult($("f3").arrayMax(), "ARRAY_MAX(f3)",
"def", DataTypes.STRING())
+ .testResult($("f14").arrayMax(), "ARRAY_MAX(f1)",
null, DataTypes.INT())
+ .testResult(
+ $("f13").arrayMax(),
+ "ARRAY_MAX(f13)",
+ LocalDate.of(2092, 7, 19),
+ DataTypes.DATE())
+ .testSqlValidationError(
+ "ARRAY_MAX(f4)",
+ "SQL validation failed. Invalid function
call:\n"
+ + "ARRAY_MAX(ARRAY<ROW<`f0` BOOLEAN,
`f1` DATE>>")
+ .testTableApiValidationError(
+ $("f4").arrayMax(),
+ "Invalid function call:\n"
+ + "ARRAY_MAX(ARRAY<ROW<`f0` BOOLEAN,
`f1` DATE>>")
+ .testSqlValidationError(
+ "ARRAY_MAX(f5)",
+ "SQL validation failed. Invalid function
call:\n"
+ + "ARRAY_MAX(ARRAY<MAP<INT, STRING>>")
+ .testTableApiValidationError(
+ $("f5").arrayMax(),
+ "Invalid function call:\n" +
"ARRAY_MAX(ARRAY<MAP<INT, STRING>>)")
+ .testSqlValidationError(
+ "ARRAY_MAX(f6)",
+ "SQL validation failed. Invalid function
call:\n"
+ + "ARRAY_MAX(ARRAY<ARRAY<INT>>)")
+ .testTableApiValidationError(
+ $("f6").arrayMax(),
+ "Invalid function call:\n" +
"ARRAY_MAX(ARRAY<ARRAY<INT>>)")
+ .testSqlValidationError(
+ "ARRAY_MAX(f7)",
+ "SQL validation failed. Invalid function
call:\n"
+ + "ARRAY_MAX(ARRAY<ROW<`f0` DATE>>)")
+ .testTableApiValidationError(
+ $("f7").arrayMax(),
+ "Invalid function call:\n" +
"ARRAY_MAX(ARRAY<ROW<`f0` DATE>>)")
+ .testSqlValidationError(
+ "ARRAY_MAX(f8)",
+ "SQL validation failed. Invalid function
call:\n"
+ + "ARRAY_MAX(ARRAY<BOOLEAN>)")
+ .testTableApiValidationError(
+ $("f8").arrayMax(),
+ "Invalid function call:\n" +
"ARRAY_MAX(ARRAY<BOOLEAN>)")
+ .testSqlValidationError(
+ "ARRAY_MAX(f9)",
+ "SQL validation failed. Invalid function
call:\n"
+ + "ARRAY_MAX(ARRAY<ROW<`f0`
BOOLEAN>>)")
+ .testTableApiValidationError(
+ $("f9").arrayMax(),
+ "Invalid function call:\n" +
"ARRAY_MAX(ARRAY<ROW<`f0` BOOLEAN>>)")
+ .testSqlValidationError(
+ "ARRAY_MAX(f10)",
+ "SQL validation failed. Invalid function
call:\n"
+ + "ARRAY_MAX(ARRAY<ROW<`f0` INT>>)")
+ .testTableApiValidationError(
+ $("f10").arrayMax(),
+ "Invalid function call:\n" +
"ARRAY_MAX(ARRAY<ROW<`f0` INT>>)")
+ .testTableApiValidationError(
+ $("f11").arrayMax(),
+ "Invalid function call:\n" + "ARRAY_MAX(INT
NOT NULL)")
+ .testSqlValidationError(
+ "ARRAY_MAX(f11)",
+ "SQL validation failed. Invalid function
call:\n"
+ + "ARRAY_MAX(INT NOT NULL)")
+ .testTableApiValidationError(
+ $("f12").arrayMax(),
+ "Invalid function call:\n" +
"ARRAY_MAX(ARRAY<ARRAY<INT>>)")
+ .testSqlValidationError(
+ "ARRAY_MAX(f12)",
+ "SQL validation failed. Invalid function
call:\n"
+ + "ARRAY_MAX(ARRAY<ARRAY<INT>>)"));
+ }
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayMaxFunction.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayMaxFunction.java
new file mode 100644
index 00000000000..5f77d94ac20
--- /dev/null
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayMaxFunction.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.lang.invoke.MethodHandle;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_MAX}. */
+@Internal
+public class ArrayMaxFunction extends BuiltInScalarFunction {
+ private final ArrayData.ElementGetter elementGetter;
+ private final SpecializedFunction.ExpressionEvaluator compareEvaluator;
+ private transient MethodHandle compareHandle;
+
+ public ArrayMaxFunction(SpecializedFunction.SpecializedContext context) {
+ super(BuiltInFunctionDefinitions.ARRAY_MAX, context);
+
+ final DataType dataType =
+ ((CollectionDataType)
context.getCallContext().getArgumentDataTypes().get(0))
+ .getElementDataType();
+ elementGetter =
ArrayData.createElementGetter(dataType.getLogicalType());
+ compareEvaluator =
+ context.createEvaluator(
+ $("element1").isGreater($("element2")),
+ DataTypes.BOOLEAN().notNull(),
+ DataTypes.FIELD("element1",
dataType.notNull().toInternal()),
+ DataTypes.FIELD("element2",
dataType.notNull().toInternal()));
+ }
+
+ @Override
+ public void open(FunctionContext context) throws Exception {
+ compareHandle = compareEvaluator.open(context);
+ }
+
+ public @Nullable Object eval(ArrayData array) {
+ try {
+ if (array == null || array.size() == 0) {
+ return null;
+ }
+
+ Object maxElement = null;
+ for (int i = 0; i < array.size(); i++) {
+ Object element = elementGetter.getElementOrNull(array, i);
+ if (element != null) {
+ if (maxElement == null || (boolean)
compareHandle.invoke(element, maxElement)) {
+ maxElement = element;
+ }
+ }
+ }
+ return maxElement;
+ } catch (Throwable t) {
+ throw new FlinkRuntimeException(t);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ compareEvaluator.close();
+ }
+}