dawidwys commented on code in PR #22730:
URL: https://github.com/apache/flink/pull/22730#discussion_r1242111562
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java:
##########
@@ -1407,6 +1408,15 @@ public OutType arrayUnion(InType array) {
unresolvedCall(ARRAY_UNION, toExpr(),
objectToExpression(array)));
}
+ /**
+ * Return an element that is the maximum element in the array.
+ *
+ * <p>if array itself is null, the * function will return null.
Review Comment:
```suggestion
* <p>if array itself is null, the function returns null.
```
##########
flink-python/pyflink/table/expression.py:
##########
@@ -1519,6 +1519,13 @@ def array_union(self, array) -> 'Expression':
"""
return _binary_op("arrayUnion")(self, array)
+ def array_max(self) -> 'Expression':
+ """
+ Return the element that this element is the maximum one in the array
+ If the array is null, the function will return null.
Review Comment:
```suggestion
If the array is null, the function returns null.
```
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java:
##########
@@ -1407,6 +1408,15 @@ public OutType arrayUnion(InType array) {
unresolvedCall(ARRAY_UNION, toExpr(),
objectToExpression(array)));
}
+ /**
+ * Return an element that is the maximum element in the array.
Review Comment:
```suggestion
* Returns the maximum value from the array.
```
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java:
##########
@@ -479,4 +480,165 @@ private Stream<TestSetSpec> arrayUnionTestCases() {
"Invalid input arguments. Expected signatures
are:\n"
+ "ARRAY_UNION(<COMMON>, <COMMON>)"));
}
+
+ private Stream<TestSetSpec> arrayMaxTestCases() {
+ return Stream.of(
+ TestSetSpec.forFunction(BuiltInFunctionDefinitions.ARRAY_MAX)
+ .onFieldsWithData(
+ new Integer[] {1, 2, null},
+ null,
+ new Double[] {1.2, null, 3.4, 8.0},
+ new String[] {"a", null, "bc", "d", "def"},
+ new Row[] {
+ Row.of(true, LocalDate.of(2022, 4, 20)),
+ Row.of(true, LocalDate.of(1990, 10, 14)),
+ null
+ },
+ new Map[] {
+ CollectionUtil.map(entry(1, "a"), entry(2,
"b")),
+ CollectionUtil.map(entry(3, "c"), entry(4,
"d")),
+ null
+ },
+ new Integer[][] {{1, 2, 3}, {4, 5, 6}, {7, 8,
9}, null},
+ new Row[] {
+ Row.of(LocalDate.of(2022, 4, 20)),
+ Row.of(LocalDate.of(1990, 10, 14)),
+ Row.of(LocalDate.of(2022, 4, 20)),
+ Row.of(LocalDate.of(1990, 10, 14)),
+ Row.of(LocalDate.of(2022, 4, 20)),
+ Row.of(LocalDate.of(1990, 10, 14)),
+ null
+ },
+ new Boolean[] {true, false, true, false, true,
null},
+ new Row[] {
+ Row.of(true),
+ Row.of(false),
+ Row.of(true),
+ Row.of(false),
+ Row.of(true),
+ Row.of(false),
+ null
+ },
+ new Row[] {
+ Row.of(1), Row.of(2), Row.of(8),
Row.of(4), Row.of(5),
+ Row.of(8), null
+ },
+ 1,
+ 1.5,
+ "abc",
+ new Integer[][] {{1, 2}, {2, 3}, null},
+ new LocalDate[] {
+ LocalDate.of(2022, 1, 2),
+ LocalDate.of(2023, 4, 21),
+ LocalDate.of(2022, 12, 24),
+ LocalDate.of(2026, 2, 10),
+ LocalDate.of(2012, 5, 16),
+ LocalDate.of(2092, 7, 19)
+ })
+ .andDataTypes(
+ DataTypes.ARRAY(DataTypes.INT()),
+ DataTypes.ARRAY(DataTypes.INT()),
+ DataTypes.ARRAY(DataTypes.DOUBLE()),
+ DataTypes.ARRAY(DataTypes.STRING()),
+ DataTypes.ARRAY(
+ DataTypes.ROW(DataTypes.BOOLEAN(),
DataTypes.DATE())),
+ DataTypes.ARRAY(DataTypes.MAP(DataTypes.INT(),
DataTypes.STRING())),
+
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())),
+
DataTypes.ARRAY(DataTypes.ROW(DataTypes.DATE())),
+ DataTypes.ARRAY(DataTypes.BOOLEAN()),
+
DataTypes.ARRAY(DataTypes.ROW(DataTypes.BOOLEAN())),
+
DataTypes.ARRAY(DataTypes.ROW(DataTypes.INT())),
+ DataTypes.INT().notNull(),
+ DataTypes.DOUBLE().notNull(),
+ DataTypes.STRING().notNull(),
+
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())),
+ DataTypes.ARRAY(DataTypes.DATE()))
+ .testResult($("f0").arrayMax(), "ARRAY_MAX(f0)", 2,
DataTypes.INT())
+ .testResult($("f1").arrayMax(), "ARRAY_MAX(f1)", null,
DataTypes.INT())
+ .testResult($("f2").arrayMax(), "ARRAY_MAX(f2)", 8.0,
DataTypes.DOUBLE())
+ .testResult($("f3").arrayMax(), "ARRAY_MAX(f3)",
"def", DataTypes.STRING())
+ .testResult(
+ $("f15").arrayMax(),
+ "ARRAY_MAX(f15)",
+ LocalDate.of(2092, 7, 19),
+ DataTypes.DATE())
+ .testSqlValidationError(
+ "ARRAY_MAX(f4)",
+ "SQL validation failed. Invalid function
call:\n"
+ + "ARRAY_MAX(ARRAY<ROW<`f0` BOOLEAN,
`f1` DATE>>")
+ .testTableApiValidationError(
+ $("f4").arrayMax(),
+ "Invalid function call:\n"
+ + "ARRAY_MAX(ARRAY<ROW<`f0` BOOLEAN,
`f1` DATE>>")
+ .testSqlValidationError(
+ "ARRAY_MAX(f5)",
+ "SQL validation failed. Invalid function
call:\n"
+ + "ARRAY_MAX(ARRAY<MAP<INT, STRING>>")
+ .testTableApiValidationError(
+ $("f5").arrayMax(),
+ "Invalid function call:\n" +
"ARRAY_MAX(ARRAY<MAP<INT, STRING>>)")
+ .testSqlValidationError(
+ "ARRAY_MAX(f6)",
+ "SQL validation failed. Invalid function
call:\n"
+ + "ARRAY_MAX(ARRAY<ARRAY<INT>>)")
+ .testTableApiValidationError(
+ $("f6").arrayMax(),
+ "Invalid function call:\n" +
"ARRAY_MAX(ARRAY<ARRAY<INT>>)")
+ .testSqlValidationError(
+ "ARRAY_MAX(f7)",
+ "SQL validation failed. Invalid function
call:\n"
+ + "ARRAY_MAX(ARRAY<ROW<`f0` DATE>>)")
+ .testTableApiValidationError(
+ $("f7").arrayMax(),
+ "Invalid function call:\n" +
"ARRAY_MAX(ARRAY<ROW<`f0` DATE>>)")
+ .testSqlValidationError(
+ "ARRAY_MAX(f8)",
+ "SQL validation failed. Invalid function
call:\n"
+ + "ARRAY_MAX(ARRAY<BOOLEAN>)")
+ .testTableApiValidationError(
+ $("f8").arrayMax(),
+ "Invalid function call:\n" +
"ARRAY_MAX(ARRAY<BOOLEAN>)")
+ .testSqlValidationError(
+ "ARRAY_MAX(f9)",
+ "SQL validation failed. Invalid function
call:\n"
+ + "ARRAY_MAX(ARRAY<ROW<`f0`
BOOLEAN>>)")
+ .testTableApiValidationError(
+ $("f9").arrayMax(),
+ "Invalid function call:\n" +
"ARRAY_MAX(ARRAY<ROW<`f0` BOOLEAN>>)")
+ .testSqlValidationError(
+ "ARRAY_MAX(f10)",
+ "SQL validation failed. Invalid function
call:\n"
+ + "ARRAY_MAX(ARRAY<ROW<`f0` INT>>)")
+ .testTableApiValidationError(
+ $("f10").arrayMax(),
+ "Invalid function call:\n" +
"ARRAY_MAX(ARRAY<ROW<`f0` INT>>)")
+ .testTableApiValidationError(
+ $("f11").arrayMax(),
+ "Invalid function call:\n" + "ARRAY_MAX(INT
NOT NULL)")
+ .testSqlValidationError(
+ "ARRAY_MAX(f11)",
+ "SQL validation failed. Invalid function
call:\n"
+ + "ARRAY_MAX(INT NOT NULL)")
+ .testTableApiValidationError(
+ $("f12").arrayMax(),
+ "Invalid function call:\n" + "ARRAY_MAX(DOUBLE
NOT NULL)")
+ .testSqlValidationError(
+ "ARRAY_MAX(f12)",
+ "SQL validation failed. Invalid function
call:\n"
+ + "ARRAY_MAX(DOUBLE NOT NULL)")
+ .testTableApiValidationError(
+ $("f13").arrayMax(),
+ "Invalid function call:\n" + "ARRAY_MAX(STRING
NOT NULL)")
+ .testSqlValidationError(
+ "ARRAY_MAX(f13)",
+ "SQL validation failed. Invalid function
call:\n"
Review Comment:
How are those cases different?
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java:
##########
@@ -479,4 +480,165 @@ private Stream<TestSetSpec> arrayUnionTestCases() {
"Invalid input arguments. Expected signatures
are:\n"
+ "ARRAY_UNION(<COMMON>, <COMMON>)"));
}
+
+ private Stream<TestSetSpec> arrayMaxTestCases() {
+ return Stream.of(
+ TestSetSpec.forFunction(BuiltInFunctionDefinitions.ARRAY_MAX)
+ .onFieldsWithData(
+ new Integer[] {1, 2, null},
+ null,
+ new Double[] {1.2, null, 3.4, 8.0},
+ new String[] {"a", null, "bc", "d", "def"},
+ new Row[] {
+ Row.of(true, LocalDate.of(2022, 4, 20)),
+ Row.of(true, LocalDate.of(1990, 10, 14)),
+ null
+ },
+ new Map[] {
+ CollectionUtil.map(entry(1, "a"), entry(2,
"b")),
+ CollectionUtil.map(entry(3, "c"), entry(4,
"d")),
+ null
+ },
+ new Integer[][] {{1, 2, 3}, {4, 5, 6}, {7, 8,
9}, null},
+ new Row[] {
+ Row.of(LocalDate.of(2022, 4, 20)),
+ Row.of(LocalDate.of(1990, 10, 14)),
+ Row.of(LocalDate.of(2022, 4, 20)),
+ Row.of(LocalDate.of(1990, 10, 14)),
+ Row.of(LocalDate.of(2022, 4, 20)),
+ Row.of(LocalDate.of(1990, 10, 14)),
+ null
+ },
+ new Boolean[] {true, false, true, false, true,
null},
+ new Row[] {
+ Row.of(true),
+ Row.of(false),
+ Row.of(true),
+ Row.of(false),
+ Row.of(true),
+ Row.of(false),
+ null
+ },
+ new Row[] {
+ Row.of(1), Row.of(2), Row.of(8),
Row.of(4), Row.of(5),
+ Row.of(8), null
+ },
+ 1,
+ 1.5,
+ "abc",
+ new Integer[][] {{1, 2}, {2, 3}, null},
+ new LocalDate[] {
+ LocalDate.of(2022, 1, 2),
+ LocalDate.of(2023, 4, 21),
+ LocalDate.of(2022, 12, 24),
+ LocalDate.of(2026, 2, 10),
+ LocalDate.of(2012, 5, 16),
+ LocalDate.of(2092, 7, 19)
+ })
+ .andDataTypes(
+ DataTypes.ARRAY(DataTypes.INT()),
+ DataTypes.ARRAY(DataTypes.INT()),
+ DataTypes.ARRAY(DataTypes.DOUBLE()),
+ DataTypes.ARRAY(DataTypes.STRING()),
+ DataTypes.ARRAY(
+ DataTypes.ROW(DataTypes.BOOLEAN(),
DataTypes.DATE())),
+ DataTypes.ARRAY(DataTypes.MAP(DataTypes.INT(),
DataTypes.STRING())),
+
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())),
+
DataTypes.ARRAY(DataTypes.ROW(DataTypes.DATE())),
+ DataTypes.ARRAY(DataTypes.BOOLEAN()),
+
DataTypes.ARRAY(DataTypes.ROW(DataTypes.BOOLEAN())),
+
DataTypes.ARRAY(DataTypes.ROW(DataTypes.INT())),
+ DataTypes.INT().notNull(),
+ DataTypes.DOUBLE().notNull(),
+ DataTypes.STRING().notNull(),
+
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())),
+ DataTypes.ARRAY(DataTypes.DATE()))
+ .testResult($("f0").arrayMax(), "ARRAY_MAX(f0)", 2,
DataTypes.INT())
+ .testResult($("f1").arrayMax(), "ARRAY_MAX(f1)", null,
DataTypes.INT())
+ .testResult($("f2").arrayMax(), "ARRAY_MAX(f2)", 8.0,
DataTypes.DOUBLE())
+ .testResult($("f3").arrayMax(), "ARRAY_MAX(f3)",
"def", DataTypes.STRING())
+ .testResult(
+ $("f15").arrayMax(),
+ "ARRAY_MAX(f15)",
+ LocalDate.of(2092, 7, 19),
+ DataTypes.DATE())
+ .testSqlValidationError(
+ "ARRAY_MAX(f4)",
+ "SQL validation failed. Invalid function
call:\n"
+ + "ARRAY_MAX(ARRAY<ROW<`f0` BOOLEAN,
`f1` DATE>>")
+ .testTableApiValidationError(
+ $("f4").arrayMax(),
+ "Invalid function call:\n"
+ + "ARRAY_MAX(ARRAY<ROW<`f0` BOOLEAN,
`f1` DATE>>")
+ .testSqlValidationError(
+ "ARRAY_MAX(f5)",
+ "SQL validation failed. Invalid function
call:\n"
+ + "ARRAY_MAX(ARRAY<MAP<INT, STRING>>")
+ .testTableApiValidationError(
+ $("f5").arrayMax(),
+ "Invalid function call:\n" +
"ARRAY_MAX(ARRAY<MAP<INT, STRING>>)")
+ .testSqlValidationError(
+ "ARRAY_MAX(f6)",
+ "SQL validation failed. Invalid function
call:\n"
+ + "ARRAY_MAX(ARRAY<ARRAY<INT>>)")
+ .testTableApiValidationError(
+ $("f6").arrayMax(),
+ "Invalid function call:\n" +
"ARRAY_MAX(ARRAY<ARRAY<INT>>)")
+ .testSqlValidationError(
Review Comment:
why is this invalid? I guess, because there is no coverage for `constructed`
types in the input strategy
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategy.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import
org.apache.flink.table.types.logical.StructuredType.StructuredComparison;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * An {@link InputTypeStrategy} that checks if the input argument is an ARRAY
type and check whether
+ * its' elements are comparable.
+ *
+ * <p>It requires one argument.
+ *
+ * <p>For the rules which types are comparable with which types see {@link
+ * #areComparable(LogicalType, LogicalType)}.
+ */
+@Internal
+public final class ArrayComparableElementTypeStrategy implements
InputTypeStrategy {
+ private final StructuredComparison requiredComparison;
+ private final ConstantArgumentCount argumentCount;
+
+ public ArrayComparableElementTypeStrategy(StructuredComparison
requiredComparison) {
+ Preconditions.checkArgument(requiredComparison !=
StructuredComparison.NONE);
+ this.requiredComparison = requiredComparison;
+ this.argumentCount = ConstantArgumentCount.of(1);
+ }
+
+ @Override
+ public ArgumentCount getArgumentCount() {
+ return argumentCount;
+ }
+
+ @Override
+ public Optional<List<DataType>> inferInputTypes(
+ CallContext callContext, boolean throwOnFailure) {
+ final List<DataType> argumentDataTypes =
callContext.getArgumentDataTypes();
+ List<LogicalType> argumentTypes =
+ argumentDataTypes.stream()
+ .map(DataType::getLogicalType)
+ .collect(Collectors.toList());
+
+ if (!argumentTypes.stream()
+ .allMatch(logicalType ->
logicalType.is(LogicalTypeRoot.ARRAY))) {
+ return callContext.fail(throwOnFailure, "All arguments requires to
be a ARRAY type");
+ }
+ if (argumentDataTypes.size() == 1) {
+ final DataType elementDataType =
+ ((CollectionDataType)
argumentDataTypes.get(0)).getElementDataType();
+ final LogicalType elementLogicalDataType =
elementDataType.getLogicalType();
+ if (!areComparable(elementLogicalDataType,
elementLogicalDataType)) {
+ return callContext.fail(
+ throwOnFailure,
+ "Type '%s' should support %s comparison with itself.",
+ elementLogicalDataType,
+ comparisonToString());
+ }
+ }
+ return Optional.of(argumentDataTypes);
+ }
+
+ private String comparisonToString() {
+ return requiredComparison == StructuredComparison.EQUALS
+ ? "'EQUALS'"
+ : "both 'EQUALS' and 'ORDER'";
+ }
+
+ private boolean areComparable(LogicalType firstType, LogicalType
secondType) {
+ return areComparableWithNormalizedNullability(firstType.copy(true),
secondType.copy(true));
+ }
+
+ private boolean areComparableWithNormalizedNullability(
+ LogicalType firstType, LogicalType secondType) {
+ // A hack to support legacy types. To be removed when we drop the
legacy types.
+ if (firstType instanceof LegacyTypeInformationType
+ || secondType instanceof LegacyTypeInformationType) {
+ return true;
+ }
+
+ // everything is comparable with null, it should return null in that
case
+ if (firstType.is(LogicalTypeRoot.NULL) ||
secondType.is(LogicalTypeRoot.NULL)) {
+ return true;
+ }
+
+ if (firstType.is(LogicalTypeFamily.NUMERIC) &&
secondType.is(LogicalTypeFamily.NUMERIC)) {
+ return true;
+ }
+
+ // DATE + ALL TIMESTAMPS
+ if (firstType.is(LogicalTypeFamily.DATETIME) &&
secondType.is(LogicalTypeFamily.DATETIME)) {
+ return true;
+ }
+
+ // VARCHAR + CHAR (we do not compare collations here)
+ if (firstType.is(LogicalTypeFamily.CHARACTER_STRING)
+ && secondType.is(LogicalTypeFamily.CHARACTER_STRING)) {
+ return true;
+ }
+
+ // VARBINARY + BINARY
+ if (firstType.is(LogicalTypeFamily.BINARY_STRING)
+ && secondType.is(LogicalTypeFamily.BINARY_STRING)) {
+ return true;
+ }
+
+ return false;
+ }
+
+ @Override
+ public List<Signature> getExpectedSignatures(FunctionDefinition
definition) {
+ return Collections.singletonList(
+ Signature.of(Signature.Argument.ofGroupVarying("COMPARABLE")));
Review Comment:
The expected signature for the function is `ARRAY<COMPARABLE>`, right? Not
`COMPARABLE...`
We expect the function to be called on an array, not with multiple
comparable arguments.
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java:
##########
@@ -356,6 +357,9 @@ public static InputTypeStrategy commonArrayType(int count) {
return new
CommonArrayInputTypeStrategy(ConstantArgumentCount.of(count));
}
+ public static InputTypeStrategy arrayComparableElementType() {
Review Comment:
How about e.g. `arrayFullyComparableElementType` to reflect the type of
comparison? Or just instantiate the strategy inline.
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategy.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import
org.apache.flink.table.types.logical.StructuredType.StructuredComparison;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * An {@link InputTypeStrategy} that checks if the input argument is an ARRAY
type and check whether
+ * its' elements are comparable.
+ *
+ * <p>It requires one argument.
+ *
+ * <p>For the rules which types are comparable with which types see {@link
+ * #areComparable(LogicalType, LogicalType)}.
+ */
+@Internal
+public final class ArrayComparableElementTypeStrategy implements
InputTypeStrategy {
+ private final StructuredComparison requiredComparison;
+ private final ConstantArgumentCount argumentCount;
+
+ public ArrayComparableElementTypeStrategy(StructuredComparison
requiredComparison) {
+ Preconditions.checkArgument(requiredComparison !=
StructuredComparison.NONE);
+ this.requiredComparison = requiredComparison;
+ this.argumentCount = ConstantArgumentCount.of(1);
+ }
+
+ @Override
+ public ArgumentCount getArgumentCount() {
+ return argumentCount;
+ }
+
+ @Override
+ public Optional<List<DataType>> inferInputTypes(
+ CallContext callContext, boolean throwOnFailure) {
+ final List<DataType> argumentDataTypes =
callContext.getArgumentDataTypes();
+ List<LogicalType> argumentTypes =
+ argumentDataTypes.stream()
+ .map(DataType::getLogicalType)
+ .collect(Collectors.toList());
+
+ if (!argumentTypes.stream()
+ .allMatch(logicalType ->
logicalType.is(LogicalTypeRoot.ARRAY))) {
+ return callContext.fail(throwOnFailure, "All arguments requires to
be a ARRAY type");
+ }
+ if (argumentDataTypes.size() == 1) {
+ final DataType elementDataType =
+ ((CollectionDataType)
argumentDataTypes.get(0)).getElementDataType();
+ final LogicalType elementLogicalDataType =
elementDataType.getLogicalType();
+ if (!areComparable(elementLogicalDataType,
elementLogicalDataType)) {
+ return callContext.fail(
+ throwOnFailure,
+ "Type '%s' should support %s comparison with itself.",
+ elementLogicalDataType,
+ comparisonToString());
+ }
+ }
+ return Optional.of(argumentDataTypes);
+ }
+
+ private String comparisonToString() {
+ return requiredComparison == StructuredComparison.EQUALS
+ ? "'EQUALS'"
+ : "both 'EQUALS' and 'ORDER'";
+ }
+
+ private boolean areComparable(LogicalType firstType, LogicalType
secondType) {
+ return areComparableWithNormalizedNullability(firstType.copy(true),
secondType.copy(true));
+ }
+
+ private boolean areComparableWithNormalizedNullability(
Review Comment:
Let's not duplicate the code. How about we create a package scoped utility
class:
```
org.apache.flink.table.types.inference.strategies.ComparabilityUtils {
static boolean areComparableWithNormalizedNullability;
private ComparabilityUtils {
// private ctor because the class is just a container for utility functions
}
}
```
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayMaxFunction.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.lang.invoke.MethodHandle;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_MAX}. */
+@Internal
+public class ArrayMaxFunction extends BuiltInScalarFunction {
+ private final ArrayData.ElementGetter elementGetter;
+ private final SpecializedFunction.ExpressionEvaluator compareEvaluator;
+ private transient MethodHandle compareHandle;
+
+ public ArrayMaxFunction(SpecializedFunction.SpecializedContext context) {
+ super(BuiltInFunctionDefinitions.ARRAY_MAX, context);
+
+ final DataType dataType =
+ ((CollectionDataType)
context.getCallContext().getArgumentDataTypes().get(0))
+ .getElementDataType();
+ elementGetter =
ArrayData.createElementGetter(dataType.getLogicalType());
+ compareEvaluator =
+ context.createEvaluator(
+ $("element1").isGreater($("element2")),
+ DataTypes.BOOLEAN(),
Review Comment:
```suggestion
DataTypes.BOOLEAN().notNull(),
```
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayElementOutputTypeStrategy.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.TypeStrategy;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import java.util.Optional;
+
+/** Specific {@link ArgumentTypeStrategy} for {@link
BuiltInFunctionDefinitions#ARRAY_MAX}. */
+@Internal
+public class ArrayElementOutputTypeStrategy implements TypeStrategy {
+ @Override
+ public Optional<DataType> inferType(CallContext callContext) {
+ DataType inputDataType = callContext.getArgumentDataTypes().get(0);
+ if (inputDataType.getLogicalType().getTypeRoot() !=
LogicalTypeRoot.ARRAY) {
+ return Optional.empty();
+ }
+ return Optional.of(((CollectionDataType)
inputDataType).getElementDataType());
Review Comment:
could you please add a test case for that?
##########
flink-python/pyflink/table/expression.py:
##########
@@ -1519,6 +1519,13 @@ def array_union(self, array) -> 'Expression':
"""
return _binary_op("arrayUnion")(self, array)
+ def array_max(self) -> 'Expression':
+ """
+ Return the element that this element is the maximum one in the array
Review Comment:
```suggestion
Returns the maximum value from the array
```
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategy.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import
org.apache.flink.table.types.logical.StructuredType.StructuredComparison;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * An {@link InputTypeStrategy} that checks if the input argument is an ARRAY
type and check whether
+ * its' elements are comparable.
+ *
+ * <p>It requires one argument.
+ *
+ * <p>For the rules which types are comparable with which types see {@link
+ * #areComparable(LogicalType, LogicalType)}.
+ */
+@Internal
+public final class ArrayComparableElementTypeStrategy implements
InputTypeStrategy {
+ private final StructuredComparison requiredComparison;
+ private final ConstantArgumentCount argumentCount;
+
+ public ArrayComparableElementTypeStrategy(StructuredComparison
requiredComparison) {
+ Preconditions.checkArgument(requiredComparison !=
StructuredComparison.NONE);
+ this.requiredComparison = requiredComparison;
+ this.argumentCount = ConstantArgumentCount.of(1);
+ }
+
+ @Override
+ public ArgumentCount getArgumentCount() {
+ return argumentCount;
+ }
+
+ @Override
+ public Optional<List<DataType>> inferInputTypes(
Review Comment:
What about `DISTINCT`, `STRUCTURED`, `CONSTRUCTED` and `RAW` types?
Those are covered in the `ComparableTypeStrategy`.
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayElementOutputTypeStrategy.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.TypeStrategy;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import java.util.Optional;
+
+/** Specific {@link ArgumentTypeStrategy} for {@link
BuiltInFunctionDefinitions#ARRAY_MAX}. */
+@Internal
+public class ArrayElementOutputTypeStrategy implements TypeStrategy {
+ @Override
+ public Optional<DataType> inferType(CallContext callContext) {
+ DataType inputDataType = callContext.getArgumentDataTypes().get(0);
+ if (inputDataType.getLogicalType().getTypeRoot() !=
LogicalTypeRoot.ARRAY) {
+ return Optional.empty();
+ }
+ return Optional.of(((CollectionDataType)
inputDataType).getElementDataType());
Review Comment:
This should consider `nullability` of the array itself.
```
ARRAY(INT NOT NULL) NOT NULL -> output: INT NOT NULL
```
but
```
ARRAY(INT NOT NULL) -> output: INT
```
The javadoc says the output for a `null` array is `null`
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategy.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import
org.apache.flink.table.types.logical.StructuredType.StructuredComparison;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * An {@link InputTypeStrategy} that checks if the input argument is an ARRAY
type and check whether
+ * its' elements are comparable.
+ *
+ * <p>It requires one argument.
+ *
+ * <p>For the rules which types are comparable with which types see {@link
+ * #areComparable(LogicalType, LogicalType)}.
+ */
+@Internal
+public final class ArrayComparableElementTypeStrategy implements
InputTypeStrategy {
+ private final StructuredComparison requiredComparison;
+ private final ConstantArgumentCount argumentCount;
+
+ public ArrayComparableElementTypeStrategy(StructuredComparison
requiredComparison) {
+ Preconditions.checkArgument(requiredComparison !=
StructuredComparison.NONE);
+ this.requiredComparison = requiredComparison;
+ this.argumentCount = ConstantArgumentCount.of(1);
+ }
+
+ @Override
+ public ArgumentCount getArgumentCount() {
+ return argumentCount;
+ }
+
+ @Override
+ public Optional<List<DataType>> inferInputTypes(
+ CallContext callContext, boolean throwOnFailure) {
+ final List<DataType> argumentDataTypes =
callContext.getArgumentDataTypes();
+ List<LogicalType> argumentTypes =
+ argumentDataTypes.stream()
+ .map(DataType::getLogicalType)
+ .collect(Collectors.toList());
+
+ if (!argumentTypes.stream()
+ .allMatch(logicalType ->
logicalType.is(LogicalTypeRoot.ARRAY))) {
+ return callContext.fail(throwOnFailure, "All arguments requires to
be a ARRAY type");
+ }
Review Comment:
Does it make sense to perform the check first if we expect exactly one
argument? Can't we first check the arg count and then if it is of an array type?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]