This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e644beac8e5 [FLINK-27891][Table] Add ARRAY_APPEND and ARRAY_PREPEND 
functions
e644beac8e5 is described below

commit e644beac8e5ffe71d9b6185c06ed31050e7c5268
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Mon Feb 19 23:06:21 2024 +0100

    [FLINK-27891][Table] Add ARRAY_APPEND and ARRAY_PREPEND functions
---
 docs/data/sql_functions.yml                        |  6 ++
 docs/data/sql_functions_zh.yml                     |  6 ++
 flink-python/pyflink/table/expression.py           | 18 +++++
 .../flink/table/api/internal/BaseExpressions.java  | 26 +++++++
 .../functions/BuiltInFunctionDefinitions.java      | 29 ++++++++
 .../strategies/ArrayAppendPrependTypeStrategy.java | 55 +++++++++++++++
 .../strategies/SpecificTypeStrategies.java         |  4 ++
 .../ArrayAppendPrependTypeStrategyTest.java        | 56 +++++++++++++++
 .../functions/CollectionFunctionsITCase.java       | 80 ++++++++++++++++++++++
 .../functions/scalar/ArrayAppendFunction.java      | 56 +++++++++++++++
 .../functions/scalar/ArrayPrependFunction.java     | 56 +++++++++++++++
 11 files changed, 392 insertions(+)

diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index 7ebf715e279..7f06199ecb0 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -628,6 +628,9 @@ collection:
   - sql: map ‘[’ value ‘]’
     table: MAP.at(ANY)
     description: Returns the value specified by key value in map.
+  - sql: ARRAY_APPEND(array, element)
+    table: array.arrayAppend(element)
+    description: Appends an element to the end of the array and returns the 
result. If the array itself is null, the function will return null. If an 
element to add is null, the null element will be added to the end of the array.
   - sql: ARRAY_CONTAINS(haystack, needle)
     table: haystack.arrayContains(needle)
     description: Returns whether the given element exists in an array. 
Checking for null elements in the array is supported. If the array itself is 
null, the function will return null. The given element is cast implicitly to 
the array's element type if necessary.
@@ -637,6 +640,9 @@ collection:
   - sql: ARRAY_POSITION(haystack, needle)
     table: haystack.arrayPosition(needle)
     description: Returns the position of the first occurrence of element in 
the given array as int. Returns 0 if the given value could not be found in the 
array. Returns null if either of the arguments are null. And this is not zero 
based, but 1-based index. The first element in the array has index 1.
+  - sql: ARRAY_PREPEND(array, element)
+    table: array.arrayPrepend(element)
+    description: Appends an element to the beginning of the array and returns 
the result. If the array itself is null, the function will return null. If an 
element to add is null, the null element will be added to the beginning of the 
array.
   - sql: ARRAY_REMOVE(haystack, needle)
     table: haystack.arrayRemove(needle)
     description: Removes all elements that equal to element from array. If the 
array itself is null, the function will return null. Keeps ordering of elements.
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index b0ba79f12b3..b67ac949835 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -763,6 +763,9 @@ collection:
   - sql: map ‘[’ value ‘]’
     table: MAP.at(ANY)
     description: 返回 map 中指定 key 对应的值。
+  - sql: ARRAY_APPEND(array, element)
+    table: array.arrayAppend(element)
+    description: Appends an element to the end of the array and returns the 
result. If the array itself is null, the function will return null. If an 
element to add is null, the null element will be added to the end of the array.
   - sql: ARRAY_CONTAINS(haystack, needle)
     table: haystack.arrayContains(needle)
     description: 返回是否数组 haystack 中包含指定元素 needle。支持检查数组中是否存在 null。 
如果数组本身是null,函数会返回 null。如果需要,指定元素会隐式转换为数组的元素类型。
@@ -772,6 +775,9 @@ collection:
   - sql: ARRAY_POSITION(haystack, needle)
     table: haystack.arrayPosition(needle)
     description: 返回数组中第一次出现 needle 元素的位置,返回类型为 int。如果数组中不存在该元素则返回 
0。如果两个参数中任何一个参数为 null,则返回 null。序号不是从 0 开始,而是从 1 开始,第一个元素的序号为 1。
+  - sql: ARRAY_PREPEND(array, element)
+    table: array.arrayPrepend(element)
+    description: Appends an element to the beginning of the array and returns 
the result. If the array itself is null, the function will return null. If an 
element to add is null, the null element will be added to the beginning of the 
array.
   - sql: ARRAY_REMOVE(haystack, needle)
     table: haystack.arrayRemove(needle)
     description: 删除数组中所有和元素 needle 相等的元素。如果数组是 null,则返回 null。函数会保留数组中元素的顺序。
