This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
     new f7f3901fa [FLINK-37012][transform] Fix argument type mismatch when metadata column used in function
f7f3901fa is described below

commit f7f3901fa38a0e1074609545703d19e99601aa5a
Author: MOBIN <18814118...@163.com>
AuthorDate: Mon Jan 13 15:23:58 2025 +0800

    [FLINK-37012][transform] Fix argument type mismatch when metadata column used in function

    This closes #3837
---
 .../transform/ProjectionColumnProcessor.java       |  2 --
 .../transform/PostTransformOperatorTest.java       | 25 +++++++++++++++++++---
 2 files changed, 22 insertions(+), 5 deletions(-)

diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
index 292ba416a..83de37926 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
@@ -186,9 +186,7 @@ public class ProjectionColumnProcessor {
                     break;
                 }
             }
-        }
 
-        for (String originalColumnName : originalColumnNames) {
             METADATA_COLUMNS.stream()
                     .filter(col -> col.f0.equals(originalColumnName))
                     .findFirst()
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
index a7dc61ac4..2de893a09 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
@@ -95,6 +95,10 @@ public class PostTransformOperatorTest {
                     .physicalColumn("name", DataTypes.STRING())
                     .physicalColumn("name_upper", DataTypes.STRING())
                     .physicalColumn("tbname", DataTypes.STRING().notNull())
+                    .physicalColumn("tbname_sid", DataTypes.STRING())
+                    .physicalColumn("sid_tbname", DataTypes.STRING())
+                    .physicalColumn("tbname_name", DataTypes.STRING())
+                    .physicalColumn("name_tbname", DataTypes.STRING())
                     .primaryKey("sid")
                     .build();
 
@@ -543,7 +547,9 @@ public class PostTransformOperatorTest {
                 PostTransformOperator.newBuilder()
                         .addTransform(
                                 METADATA_AS_TABLEID.identifier(),
-                                "sid, name, UPPER(name) as name_upper, __table_name__ as tbname",
+                                "sid, name, UPPER(name) as name_upper, __table_name__ as tbname, "
+                                        + "concat(__table_name__,'_',sid) as tbname_sid, concat(sid,'_',__table_name__) as sid_tbname,"
+                                        + "concat(__table_name__,'_',name) as tbname_name, concat(name,'_',__table_name__) as name_tbname",
                                 "sid < 3")
                         .build();
         RegularEventOperatorTestHarness<PostTransformOperator, Event>
@@ -561,7 +567,16 @@ public class PostTransformOperatorTest {
                 DataChangeEvent.insertEvent(
                         METADATA_AS_TABLEID,
                         recordDataGenerator.generate(
-                                new Object[] {1, new BinaryStringData("abc"), null, null}));
+                                new Object[] {
+                                    1,
+                                    new BinaryStringData("abc"),
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null
+                                }));
         DataChangeEvent insertEventExpect =
                 DataChangeEvent.insertEvent(
                         METADATA_AS_TABLEID,
@@ -570,7 +585,11 @@ public class PostTransformOperatorTest {
                                     1,
                                     new BinaryStringData("abc"),
                                     new BinaryStringData("ABC"),
-                                    new BinaryStringData("metadata_as_table")
+                                    new BinaryStringData("metadata_as_table"),
+                                    new BinaryStringData("metadata_as_table_1"),
+                                    new BinaryStringData("1_metadata_as_table"),
+                                    new BinaryStringData("metadata_as_table_abc"),
+                                    new BinaryStringData("abc_metadata_as_table")
                                 }));
         transform.processElement(new StreamRecord<>(createTableEvent));
         Assertions.assertThat(