This is an automated email from the ASF dual-hosted git repository.

yux pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 09ad78918 [FLINK-38888][transform] YAML Pipeline supports item subscription of ARRAY, MAP, and ROW (#4241)
09ad78918 is described below

commit 09ad789184ecbc80d780884f3034f7c2f13ecc56
Author: Jia Fan <[email protected]>
AuthorDate: Wed Feb 4 10:12:04 2026 +0800

    [FLINK-38888][transform] YAML Pipeline supports item subscription of ARRAY, MAP, and ROW (#4241)
---
 docs/content.zh/docs/core-concept/transform.md     |   8 ++
 docs/content/docs/core-concept/transform.md        |  10 ++
 .../src/test/resources/specs/nested.yaml           |  15 --
 .../runtime/functions/impl/StructFunctions.java    |  70 +++++++++
 .../flink/cdc/runtime/parser/JaninoCompiler.java   |  42 +++++-
 .../parser/metadata/TransformSqlOperatorTable.java |   6 +
 .../functions/impl/StructFunctionsTest.java        | 160 +++++++++++++++++++++
 .../cdc/runtime/parser/TransformParserTest.java    |  50 +++++++
 8 files changed, 345 insertions(+), 16 deletions(-)

diff --git a/docs/content.zh/docs/core-concept/transform.md b/docs/content.zh/docs/core-concept/transform.md
index aec40d70a..5f278e798 100644
--- a/docs/content.zh/docs/core-concept/transform.md
+++ b/docs/content.zh/docs/core-concept/transform.md
@@ -226,6 +226,14 @@ You can use `CAST( <EXPR> AS <T> )` syntax to convert any valid expression `<EXP
 | PARSE_JSON(string[, allowDuplicateKeys]) | parseJson(string[, allowDuplicateKeys]) | Parse a JSON string into a Variant. If the JSON string is invalid, an error will be thrown. To return NULL instead of an error, use the TRY_PARSE_JSON function. <br><br> If there are duplicate keys in the input JSON string, when `allowDuplicateKeys` is true, the parser will keep the last occurrence of all fields with the same key, otherwise it will throw an error. The default value of `allowDuplicateKe [...]
 | TRY_PARSE_JSON(string[, allowDuplicateKeys]) | tryParseJson(string[, allowDuplicateKeys]) | Parse a JSON string into a Variant if possible. If the JSON string is invalid, return NULL. To throw an error instead of returning NULL, use the PARSE_JSON function. <br><br> If there are duplicate keys in the input JSON string, when `allowDuplicateKeys` is true, the parser will keep the last occurrence of all fields with the same key, otherwise it will return NULL. The default value of `allowDu [...]
 
+## Struct Functions
+
+| Function | Janino Code | Description |
+| -------- | ----------- | ----------- |
+| array[index] | itemAccess(array, index) | Returns the element at position `index` in the array. Index is 1-based (SQL standard). Returns NULL if the index is out of bounds or the array is NULL. |
+| map[key] | itemAccess(map, key) | Returns the value associated with `key` in the map. Returns NULL if the key does not exist or the map is NULL. |
+| row[index] | itemAccess(row, index) | Returns the field at position `index` in the row. Index is 1-based. The index must be a constant (not a computed expression) since the return type must be statically determined. |
+
 # Example
 ## Add computed columns
 Evaluation expressions can be used to generate new columns. For example, if we want to append two computed columns based on the table `web_order` in the database `mydb`, we may define a transform rule as follows:
diff --git a/docs/content/docs/core-concept/transform.md b/docs/content/docs/core-concept/transform.md
index 584c3606d..d0a4901b4 100644
--- a/docs/content/docs/core-concept/transform.md
+++ b/docs/content/docs/core-concept/transform.md
@@ -227,6 +227,16 @@ You can use `CAST( <EXPR> AS <T> )` syntax to convert any valid expression `<EXP
 | PARSE_JSON(string[, allowDuplicateKeys]) | parseJson(string[, allowDuplicateKeys]) | Parse a JSON string into a Variant. If the JSON string is invalid, an error will be thrown. To return NULL instead of an error, use the TRY_PARSE_JSON function. <br><br> If there are duplicate keys in the input JSON string, when `allowDuplicateKeys` is true, the parser will keep the last occurrence of all fields with the same key, otherwise it will throw an error. The default value of `allowDuplicateKe [...]
 | TRY_PARSE_JSON(string[, allowDuplicateKeys]) | tryParseJson(string[, allowDuplicateKeys]) | Parse a JSON string into a Variant if possible. If the JSON string is invalid, return NULL. To throw an error instead of returning NULL, use the PARSE_JSON function. <br><br> If there are duplicate keys in the input JSON string, when `allowDuplicateKeys` is true, the parser will keep the last occurrence of all fields with the same key, otherwise it will return NULL. The default value of `allowDu [...]
 
+## Struct Functions
+
+Struct functions are used to access elements in ARRAY, MAP and ROW types using subscript syntax.
+
+| Function | Janino Code | Description |
+| -------- | ----------- | ----------- |
+| array[index] | itemAccess(array, index) | Returns the element at position `index` in the array. Index is 1-based (SQL standard). Returns NULL if the index is out of bounds or if the array is NULL. |
+| map[key] | itemAccess(map, key) | Returns the value associated with `key` in the map. Returns NULL if the key does not exist or if the map is NULL. |
+| row[index] | itemAccess(row, index) | Returns the field at position `index` in the row. Index is 1-based. The index must be a constant (not a computed expression) since the return type must be statically determined. |
+
 # Example
 ## Add computed columns
 Evaluation expressions can be used to generate new columns. For example, if we want to append two computed columns based on the table `web_order` in the database `mydb`, we may define a transform rule as follows:
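
As a quick illustration of the new subscript syntax in a pipeline definition, a transform rule might look like the sketch below. The table and column names are hypothetical, not taken from this commit; it assumes `tags` is an ARRAY<STRING> column and `attrs` is a MAP<STRING, STRING> column:

    transform:
      - source-table: mydb.web_order
        projection: id, tags[1] AS first_tag, attrs['color'] AS color
        filter: tags[1] = 'urgent'
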
diff --git a/flink-cdc-composer/src/test/resources/specs/nested.yaml b/flink-cdc-composer/src/test/resources/specs/nested.yaml
index f230cc6ca..f61ec8f79 100644
--- a/flink-cdc-composer/src/test/resources/specs/nested.yaml
+++ b/flink-cdc-composer/src/test/resources/specs/nested.yaml
@@ -119,7 +119,6 @@
    DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, {"id":2}], op=INSERT, meta=()}
    DataChangeEvent{tableId=foo.bar.baz, before=[0, {"id":2}], after=[], op=DELETE, meta=()}
 - do: Integer Array Subscripting
-  ignore: FLINK-38888
   projection: |-
     id_
     array_int_
@@ -133,7 +132,6 @@
    DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
    DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
 - do: String Array Subscripting
-  ignore: FLINK-38888
   projection: |-
     id_
     array_string_
@@ -147,7 +145,6 @@
    DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
    DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
 - do: Array OOB Subscripting
-  ignore: FLINK-38888
   projection: |-
     id_
     array_string_[0] AS negative_overflow
@@ -161,7 +158,6 @@
    DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
    DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
 - do: Array Subscripting With Sub Expression
-  ignore: FLINK-38888
   projection: |-
     id_
     array_int_[1 + 1] AS int_key
@@ -175,7 +171,6 @@
    DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
    DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
 - do: Filter by Array-related Expression
-  ignore: FLINK-38888
   projection: id_, array_string_
   filter: array_string_[3] = '五'
   primary-key: id_
@@ -184,7 +179,6 @@
    DataChangeEvent{tableId=foo.bar.baz, before=[1, [one, one, two, three, five]], after=[-1, [二, san, 五, qi, 十一]], op=UPDATE, meta=()}
    DataChangeEvent{tableId=foo.bar.baz, before=[-1, [二, san, 五, qi, 十一]], after=[], op=DELETE, meta=()}
 - do: Map Subscripting
-  ignore: FLINK-38888
   projection: |-
     id_
     map_int_string_
@@ -198,7 +192,6 @@
    DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
    DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
 - do: Map Subscripting with absent key
-  ignore: FLINK-38888
   projection: |-
     id_
     map_int_string_[233] AS map_value
@@ -211,7 +204,6 @@
    DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null], op=INSERT, meta=()}
    DataChangeEvent{tableId=foo.bar.baz, before=[0, null], after=[], op=DELETE, meta=()}
 - do: Map Subscripting with complex expression
-  ignore: FLINK-38888
   projection: |-
     id_
     map_int_string_[1 + 2] AS map_value
@@ -224,7 +216,6 @@
    DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null], op=INSERT, meta=()}
    DataChangeEvent{tableId=foo.bar.baz, before=[0, null], after=[], op=DELETE, meta=()}
 - do: Map Subscripting with nested expressions
-  ignore: FLINK-38888
   projection: |-
     id_
     map_string_array_string_[lower('ONE')] AS index_1
@@ -238,7 +229,6 @@
    DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
    DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
 - do: Map Subscripting With Absent Object
-  ignore: FLINK-38888
   projection: |-
     id_
     map_string_array_string_['foo'] AS index_1
@@ -252,7 +242,6 @@
    DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
    DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
 - do: Filter by Map-related Expression
-  ignore: FLINK-38888
   projection: id_, map_int_string_
   filter: map_int_string_[1] = 'one'
   primary-key: id_
@@ -260,7 +249,6 @@
    CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`map_int_string_` MAP<INT, STRING>}, primaryKeys=id_, options=()}
    DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, {1 -> one, 2 -> two, 3 -> three}], op=INSERT, meta=()}
 - do: Record Subscripting With Index
-  ignore: FLINK-38888
   projection: |-
     id_
     complex_row_
@@ -275,7 +263,6 @@
    DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null, null], op=INSERT, meta=()}
    DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null, null], after=[], op=DELETE, meta=()}
 - do: Record Subscripting With Invalid Index
-  ignore: FLINK-38888
   projection: |-
     id_
     complex_row_
@@ -283,7 +270,6 @@
   primary-key: id_
  expect-error: 'Cannot infer type of field at position 0 within ROW type: RecordType(VARCHAR(65536) name, INTEGER length)'
- do: Record Subscripting With Computed Index (Illegal, type must be statically determined)
-  ignore: FLINK-38888
   projection: |-
     id_
     complex_row_
@@ -291,7 +277,6 @@
   primary-key: id_
  expect-error: 'Cannot infer type of field at position null within ROW type: RecordType(VARCHAR(65536) name, INTEGER length)'
 - do: Filter by Record-related Expression
-  ignore: FLINK-38888
   projection: id_, complex_row_
   filter: complex_row_[1] = 'Derrida'
   primary-key: id_
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctions.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctions.java
new file mode 100644
index 000000000..603930cc9
--- /dev/null
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctions.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.functions.impl;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Built-in functions for collection and struct data types.
+ *
+ * <p>These functions support accessing elements from collections (ARRAY, MAP) and structured data
+ * types (ROW).
+ */
+public class StructFunctions {
+
+    /**
+     * Accesses an element from an ARRAY by index (1-based, SQL standard).
+     *
+     * <p>array[1] returns the first element.
+     *
+     * @param <T> the element type of the array
+     * @param array the array to access
+     * @param index the 1-based index
+     * @return the element at the specified index, or null if index is out of bounds
+     */
+    public static <T> T itemAccess(List<T> array, Integer index) {
+        if (array == null || index == null) {
+            return null;
+        }
+        // Convert 1-based index to 0-based (SQL standard uses 1-based indexing)
+        int zeroBasedIndex = index - 1;
+        if (zeroBasedIndex < 0 || zeroBasedIndex >= array.size()) {
+            return null;
+        }
+        return array.get(zeroBasedIndex);
+    }
+
+    /**
+     * Accesses a value from a MAP by key.
+     *
+     * <p>map['key'] returns the value for 'key'.
+     *
+     * @param <K> the key type of the map
+     * @param <V> the value type of the map
+     * @param map the map to access
+     * @param key the key to look up
+     * @return the value for the specified key, or null if not found
+     */
+    public static <K, V> V itemAccess(Map<K, V> map, K key) {
+        if (map == null || key == null) {
+            return null;
+        }
+        return map.get(key);
+    }
+}
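
A quick sketch of the itemAccess semantics defined above (illustrative calls, not part of the commit):

    List<String> arr = Arrays.asList("a", "b");
    StructFunctions.itemAccess(arr, 1);  // "a" (SQL-style 1-based indexing)
    StructFunctions.itemAccess(arr, 3);  // null (out-of-bounds access returns null, no exception)
    Map<String, Integer> m = Collections.singletonMap("k", 42);
    StructFunctions.itemAccess(m, "k");  // 42
    StructFunctions.itemAccess(m, "x");  // null (absent keys return null)
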
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
index 4345331ac..fb5069255 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
@@ -25,6 +25,7 @@ import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypeRoot;
+import org.apache.flink.cdc.common.utils.Preconditions;
 import org.apache.flink.cdc.common.utils.StringUtils;
 import org.apache.flink.cdc.runtime.operators.transform.UserDefinedFunctionDescriptor;
 
@@ -89,7 +90,7 @@ public class JaninoCompiler {
     public static final String DEFAULT_TIME_ZONE = "__time_zone__";
 
     private static final String[] BUILTIN_FUNCTION_MODULES = {
-        "Arithmetic", "Casting", "Comparison", "Logical", "String", "Temporal"
+        "Arithmetic", "Casting", "Comparison", "Logical", "String", "Struct", 
"Temporal"
     };
 
     @VisibleForTesting
@@ -306,6 +307,8 @@ public class JaninoCompiler {
                return generateTimestampAddOperation(context, sqlBasicCall, atoms);
             case OTHER:
                 return generateOtherOperation(context, sqlBasicCall, atoms);
+            case ITEM:
+                return generateItemAccessOperation(context, sqlBasicCall, atoms);
             default:
                throw new ParseException("Unrecognized expression: " + sqlBasicCall);
         }
@@ -468,6 +471,43 @@ public class JaninoCompiler {
        throw new ParseException("Unrecognized expression: " + sqlBasicCall.toString());
     }
 
+    private static Java.Rvalue generateItemAccessOperation(
+            Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
+        Preconditions.checkArgument(
+                atoms.length == 2,
+                "Expecting item accessing call %s to have 2 operands, got %s 
actually",
+                sqlBasicCall,
+                List.of(atoms));
+        Java.Rvalue methodInvocation =
+                new Java.MethodInvocation(Location.NOWHERE, null, "itemAccess", atoms);
+
+        // Deduce the return type and add a cast to ensure proper type conversion
+        DataType resultType =
+                TransformParser.deduceSubExpressionType(
+                        context.columns,
+                        sqlBasicCall,
+                        context.udfDescriptors,
+                        context.supportedMetadataColumns);
+
+        // Get the Java class for the result type and add a cast
+        // Use getCanonicalName() to correctly handle array types (e.g., byte[] instead of "[B")
+        Class<?> javaClass = JavaClassConverter.toJavaClass(resultType);
+        if (javaClass != null && javaClass != Object.class) {
+            String canonicalName = javaClass.getCanonicalName();
+            if (canonicalName != null) {
+                return new Java.Cast(
+                        Location.NOWHERE,
+                        new Java.ReferenceType(
+                                Location.NOWHERE,
+                                new Java.Annotation[0],
+                                canonicalName.split("\\."),
+                                null),
+                        methodInvocation);
+            }
+        }
+        return methodInvocation;
+    }
+
     private static Java.Rvalue generateOtherFunctionOperation(
             Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
        String operationName = sqlBasicCall.getOperator().getName().toUpperCase();
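
For reference, the expressions this method ends up emitting look as follows (taken from the test expectations added below; here `arr` is ARRAY<STRING>, `m` is MAP<STRING, INT>, and `binArr` is ARRAY<BINARY(16))):

    arr[1]     =>  (java.lang.String) itemAccess(arr, 1)
    m['key']   =>  (java.lang.Integer) itemAccess(m, "key")
    binArr[1]  =>  (byte[]) itemAccess(binArr, 1)    // getCanonicalName() yields "byte[]", not "[B"
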
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
index 468566b82..7d0f66362 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
@@ -398,6 +398,12 @@ public class TransformSqlOperatorTable extends ReflectiveSqlOperatorTable {
     // --------------
     public static final SqlFunction CAST = SqlStdOperatorTable.CAST;
 
+    // ---------------------
+    // Struct Functions
+    // ---------------------
+    // Supports accessing elements of ARRAY[index], ROW[index] and MAP[key]
+    public static final SqlOperator ITEM = SqlStdOperatorTable.ITEM;
+
     public static final SqlFunction AI_CHAT_PREDICT =
             new SqlFunction(
                     "AI_CHAT_PREDICT",
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctionsTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctionsTest.java
new file mode 100644
index 000000000..1ac03cc2b
--- /dev/null
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctionsTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.functions.impl;
+
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link StructFunctions}. */
+class StructFunctionsTest {
+
+    // ========================================
+    // List (ARRAY) Access Tests
+    // ========================================
+    @Nested
+    class ListAccessTests {
+
+        @Test
+        void testNormalAccess() {
+            List<String> list = Arrays.asList("one", "two", "three");
+
+            // SQL uses 1-based indexing
+            assertThat(StructFunctions.itemAccess(list, 1)).isEqualTo("one");
+            assertThat(StructFunctions.itemAccess(list, 2)).isEqualTo("two");
+            assertThat(StructFunctions.itemAccess(list, 3)).isEqualTo("three");
+        }
+
+        @Test
+        void testOutOfBoundsAccess() {
+            List<Integer> list = Arrays.asList(10, 20, 30);
+
+            // Index 0 is invalid in SQL (1-based indexing)
+            assertThat(StructFunctions.itemAccess(list, 0)).isNull();
+            // Negative index
+            assertThat(StructFunctions.itemAccess(list, -1)).isNull();
+            // Index beyond size
+            assertThat(StructFunctions.itemAccess(list, 4)).isNull();
+            assertThat(StructFunctions.itemAccess(list, 100)).isNull();
+        }
+
+        @Test
+        void testNullHandling() {
+            List<String> list = Arrays.asList("one", "two", "three");
+
+            // Null list returns null
+            assertThat(StructFunctions.itemAccess((List<String>) null, 1)).isNull();
+            // Null index returns null
+            assertThat(StructFunctions.itemAccess(list, null)).isNull();
+        }
+
+        @Test
+        void testEmptyList() {
+            List<String> emptyList = Collections.emptyList();
+
+            assertThat(StructFunctions.itemAccess(emptyList, 1)).isNull();
+        }
+
+        @Test
+        void testListWithNullElement() {
+            List<String> listWithNull = new ArrayList<>();
+            listWithNull.add("first");
+            listWithNull.add(null);
+            listWithNull.add("third");
+
+            assertThat(StructFunctions.itemAccess(listWithNull, 1)).isEqualTo("first");
+            assertThat(StructFunctions.itemAccess(listWithNull, 2)).isNull();
+            assertThat(StructFunctions.itemAccess(listWithNull, 3)).isEqualTo("third");
+        }
+    }
+
+    // ========================================
+    // Map Access Tests
+    // ========================================
+    @Nested
+    class MapAccessTests {
+
+        @Test
+        void testNormalAccessWithStringKey() {
+            Map<String, Integer> map = new HashMap<>();
+            map.put("a", 1);
+            map.put("b", 2);
+            map.put("c", 3);
+
+            assertThat(StructFunctions.itemAccess(map, "a")).isEqualTo(1);
+            assertThat(StructFunctions.itemAccess(map, "b")).isEqualTo(2);
+            assertThat(StructFunctions.itemAccess(map, "c")).isEqualTo(3);
+        }
+
+        @Test
+        void testNormalAccessWithIntegerKey() {
+            Map<Integer, String> map = new HashMap<>();
+            map.put(1, "one");
+            map.put(2, "two");
+            map.put(3, "three");
+
+            assertThat(StructFunctions.itemAccess(map, 1)).isEqualTo("one");
+            assertThat(StructFunctions.itemAccess(map, 2)).isEqualTo("two");
+            assertThat(StructFunctions.itemAccess(map, 3)).isEqualTo("three");
+        }
+
+        @Test
+        void testMissingKey() {
+            Map<String, Integer> map = new HashMap<>();
+            map.put("exists", 1);
+
+            assertThat(StructFunctions.itemAccess(map, "nonexistent")).isNull();
+        }
+
+        @Test
+        void testNullHandling() {
+            Map<String, Integer> map = new HashMap<>();
+            map.put("a", 1);
+
+            // Null map returns null
+            assertThat(StructFunctions.itemAccess((Map<String, Integer>) null, "a")).isNull();
+            // Null key returns null
+            assertThat(StructFunctions.itemAccess(map, null)).isNull();
+        }
+
+        @Test
+        void testEmptyMap() {
+            Map<String, Integer> emptyMap = Collections.emptyMap();
+
+            assertThat(StructFunctions.itemAccess(emptyMap, "any")).isNull();
+        }
+
+        @Test
+        void testMapWithNullValue() {
+            Map<String, String> mapWithNullValue = new HashMap<>();
+            mapWithNullValue.put("key1", "value1");
+            mapWithNullValue.put("key2", null);
+
+            assertThat(StructFunctions.itemAccess(mapWithNullValue, "key1")).isEqualTo("value1");
+            assertThat(StructFunctions.itemAccess(mapWithNullValue, "key2")).isNull();
+        }
+    }
+}
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
index 3f1d02686..a45dbde2a 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
@@ -391,6 +391,44 @@ class TransformParserTest {
         testFilterExpression("try_parse_json(jsonStr)", 
"tryParseJson(jsonStr)");
     }
 
+    @Test
+    public void testTranslateItemAccessToJaninoExpression() {
+        // Test collection access functions (ARRAY, MAP) with proper column schema
+        List<Column> columns =
+                List.of(
+                        Column.physicalColumn("arr", 
DataTypes.ARRAY(DataTypes.STRING())),
+                        Column.physicalColumn(
+                                "m", DataTypes.MAP(DataTypes.STRING(), DataTypes.INT())),
+                        Column.physicalColumn("idx", DataTypes.INT()),
+                        Column.physicalColumn("k", DataTypes.STRING()));
+
+        // Array access: array[index] - index is 1-based (SQL standard)
+        // Result type is String (from ARRAY<STRING>), so cast is added
+        testFilterExpressionWithColumns("arr[1]", "(java.lang.String) 
itemAccess(arr, 1)", columns);
+        testFilterExpressionWithColumns("arr[2]", "(java.lang.String) 
itemAccess(arr, 2)", columns);
+        testFilterExpressionWithColumns(
+                "arr[idx]", "(java.lang.String) itemAccess(arr, idx)", 
columns);
+        // Map access: map[key]
+        // Result type is Integer (from MAP<STRING, INT>), so cast is added
+        testFilterExpressionWithColumns(
+                "m['key']", "(java.lang.Integer) itemAccess(m, \"key\")", 
columns);
+        testFilterExpressionWithColumns("m[k]", "(java.lang.Integer) 
itemAccess(m, k)", columns);
+        // Nested access with comparisons
+        testFilterExpressionWithColumns(
+                "arr[1] = 'value'",
+                "valueEquals((java.lang.String) itemAccess(arr, 1), 
\"value\")",
+                columns);
+        testFilterExpressionWithColumns(
+                "m['key'] > 10",
+                "greaterThan((java.lang.Integer) itemAccess(m, \"key\"), 10)",
+                columns);
+
+        List<Column> binaryArrayColumns =
+                List.of(Column.physicalColumn("binArr", DataTypes.ARRAY(DataTypes.BINARY(16))));
+        testFilterExpressionWithColumns(
+                "binArr[1]", "(byte[]) itemAccess(binArr, 1)", 
binaryArrayColumns);
+    }
+
     @Test
     public void testTranslateFilterToJaninoExpressionError() {
         Assertions.assertThatThrownBy(
@@ -818,6 +856,18 @@ class TransformParserTest {
         Assertions.assertThat(janinoExpression).isEqualTo(expressionExpect);
     }
 
+    private void testFilterExpressionWithColumns(
+            String expression, String expressionExpect, List<Column> columns) {
+        String janinoExpression =
+                TransformParser.translateFilterExpressionToJaninoExpression(
+                        expression,
+                        columns,
+                        Collections.emptyList(),
+                        new SupportedMetadataColumn[0],
+                        Collections.emptyMap());
+        Assertions.assertThat(janinoExpression).isEqualTo(expressionExpect);
+    }
+
    private void testFilterExpressionWithUdf(String expression, String expressionExpect) {
         testFilterExpressionWithUdf(
                expression, expressionExpect, DUMMY_COLUMNS, Collections.emptyMap());