diff --git a/flink-python/pyflink/table/expression.py 
b/flink-python/pyflink/table/expression.py
index 84772c1739e..c3b8f1b6591 100644
--- a/flink-python/pyflink/table/expression.py
+++ b/flink-python/pyflink/table/expression.py
@@ -1474,6 +1474,15 @@ class Expression(Generic[T]):
         """
         return _unary_op("element")(self)
 
+    def array_append(self, addition) -> 'Expression':
+        """
+        Appends an element to the end of the array and returns the result.
+
+        If the array itself is null, the function will return null. If an 
element to add is null,
+        the null element will be added to the end of the array.
+        """
+        return _binary_op("arrayAppend")(self, addition)
+
     def array_contains(self, needle) -> 'Expression':
         """
         Returns whether the given element exists in an array.
@@ -1502,6 +1511,15 @@ class Expression(Generic[T]):
         """
         return _binary_op("arrayPosition")(self, needle)
 
+    def array_prepend(self, addition) -> 'Expression':
+        """
+        Appends an element to the beginning of the array and returns the 
result.
+
+        If the array itself is null, the function will return null. If an 
element to add is null,
+        the null element will be added to the beginning of the array.
+        """
+        return _binary_op("arrayPrepend")(self, addition)
+
     def array_remove(self, needle) -> 'Expression':
         """
         Removes all elements that equal to element from array.
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
index 36da6d54969..8e022179277 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
@@ -54,6 +54,7 @@ import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ABS;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ACOS;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_AGG;
+import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_APPEND;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_CONCAT;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_CONTAINS;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_DISTINCT;
@@ -61,6 +62,7 @@ import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_MAX;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_MIN;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_POSITION;
+import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_PREPEND;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_REMOVE;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_REVERSE;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ARRAY_SLICE;
@@ -1355,6 +1357,18 @@ public abstract class BaseExpressions<InType, OutType> {
         return toApiSpecificExpression(unresolvedCall(ARRAY_ELEMENT, 
toExpr()));
     }
 
+    /**
+     * Appends an element to the end of the array and returns the result.
+     *
+     * <p>If the array itself is null, the function will return null. If an 
element to add is null,
+     * the null element will be added to the end of the array. The given 
element is cast implicitly
+     * to the array's element type if necessary.
+     */
+    public OutType arrayAppend(InType element) {
+        return toApiSpecificExpression(
+                unresolvedCall(ARRAY_APPEND, toExpr(), 
objectToExpression(element)));
+    }
+
     /**
      * Returns whether the given element exists in an array.
      *
@@ -1389,6 +1403,18 @@ public abstract class BaseExpressions<InType, OutType> {
                 unresolvedCall(ARRAY_POSITION, toExpr(), 
objectToExpression(needle)));
     }
 
+    /**
+     * Appends an element to the beginning of the array and returns the result.
+     *
+     * <p>If the array itself is null, the function will return null. If an 
element to add is null,
+     * the null element will be added to the beginning of the array. The given 
element is cast
+     * implicitly to the array's element type if necessary.
+     */
+    public OutType arrayPrepend(InType element) {
+        return toApiSpecificExpression(
+                unresolvedCall(ARRAY_PREPEND, toExpr(), 
objectToExpression(element)));
+    }
+
     /**
      * Removes all elements that equal to element from array.
      *
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 967b55c90c2..c1aad6fe1ad 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -102,6 +102,7 @@ import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTyp
 import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.JSON_ARGUMENT;
 import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.TWO_EQUALS_COMPARABLE;
 import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.TWO_FULLY_COMPARABLE;
+import static 
org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.ARRAY_APPEND_PREPEND;
 
 /** Dictionary of function definitions for all built-in functions. */
 @PublicEvolving
@@ -215,6 +216,20 @@ public final class BuiltInFunctionDefinitions {
                             
"org.apache.flink.table.runtime.functions.scalar.CoalesceFunction")
                     .build();
 
+    public static final BuiltInFunctionDefinition ARRAY_APPEND =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("ARRAY_APPEND")
+                    .kind(SCALAR)
+                    .inputTypeStrategy(
+                            sequence(
+                                    Arrays.asList("array", "element"),
+                                    Arrays.asList(
+                                            logical(LogicalTypeRoot.ARRAY), 
ARRAY_ELEMENT_ARG)))
+                    
.outputTypeStrategy(nullableIfArgs(nullableIfArgs(ARRAY_APPEND_PREPEND)))
+                    .runtimeClass(
+                            
"org.apache.flink.table.runtime.functions.scalar.ArrayAppendFunction")
+                    .build();
+
     public static final BuiltInFunctionDefinition ARRAY_CONTAINS =
             BuiltInFunctionDefinition.newBuilder()
                     .name("ARRAY_CONTAINS")
