This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e42d7089b24 [FLINK-22484][table] Add built-in MAP_KEYS, MAP_VALUES, 
MAP_FROM_ARRAYS
e42d7089b24 is described below

commit e42d7089b24f4e93e980ce724faf00341660bcd6
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Fri Mar 17 00:05:44 2023 +0100

    [FLINK-22484][table] Add built-in MAP_KEYS, MAP_VALUES, MAP_FROM_ARRAYS
---
 docs/data/sql_functions.yml                        |   9 ++
 .../docs/reference/pyflink.table/expressions.rst   |   3 +
 flink-python/pyflink/table/expression.py           |  18 ++++
 flink-python/pyflink/table/expressions.py          |  20 ++++
 .../org/apache/flink/table/api/Expressions.java    |  20 ++++
 .../flink/table/api/internal/BaseExpressions.java  |  12 +++
 .../table/api/ImplicitExpressionConversions.scala  |   5 +
 .../functions/BuiltInFunctionDefinitions.java      |  41 ++++++++
 .../strategies/SpecificInputTypeStrategies.java    |  32 ++++++
 .../table/planner/functions/MapFunctionITCase.java | 114 ++++++++++++++++++++-
 .../functions/scalar/MapFromArraysFunction.java    |  77 ++++++++++++++
 .../runtime/functions/scalar/MapKeysFunction.java  |  43 ++++++++
 .../functions/scalar/MapValuesFunction.java        |  43 ++++++++
 13 files changed, 436 insertions(+), 1 deletion(-)

diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index 16187c64221..7dd6d8d446a 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -634,6 +634,15 @@ collection:
   - sql: ARRAY_DISTINCT(haystack)
     table: haystack.arrayDistinct()
     description: Returns an array with unique elements. If the array itself is 
null, the function will return null. Keeps ordering of elements.
+  - sql: MAP_KEYS(map)
+    table: MAP.mapKeys()
+    description: Returns the keys of the map as an array. No order guaranteed.
+  - sql: MAP_VALUES(map)
+    table: MAP.mapValues()
+    description: Returns the values of the map as an array. No order guaranteed.
+  - sql: MAP_FROM_ARRAYS(array_of_keys, array_of_values)
+    table: mapFromArrays(array_of_keys, array_of_values)
+    description: Returns a map created from arrays of keys and values. Note 
that the lengths of the two arrays should be the same.
 
 json:
   - sql: IS JSON [ { VALUE | SCALAR | ARRAY | OBJECT } ]
diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst 
b/flink-python/docs/reference/pyflink.table/expressions.rst
index 1d472834052..981ab77cc23 100644
--- a/flink-python/docs/reference/pyflink.table/expressions.rst
+++ b/flink-python/docs/reference/pyflink.table/expressions.rst
@@ -54,6 +54,7 @@ Expressions
     array
     row
     map_
+    map_from_arrays
     row_interval
     pi
     e
@@ -226,6 +227,8 @@ advanced type helper functions
     Expression.element
     Expression.array_contains
     Expression.array_distinct
+    Expression.map_keys
+    Expression.map_values
 
 
 time definition functions
diff --git a/flink-python/pyflink/table/expression.py 
b/flink-python/pyflink/table/expression.py
index d7d9bfdb1a5..34ea0716603 100644
--- a/flink-python/pyflink/table/expression.py
+++ b/flink-python/pyflink/table/expression.py
@@ -1487,6 +1487,24 @@ class Expression(Generic[T]):
         """
         return _binary_op("arrayDistinct")(self)
 
+    @property
+    def map_keys(self) -> 'Expression':
+        """
+        Returns the keys of the map as an array. No order guaranteed.
+
+        .. seealso:: :py:attr:`~Expression.map_values`
+        """
+        return _unary_op("mapKeys")(self)
+
+    @property
+    def map_values(self) -> 'Expression':
+        """
+        Returns the values of the map as an array. No order guaranteed.
+
+        .. seealso:: :py:attr:`~Expression.map_keys`
+        """
+        return _unary_op("mapValues")(self)
+
     # ---------------------------- time definition functions 
-----------------------------
 
     @property
diff --git a/flink-python/pyflink/table/expressions.py 
b/flink-python/pyflink/table/expressions.py
index b3cffe347b2..aa4022c14c0 100644
--- a/flink-python/pyflink/table/expressions.py
+++ b/flink-python/pyflink/table/expressions.py
@@ -483,6 +483,26 @@ def map_(key, value, *tail) -> Expression:
     return _ternary_op("map", key, value, tail)
 
 
+def map_from_arrays(key, value) -> Expression:
+    """
+    Creates a map from an array of keys and an array of values.
+
+    Example:
+    ::
+
+        >>> tab.select(
+        >>>     map_from_arrays(
+        >>>         array("key1", "key2", "key3"),
+        >>>         array(1, 2, 3)
+        >>>     ))
+
+    .. note::
+
+        Both arrays should have the same length.
+    """
+    return _binary_op("mapFromArrays", key, value)
+
+
 def row_interval(rows: int) -> Expression:
     """
     Creates an interval of rows.
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
index 44f76ad627b..d72ca72e04e 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
@@ -533,6 +533,26 @@ public final class Expressions {
         return apiCallAtLeastTwoArgument(BuiltInFunctionDefinitions.MAP, key, 
value, tail);
     }
 
