This is an automated email from the ASF dual-hosted git repository.

yux pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 37b7c5170 [FLINK-39010][transform] YAML Pipeline supports item subscription of Variant type (#4261)
37b7c5170 is described below

commit 37b7c5170d920874724bbcac6d5d924ed0c4ce96
Author: Jia Fan <[email protected]>
AuthorDate: Fri Feb 6 12:14:08 2026 +0800

    [FLINK-39010][transform] YAML Pipeline supports item subscription of Variant type (#4261)
---
 docs/content.zh/docs/core-concept/transform.md     |   4 +
 docs/content/docs/core-concept/transform.md        |   4 +-
 .../src/test/resources/specs/nested.yaml           |  93 ++++++++++++-
 .../runtime/functions/impl/StructFunctions.java    |  51 ++++++-
 .../flink/cdc/runtime/parser/JaninoCompiler.java   |   5 +-
 .../parser/metadata/TransformSqlOperatorTable.java |   4 +-
 .../parser/metadata/VariantAwareItemOperator.java  | 113 +++++++++++++++
 .../functions/impl/StructFunctionsTest.java        | 154 +++++++++++++++++++++
 .../cdc/runtime/parser/TransformParserTest.java    |  19 +++
 9 files changed, 434 insertions(+), 13 deletions(-)

diff --git a/docs/content.zh/docs/core-concept/transform.md b/docs/content.zh/docs/core-concept/transform.md
index 5f278e798..35665fa8a 100644
--- a/docs/content.zh/docs/core-concept/transform.md
+++ b/docs/content.zh/docs/core-concept/transform.md
@@ -228,11 +228,15 @@ You can use `CAST( <EXPR> AS <T> )` syntax to convert any valid expression `<EXP
 
 ## Struct Functions
 
+结构函数用于使用下标语法访问 ARRAY、MAP、ROW 和 VARIANT 类型中的元素。
+
 | Function | Janino Code | Description |
 | -------- | ----------- | ----------- |
 | array[index] | itemAccess(array, index) | 返回数组中位置 `index` 的元素。索引从 1 开始(SQL 标准)。如果索引超出范围或数组为 NULL,则返回 NULL。 |
 | map[key] | itemAccess(map, key) | 返回 map 中与 `key` 关联的值。如果 key 不存在或 map 为 NULL,则返回 NULL。 |
 | row[index] | itemAccess(row, index) | 返回 row 中位置 `index` 的字段。索引从 1 开始。索引必须是常量(不能是计算表达式),因为返回类型必须在静态阶段确定。 |
+| variant[index] | itemAccess(variant, index) | 返回 variant 中位置 `index` 的元素(如果是数组)。索引从 1 开始(SQL 标准)。如果索引超出范围或 variant 不是数组,则返回 NULL。 |
+| variant[key] | itemAccess(variant, key) | 返回 variant 中与 `key` 关联的值(如果是对象)。如果 key 不存在或 variant 不是对象,则返回 NULL。 |
 
 # 示例
 ## 添加计算列
diff --git a/docs/content/docs/core-concept/transform.md b/docs/content/docs/core-concept/transform.md
index d0a4901b4..22fe8aff1 100644
--- a/docs/content/docs/core-concept/transform.md
+++ b/docs/content/docs/core-concept/transform.md
@@ -229,13 +229,15 @@ You can use `CAST( <EXPR> AS <T> )` syntax to convert any valid expression `<EXP
 
 ## Struct Functions
 
-Struct functions are used to access elements in ARRAY, MAP and ROW types using subscript syntax.
+Struct functions are used to access elements in ARRAY, MAP, ROW, and VARIANT types using subscript syntax.
 
 | Function | Janino Code | Description |
 | -------- | ----------- | ----------- |
 | array[index] | itemAccess(array, index) | Returns the element at position `index` in the array. Index is 1-based (SQL standard). Returns NULL if the index is out of bounds or if the array is NULL. |
 | map[key] | itemAccess(map, key) | Returns the value associated with `key` in the map. Returns NULL if the key does not exist or if the map is NULL. |
 | row[index] | itemAccess(row, index) | Returns the field at position `index` in the row. Index is 1-based. The index must be a constant (not a computed expression) since the return type must be statically determined. |
+| variant[index] | itemAccess(variant, index) | Returns the element at position `index` in the variant (if it's an array). Index is 1-based (SQL standard). Returns NULL if the index is out of bounds or if the variant is not an array. |
+| variant[key] | itemAccess(variant, key) | Returns the value associated with `key` in the variant (if it's an object). Returns NULL if the key does not exist or if the variant is not an object. |
 
 # Example
 ## Add computed columns
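
For reference, the subscript forms documented above map onto the itemAccess helpers added in StructFunctions.java later in this commit. Below is a minimal Java sketch of the documented semantics; the demo class name is hypothetical, while the Variant/VariantBuilder calls are the ones exercised by the new tests in this commit:

    import org.apache.flink.cdc.common.types.variant.Variant;
    import org.apache.flink.cdc.common.types.variant.VariantBuilder;
    import org.apache.flink.cdc.runtime.functions.impl.StructFunctions;

    public class VariantSubscriptDemo {
        public static void main(String[] args) {
            VariantBuilder builder = Variant.newBuilder();

            // Build {"k": 1, "array": [10, 20, 30]}
            Variant doc =
                    builder.object()
                            .add("k", builder.of(1))
                            .add(
                                    "array",
                                    builder.array()
                                            .add(builder.of(10))
                                            .add(builder.of(20))
                                            .add(builder.of(30))
                                            .build())
                            .build();

            // doc['k'] -> itemAccess(doc, "k") -> 1
            System.out.println(StructFunctions.itemAccess(doc, "k").getInt());

            // doc['array'][2] -> 20; subscripts are 1-based (SQL standard)
            Variant array = StructFunctions.itemAccess(doc, "array");
            System.out.println(StructFunctions.itemAccess(array, 2).getInt());

            // Absent keys and out-of-range indexes return null instead of throwing
            System.out.println(StructFunctions.itemAccess(doc, "missing")); // null
            System.out.println(StructFunctions.itemAccess(array, 99));      // null
        }
    }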
diff --git a/flink-cdc-composer/src/test/resources/specs/nested.yaml b/flink-cdc-composer/src/test/resources/specs/nested.yaml
index f61ec8f79..a600b469b 100644
--- a/flink-cdc-composer/src/test/resources/specs/nested.yaml
+++ b/flink-cdc-composer/src/test/resources/specs/nested.yaml
@@ -35,7 +35,7 @@
     id_
     PARSE_JSON(id_) AS variant_id_
     PARSE_JSON('[' || id_ || ',' || bool_ || ']') AS variant_array_
-    PARSE_JSON('{\"id\": ' || id_ || ', \"name\": \"' || string_ || '\"}') AS 
variant_doc_
+    PARSE_JSON('{"id": ' || id_ || ', "name": "' || string_ || '"}') AS 
variant_doc_
   primary-key: id_
   expect: |-
     CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`variant_id_` VARIANT,`variant_array_` VARIANT,`variant_doc_` VARIANT}, primaryKeys=id_, options=()}
@@ -55,7 +55,7 @@
     id_
     TRY_PARSE_JSON(id_) AS variant_id_
     TRY_PARSE_JSON('[' || id_ || ',' || bool_ || ']') AS variant_array_
-    TRY_PARSE_JSON('{\"id\": ' || id_ || ', \"name\": \"' || string_ || '\"}') 
AS variant_doc_
+    TRY_PARSE_JSON('{"id": ' || id_ || ', "name": "' || string_ || '"}') AS 
variant_doc_
   primary-key: id_
   expect: |-
     CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`variant_id_` VARIANT,`variant_array_` VARIANT,`variant_doc_` VARIANT}, primaryKeys=id_, options=()}
@@ -79,13 +79,13 @@
 - do: Parse Variant from JSON with duplicate key rejected
   projection: |-
     id_
-    PARSE_JSON('{\"id\": 1, \"id\": 2}') AS variant_
+    PARSE_JSON('{"id": 1, "id": 2}') AS variant_
   primary-key: id_
   expect-error: 'org.apache.flink.cdc.common.types.variant.VariantTypeException: VARIANT_DUPLICATE_KEY'
 - do: Parse Variant from JSON with duplicate key allowed
   projection: |-
     id_
-    PARSE_JSON('{\"id\": 1, \"id\": 2}', TRUE) AS variant_
+    PARSE_JSON('{"id": 1, "id": 2}', TRUE) AS variant_
   primary-key: id_
   expect: |-
     CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`variant_` VARIANT}, primaryKeys=id_, options=()}
@@ -97,7 +97,7 @@
 - do: Try Parse Variant from JSON with duplicate key rejected
   projection: |-
     id_
-    TRY_PARSE_JSON('{\"id\": 1, \"id\": 2}') AS variant_
+    TRY_PARSE_JSON('{"id": 1, "id": 2}') AS variant_
   primary-key: id_
   expect: |-
     CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`variant_` VARIANT}, primaryKeys=id_, options=()}
@@ -109,7 +109,7 @@
 - do: Try Parse Variant from JSON with duplicate key allowed
   projection: |-
     id_
-    TRY_PARSE_JSON('{\"id\": 1, \"id\": 2}', TRUE) AS variant_
+    TRY_PARSE_JSON('{"id": 1, "id": 2}', TRUE) AS variant_
   primary-key: id_
   expect: |-
     CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`variant_` VARIANT}, primaryKeys=id_, options=()}
@@ -283,4 +283,83 @@
   expect: |-
     CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`complex_row_` ROW<`name` STRING, `length` INT>}, primaryKeys=id_, options=()}
     DataChangeEvent{tableId=foo.bar.baz, before=[1, {name: STRING -> Alice, length: INT -> 5}], after=[-1, {name: STRING -> Derrida, length: INT -> 7}], op=UPDATE, meta=()}
-    DataChangeEvent{tableId=foo.bar.baz, before=[-1, {name: STRING -> Derrida, length: INT -> 7}], after=[], op=DELETE, meta=()}
\ No newline at end of file
+    DataChangeEvent{tableId=foo.bar.baz, before=[-1, {name: STRING -> Derrida, length: INT -> 7}], after=[], op=DELETE, meta=()}
+- do: Variant Object Subscripting With String Key
+  projection: |-
+    id_
+    variant_
+    variant_['k'] AS variant_k
+  primary-key: id_
+  expect: |-
+    CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`variant_` VARIANT,`variant_k` VARIANT}, primaryKeys=id_, options=()}
+    DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, {"array":[1,2,{"kk":1.123}],"k":1,"object":{"k":"hello"}}, 1], op=INSERT, meta=()}
+    DataChangeEvent{tableId=foo.bar.baz, before=[1, {"array":[1,2,{"kk":1.123}],"k":1,"object":{"k":"hello"}}, 1], after=[-1, [{"k":1},"hello",{"k":2}], null], op=UPDATE, meta=()}
+    DataChangeEvent{tableId=foo.bar.baz, before=[-1, [{"k":1},"hello",{"k":2}], null], after=[], op=DELETE, meta=()}
+    DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
+    DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
+- do: Variant Array Subscripting With Integer Index
+  projection: |-
+    id_
+    variant_
+    variant_[1] AS variant_first
+    variant_[2] AS variant_second
+  primary-key: id_
+  expect: |-
+    CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`variant_` VARIANT,`variant_first` VARIANT,`variant_second` VARIANT}, primaryKeys=id_, options=()}
+    DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, {"array":[1,2,{"kk":1.123}],"k":1,"object":{"k":"hello"}}, null, null], op=INSERT, meta=()}
+    DataChangeEvent{tableId=foo.bar.baz, before=[1, {"array":[1,2,{"kk":1.123}],"k":1,"object":{"k":"hello"}}, null, null], after=[-1, [{"k":1},"hello",{"k":2}], {"k":1}, "hello"], op=UPDATE, meta=()}
+    DataChangeEvent{tableId=foo.bar.baz, before=[-1, [{"k":1},"hello",{"k":2}], {"k":1}, "hello"], after=[], op=DELETE, meta=()}
+    DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null, null], op=INSERT, meta=()}
+    DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null, null], after=[], op=DELETE, meta=()}
+- do: Variant Nested Subscripting
+  projection: |-
+    id_
+    variant_['object']['k'] AS nested_object_k
+    variant_['array'][1] AS nested_array_first
+  primary-key: id_
+  expect: |-
+    CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`nested_object_k` VARIANT,`nested_array_first` VARIANT}, primaryKeys=id_, options=()}
+    DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, "hello", 1], op=INSERT, meta=()}
+    DataChangeEvent{tableId=foo.bar.baz, before=[1, "hello", 1], after=[-1, null, null], op=UPDATE, meta=()}
+    DataChangeEvent{tableId=foo.bar.baz, before=[-1, null, null], after=[], op=DELETE, meta=()}
+    DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
+    DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
+- do: Variant Subscripting With Absent Key
+  projection: |-
+    id_
+    variant_['nonexistent'] AS absent_key
+    variant_[1000] AS absent_index
+  primary-key: id_
+  expect: |-
+    CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`absent_key` VARIANT,`absent_index` VARIANT}, primaryKeys=id_, options=()}
+    DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, null, null], op=INSERT, meta=()}
+    DataChangeEvent{tableId=foo.bar.baz, before=[1, null, null], after=[-1, null, null], op=UPDATE, meta=()}
+    DataChangeEvent{tableId=foo.bar.baz, before=[-1, null, null], after=[], op=DELETE, meta=()}
+    DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
+    DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
+- do: Variant Subscripting With Parsed JSON
+  projection: |-
+    id_
+    PARSE_JSON('[1, 2, 3]')[2] AS array_second
+    PARSE_JSON('[10, 20, 30]')[1] AS array_first
+  primary-key: id_
+  expect: |-
+    CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`array_second` VARIANT,`array_first` VARIANT}, primaryKeys=id_, options=()}
+    DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, 2, 10], op=INSERT, meta=()}
+    DataChangeEvent{tableId=foo.bar.baz, before=[1, 2, 10], after=[-1, 2, 10], op=UPDATE, meta=()}
+    DataChangeEvent{tableId=foo.bar.baz, before=[-1, 2, 10], after=[], op=DELETE, meta=()}
+    DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, 2, 10], op=INSERT, meta=()}
+    DataChangeEvent{tableId=foo.bar.baz, before=[0, 2, 10], after=[], op=DELETE, meta=()}
+- do: Variant Subscripting With Parsed JSON Object
+  projection: |-
+    id_
+    PARSE_JSON('{"key": "value"}')['key'] AS json_key
+    PARSE_JSON('{"nested": {"inner": 42}}')['nested']['inner'] AS nested_value
+  primary-key: id_
+  expect: |-
+    CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`json_key` VARIANT,`nested_value` VARIANT}, primaryKeys=id_, options=()}
+    DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, "value", 42], op=INSERT, meta=()}
+    DataChangeEvent{tableId=foo.bar.baz, before=[1, "value", 42], after=[-1, "value", 42], op=UPDATE, meta=()}
+    DataChangeEvent{tableId=foo.bar.baz, before=[-1, "value", 42], after=[], op=DELETE, meta=()}
+    DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, "value", 42], op=INSERT, meta=()}
+    DataChangeEvent{tableId=foo.bar.baz, before=[0, "value", 42], after=[], op=DELETE, meta=()}
\ No newline at end of file
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctions.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctions.java
index 603930cc9..9ce343782 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctions.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctions.java
@@ -17,14 +17,16 @@
 
 package org.apache.flink.cdc.runtime.functions.impl;
 
+import org.apache.flink.cdc.common.types.variant.Variant;
+
 import java.util.List;
 import java.util.Map;
 
 /**
  * Built-in functions for collection and struct data types.
  *
- * <p>These functions support accessing elements from collections (ARRAY, MAP) and structured data
- * types (ROW).
+ * <p>These functions support accessing elements from collections (ARRAY, MAP), structured data
+ * types (ROW), and semi-structured data types (VARIANT).
  */
 public class StructFunctions {
 
@@ -67,4 +69,49 @@ public class StructFunctions {
         }
         return map.get(key);
     }
+
+    /**
+     * Accesses an element from a VARIANT array by index (1-based, SQL standard).
+     *
+     * <p>variant[1] returns the first element.
+     *
+     * @param variant the variant (must be an array) to access
+     * @param index the 1-based index
+     * @return the element at the specified index as a Variant, or null if the variant is not an
+     *     array or index is out of bounds
+     */
+    public static Variant itemAccess(Variant variant, Integer index) {
+        if (variant == null || index == null) {
+            return null;
+        }
+        if (!variant.isArray()) {
+            return null;
+        }
+        // Convert 1-based index to 0-based (SQL standard uses 1-based indexing)
+        int zeroBasedIndex = index - 1;
+        if (zeroBasedIndex < 0 || zeroBasedIndex >= variant.arraySize()) {
+            return null;
+        }
+        return variant.getElement(zeroBasedIndex);
+    }
+
+    /**
+     * Accesses a field from a VARIANT object by field name.
+     *
+     * <p>variant['fieldName'] returns the value of the specified field.
+     *
+     * @param variant the variant (must be an object) to access
+     * @param fieldName the name of the field to look up
+     * @return the field value as a Variant, or null if the variant is not an object or field is not
+     *     found
+     */
+    public static Variant itemAccess(Variant variant, String fieldName) {
+        if (variant == null || fieldName == null) {
+            return null;
+        }
+        if (!variant.isObject()) {
+            return null;
+        }
+        return variant.getField(fieldName);
+    }
 }
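
The two overloads above compose for nested access: each returns null on a type mismatch, absent key, or out-of-range index, so a chained lookup degrades to null instead of throwing, which is what the variant_['object']['k'] and variant_['array'][1] spec cases rely on. A small path-walking helper along those lines could look like the sketch below (hypothetical, not part of this commit):

    import org.apache.flink.cdc.common.types.variant.Variant;
    import org.apache.flink.cdc.runtime.functions.impl.StructFunctions;

    /** Hypothetical convenience wrapper around the two itemAccess overloads. */
    public final class VariantPaths {
        private VariantPaths() {}

        /** Walks String keys and Integer indexes; any miss makes the whole chain null. */
        public static Variant get(Variant root, Object... path) {
            Variant current = root;
            for (Object step : path) {
                if (current == null) {
                    return null;
                }
                current =
                        (step instanceof Integer)
                                ? StructFunctions.itemAccess(current, (Integer) step)
                                : StructFunctions.itemAccess(current, String.valueOf(step));
            }
            return current;
        }
    }

Usage such as VariantPaths.get(variant, "object", "k") would mirror variant_['object']['k'] in a projection.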
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
index fb5069255..14c483d70 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
@@ -174,7 +174,10 @@ public class JaninoCompiler {
         Object value = sqlLiteral.getValue();
         if (sqlLiteral instanceof SqlCharStringLiteral) {
             // Double quotation marks represent strings in Janino.
-            value = "\"" + sqlLiteral.getValueAs(NlsString.class).getValue() + "\"";
+            // Escape backslashes first, then double quotes for proper Janino string literals.
+            String stringValue = sqlLiteral.getValueAs(NlsString.class).getValue();
+            stringValue = stringValue.replace("\\", "\\\\").replace("\"", "\\\"");
+            value = "\"" + stringValue + "\"";
         } else if (sqlLiteral instanceof SqlNumericLiteral) {
             if (((SqlNumericLiteral) sqlLiteral).isInteger()) {
                 long longValue = sqlLiteral.longValue(true);
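
To make the change above concrete: the value held by a SqlCharStringLiteral such as the argument of PARSE_JSON('{"key": "value"}') contains raw double quotes, which previously produced invalid generated Janino source. A tiny standalone sketch of the new escaping (demo class name hypothetical):

    public class JaninoEscapeDemo {
        public static void main(String[] args) {
            // Raw literal value as parsed from the SQL expression
            String literal = "{\"key\": \"value\"}";
            // Escape backslashes first, then double quotes, as in the JaninoCompiler change above
            String escaped = literal.replace("\\", "\\\\").replace("\"", "\\\"");
            // Prints "{\"key\": \"value\"}" -- a valid Janino string literal
            System.out.println("\"" + escaped + "\"");
        }
    }

This is also why the nested.yaml specs earlier in this commit could drop their hand-escaped \" sequences.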
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
index 7d0f66362..6044fbd5d 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
@@ -401,8 +401,8 @@ public class TransformSqlOperatorTable extends ReflectiveSqlOperatorTable {
     // ---------------------
     // Struct Functions
     // ---------------------
-    // Supports accessing elements of ARRAY[index], ROW[index] and MAP[key]
-    public static final SqlOperator ITEM = SqlStdOperatorTable.ITEM;
+    // Supports accessing elements of ARRAY[index], ROW[index], MAP[key], and VARIANT[index/key]
+    public static final SqlOperator ITEM = new VariantAwareItemOperator();
 
     public static final SqlFunction AI_CHAT_PREDICT =
             new SqlFunction(
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/VariantAwareItemOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/VariantAwareItemOperator.java
new file mode 100644
index 000000000..d8f989c61
--- /dev/null
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/VariantAwareItemOperator.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.parser.metadata;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlOperandCountRange;
+import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.type.SqlTypeUtil;
+import org.apache.calcite.sql.validate.SqlValidator;
+
+/**
+ * A variant-aware implementation of the ITEM operator.
+ *
+ * <p>This extends the standard Calcite ITEM operator to support VARIANT type. For ARRAY, MAP, and
+ * ROW types, it delegates to the standard Calcite ITEM operator.
+ *
+ * <p>When accessing elements of a VARIANT:
+ *
+ * <ul>
+ *   <li>VARIANT[INTEGER] - accesses array elements (1-based index)
+ *   <li>VARIANT[STRING] - accesses object fields by name
+ * </ul>
+ *
+ * <p>The return type is always VARIANT when the source is VARIANT.
+ */
+public class VariantAwareItemOperator extends SqlSpecialOperator {
+
+    public VariantAwareItemOperator() {
+        super("ITEM", SqlKind.ITEM, 100, true, null, null, null);
+    }
+
+    @Override
+    public SqlOperandCountRange getOperandCountRange() {
+        return SqlStdOperatorTable.ITEM.getOperandCountRange();
+    }
+
+    @Override
+    public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFailure) {
+        final SqlValidator validator = callBinding.getValidator();
+        final RelDataType operandType =
+                validator.deriveType(callBinding.getScope(), callBinding.operand(0));
+        final SqlTypeName typeName = operandType.getSqlTypeName();
+
+        // Only handle VARIANT specially, delegate others to standard ITEM
+        if (typeName == SqlTypeName.VARIANT) {
+            final RelDataType keyType =
+                    validator.deriveType(callBinding.getScope(), callBinding.operand(1));
+            // VARIANT accepts integer or string keys
+            boolean valid =
+                    SqlTypeUtil.isIntType(keyType)
+                            || SqlTypeUtil.isCharacter(keyType)
+                            || keyType.getSqlTypeName() == SqlTypeName.ANY;
+            if (!valid && throwOnFailure) {
+                throw callBinding.newValidationSignatureError();
+            }
+            return valid;
+        }
+
+        // Delegate to standard ITEM for ARRAY, MAP, ROW, etc.
+        return SqlStdOperatorTable.ITEM.checkOperandTypes(callBinding, throwOnFailure);
+    }
+
+    @Override
+    public RelDataType inferReturnType(SqlOperatorBinding opBinding) {
+        final RelDataType operandType = opBinding.getOperandType(0);
+        final SqlTypeName typeName = operandType.getSqlTypeName();
+
+        // Only handle VARIANT specially
+        if (typeName == SqlTypeName.VARIANT) {
+            final RelDataTypeFactory typeFactory = opBinding.getTypeFactory();
+            return typeFactory.createTypeWithNullability(
+                    typeFactory.createSqlType(SqlTypeName.VARIANT), true);
+        }
+
+        // Delegate to standard ITEM for other types
+        return SqlStdOperatorTable.ITEM.inferReturnType(opBinding);
+    }
+
+    @Override
+    public String getAllowedSignatures(String opName) {
+        // Include VARIANT in addition to standard signatures
+        return SqlStdOperatorTable.ITEM.getAllowedSignatures(opName)
+                + "\n<VARIANT>[<INTEGER>|<CHARACTER>]";
+    }
+
+    @Override
+    public void unparse(
+            SqlWriter writer, org.apache.calcite.sql.SqlCall call, int leftPrec, int rightPrec) {
+        SqlStdOperatorTable.ITEM.unparse(writer, call, leftPrec, rightPrec);
+    }
+}
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctionsTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctionsTest.java
index 1ac03cc2b..4d9510451 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctionsTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctionsTest.java
@@ -17,6 +17,9 @@
 
 package org.apache.flink.cdc.runtime.functions.impl;
 
+import org.apache.flink.cdc.common.types.variant.Variant;
+import org.apache.flink.cdc.common.types.variant.VariantBuilder;
+
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 
@@ -157,4 +160,155 @@ class StructFunctionsTest {
             assertThat(StructFunctions.itemAccess(mapWithNullValue, "key2")).isNull();
         }
     }
+
+    // ========================================
+    // Variant Access Tests
+    // ========================================
+    @Nested
+    class VariantAccessTests {
+
+        private final VariantBuilder builder = Variant.newBuilder();
+
+        @Test
+        void testVariantArrayAccessByIndex() {
+            // Build a variant array: [1, 2, 3]
+            Variant arrayVariant =
+                    builder.array()
+                            .add(builder.of(1))
+                            .add(builder.of(2))
+                            .add(builder.of(3))
+                            .build();
+
+            // SQL uses 1-based indexing
+            Variant first = StructFunctions.itemAccess(arrayVariant, 1);
+            assertThat(first).isNotNull();
+            assertThat(first.getInt()).isEqualTo(1);
+
+            Variant second = StructFunctions.itemAccess(arrayVariant, 2);
+            assertThat(second).isNotNull();
+            assertThat(second.getInt()).isEqualTo(2);
+
+            Variant third = StructFunctions.itemAccess(arrayVariant, 3);
+            assertThat(third).isNotNull();
+            assertThat(third.getInt()).isEqualTo(3);
+        }
+
+        @Test
+        void testVariantArrayOutOfBoundsAccess() {
+            Variant arrayVariant =
+                    builder.array()
+                            .add(builder.of(10))
+                            .add(builder.of(20))
+                            .add(builder.of(30))
+                            .build();
+
+            // Index 0 is invalid in SQL (1-based indexing)
+            assertThat(StructFunctions.itemAccess(arrayVariant, 0)).isNull();
+            // Negative index
+            assertThat(StructFunctions.itemAccess(arrayVariant, -1)).isNull();
+            // Index beyond size
+            assertThat(StructFunctions.itemAccess(arrayVariant, 4)).isNull();
+            assertThat(StructFunctions.itemAccess(arrayVariant, 100)).isNull();
+        }
+
+        @Test
+        void testVariantObjectAccessByFieldName() {
+            // Build a variant object: {"name": "Alice", "age": 30}
+            Variant objectVariant =
+                    builder.object()
+                            .add("name", builder.of("Alice"))
+                            .add("age", builder.of(30))
+                            .build();
+
+            Variant name = StructFunctions.itemAccess(objectVariant, "name");
+            assertThat(name).isNotNull();
+            assertThat(name.getString()).isEqualTo("Alice");
+
+            Variant age = StructFunctions.itemAccess(objectVariant, "age");
+            assertThat(age).isNotNull();
+            assertThat(age.getInt()).isEqualTo(30);
+        }
+
+        @Test
+        void testVariantObjectMissingField() {
+            Variant objectVariant = builder.object().add("exists", builder.of("value")).build();
+
+            assertThat(StructFunctions.itemAccess(objectVariant, "nonexistent")).isNull();
+        }
+
+        @Test
+        void testVariantNullHandling() {
+            Variant arrayVariant = builder.array().add(builder.of(1)).build();
+            Variant objectVariant = builder.object().add("key", builder.of("value")).build();
+
+            // Null variant returns null
+            assertThat(StructFunctions.itemAccess((Variant) null, 1)).isNull();
+            assertThat(StructFunctions.itemAccess((Variant) null, "key")).isNull();
+
+            // Null index returns null
+            assertThat(StructFunctions.itemAccess(arrayVariant, (Integer) null)).isNull();
+
+            // Null field name returns null
+            assertThat(StructFunctions.itemAccess(objectVariant, (String) null)).isNull();
+        }
+
+        @Test
+        void testVariantTypeMismatch() {
+            Variant arrayVariant = builder.array().add(builder.of(1)).build();
+            Variant objectVariant = builder.object().add("key", builder.of("value")).build();
+
+            // Accessing array with string key returns null
+            assertThat(StructFunctions.itemAccess(arrayVariant, "key")).isNull();
+
+            // Accessing object with integer index returns null
+            assertThat(StructFunctions.itemAccess(objectVariant, 1)).isNull();
+        }
+
+        @Test
+        void testNestedVariantAccess() {
+            // Build a nested variant: {"data": [1, {"nested": "value"}]}
+            Variant nestedVariant =
+                    builder.object()
+                            .add(
+                                    "data",
+                                    builder.array()
+                                            .add(builder.of(1))
+                                            .add(
+                                                    builder.object()
+                                                            .add("nested", builder.of("value"))
+                                                            .build())
+                                            .build())
+                            .build();
+
+            // Access "data" field
+            Variant data = StructFunctions.itemAccess(nestedVariant, "data");
+            assertThat(data).isNotNull();
+            assertThat(data.isArray()).isTrue();
+
+            // Access second element of the array (index 2 in 1-based SQL standard)
+            Variant secondElement = StructFunctions.itemAccess(data, 2);
+            assertThat(secondElement).isNotNull();
+            assertThat(secondElement.isObject()).isTrue();
+
+            // Access "nested" field
+            Variant nestedValue = StructFunctions.itemAccess(secondElement, "nested");
+            assertThat(nestedValue).isNotNull();
+            assertThat(nestedValue.getString()).isEqualTo("value");
+        }
+
+        @Test
+        void testPrimitiveVariantAccess() {
+            // Primitive variants are neither arrays nor objects
+            Variant intVariant = builder.of(42);
+            Variant stringVariant = builder.of("hello");
+
+            // Accessing primitive variant with index returns null
+            assertThat(StructFunctions.itemAccess(intVariant, 1)).isNull();
+            assertThat(StructFunctions.itemAccess(stringVariant, 1)).isNull();
+
+            // Accessing primitive variant with field name returns null
+            assertThat(StructFunctions.itemAccess(intVariant, "key")).isNull();
+            assertThat(StructFunctions.itemAccess(stringVariant, "key")).isNull();
+        }
+    }
 }
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
index a45dbde2a..a522f4f34 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
@@ -427,6 +427,25 @@ class TransformParserTest {
                 List.of(Column.physicalColumn("binArr", DataTypes.ARRAY(DataTypes.BINARY(16))));
         testFilterExpressionWithColumns(
                 "binArr[1]", "(byte[]) itemAccess(binArr, 1)", binaryArrayColumns);
+
+        // Variant access tests
+        List<Column> variantColumns = List.of(Column.physicalColumn("v", DataTypes.VARIANT()));
+        testFilterExpressionWithColumns(
+                "v['key']",
+                "(org.apache.flink.cdc.common.types.variant.Variant) itemAccess(v, \"key\")",
+                variantColumns);
+        testFilterExpressionWithColumns(
+                "v[1]",
+                "(org.apache.flink.cdc.common.types.variant.Variant) itemAccess(v, 1)",
+                variantColumns);
+        testFilterExpressionWithColumns(
+                "v['a']['b']",
+                "(org.apache.flink.cdc.common.types.variant.Variant) itemAccess((org.apache.flink.cdc.common.types.variant.Variant) itemAccess(v, \"a\"), \"b\")",
+                variantColumns);
+        testFilterExpressionWithColumns(
+                "parse_json('{\"key\": \"value\"}')['key']",
+                "(org.apache.flink.cdc.common.types.variant.Variant) itemAccess(parseJson(\"{\\\"key\\\": \\\"value\\\"}\"), \"key\")",
+                Collections.emptyList());
     }
 
     @Test