@@ -277,6 +292,20 @@ public final class BuiltInFunctionDefinitions {
                             
"org.apache.flink.table.runtime.functions.scalar.ArrayPositionFunction")
                     .build();
 
+    public static final BuiltInFunctionDefinition ARRAY_PREPEND =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("ARRAY_PREPEND")
+                    .kind(SCALAR)
+                    .inputTypeStrategy(
+                            sequence(
+                                    Arrays.asList("array", "element"),
+                                    Arrays.asList(
+                                            logical(LogicalTypeRoot.ARRAY), 
ARRAY_ELEMENT_ARG)))
+                    .outputTypeStrategy(nullableIfArgs(ARRAY_APPEND_PREPEND))
+                    .runtimeClass(
+                            
"org.apache.flink.table.runtime.functions.scalar.ArrayPrependFunction")
+                    .build();
+
     public static final BuiltInFunctionDefinition ARRAY_REMOVE =
             BuiltInFunctionDefinition.newBuilder()
                     .name("ARRAY_REMOVE")
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayAppendPrependTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayAppendPrependTypeStrategy.java
new file mode 100644
index 00000000000..f56245d9853
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ArrayAppendPrependTypeStrategy.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.TypeStrategy;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.util.List;
+import java.util.Optional;
+
+import static 
org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
+
+/**
+ * Type strategy that returns a {@link DataTypes#ARRAY(DataType)} with element 
type equal to the
+ * type of the first argument if it's not nullable or element to add is not 
nullable, otherwise it
+ * returns {@link DataTypes#ARRAY(DataType)} with type equal to the type of 
the element to add to
+ * array.
+ */
+@Internal
+public class ArrayAppendPrependTypeStrategy implements TypeStrategy {
+    @Override
+    public Optional<DataType> inferType(CallContext callContext) {
+        final List<DataType> argumentDataTypes = 
callContext.getArgumentDataTypes();
+        final DataType arrayDataType = argumentDataTypes.get(0);
+        final DataType elementToAddDataType = argumentDataTypes.get(1);
+        final LogicalType arrayElementLogicalType =
+                arrayDataType.getLogicalType().getChildren().get(0);
+        if (elementToAddDataType.getLogicalType().isNullable()
+                && !arrayElementLogicalType.isNullable()) {
+            return Optional.of(
+                    
DataTypes.ARRAY(fromLogicalToDataType(arrayElementLogicalType).nullable()));
+        }
+        return Optional.of(arrayDataType);
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java
index bf9c41d88a2..5d0ed1158c0 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java
@@ -65,6 +65,10 @@ public final class SpecificTypeStrategies {
     public static final TypeStrategy ARRAY_ELEMENT = new 
ArrayElementTypeStrategy();
 
     public static final TypeStrategy ITEM_AT = new ItemAtTypeStrategy();
+
+    /** See {@link ArrayAppendPrependTypeStrategy}. */
+    public static final TypeStrategy ARRAY_APPEND_PREPEND = new 
ArrayAppendPrependTypeStrategy();
+
     /** See {@link GetTypeStrategy}. */
     public static final TypeStrategy GET = new GetTypeStrategy();
 
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ArrayAppendPrependTypeStrategyTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ArrayAppendPrependTypeStrategyTest.java
new file mode 100644
index 00000000000..b0090d0cdf1
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ArrayAppendPrependTypeStrategyTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.inference.TypeStrategiesTestBase;
+
+import java.util.stream.Stream;
+
+/** Tests for {@link ArrayAppendPrependTypeStrategy}. */
+public class ArrayAppendPrependTypeStrategyTest extends TypeStrategiesTestBase 
{
+    @Override
+    protected Stream<TestSpec> testData() {
+        return Stream.of(
+                TestSpec.forStrategy(
+                                "Array element is nullable, element to add is 
nullable",
+                                SpecificTypeStrategies.ARRAY_APPEND_PREPEND)
+                        .inputTypes(DataTypes.ARRAY(DataTypes.BIGINT()), 
DataTypes.BIGINT())
+                        .expectDataType(DataTypes.ARRAY(DataTypes.BIGINT())),
+                TestSpec.forStrategy(
+                                "Array element type is non null, element to 
add is nullable",
+                                SpecificTypeStrategies.ARRAY_APPEND_PREPEND)
+                        .inputTypes(
+                                DataTypes.ARRAY(DataTypes.BIGINT().notNull()), 
DataTypes.BIGINT())
+                        .expectDataType(DataTypes.ARRAY(DataTypes.BIGINT())),
+                TestSpec.forStrategy(
+                                "Array element type is nullable, element to 
add is non null",
+                                SpecificTypeStrategies.ARRAY_APPEND_PREPEND)
+                        .inputTypes(
+                                DataTypes.ARRAY(DataTypes.BIGINT()), 
DataTypes.BIGINT().notNull())
+                        .expectDataType(DataTypes.ARRAY(DataTypes.BIGINT())),
+                TestSpec.forStrategy(
+                                "Array element type is non null, element to 
add is non null",
+                                SpecificTypeStrategies.ARRAY_APPEND_PREPEND)
+                        .inputTypes(
+                                DataTypes.ARRAY(DataTypes.BIGINT().notNull()),
+                                DataTypes.BIGINT().notNull())
+                        
.expectDataType(DataTypes.ARRAY(DataTypes.BIGINT().notNull())));
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java
index 1a5d5723e5c..7d0821661bb 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CollectionFunctionsITCase.java
@@ -41,9 +41,11 @@ class CollectionFunctionsITCase extends 
BuiltInFunctionTestBase {
     @Override
     Stream<TestSetSpec> getTestSetSpecs() {
         return Stream.of(
+                        arrayAppendTestCases(),
                         arrayContainsTestCases(),
                         arrayDistinctTestCases(),
                         arrayPositionTestCases(),
+                        arrayArrayPrependTestCases(),
                         arrayRemoveTestCases(),
                         arrayReverseTestCases(),
                         arrayUnionTestCases(),
@@ -56,6 +58,45 @@ class CollectionFunctionsITCase extends 
BuiltInFunctionTestBase {
                 .flatMap(s -> s);
     }
 
+    private Stream<TestSetSpec> arrayAppendTestCases() {
+        return Stream.of(
+                
TestSetSpec.forFunction(BuiltInFunctionDefinitions.ARRAY_APPEND)
+                        .onFieldsWithData(
+                                new Integer[] {1, 2}, null, new String[] 
{"Hello", "World"})
+                        .andDataTypes(
+                                DataTypes.ARRAY(DataTypes.INT()),
+                                DataTypes.ARRAY(DataTypes.INT()),
+                                DataTypes.ARRAY(DataTypes.STRING().notNull()))
+                        .testResult(
+                                $("f0").arrayAppend(null),
+                                "ARRAY_APPEND(f0, NULL)",
+                                new Integer[] {1, 2, null},
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        .testResult(
+                                $("f1").arrayAppend(1),
+                                "ARRAY_APPEND(f1, 1)",
+                                null,
+                                DataTypes.ARRAY(DataTypes.INT()).nullable())
+                        .testResult(
+                                $("f2").arrayAppend("!"),
+                                "ARRAY_APPEND(f2, '!')",
+                                new String[] {"Hello", "World", "!"},
+                                DataTypes.ARRAY(DataTypes.STRING().notNull()))
+                        .testResult(
+                                $("f2").arrayAppend(null),
+                                "ARRAY_APPEND(f2, NULL)",
+                                new String[] {"Hello", "World", null},
+                                DataTypes.ARRAY(DataTypes.STRING()))
+                        .testSqlValidationError(
+                                "ARRAY_APPEND(f2, 1)",
+                                "Invalid input arguments. Expected signatures 
are:\n"
+                                        + "ARRAY_APPEND(array <ARRAY>, element 
<ARRAY ELEMENT>)")
+                        .testTableApiValidationError(
+                                $("f2").arrayAppend(1),
+                                "Invalid input arguments. Expected signatures 
are:\n"
+                                        + "ARRAY_APPEND(array <ARRAY>, element 
<ARRAY ELEMENT>)"));
+    }
+
     private Stream<TestSetSpec> arrayContainsTestCases() {
         return Stream.of(
                 
TestSetSpec.forFunction(BuiltInFunctionDefinitions.ARRAY_CONTAINS)
@@ -280,6 +321,45 @@ class CollectionFunctionsITCase extends 
BuiltInFunctionTestBase {
                                         + "ARRAY_POSITION(haystack <ARRAY>, 
needle <ARRAY ELEMENT>)"));
     }
 
+    private Stream<TestSetSpec> arrayArrayPrependTestCases() {
+        return Stream.of(
+                
TestSetSpec.forFunction(BuiltInFunctionDefinitions.ARRAY_PREPEND)
+                        .onFieldsWithData(
+                                new Integer[] {1, 2}, null, new String[] 
{"Hello", "World"})
+                        .andDataTypes(
+                                DataTypes.ARRAY(DataTypes.INT()),
+                                DataTypes.ARRAY(DataTypes.INT()),
+                                DataTypes.ARRAY(DataTypes.STRING().notNull()))
+                        .testResult(
+                                $("f0").arrayPrepend(1),
+                                "ARRAY_PREPEND(f0, 1)",
+                                new Integer[] {1, 1, 2},
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        .testResult(
+                                $("f1").arrayPrepend(1),
+                                "ARRAY_PREPEND(f1, 1)",
+                                null,
+                                DataTypes.ARRAY(DataTypes.INT()).nullable())
+                        .testResult(
+                                $("f2").arrayPrepend("!"),
+                                "ARRAY_PREPEND(f2, '!')",
+                                new String[] {"!", "Hello", "World"},
+                                DataTypes.ARRAY(DataTypes.STRING().notNull()))
+                        .testResult(
+                                $("f2").arrayPrepend(null),
+                                "ARRAY_PREPEND(f2, NULL)",
+                                new String[] {null, "Hello", "World"},
+                                DataTypes.ARRAY(DataTypes.STRING()))
+                        .testSqlValidationError(
+                                "ARRAY_PREPEND(1, f2)",
+                                "Invalid input arguments. Expected signatures 
are:\n"
+                                        + "ARRAY_PREPEND(array <ARRAY>, 
element <ARRAY ELEMENT>)")
+                        .testTableApiValidationError(
+                                $("f2").arrayPrepend(1),
+                                "Invalid input arguments. Expected signatures 
are:\n"
+                                        + "ARRAY_PREPEND(array <ARRAY>, 
element <ARRAY ELEMENT>)"));
+    }
+
     private Stream<TestSetSpec> arrayRemoveTestCases() {
         return Stream.of(
                 
TestSetSpec.forFunction(BuiltInFunctionDefinitions.ARRAY_REMOVE)
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayAppendFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayAppendFunction.java
new file mode 100644
index 00000000000..2674598342f
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayAppendFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+
+import javax.annotation.Nullable;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_APPEND}. */
+@Internal
+public class ArrayAppendFunction extends BuiltInScalarFunction {
+    private final ArrayData.ElementGetter elementGetter;
+
+    public ArrayAppendFunction(SpecializedFunction.SpecializedContext context) 
{
+        super(BuiltInFunctionDefinitions.ARRAY_APPEND, context);
+        final DataType dataType =
+                ((CollectionDataType) 
context.getCallContext().getArgumentDataTypes().get(0))
+                        .getElementDataType();
+        elementGetter = 
ArrayData.createElementGetter(dataType.getLogicalType());
+    }
+
+    public @Nullable ArrayData eval(@Nullable ArrayData array, Object element) 
{
+        if (array == null) {
+            return null;
+        }
+        final int size = array.size();
+        final Object[] data = new Object[size + 1];
+        for (int pos = 0; pos < size; pos++) {
+            data[pos] = elementGetter.getElementOrNull(array, pos);
+        }
+        data[size] = element;
+        return new GenericArrayData(data);
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayPrependFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayPrependFunction.java
new file mode 100644
index 00000000000..bcc35347eb4
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ArrayPrependFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+
+import javax.annotation.Nullable;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ARRAY_PREPEND}. */
+@Internal
+public class ArrayPrependFunction extends BuiltInScalarFunction {
+    private final ArrayData.ElementGetter elementGetter;
+
+    public ArrayPrependFunction(SpecializedFunction.SpecializedContext 
context) {
+        super(BuiltInFunctionDefinitions.ARRAY_APPEND, context);
+        final DataType dataType =
+                ((CollectionDataType) 
context.getCallContext().getArgumentDataTypes().get(0))
+                        .getElementDataType();
+        elementGetter = 
ArrayData.createElementGetter(dataType.getLogicalType());
+    }
+
+    public @Nullable ArrayData eval(@Nullable ArrayData array, Object element) 
{
+        if (array == null) {
+            return null;
+        }
+        final int size = array.size();
+        final Object[] data = new Object[size + 1];
+        data[0] = element;
+        for (int pos = 0; pos < size; pos++) {
+            data[pos + 1] = elementGetter.getElementOrNull(array, pos);
+        }
+        return new GenericArrayData(data);
+    }
+}

Reply via email to