This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 82776bffe7d [FLINK-32257][table] Add built-in ARRAY_MAX function.
82776bffe7d is described below

commit 82776bffe7d93b5eb26c82dcce00b60155a82450
Author: Hanyu Zheng <[email protected]>
AuthorDate: Wed Jun 7 16:45:39 2023 -0700

    [FLINK-32257][table] Add built-in ARRAY_MAX function.
    
    This closes #22909
---
 docs/data/sql_functions.yml                        |   3 +
 .../docs/reference/pyflink.table/expressions.rst   |   1 +
 flink-python/pyflink/table/expression.py           |   7 +
 .../flink/table/api/internal/BaseExpressions.java  |  10 ++
 .../functions/BuiltInFunctionDefinitions.java      |  13 ++
 .../table/types/inference/InputTypeStrategies.java |   5 +
 .../ArrayComparableElementTypeStrategy.java        | 138 +++++++++++++++++++
 .../strategies/ArrayElementOutputTypeStrategy.java |  47 +++++++
 .../strategies/SpecificTypeStrategies.java         |   3 +
 .../ArrayComparableElementTypeStrategyTest.java    |  55 ++++++++
 .../ArrayElementOutputTypeStrategyTest.java        |  48 +++++++
 .../functions/CollectionFunctionsITCase.java       | 149 ++++++++++++++++++++-
 .../runtime/functions/scalar/ArrayMaxFunction.java |  89 ++++++++++++
 13 files changed, 567 insertions(+), 1 deletion(-)

diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index 87c8b0f9486..b64f03ad050 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -649,6 +649,9 @@ collection:
   - sql: ARRAY_CONCAT(array1, ...)
     table: array1.arrayConcat(...)
     description: Returns an array that is the result of concatenating at least 
one array. This array contains all the elements in the first array, followed by 
all the elements in the second array, and so forth, up to the Nth array. If any 
input array is NULL, the function returns NULL.
+  - sql: ARRAY_MAX(array)
+    table: array.arrayMax()
+    description: Returns the maximum value from the array, if array itself is 
null, the function returns null.
   - sql: MAP_KEYS(map)
     table: MAP.mapKeys()
     description: Returns the keys of the map as array. No order guaranteed.
diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst 
b/flink-python/docs/reference/pyflink.table/expressions.rst
index 0f9ac9bdc1d..72df9e5377e 100644
--- a/flink-python/docs/reference/pyflink.table/expressions.rst
+++ b/flink-python/docs/reference/pyflink.table/expressions.rst
@@ -231,6 +231,7 @@ advanced type helper functions
     Expression.array_position
     Expression.array_remove
     Expression.array_reverse
+    Expression.array_max
     Expression.array_union
     Expression.map_entries
     Expression.map_keys
diff --git a/flink-python/pyflink/table/expression.py 
b/flink-python/pyflink/table/expression.py
index 78984fd5d32..cba94230c4e 100644
--- a/flink-python/pyflink/table/expression.py
+++ b/flink-python/pyflink/table/expression.py
@@ -1528,6 +1528,13 @@ class Expression(Generic[T]):
         """
         return _binary_op("arrayConcat")(self, *arrays)
 
+    def array_max(self) -> 'Expression':
+        """
+        Returns the maximum value from the array.
+        if array itself is null, the function returns null.
+        """
+        return _unary_op("arrayMax")(self)
+
     @property
     def map_keys(self) -> 'Expression':
         """
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
index 5caefd52b2d..5100caf136d 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
@@ -57,6 +57,7 @@ import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_CONTAINS;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_DISTINCT;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_ELEMENT;
+import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_MAX;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_POSITION;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_REMOVE;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_REVERSE;
@@ -1443,6 +1444,15 @@ public abstract class BaseExpressions<InType, OutType> {
         }
     }
 
