This is an automated email from the ASF dual-hosted git repository.
yunfengzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 80e26ca9828 [FLINK-37924][table]Introduce Built-in Function to Access
field or element in the Variant (#27330)
80e26ca9828 is described below
commit 80e26ca982872b2eae6266816ccfd6007de17b08
Author: Jinkun Liu <[email protected]>
AuthorDate: Mon Feb 2 09:37:28 2026 +0800
[FLINK-37924][table]Introduce Built-in Function to Access field or element
in the Variant (#27330)
---
docs/data/sql_functions.yml | 10 +
docs/data/sql_functions_zh.yml | 11 +-
.../apache/calcite/sql/fun/SqlItemOperator.java | 221 +++++++++++++++++++++
.../functions/BuiltInFunctionDefinitions.java | 3 +-
.../ItemAtIndexArgumentTypeStrategy.java | 31 ++-
.../inference/strategies/ItemAtTypeStrategy.java | 20 +-
.../ItemAtIndexArgumentTypeStrategyTest.java | 33 ++-
.../table/planner/codegen/ExprCodeGenerator.scala | 6 +-
.../planner/codegen/calls/ScalarOperatorGens.scala | 94 +++++++++
.../nodes/exec/stream/VariantSemanticTest.java | 220 +++++++++++++++++++-
.../table/runtime/typeutils/TypeCheckUtils.java | 4 +
11 files changed, 635 insertions(+), 18 deletions(-)
diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index 83bc11abe44..0bf149795e9 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -1167,6 +1167,16 @@ variant:
parser will keep the last occurrence of all fields with the same key,
otherwise when
`allowDuplicateKeys` is false it will throw an error. The default value
of
`allowDuplicateKeys` is false.
+ - sql: variant '[' INT ']'
+ table: VARIANT.at(INT)
+ description: |
+ If the VARIANT is an ARRAY value, returns a VARIANT whose value is the
element at
+ the specified index. The index starts from 1, If the index is out of
range, it returns NULL.
+ - sql: variant '[' STRING ']'
+ table: VARIANT.at(STRING)
+ description: |
+ If the VARIANT is a MAP value that has an element with this key, a
VARIANT holding
+ the associated value is returned. Otherwise, NULL is returned.
valueconstruction:
- sql: |
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index 8a5c772d215..9d2541babe6 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -1241,7 +1241,6 @@ variant:
同键的字段,否则当 allowDuplicateKeys 为 false 时,它会抛出一个错误。默认情况下,
allowDuplicateKeys 的值为 false。
-
- sql: TRY_PARSE_JSON(json_string[, allow_duplicate_keys])
description: |
尽可能将 JSON 字符串解析为 Variant。如果 JSON 字符串无效,则返回 NULL。如果希望抛出错误而不是返回 NULL,
@@ -1251,6 +1250,16 @@ variant:
同键的字段,否则当 allowDuplicateKeys 为 false 时,它会抛出一个错误。默认情况下,
allowDuplicateKeys 的值为 false。
+ - sql: variant '[' INT ']'
+ table: VARIANT.at(INT)
+ description: |
+ 如果这是一个 ARRAY 类型的 VARIANT,则返回一个 VARIANT,其值为指定索引处的元素。索引从 1 开始,如果索引超出范围,则返回
NULL。
+
+ - sql: variant '[' STRING ']'
+ table: VARIANT.at(STRING)
+ description: |
+ 如果这是一个 MAP 类型的 VARIANT,则返回一个 VARIANT,其值为指定 key 对应的值。否则返回 NULL。
+
valueconstruction:
- sql: |
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/fun/SqlItemOperator.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/fun/SqlItemOperator.java
new file mode 100644
index 00000000000..fad1d6dfee7
--- /dev/null
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/fun/SqlItemOperator.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.calcite.sql.fun;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperandCountRange;
+import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.OperandTypes;
+import org.apache.calcite.sql.type.SqlOperandCountRanges;
+import org.apache.calcite.sql.type.SqlSingleOperandTypeChecker;
+import org.apache.calcite.sql.type.SqlTypeFamily;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.type.SqlTypeUtil;
+
+import java.util.Arrays;
+
+import static java.util.Objects.requireNonNull;
+import static
org.apache.calcite.sql.type.NonNullableAccessors.getComponentTypeOrThrow;
+import static
org.apache.calcite.sql.validate.SqlNonNullableAccessors.getOperandLiteralValueOrThrow;
+
+/**
+ * The item operator {@code [ ... ]}, used to access a given element of an
array, map or struct. For
+ * example, {@code myArray[3]}, {@code "myMap['foo']"}, {@code myStruct[2]} or
{@code
+ * myStruct['fieldName']}.
+ *
+ * <p>This class was copied over from Calcite 1.39.0 version to support access
variant
+ * (FLINK-37924).
+ *
+ * <p>Line 148 ~ 153, CALCITE-7325, should be removed after upgrading Calcite
to 1.42.0.
+ */
+public class SqlItemOperator extends SqlSpecialOperator {
+ public final int offset;
+ public final boolean safe;
+
+ public SqlItemOperator(
+ String name, SqlSingleOperandTypeChecker operandTypeChecker, int
offset, boolean safe) {
+ super(name, SqlKind.ITEM, 100, true, null, null, operandTypeChecker);
+ this.offset = offset;
+ this.safe = safe;
+ }
+
+ @Override
+ public ReduceResult reduceExpr(int ordinal, TokenSequence list) {
+ SqlNode left = list.node(ordinal - 1);
+ SqlNode right = list.node(ordinal + 1);
+ return new ReduceResult(
+ ordinal - 1,
+ ordinal + 2,
+ createCall(
+ SqlParserPos.sum(
+ Arrays.asList(
+ requireNonNull(left,
"left").getParserPosition(),
+ requireNonNull(right,
"right").getParserPosition(),
+ list.pos(ordinal))),
+ left,
+ right));
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, SqlCall call, int leftPrec, int
rightPrec) {
+ call.operand(0).unparse(writer, leftPrec, 0);
+ final SqlWriter.Frame frame = writer.startList("[", "]");
+ if (!this.getName().equals("ITEM")) {
+ final SqlWriter.Frame offsetFrame =
writer.startFunCall(this.getName());
+ call.operand(1).unparse(writer, 0, 0);
+ writer.endFunCall(offsetFrame);
+ } else {
+ call.operand(1).unparse(writer, 0, 0);
+ }
+ writer.endList(frame);
+ }
+
+ @Override
+ public SqlOperandCountRange getOperandCountRange() {
+ return SqlOperandCountRanges.of(2);
+ }
+
+ @Override
+ public boolean checkOperandTypes(SqlCallBinding callBinding, boolean
throwOnFailure) {
+ final SqlNode left = callBinding.operand(0);
+ final SqlNode right = callBinding.operand(1);
+ if (!getOperandTypeChecker().checkSingleOperandType(callBinding, left,
0, throwOnFailure)) {
+ return false;
+ }
+ final SqlSingleOperandTypeChecker checker = getChecker(callBinding);
+ return checker.checkSingleOperandType(callBinding, right, 0,
throwOnFailure);
+ }
+
+ @Override
+ public SqlSingleOperandTypeChecker getOperandTypeChecker() {
+ return (SqlSingleOperandTypeChecker)
+ requireNonNull(super.getOperandTypeChecker(),
"operandTypeChecker");
+ }
+
+ private static SqlSingleOperandTypeChecker getChecker(SqlCallBinding
callBinding) {
+ final RelDataType operandType = callBinding.getOperandType(0);
+ switch (operandType.getSqlTypeName()) {
+ case ARRAY:
+ return OperandTypes.family(SqlTypeFamily.INTEGER);
+ case MAP:
+ RelDataType keyType =
+ requireNonNull(operandType.getKeyType(),
"operandType.getKeyType()");
+ SqlTypeName sqlTypeName = keyType.getSqlTypeName();
+ return OperandTypes.family(
+ requireNonNull(
+ sqlTypeName.getFamily(),
+ () ->
+ "keyType.getSqlTypeName().getFamily()
null, type is "
+ + sqlTypeName));
+ case ROW:
+ case ANY:
+ case DYNAMIC_STAR:
+ case VARIANT:
+ return OperandTypes.family(SqlTypeFamily.INTEGER)
+ .or(OperandTypes.family(SqlTypeFamily.CHARACTER));
+ default:
+ throw callBinding.newValidationSignatureError();
+ }
+ }
+
+ @Override
+ public String getAllowedSignatures(String name) {
+ if (name.equals("ITEM")) {
+ // FLINK MODIFICATION BEGIN
+ return "<ARRAY>[<INTEGER>]\n"
+ + "<MAP>[<ANY>]\n"
+ + "<ROW>[<CHARACTER>|<INTEGER>]\n"
+ + "<VARIANT>[<CHARACTER>|<INTEGER>]";
+ // FLINK MODIFICATION END
+ } else {
+ return "<ARRAY>[" + name + "(<INTEGER>)]";
+ }
+ }
+
+ @Override
+ public RelDataType inferReturnType(SqlOperatorBinding opBinding) {
+ final RelDataTypeFactory typeFactory = opBinding.getTypeFactory();
+ final RelDataType operandType = opBinding.getOperandType(0);
+ switch (operandType.getSqlTypeName()) {
+ case VARIANT:
+ // Return type is always nullable VARIANT
+ return typeFactory.createTypeWithNullability(operandType,
true);
+ case ARRAY:
+ return typeFactory.createTypeWithNullability(
+ getComponentTypeOrThrow(operandType), true);
+ case MAP:
+ return typeFactory.createTypeWithNullability(
+ requireNonNull(
+ operandType.getValueType(),
+ () -> "operandType.getValueType() is null for
" + operandType),
+ true);
+ case ROW:
+ RelDataType fieldType;
+ RelDataType indexType = opBinding.getOperandType(1);
+
+ if (SqlTypeUtil.isString(indexType)) {
+ final String fieldName =
+ getOperandLiteralValueOrThrow(opBinding, 1,
String.class);
+ RelDataTypeField field = operandType.getField(fieldName,
false, false);
+ if (field == null) {
+ throw new AssertionError(
+ "Cannot infer type of field '"
+ + fieldName
+ + "' within ROW type: "
+ + operandType);
+ } else {
+ fieldType = field.getType();
+ }
+ } else if (SqlTypeUtil.isIntType(indexType)) {
+ Integer index = opBinding.getOperandLiteralValue(1,
Integer.class);
+ if (index == null || index < 1 || index >
operandType.getFieldCount()) {
+ throw new AssertionError(
+ "Cannot infer type of field at position "
+ + index
+ + " within ROW type: "
+ + operandType);
+ } else {
+ fieldType =
+ operandType.getFieldList().get(index -
1).getType(); // 1 indexed
+ }
+ } else {
+ throw new AssertionError(
+ "Unsupported field identifier type: '" + indexType
+ "'");
+ }
+ if (operandType.isNullable()) {
+ fieldType =
typeFactory.createTypeWithNullability(fieldType, true);
+ }
+ return fieldType;
+ case ANY:
+ case DYNAMIC_STAR:
+ return typeFactory.createTypeWithNullability(
+ typeFactory.createSqlType(SqlTypeName.ANY), true);
+ default:
+ throw new AssertionError();
+ }
+ }
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index febd7ac1fa4..6685be8fdbd 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -2455,7 +2455,8 @@ public final class BuiltInFunctionDefinitions {
sequence(
or(
logical(LogicalTypeRoot.ARRAY),
- logical(LogicalTypeRoot.MAP)),
+ logical(LogicalTypeRoot.MAP),
+ logical(LogicalTypeRoot.VARIANT)),
InputTypeStrategies.ITEM_AT_INDEX))
.outputTypeStrategy(SpecificTypeStrategies.ITEM_AT)
.build();
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ItemAtIndexArgumentTypeStrategy.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ItemAtIndexArgumentTypeStrategy.java
index 82b360ccc43..fb3c6c7432c 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ItemAtIndexArgumentTypeStrategy.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ItemAtIndexArgumentTypeStrategy.java
@@ -41,7 +41,10 @@ import java.util.Optional;
* or {@link LogicalTypeRoot#MULTISET}
*
* <p>the type to be equal to the key type of {@link LogicalTypeRoot#MAP} if
the first argument is a
- * map.
+ * map
+ *
+ * <p>a {@link LogicalTypeFamily#NUMERIC} type or {@link
LogicalTypeFamily#CHARACTER_STRING} type if
+ * the first argument is a {@link LogicalTypeRoot#VARIANT}.
*/
@Internal
public final class ItemAtIndexArgumentTypeStrategy implements
ArgumentTypeStrategy {
@@ -86,12 +89,36 @@ public final class ItemAtIndexArgumentTypeStrategy
implements ArgumentTypeStrate
}
}
+ if (collectionType.is(LogicalTypeRoot.VARIANT)) {
+ if
(indexType.getLogicalType().is(LogicalTypeFamily.INTEGER_NUMERIC)) {
+
+ if (callContext.isArgumentLiteral(1)) {
+ Optional<Integer> literalVal =
callContext.getArgumentValue(1, Integer.class);
+ if (literalVal.isPresent() && literalVal.get() <= 0) {
+ return callContext.fail(
+ throwOnFailure,
+ "The provided index must be a valid SQL index
starting from 1, but was '%s'",
+ literalVal.get());
+ }
+ }
+
+ return Optional.of(indexType);
+ } else if
(indexType.getLogicalType().is(LogicalTypeFamily.CHARACTER_STRING)) {
+ return Optional.of(indexType);
+ } else {
+ return callContext.fail(
+ throwOnFailure,
+ "Incorrect type %s supplied for the variant value.
Variant values can only be accessed with a CHARACTER STRING map key or an
INTEGER NUMERIC array index.",
+ indexType.getLogicalType().toString());
+ }
+ }
+
return Optional.empty();
}
@Override
public Signature.Argument getExpectedArgument(
FunctionDefinition functionDefinition, int argumentPos) {
- return Signature.Argument.of("[<INTEGER NUMERIC> | <MAP_KEY_TYPE>]");
+ return Signature.Argument.of("[<CHARACTER STRING> | <INTEGER NUMERIC>
| <MAP_KEY_TYPE>]");
}
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ItemAtTypeStrategy.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ItemAtTypeStrategy.java
index dc5a3ddebf8..ba781ee4875 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ItemAtTypeStrategy.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ItemAtTypeStrategy.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.types.inference.strategies;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.types.AtomicDataType;
import org.apache.flink.table.types.CollectionDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.KeyValueDataType;
@@ -33,27 +34,30 @@ import java.util.Optional;
/**
* An output type strategy for {@link BuiltInFunctionDefinitions#AT}.
*
- * <p>Returns either the element of an {@link LogicalTypeFamily#COLLECTION}
type or the value of
- * {@link LogicalTypeRoot#MAP}.
+ * <p>Returns either the element of an {@link LogicalTypeFamily#COLLECTION}
type, the value of
+ * {@link LogicalTypeRoot#MAP}, or another {@link LogicalTypeRoot#VARIANT}
value obtained by
+ * accessing the input {@link LogicalTypeRoot#VARIANT}.
*/
@Internal
public final class ItemAtTypeStrategy implements TypeStrategy {
@Override
public Optional<DataType> inferType(CallContext callContext) {
- DataType arrayOrMapType = callContext.getArgumentDataTypes().get(0);
+ DataType containerType = callContext.getArgumentDataTypes().get(0);
final Optional<DataType> legacyArrayElement =
- StrategyUtils.extractLegacyArrayElement(arrayOrMapType);
+ StrategyUtils.extractLegacyArrayElement(containerType);
if (legacyArrayElement.isPresent()) {
return legacyArrayElement;
}
- if (arrayOrMapType.getLogicalType().is(LogicalTypeRoot.ARRAY)) {
+ if (containerType.getLogicalType().is(LogicalTypeRoot.ARRAY)) {
return Optional.of(
- ((CollectionDataType)
arrayOrMapType).getElementDataType().nullable());
- } else if (arrayOrMapType instanceof KeyValueDataType) {
- return Optional.of(((KeyValueDataType)
arrayOrMapType).getValueDataType().nullable());
+ ((CollectionDataType)
containerType).getElementDataType().nullable());
+ } else if (containerType instanceof KeyValueDataType) {
+ return Optional.of(((KeyValueDataType)
containerType).getValueDataType().nullable());
+ } else if (containerType.getLogicalType().is(LogicalTypeRoot.VARIANT))
{
+ return Optional.of(((AtomicDataType) containerType).nullable());
}
return Optional.empty();
diff --git
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/ItemAtIndexArgumentTypeStrategyTest.java
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/ItemAtIndexArgumentTypeStrategyTest.java
index d38ebd011f3..1a5bf23b7a0 100644
---
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/ItemAtIndexArgumentTypeStrategyTest.java
+++
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/ItemAtIndexArgumentTypeStrategyTest.java
@@ -42,7 +42,7 @@ class ItemAtIndexArgumentTypeStrategyTest extends
InputTypeStrategiesTestBase {
DataTypes.ARRAY(DataTypes.STRING().notNull()),
DataTypes.SMALLINT().notNull())
.expectSignature(
- "f([<ARRAY> | <MAP>], [<INTEGER NUMERIC> |
<MAP_KEY_TYPE>])")
+ "f([<ARRAY> | <MAP> | <VARIANT>], [<CHARACTER
STRING> | <INTEGER NUMERIC> | <MAP_KEY_TYPE>])")
.expectArgumentTypes(
DataTypes.ARRAY(DataTypes.STRING().notNull()),
DataTypes.SMALLINT().notNull()),
@@ -58,7 +58,7 @@ class ItemAtIndexArgumentTypeStrategyTest extends
InputTypeStrategiesTestBase {
DataTypes.MAP(DataTypes.BIGINT(),
DataTypes.STRING().notNull()),
DataTypes.SMALLINT())
.expectSignature(
- "f([<ARRAY> | <MAP>], [<INTEGER NUMERIC> |
<MAP_KEY_TYPE>])")
+ "f([<ARRAY> | <MAP> | <VARIANT>], [<CHARACTER
STRING> | <INTEGER NUMERIC> | <MAP_KEY_TYPE>])")
.expectArgumentTypes(
DataTypes.MAP(DataTypes.BIGINT(),
DataTypes.STRING().notNull()),
DataTypes.BIGINT()),
@@ -67,11 +67,36 @@ class ItemAtIndexArgumentTypeStrategyTest extends
InputTypeStrategiesTestBase {
DataTypes.MAP(DataTypes.BIGINT(),
DataTypes.STRING().notNull()),
DataTypes.STRING())
.expectErrorMessage("Expected index for a MAP to be of
type: BIGINT"),
- TestSpec.forStrategy("Validate incorrect index",
ITEM_AT_INPUT_STRATEGY)
+ TestSpec.forStrategy(
+ "Validate incorrect index for an array",
ITEM_AT_INPUT_STRATEGY)
.calledWithArgumentTypes(
DataTypes.ARRAY(DataTypes.BIGINT()),
DataTypes.INT().notNull())
.calledWithLiteralAt(1, 0)
.expectErrorMessage(
- "The provided index must be a valid SQL index
starting from 1, but was '0'"));
+ "The provided index must be a valid SQL index
starting from 1, but was '0'"),
+ TestSpec.forStrategy("Validate integer index for a variant",
ITEM_AT_INPUT_STRATEGY)
+ .calledWithArgumentTypes(
+ DataTypes.VARIANT(),
DataTypes.SMALLINT().notNull())
+ .expectSignature(
+ "f([<ARRAY> | <MAP> | <VARIANT>], [<CHARACTER
STRING> | <INTEGER NUMERIC> | <MAP_KEY_TYPE>])")
+ .expectArgumentTypes(DataTypes.VARIANT(),
DataTypes.SMALLINT().notNull()),
+ TestSpec.forStrategy(
+ "Validate incorrect index for a variant",
ITEM_AT_INPUT_STRATEGY)
+ .calledWithArgumentTypes(
+ DataTypes.VARIANT(),
DataTypes.SMALLINT().notNull())
+ .calledWithLiteralAt(1, 0)
+ .expectErrorMessage(
+ "The provided index must be a valid SQL index
starting from 1, but was '0'"),
+ TestSpec.forStrategy("Validate string key for a variant",
ITEM_AT_INPUT_STRATEGY)
+ .calledWithArgumentTypes(DataTypes.VARIANT(),
DataTypes.STRING().notNull())
+ .expectSignature(
+ "f([<ARRAY> | <MAP> | <VARIANT>], [<CHARACTER
STRING> | <INTEGER NUMERIC> | <MAP_KEY_TYPE>])")
+ .expectArgumentTypes(DataTypes.VARIANT(),
DataTypes.STRING().notNull()),
+ TestSpec.forStrategy(
+ "Validate incorrect variant key for a variant",
+ ITEM_AT_INPUT_STRATEGY)
+ .calledWithArgumentTypes(DataTypes.VARIANT(),
DataTypes.DOUBLE().notNull())
+ .expectErrorMessage(
+ "Incorrect type DOUBLE NOT NULL supplied for
the variant value. Variant values can only be accessed with a CHARACTER STRING
map key or an INTEGER NUMERIC array index."));
}
}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
index c625bfd89b3..7154aa09f0a 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
@@ -743,7 +743,11 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext,
nullableInput: Boolean)
case LogicalTypeRoot.ROW | LogicalTypeRoot.STRUCTURED_TYPE =>
generateDot(ctx, operands)
- case _ => throw new CodeGenException("Expect an array, a map or a
row.")
+ case LogicalTypeRoot.VARIANT =>
+ val key = operands(1)
+ generateVariantGet(ctx, operands.head, key)
+
+ case _ => throw new CodeGenException("Expect an array, a map, a row
or a variant.")
}
case CARDINALITY =>
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
index 249e73fc4ca..11b1cf199ab 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
@@ -1668,6 +1668,100 @@ object ScalarOperatorGens {
generateUnaryOperatorIfNotNull(ctx, resultType, map)(_ =>
s"${map.resultTerm}.size()")
}
+ def generateVariantGet(
+ ctx: CodeGeneratorContext,
+ variant: GeneratedExpression,
+ key: GeneratedExpression): GeneratedExpression = {
+ val Seq(resultTerm, nullTerm) = newNames(ctx, "result", "isNull")
+ val tmpValue = newName(ctx, "tmpValue")
+
+ val variantType = variant.resultType.asInstanceOf[VariantType]
+ val variantTerm = variant.resultTerm
+ val variantTypeTerm = primitiveTypeTermForType(variantType)
+ val variantDefaultTerm = primitiveDefaultValue(variantType)
+
+ val keyTerm = key.resultTerm
+ val keyType = key.resultType
+
+ val accessCode = if (isIntegerNumeric(keyType)) {
+ generateIntegerKeyAccess(
+ variantTerm,
+ variantTypeTerm,
+ keyTerm,
+ resultTerm,
+ nullTerm,
+ tmpValue
+ )
+ } else if (isCharacterString(keyType)) {
+ val fieldName = key.literalValue.get.toString
+ generateCharacterStringKeyAccess(
+ variantTerm,
+ variantTypeTerm,
+ fieldName,
+ resultTerm,
+ nullTerm,
+ tmpValue
+ )
+ } else {
+ throw new CodeGenException(s"Unsupported key type for variant: $keyType")
+ }
+
+ val finalCode =
+ s"""
+ |${variant.code}
+ |${key.code}
+ |boolean $nullTerm = (${variant.nullTerm} || ${key.nullTerm});
+ |$variantTypeTerm $resultTerm = $variantDefaultTerm;
+ |if (!$nullTerm) {
+ | $accessCode
+ |}
+ """.stripMargin
+
+ GeneratedExpression(resultTerm, nullTerm, finalCode, variantType)
+ }
+
+ private def generateCharacterStringKeyAccess(
+ variantTerm: String,
+ variantTypeTerm: String,
+ fieldName: String,
+ resultTerm: String,
+ nullTerm: String,
+ tmpValue: String): String = {
+ s"""
+ | if ($variantTerm.isObject()){
+ | $variantTypeTerm $tmpValue = $variantTerm.getField("$fieldName");
+ | if ($tmpValue == null) {
+ | $nullTerm = true;
+ | } else {
+ | $resultTerm = $tmpValue;
+ | }
+ | } else {
+ | throw new org.apache.flink.table.api.TableRuntimeException("String
key access on variant requires an object variant, but a non-object variant was
provided.");
+ | }
+ """.stripMargin
+ }
+
+ private def generateIntegerKeyAccess(
+ variantTerm: String,
+ variantTypeTerm: String,
+ keyTerm: String,
+ resultTerm: String,
+ nullTerm: String,
+ tmpValue: String): String = {
+ s"""
+ | if ($variantTerm.isArray()){
+ | $variantTypeTerm $tmpValue = $variantTerm.getElement((int)
$keyTerm - 1);
+ | if ($tmpValue == null) {
+ | $nullTerm = true;
+ | } else {
+ | $resultTerm = $tmpValue;
+ | }
+ | } else {
+ | throw new
org.apache.flink.table.api.TableRuntimeException("Integer index access on
variant requires an array variant, but a non-array variant was provided.");
+ | }
+ """.stripMargin
+ }
+
//
----------------------------------------------------------------------------------------
// private generate utils
//
----------------------------------------------------------------------------------------
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/VariantSemanticTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/VariantSemanticTest.java
index 6815e7b2528..1aa4ad1af0f 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/VariantSemanticTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/VariantSemanticTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.plan.nodes.exec.stream;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableRuntimeException;
+import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import
org.apache.flink.table.planner.plan.nodes.exec.testutils.SemanticTestBase;
@@ -33,6 +34,8 @@ import org.apache.flink.types.variant.VariantBuilder;
import java.util.List;
+import static org.apache.flink.table.api.Expressions.$;
+
/** Semantic tests for {@link DataTypes#VARIANT()} type. */
public class VariantSemanticTest extends SemanticTestBase {
@@ -253,6 +256,215 @@ public class VariantSemanticTest extends SemanticTestBase
{
.runSql("INSERT INTO sink_t SELECT k, SUM(v) AS total FROM
t GROUP BY k")
.build();
+ static final TableTestProgram VARIANT_ARRAY_ACCESS;
+
+ static final TableTestProgram
VARIANT_ARRAY_ACCESS_WITH_DIFFERENT_INDEX_TYPES;
+
+ static final TableTestProgram VARIANT_OBJECT_ACCESS;
+
+ static final TableTestProgram VARIANT_NESTED_ACCESS;
+
+ static final TableTestProgram VARIANT_ARRAY_ERROR_ACCESS;
+
+ static final TableTestProgram VARIANT_OBJECT_ERROR_ACCESS;
+
+ public static final SourceTestStep VARIANT_ARRAY_SOURCE =
+ SourceTestStep.newBuilder("t")
+ .addSchema("v VARIANT")
+ .producedValues(
+ Row.of(
+ BUILDER.array()
+ .add(BUILDER.of(1))
+ .add(BUILDER.of("hello"))
+ .add(BUILDER.of(3.14))
+ .build()),
+ Row.of(
+ BUILDER.array()
+ .add(BUILDER.of(10))
+ .add(BUILDER.of("world"))
+ .build()),
+ new Row(1))
+ .build();
+
+ public static final SinkTestStep VARIANT_ARRAY_SINK =
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema("v1 VARIANT", "v2 VARIANT", "v3 VARIANT")
+ .consumedValues(
+ Row.of(BUILDER.of(1), BUILDER.of("hello"),
BUILDER.of(3.14)),
+ Row.of(BUILDER.of(10), BUILDER.of("world"), null),
+ new Row(3))
+ .build();
+
+ public static final SourceTestStep VARIANT_OBJECT_SOURCE =
+ SourceTestStep.newBuilder("t")
+ .addSchema("v VARIANT")
+ .producedValues(
+ Row.of(
+ BUILDER.object()
+ .add("name", BUILDER.of("Alice"))
+ .add("age", BUILDER.of(30))
+ .add("city", BUILDER.of("NYC"))
+ .build()),
+ Row.of(
+ BUILDER.object()
+ .add("name", BUILDER.of("Bob"))
+ .add("age", BUILDER.of(25))
+ .build()),
+ new Row(1))
+ .build();
+
+ public static final SinkTestStep VARIANT_OBJECT_SINK =
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema("name VARIANT", "age VARIANT", "city VARIANT")
+ .consumedValues(
+ Row.of(BUILDER.of("Alice"), BUILDER.of(30),
BUILDER.of("NYC")),
+ Row.of(BUILDER.of("Bob"), BUILDER.of(25), null),
+ new Row(3))
+ .build();
+
+ public static final SourceTestStep VARIANT_NESTED_SOURCE =
+ SourceTestStep.newBuilder("t")
+ .addSchema("v VARIANT")
+ .producedValues(
+ Row.of(
+ BUILDER.object()
+ .add(
+ "users",
+ BUILDER.array()
+ .add(
+
BUILDER.object()
+
.add(
+
"id",
+
BUILDER.of(1))
+
.add(
+
"name",
+
BUILDER.of(
+
"Alice"))
+
.build())
+ .build())
+ .build()),
+ new Row(1))
+ .build();
+
+ public static final SinkTestStep VARIANT_NESTED_SINK =
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema("user_id VARIANT", "user_name VARIANT")
+ .consumedValues(Row.of(BUILDER.of(1),
BUILDER.of("Alice")), new Row(2))
+ .build();
+
+ static {
+ VARIANT_ARRAY_ACCESS =
+ TableTestProgram.of(
+ "variant-array-access",
+ "validates variant array access using []
operator in sql and at() in table api")
+ .setupTableSource(VARIANT_ARRAY_SOURCE)
+ .setupTableSink(VARIANT_ARRAY_SINK)
+ .runSql("INSERT INTO sink_t SELECT v[1], v[2], v[3]
FROM t")
+ .runTableApi(
+ t ->
+ t.from("t")
+ .select(
+ $("v").at(1).as("v1"),
+ $("v").at(2).as("v2"),
+ $("v").at(3).as("v3")),
+ "sink_t")
+ .build();
+
+ VARIANT_ARRAY_ACCESS_WITH_DIFFERENT_INDEX_TYPES =
+ TableTestProgram.of(
+
"variant-array-access-with-different-index-types",
+ "validates variant array access with different
index types using [] operator in sql and at() in table api")
+ .setupTableSource(VARIANT_ARRAY_SOURCE)
+ .setupTableSink(VARIANT_ARRAY_SINK)
+ .runSql(
+ "INSERT INTO sink_t SELECT v[cast(1 as
tinyint)], v[cast(2 as smallint)], v[cast(4 as bigint)] FROM t")
+ .runTableApi(
+ t ->
+ t.from("t")
+ .select(
+ $("v").at((byte)
1).as("v1"),
+ $("v").at((short)
2).as("v2"),
+ $("v").at((long)
3).as("v3")),
+ "sink_t")
+ .build();
+
+ VARIANT_OBJECT_ACCESS =
+ TableTestProgram.of(
+ "variant-object-access",
+ "validates variant object field access using
[] operator in sql and at() in table api")
+ .setupTableSource(VARIANT_OBJECT_SOURCE)
+ .setupTableSink(VARIANT_OBJECT_SINK)
+ .runSql("INSERT INTO sink_t SELECT v['name'],
v['age'], v['city'] FROM t")
+ .runTableApi(
+ t ->
+ t.from("t")
+ .select(
+
$("v").at("name").as("name"),
+
$("v").at("age").as("age"),
+
$("v").at("city").as("city")),
+ "sink_t")
+ .build();
+
+ VARIANT_NESTED_ACCESS =
+ TableTestProgram.of(
+ "variant-nested-access",
+ "validates variant nested access using []
operator in sql and at() in table api")
+ .setupTableSource(VARIANT_NESTED_SOURCE)
+ .setupTableSink(VARIANT_NESTED_SINK)
+ .runSql(
+ "INSERT INTO sink_t SELECT
v['users'][1]['id'], v['users'][1]['name'] FROM t")
+ .runTableApi(
+ t ->
+ t.from("t")
+ .select(
+ $("v").at("users")
+ .at(1)
+ .at("id")
+ .as("user_id"),
+ $("v").at("users")
+ .at(1)
+ .at("name")
+
.as("user_name")),
+ "sink_t")
+ .build();
+
+ VARIANT_ARRAY_ERROR_ACCESS =
+ TableTestProgram.of(
+ "variant-array-error-access",
+ "validates variant array access using []
operator in sql and at() in table api with string")
+ .setupTableSource(VARIANT_ARRAY_SOURCE)
+ .runFailingSql(
+ "SELECT v['1'], v['2'], v['3'] FROM t",
+ TableRuntimeException.class,
+ "String key access on variant requires an
object variant, but a non-object variant was provided.")
+ .runFailingSql(
+ "SELECT v[1.5], v[4.2], v[3.3] FROM t",
+ ValidationException.class,
+ "Cannot apply 'ITEM' to arguments of type
'ITEM(<VARIANT>, <DECIMAL(2, 1)>)'. Supported form(s): <ARRAY>[<INTEGER>]\n"
+ + "<MAP>[<ANY>]\n"
+ + "<ROW>[<CHARACTER>|<INTEGER>]\n"
+ + "<VARIANT>[<CHARACTER>|<INTEGER>]")
+ .build();
+
+ VARIANT_OBJECT_ERROR_ACCESS =
+ TableTestProgram.of(
+ "variant-object-error-access",
+ "validates variant object field access using
[] operator in sql and at() in table api")
+ .setupTableSource(VARIANT_OBJECT_SOURCE)
+ .runFailingSql(
+ "SELECT v[1], v[2], v[3] FROM t",
+ TableRuntimeException.class,
+ "Integer index access on variant requires an
array variant, but a non-array variant was provided.")
+ .runFailingSql(
+ "SELECT v[1.5], v[4.2], v[3.3] FROM t",
+ ValidationException.class,
+ "Cannot apply 'ITEM' to arguments of type
'ITEM(<VARIANT>, <DECIMAL(2, 1)>)'. Supported form(s): <ARRAY>[<INTEGER>]\n"
+ + "<MAP>[<ANY>]\n"
+ + "<ROW>[<CHARACTER>|<INTEGER>]\n"
+ + "<VARIANT>[<CHARACTER>|<INTEGER>]")
+ .build();
+ }
+
@Override
public List<TableTestProgram> programs() {
return List.of(
@@ -264,7 +476,13 @@ public class VariantSemanticTest extends SemanticTestBase {
BUILTIN_AGG_WITH_RETRACTION,
VARIANT_AS_UDF_ARG,
VARIANT_AS_UDAF_ARG,
- VARIANT_AS_AGG_KEY);
+ VARIANT_AS_AGG_KEY,
+ VARIANT_ARRAY_ACCESS,
+ VARIANT_ARRAY_ACCESS_WITH_DIFFERENT_INDEX_TYPES,
+ VARIANT_OBJECT_ACCESS,
+ VARIANT_NESTED_ACCESS,
+ VARIANT_ARRAY_ERROR_ACCESS,
+ VARIANT_OBJECT_ERROR_ACCESS);
}
public static class MyUdf extends ScalarFunction {
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/TypeCheckUtils.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/TypeCheckUtils.java
index 49022e18fbe..591e1899b9d 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/TypeCheckUtils.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/TypeCheckUtils.java
@@ -84,6 +84,10 @@ public class TypeCheckUtils {
return
type.getTypeRoot().getFamilies().contains(LogicalTypeFamily.BINARY_STRING);
}
+ public static boolean isIntegerNumeric(LogicalType type) {
+ return
type.getTypeRoot().getFamilies().contains(LogicalTypeFamily.INTEGER_NUMERIC);
+ }
+
public static boolean isTimestamp(LogicalType type) {
return type.getTypeRoot() == TIMESTAMP_WITHOUT_TIME_ZONE;
}