This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new f7f3901fa [FLINK-37012][transform] Fix argument type mismatch when 
metadata column used in function
f7f3901fa is described below

commit f7f3901fa38a0e1074609545703d19e99601aa5a
Author: MOBIN <18814118...@163.com>
AuthorDate: Mon Jan 13 15:23:58 2025 +0800

    [FLINK-37012][transform] Fix argument type mismatch when metadata column 
used in function
    
    This closes #3837
---
 .../transform/ProjectionColumnProcessor.java       |  2 --
 .../transform/PostTransformOperatorTest.java       | 25 +++++++++++++++++++---
 2 files changed, 22 insertions(+), 5 deletions(-)

diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
index 292ba416a..83de37926 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
@@ -186,9 +186,7 @@ public class ProjectionColumnProcessor {
                     break;
                 }
             }
-        }
 
-        for (String originalColumnName : originalColumnNames) {
             METADATA_COLUMNS.stream()
                     .filter(col -> col.f0.equals(originalColumnName))
                     .findFirst()
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
index a7dc61ac4..2de893a09 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
@@ -95,6 +95,10 @@ public class PostTransformOperatorTest {
                     .physicalColumn("name", DataTypes.STRING())
                     .physicalColumn("name_upper", DataTypes.STRING())
                     .physicalColumn("tbname", DataTypes.STRING().notNull())
+                    .physicalColumn("tbname_sid", DataTypes.STRING())
+                    .physicalColumn("sid_tbname", DataTypes.STRING())
+                    .physicalColumn("tbname_name", DataTypes.STRING())
+                    .physicalColumn("name_tbname", DataTypes.STRING())
                     .primaryKey("sid")
                     .build();
 
@@ -543,7 +547,9 @@ public class PostTransformOperatorTest {
                 PostTransformOperator.newBuilder()
                         .addTransform(
                                 METADATA_AS_TABLEID.identifier(),
-                                "sid, name, UPPER(name) as name_upper, 
__table_name__ as tbname",
+                                "sid, name, UPPER(name) as name_upper, 
__table_name__ as tbname, "
+                                        + "concat(__table_name__,'_',sid) as 
tbname_sid, concat(sid,'_',__table_name__) as sid_tbname,"
+                                        + "concat(__table_name__,'_',name) as 
tbname_name, concat(name,'_',__table_name__) as name_tbname",
                                 "sid < 3")
                         .build();
         RegularEventOperatorTestHarness<PostTransformOperator, Event>
@@ -561,7 +567,16 @@ public class PostTransformOperatorTest {
                 DataChangeEvent.insertEvent(
                         METADATA_AS_TABLEID,
                         recordDataGenerator.generate(
-                                new Object[] {1, new BinaryStringData("abc"), 
null, null}));
+                                new Object[] {
+                                    1,
+                                    new BinaryStringData("abc"),
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null
+                                }));
         DataChangeEvent insertEventExpect =
                 DataChangeEvent.insertEvent(
                         METADATA_AS_TABLEID,
@@ -570,7 +585,11 @@ public class PostTransformOperatorTest {
                                     1,
                                     new BinaryStringData("abc"),
                                     new BinaryStringData("ABC"),
-                                    new BinaryStringData("metadata_as_table")
+                                    new BinaryStringData("metadata_as_table"),
+                                    new 
BinaryStringData("metadata_as_table_1"),
+                                    new 
BinaryStringData("1_metadata_as_table"),
+                                    new 
BinaryStringData("metadata_as_table_abc"),
+                                    new 
BinaryStringData("abc_metadata_as_table")
                                 }));
         transform.processElement(new StreamRecord<>(createTableEvent));
         Assertions.assertThat(

Reply via email to