This is an automated email from the ASF dual-hosted git repository.
yux pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 37b7c5170 [FLINK-39010][transform] YAML Pipeline supports item
subscription of Variant type (#4261)
37b7c5170 is described below
commit 37b7c5170d920874724bbcac6d5d924ed0c4ce96
Author: Jia Fan <[email protected]>
AuthorDate: Fri Feb 6 12:14:08 2026 +0800
[FLINK-39010][transform] YAML Pipeline supports item subscription of
Variant type (#4261)
---
docs/content.zh/docs/core-concept/transform.md | 4 +
docs/content/docs/core-concept/transform.md | 4 +-
.../src/test/resources/specs/nested.yaml | 93 ++++++++++++-
.../runtime/functions/impl/StructFunctions.java | 51 ++++++-
.../flink/cdc/runtime/parser/JaninoCompiler.java | 5 +-
.../parser/metadata/TransformSqlOperatorTable.java | 4 +-
.../parser/metadata/VariantAwareItemOperator.java | 113 +++++++++++++++
.../functions/impl/StructFunctionsTest.java | 154 +++++++++++++++++++++
.../cdc/runtime/parser/TransformParserTest.java | 19 +++
9 files changed, 434 insertions(+), 13 deletions(-)
diff --git a/docs/content.zh/docs/core-concept/transform.md
b/docs/content.zh/docs/core-concept/transform.md
index 5f278e798..35665fa8a 100644
--- a/docs/content.zh/docs/core-concept/transform.md
+++ b/docs/content.zh/docs/core-concept/transform.md
@@ -228,11 +228,15 @@ You can use `CAST( <EXPR> AS <T> )` syntax to convert any
valid expression `<EXP
## Struct Functions
+结构函数用于使用下标语法访问 ARRAY、MAP、ROW 和 VARIANT 类型中的元素。
+
| Function | Janino Code | Description |
| -------- | ----------- | ----------- |
| array[index] | itemAccess(array, index) | 返回数组中位置 `index` 的元素。索引从 1 开始(SQL
标准)。如果索引超出范围或数组为 NULL,则返回 NULL。 |
| map[key] | itemAccess(map, key) | 返回 map 中与 `key` 关联的值。如果 key 不存在或 map 为
NULL,则返回 NULL。 |
| row[index] | itemAccess(row, index) | 返回 row 中位置 `index` 的字段。索引从 1
开始。索引必须是常量(不能是计算表达式),因为返回类型必须在静态阶段确定。 |
+| variant[index] | itemAccess(variant, index) | 返回 variant 中位置 `index`
的元素(如果是数组)。索引从 1 开始(SQL 标准)。如果索引超出范围或 variant 不是数组,则返回 NULL。 |
+| variant[key] | itemAccess(variant, key) | 返回 variant 中与 `key` 关联的值(如果是对象)。如果
key 不存在或 variant 不是对象,则返回 NULL。 |
# 示例
## 添加计算列
diff --git a/docs/content/docs/core-concept/transform.md
b/docs/content/docs/core-concept/transform.md
index d0a4901b4..22fe8aff1 100644
--- a/docs/content/docs/core-concept/transform.md
+++ b/docs/content/docs/core-concept/transform.md
@@ -229,13 +229,15 @@ You can use `CAST( <EXPR> AS <T> )` syntax to convert any
valid expression `<EXP
## Struct Functions
-Struct functions are used to access elements in ARRAY, MAP and ROW types using
subscript syntax.
+Struct functions are used to access elements in ARRAY, MAP, ROW, and VARIANT
types using subscript syntax.
| Function | Janino Code | Description |
| -------- | ----------- | ----------- |
| array[index] | itemAccess(array, index) | Returns the element at position
`index` in the array. Index is 1-based (SQL standard). Returns NULL if the
index is out of bounds or if the array is NULL. |
| map[key] | itemAccess(map, key) | Returns the value associated with `key` in
the map. Returns NULL if the key does not exist or if the map is NULL. |
| row[index] | itemAccess(row, index) | Returns the field at position `index`
in the row. Index is 1-based. The index must be a constant (not a computed
expression) since the return type must be statically determined. |
+| variant[index] | itemAccess(variant, index) | Returns the element at
position `index` in the variant (if it's an array). Index is 1-based (SQL
standard). Returns NULL if the index is out of bounds or if the variant is not
an array. |
+| variant[key] | itemAccess(variant, key) | Returns the value associated with
`key` in the variant (if it's an object). Returns NULL if the key does not
exist or if the variant is not an object. |
# Example
## Add computed columns
diff --git a/flink-cdc-composer/src/test/resources/specs/nested.yaml
b/flink-cdc-composer/src/test/resources/specs/nested.yaml
index f61ec8f79..a600b469b 100644
--- a/flink-cdc-composer/src/test/resources/specs/nested.yaml
+++ b/flink-cdc-composer/src/test/resources/specs/nested.yaml
@@ -35,7 +35,7 @@
id_
PARSE_JSON(id_) AS variant_id_
PARSE_JSON('[' || id_ || ',' || bool_ || ']') AS variant_array_
- PARSE_JSON('{\"id\": ' || id_ || ', \"name\": \"' || string_ || '\"}') AS
variant_doc_
+ PARSE_JSON('{"id": ' || id_ || ', "name": "' || string_ || '"}') AS
variant_doc_
primary-key: id_
expect: |-
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT
NULL 'Identifier',`variant_id_` VARIANT,`variant_array_` VARIANT,`variant_doc_`
VARIANT}, primaryKeys=id_, options=()}
@@ -55,7 +55,7 @@
id_
TRY_PARSE_JSON(id_) AS variant_id_
TRY_PARSE_JSON('[' || id_ || ',' || bool_ || ']') AS variant_array_
- TRY_PARSE_JSON('{\"id\": ' || id_ || ', \"name\": \"' || string_ || '\"}')
AS variant_doc_
+ TRY_PARSE_JSON('{"id": ' || id_ || ', "name": "' || string_ || '"}') AS
variant_doc_
primary-key: id_
expect: |-
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT
NULL 'Identifier',`variant_id_` VARIANT,`variant_array_` VARIANT,`variant_doc_`
VARIANT}, primaryKeys=id_, options=()}
@@ -79,13 +79,13 @@
- do: Parse Variant from JSON with duplicate key rejected
projection: |-
id_
- PARSE_JSON('{\"id\": 1, \"id\": 2}') AS variant_
+ PARSE_JSON('{"id": 1, "id": 2}') AS variant_
primary-key: id_
expect-error:
'org.apache.flink.cdc.common.types.variant.VariantTypeException:
VARIANT_DUPLICATE_KEY'
- do: Parse Variant from JSON with duplicate key allowed
projection: |-
id_
- PARSE_JSON('{\"id\": 1, \"id\": 2}', TRUE) AS variant_
+ PARSE_JSON('{"id": 1, "id": 2}', TRUE) AS variant_
primary-key: id_
expect: |-
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT
NULL 'Identifier',`variant_` VARIANT}, primaryKeys=id_, options=()}
@@ -97,7 +97,7 @@
- do: Try Parse Variant from JSON with duplicate key rejected
projection: |-
id_
- TRY_PARSE_JSON('{\"id\": 1, \"id\": 2}') AS variant_
+ TRY_PARSE_JSON('{"id": 1, "id": 2}') AS variant_
primary-key: id_
expect: |-
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT
NULL 'Identifier',`variant_` VARIANT}, primaryKeys=id_, options=()}
@@ -109,7 +109,7 @@
- do: Try Parse Variant from JSON with duplicate key allowed
projection: |-
id_
- TRY_PARSE_JSON('{\"id\": 1, \"id\": 2}', TRUE) AS variant_
+ TRY_PARSE_JSON('{"id": 1, "id": 2}', TRUE) AS variant_
primary-key: id_
expect: |-
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT
NULL 'Identifier',`variant_` VARIANT}, primaryKeys=id_, options=()}
@@ -283,4 +283,83 @@
expect: |-
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT
NULL 'Identifier',`complex_row_` ROW<`name` STRING, `length` INT>},
primaryKeys=id_, options=()}
DataChangeEvent{tableId=foo.bar.baz, before=[1, {name: STRING -> Alice,
length: INT -> 5}], after=[-1, {name: STRING -> Derrida, length: INT -> 7}],
op=UPDATE, meta=()}
- DataChangeEvent{tableId=foo.bar.baz, before=[-1, {name: STRING -> Derrida,
length: INT -> 7}], after=[], op=DELETE, meta=()}
\ No newline at end of file
+ DataChangeEvent{tableId=foo.bar.baz, before=[-1, {name: STRING -> Derrida,
length: INT -> 7}], after=[], op=DELETE, meta=()}
+- do: Variant Object Subscripting With String Key
+ projection: |-
+ id_
+ variant_
+ variant_['k'] AS variant_k
+ primary-key: id_
+ expect: |-
+ CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT
NULL 'Identifier',`variant_` VARIANT,`variant_k` VARIANT}, primaryKeys=id_,
options=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1,
{"array":[1,2,{"kk":1.123}],"k":1,"object":{"k":"hello"}}, 1], op=INSERT,
meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[1,
{"array":[1,2,{"kk":1.123}],"k":1,"object":{"k":"hello"}}, 1], after=[-1,
[{"k":1},"hello",{"k":2}], null], op=UPDATE, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[-1,
[{"k":1},"hello",{"k":2}], null], after=[], op=DELETE, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null],
op=INSERT, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[],
op=DELETE, meta=()}
+- do: Variant Array Subscripting With Integer Index
+ projection: |-
+ id_
+ variant_
+ variant_[1] AS variant_first
+ variant_[2] AS variant_second
+ primary-key: id_
+ expect: |-
+ CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT
NULL 'Identifier',`variant_` VARIANT,`variant_first` VARIANT,`variant_second`
VARIANT}, primaryKeys=id_, options=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1,
{"array":[1,2,{"kk":1.123}],"k":1,"object":{"k":"hello"}}, null, null],
op=INSERT, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[1,
{"array":[1,2,{"kk":1.123}],"k":1,"object":{"k":"hello"}}, null, null],
after=[-1, [{"k":1},"hello",{"k":2}], {"k":1}, "hello"], op=UPDATE, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[-1,
[{"k":1},"hello",{"k":2}], {"k":1}, "hello"], after=[], op=DELETE, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null,
null], op=INSERT, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null, null],
after=[], op=DELETE, meta=()}
+- do: Variant Nested Subscripting
+ projection: |-
+ id_
+ variant_['object']['k'] AS nested_object_k
+ variant_['array'][1] AS nested_array_first
+ primary-key: id_
+ expect: |-
+ CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT
NULL 'Identifier',`nested_object_k` VARIANT,`nested_array_first` VARIANT},
primaryKeys=id_, options=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, "hello", 1],
op=INSERT, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[1, "hello", 1], after=[-1,
null, null], op=UPDATE, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[-1, null, null], after=[],
op=DELETE, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null],
op=INSERT, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[],
op=DELETE, meta=()}
+- do: Variant Subscripting With Absent Key
+ projection: |-
+ id_
+ variant_['nonexistent'] AS absent_key
+ variant_[1000] AS absent_index
+ primary-key: id_
+ expect: |-
+ CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT
NULL 'Identifier',`absent_key` VARIANT,`absent_index` VARIANT},
primaryKeys=id_, options=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, null, null],
op=INSERT, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[1, null, null], after=[-1,
null, null], op=UPDATE, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[-1, null, null], after=[],
op=DELETE, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null],
op=INSERT, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[],
op=DELETE, meta=()}
+- do: Variant Subscripting With Parsed JSON
+ projection: |-
+ id_
+ PARSE_JSON('[1, 2, 3]')[2] AS array_second
+ PARSE_JSON('[10, 20, 30]')[1] AS array_first
+ primary-key: id_
+ expect: |-
+ CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT
NULL 'Identifier',`array_second` VARIANT,`array_first` VARIANT},
primaryKeys=id_, options=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 2, 10],
op=INSERT, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[1, 2, 10], after=[-1, 2, 10],
op=UPDATE, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[-1, 2, 10], after=[],
op=DELETE, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, 2, 10],
op=INSERT, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[0, 2, 10], after=[],
op=DELETE, meta=()}
+- do: Variant Subscripting With Parsed JSON Object
+ projection: |-
+ id_
+ PARSE_JSON('{"key": "value"}')['key'] AS json_key
+ PARSE_JSON('{"nested": {"inner": 42}}')['nested']['inner'] AS nested_value
+ primary-key: id_
+ expect: |-
+ CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT
NULL 'Identifier',`json_key` VARIANT,`nested_value` VARIANT}, primaryKeys=id_,
options=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, "value", 42],
op=INSERT, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[1, "value", 42], after=[-1,
"value", 42], op=UPDATE, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[-1, "value", 42], after=[],
op=DELETE, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, "value", 42],
op=INSERT, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[0, "value", 42], after=[],
op=DELETE, meta=()}
\ No newline at end of file
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctions.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctions.java
index 603930cc9..9ce343782 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctions.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctions.java
@@ -17,14 +17,16 @@
package org.apache.flink.cdc.runtime.functions.impl;
+import org.apache.flink.cdc.common.types.variant.Variant;
+
import java.util.List;
import java.util.Map;
/**
* Built-in functions for collection and struct data types.
*
- * <p>These functions support accessing elements from collections (ARRAY, MAP)
and structured data
- * types (ROW).
+ * <p>These functions support accessing elements from collections (ARRAY,
MAP), structured data
+ * types (ROW), and semi-structured data types (VARIANT).
*/
public class StructFunctions {
@@ -67,4 +69,49 @@ public class StructFunctions {
}
return map.get(key);
}
+
+ /**
+ * Accesses an element from a VARIANT array by index (1-based, SQL
standard).
+ *
+ * <p>variant[1] returns the first element.
+ *
+ * @param variant the variant (must be an array) to access
+ * @param index the 1-based index
+ * @return the element at the specified index as a Variant, or null if the
variant is not an
+ * array or index is out of bounds
+ */
+ public static Variant itemAccess(Variant variant, Integer index) {
+ if (variant == null || index == null) {
+ return null;
+ }
+ if (!variant.isArray()) {
+ return null;
+ }
+ // Convert 1-based index to 0-based (SQL standard uses 1-based
indexing)
+ int zeroBasedIndex = index - 1;
+ if (zeroBasedIndex < 0 || zeroBasedIndex >= variant.arraySize()) {
+ return null;
+ }
+ return variant.getElement(zeroBasedIndex);
+ }
+
+ /**
+ * Accesses a field from a VARIANT object by field name.
+ *
+ * <p>variant['fieldName'] returns the value of the specified field.
+ *
+ * @param variant the variant (must be an object) to access
+ * @param fieldName the name of the field to look up
+ * @return the field value as a Variant, or null if the variant is not an
object or field is not
+ * found
+ */
+ public static Variant itemAccess(Variant variant, String fieldName) {
+ if (variant == null || fieldName == null) {
+ return null;
+ }
+ if (!variant.isObject()) {
+ return null;
+ }
+ return variant.getField(fieldName);
+ }
}
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
index fb5069255..14c483d70 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
@@ -174,7 +174,10 @@ public class JaninoCompiler {
Object value = sqlLiteral.getValue();
if (sqlLiteral instanceof SqlCharStringLiteral) {
// Double quotation marks represent strings in Janino.
- value = "\"" + sqlLiteral.getValueAs(NlsString.class).getValue() +
"\"";
+ // Escape backslashes first, then double quotes for proper Janino
string literals.
+ String stringValue =
sqlLiteral.getValueAs(NlsString.class).getValue();
+ stringValue = stringValue.replace("\\", "\\\\").replace("\"",
"\\\"");
+ value = "\"" + stringValue + "\"";
} else if (sqlLiteral instanceof SqlNumericLiteral) {
if (((SqlNumericLiteral) sqlLiteral).isInteger()) {
long longValue = sqlLiteral.longValue(true);
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
index 7d0f66362..6044fbd5d 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
@@ -401,8 +401,8 @@ public class TransformSqlOperatorTable extends
ReflectiveSqlOperatorTable {
// ---------------------
// Struct Functions
// ---------------------
- // Supports accessing elements of ARRAY[index], ROW[index] and MAP[key]
- public static final SqlOperator ITEM = SqlStdOperatorTable.ITEM;
+ // Supports accessing elements of ARRAY[index], ROW[index], MAP[key], and
VARIANT[index/key]
+ public static final SqlOperator ITEM = new VariantAwareItemOperator();
public static final SqlFunction AI_CHAT_PREDICT =
new SqlFunction(
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/VariantAwareItemOperator.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/VariantAwareItemOperator.java
new file mode 100644
index 000000000..d8f989c61
--- /dev/null
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/VariantAwareItemOperator.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.parser.metadata;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlOperandCountRange;
+import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.type.SqlTypeUtil;
+import org.apache.calcite.sql.validate.SqlValidator;
+
+/**
+ * A variant-aware implementation of the ITEM operator.
+ *
+ * <p>This extends the standard Calcite ITEM operator to support VARIANT type.
For ARRAY, MAP, and
+ * ROW types, it delegates to the standard Calcite ITEM operator.
+ *
+ * <p>When accessing elements of a VARIANT:
+ *
+ * <ul>
+ * <li>VARIANT[INTEGER] - accesses array elements (1-based index)
+ * <li>VARIANT[STRING] - accesses object fields by name
+ * </ul>
+ *
+ * <p>The return type is always VARIANT when the source is VARIANT.
+ */
+public class VariantAwareItemOperator extends SqlSpecialOperator {
+
+ public VariantAwareItemOperator() {
+ super("ITEM", SqlKind.ITEM, 100, true, null, null, null);
+ }
+
+ @Override
+ public SqlOperandCountRange getOperandCountRange() {
+ return SqlStdOperatorTable.ITEM.getOperandCountRange();
+ }
+
+ @Override
+ public boolean checkOperandTypes(SqlCallBinding callBinding, boolean
throwOnFailure) {
+ final SqlValidator validator = callBinding.getValidator();
+ final RelDataType operandType =
+ validator.deriveType(callBinding.getScope(),
callBinding.operand(0));
+ final SqlTypeName typeName = operandType.getSqlTypeName();
+
+ // Only handle VARIANT specially, delegate others to standard ITEM
+ if (typeName == SqlTypeName.VARIANT) {
+ final RelDataType keyType =
+ validator.deriveType(callBinding.getScope(),
callBinding.operand(1));
+ // VARIANT accepts integer or string keys
+ boolean valid =
+ SqlTypeUtil.isIntType(keyType)
+ || SqlTypeUtil.isCharacter(keyType)
+ || keyType.getSqlTypeName() == SqlTypeName.ANY;
+ if (!valid && throwOnFailure) {
+ throw callBinding.newValidationSignatureError();
+ }
+ return valid;
+ }
+
+ // Delegate to standard ITEM for ARRAY, MAP, ROW, etc.
+ return SqlStdOperatorTable.ITEM.checkOperandTypes(callBinding,
throwOnFailure);
+ }
+
+ @Override
+ public RelDataType inferReturnType(SqlOperatorBinding opBinding) {
+ final RelDataType operandType = opBinding.getOperandType(0);
+ final SqlTypeName typeName = operandType.getSqlTypeName();
+
+ // Only handle VARIANT specially
+ if (typeName == SqlTypeName.VARIANT) {
+ final RelDataTypeFactory typeFactory = opBinding.getTypeFactory();
+ return typeFactory.createTypeWithNullability(
+ typeFactory.createSqlType(SqlTypeName.VARIANT), true);
+ }
+
+ // Delegate to standard ITEM for other types
+ return SqlStdOperatorTable.ITEM.inferReturnType(opBinding);
+ }
+
+ @Override
+ public String getAllowedSignatures(String opName) {
+ // Include VARIANT in addition to standard signatures
+ return SqlStdOperatorTable.ITEM.getAllowedSignatures(opName)
+ + "\n<VARIANT>[<INTEGER>|<CHARACTER>]";
+ }
+
+ @Override
+ public void unparse(
+ SqlWriter writer, org.apache.calcite.sql.SqlCall call, int
leftPrec, int rightPrec) {
+ SqlStdOperatorTable.ITEM.unparse(writer, call, leftPrec, rightPrec);
+ }
+}
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctionsTest.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctionsTest.java
index 1ac03cc2b..4d9510451 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctionsTest.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctionsTest.java
@@ -17,6 +17,9 @@
package org.apache.flink.cdc.runtime.functions.impl;
+import org.apache.flink.cdc.common.types.variant.Variant;
+import org.apache.flink.cdc.common.types.variant.VariantBuilder;
+
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
@@ -157,4 +160,155 @@ class StructFunctionsTest {
assertThat(StructFunctions.itemAccess(mapWithNullValue,
"key2")).isNull();
}
}
+
+ // ========================================
+ // Variant Access Tests
+ // ========================================
+ @Nested
+ class VariantAccessTests {
+
+ private final VariantBuilder builder = Variant.newBuilder();
+
+ @Test
+ void testVariantArrayAccessByIndex() {
+ // Build a variant array: [1, 2, 3]
+ Variant arrayVariant =
+ builder.array()
+ .add(builder.of(1))
+ .add(builder.of(2))
+ .add(builder.of(3))
+ .build();
+
+ // SQL uses 1-based indexing
+ Variant first = StructFunctions.itemAccess(arrayVariant, 1);
+ assertThat(first).isNotNull();
+ assertThat(first.getInt()).isEqualTo(1);
+
+ Variant second = StructFunctions.itemAccess(arrayVariant, 2);
+ assertThat(second).isNotNull();
+ assertThat(second.getInt()).isEqualTo(2);
+
+ Variant third = StructFunctions.itemAccess(arrayVariant, 3);
+ assertThat(third).isNotNull();
+ assertThat(third.getInt()).isEqualTo(3);
+ }
+
+ @Test
+ void testVariantArrayOutOfBoundsAccess() {
+ Variant arrayVariant =
+ builder.array()
+ .add(builder.of(10))
+ .add(builder.of(20))
+ .add(builder.of(30))
+ .build();
+
+ // Index 0 is invalid in SQL (1-based indexing)
+ assertThat(StructFunctions.itemAccess(arrayVariant, 0)).isNull();
+ // Negative index
+ assertThat(StructFunctions.itemAccess(arrayVariant, -1)).isNull();
+ // Index beyond size
+ assertThat(StructFunctions.itemAccess(arrayVariant, 4)).isNull();
+ assertThat(StructFunctions.itemAccess(arrayVariant, 100)).isNull();
+ }
+
+ @Test
+ void testVariantObjectAccessByFieldName() {
+ // Build a variant object: {"name": "Alice", "age": 30}
+ Variant objectVariant =
+ builder.object()
+ .add("name", builder.of("Alice"))
+ .add("age", builder.of(30))
+ .build();
+
+ Variant name = StructFunctions.itemAccess(objectVariant, "name");
+ assertThat(name).isNotNull();
+ assertThat(name.getString()).isEqualTo("Alice");
+
+ Variant age = StructFunctions.itemAccess(objectVariant, "age");
+ assertThat(age).isNotNull();
+ assertThat(age.getInt()).isEqualTo(30);
+ }
+
+ @Test
+ void testVariantObjectMissingField() {
+ Variant objectVariant = builder.object().add("exists",
builder.of("value")).build();
+
+ assertThat(StructFunctions.itemAccess(objectVariant,
"nonexistent")).isNull();
+ }
+
+ @Test
+ void testVariantNullHandling() {
+ Variant arrayVariant = builder.array().add(builder.of(1)).build();
+ Variant objectVariant = builder.object().add("key",
builder.of("value")).build();
+
+ // Null variant returns null
+ assertThat(StructFunctions.itemAccess((Variant) null, 1)).isNull();
+ assertThat(StructFunctions.itemAccess((Variant) null,
"key")).isNull();
+
+ // Null index returns null
+ assertThat(StructFunctions.itemAccess(arrayVariant, (Integer)
null)).isNull();
+
+ // Null field name returns null
+ assertThat(StructFunctions.itemAccess(objectVariant, (String)
null)).isNull();
+ }
+
+ @Test
+ void testVariantTypeMismatch() {
+ Variant arrayVariant = builder.array().add(builder.of(1)).build();
+ Variant objectVariant = builder.object().add("key",
builder.of("value")).build();
+
+ // Accessing array with string key returns null
+ assertThat(StructFunctions.itemAccess(arrayVariant,
"key")).isNull();
+
+ // Accessing object with integer index returns null
+ assertThat(StructFunctions.itemAccess(objectVariant, 1)).isNull();
+ }
+
+ @Test
+ void testNestedVariantAccess() {
+ // Build a nested variant: {"data": [1, {"nested": "value"}]}
+ Variant nestedVariant =
+ builder.object()
+ .add(
+ "data",
+ builder.array()
+ .add(builder.of(1))
+ .add(
+ builder.object()
+ .add("nested",
builder.of("value"))
+ .build())
+ .build())
+ .build();
+
+ // Access "data" field
+ Variant data = StructFunctions.itemAccess(nestedVariant, "data");
+ assertThat(data).isNotNull();
+ assertThat(data.isArray()).isTrue();
+
+ // Access second element of the array (index 2 in 1-based SQL
standard)
+ Variant secondElement = StructFunctions.itemAccess(data, 2);
+ assertThat(secondElement).isNotNull();
+ assertThat(secondElement.isObject()).isTrue();
+
+ // Access "nested" field
+ Variant nestedValue = StructFunctions.itemAccess(secondElement,
"nested");
+ assertThat(nestedValue).isNotNull();
+ assertThat(nestedValue.getString()).isEqualTo("value");
+ }
+
+ @Test
+ void testPrimitiveVariantAccess() {
+ // Primitive variants are neither arrays nor objects
+ Variant intVariant = builder.of(42);
+ Variant stringVariant = builder.of("hello");
+
+ // Accessing primitive variant with index returns null
+ assertThat(StructFunctions.itemAccess(intVariant, 1)).isNull();
+ assertThat(StructFunctions.itemAccess(stringVariant, 1)).isNull();
+
+ // Accessing primitive variant with field name returns null
+ assertThat(StructFunctions.itemAccess(intVariant, "key")).isNull();
+ assertThat(StructFunctions.itemAccess(stringVariant,
"key")).isNull();
+ }
+ }
}
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
index a45dbde2a..a522f4f34 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
@@ -427,6 +427,25 @@ class TransformParserTest {
List.of(Column.physicalColumn("binArr",
DataTypes.ARRAY(DataTypes.BINARY(16))));
testFilterExpressionWithColumns(
"binArr[1]", "(byte[]) itemAccess(binArr, 1)",
binaryArrayColumns);
+
+ // Variant access tests
+ List<Column> variantColumns = List.of(Column.physicalColumn("v",
DataTypes.VARIANT()));
+ testFilterExpressionWithColumns(
+ "v['key']",
+ "(org.apache.flink.cdc.common.types.variant.Variant)
itemAccess(v, \"key\")",
+ variantColumns);
+ testFilterExpressionWithColumns(
+ "v[1]",
+ "(org.apache.flink.cdc.common.types.variant.Variant)
itemAccess(v, 1)",
+ variantColumns);
+ testFilterExpressionWithColumns(
+ "v['a']['b']",
+ "(org.apache.flink.cdc.common.types.variant.Variant)
itemAccess((org.apache.flink.cdc.common.types.variant.Variant) itemAccess(v,
\"a\"), \"b\")",
+ variantColumns);
+ testFilterExpressionWithColumns(
+ "parse_json('{\"key\": \"value\"}')['key']",
+ "(org.apache.flink.cdc.common.types.variant.Variant)
itemAccess(parseJson(\"{\\\"key\\\": \\\"value\\\"}\"), \"key\")",
+ Collections.emptyList());
}
@Test