+    /**
+     * Returns the maximum value from the array.
+     *
+     * <p>if array itself is null, the function returns null.
+     */
+    public OutType arrayMax() {
+        return toApiSpecificExpression(unresolvedCall(ARRAY_MAX, toExpr()));
+    }
+
     /** Returns the keys of the map as an array. */
     public OutType mapKeys() {
         return toApiSpecificExpression(unresolvedCall(MAP_KEYS, toExpr()));
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index a694e2b56c9..bfddb328758 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -33,6 +33,7 @@ import 
org.apache.flink.table.types.inference.ArgumentTypeStrategy;
 import org.apache.flink.table.types.inference.ConstantArgumentCount;
 import org.apache.flink.table.types.inference.InputTypeStrategies;
 import org.apache.flink.table.types.inference.TypeStrategies;
+import 
org.apache.flink.table.types.inference.strategies.ArrayElementOutputTypeStrategy;
 import 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies;
 import 
org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -69,6 +70,7 @@ import static 
org.apache.flink.table.types.inference.InputTypeStrategies.NO_ARGS
 import static 
org.apache.flink.table.types.inference.InputTypeStrategies.OUTPUT_IF_NULL;
 import static 
org.apache.flink.table.types.inference.InputTypeStrategies.TYPE_LITERAL;
 import static org.apache.flink.table.types.inference.InputTypeStrategies.and;
+import static 
org.apache.flink.table.types.inference.InputTypeStrategies.arrayFullyComparableElementType;
 import static 
org.apache.flink.table.types.inference.InputTypeStrategies.commonArrayType;
 import static 
org.apache.flink.table.types.inference.InputTypeStrategies.commonMultipleArrayType;
 import static 
org.apache.flink.table.types.inference.InputTypeStrategies.commonType;
@@ -295,6 +297,17 @@ public final class BuiltInFunctionDefinitions {
                     .runtimeClass(
                             
"org.apache.flink.table.runtime.functions.scalar.ArrayConcatFunction")
                     .build();
+
+    public static final BuiltInFunctionDefinition ARRAY_MAX =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("ARRAY_MAX")
+                    .kind(SCALAR)
+                    .inputTypeStrategy(arrayFullyComparableElementType())
+                    .outputTypeStrategy(new ArrayElementOutputTypeStrategy())
+                    .runtimeClass(
+                            
"org.apache.flink.table.runtime.functions.scalar.ArrayMaxFunction")
+                    .build();
+
     public static final BuiltInFunctionDefinition INTERNAL_REPLICATE_ROWS =
             BuiltInFunctionDefinition.newBuilder()
                     .name("$REPLICATE_ROWS$1")
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
index 8fb44053096..7ed5ba771f5 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.expressions.TableSymbol;
 import org.apache.flink.table.types.DataType;
 import 
org.apache.flink.table.types.inference.strategies.AndArgumentTypeStrategy;
 import 
org.apache.flink.table.types.inference.strategies.AnyArgumentTypeStrategy;
+import 
org.apache.flink.table.types.inference.strategies.ArrayComparableElementTypeStrategy;
 import 
org.apache.flink.table.types.inference.strategies.CommonArgumentTypeStrategy;
 import 
org.apache.flink.table.types.inference.strategies.CommonArrayInputTypeStrategy;
 import 
org.apache.flink.table.types.inference.strategies.CommonInputTypeStrategy;
@@ -364,6 +365,10 @@ public final class InputTypeStrategies {
         return new 
CommonArrayInputTypeStrategy(ConstantArgumentCount.from(minCount));
     }
 
+    public static InputTypeStrategy arrayFullyComparableElementType() {
+        return new 
ArrayComparableElementTypeStrategy(StructuredComparison.FULL);
+    }
+
     // 
--------------------------------------------------------------------------------------------
 
     private InputTypeStrategies() {
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategy.java
new file mode 100644
index 00000000000..175a41d45dd
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategy.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import 
org.apache.flink.table.types.logical.StructuredType.StructuredComparison;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * An {@link InputTypeStrategy} that checks if the input argument is an ARRAY 
type and check whether
+ * its' elements are comparable.
+ *
+ * <p>It requires one argument.
+ *
+ * <p>For the rules which types are comparable with which types see {@link
+ * #areComparable(LogicalType, LogicalType)}.
+ */
+@Internal
+public final class ArrayComparableElementTypeStrategy implements 
InputTypeStrategy {
+    private final StructuredComparison requiredComparison;
+    private final ConstantArgumentCount argumentCount;
+
+    public ArrayComparableElementTypeStrategy(StructuredComparison 
requiredComparison) {
+        Preconditions.checkArgument(requiredComparison != 
StructuredComparison.NONE);
+        this.requiredComparison = requiredComparison;
+        this.argumentCount = ConstantArgumentCount.of(1);
+    }
+
+    @Override
+    public ArgumentCount getArgumentCount() {
+        return argumentCount;
+    }
+
+    @Override
+    public Optional<List<DataType>> inferInputTypes(
+            CallContext callContext, boolean throwOnFailure) {
+        final List<DataType> argumentDataTypes = 
callContext.getArgumentDataTypes();
+        final DataType argumentType = argumentDataTypes.get(0);
+        if (!argumentType.getLogicalType().is(LogicalTypeRoot.ARRAY)) {
+            return callContext.fail(throwOnFailure, "All arguments requires to 
be an ARRAY type");
+        }
+        final DataType elementDataType = ((CollectionDataType) 
argumentType).getElementDataType();
+        final LogicalType elementLogicalDataType = 
elementDataType.getLogicalType();
+        if (!areComparable(elementLogicalDataType, elementLogicalDataType)) {
+            return callContext.fail(
+                    throwOnFailure,
+                    "Type '%s' should support %s comparison with itself.",
+                    elementLogicalDataType,
+                    comparisonToString());
+        }
+        return Optional.of(argumentDataTypes);
+    }
+
+    private String comparisonToString() {
+        return requiredComparison == StructuredComparison.EQUALS
+                ? "'EQUALS'"
+                : "both 'EQUALS' and 'ORDER'";
+    }
+
+    private boolean areComparable(LogicalType firstType, LogicalType 
secondType) {
+        return areComparableWithNormalizedNullability(firstType.copy(true), 
secondType.copy(true));
+    }
+
+    private boolean areComparableWithNormalizedNullability(
+            LogicalType firstType, LogicalType secondType) {
+        // A hack to support legacy types. To be removed when we drop the 
legacy types.
+        if (firstType instanceof LegacyTypeInformationType
+                || secondType instanceof LegacyTypeInformationType) {
+            return true;
+        }
+
+        // everything is comparable with null, it should return null in that 
case
+        if (firstType.is(LogicalTypeRoot.NULL) || 
secondType.is(LogicalTypeRoot.NULL)) {
+            return true;
+        }
+
+        if (firstType.is(LogicalTypeFamily.NUMERIC) && 
secondType.is(LogicalTypeFamily.NUMERIC)) {
+            return true;
+        }
+
+        // DATE + ALL TIMESTAMPS
+        if (firstType.is(LogicalTypeFamily.DATETIME) && 
secondType.is(LogicalTypeFamily.DATETIME)) {
+            return true;
+        }
+
+        // VARCHAR + CHAR (we do not compare collations here)
+        if (firstType.is(LogicalTypeFamily.CHARACTER_STRING)
+                && secondType.is(LogicalTypeFamily.CHARACTER_STRING)) {
+            return true;
+        }
+
+        // VARBINARY + BINARY
+        if (firstType.is(LogicalTypeFamily.BINARY_STRING)
+                && secondType.is(LogicalTypeFamily.BINARY_STRING)) {
+            return true;
+        }
+
+        return false;
+    }
+
+    @Override
+    public List<Signature> getExpectedSignatures(FunctionDefinition 
definition) {
+        return Collections.singletonList(
+                Signature.of(Signature.Argument.ofGroup("ARRAY<COMPARABLE>")));
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayElementOutputTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayElementOutputTypeStrategy.java
new file mode 100644
index 00000000000..5c95292b3e2
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayElementOutputTypeStrategy.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.TypeStrategy;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import java.util.Optional;
+
+/** Specific {@link TypeStrategy} for {@link 
BuiltInFunctionDefinitions#ARRAY_MAX}. */
+@Internal
+public class ArrayElementOutputTypeStrategy implements TypeStrategy {
+    @Override
+    public Optional<DataType> inferType(CallContext callContext) {
+        DataType inputDataType = callContext.getArgumentDataTypes().get(0);
+        if (inputDataType.getLogicalType().getTypeRoot() != 
LogicalTypeRoot.ARRAY) {
+            return Optional.empty();
+        }
+        final DataType elementDataType = ((CollectionDataType) 
inputDataType).getElementDataType();
+        if (inputDataType.getLogicalType().isNullable()) {
+            return Optional.of(elementDataType.nullable());
+        } else {
+            return Optional.of(elementDataType);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java
index ed725a4edb8..3514123f0aa 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java
@@ -58,6 +58,9 @@ public final class SpecificTypeStrategies {
     /** See {@link ArrayTypeStrategy}. */
     public static final TypeStrategy ARRAY = new ArrayTypeStrategy();
 
+    /** See {@link ArrayElementOutputTypeStrategy}. */
+    public static final TypeStrategy ARRAY_ELEMENT = new 
ArrayElementOutputTypeStrategy();
+
     /** See {@link GetTypeStrategy}. */
     public static final TypeStrategy GET = new GetTypeStrategy();
 
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategyTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategyTest.java
new file mode 100644
index 00000000000..3e9d298ebc7
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategyTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.inference.InputTypeStrategies;
+import org.apache.flink.table.types.inference.InputTypeStrategiesTestBase;
+
+import java.util.stream.Stream;
+
+/** Tests for {@link ArrayComparableElementTypeStrategy}. */
+public class ArrayComparableElementTypeStrategyTest extends 
InputTypeStrategiesTestBase {
+    @Override
+    protected Stream<TestSpec> testData() {
+        return Stream.of(
+                
TestSpec.forStrategy(InputTypeStrategies.arrayFullyComparableElementType())
+                        .expectSignature("f(<ARRAY<COMPARABLE>>)")
+                        
.calledWithArgumentTypes(DataTypes.ARRAY(DataTypes.ROW()))
+                        .expectErrorMessage(
+                                "Invalid input arguments. Expected signatures 
are:\n"
+                                        + "f(<ARRAY<COMPARABLE>>)"),
+                TestSpec.forStrategy(
+                                "Strategy fails if input argument type is not 
ARRAY",
+                                
InputTypeStrategies.arrayFullyComparableElementType())
+                        .calledWithArgumentTypes(DataTypes.INT())
+                        .expectErrorMessage(
+                                "Invalid input arguments. Expected signatures 
are:\n"
+                                        + "f(<ARRAY<COMPARABLE>>)"),
+                TestSpec.forStrategy(
+                                "Strategy fails if the number of input 
arguments are not one",
+                                
InputTypeStrategies.arrayFullyComparableElementType())
+                        .calledWithArgumentTypes(
+                                DataTypes.ARRAY(DataTypes.INT()),
+                                DataTypes.ARRAY(DataTypes.STRING()))
+                        .expectErrorMessage(
+                                "Invalid input arguments. Expected signatures 
are:\n"
+                                        + "f(<ARRAY<COMPARABLE>>)"));
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ArrayElementOutputTypeStrategyTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ArrayElementOutputTypeStrategyTest.java
new file mode 100644
index 00000000000..420c5ad307e
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ArrayElementOutputTypeStrategyTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.inference.TypeStrategiesTestBase;
+
+import java.util.stream.Stream;
+
+/** Tests for {@link ArrayElementOutputTypeStrategy}. */
+class ArrayElementOutputTypeStrategyTest extends TypeStrategiesTestBase {
+
+    @Override
+    protected Stream<TestSpec> testData() {
+        return Stream.of(
+                TestSpec.forStrategy(
+                                "infer an array's element type",
+                                SpecificTypeStrategies.ARRAY_ELEMENT)
+                        
.inputTypes(DataTypes.ARRAY(DataTypes.INT().notNull()).notNull())
+                        
.expectDataType(DataTypes.INT().notNull().bridgedTo(int.class)),
+                TestSpec.forStrategy(
+                                "infer an array's element type",
+                                SpecificTypeStrategies.ARRAY_ELEMENT)
+                        .inputTypes(DataTypes.ARRAY(DataTypes.INT()))
+                        .expectDataType(DataTypes.INT()),
+                TestSpec.forStrategy(
+                                "infer an array's element type",
+                                SpecificTypeStrategies.ARRAY_ELEMENT)
+                        .inputTypes(DataTypes.ARRAY(DataTypes.INT().notNull()))
+                        .expectDataType(DataTypes.INT().bridgedTo(int.class)));
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java
index 689c13d38ac..31131ec519a 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java
@@ -44,7 +44,8 @@ class CollectionFunctionsITCase extends 
BuiltInFunctionTestBase {
                         arrayRemoveTestCases(),
                         arrayReverseTestCases(),
                         arrayUnionTestCases(),
-                        arrayConcatTestCases())
+                        arrayConcatTestCases(),
+                        arrayMaxTestCases())
                 .flatMap(s -> s);
     }
 
@@ -578,4 +579,150 @@ class CollectionFunctionsITCase extends 
BuiltInFunctionTestBase {
                                 "Invalid function call:\n"
                                         + "ARRAY_CONCAT(ARRAY<STRING> NOT 
NULL, ARRAY<INT NOT NULL> NOT NULL)"));
     }
+
+    private Stream<TestSetSpec> arrayMaxTestCases() {
+        return Stream.of(
+                TestSetSpec.forFunction(BuiltInFunctionDefinitions.ARRAY_MAX)
+                        .onFieldsWithData(
+                                new Integer[] {1, 2, null},
+                                null,
+                                new Double[] {1.2, null, 3.4, 8.0},
+                                new String[] {"a", null, "bc", "d", "def"},
+                                new Row[] {
+                                    Row.of(true, LocalDate.of(2022, 4, 20)),
+                                    Row.of(true, LocalDate.of(1990, 10, 14)),
+                                    null
+                                },
+                                new Map[] {
+                                    CollectionUtil.map(entry(1, "a"), entry(2, 
"b")),
+                                    CollectionUtil.map(entry(3, "c"), entry(4, 
"d")),
+                                    null
+                                },
+                                new Integer[][] {{1, 2, 3}, {4, 5, 6}, {7, 8, 
9}, null},
+                                new Row[] {
+                                    Row.of(LocalDate.of(2022, 4, 20)),
+                                    Row.of(LocalDate.of(1990, 10, 14)),
+                                    Row.of(LocalDate.of(2022, 4, 20)),
+                                    Row.of(LocalDate.of(1990, 10, 14)),
+                                    Row.of(LocalDate.of(2022, 4, 20)),
+                                    Row.of(LocalDate.of(1990, 10, 14)),
+                                    null
+                                },
+                                new Boolean[] {true, false, true, false, true, 
null},
+                                new Row[] {
+                                    Row.of(true),
+                                    Row.of(false),
+                                    Row.of(true),
+                                    Row.of(false),
+                                    Row.of(true),
+                                    Row.of(false),
+                                    null
+                                },
+                                new Row[] {
+                                    Row.of(1), Row.of(2), Row.of(8), 
Row.of(4), Row.of(5),
+                                    Row.of(8), null
+                                },
+                                1,
+                                new Integer[][] {{1, 2}, {2, 3}, null},
+                                new LocalDate[] {
+                                    LocalDate.of(2022, 1, 2),
+                                    LocalDate.of(2023, 4, 21),
+                                    LocalDate.of(2022, 12, 24),
+                                    LocalDate.of(2026, 2, 10),
+                                    LocalDate.of(2012, 5, 16),
+                                    LocalDate.of(2092, 7, 19)
+                                },
+                                null)
+                        .andDataTypes(
+                                DataTypes.ARRAY(DataTypes.INT()),
+                                DataTypes.ARRAY(DataTypes.INT()),
+                                DataTypes.ARRAY(DataTypes.DOUBLE()),
+                                DataTypes.ARRAY(DataTypes.STRING()),
+                                DataTypes.ARRAY(
+                                        DataTypes.ROW(DataTypes.BOOLEAN(), 
DataTypes.DATE())),
+                                DataTypes.ARRAY(DataTypes.MAP(DataTypes.INT(), 
DataTypes.STRING())),
+                                
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())),
+                                
DataTypes.ARRAY(DataTypes.ROW(DataTypes.DATE())),
+                                DataTypes.ARRAY(DataTypes.BOOLEAN()),
+                                
DataTypes.ARRAY(DataTypes.ROW(DataTypes.BOOLEAN())),
+                                
DataTypes.ARRAY(DataTypes.ROW(DataTypes.INT())),
+                                DataTypes.INT().notNull(),
+                                
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())),
+                                DataTypes.ARRAY(DataTypes.DATE()),
+                                DataTypes.ARRAY(DataTypes.INT().notNull()))
+                        .testResult($("f0").arrayMax(), "ARRAY_MAX(f0)", 2, 
DataTypes.INT())
+                        .testResult($("f1").arrayMax(), "ARRAY_MAX(f1)", null, 
DataTypes.INT())
+                        .testResult($("f2").arrayMax(), "ARRAY_MAX(f2)", 8.0, 
DataTypes.DOUBLE())
+                        .testResult($("f3").arrayMax(), "ARRAY_MAX(f3)", 
"def", DataTypes.STRING())
+                        .testResult($("f14").arrayMax(), "ARRAY_MAX(f1)", 
null, DataTypes.INT())
+                        .testResult(
+                                $("f13").arrayMax(),
+                                "ARRAY_MAX(f13)",
+                                LocalDate.of(2092, 7, 19),
+                                DataTypes.DATE())
+                        .testSqlValidationError(
+                                "ARRAY_MAX(f4)",
+                                "SQL validation failed. Invalid function 
call:\n"
+                                        + "ARRAY_MAX(ARRAY<ROW<`f0` BOOLEAN, 
`f1` DATE>>")
+                        .testTableApiValidationError(
+                                $("f4").arrayMax(),
+                                "Invalid function call:\n"
+                                        + "ARRAY_MAX(ARRAY<ROW<`f0` BOOLEAN, 
`f1` DATE>>")
+                        .testSqlValidationError(
+                                "ARRAY_MAX(f5)",
+                                "SQL validation failed. Invalid function 
call:\n"
+                                        + "ARRAY_MAX(ARRAY<MAP<INT, STRING>>")
+                        .testTableApiValidationError(
+                                $("f5").arrayMax(),
+                                "Invalid function call:\n" + 
"ARRAY_MAX(ARRAY<MAP<INT, STRING>>)")
+                        .testSqlValidationError(
+                                "ARRAY_MAX(f6)",
+                                "SQL validation failed. Invalid function 
call:\n"
+                                        + "ARRAY_MAX(ARRAY<ARRAY<INT>>)")
+                        .testTableApiValidationError(
+                                $("f6").arrayMax(),
+                                "Invalid function call:\n" + 
"ARRAY_MAX(ARRAY<ARRAY<INT>>)")
+                        .testSqlValidationError(
+                                "ARRAY_MAX(f7)",
+                                "SQL validation failed. Invalid function 
call:\n"
+                                        + "ARRAY_MAX(ARRAY<ROW<`f0` DATE>>)")
+                        .testTableApiValidationError(
+                                $("f7").arrayMax(),
+                                "Invalid function call:\n" + 
"ARRAY_MAX(ARRAY<ROW<`f0` DATE>>)")
+                        .testSqlValidationError(
+                                "ARRAY_MAX(f8)",
+                                "SQL validation failed. Invalid function 
call:\n"
+                                        + "ARRAY_MAX(ARRAY<BOOLEAN>)")
+                        .testTableApiValidationError(
+                                $("f8").arrayMax(),
+                                "Invalid function call:\n" + 
"ARRAY_MAX(ARRAY<BOOLEAN>)")
+                        .testSqlValidationError(
+                                "ARRAY_MAX(f9)",
+                                "SQL validation failed. Invalid function 
call:\n"
+                                        + "ARRAY_MAX(ARRAY<ROW<`f0` 
BOOLEAN>>)")
+                        .testTableApiValidationError(
+                                $("f9").arrayMax(),
+                                "Invalid function call:\n" + 
"ARRAY_MAX(ARRAY<ROW<`f0` BOOLEAN>>)")
+                        .testSqlValidationError(
+                                "ARRAY_MAX(f10)",
+                                "SQL validation failed. Invalid function 
call:\n"
+                                        + "ARRAY_MAX(ARRAY<ROW<`f0` INT>>)")
+                        .testTableApiValidationError(
+                                $("f10").arrayMax(),
+                                "Invalid function call:\n" + 
"ARRAY_MAX(ARRAY<ROW<`f0` INT>>)")
+                        .testTableApiValidationError(
+                                $("f11").arrayMax(),
+                                "Invalid function call:\n" + "ARRAY_MAX(INT 
NOT NULL)")
+                        .testSqlValidationError(
+                                "ARRAY_MAX(f11)",
+                                "SQL validation failed. Invalid function 
call:\n"
+                                        + "ARRAY_MAX(INT NOT NULL)")
+                        .testTableApiValidationError(
+                                $("f12").arrayMax(),
+                                "Invalid function call:\n" + 
"ARRAY_MAX(ARRAY<ARRAY<INT>>)")
+                        .testSqlValidationError(
+                                "ARRAY_MAX(f12)",
+                                "SQL validation failed. Invalid function 
call:\n"
+                                        + "ARRAY_MAX(ARRAY<ARRAY<INT>>)"));
+    }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayMaxFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayMaxFunction.java
new file mode 100644
index 00000000000..5f77d94ac20
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayMaxFunction.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.lang.invoke.MethodHandle;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_MAX}. */
+@Internal
+public class ArrayMaxFunction extends BuiltInScalarFunction {
+    private final ArrayData.ElementGetter elementGetter;
+    private final SpecializedFunction.ExpressionEvaluator compareEvaluator;
+    private transient MethodHandle compareHandle;
+
+    public ArrayMaxFunction(SpecializedFunction.SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.ARRAY_MAX, context);
+
+        final DataType dataType =
+                ((CollectionDataType) 
context.getCallContext().getArgumentDataTypes().get(0))
+                        .getElementDataType();
+        elementGetter = 
ArrayData.createElementGetter(dataType.getLogicalType());
+        compareEvaluator =
+                context.createEvaluator(
+                        $("element1").isGreater($("element2")),
+                        DataTypes.BOOLEAN().notNull(),
+                        DataTypes.FIELD("element1", 
dataType.notNull().toInternal()),
+                        DataTypes.FIELD("element2", 
dataType.notNull().toInternal()));
+    }
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        compareHandle = compareEvaluator.open(context);
+    }
+
+    public @Nullable Object eval(ArrayData array) {
+        try {
+            if (array == null || array.size() == 0) {
+                return null;
+            }
+
+            Object maxElement = null;
+            for (int i = 0; i < array.size(); i++) {
+                Object element = elementGetter.getElementOrNull(array, i);
+                if (element != null) {
+                    if (maxElement == null || (boolean) 
compareHandle.invoke(element, maxElement)) {
+                        maxElement = element;
+                    }
+                }
+            }
+            return maxElement;
+        } catch (Throwable t) {
+            throw new FlinkRuntimeException(t);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        compareEvaluator.close();
+    }
+}


Reply via email to