This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 620e5975985 [FLINK-26948][table] Add-ARRAY_SORT-function.
620e5975985 is described below

commit 620e5975985944a02886b82362a2bc1774c733e3
Author: Hanyu Zheng <[email protected]>
AuthorDate: Mon Jul 3 07:59:23 2023 -0700

    [FLINK-26948][table] Add-ARRAY_SORT-function.
    
    [FLINK-26948][table] Add-ARRAY_SORT-function.
---
 docs/data/sql_functions.yml                        |   3 +
 .../docs/reference/pyflink.table/expressions.rst   |   1 +
 flink-python/pyflink/table/expression.py           |  16 +++
 .../functions/BuiltInFunctionDefinitions.java      |  25 +++-
 .../table/types/inference/InputTypeStrategies.java |   5 -
 ...rrayComparableElementArgumentTypeStrategy.java} |  37 ++----
 .../strategies/SpecificInputTypeStrategies.java    |   5 +
 .../types/inference/InputTypeStrategiesTest.java   |  13 ++
 .../ArrayComparableElementTypeStrategyTest.java    |  55 --------
 .../functions/CollectionFunctionsITCase.java       | 148 ++++++++++++++++++++-
 .../functions/scalar/ArraySortFunction.java        | 124 +++++++++++++++++
 11 files changed, 345 insertions(+), 87 deletions(-)

diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index 9692db6994b..7ebf715e279 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -646,6 +646,9 @@ collection:
   - sql: ARRAY_SLICE(array, start_offset[, end_offset])
     table: array.arraySlice(start_offset[, end_offset])
     description: Returns a subarray of the input array between 'start_offset' 
and 'end_offset' inclusive. The offsets are 1-based however 0 is also treated 
as the beginning of the array. Positive values are counted from the beginning 
of the array while negative from the end. If 'end_offset' is omitted then this 
offset is treated as the length of the array. If 'start_offset' is after 
'end_offset' or both are out of array bounds an empty array will be returned. 
Returns null if any input is null.
+  - sql: ARRAY_SORT(array[, ascending_order[, null_first]])
+    table: array.arraySort([, ascendingOrder[, null_first]])
+    description: Returns the array in sorted order.The function sorts an 
array, defaulting to ascending order with NULLs at the start when only the 
array is input. Specifying ascending_order as true orders the array in 
ascending with NULLs first, and setting it to false orders it in descending 
with NULLs last. Independently, null_first as true moves NULLs to the 
beginning, and as false to the end, irrespective of the sorting order. The 
function returns null if any input is null.
   - sql: ARRAY_UNION(array1, array2)
     table: haystack.arrayUnion(array)
     description: Returns an array of the elements in the union of array1 and 
array2, without duplicates. If any of the array is null, the function will 
return null.
diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst 
b/flink-python/docs/reference/pyflink.table/expressions.rst
index 5c7ea97df03..3d3cc1ad326 100644
--- a/flink-python/docs/reference/pyflink.table/expressions.rst
+++ b/flink-python/docs/reference/pyflink.table/expressions.rst
@@ -236,6 +236,7 @@ advanced type helper functions
     Expression.array_max
     Expression.array_slice
     Expression.array_min
+    Expression.array_sort
     Expression.array_union
     Expression.map_entries
     Expression.map_keys
diff --git a/flink-python/pyflink/table/expression.py 
b/flink-python/pyflink/table/expression.py
index 4272f1724cb..84772c1739e 100644
--- a/flink-python/pyflink/table/expression.py
+++ b/flink-python/pyflink/table/expression.py
@@ -1531,6 +1531,22 @@ class Expression(Generic[T]):
         else:
             return _ternary_op("array_slice")(self, start_offset, end_offset)
 
