This is an automated email from the ASF dual-hosted git repository.
yux pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 09ad78918 [FLINK-38888][transform] YAML Pipeline supports item subscription of ARRAY, MAP, and ROW (#4241)
09ad78918 is described below
commit 09ad789184ecbc80d780884f3034f7c2f13ecc56
Author: Jia Fan <[email protected]>
AuthorDate: Wed Feb 4 10:12:04 2026 +0800
[FLINK-38888][transform] YAML Pipeline supports item subscription of ARRAY, MAP, and ROW (#4241)
---
docs/content.zh/docs/core-concept/transform.md | 8 ++
docs/content/docs/core-concept/transform.md | 10 ++
.../src/test/resources/specs/nested.yaml | 15 --
.../runtime/functions/impl/StructFunctions.java | 70 +++++++++
.../flink/cdc/runtime/parser/JaninoCompiler.java | 42 +++++-
.../parser/metadata/TransformSqlOperatorTable.java | 6 +
.../functions/impl/StructFunctionsTest.java | 160 +++++++++++++++++++++
.../cdc/runtime/parser/TransformParserTest.java | 50 +++++++
8 files changed, 345 insertions(+), 16 deletions(-)
diff --git a/docs/content.zh/docs/core-concept/transform.md b/docs/content.zh/docs/core-concept/transform.md
index aec40d70a..5f278e798 100644
--- a/docs/content.zh/docs/core-concept/transform.md
+++ b/docs/content.zh/docs/core-concept/transform.md
@@ -226,6 +226,14 @@ You can use `CAST( <EXPR> AS <T> )` syntax to convert any valid expression `<EXP
| PARSE_JSON(string[, allowDuplicateKeys]) | parseJson(string[, allowDuplicateKeys]) | Parse a JSON string into a Variant. If the JSON string is invalid, an error will be thrown. To return NULL instead of an error, use the TRY_PARSE_JSON function. <br><br> If there are duplicate keys in the input JSON string, when `allowDuplicateKeys` is true, the parser will keep the last occurrence of all fields with the same key, otherwise it will throw an error. The default value of `allowDuplicateKe [...]
| TRY_PARSE_JSON(string[, allowDuplicateKeys]) | tryParseJson(string[, allowDuplicateKeys]) | Parse a JSON string into a Variant if possible. If the JSON string is invalid, return NULL. To throw an error instead of returning NULL, use the PARSE_JSON function. <br><br> If there are duplicate keys in the input JSON string, when `allowDuplicateKeys` is true, the parser will keep the last occurrence of all fields with the same key, otherwise it will return NULL. The default value of `allowDu [...]
+## Struct Functions
+
+| Function | Janino Code | Description |
+| -------- | ----------- | ----------- |
+| array[index] | itemAccess(array, index) | Returns the element at position `index` in the array. Index is 1-based (SQL standard). Returns NULL if the index is out of bounds or if the array is NULL. |
+| map[key] | itemAccess(map, key) | Returns the value associated with `key` in the map. Returns NULL if the key does not exist or if the map is NULL. |
+| row[index] | itemAccess(row, index) | Returns the field at position `index` in the row. Index is 1-based. The index must be a constant (not a computed expression) since the return type must be statically determined. |
+
# Example
## Add computed columns
Evaluation expressions can be used to generate new columns. For example, if we want to append two computed columns based on the table `web_order` in the database `mydb`, we may define a transform rule as follows:
diff --git a/docs/content/docs/core-concept/transform.md b/docs/content/docs/core-concept/transform.md
index 584c3606d..d0a4901b4 100644
--- a/docs/content/docs/core-concept/transform.md
+++ b/docs/content/docs/core-concept/transform.md
@@ -227,6 +227,16 @@ You can use `CAST( <EXPR> AS <T> )` syntax to convert any valid expression `<EXP
| PARSE_JSON(string[, allowDuplicateKeys]) | parseJson(string[, allowDuplicateKeys]) | Parse a JSON string into a Variant. If the JSON string is invalid, an error will be thrown. To return NULL instead of an error, use the TRY_PARSE_JSON function. <br><br> If there are duplicate keys in the input JSON string, when `allowDuplicateKeys` is true, the parser will keep the last occurrence of all fields with the same key, otherwise it will throw an error. The default value of `allowDuplicateKe [...]
| TRY_PARSE_JSON(string[, allowDuplicateKeys]) | tryParseJson(string[, allowDuplicateKeys]) | Parse a JSON string into a Variant if possible. If the JSON string is invalid, return NULL. To throw an error instead of returning NULL, use the PARSE_JSON function. <br><br> If there are duplicate keys in the input JSON string, when `allowDuplicateKeys` is true, the parser will keep the last occurrence of all fields with the same key, otherwise it will return NULL. The default value of `allowDu [...]
+## Struct Functions
+
+Struct functions are used to access elements in ARRAY, MAP, and ROW types using subscript syntax.
+
+| Function | Janino Code | Description |
+| -------- | ----------- | ----------- |
+| array[index] | itemAccess(array, index) | Returns the element at position `index` in the array. Index is 1-based (SQL standard). Returns NULL if the index is out of bounds or if the array is NULL. |
+| map[key] | itemAccess(map, key) | Returns the value associated with `key` in the map. Returns NULL if the key does not exist or if the map is NULL. |
+| row[index] | itemAccess(row, index) | Returns the field at position `index` in the row. Index is 1-based. The index must be a constant (not a computed expression) since the return type must be statically determined. |
+
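The semantics in the table above map directly onto the `StructFunctions.itemAccess` overloads added later in this commit. A minimal sketch of the documented behavior, assuming the `StructFunctions` class from the diff below (the demo class name itself is ours, purely for illustration):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.cdc.runtime.functions.impl.StructFunctions;

/** Hypothetical demo class; only StructFunctions itself ships in this commit. */
public class SubscriptSemanticsDemo {
    public static void main(String[] args) {
        List<String> array = Arrays.asList("a", "b", "c");
        // array[index] is 1-based, per the SQL standard
        System.out.println(StructFunctions.itemAccess(array, 1)); // a
        // Out-of-bounds and negative indices return null rather than throwing
        System.out.println(StructFunctions.itemAccess(array, 4)); // null

        Map<String, Integer> map = new HashMap<>();
        map.put("key", 42);
        // map[key] returns null for absent keys, null keys, and null maps
        System.out.println(StructFunctions.itemAccess(map, "key"));   // 42
        System.out.println(StructFunctions.itemAccess(map, "other")); // null
    }
}
```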
# Example
## Add computed columns
Evaluation expressions can be used to generate new columns. For example, if we want to append two computed columns based on the table `web_order` in the database `mydb`, we may define a transform rule as follows:
diff --git a/flink-cdc-composer/src/test/resources/specs/nested.yaml b/flink-cdc-composer/src/test/resources/specs/nested.yaml
index f230cc6ca..f61ec8f79 100644
--- a/flink-cdc-composer/src/test/resources/specs/nested.yaml
+++ b/flink-cdc-composer/src/test/resources/specs/nested.yaml
@@ -119,7 +119,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, {"id":2}], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, {"id":2}], after=[], op=DELETE, meta=()}
- do: Integer Array Subscripting
- ignore: FLINK-38888
projection: |-
id_
array_int_
@@ -133,7 +132,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
- do: String Array Subscripting
- ignore: FLINK-38888
projection: |-
id_
array_string_
@@ -147,7 +145,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
- do: Array OOB Subscripting
- ignore: FLINK-38888
projection: |-
id_
array_string_[0] AS negative_overflow
@@ -161,7 +158,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
- do: Array Subscripting With Sub Expression
- ignore: FLINK-38888
projection: |-
id_
array_int_[1 + 1] AS int_key
@@ -175,7 +171,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
- do: Filter by Array-related Expression
- ignore: FLINK-38888
projection: id_, array_string_
filter: array_string_[3] = '五'
primary-key: id_
@@ -184,7 +179,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[1, [one, one, two, three, five]], after=[-1, [二, san, 五, qi, 十一]], op=UPDATE, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[-1, [二, san, 五, qi, 十一]], after=[], op=DELETE, meta=()}
- do: Map Subscripting
- ignore: FLINK-38888
projection: |-
id_
map_int_string_
@@ -198,7 +192,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
- do: Map Subscripting with absent key
- ignore: FLINK-38888
projection: |-
id_
map_int_string_[233] AS map_value
@@ -211,7 +204,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null], after=[], op=DELETE, meta=()}
- do: Map Subscripting with complex expression
- ignore: FLINK-38888
projection: |-
id_
map_int_string_[1 + 2] AS map_value
@@ -224,7 +216,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null], after=[], op=DELETE, meta=()}
- do: Map Subscripting with nested expressions
- ignore: FLINK-38888
projection: |-
id_
map_string_array_string_[lower('ONE')] AS index_1
@@ -238,7 +229,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
- do: Map Subscripting With Absent Object
- ignore: FLINK-38888
projection: |-
id_
map_string_array_string_['foo'] AS index_1
@@ -252,7 +242,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null], after=[], op=DELETE, meta=()}
- do: Filter by Map-related Expression
- ignore: FLINK-38888
projection: id_, map_int_string_
filter: map_int_string_[1] = 'one'
primary-key: id_
@@ -260,7 +249,6 @@
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT NULL 'Identifier',`map_int_string_` MAP<INT, STRING>}, primaryKeys=id_, options=()}
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, {1 -> one, 2 -> two, 3 -> three}], op=INSERT, meta=()}
- do: Record Subscripting With Index
- ignore: FLINK-38888
projection: |-
id_
complex_row_
@@ -275,7 +263,6 @@
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null, null, null], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null, null, null], after=[], op=DELETE, meta=()}
- do: Record Subscripting With Invalid Index
- ignore: FLINK-38888
projection: |-
id_
complex_row_
@@ -283,7 +270,6 @@
primary-key: id_
expect-error: 'Cannot infer type of field at position 0 within ROW type: RecordType(VARCHAR(65536) name, INTEGER length)'
- do: Record Subscripting With Computed Index (Illegal, type must be statically determined)
- ignore: FLINK-38888
projection: |-
id_
complex_row_
@@ -291,7 +277,6 @@
primary-key: id_
expect-error: 'Cannot infer type of field at position null within ROW type: RecordType(VARCHAR(65536) name, INTEGER length)'
- do: Filter by Record-related Expression
- ignore: FLINK-38888
projection: id_, complex_row_
filter: complex_row_[1] = 'Derrida'
primary-key: id_
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctions.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctions.java
new file mode 100644
index 000000000..603930cc9
--- /dev/null
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctions.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.functions.impl;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Built-in functions for collection and struct data types.
+ *
+ * <p>These functions support accessing elements from collections (ARRAY, MAP) and structured data
+ * types (ROW).
+ */
+public class StructFunctions {
+
+ /**
+ * Accesses an element from an ARRAY by index (1-based, SQL standard).
+ *
+ * <p>array[1] returns the first element.
+ *
+ * @param <T> the element type of the array
+ * @param array the array to access
+ * @param index the 1-based index
+ * @return the element at the specified index, or null if index is out of bounds
+ */
+ public static <T> T itemAccess(List<T> array, Integer index) {
+ if (array == null || index == null) {
+ return null;
+ }
+ // Convert 1-based index to 0-based (SQL standard uses 1-based indexing)
+ int zeroBasedIndex = index - 1;
+ if (zeroBasedIndex < 0 || zeroBasedIndex >= array.size()) {
+ return null;
+ }
+ return array.get(zeroBasedIndex);
+ }
+
+ /**
+ * Accesses a value from a MAP by key.
+ *
+ * <p>map['key'] returns the value for 'key'.
+ *
+ * @param <K> the key type of the map
+ * @param <V> the value type of the map
+ * @param map the map to access
+ * @param key the key to look up
+ * @return the value for the specified key, or null if not found
+ */
+ public static <K, V> V itemAccess(Map<K, V> map, K key) {
+ if (map == null || key == null) {
+ return null;
+ }
+ return map.get(key);
+ }
+}
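A consequence of the null-safe contract in both overloads is that a caller cannot distinguish a stored null element from a miss (out-of-bounds index or absent key): both surface as null, mirroring SQL, where either case yields NULL. A small illustration, reusing the class above (the wrapper class here is ours, for demonstration only):

```java
import java.util.Arrays;
import java.util.List;

import org.apache.flink.cdc.runtime.functions.impl.StructFunctions;

/** Hypothetical demo class; both calls below return null for different reasons. */
public class NullAmbiguityDemo {
    public static void main(String[] args) {
        List<String> values = Arrays.asList("x", null, "z");
        // Position 2 exists but holds a null element...
        System.out.println(StructFunctions.itemAccess(values, 2)); // null
        // ...while position 5 is simply past the end of the list.
        System.out.println(StructFunctions.itemAccess(values, 5)); // null
    }
}
```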
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
index 4345331ac..fb5069255 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
@@ -25,6 +25,7 @@ import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypeRoot;
+import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.common.utils.StringUtils;
import org.apache.flink.cdc.runtime.operators.transform.UserDefinedFunctionDescriptor;
@@ -89,7 +90,7 @@ public class JaninoCompiler {
public static final String DEFAULT_TIME_ZONE = "__time_zone__";
private static final String[] BUILTIN_FUNCTION_MODULES = {
- "Arithmetic", "Casting", "Comparison", "Logical", "String", "Temporal"
+ "Arithmetic", "Casting", "Comparison", "Logical", "String", "Struct",
"Temporal"
};
@VisibleForTesting
@@ -306,6 +307,8 @@ public class JaninoCompiler {
return generateTimestampAddOperation(context, sqlBasicCall, atoms);
case OTHER:
return generateOtherOperation(context, sqlBasicCall, atoms);
+ case ITEM:
+ return generateItemAccessOperation(context, sqlBasicCall, atoms);
default:
throw new ParseException("Unrecognized expression: " + sqlBasicCall);
}
@@ -468,6 +471,43 @@ public class JaninoCompiler {
throw new ParseException("Unrecognized expression: " + sqlBasicCall.toString());
}
+ private static Java.Rvalue generateItemAccessOperation(
+ Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
+ Preconditions.checkArgument(
+ atoms.length == 2,
+ "Expecting item accessing call %s to have 2 operands, got %s
actually",
+ sqlBasicCall,
+ List.of(atoms));
+ Java.Rvalue methodInvocation =
+ new Java.MethodInvocation(Location.NOWHERE, null, "itemAccess", atoms);
+
+ // Deduce the return type and add a cast to ensure proper type conversion
+ DataType resultType =
+ TransformParser.deduceSubExpressionType(
+ context.columns,
+ sqlBasicCall,
+ context.udfDescriptors,
+ context.supportedMetadataColumns);
+
+ // Get the Java class for the result type and add a cast
+ // Use getCanonicalName() to correctly handle array types (e.g., byte[] instead of "[B")
+ Class<?> javaClass = JavaClassConverter.toJavaClass(resultType);
+ if (javaClass != null && javaClass != Object.class) {
+ String canonicalName = javaClass.getCanonicalName();
+ if (canonicalName != null) {
+ return new Java.Cast(
+ Location.NOWHERE,
+ new Java.ReferenceType(
+ Location.NOWHERE,
+ new Java.Annotation[0],
+ canonicalName.split("\\."),
+ null),
+ methodInvocation);
+ }
+ }
+ return methodInvocation;
+ }
+
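The effect of the cast wrapping is easiest to see in the translated expressions, and `getCanonicalName()` matters because `Class#getName()` for a primitive array is not valid Java source. A brief sketch (the expected translations are the ones asserted in `TransformParserTest` further down; the demo class is ours):

```java
// Translations produced by this path, per the parser tests below:
//   arr[1]    on ARRAY<STRING>     ->  (java.lang.String) itemAccess(arr, 1)
//   m['key']  on MAP<STRING, INT>  ->  (java.lang.Integer) itemAccess(m, "key")
//   binArr[1] on ARRAY<BINARY(16)> ->  (byte[]) itemAccess(binArr, 1)
public class CanonicalNameDemo {
    public static void main(String[] args) {
        System.out.println(byte[].class.getName());          // "[B" - not usable in a cast
        System.out.println(byte[].class.getCanonicalName()); // "byte[]" - valid source text
    }
}
```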
private static Java.Rvalue generateOtherFunctionOperation(
Context context, SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
String operationName = sqlBasicCall.getOperator().getName().toUpperCase();
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
index 468566b82..7d0f66362 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
@@ -398,6 +398,12 @@ public class TransformSqlOperatorTable extends ReflectiveSqlOperatorTable {
// --------------
public static final SqlFunction CAST = SqlStdOperatorTable.CAST;
+ // ---------------------
+ // Struct Functions
+ // ---------------------
+ // Supports accessing elements of ARRAY[index], ROW[index], and MAP[key]
+ public static final SqlOperator ITEM = SqlStdOperatorTable.ITEM;
+
public static final SqlFunction AI_CHAT_PREDICT =
new SqlFunction(
"AI_CHAT_PREDICT",
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctionsTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctionsTest.java
new file mode 100644
index 000000000..1ac03cc2b
--- /dev/null
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/functions/impl/StructFunctionsTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.functions.impl;
+
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link StructFunctions}. */
+class StructFunctionsTest {
+
+ // ========================================
+ // List (ARRAY) Access Tests
+ // ========================================
+ @Nested
+ class ListAccessTests {
+
+ @Test
+ void testNormalAccess() {
+ List<String> list = Arrays.asList("one", "two", "three");
+
+ // SQL uses 1-based indexing
+ assertThat(StructFunctions.itemAccess(list, 1)).isEqualTo("one");
+ assertThat(StructFunctions.itemAccess(list, 2)).isEqualTo("two");
+ assertThat(StructFunctions.itemAccess(list, 3)).isEqualTo("three");
+ }
+
+ @Test
+ void testOutOfBoundsAccess() {
+ List<Integer> list = Arrays.asList(10, 20, 30);
+
+ // Index 0 is invalid in SQL (1-based indexing)
+ assertThat(StructFunctions.itemAccess(list, 0)).isNull();
+ // Negative index
+ assertThat(StructFunctions.itemAccess(list, -1)).isNull();
+ // Index beyond size
+ assertThat(StructFunctions.itemAccess(list, 4)).isNull();
+ assertThat(StructFunctions.itemAccess(list, 100)).isNull();
+ }
+
+ @Test
+ void testNullHandling() {
+ List<String> list = Arrays.asList("one", "two", "three");
+
+ // Null list returns null
+ assertThat(StructFunctions.itemAccess((List<String>) null, 1)).isNull();
+ // Null index returns null
+ assertThat(StructFunctions.itemAccess(list, null)).isNull();
+ }
+
+ @Test
+ void testEmptyList() {
+ List<String> emptyList = Collections.emptyList();
+
+ assertThat(StructFunctions.itemAccess(emptyList, 1)).isNull();
+ }
+
+ @Test
+ void testListWithNullElement() {
+ List<String> listWithNull = new ArrayList<>();
+ listWithNull.add("first");
+ listWithNull.add(null);
+ listWithNull.add("third");
+
+ assertThat(StructFunctions.itemAccess(listWithNull, 1)).isEqualTo("first");
+ assertThat(StructFunctions.itemAccess(listWithNull, 2)).isNull();
+ assertThat(StructFunctions.itemAccess(listWithNull, 3)).isEqualTo("third");
+ }
+ }
+
+ // ========================================
+ // Map Access Tests
+ // ========================================
+ @Nested
+ class MapAccessTests {
+
+ @Test
+ void testNormalAccessWithStringKey() {
+ Map<String, Integer> map = new HashMap<>();
+ map.put("a", 1);
+ map.put("b", 2);
+ map.put("c", 3);
+
+ assertThat(StructFunctions.itemAccess(map, "a")).isEqualTo(1);
+ assertThat(StructFunctions.itemAccess(map, "b")).isEqualTo(2);
+ assertThat(StructFunctions.itemAccess(map, "c")).isEqualTo(3);
+ }
+
+ @Test
+ void testNormalAccessWithIntegerKey() {
+ Map<Integer, String> map = new HashMap<>();
+ map.put(1, "one");
+ map.put(2, "two");
+ map.put(3, "three");
+
+ assertThat(StructFunctions.itemAccess(map, 1)).isEqualTo("one");
+ assertThat(StructFunctions.itemAccess(map, 2)).isEqualTo("two");
+ assertThat(StructFunctions.itemAccess(map, 3)).isEqualTo("three");
+ }
+
+ @Test
+ void testMissingKey() {
+ Map<String, Integer> map = new HashMap<>();
+ map.put("exists", 1);
+
+ assertThat(StructFunctions.itemAccess(map, "nonexistent")).isNull();
+ }
+
+ @Test
+ void testNullHandling() {
+ Map<String, Integer> map = new HashMap<>();
+ map.put("a", 1);
+
+ // Null map returns null
+ assertThat(StructFunctions.itemAccess((Map<String, Integer>) null, "a")).isNull();
+ // Null key returns null
+ assertThat(StructFunctions.itemAccess(map, null)).isNull();
+ }
+
+ @Test
+ void testEmptyMap() {
+ Map<String, Integer> emptyMap = Collections.emptyMap();
+
+ assertThat(StructFunctions.itemAccess(emptyMap, "any")).isNull();
+ }
+
+ @Test
+ void testMapWithNullValue() {
+ Map<String, String> mapWithNullValue = new HashMap<>();
+ mapWithNullValue.put("key1", "value1");
+ mapWithNullValue.put("key2", null);
+
+ assertThat(StructFunctions.itemAccess(mapWithNullValue, "key1")).isEqualTo("value1");
+ assertThat(StructFunctions.itemAccess(mapWithNullValue, "key2")).isNull();
+ }
+ }
+}
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
index 3f1d02686..a45dbde2a 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
@@ -391,6 +391,44 @@ class TransformParserTest {
testFilterExpression("try_parse_json(jsonStr)",
"tryParseJson(jsonStr)");
}
+ @Test
+ public void testTranslateItemAccessToJaninoExpression() {
+ // Test collection access functions (ARRAY, MAP) with proper column schema
+ List<Column> columns =
+ List.of(
+ Column.physicalColumn("arr",
DataTypes.ARRAY(DataTypes.STRING())),
+ Column.physicalColumn(
+ "m", DataTypes.MAP(DataTypes.STRING(),
DataTypes.INT())),
+ Column.physicalColumn("idx", DataTypes.INT()),
+ Column.physicalColumn("k", DataTypes.STRING()));
+
+ // Array access: array[index] - index is 1-based (SQL standard)
+ // Result type is String (from ARRAY<STRING>), so cast is added
+ testFilterExpressionWithColumns("arr[1]", "(java.lang.String)
itemAccess(arr, 1)", columns);
+ testFilterExpressionWithColumns("arr[2]", "(java.lang.String)
itemAccess(arr, 2)", columns);
+ testFilterExpressionWithColumns(
+ "arr[idx]", "(java.lang.String) itemAccess(arr, idx)",
columns);
+ // Map access: map[key]
+ // Result type is Integer (from MAP<STRING, INT>), so cast is added
+ testFilterExpressionWithColumns(
+ "m['key']", "(java.lang.Integer) itemAccess(m, \"key\")",
columns);
+ testFilterExpressionWithColumns("m[k]", "(java.lang.Integer)
itemAccess(m, k)", columns);
+ // Nested access with comparisons
+ testFilterExpressionWithColumns(
+ "arr[1] = 'value'",
+ "valueEquals((java.lang.String) itemAccess(arr, 1),
\"value\")",
+ columns);
+ testFilterExpressionWithColumns(
+ "m['key'] > 10",
+ "greaterThan((java.lang.Integer) itemAccess(m, \"key\"), 10)",
+ columns);
+
+ List<Column> binaryArrayColumns =
+ List.of(Column.physicalColumn("binArr",
DataTypes.ARRAY(DataTypes.BINARY(16))));
+ testFilterExpressionWithColumns(
+ "binArr[1]", "(byte[]) itemAccess(binArr, 1)",
binaryArrayColumns);
+ }
+
@Test
public void testTranslateFilterToJaninoExpressionError() {
Assertions.assertThatThrownBy(
@@ -818,6 +856,18 @@ class TransformParserTest {
Assertions.assertThat(janinoExpression).isEqualTo(expressionExpect);
}
+ private void testFilterExpressionWithColumns(
+ String expression, String expressionExpect, List<Column> columns) {
+ String janinoExpression =
+ TransformParser.translateFilterExpressionToJaninoExpression(
+ expression,
+ columns,
+ Collections.emptyList(),
+ new SupportedMetadataColumn[0],
+ Collections.emptyMap());
+ Assertions.assertThat(janinoExpression).isEqualTo(expressionExpect);
+ }
+
private void testFilterExpressionWithUdf(String expression, String expressionExpect) {
testFilterExpressionWithUdf(
expression, expressionExpect, DUMMY_COLUMNS, Collections.emptyMap());