This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e42d7089b24 [FLINK-22484][table] Add built-in MAP_KEYS, MAP_VALUES,
MAP_FROM_ARRAYS
e42d7089b24 is described below
commit e42d7089b24f4e93e980ce724faf00341660bcd6
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Fri Mar 17 00:05:44 2023 +0100
[FLINK-22484][table] Add built-in MAP_KEYS, MAP_VALUES, MAP_FROM_ARRAYS
---
docs/data/sql_functions.yml | 9 ++
.../docs/reference/pyflink.table/expressions.rst | 3 +
flink-python/pyflink/table/expression.py | 18 ++++
flink-python/pyflink/table/expressions.py | 20 ++++
.../org/apache/flink/table/api/Expressions.java | 20 ++++
.../flink/table/api/internal/BaseExpressions.java | 12 +++
.../table/api/ImplicitExpressionConversions.scala | 5 +
.../functions/BuiltInFunctionDefinitions.java | 41 ++++++++
.../strategies/SpecificInputTypeStrategies.java | 32 ++++++
.../table/planner/functions/MapFunctionITCase.java | 114 ++++++++++++++++++++-
.../functions/scalar/MapFromArraysFunction.java | 77 ++++++++++++++
.../runtime/functions/scalar/MapKeysFunction.java | 43 ++++++++
.../functions/scalar/MapValuesFunction.java | 43 ++++++++
13 files changed, 436 insertions(+), 1 deletion(-)
diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index 16187c64221..7dd6d8d446a 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -634,6 +634,15 @@ collection:
- sql: ARRAY_DISTINCT(haystack)
table: haystack.arrayDistinct()
description: Returns an array with unique elements. If the array itself is
null, the function will return null. Keeps ordering of elements.
+ - sql: MAP_KEYS(map)
+ table: MAP.mapKeys()
+ description: Returns the keys of the map as array. No order guaranteed.
+ - sql: MAP_VALUES(map)
+ table: MAP.mapValues()
+ description: Returns the values of the map as array. No order guaranteed.
+ - sql: MAP_FROM_ARRAYS(array_of_keys, array_of_values)
+ table: mapFromArrays(array_of_keys, array_of_values)
+ description: Returns a map created from an arrays of keys and values. Note
that the lengths of two arrays should be the same.
json:
- sql: IS JSON [ { VALUE | SCALAR | ARRAY | OBJECT } ]
diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst
b/flink-python/docs/reference/pyflink.table/expressions.rst
index 1d472834052..981ab77cc23 100644
--- a/flink-python/docs/reference/pyflink.table/expressions.rst
+++ b/flink-python/docs/reference/pyflink.table/expressions.rst
@@ -54,6 +54,7 @@ Expressions
array
row
map_
+ map_from_arrays
row_interval
pi
e
@@ -226,6 +227,8 @@ advanced type helper functions
Expression.element
Expression.array_contains
Expression.array_distinct
+ Expression.map_keys
+ Expression.map_values
time definition functions
diff --git a/flink-python/pyflink/table/expression.py
b/flink-python/pyflink/table/expression.py
index d7d9bfdb1a5..34ea0716603 100644
--- a/flink-python/pyflink/table/expression.py
+++ b/flink-python/pyflink/table/expression.py
@@ -1487,6 +1487,24 @@ class Expression(Generic[T]):
"""
return _binary_op("arrayDistinct")(self)
+ @property
+ def map_keys(self) -> 'Expression':
+ """
+ Returns the keys of the map as an array. No order guaranteed.
+
+ .. seealso:: :py:attr:`~Expression.map_values`
+ """
+ return _unary_op("mapKeys")(self)
+
+ @property
+ def map_values(self) -> 'Expression':
+ """
+ Returns the values of the map as an array. No order guaranteed.
+
+ .. seealso:: :py:attr:`~Expression.map_keys`
+ """
+ return _unary_op("mapValues")(self)
+
# ---------------------------- time definition functions
-----------------------------
@property
diff --git a/flink-python/pyflink/table/expressions.py
b/flink-python/pyflink/table/expressions.py
index b3cffe347b2..aa4022c14c0 100644
--- a/flink-python/pyflink/table/expressions.py
+++ b/flink-python/pyflink/table/expressions.py
@@ -483,6 +483,26 @@ def map_(key, value, *tail) -> Expression:
return _ternary_op("map", key, value, tail)
+def map_from_arrays(key, value) -> Expression:
+ """
+ Creates a map from an array of keys and an array of values.
+
+ Example:
+ ::
+
+ >>> tab.select(
+ >>> map_from_arrays(
+ >>> array("key1", "key2", "key3"),
+ >>> array(1, 2, 3)
+ >>> ))
+
+ .. note::
+
+ both arrays should have the same length.
+ """
+ return _binary_op("mapFromArrays", key, value)
+
+
def row_interval(rows: int) -> Expression:
"""
Creates an interval of rows.
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
index 44f76ad627b..d72ca72e04e 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
@@ -533,6 +533,26 @@ public final class Expressions {
return apiCallAtLeastTwoArgument(BuiltInFunctionDefinitions.MAP, key,
value, tail);
}
+ /**
+ * Creates a map from an array of keys and an array of values.
+ *
+ * <pre>{@code
+ * table.select(
+ * mapFromArrays(
+ * array("key1", "key2", "key3"),
+ * array(1, 2, 3)
+ * ))
+ * }</pre>
+ *
+ * <p>Note both arrays should have the same length.
+ */
+ public static ApiExpression mapFromArrays(Object key, Object value) {
+ return apiCall(
+ BuiltInFunctionDefinitions.MAP_FROM_ARRAYS,
+ objectToExpression(key),
+ objectToExpression(value));
+ }
+
/**
* Creates an interval of rows.
*
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
index 8e458cc3ad8..30899f49b68 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
@@ -118,6 +118,8 @@ import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.LOG2;
import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.LOWER;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.LPAD;
import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.LTRIM;
+import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.MAP_KEYS;
+import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.MAP_VALUES;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.MAX;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.MD5;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.MIN;
@@ -1359,6 +1361,16 @@ public abstract class BaseExpressions<InType, OutType> {
return toApiSpecificExpression(unresolvedCall(ARRAY_DISTINCT,
toExpr()));
}
+ /** Returns the keys of the map as an array. */
+ public OutType mapKeys() {
+ return toApiSpecificExpression(unresolvedCall(MAP_KEYS, toExpr()));
+ }
+
+ /** Returns the values of the map as an array. */
+ public OutType mapValues() {
+ return toApiSpecificExpression(unresolvedCall(MAP_VALUES, toExpr()));
+ }
+
// Time definition
/**
diff --git
a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
index b4282f09cfb..b70be995a98 100644
---
a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
+++
b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
@@ -647,6 +647,11 @@ trait ImplicitExpressionConversions {
Expressions.map(key, value, tail: _*)
}
+ /** Creates a map from an array of keys and an array of values. */
+ def mapFromArrays(key: Expression, value: Expression): Expression = {
+ Expressions.mapFromArrays(key, value)
+ }
+
/** Returns a value that is closer than any other value to pi. */
def pi(): Expression = {
Expressions.pi()
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 7b110e87685..7cd7800a4f0 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -130,6 +130,47 @@ public final class BuiltInFunctionDefinitions {
.runtimeClass("org.apache.flink.table.runtime.functions.scalar.IfNullFunction")
.build();
+ public static final BuiltInFunctionDefinition MAP_KEYS =
+ BuiltInFunctionDefinition.newBuilder()
+ .name("MAP_KEYS")
+ .kind(SCALAR)
+ .inputTypeStrategy(
+ sequence(
+ new String[] {"input"},
+ new ArgumentTypeStrategy[]
{logical(LogicalTypeRoot.MAP)}))
+
.outputTypeStrategy(nullableIfArgs(SpecificInputTypeStrategies.MAP_KEYS))
+
.runtimeClass("org.apache.flink.table.runtime.functions.scalar.MapKeysFunction")
+ .build();
+
+ public static final BuiltInFunctionDefinition MAP_VALUES =
+ BuiltInFunctionDefinition.newBuilder()
+ .name("MAP_VALUES")
+ .kind(SCALAR)
+ .inputTypeStrategy(
+ sequence(
+ new String[] {"input"},
+ new ArgumentTypeStrategy[]
{logical(LogicalTypeRoot.MAP)}))
+
.outputTypeStrategy(nullableIfArgs(SpecificInputTypeStrategies.MAP_VALUES))
+ .runtimeClass(
+
"org.apache.flink.table.runtime.functions.scalar.MapValuesFunction")
+ .build();
+
+ public static final BuiltInFunctionDefinition MAP_FROM_ARRAYS =
+ BuiltInFunctionDefinition.newBuilder()
+ .name("MAP_FROM_ARRAYS")
+ .kind(SCALAR)
+ .inputTypeStrategy(
+ sequence(
+ new String[] {"keysArray", "valuesArray"},
+ new ArgumentTypeStrategy[] {
+ logical(LogicalTypeRoot.ARRAY),
+ logical(LogicalTypeRoot.ARRAY)
+ }))
+
.outputTypeStrategy(nullableIfArgs(SpecificInputTypeStrategies.ARRAYS_FOR_MAP))
+ .runtimeClass(
+
"org.apache.flink.table.runtime.functions.scalar.MapFromArraysFunction")
+ .build();
+
public static final BuiltInFunctionDefinition SOURCE_WATERMARK =
BuiltInFunctionDefinition.newBuilder()
.name("SOURCE_WATERMARK")
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
index f31be13145f..3657e7491ca 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
@@ -19,16 +19,22 @@
package org.apache.flink.table.types.inference.strategies;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.JsonOnNull;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.KeyValueDataType;
import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
import org.apache.flink.table.types.inference.ConstantArgumentCount;
import org.apache.flink.table.types.inference.InputTypeStrategies;
import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.TypeStrategy;
import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.StructuredType;
+import java.util.Optional;
+
import static
org.apache.flink.table.types.inference.InputTypeStrategies.LITERAL;
import static org.apache.flink.table.types.inference.InputTypeStrategies.and;
import static
org.apache.flink.table.types.inference.InputTypeStrategies.comparable;
@@ -120,6 +126,32 @@ public final class SpecificInputTypeStrategies {
public static final InputTypeStrategy TWO_EQUALS_COMPARABLE =
comparable(ConstantArgumentCount.of(2),
StructuredType.StructuredComparison.EQUALS);
+ /** Type strategy specific for {@link
BuiltInFunctionDefinitions#MAP_KEYS}. */
+ public static final TypeStrategy MAP_KEYS =
+ callContext ->
+ Optional.of(
+ DataTypes.ARRAY(
+ ((KeyValueDataType)
callContext.getArgumentDataTypes().get(0))
+ .getKeyDataType()));
+
+ /** Type strategy specific for {@link
BuiltInFunctionDefinitions#MAP_VALUES}. */
+ public static final TypeStrategy MAP_VALUES =
+ callContext ->
+ Optional.of(
+ DataTypes.ARRAY(
+ ((KeyValueDataType)
callContext.getArgumentDataTypes().get(0))
+ .getValueDataType()));
+
+ /** Type strategy specific for {@link
BuiltInFunctionDefinitions#MAP_FROM_ARRAYS}. */
+ public static final TypeStrategy ARRAYS_FOR_MAP =
+ callContext ->
+ Optional.of(
+ DataTypes.MAP(
+ ((CollectionDataType)
callContext.getArgumentDataTypes().get(0))
+ .getElementDataType(),
+ ((CollectionDataType)
callContext.getArgumentDataTypes().get(1))
+ .getElementDataType()));
+
private SpecificInputTypeStrategies() {
// no instantiation
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/MapFunctionITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/MapFunctionITCase.java
index b02a78472fb..20906af91f8 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/MapFunctionITCase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/MapFunctionITCase.java
@@ -28,6 +28,7 @@ import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.Period;
import java.util.Collections;
+import java.util.Map;
import java.util.stream.Stream;
import static org.apache.flink.table.api.DataTypes.BIGINT;
@@ -44,7 +45,9 @@ import static org.apache.flink.table.api.DataTypes.STRING;
import static org.apache.flink.table.api.DataTypes.TIME;
import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.Expressions.call;
import static org.apache.flink.table.api.Expressions.map;
+import static org.apache.flink.table.api.Expressions.mapFromArrays;
import static org.apache.flink.util.CollectionUtil.entry;
/** Test {@link BuiltInFunctionDefinitions#MAP} and its return type. */
@@ -205,6 +208,115 @@ public class MapFunctionITCase extends
BuiltInFunctionTestBase {
DataTypes.MAP(
STRING().notNull(),
INTERVAL(MONTH()).nullable())
- .notNull())));
+ .notNull())),
+ TestSetSpec.forFunction(BuiltInFunctionDefinitions.MAP_KEYS)
+ .onFieldsWithData(
+ null,
+ "item",
+ Collections.singletonMap(1, "value"),
+ Collections.singletonMap(new Integer[] {1, 2},
"value"))
+ .andDataTypes(
+ DataTypes.BOOLEAN().nullable(),
+ DataTypes.STRING(),
+ DataTypes.MAP(DataTypes.INT(),
DataTypes.STRING()),
+
DataTypes.MAP(DataTypes.ARRAY(DataTypes.INT()), DataTypes.STRING()))
+ .testTableApiValidationError(
+ call("MAP_KEYS", $("f0"), $("f1")),
+ "Invalid function call:\nMAP_KEYS(BOOLEAN,
STRING)")
+ .testResult(
+ map(
+
$("f0").cast(DataTypes.BOOLEAN()),
+
$("f1").cast(DataTypes.STRING()))
+ .mapKeys(),
+ "MAP_KEYS(MAP[CAST(f0 AS BOOLEAN), CAST(f1 AS
STRING)])",
+ new Boolean[] {null},
+ DataTypes.ARRAY(DataTypes.BOOLEAN()).notNull())
+ .testResult(
+ $("f2").mapKeys(),
+ "MAP_KEYS(f2)",
+ new Integer[] {1},
+ DataTypes.ARRAY(DataTypes.INT()))
+ .testResult(
+ $("f3").mapKeys(),
+ "MAP_KEYS(f3)",
+ new Integer[][] {new Integer[] {1, 2}},
+
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT()))),
+ TestSetSpec.forFunction(BuiltInFunctionDefinitions.MAP_VALUES)
+ .onFieldsWithData(
+ null,
+ "item",
+ Collections.singletonMap(1, "value1"),
+ Collections.singletonMap(
+ 3, Collections.singletonMap(true,
"value2")))
+ .andDataTypes(
+ DataTypes.BOOLEAN().nullable(),
+ DataTypes.STRING(),
+ DataTypes.MAP(DataTypes.INT(),
DataTypes.STRING()),
+ DataTypes.MAP(
+ DataTypes.INT(),
+ DataTypes.MAP(DataTypes.BOOLEAN(),
DataTypes.STRING())))
+ .testTableApiValidationError(
+ call("MAP_VALUES", $("f0"), $("f1")),
+ "Invalid function call:\nMAP_VALUES(BOOLEAN,
STRING)")
+ .testResult(
+ map(
+
$("f1").cast(DataTypes.STRING()),
+
$("f0").cast(DataTypes.BOOLEAN()))
+ .mapValues(),
+ "MAP_VALUES(MAP[CAST(f1 AS STRING), CAST(f0 AS
BOOLEAN)])",
+ new Boolean[] {null},
+ DataTypes.ARRAY(DataTypes.BOOLEAN()).notNull())
+ .testResult(
+ $("f2").mapValues(),
+ "MAP_VALUES(f2)",
+ new String[] {"value1"},
+ DataTypes.ARRAY(DataTypes.STRING()))
+ .testResult(
+ $("f3").mapValues(),
+ "MAP_VALUES(f3)",
+ new Map[] {Collections.singletonMap(true,
"value2")},
+ DataTypes.ARRAY(
+ DataTypes.MAP(DataTypes.BOOLEAN(),
DataTypes.STRING()))),
+
TestSetSpec.forFunction(BuiltInFunctionDefinitions.MAP_FROM_ARRAYS, "Invalid
input")
+ .onFieldsWithData(null, null, new Integer[] {1}, new
Integer[] {1, 2})
+ .andDataTypes(
+ DataTypes.ARRAY(DataTypes.BOOLEAN()),
+ DataTypes.ARRAY(DataTypes.STRING()),
+ DataTypes.ARRAY(DataTypes.INT()),
+ DataTypes.ARRAY(DataTypes.INT()))
+ .testTableApiRuntimeError(
+ mapFromArrays($("f2"), $("f3")),
+ "Invalid function MAP_FROM_ARRAYS call:\n"
+ + "The length of the keys array 1 is
not equal to the length of the values array 2")
+ .testSqlRuntimeError(
+ "MAP_FROM_ARRAYS(array[1, 2, 3], array[1, 2])",
+ "Invalid function MAP_FROM_ARRAYS call:\n"
+ + "The length of the keys array 3 is
not equal to the length of the values array 2")
+ .testResult(
+ mapFromArrays($("f0"), $("f1")),
+ "MAP_FROM_ARRAYS(f0, f1)",
+ null,
+ DataTypes.MAP(DataTypes.BOOLEAN(),
DataTypes.STRING())),
+
TestSetSpec.forFunction(BuiltInFunctionDefinitions.MAP_FROM_ARRAYS)
+ .onFieldsWithData(
+ new Integer[] {1, 2},
+ new String[] {"one", "two"},
+ new Integer[][] {new Integer[] {1, 2}, new
Integer[] {3, 4}})
+ .andDataTypes(
+ DataTypes.ARRAY(DataTypes.INT()),
+ DataTypes.ARRAY(DataTypes.STRING()),
+
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.INT())))
+ .testResult(
+ mapFromArrays($("f0"), $("f1")),
+ "MAP_FROM_ARRAYS(f0, f1)",
+ CollectionUtil.map(entry(1, "one"), entry(2,
"two")),
+ DataTypes.MAP(DataTypes.INT(),
DataTypes.STRING()))
+ .testTableApiResult(
+ mapFromArrays($("f1"), $("f2")),
+ CollectionUtil.map(
+ entry("one", new Integer[] {1, 2}),
+ entry("two", new Integer[] {3, 4})),
+ DataTypes.MAP(
+ DataTypes.STRING(),
DataTypes.ARRAY(DataTypes.INT()))));
}
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapFromArraysFunction.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapFromArraysFunction.java
new file mode 100644
index 00000000000..2fa789154f7
--- /dev/null
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapFromArraysFunction.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#MAP_FROM_ARRAYS}. */
+@Internal
+public class MapFromArraysFunction extends BuiltInScalarFunction {
+ public MapFromArraysFunction(SpecializedFunction.SpecializedContext
context) {
+ super(BuiltInFunctionDefinitions.MAP_FROM_ARRAYS, context);
+ }
+
+ public @Nullable MapData eval(@Nullable ArrayData keysArray, @Nullable
ArrayData valuesArray) {
+ if (keysArray == null || valuesArray == null) {
+ return null;
+ }
+
+ if (keysArray.size() != valuesArray.size()) {
+ throw new FlinkRuntimeException(
+ "Invalid function MAP_FROM_ARRAYS call:\n"
+ + "The length of the keys array "
+ + keysArray.size()
+ + " is not equal to the length of the values array
"
+ + valuesArray.size());
+ }
+ return new MapDataForMapFromArrays(keysArray, valuesArray);
+ }
+
+ private static class MapDataForMapFromArrays implements MapData {
+ private final ArrayData keyArray;
+ private final ArrayData valueArray;
+
+ public MapDataForMapFromArrays(ArrayData keyArray, ArrayData
valueArray) {
+ this.keyArray = keyArray;
+ this.valueArray = valueArray;
+ }
+
+ @Override
+ public int size() {
+ return keyArray.size();
+ }
+
+ @Override
+ public ArrayData keyArray() {
+ return keyArray;
+ }
+
+ @Override
+ public ArrayData valueArray() {
+ return valueArray;
+ }
+ }
+}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapKeysFunction.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapKeysFunction.java
new file mode 100644
index 00000000000..00fc32d3b63
--- /dev/null
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapKeysFunction.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+
+import javax.annotation.Nullable;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#MAP_KEYS}. */
+@Internal
+public class MapKeysFunction extends BuiltInScalarFunction {
+ public MapKeysFunction(SpecializedFunction.SpecializedContext context) {
+ super(BuiltInFunctionDefinitions.MAP_KEYS, context);
+ }
+
+ public @Nullable ArrayData eval(@Nullable MapData input) {
+ if (input == null) {
+ return null;
+ }
+
+ return input.keyArray();
+ }
+}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapValuesFunction.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapValuesFunction.java
new file mode 100644
index 00000000000..d2c58c482f0
--- /dev/null
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MapValuesFunction.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+
+import javax.annotation.Nullable;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#MAP_VALUES}. */
+@Internal
+public class MapValuesFunction extends BuiltInScalarFunction {
+ public MapValuesFunction(SpecializedFunction.SpecializedContext context) {
+ super(BuiltInFunctionDefinitions.MAP_VALUES, context);
+ }
+
+ public @Nullable ArrayData eval(@Nullable MapData input) {
+ if (input == null) {
+ return null;
+ }
+
+ return input.valueArray();
+ }
+}