+    def array_sort(self, ascending_order=None, null_first=None) -> 
'Expression':
+        """
+        Returns the array in sorted order.
+        The function sorts an array, defaulting to ascending order with NULLs 
at the start when
+        only the array is input. Specifying ascending_order as true orders the 
array in ascending
+        with NULLs first, and setting it to false orders it in descending with 
NULLs last.
+        Independently, null_first as true moves NULLs to the beginning, and as 
false to the end,
+        irrespective of the sorting order. The function returns null if any 
input is null.
+        """
+        if ascending_order and null_first is None:
+            return _unary_op("array_sort")(self)
+        elif null_first is None:
+            return _binary_op("array_sort")(self, ascending_order)
+        else:
+            return _ternary_op("array_sort")(self, ascending_order, null_first)
+
     def array_union(self, array) -> 'Expression':
         """
         Returns an array of the elements in the union of array1 and array2, 
without duplicates.
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 704e8efac35..967b55c90c2 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -77,7 +77,6 @@ import static 
org.apache.flink.table.types.inference.InputTypeStrategies.NO_ARGS
 import static 
org.apache.flink.table.types.inference.InputTypeStrategies.OUTPUT_IF_NULL;
 import static 
org.apache.flink.table.types.inference.InputTypeStrategies.TYPE_LITERAL;
 import static org.apache.flink.table.types.inference.InputTypeStrategies.and;
-import static 
org.apache.flink.table.types.inference.InputTypeStrategies.arrayFullyComparableElementType;
 import static 
org.apache.flink.table.types.inference.InputTypeStrategies.commonArrayType;
 import static 
org.apache.flink.table.types.inference.InputTypeStrategies.commonMultipleArrayType;
 import static 
org.apache.flink.table.types.inference.InputTypeStrategies.commonType;
@@ -99,6 +98,7 @@ import static 
org.apache.flink.table.types.inference.TypeStrategies.nullableIfAl
 import static 
org.apache.flink.table.types.inference.TypeStrategies.nullableIfArgs;
 import static 
org.apache.flink.table.types.inference.TypeStrategies.varyingString;
 import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.ARRAY_ELEMENT_ARG;
+import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.ARRAY_FULLY_COMPARABLE;
 import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.JSON_ARGUMENT;
 import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.TWO_EQUALS_COMPARABLE;
 import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.TWO_FULLY_COMPARABLE;
@@ -231,6 +231,25 @@ public final class BuiltInFunctionDefinitions {
                             
"org.apache.flink.table.runtime.functions.scalar.ArrayContainsFunction")
                     .build();
 
+    public static final BuiltInFunctionDefinition ARRAY_SORT =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("ARRAY_SORT")
+                    .kind(SCALAR)
+                    .inputTypeStrategy(
+                            or(
+                                    sequence(ARRAY_FULLY_COMPARABLE),
+                                    sequence(
+                                            ARRAY_FULLY_COMPARABLE,
+                                            logical(LogicalTypeRoot.BOOLEAN)),
+                                    sequence(
+                                            ARRAY_FULLY_COMPARABLE,
+                                            logical(LogicalTypeRoot.BOOLEAN),
+                                            logical(LogicalTypeRoot.BOOLEAN))))
+                    .outputTypeStrategy(nullableIfArgs(argument(0)))
+                    .runtimeClass(
+                            
"org.apache.flink.table.runtime.functions.scalar.ArraySortFunction")
+                    .build();
+
     public static final BuiltInFunctionDefinition ARRAY_DISTINCT =
             BuiltInFunctionDefinition.newBuilder()
                     .name("ARRAY_DISTINCT")
@@ -327,7 +346,7 @@ public final class BuiltInFunctionDefinitions {
             BuiltInFunctionDefinition.newBuilder()
                     .name("ARRAY_MAX")
                     .kind(SCALAR)
-                    .inputTypeStrategy(arrayFullyComparableElementType())
+                    .inputTypeStrategy(sequence(ARRAY_FULLY_COMPARABLE))
                     
.outputTypeStrategy(forceNullable(SpecificTypeStrategies.ARRAY_ELEMENT))
                     .runtimeClass(
                             
"org.apache.flink.table.runtime.functions.scalar.ArrayMaxFunction")
@@ -355,7 +374,7 @@ public final class BuiltInFunctionDefinitions {
             BuiltInFunctionDefinition.newBuilder()
                     .name("ARRAY_MIN")
                     .kind(SCALAR)
-                    .inputTypeStrategy(arrayFullyComparableElementType())
+                    .inputTypeStrategy(sequence(ARRAY_FULLY_COMPARABLE))
                     
.outputTypeStrategy(forceNullable(SpecificTypeStrategies.ARRAY_ELEMENT))
                     .runtimeClass(
                             
"org.apache.flink.table.runtime.functions.scalar.ArrayMinFunction")
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
index f5477c508d4..ef6b1b20f59 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
@@ -23,7 +23,6 @@ import org.apache.flink.table.expressions.TableSymbol;
 import org.apache.flink.table.types.DataType;
 import 
org.apache.flink.table.types.inference.strategies.AndArgumentTypeStrategy;
 import 
org.apache.flink.table.types.inference.strategies.AnyArgumentTypeStrategy;
-import 
org.apache.flink.table.types.inference.strategies.ArrayComparableElementTypeStrategy;
 import 
org.apache.flink.table.types.inference.strategies.CommonArgumentTypeStrategy;
 import 
org.apache.flink.table.types.inference.strategies.CommonArrayInputTypeStrategy;
 import 
org.apache.flink.table.types.inference.strategies.CommonInputTypeStrategy;
@@ -366,10 +365,6 @@ public final class InputTypeStrategies {
         return new 
CommonArrayInputTypeStrategy(ConstantArgumentCount.from(minCount));
     }
 
-    public static InputTypeStrategy arrayFullyComparableElementType() {
-        return new 
ArrayComparableElementTypeStrategy(StructuredComparison.FULL);
-    }
-
     /** @see ItemAtIndexArgumentTypeStrategy */
     public static final ArgumentTypeStrategy ITEM_AT_INDEX = new 
ItemAtIndexArgumentTypeStrategy();
 
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementArgumentTypeStrategy.java
similarity index 77%
rename from 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategy.java
rename to 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementArgumentTypeStrategy.java
index 175a41d45dd..a8118cf95a1 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategy.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementArgumentTypeStrategy.java
@@ -22,10 +22,8 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.types.CollectionDataType;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
 import org.apache.flink.table.types.inference.CallContext;
-import org.apache.flink.table.types.inference.ConstantArgumentCount;
-import org.apache.flink.table.types.inference.InputTypeStrategy;
 import org.apache.flink.table.types.inference.Signature;
 import org.apache.flink.table.types.logical.LegacyTypeInformationType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -34,13 +32,12 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import 
org.apache.flink.table.types.logical.StructuredType.StructuredComparison;
 import org.apache.flink.util.Preconditions;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 
 /**
- * An {@link InputTypeStrategy} that checks if the input argument is an ARRAY 
type and check whether
- * its' elements are comparable.
+ * An {@link ArgumentTypeStrategy} that checks if the input argument is an 
ARRAY type and check
+ * whether its' elements are comparable.
  *
  * <p>It requires one argument.
  *
@@ -48,28 +45,22 @@ import java.util.Optional;
  * #areComparable(LogicalType, LogicalType)}.
  */
 @Internal
-public final class ArrayComparableElementTypeStrategy implements 
InputTypeStrategy {
+public final class ArrayComparableElementArgumentTypeStrategy implements 
ArgumentTypeStrategy {
+
     private final StructuredComparison requiredComparison;
-    private final ConstantArgumentCount argumentCount;
 
-    public ArrayComparableElementTypeStrategy(StructuredComparison 
requiredComparison) {
+    public ArrayComparableElementArgumentTypeStrategy(StructuredComparison 
requiredComparison) {
         Preconditions.checkArgument(requiredComparison != 
StructuredComparison.NONE);
         this.requiredComparison = requiredComparison;
-        this.argumentCount = ConstantArgumentCount.of(1);
-    }
-
-    @Override
-    public ArgumentCount getArgumentCount() {
-        return argumentCount;
     }
 
     @Override
-    public Optional<List<DataType>> inferInputTypes(
-            CallContext callContext, boolean throwOnFailure) {
+    public Optional<DataType> inferArgumentType(
+            CallContext callContext, int argumentPos, boolean throwOnFailure) {
         final List<DataType> argumentDataTypes = 
callContext.getArgumentDataTypes();
-        final DataType argumentType = argumentDataTypes.get(0);
+        final DataType argumentType = argumentDataTypes.get(argumentPos);
         if (!argumentType.getLogicalType().is(LogicalTypeRoot.ARRAY)) {
-            return callContext.fail(throwOnFailure, "All arguments requires to 
be an ARRAY type");
+            return callContext.fail(throwOnFailure, "The argument requires to 
be an ARRAY type");
         }
         final DataType elementDataType = ((CollectionDataType) 
argumentType).getElementDataType();
         final LogicalType elementLogicalDataType = 
elementDataType.getLogicalType();
@@ -80,7 +71,7 @@ public final class ArrayComparableElementTypeStrategy 
implements InputTypeStrate
                     elementLogicalDataType,
                     comparisonToString());
         }
-        return Optional.of(argumentDataTypes);
+        return Optional.of(argumentType);
     }
 
     private String comparisonToString() {
@@ -131,8 +122,8 @@ public final class ArrayComparableElementTypeStrategy 
implements InputTypeStrate
     }
 
     @Override
-    public List<Signature> getExpectedSignatures(FunctionDefinition 
definition) {
-        return Collections.singletonList(
-                Signature.of(Signature.Argument.ofGroup("ARRAY<COMPARABLE>")));
+    public Signature.Argument getExpectedArgument(
+            FunctionDefinition functionDefinition, int argumentPos) {
+        return Signature.Argument.ofGroup("ARRAY<COMPARABLE>");
     }
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
index 635fd299bb2..bb6102b83b5 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
@@ -38,6 +38,7 @@ import static 
org.apache.flink.table.types.inference.InputTypeStrategies.logical
 import static org.apache.flink.table.types.inference.InputTypeStrategies.or;
 import static 
org.apache.flink.table.types.inference.InputTypeStrategies.repeatingSequence;
 import static 
org.apache.flink.table.types.inference.InputTypeStrategies.symbol;
+import static 
org.apache.flink.table.types.logical.StructuredType.StructuredComparison;
 
 /**
  * Entry point for specific input type strategies not covered in {@link 
InputTypeStrategies}.
@@ -89,6 +90,10 @@ public final class SpecificInputTypeStrategies {
     public static final ArgumentTypeStrategy ARRAY_ELEMENT_ARG =
             new ArrayElementArgumentTypeStrategy();
 
+    /** Argument type representing the array is comparable. */
+    public static final ArgumentTypeStrategy ARRAY_FULLY_COMPARABLE =
+            new 
ArrayComparableElementArgumentTypeStrategy(StructuredComparison.FULL);
+
     /**
      * Input strategy for {@link BuiltInFunctionDefinitions#JSON_OBJECT}.
      *
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java
index a2e3f6a9ea6..76b35074ae2 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java
@@ -640,6 +640,19 @@ class InputTypeStrategiesTest extends 
InputTypeStrategiesTestBase {
                         .expectArgumentTypes(
                                 
DataTypes.ARRAY(DataTypes.INT().notNull()).notNull(),
                                 DataTypes.INT()),
+                
TestSpec.forStrategy(sequence(SpecificInputTypeStrategies.ARRAY_FULLY_COMPARABLE))
+                        .expectSignature("f(<ARRAY<COMPARABLE>>)")
+                        
.calledWithArgumentTypes(DataTypes.ARRAY(DataTypes.ROW()))
+                        .expectErrorMessage(
+                                "Invalid input arguments. Expected signatures 
are:\n"
+                                        + "f(<ARRAY<COMPARABLE>>)"),
+                TestSpec.forStrategy(
+                                "Strategy fails if input argument type is not 
ARRAY",
+                                
sequence(SpecificInputTypeStrategies.ARRAY_FULLY_COMPARABLE))
+                        .calledWithArgumentTypes(DataTypes.INT())
+                        .expectErrorMessage(
+                                "Invalid input arguments. Expected signatures 
are:\n"
+                                        + "f(<ARRAY<COMPARABLE>>)"),
                 TestSpec.forStrategy(
                                 "PROCTIME type strategy",
                                 
SpecificInputTypeStrategies.windowTimeIndicator(
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategyTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategyTest.java
deleted file mode 100644
index 3e9d298ebc7..00000000000
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ArrayComparableElementTypeStrategyTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.types.inference.strategies;
-
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.types.inference.InputTypeStrategies;
-import org.apache.flink.table.types.inference.InputTypeStrategiesTestBase;
-
-import java.util.stream.Stream;
-
-/** Tests for {@link ArrayComparableElementTypeStrategy}. */
-public class ArrayComparableElementTypeStrategyTest extends 
InputTypeStrategiesTestBase {
-    @Override
-    protected Stream<TestSpec> testData() {
-        return Stream.of(
-                
TestSpec.forStrategy(InputTypeStrategies.arrayFullyComparableElementType())
-                        .expectSignature("f(<ARRAY<COMPARABLE>>)")
-                        
.calledWithArgumentTypes(DataTypes.ARRAY(DataTypes.ROW()))
-                        .expectErrorMessage(
-                                "Invalid input arguments. Expected signatures 
are:\n"
-                                        + "f(<ARRAY<COMPARABLE>>)"),
-                TestSpec.forStrategy(
-                                "Strategy fails if input argument type is not 
ARRAY",
-                                
InputTypeStrategies.arrayFullyComparableElementType())
-                        .calledWithArgumentTypes(DataTypes.INT())
-                        .expectErrorMessage(
-                                "Invalid input arguments. Expected signatures 
are:\n"
-                                        + "f(<ARRAY<COMPARABLE>>)"),
-                TestSpec.forStrategy(
-                                "Strategy fails if the number of input 
arguments are not one",
-                                
InputTypeStrategies.arrayFullyComparableElementType())
-                        .calledWithArgumentTypes(
-                                DataTypes.ARRAY(DataTypes.INT()),
-                                DataTypes.ARRAY(DataTypes.STRING()))
-                        .expectErrorMessage(
-                                "Invalid input arguments. Expected signatures 
are:\n"
-                                        + "f(<ARRAY<COMPARABLE>>)"));
-    }
-}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java
index 1bd9a0e8cab..1a5d5723e5c 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java
@@ -51,7 +51,8 @@ class CollectionFunctionsITCase extends 
BuiltInFunctionTestBase {
                         arrayMaxTestCases(),
                         arrayJoinTestCases(),
                         arraySliceTestCases(),
-                        arrayMinTestCases())
+                        arrayMinTestCases(),
+                        arraySortTestCases())
                 .flatMap(s -> s);
     }
 
@@ -1370,4 +1371,149 @@ class CollectionFunctionsITCase extends 
BuiltInFunctionTestBase {
                                         + "ARRAY_SLICE(<ARRAY>, <INTEGER>)")
                         .testSqlValidationError("ARRAY_SLICE(null)", "Illegal 
use of 'NULL'"));
     }
+
+    private Stream<TestSetSpec> arraySortTestCases() {
+        return Stream.of(
+                TestSetSpec.forFunction(BuiltInFunctionDefinitions.ARRAY_SORT)
+                        .onFieldsWithData(
+                                new Integer[] {1, 2, 2, null},
+                                null,
+                                new Row[] {
+                                    Row.of(true, LocalDate.of(2022, 4, 20)),
+                                    Row.of(true, LocalDate.of(1990, 10, 14)),
+                                    null
+                                },
+                                new Double[] {1.2, 3.5, 4.7, 1.3, 1.0, 5.0},
+                                new String[] {"a", "cv", "dc", "rerer", "234", 
"12"},
+                                new LocalDate[] {
+                                    LocalDate.of(2022, 1, 2),
+                                    LocalDate.of(2023, 4, 21),
+                                    LocalDate.of(2022, 12, 24),
+                                    LocalDate.of(2026, 2, 10),
+                                    LocalDate.of(2012, 5, 16),
+                                    LocalDate.of(2092, 7, 19)
+                                })
+                        .andDataTypes(
+                                DataTypes.ARRAY(DataTypes.INT()),
+                                DataTypes.ARRAY(DataTypes.INT()),
+                                DataTypes.ARRAY(
+                                        DataTypes.ROW(DataTypes.BOOLEAN(), 
DataTypes.DATE())),
+                                DataTypes.ARRAY(DataTypes.DOUBLE()),
+                                DataTypes.ARRAY(DataTypes.STRING()),
+                                DataTypes.ARRAY(DataTypes.DATE()))
+                        .testResult(
+                                call("ARRAY_SORT", $("f0")),
+                                "ARRAY_SORT(f0)",
+                                new Integer[] {null, 1, 2, 2},
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        .testResult(
+                                call("ARRAY_SORT", $("f0"), false),
+                                "ARRAY_SORT(f0, false)",
+                                new Integer[] {2, 2, 1, null},
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        .testResult(
+                                call("ARRAY_SORT", $("f0"), true),
+                                "ARRAY_SORT(f0, true)",
+                                new Integer[] {null, 1, 2, 2},
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        .testResult(
+                                call("ARRAY_SORT", $("f0"), null),
+                                "ARRAY_SORT(f0, null)",
+                                null,
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        .testResult(
+                                call("ARRAY_SORT", $("f0"), true, true),
+                                "ARRAY_SORT(f0, true, true)",
+                                new Integer[] {null, 1, 2, 2},
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        .testResult(
+                                call("ARRAY_SORT", $("f0"), true, false),
+                                "ARRAY_SORT(f0, true, false)",
+                                new Integer[] {1, 2, 2, null},
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        .testResult(
+                                call("ARRAY_SORT", $("f0"), false, true),
+                                "ARRAY_SORT(f0, false, true)",
+                                new Integer[] {null, 2, 2, 1},
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        .testResult(
+                                call("ARRAY_SORT", $("f0"), false, false),
+                                "ARRAY_SORT(f0, false, false)",
+                                new Integer[] {2, 2, 1, null},
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        .testResult(
+                                call("ARRAY_SORT", $("f0"), false, null),
+                                "ARRAY_SORT(f0, false, null)",
+                                null,
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        .testResult(
+                                call("ARRAY_SORT", $("f0"), null, null),
+                                "ARRAY_SORT(f0, false, null)",
+                                null,
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        .testResult(
+                                call("ARRAY_SORT", $("f1"), true),
+                                "ARRAY_SORT(f1, true)",
+                                null,
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        .testResult(
+                                call("ARRAY_SORT", $("f1"), false),
+                                "ARRAY_SORT(f1, true)",
+                                null,
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        .testTableApiValidationError(
+                                call("ARRAY_SORT", $("f2"), true),
+                                "Invalid input arguments. Expected signatures 
are:\n"
+                                        + "ARRAY_SORT(<ARRAY<COMPARABLE>>)\n"
+                                        + "ARRAY_SORT(<ARRAY<COMPARABLE>>, 
<BOOLEAN>)\n"
+                                        + "ARRAY_SORT(<ARRAY<COMPARABLE>>, 
<BOOLEAN>, <BOOLEAN>)")
+                        .testSqlValidationError(
+                                "ARRAY_SORT(f2, true)",
+                                "SQL validation failed. Invalid function 
call:\n"
+                                        + "ARRAY_SORT(ARRAY<ROW<`f0` BOOLEAN, 
`f1` DATE>>, BOOLEAN NOT NULL)")
+                        .testResult(
+                                call("ARRAY_SORT", $("f3")),
+                                "ARRAY_SORT(f3)",
+                                new Double[] {1.0, 1.2, 1.3, 3.5, 4.7, 5.0},
+                                DataTypes.ARRAY(DataTypes.DOUBLE()))
+                        .testResult(
+                                call("ARRAY_SORT", $("f3"), false),
+                                "ARRAY_SORT(f3, false)",
+                                new Double[] {5.0, 4.7, 3.5, 1.3, 1.2, 1.0},
+                                DataTypes.ARRAY(DataTypes.DOUBLE()))
+                        .testResult(
+                                call("ARRAY_SORT", $("f4")),
+                                "ARRAY_SORT(f4)",
+                                new String[] {"12", "234", "a", "cv", "dc", 
"rerer"},
+                                DataTypes.ARRAY(DataTypes.STRING()))
+                        .testResult(
+                                call("ARRAY_SORT", $("f4"), false),
+                                "ARRAY_SORT(f4, false)",
+                                new String[] {"rerer", "dc", "cv", "a", "234", 
"12"},
+                                DataTypes.ARRAY(DataTypes.STRING()))
+                        .testResult(
+                                call("ARRAY_SORT", $("f5")),
+                                "ARRAY_SORT(f5)",
+                                new LocalDate[] {
+                                    LocalDate.of(2012, 5, 16),
+                                    LocalDate.of(2022, 1, 2),
+                                    LocalDate.of(2022, 12, 24),
+                                    LocalDate.of(2023, 4, 21),
+                                    LocalDate.of(2026, 2, 10),
+                                    LocalDate.of(2092, 7, 19)
+                                },
+                                DataTypes.ARRAY(DataTypes.DATE()))
+                        .testResult(
+                                call("ARRAY_SORT", $("f5"), false),
+                                "ARRAY_SORT(f5, false)",
+                                new LocalDate[] {
+                                    LocalDate.of(2092, 7, 19),
+                                    LocalDate.of(2026, 2, 10),
+                                    LocalDate.of(2023, 4, 21),
+                                    LocalDate.of(2022, 12, 24),
+                                    LocalDate.of(2022, 1, 2),
+                                    LocalDate.of(2012, 5, 16)
+                                },
+                                DataTypes.ARRAY(DataTypes.DATE())));
+    }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArraySortFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArraySortFunction.java
new file mode 100644
index 00000000000..0bc0f1e6576
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArraySortFunction.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+import java.lang.invoke.MethodHandle;
+import java.util.Arrays;
+import java.util.Comparator;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Implementation of ARRAY_SORT function. */
+@Internal
+public class ArraySortFunction extends BuiltInScalarFunction {
+
+    private final ArrayData.ElementGetter elementGetter;
+    private final SpecializedFunction.ExpressionEvaluator greaterEvaluator;
+
+    private transient MethodHandle greaterHandle;
+
+    public ArraySortFunction(SpecializedFunction.SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.ARRAY_SORT, context);
+        final DataType elementDataType =
+                ((CollectionDataType) 
context.getCallContext().getArgumentDataTypes().get(0))
+                        .getElementDataType()
+                        .toInternal();
+        elementGetter =
+                
ArrayData.createElementGetter(elementDataType.toInternal().getLogicalType());
+        greaterEvaluator =
+                context.createEvaluator(
+                        $("element1").isGreater($("element2")),
+                        DataTypes.BOOLEAN(),
+                        DataTypes.FIELD("element1", 
elementDataType.notNull().toInternal()),
+                        DataTypes.FIELD("element2", 
elementDataType.notNull().toInternal()));
+    }
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        greaterHandle = greaterEvaluator.open(context);
+    }
+
+    public @Nullable ArrayData eval(ArrayData array) {
+        return eval(array, true, true);
+    }
+
+    public @Nullable ArrayData eval(ArrayData array, Boolean ascendingOrder) {
+        return eval(array, ascendingOrder, ascendingOrder);
+    }
+
+    public @Nullable ArrayData eval(ArrayData array, Boolean ascendingOrder, 
Boolean nullFirst) {
+        try {
+            if (array == null || ascendingOrder == null || nullFirst == null) {
+                return null;
+            }
+            if (array.size() == 0) {
+                return array;
+            }
+            Object[] elements = new Object[array.size()];
+            for (int i = 0; i < array.size(); i++) {
+                elements[i] = elementGetter.getElementOrNull(array, i);
+            }
+            Comparator<Object> ascendingComparator = new 
ArraySortComparator(ascendingOrder);
+            Arrays.sort(
+                    elements,
+                    nullFirst
+                            ? Comparator.nullsFirst(ascendingComparator)
+                            : Comparator.nullsLast(ascendingComparator));
+            return new GenericArrayData(elements);
+        } catch (Throwable t) {
+            throw new FlinkRuntimeException(t);
+        }
+    }
+
+    private class ArraySortComparator implements Comparator<Object> {
+        private final boolean isAscending;
+
+        public ArraySortComparator(boolean isAscending) {
+            this.isAscending = isAscending;
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            try {
+                boolean isGreater = (boolean) greaterHandle.invoke(o1, o2);
+                return isAscending ? (isGreater ? 1 : -1) : (isGreater ? -1 : 
1);
+            } catch (Throwable e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        greaterEvaluator.close();
+    }
+}


Reply via email to