+    /**
+     * Creates a map from an array of keys and an array of values.
+     *
+     * <pre>{@code
+     * table.select(
+     *     mapFromArrays(
+     *         array("key1", "key2", "key3"),
+     *         array(1, 2, 3)
+     *     ))
+     * }</pre>
+     *
+     * <p>Note that both arrays should have the same length.
+     */
+    public static ApiExpression mapFromArrays(Object key, Object value) {
+        return apiCall(
+                BuiltInFunctionDefinitions.MAP_FROM_ARRAYS,
+                objectToExpression(key),
+                objectToExpression(value));
+    }
+
     /**
      * Creates an interval of rows.
      *
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
index 8e458cc3ad8..30899f49b68 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
@@ -118,6 +118,8 @@ import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.LOG2;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.LOWER;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.LPAD;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.LTRIM;
+import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.MAP_KEYS;
+import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.MAP_VALUES;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.MAX;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.MD5;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.MIN;
@@ -1359,6 +1361,16 @@ public abstract class BaseExpressions<InType, OutType> {
         return toApiSpecificExpression(unresolvedCall(ARRAY_DISTINCT, 
toExpr()));
     }
 
+    /** Returns the keys of the map as an array. */
+    public OutType mapKeys() {
+        return toApiSpecificExpression(unresolvedCall(MAP_KEYS, toExpr()));
+    }
+
+    /** Returns the values of the map as an array. */
+    public OutType mapValues() {
+        return toApiSpecificExpression(unresolvedCall(MAP_VALUES, toExpr()));
+    }
+
     // Time definition
 
     /**
diff --git 
a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
 
b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
index b4282f09cfb..b70be995a98 100644
--- 
a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
+++ 
b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
@@ -647,6 +647,11 @@ trait ImplicitExpressionConversions {
     Expressions.map(key, value, tail: _*)
   }
 
+  /** Creates a map from an array of keys and an array of values. */
+  def mapFromArrays(key: Expression, value: Expression): Expression = {
+    Expressions.mapFromArrays(key, value)
+  }
+
   /** Returns a value that is closer than any other value to pi. */
   def pi(): Expression = {
     Expressions.pi()
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 7b110e87685..7cd7800a4f0 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -130,6 +130,47 @@ public final class BuiltInFunctionDefinitions {
                     
.runtimeClass("org.apache.flink.table.runtime.functions.scalar.IfNullFunction")
                     .build();
 
+    public static final BuiltInFunctionDefinition MAP_KEYS =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("MAP_KEYS")
+                    .kind(SCALAR)
+                    .inputTypeStrategy(
+                            sequence(
+                                    new String[] {"input"},
+                                    new ArgumentTypeStrategy[] 
{logical(LogicalTypeRoot.MAP)}))
+                    
.outputTypeStrategy(nullableIfArgs(SpecificInputTypeStrategies.MAP_KEYS))
+                    
.runtimeClass("org.apache.flink.table.runtime.functions.scalar.MapKeysFunction")
+                    .build();
+
+    public static final BuiltInFunctionDefinition MAP_VALUES =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("MAP_VALUES")
+                    .kind(SCALAR)
+                    .inputTypeStrategy(
+                            sequence(
+                                    new String[] {"input"},
+                                    new ArgumentTypeStrategy[] 
{logical(LogicalTypeRoot.MAP)}))
+                    
.outputTypeStrategy(nullableIfArgs(SpecificInputTypeStrategies.MAP_VALUES))
+                    .runtimeClass(
+                            
"org.apache.flink.table.runtime.functions.scalar.MapValuesFunction")
+                    .build();
+
+    public static final BuiltInFunctionDefinition MAP_FROM_ARRAYS =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("MAP_FROM_ARRAYS")
+                    .kind(SCALAR)
+                    .inputTypeStrategy(
+                            sequence(
+                                    new String[] {"keysArray", "valuesArray"},
+                                    new ArgumentTypeStrategy[] {
+                                        logical(LogicalTypeRoot.ARRAY),
+                                        logical(LogicalTypeRoot.ARRAY)
+                                    }))
+                    
.outputTypeStrategy(nullableIfArgs(SpecificInputTypeStrategies.ARRAYS_FOR_MAP))
+                    .runtimeClass(
+                            
"org.apache.flink.table.runtime.functions.scalar.MapFromArraysFunction")
+                    .build();
+
     public static final BuiltInFunctionDefinition SOURCE_WATERMARK =
             BuiltInFunctionDefinition.newBuilder()
                     .name("SOURCE_WATERMARK")
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
index f31be13145f..3657e7491ca 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
@@ -19,16 +19,22 @@
 package org.apache.flink.table.types.inference.strategies;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.JsonOnNull;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.KeyValueDataType;
 import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
 import org.apache.flink.table.types.inference.ConstantArgumentCount;
 import org.apache.flink.table.types.inference.InputTypeStrategies;
 import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.TypeStrategy;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.StructuredType;
 
+import java.util.Optional;
+
 import static 
org.apache.flink.table.types.inference.InputTypeStrategies.LITERAL;
 import static org.apache.flink.table.types.inference.InputTypeStrategies.and;
 import static 
org.apache.flink.table.types.inference.InputTypeStrategies.comparable;
@@ -120,6 +126,32 @@ public final class SpecificInputTypeStrategies {
     public static final InputTypeStrategy TWO_EQUALS_COMPARABLE =
             comparable(ConstantArgumentCount.of(2), 
StructuredType.StructuredComparison.EQUALS);
 
+    /** Type strategy specific for {@link 
BuiltInFunctionDefinitions#MAP_KEYS}. */
+    public static final TypeStrategy MAP_KEYS =
+            callContext ->
+                    Optional.of(
+                            DataTypes.ARRAY(
+                                    ((KeyValueDataType) 
callContext.getArgumentDataTypes().get(0))
+                                            .getKeyDataType()));
+
+    /** Type strategy specific for {@link 
BuiltInFunctionDefinitions#MAP_VALUES}. */
+    public static final TypeStrategy MAP_VALUES =
+            callContext ->
+                    Optional.of(
+                            DataTypes.ARRAY(
+                                    ((KeyValueDataType) 
callContext.getArgumentDataTypes().get(0))
+                                            .getValueDataType()));
+
+    /** Type strategy specific for {@link 
BuiltInFunctionDefinitions#MAP_FROM_ARRAYS}. */
+    public static final TypeStrategy ARRAYS_FOR_MAP =
+            callContext ->
+                    Optional.of(
+                            DataTypes.MAP(
+                                    ((CollectionDataType) 
callContext.getArgumentDataTypes().get(0))
+                                            .getElementDataType(),
+                                    ((CollectionDataType) 
callContext.getArgumentDataTypes().get(1))
+                                            .getElementDataType()));
+
     private SpecificInputTypeStrategies() {
         // no instantiation
     }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/MapFunctionITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/MapFunctionITCase.java
index b02a78472fb..20906af91f8 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/MapFunctionITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/MapFunctionITCase.java
@@ -28,6 +28,7 @@ import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.Period;
 import java.util.Collections;
+import java.util.Map;
 import java.util.stream.Stream;
 
 import static org.apache.flink.table.api.DataTypes.BIGINT;
@@ -44,7 +45,9 @@ import static org.apache.flink.table.api.DataTypes.STRING;
 import static org.apache.flink.table.api.DataTypes.TIME;
 import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
 import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.Expressions.call;
 import static org.apache.flink.table.api.Expressions.map;
+import static org.apache.flink.table.api.Expressions.mapFromArrays;
 import static org.apache.flink.util.CollectionUtil.entry;
 
 /** Test {@link BuiltInFunctionDefinitions#MAP} and its return type. */
@@ -205,6 +208,115 @@ public class MapFunctionITCase extends 
BuiltInFunctionTestBase {
                                         DataTypes.MAP(
                                                         STRING().notNull(),
                                                         
INTERVAL(MONTH()).nullable())
-                                                .notNull())));
+                                                .notNull())),
+                TestSetSpec.forFunction(BuiltInFunctionDefinitions.MAP_KEYS)
+                        .onFieldsWithData(
+                                null,
+                                "item",
+                                Collections.singletonMap(1, "value"),
+                                Collections.singletonMap(new Integer[] {1, 2}, 
"value"))
+                        .andDataTypes(
+                                DataTypes.BOOLEAN().nullable(),
+                                DataTypes.STRING(),
+                                DataTypes.MAP(DataTypes.INT(), 
DataTypes.STRING()),
+                                
DataTypes.MAP(DataTypes.ARRAY(DataTypes.INT()), DataTypes.STRING()))
+                        .testTableApiValidationError(
+                                call("MAP_KEYS", $("f0"), $("f1")),
+                                "Invalid function call:\nMAP_KEYS(BOOLEAN, 
STRING)")
+                        .testResult(
+                                map(
+                                                
$("f0").cast(DataTypes.BOOLEAN()),
+                                                
$("f1").cast(DataTypes.STRING()))
+                                        .mapKeys(),
+                                "MAP_KEYS(MAP[CAST(f0 AS BOOLEAN), CAST(f1 AS 
STRING)])",
+                                new Boolean[] {null},
+                                DataTypes.ARRAY(DataTypes.BOOLEAN()).notNull())
+                        .testResult(
+                                $("f2").mapKeys(),
+                                "MAP_KEYS(f2)",
+                                new Integer[] {1},
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        .testResult(
+                                $("f3").mapKeys(),
+                                "MAP_KEYS(f3)",
+                                new Integer[][] {new Integer[] {1, 2}},
+                                
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT()))),
+                TestSetSpec.forFunction(BuiltInFunctionDefinitions.MAP_VALUES)
+                        .onFieldsWithData(
+                                null,
+                                "item",
+                                Collections.singletonMap(1, "value1"),
+                                Collections.singletonMap(
+                                        3, Collections.singletonMap(true, 
"value2")))
+                        .andDataTypes(
+                                DataTypes.BOOLEAN().nullable(),
+                                DataTypes.STRING(),
+                                DataTypes.MAP(DataTypes.INT(), 
DataTypes.STRING()),
+                                DataTypes.MAP(
+                                        DataTypes.INT(),
+                                        DataTypes.MAP(DataTypes.BOOLEAN(), 
DataTypes.STRING())))
+                        .testTableApiValidationError(
+                                call("MAP_VALUES", $("f0"), $("f1")),
+                                "Invalid function call:\nMAP_VALUES(BOOLEAN, 
STRING)")
+                        .testResult(
+                                map(
+                                                
$("f1").cast(DataTypes.STRING()),
+                                                
$("f0").cast(DataTypes.BOOLEAN()))
+                                        .mapValues(),
+                                "MAP_VALUES(MAP[CAST(f1 AS STRING), CAST(f0 AS 
BOOLEAN)])",
+                                new Boolean[] {null},
+                                DataTypes.ARRAY(DataTypes.BOOLEAN()).notNull())
+                        .testResult(
+                                $("f2").mapValues(),
+                                "MAP_VALUES(f2)",
+                                new String[] {"value1"},
+                                DataTypes.ARRAY(DataTypes.STRING()))
+                        .testResult(
+                                $("f3").mapValues(),
+                                "MAP_VALUES(f3)",
+                                new Map[] {Collections.singletonMap(true, 
"value2")},
+                                DataTypes.ARRAY(
+                                        DataTypes.MAP(DataTypes.BOOLEAN(), 
DataTypes.STRING()))),
+                
TestSetSpec.forFunction(BuiltInFunctionDefinitions.MAP_FROM_ARRAYS, "Invalid 
input")
+                        .onFieldsWithData(null, null, new Integer[] {1}, new 
Integer[] {1, 2})
+                        .andDataTypes(
+                                DataTypes.ARRAY(DataTypes.BOOLEAN()),
+                                DataTypes.ARRAY(DataTypes.STRING()),
+                                DataTypes.ARRAY(DataTypes.INT()),
+                                DataTypes.ARRAY(DataTypes.INT()))
+                        .testTableApiRuntimeError(
+                                mapFromArrays($("f2"), $("f3")),
+                                "Invalid function MAP_FROM_ARRAYS call:\n"
+                                        + "The length of the keys array 1 is 
not equal to the length of the values array 2")
+                        .testSqlRuntimeError(
+                                "MAP_FROM_ARRAYS(array[1, 2, 3], array[1, 2])",
+                                "Invalid function MAP_FROM_ARRAYS call:\n"
+                                        + "The length of the keys array 3 is 
not equal to the length of the values array 2")
+                        .testResult(
+                                mapFromArrays($("f0"), $("f1")),
+                                "MAP_FROM_ARRAYS(f0, f1)",
+                                null,
+                                DataTypes.MAP(DataTypes.BOOLEAN(), 
DataTypes.STRING())),
+                
TestSetSpec.forFunction(BuiltInFunctionDefinitions.MAP_FROM_ARRAYS)
+                        .onFieldsWithData(
+                                new Integer[] {1, 2},
+                                new String[] {"one", "two"},
+                                new Integer[][] {new Integer[] {1, 2}, new 
Integer[] {3, 4}})
+                        .andDataTypes(
+                                DataTypes.ARRAY(DataTypes.INT()),
+                                DataTypes.ARRAY(DataTypes.STRING()),
+                                
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())))
+                        .testResult(
+                                mapFromArrays($("f0"), $("f1")),
+                                "MAP_FROM_ARRAYS(f0, f1)",
+                                CollectionUtil.map(entry(1, "one"), entry(2, 
"two")),
+                                DataTypes.MAP(DataTypes.INT(), 
DataTypes.STRING()))
+                        .testTableApiResult(
+                                mapFromArrays($("f1"), $("f2")),
+                                CollectionUtil.map(
+                                        entry("one", new Integer[] {1, 2}),
+                                        entry("two", new Integer[] {3, 4})),
+                                DataTypes.MAP(
+                                        DataTypes.STRING(), 
DataTypes.ARRAY(DataTypes.INT()))));
     }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapFromArraysFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapFromArraysFunction.java
