This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 620e5975985 [FLINK-26948][table] Add-ARRAY_SORT-function.
620e5975985 is described below
commit 620e5975985944a02886b82362a2bc1774c733e3
Author: Hanyu Zheng <[email protected]>
AuthorDate: Mon Jul 3 07:59:23 2023 -0700
[FLINK-26948][table] Add-ARRAY_SORT-function.
[FLINK-26948][table] Add-ARRAY_SORT-function.
---
docs/data/sql_functions.yml | 3 +
.../docs/reference/pyflink.table/expressions.rst | 1 +
flink-python/pyflink/table/expression.py | 16 +++
.../functions/BuiltInFunctionDefinitions.java | 25 +++-
.../table/types/inference/InputTypeStrategies.java | 5 -
...rrayComparableElementArgumentTypeStrategy.java} | 37 ++----
.../strategies/SpecificInputTypeStrategies.java | 5 +
.../types/inference/InputTypeStrategiesTest.java | 13 ++
.../ArrayComparableElementTypeStrategyTest.java | 55 --------
.../functions/CollectionFunctionsITCase.java | 148 ++++++++++++++++++++-
.../functions/scalar/ArraySortFunction.java | 124 +++++++++++++++++
11 files changed, 345 insertions(+), 87 deletions(-)
diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index 9692db6994b..7ebf715e279 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -646,6 +646,9 @@ collection:
- sql: ARRAY_SLICE(array, start_offset[, end_offset])
table: array.arraySlice(start_offset[, end_offset])
description: Returns a subarray of the input array between 'start_offset'
and 'end_offset' inclusive. The offsets are 1-based however 0 is also treated
as the beginning of the array. Positive values are counted from the beginning
of the array while negative from the end. If 'end_offset' is omitted then this
offset is treated as the length of the array. If 'start_offset' is after
'end_offset' or both are out of array bounds an empty array will be returned.
Returns null if any input is null.
+ - sql: ARRAY_SORT(array[, ascending_order[, null_first]])
+ table: array.arraySort([ascendingOrder[, null_first]])
+ description: Returns the array in sorted order. The function sorts an
array, defaulting to ascending order with NULLs at the start when only the
array is input. Specifying ascending_order as true orders the array in
ascending order with NULLs first, and setting it to false orders it in descending
order with NULLs last. Independently, null_first as true moves NULLs to the
beginning, and as false to the end, irrespective of the sorting order. The
function returns null if any input is null.
- sql: ARRAY_UNION(array1, array2)
table: haystack.arrayUnion(array)
description: Returns an array of the elements in the union of array1 and
array2, without duplicates. If any of the array is null, the function will
return null.
diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst
b/flink-python/docs/reference/pyflink.table/expressions.rst
index 5c7ea97df03..3d3cc1ad326 100644
--- a/flink-python/docs/reference/pyflink.table/expressions.rst
+++ b/flink-python/docs/reference/pyflink.table/expressions.rst
@@ -236,6 +236,7 @@ advanced type helper functions
Expression.array_max
Expression.array_slice
Expression.array_min
+ Expression.array_sort
Expression.array_union
Expression.map_entries
Expression.map_keys
diff --git a/flink-python/pyflink/table/expression.py
b/flink-python/pyflink/table/expression.py
index 4272f1724cb..84772c1739e 100644
--- a/flink-python/pyflink/table/expression.py
+++ b/flink-python/pyflink/table/expression.py
@@ -1531,6 +1531,22 @@ class Expression(Generic[T]):
else:
return _ternary_op("array_slice")(self, start_offset, end_offset)
+    def array_sort(self, ascending_order=None, null_first=None) -> 'Expression':
+        """
+        Returns the array in sorted order.
+
+        When only the array is given, it is sorted in ascending order with NULLs at the
+        start. When only ``ascending_order`` is given, true sorts ascending with NULLs first
+        and false sorts descending with NULLs last. When ``null_first`` is given as well, it
+        decides independently of the sort direction whether NULLs are placed at the beginning
+        (true) or at the end (false). The function returns null if any input is null.
+
+        :param ascending_order: optional boolean; true for ascending, false for descending.
+        :param null_first: optional boolean; true to place NULLs first, false to place them
+                           last.
+        """
+        # Compare against None explicitly: a truthiness test would send a call without any
+        # arguments to the two-argument variant with a None sort direction.
+        if ascending_order is None and null_first is None:
+            return _unary_op("array_sort")(self)
+        elif null_first is None:
+            return _binary_op("array_sort")(self, ascending_order)
+        else:
+            return _ternary_op("array_sort")(self, ascending_order, null_first)
+
def array_union(self, array) -> 'Expression':
"""
Returns an array of the elements in the union of array1 and array2,
without duplicates.
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 704e8efac35..967b55c90c2 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -77,7 +77,6 @@ import static
org.apache.flink.table.types.inference.InputTypeStrategies.NO_ARGS
import static
org.apache.flink.table.types.inference.InputTypeStrategies.OUTPUT_IF_NULL;
import static
org.apache.flink.table.types.inference.InputTypeStrategies.TYPE_LITERAL;
import static org.apache.flink.table.types.inference.InputTypeStrategies.and;
-import static
org.apache.flink.table.types.inference.InputTypeStrategies.arrayFullyComparableElementType;
import static
org.apache.flink.table.types.inference.InputTypeStrategies.commonArrayType;
import static
org.apache.flink.table.types.inference.InputTypeStrategies.commonMultipleArrayType;
import static
org.apache.flink.table.types.inference.InputTypeStrategies.commonType;
@@ -99,6 +98,7 @@ import static
org.apache.flink.table.types.inference.TypeStrategies.nullableIfAl
import static
org.apache.flink.table.types.inference.TypeStrategies.nullableIfArgs;
import static
org.apache.flink.table.types.inference.TypeStrategies.varyingString;
import static
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.ARRAY_ELEMENT_ARG;
+import static
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.ARRAY_FULLY_COMPARABLE;
import static
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.JSON_ARGUMENT;
import static
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.TWO_EQUALS_COMPARABLE;
import static
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.TWO_FULLY_COMPARABLE;
@@ -231,6 +231,25 @@ public final class BuiltInFunctionDefinitions {
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
.build();
+ public static final BuiltInFunctionDefinition ARRAY_SORT =
+ BuiltInFunctionDefinition.newBuilder()
+ .name("ARRAY_SORT")
+ .kind(SCALAR)
+ .inputTypeStrategy(
+ or(
+ sequence(ARRAY_FULLY_COMPARABLE), // ARRAY_SORT(array)
+ sequence( // ARRAY_SORT(array, ascending_order)
+ ARRAY_FULLY_COMPARABLE,
+ logical(LogicalTypeRoot.BOOLEAN)),
+ sequence( // ARRAY_SORT(array, ascending_order, null_first)
+ ARRAY_FULLY_COMPARABLE,
+ logical(LogicalTypeRoot.BOOLEAN),
+ logical(LogicalTypeRoot.BOOLEAN))))
+ .outputTypeStrategy(nullableIfArgs(argument(0)))
+ .runtimeClass(
+
"org.apache.flink.table.runtime.functions.scalar.ArraySortFunction")
+ .build();
+
public static final BuiltInFunctionDefinition ARRAY_DISTINCT =
BuiltInFunctionDefinition.newBuilder()
.name("ARRAY_DISTINCT")
@@ -327,7 +346,7 @@ public final class BuiltInFunctionDefinitions {
BuiltInFunctionDefinition.newBuilder()
.name("ARRAY_MAX")
.kind(SCALAR)
- .inputTypeStrategy(arrayFullyComparableElementType())
+ .inputTypeStrategy(sequence(ARRAY_FULLY_COMPARABLE))
.outputTypeStrategy(forceNullable(SpecificTypeStrategies.ARRAY_ELEMENT))
.runtimeClass(
"org.apache.flink.table.runtime.functions.scalar.ArrayMaxFunction")
@@ -355,7 +374,7 @@ public final class BuiltInFunctionDefinitions {
BuiltInFunctionDefinition.newBuilder()
.name("ARRAY_MIN")
.kind(SCALAR)
- .inputTypeStrategy(arrayFullyComparableElementType())
+ .inputTypeStrategy(sequence(ARRAY_FULLY_COMPARABLE))
.outputTypeStrategy(forceNullable(SpecificTypeStrategies.ARRAY_ELEMENT))
.runtimeClass(
"org.apache.flink.table.runtime.functions.scalar.ArrayMinFunction")
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
index f5477c508d4..ef6b1b20f59 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
@@ -23,7 +23,6 @@ import org.apache.flink.table.expressions.TableSymbol;
import org.apache.flink.table.types.DataType;
import
org.apache.flink.table.types.inference.strategies.AndArgumentTypeStrategy;
import
org.apache.flink.table.types.inference.strategies.AnyArgumentTypeStrategy;
-import
org.apache.flink.table.types.inference.strategies.ArrayComparableElementTypeStrategy;
import
org.apache.flink.table.types.inference.strategies.CommonArgumentTypeStrategy;
import
org.apache.flink.table.types.inference.strategies.CommonArrayInputTypeStrategy;
import
org.apache.flink.table.types.inference.strategies.CommonInputTypeStrategy;
@@ -366,10 +365,6 @@ public final class InputTypeStrategies {
return new
CommonArrayInputTypeStrategy(ConstantArgumentCount.from(minCount));
}
- public static InputTypeStrategy arrayFullyComparableElementType() {
- return new
ArrayComparableElementTypeStrategy(StructuredComparison.FULL);
- }
-
/** @see ItemAtIndexArgumentTypeStrategy */
public static final ArgumentTypeStrategy ITEM_AT_INDEX = new
ItemAtIndexArgumentTypeStrategy();
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategy.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementArgumentTypeStrategy.java
similarity index 77%
rename from
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategy.java
rename to
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementArgumentTypeStrategy.java
index 175a41d45dd..a8118cf95a1 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategy.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementArgumentTypeStrategy.java
@@ -22,10 +22,8 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.types.CollectionDataType;
import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
import org.apache.flink.table.types.inference.CallContext;
-import org.apache.flink.table.types.inference.ConstantArgumentCount;
-import org.apache.flink.table.types.inference.InputTypeStrategy;
import org.apache.flink.table.types.inference.Signature;
import org.apache.flink.table.types.logical.LegacyTypeInformationType;
import org.apache.flink.table.types.logical.LogicalType;
@@ -34,13 +32,12 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot;
import
org.apache.flink.table.types.logical.StructuredType.StructuredComparison;
import org.apache.flink.util.Preconditions;
-import java.util.Collections;
import java.util.List;
import java.util.Optional;
/**
- * An {@link InputTypeStrategy} that checks if the input argument is an ARRAY
type and check whether
- * its' elements are comparable.
+ * An {@link ArgumentTypeStrategy} that checks if the input argument is an
ARRAY type and checks
+ * whether its elements are comparable.
*
* <p>It requires one argument.
*
@@ -48,28 +45,22 @@ import java.util.Optional;
* #areComparable(LogicalType, LogicalType)}.
*/
@Internal
-public final class ArrayComparableElementTypeStrategy implements
InputTypeStrategy {
+public final class ArrayComparableElementArgumentTypeStrategy implements
ArgumentTypeStrategy {
+
private final StructuredComparison requiredComparison;
- private final ConstantArgumentCount argumentCount;
- public ArrayComparableElementTypeStrategy(StructuredComparison
requiredComparison) {
+ public ArrayComparableElementArgumentTypeStrategy(StructuredComparison
requiredComparison) {
Preconditions.checkArgument(requiredComparison !=
StructuredComparison.NONE);
this.requiredComparison = requiredComparison;
- this.argumentCount = ConstantArgumentCount.of(1);
- }
-
- @Override
- public ArgumentCount getArgumentCount() {
- return argumentCount;
}
@Override
- public Optional<List<DataType>> inferInputTypes(
- CallContext callContext, boolean throwOnFailure) {
+ public Optional<DataType> inferArgumentType(
+ CallContext callContext, int argumentPos, boolean throwOnFailure) {
final List<DataType> argumentDataTypes =
callContext.getArgumentDataTypes();
- final DataType argumentType = argumentDataTypes.get(0);
+ final DataType argumentType = argumentDataTypes.get(argumentPos);
if (!argumentType.getLogicalType().is(LogicalTypeRoot.ARRAY)) {
- return callContext.fail(throwOnFailure, "All arguments requires to
be an ARRAY type");
+ return callContext.fail(throwOnFailure, "The argument requires to
be an ARRAY type");
}
final DataType elementDataType = ((CollectionDataType)
argumentType).getElementDataType();
final LogicalType elementLogicalDataType =
elementDataType.getLogicalType();
@@ -80,7 +71,7 @@ public final class ArrayComparableElementTypeStrategy
implements InputTypeStrate
elementLogicalDataType,
comparisonToString());
}
- return Optional.of(argumentDataTypes);
+ return Optional.of(argumentType);
}
private String comparisonToString() {
@@ -131,8 +122,8 @@ public final class ArrayComparableElementTypeStrategy
implements InputTypeStrate
}
@Override
- public List<Signature> getExpectedSignatures(FunctionDefinition
definition) {
- return Collections.singletonList(
- Signature.of(Signature.Argument.ofGroup("ARRAY<COMPARABLE>")));
+ public Signature.Argument getExpectedArgument(
+ FunctionDefinition functionDefinition, int argumentPos) {
+ return Signature.Argument.ofGroup("ARRAY<COMPARABLE>");
}
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
index 635fd299bb2..bb6102b83b5 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
@@ -38,6 +38,7 @@ import static
org.apache.flink.table.types.inference.InputTypeStrategies.logical
import static org.apache.flink.table.types.inference.InputTypeStrategies.or;
import static
org.apache.flink.table.types.inference.InputTypeStrategies.repeatingSequence;
import static
org.apache.flink.table.types.inference.InputTypeStrategies.symbol;
+import static
org.apache.flink.table.types.logical.StructuredType.StructuredComparison;
/**
* Entry point for specific input type strategies not covered in {@link
InputTypeStrategies}.
@@ -89,6 +90,10 @@ public final class SpecificInputTypeStrategies {
public static final ArgumentTypeStrategy ARRAY_ELEMENT_ARG =
new ArrayElementArgumentTypeStrategy();
+ /** Argument type strategy that accepts an ARRAY whose elements are fully comparable. */
+ public static final ArgumentTypeStrategy ARRAY_FULLY_COMPARABLE =
+ new
ArrayComparableElementArgumentTypeStrategy(StructuredComparison.FULL);
+
/**
* Input strategy for {@link BuiltInFunctionDefinitions#JSON_OBJECT}.
*
diff --git
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java
index a2e3f6a9ea6..76b35074ae2 100644
---
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java
+++
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java
@@ -640,6 +640,19 @@ class InputTypeStrategiesTest extends
InputTypeStrategiesTestBase {
.expectArgumentTypes(
DataTypes.ARRAY(DataTypes.INT().notNull()).notNull(),
DataTypes.INT()),
+
TestSpec.forStrategy(sequence(SpecificInputTypeStrategies.ARRAY_FULLY_COMPARABLE))
+ .expectSignature("f(<ARRAY<COMPARABLE>>)")
+
.calledWithArgumentTypes(DataTypes.ARRAY(DataTypes.ROW()))
+ .expectErrorMessage(
+ "Invalid input arguments. Expected signatures
are:\n"
+ + "f(<ARRAY<COMPARABLE>>)"),
+ TestSpec.forStrategy(
+ "Strategy fails if input argument type is not
ARRAY",
+
sequence(SpecificInputTypeStrategies.ARRAY_FULLY_COMPARABLE))
+ .calledWithArgumentTypes(DataTypes.INT())
+ .expectErrorMessage(
+ "Invalid input arguments. Expected signatures
are:\n"
+ + "f(<ARRAY<COMPARABLE>>)"),
TestSpec.forStrategy(
"PROCTIME type strategy",
SpecificInputTypeStrategies.windowTimeIndicator(
diff --git
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategyTest.java
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategyTest.java
deleted file mode 100644
index 3e9d298ebc7..00000000000
---
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategyTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.types.inference.strategies;
-
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.types.inference.InputTypeStrategies;
-import org.apache.flink.table.types.inference.InputTypeStrategiesTestBase;
-
-import java.util.stream.Stream;
-
-/** Tests for {@link ArrayComparableElementTypeStrategy}. */
-public class ArrayComparableElementTypeStrategyTest extends
InputTypeStrategiesTestBase {
- @Override
- protected Stream<TestSpec> testData() {
- return Stream.of(
-
TestSpec.forStrategy(InputTypeStrategies.arrayFullyComparableElementType())
- .expectSignature("f(<ARRAY<COMPARABLE>>)")
-
.calledWithArgumentTypes(DataTypes.ARRAY(DataTypes.ROW()))
- .expectErrorMessage(
- "Invalid input arguments. Expected signatures
are:\n"
- + "f(<ARRAY<COMPARABLE>>)"),
- TestSpec.forStrategy(
- "Strategy fails if input argument type is not
ARRAY",
-
InputTypeStrategies.arrayFullyComparableElementType())
- .calledWithArgumentTypes(DataTypes.INT())
- .expectErrorMessage(
- "Invalid input arguments. Expected signatures
are:\n"
- + "f(<ARRAY<COMPARABLE>>)"),
- TestSpec.forStrategy(
- "Strategy fails if the number of input
arguments are not one",
-
InputTypeStrategies.arrayFullyComparableElementType())
- .calledWithArgumentTypes(
- DataTypes.ARRAY(DataTypes.INT()),
- DataTypes.ARRAY(DataTypes.STRING()))
- .expectErrorMessage(
- "Invalid input arguments. Expected signatures
are:\n"
- + "f(<ARRAY<COMPARABLE>>)"));
- }
-}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java
index 1bd9a0e8cab..1a5d5723e5c 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java
@@ -51,7 +51,8 @@ class CollectionFunctionsITCase extends
BuiltInFunctionTestBase {
arrayMaxTestCases(),
arrayJoinTestCases(),
arraySliceTestCases(),
- arrayMinTestCases())
+ arrayMinTestCases(),
+ arraySortTestCases())
.flatMap(s -> s);
}
@@ -1370,4 +1371,149 @@ class CollectionFunctionsITCase extends
BuiltInFunctionTestBase {
+ "ARRAY_SLICE(<ARRAY>, <INTEGER>)")
.testSqlValidationError("ARRAY_SLICE(null)", "Illegal
use of 'NULL'"));
}
+
+    /**
+     * Test cases for {@code ARRAY_SORT}.
+     *
+     * <p>Fields: f0 is an INT array containing a null element, f1 is a null INT array, f2 is an
+     * array of ROWs (rejected during validation), f3/f4/f5 are DOUBLE/STRING/DATE arrays.
+     * Every Table API call is paired with the SQL expression that spells the same call.
+     */
+    private Stream<TestSetSpec> arraySortTestCases() {
+        return Stream.of(
+                TestSetSpec.forFunction(BuiltInFunctionDefinitions.ARRAY_SORT)
+                        .onFieldsWithData(
+                                new Integer[] {1, 2, 2, null},
+                                null,
+                                new Row[] {
+                                    Row.of(true, LocalDate.of(2022, 4, 20)),
+                                    Row.of(true, LocalDate.of(1990, 10, 14)),
+                                    null
+                                },
+                                new Double[] {1.2, 3.5, 4.7, 1.3, 1.0, 5.0},
+                                new String[] {"a", "cv", "dc", "rerer", "234", "12"},
+                                new LocalDate[] {
+                                    LocalDate.of(2022, 1, 2),
+                                    LocalDate.of(2023, 4, 21),
+                                    LocalDate.of(2022, 12, 24),
+                                    LocalDate.of(2026, 2, 10),
+                                    LocalDate.of(2012, 5, 16),
+                                    LocalDate.of(2092, 7, 19)
+                                })
+                        .andDataTypes(
+                                DataTypes.ARRAY(DataTypes.INT()),
+                                DataTypes.ARRAY(DataTypes.INT()),
+                                DataTypes.ARRAY(
+                                        DataTypes.ROW(DataTypes.BOOLEAN(), DataTypes.DATE())),
+                                DataTypes.ARRAY(DataTypes.DOUBLE()),
+                                DataTypes.ARRAY(DataTypes.STRING()),
+                                DataTypes.ARRAY(DataTypes.DATE()))
+                        // array only, and array + ascending_order
+                        .testResult(
+                                call("ARRAY_SORT", $("f0")),
+                                "ARRAY_SORT(f0)",
+                                new Integer[] {null, 1, 2, 2},
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        .testResult(
+                                call("ARRAY_SORT", $("f0"), false),
+                                "ARRAY_SORT(f0, false)",
+                                new Integer[] {2, 2, 1, null},
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        .testResult(
+                                call("ARRAY_SORT", $("f0"), true),
+                                "ARRAY_SORT(f0, true)",
+                                new Integer[] {null, 1, 2, 2},
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        .testResult(
+                                call("ARRAY_SORT", $("f0"), null),
+                                "ARRAY_SORT(f0, null)",
+                                null,
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        // array + ascending_order + null_first
+                        .testResult(
+                                call("ARRAY_SORT", $("f0"), true, true),
+                                "ARRAY_SORT(f0, true, true)",
+                                new Integer[] {null, 1, 2, 2},
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        .testResult(
+                                call("ARRAY_SORT", $("f0"), true, false),
+                                "ARRAY_SORT(f0, true, false)",
+                                new Integer[] {1, 2, 2, null},
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        .testResult(
+                                call("ARRAY_SORT", $("f0"), false, true),
+                                "ARRAY_SORT(f0, false, true)",
+                                new Integer[] {null, 2, 2, 1},
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        .testResult(
+                                call("ARRAY_SORT", $("f0"), false, false),
+                                "ARRAY_SORT(f0, false, false)",
+                                new Integer[] {2, 2, 1, null},
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        .testResult(
+                                call("ARRAY_SORT", $("f0"), false, null),
+                                "ARRAY_SORT(f0, false, null)",
+                                null,
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        // The SQL string mirrors the Table API call (both flags null).
+                        .testResult(
+                                call("ARRAY_SORT", $("f0"), null, null),
+                                "ARRAY_SORT(f0, null, null)",
+                                null,
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        // null array input
+                        .testResult(
+                                call("ARRAY_SORT", $("f1"), true),
+                                "ARRAY_SORT(f1, true)",
+                                null,
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        // The SQL string mirrors the Table API call (descending).
+                        .testResult(
+                                call("ARRAY_SORT", $("f1"), false),
+                                "ARRAY_SORT(f1, false)",
+                                null,
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        // ROW elements are rejected during validation
+                        .testTableApiValidationError(
+                                call("ARRAY_SORT", $("f2"), true),
+                                "Invalid input arguments. Expected signatures are:\n"
+                                        + "ARRAY_SORT(<ARRAY<COMPARABLE>>)\n"
+                                        + "ARRAY_SORT(<ARRAY<COMPARABLE>>, <BOOLEAN>)\n"
+                                        + "ARRAY_SORT(<ARRAY<COMPARABLE>>, <BOOLEAN>, <BOOLEAN>)")
+                        .testSqlValidationError(
+                                "ARRAY_SORT(f2, true)",
+                                "SQL validation failed. Invalid function call:\n"
+                                        + "ARRAY_SORT(ARRAY<ROW<`f0` BOOLEAN, `f1` DATE>>, BOOLEAN NOT NULL)")
+                        // other element types
+                        .testResult(
+                                call("ARRAY_SORT", $("f3")),
+                                "ARRAY_SORT(f3)",
+                                new Double[] {1.0, 1.2, 1.3, 3.5, 4.7, 5.0},
+                                DataTypes.ARRAY(DataTypes.DOUBLE()))
+                        .testResult(
+                                call("ARRAY_SORT", $("f3"), false),
+                                "ARRAY_SORT(f3, false)",
+                                new Double[] {5.0, 4.7, 3.5, 1.3, 1.2, 1.0},
+                                DataTypes.ARRAY(DataTypes.DOUBLE()))
+                        .testResult(
+                                call("ARRAY_SORT", $("f4")),
+                                "ARRAY_SORT(f4)",
+                                new String[] {"12", "234", "a", "cv", "dc", "rerer"},
+                                DataTypes.ARRAY(DataTypes.STRING()))
+                        .testResult(
+                                call("ARRAY_SORT", $("f4"), false),
+                                "ARRAY_SORT(f4, false)",
+                                new String[] {"rerer", "dc", "cv", "a", "234", "12"},
+                                DataTypes.ARRAY(DataTypes.STRING()))
+                        .testResult(
+                                call("ARRAY_SORT", $("f5")),
+                                "ARRAY_SORT(f5)",
+                                new LocalDate[] {
+                                    LocalDate.of(2012, 5, 16),
+                                    LocalDate.of(2022, 1, 2),
+                                    LocalDate.of(2022, 12, 24),
+                                    LocalDate.of(2023, 4, 21),
+                                    LocalDate.of(2026, 2, 10),
+                                    LocalDate.of(2092, 7, 19)
+                                },
+                                DataTypes.ARRAY(DataTypes.DATE()))
+                        .testResult(
+                                call("ARRAY_SORT", $("f5"), false),
+                                "ARRAY_SORT(f5, false)",
+                                new LocalDate[] {
+                                    LocalDate.of(2092, 7, 19),
+                                    LocalDate.of(2026, 2, 10),
+                                    LocalDate.of(2023, 4, 21),
+                                    LocalDate.of(2022, 12, 24),
+                                    LocalDate.of(2022, 1, 2),
+                                    LocalDate.of(2012, 5, 16)
+                                },
+                                DataTypes.ARRAY(DataTypes.DATE())));
+    }
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArraySortFunction.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArraySortFunction.java
new file mode 100644
index 00000000000..0bc0f1e6576
--- /dev/null
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArraySortFunction.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.lang.invoke.MethodHandle;
+import java.util.Arrays;
+import java.util.Comparator;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/**
+ * Implementation of {@link BuiltInFunctionDefinitions#ARRAY_SORT}.
+ *
+ * <p>Copies the elements of the input array into an object array and sorts the copy. Non-null
+ * elements are ordered with a generated {@code element1 > element2} expression; null elements
+ * are placed first or last by wrapping the comparator in {@link Comparator#nullsFirst} or
+ * {@link Comparator#nullsLast}, so the generated expression only receives non-null operands.
+ */
+@Internal
+public class ArraySortFunction extends BuiltInScalarFunction {
+
+    private final ArrayData.ElementGetter elementGetter;
+    private final SpecializedFunction.ExpressionEvaluator greaterEvaluator;
+
+    /** Handle of the opened {@link #greaterEvaluator}; assigned in {@link #open}. */
+    private transient MethodHandle greaterHandle;
+
+    public ArraySortFunction(SpecializedFunction.SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.ARRAY_SORT, context);
+        final DataType elementDataType =
+                ((CollectionDataType) context.getCallContext().getArgumentDataTypes().get(0))
+                        .getElementDataType()
+                        .toInternal();
+        elementGetter =
+                ArrayData.createElementGetter(elementDataType.toInternal().getLogicalType());
+        greaterEvaluator =
+                context.createEvaluator(
+                        $("element1").isGreater($("element2")),
+                        DataTypes.BOOLEAN(),
+                        DataTypes.FIELD("element1", elementDataType.notNull().toInternal()),
+                        DataTypes.FIELD("element2", elementDataType.notNull().toInternal()));
+    }
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        greaterHandle = greaterEvaluator.open(context);
+    }
+
+    /** Sorts in ascending order with null elements first. */
+    public @Nullable ArrayData eval(ArrayData array) {
+        return eval(array, true, true);
+    }
+
+    /**
+     * Sorts in the given direction. The flag is reused for null placement: ascending puts null
+     * elements first, descending puts them last.
+     */
+    public @Nullable ArrayData eval(ArrayData array, Boolean ascendingOrder) {
+        return eval(array, ascendingOrder, ascendingOrder);
+    }
+
+    /**
+     * Sorts the array.
+     *
+     * @param array the array to sort
+     * @param ascendingOrder {@code true} for ascending, {@code false} for descending order
+     * @param nullFirst {@code true} to place null elements first, {@code false} to place them
+     *     last, independent of the sort direction
+     * @return a new sorted array, the input itself if it is empty, or {@code null} if any
+     *     argument is {@code null}
+     * @throws FlinkRuntimeException if anything fails while reading or comparing elements
+     */
+    public @Nullable ArrayData eval(ArrayData array, Boolean ascendingOrder, Boolean nullFirst) {
+        try {
+            if (array == null || ascendingOrder == null || nullFirst == null) {
+                return null;
+            }
+            if (array.size() == 0) {
+                return array;
+            }
+            final int size = array.size();
+            final Object[] elements = new Object[size];
+            for (int i = 0; i < size; i++) {
+                elements[i] = elementGetter.getElementOrNull(array, i);
+            }
+            final Comparator<Object> elementComparator = new ArraySortComparator(ascendingOrder);
+            Arrays.sort(
+                    elements,
+                    nullFirst
+                            ? Comparator.nullsFirst(elementComparator)
+                            : Comparator.nullsLast(elementComparator));
+            return new GenericArrayData(elements);
+        } catch (Throwable t) {
+            throw new FlinkRuntimeException(t);
+        }
+    }
+
+    /** Orders non-null elements using the generated greater-than expression. */
+    private class ArraySortComparator implements Comparator<Object> {
+        private final boolean isAscending;
+
+        public ArraySortComparator(boolean isAscending) {
+            this.isAscending = isAscending;
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            try {
+                // Check both directions so that equal elements compare as 0. Reporting
+                // "less than" for both compare(a, b) and compare(b, a) violates the
+                // Comparator contract, which Arrays.sort may reject with an
+                // IllegalArgumentException.
+                final int ascendingResult;
+                if ((boolean) greaterHandle.invoke(o1, o2)) {
+                    ascendingResult = 1;
+                } else if ((boolean) greaterHandle.invoke(o2, o1)) {
+                    ascendingResult = -1;
+                } else {
+                    ascendingResult = 0;
+                }
+                return isAscending ? ascendingResult : -ascendingResult;
+            } catch (Throwable e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        greaterEvaluator.close();
+    }
+}