new file mode 100644
index 00000000000..2fa789154f7
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapFromArraysFunction.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#MAP_FROM_ARRAYS}. */
+@Internal
+public class MapFromArraysFunction extends BuiltInScalarFunction {
+    public MapFromArraysFunction(SpecializedFunction.SpecializedContext 
context) {
+        super(BuiltInFunctionDefinitions.MAP_FROM_ARRAYS, context);
+    }
+
+    public @Nullable MapData eval(@Nullable ArrayData keysArray, @Nullable 
ArrayData valuesArray) {
+        if (keysArray == null || valuesArray == null) {
+            return null;
+        }
+
+        if (keysArray.size() != valuesArray.size()) {
+            throw new FlinkRuntimeException(
+                    "Invalid function MAP_FROM_ARRAYS call:\n"
+                            + "The length of the keys array "
+                            + keysArray.size()
+                            + " is not equal to the length of the values array 
"
+                            + valuesArray.size());
+        }
+        return new MapDataForMapFromArrays(keysArray, valuesArray);
+    }
+
+    private static class MapDataForMapFromArrays implements MapData {
+        private final ArrayData keyArray;
+        private final ArrayData valueArray;
+
+        public MapDataForMapFromArrays(ArrayData keyArray, ArrayData 
valueArray) {
+            this.keyArray = keyArray;
+            this.valueArray = valueArray;
+        }
+
+        @Override
+        public int size() {
+            return keyArray.size();
+        }
+
+        @Override
+        public ArrayData keyArray() {
+            return keyArray;
+        }
+
+        @Override
+        public ArrayData valueArray() {
+            return valueArray;
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapKeysFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapKeysFunction.java
new file mode 100644
index 00000000000..00fc32d3b63
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapKeysFunction.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+
+import javax.annotation.Nullable;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#MAP_KEYS}. */
+@Internal
+public class MapKeysFunction extends BuiltInScalarFunction {
+    public MapKeysFunction(SpecializedFunction.SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.MAP_KEYS, context);
+    }
+
+    public @Nullable ArrayData eval(@Nullable MapData input) {
+        if (input == null) {
+            return null;
+        }
+
+        return input.keyArray();
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapValuesFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapValuesFunction.java
new file mode 100644
index 00000000000..d2c58c482f0
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapValuesFunction.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+
+import javax.annotation.Nullable;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#MAP_VALUES}. */
+@Internal
+public class MapValuesFunction extends BuiltInScalarFunction {
+    public MapValuesFunction(SpecializedFunction.SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.MAP_VALUES, context);
+    }
+
+    public @Nullable ArrayData eval(@Nullable MapData input) {
+        if (input == null) {
+            return null;
+        }
+
+        return input.valueArray();
+    }
+}

Reply via email to