This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new f309d311f [FLINK-37429][transform] Map each column name to a new name in generated expression
f309d311f is described below

commit f309d311f8f79e0108a0666bf8cae7bd609cd2ef
Author: Shawn Huang <[email protected]>
AuthorDate: Tue Mar 11 12:09:37 2025 +0800

    [FLINK-37429][transform] Map each column name to a new name in generated expression
    
    This closes #3939
    
    Co-authored-by: Leonard Xu <[email protected]>
---
 .../flink/FlinkPipelineTransformITCase.java        |  63 +++++++
 .../values/source/TimestampTypeMetadataColumn.java |  48 ++++++
 .../connectors/values/source/ValuesDataSource.java |   4 +-
 .../values/source/ValuesDataSourceHelper.java      | 131 +++++++++++++-
 .../operators/transform/PostTransformOperator.java |   2 +-
 .../operators/transform/ProjectionColumn.java      |  41 +++--
 .../transform/ProjectionColumnProcessor.java       |  13 +-
 .../transform/TransformExpressionCompiler.java     |   5 +-
 .../transform/TransformExpressionKey.java          |  23 ++-
 .../operators/transform/TransformFilter.java       |  22 ++-
 .../transform/TransformFilterProcessor.java        |  36 ++--
 .../flink/cdc/runtime/parser/JaninoCompiler.java   |  53 ++++--
 .../flink/cdc/runtime/parser/TransformParser.java  |  40 ++++-
 .../transform/PostTransformOperatorTest.java       | 110 ++++++++++++
 .../transform/PreTransformOperatorTest.java        |  47 +++++
 .../TransformOperatorWithSchemaEvolveTest.java     | 129 ++++++++++++++
 .../transform/UnifiedTransformOperatorTest.java    |  34 ++++
 .../cdc/runtime/parser/TransformParserTest.java    | 191 +++++++++++++++++----
 18 files changed, 880 insertions(+), 112 deletions(-)

diff --git 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
index 4426374b6..e678b3737 100644
--- 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
+++ 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -956,6 +956,69 @@ class FlinkPipelineTransformITCase {
         Arrays.stream(outputEvents).forEach(this::extractDataLines);
     }
 
+    @ParameterizedTest
+    @EnumSource
+    public void testTransformWithColumnNameMap(ValuesDataSink.SinkApi sinkApi) 
throws Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup value source
+        Configuration sourceConfig = new Configuration();
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.COMPLEX_COLUMN_NAME_TABLE);
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", 
sourceConfig);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value 
Sink", sinkConfig);
+
+        // Setup transform
+        TransformDef transformDef =
+                new TransformDef(
+                        "default_namespace.default_schema.table1",
+                        "*, `timestamp-type`",
+                        "`foo-bar` > 0",
+                        null,
+                        null,
+                        null,
+                        null,
+                        null);
+
+        // Setup pipeline
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, 
SchemaChangeBehavior.EVOLVE);
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        Collections.emptyList(),
+                        new ArrayList<>(Arrays.asList(transformDef)),
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        // Execute the pipeline
+        PipelineExecution execution = composer.compose(pipelineDef);
+        execution.execute();
+
+        // Check the order and content of all received events
+        String[] outputEvents = outCaptor.toString().trim().split("\n");
+        assertThat(outputEvents)
+                .containsExactly(
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`class` STRING NOT NULL,`foo-bar` INT,`bar-foo` 
INT,`timestamp-type` STRING NOT NULL}, primaryKeys=class, options=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[class1, 1, 10, type1], op=INSERT, meta=({timestamp-type=type1})}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[class2, 2, 100, type2], op=INSERT, meta=({timestamp-type=type2})}",
+                        
"AddColumnEvent{tableId=default_namespace.default_schema.table1, 
addedColumns=[ColumnWithPosition{column=`import-package` STRING, 
position=AFTER, existedColumnName=bar-foo}]}",
+                        
"RenameColumnEvent{tableId=default_namespace.default_schema.table1, 
nameMapping={bar-foo=bar-baz}}",
+                        
"DropColumnEvent{tableId=default_namespace.default_schema.table1, 
droppedColumnNames=[bar-baz]}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.table1, 
before=[class1, 1, , type1], after=[], op=DELETE, 
meta=({timestamp-type=type1})}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.table1, 
before=[class2, 2, , type2], after=[new-class2, 20, new-package2, type2], 
op=UPDATE, meta=({timestamp-type=type2})}");
+    }
+
     void runGenericTransformTest(
             ValuesDataSink.SinkApi sinkApi,
             List<TransformDef> transformDefs,
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/TimestampTypeMetadataColumn.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/TimestampTypeMetadataColumn.java
new file mode 100644
index 000000000..c7b086d11
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/TimestampTypeMetadataColumn.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.values.source;
+
+import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+
+import java.util.Map;
+
+/** A {@link SupportedMetadataColumn} for timestamp-type. */
+public class TimestampTypeMetadataColumn implements SupportedMetadataColumn {
+
+    @Override
+    public String getName() {
+        return "timestamp-type";
+    }
+
+    @Override
+    public DataType getType() {
+        return DataTypes.STRING();
+    }
+
+    @Override
+    public Class<?> getJavaClass() {
+        return String.class;
+    }
+
+    @Override
+    public Object read(Map<String, String> metadata) {
+        return metadata.getOrDefault(getName(), null);
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSource.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSource.java
index 60ed244a5..5ba1e336d 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSource.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSource.java
@@ -83,7 +83,9 @@ public class ValuesDataSource implements DataSource {
 
     @Override
     public SupportedMetadataColumn[] supportedMetadataColumns() {
-        return new SupportedMetadataColumn[] {new OpTsMetadataColumn()};
+        return new SupportedMetadataColumn[] {
+            new OpTsMetadataColumn(), new TimestampTypeMetadataColumn()
+        };
     }
 
     /**
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSourceHelper.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSourceHelper.java
index af9c1890b..75cba1ad3 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSourceHelper.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSourceHelper.java
@@ -30,6 +30,7 @@ import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
 import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
 
 import java.util.ArrayList;
@@ -54,7 +55,8 @@ public class ValuesDataSourceHelper {
         SINGLE_SPLIT_MULTI_TABLES,
         MULTI_SPLITS_SINGLE_TABLE,
         CUSTOM_SOURCE_EVENTS,
-        TRANSFORM_TABLE
+        TRANSFORM_TABLE,
+        COMPLEX_COLUMN_NAME_TABLE
     }
 
     public static final TableId TABLE_1 =
@@ -120,6 +122,11 @@ public class ValuesDataSourceHelper {
                     sourceEvents = transformTable();
                     break;
                 }
+            case COMPLEX_COLUMN_NAME_TABLE:
+                {
+                    sourceEvents = complexColumnNameTable();
+                    break;
+                }
             default:
                 throw new IllegalArgumentException(eventType + " is not 
supported");
         }
@@ -644,4 +651,126 @@ public class ValuesDataSourceHelper {
         eventOfSplits.add(split1);
         return eventOfSplits;
     }
+
+    public static List<List<Event>> complexColumnNameTable() {
+        List<List<Event>> eventOfSplits = new ArrayList<>();
+        List<Event> split1 = new ArrayList<>();
+
+        // create table
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("class", DataTypes.STRING())
+                        .physicalColumn("foo-bar", DataTypes.INT())
+                        .physicalColumn("bar-foo", DataTypes.INT())
+                        .primaryKey("class")
+                        .build();
+        CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_1, 
schema);
+        split1.add(createTableEvent);
+
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator((RowType) 
schema.toRowDataType());
+        // insert
+        DataChangeEvent insertEvent1 =
+                DataChangeEvent.insertEvent(
+                        TABLE_1,
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("class0"), 0, 
0,
+                                }),
+                        new HashMap<String, String>() {
+                            {
+                                put("timestamp-type", "type0");
+                            }
+                        });
+        split1.add(insertEvent1);
+        DataChangeEvent insertEvent2 =
+                DataChangeEvent.insertEvent(
+                        TABLE_1,
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("class1"), 1, 
10,
+                                }),
+                        new HashMap<String, String>() {
+                            {
+                                put("timestamp-type", "type1");
+                            }
+                        });
+        split1.add(insertEvent2);
+        DataChangeEvent insertEvent3 =
+                DataChangeEvent.insertEvent(
+                        TABLE_1,
+                        generator.generate(
+                                new Object[] 
{BinaryStringData.fromString("class2"), 2, 100}),
+                        new HashMap<String, String>() {
+                            {
+                                put("timestamp-type", "type2");
+                            }
+                        });
+        split1.add(insertEvent3);
+
+        // add column
+        AddColumnEvent.ColumnWithPosition columnWithPosition =
+                new AddColumnEvent.ColumnWithPosition(
+                        Column.physicalColumn("import-package", 
DataTypes.STRING()));
+        AddColumnEvent addColumnEvent =
+                new AddColumnEvent(TABLE_1, 
Collections.singletonList(columnWithPosition));
+        split1.add(addColumnEvent);
+        schema = SchemaUtils.applySchemaChangeEvent(schema, addColumnEvent);
+
+        // rename column
+        Map<String, String> nameMapping = new HashMap<>();
+        nameMapping.put("bar-foo", "bar-baz");
+        RenameColumnEvent renameColumnEvent = new RenameColumnEvent(TABLE_1, 
nameMapping);
+        split1.add(renameColumnEvent);
+        schema = SchemaUtils.applySchemaChangeEvent(schema, renameColumnEvent);
+
+        // drop column
+        DropColumnEvent dropColumnEvent =
+                new DropColumnEvent(TABLE_1, 
Collections.singletonList("bar-baz"));
+        split1.add(dropColumnEvent);
+        schema = SchemaUtils.applySchemaChangeEvent(schema, dropColumnEvent);
+
+        generator = new BinaryRecordDataGenerator((RowType) 
schema.toRowDataType());
+
+        // delete
+        split1.add(
+                DataChangeEvent.deleteEvent(
+                        TABLE_1,
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("class1"),
+                                    1,
+                                    BinaryStringData.fromString(""),
+                                }),
+                        new HashMap<String, String>() {
+                            {
+                                put("timestamp-type", "type1");
+                            }
+                        }));
+
+        // update
+        split1.add(
+                DataChangeEvent.updateEvent(
+                        TABLE_1,
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("class2"),
+                                    2,
+                                    BinaryStringData.fromString("")
+                                }),
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("new-class2"),
+                                    20,
+                                    
BinaryStringData.fromString("new-package2"),
+                                }),
+                        new HashMap<String, String>() {
+                            {
+                                put("timestamp-type", "type2");
+                            }
+                        }));
+
+        eventOfSplits.add(split1);
+        return eventOfSplits;
+    }
 }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
index c7c69fa6c..a12e97f93 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
@@ -409,7 +409,7 @@ public class PostTransformOperator extends 
AbstractStreamOperator<Event>
                 Optional<TransformFilter> transformFilterOptional = 
transform.getFilter();
 
                 if (transformFilterOptional.isPresent()
-                        && transformFilterOptional.get().isVaild()) {
+                        && transformFilterOptional.get().isValid()) {
                     TransformFilter transformFilter = 
transformFilterOptional.get();
                     if (!transformFilterProcessorMap.containsKey(
                             Tuple2.of(tableId, transformFilter))) {
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumn.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumn.java
index 58f95392b..bd2e19695 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumn.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumn.java
@@ -24,7 +24,9 @@ import org.apache.flink.cdc.common.utils.StringUtils;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * The ProjectionColumn applies to describe the information of the 
transformation column. If it only
@@ -48,17 +50,19 @@ public class ProjectionColumn implements Serializable {
     private final String expression;
     private final String scriptExpression;
     private final List<String> originalColumnNames;
-    private TransformExpressionKey transformExpressionKey;
+    private final Map<String, String> columnNameMap;
 
     public ProjectionColumn(
             Column column,
             String expression,
             String scriptExpression,
-            List<String> originalColumnNames) {
+            List<String> originalColumnNames,
+            Map<String, String> columnNameMap) {
         this.column = column;
         this.expression = expression;
         this.scriptExpression = scriptExpression;
         this.originalColumnNames = originalColumnNames;
+        this.columnNameMap = columnNameMap;
     }
 
     public ProjectionColumn copy() {
@@ -66,7 +70,8 @@ public class ProjectionColumn implements Serializable {
                 column.copy(column.getName()),
                 expression,
                 scriptExpression,
-                new ArrayList<>(originalColumnNames));
+                new ArrayList<>(originalColumnNames),
+                new HashMap<>(columnNameMap));
     }
 
     public Column getColumn() {
@@ -89,8 +94,8 @@ public class ProjectionColumn implements Serializable {
         return originalColumnNames;
     }
 
-    public void setTransformExpressionKey(TransformExpressionKey 
transformExpressionKey) {
-        this.transformExpressionKey = transformExpressionKey;
+    public Map<String, String> getColumnNameMap() {
+        return columnNameMap;
     }
 
     public boolean isValidTransformedProjectionColumn() {
@@ -102,9 +107,11 @@ public class ProjectionColumn implements Serializable {
      * Just like column {@code id} in {@code id, name AS new_name, age + 1 AS 
new_age}. <br>
      * Comments and default expressions will be intact.
      */
-    public static ProjectionColumn ofForwarded(Column column) {
+    public static ProjectionColumn ofForwarded(Column column, String 
mappedColumnName) {
         String name = column.getName();
-        return new ProjectionColumn(column, name, name, 
Collections.singletonList(name));
+        Map<String, String> columnNameMap = Collections.singletonMap(name, 
mappedColumnName);
+        return new ProjectionColumn(
+                column, name, mappedColumnName, 
Collections.singletonList(name), columnNameMap);
     }
 
     /**
@@ -112,13 +119,17 @@ public class ProjectionColumn implements Serializable {
      * Just like column {@code new_name} in {@code id, name AS new_name, age + 
1 AS new_age}. <br>
      * Comments and default expressions will be intact.
      */
-    public static ProjectionColumn ofAliased(Column column, String newName) {
+    public static ProjectionColumn ofAliased(
+            Column column, String newName, String mappedColumnName) {
         String originalName = column.getName();
+        Map<String, String> columnNameMap =
+                Collections.singletonMap(originalName, mappedColumnName);
         return new ProjectionColumn(
                 column.copy(newName),
                 originalName,
-                originalName,
-                Collections.singletonList(originalName));
+                mappedColumnName,
+                Collections.singletonList(originalName),
+                columnNameMap);
     }
 
     /**
@@ -131,12 +142,14 @@ public class ProjectionColumn implements Serializable {
             DataType dataType,
             String expression,
             String scriptExpression,
-            List<String> originalColumnNames) {
+            List<String> originalColumnNames,
+            Map<String, String> columnNameMap) {
         return new ProjectionColumn(
                 Column.physicalColumn(columnName, dataType),
                 expression,
                 scriptExpression,
-                originalColumnNames);
+                originalColumnNames,
+                columnNameMap);
     }
 
     @Override
@@ -152,8 +165,8 @@ public class ProjectionColumn implements Serializable {
                 + '\''
                 + ", originalColumnNames="
                 + originalColumnNames
-                + ", transformExpressionKey="
-                + transformExpressionKey
+                + ", columnNameMap="
+                + columnNameMap
                 + '}';
     }
 }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
index 83de37926..903a2b9b9 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
@@ -99,10 +99,11 @@ public class ProjectionColumnProcessor {
             return expressionEvaluator.evaluate(generateParams(record, 
epochTime, opType, meta));
         } catch (InvocationTargetException e) {
             LOG.error(
-                    "Table:{} column:{} projection:{} execute failed. {}",
+                    "Table:{} column:{} projection:{} column name map:{} 
execute failed. {}",
                     tableInfo.getName(),
                     projectionColumn.getColumnName(),
                     projectionColumn.getScriptExpression(),
+                    projectionColumn.getColumnNameMap(),
                     e);
             throw new RuntimeException(e);
         }
@@ -176,12 +177,13 @@ public class ProjectionColumnProcessor {
         List<Class<?>> paramTypes = new ArrayList<>();
         List<Column> columns = 
tableInfo.getPreTransformedSchema().getColumns();
         String scriptExpression = projectionColumn.getScriptExpression();
+        Map<String, String> columnNameMap = 
projectionColumn.getColumnNameMap();
         LinkedHashSet<String> originalColumnNames =
                 new LinkedHashSet<>(projectionColumn.getOriginalColumnNames());
         for (String originalColumnName : originalColumnNames) {
             for (Column column : columns) {
                 if (column.getName().equals(originalColumnName)) {
-                    argumentNames.add(originalColumnName);
+                    argumentNames.add(columnNameMap.get(originalColumnName));
                     
paramTypes.add(DataTypeConverter.convertOriginalClass(column.getType()));
                     break;
                 }
@@ -192,7 +194,7 @@ public class ProjectionColumnProcessor {
                     .findFirst()
                     .ifPresent(
                             col -> {
-                                argumentNames.add(col.f0);
+                                argumentNames.add(columnNameMap.get(col.f0));
                                 paramTypes.add(col.f2);
                             });
             Stream.of(supportedMetadataColumns)
@@ -200,7 +202,7 @@ public class ProjectionColumnProcessor {
                     .findFirst()
                     .ifPresent(
                             col -> {
-                                argumentNames.add(col.getName());
+                                
argumentNames.add(columnNameMap.get(col.getName()));
                                 paramTypes.add(col.getJavaClass());
                             });
         }
@@ -214,6 +216,7 @@ public class ProjectionColumnProcessor {
                 JaninoCompiler.loadSystemFunction(scriptExpression),
                 argumentNames,
                 paramTypes,
-                
DataTypeConverter.convertOriginalClass(projectionColumn.getDataType()));
+                
DataTypeConverter.convertOriginalClass(projectionColumn.getDataType()),
+                columnNameMap);
     }
 }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java
index 326ae7ba7..06749a29f 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java
@@ -74,8 +74,9 @@ public class TransformExpressionCompiler {
                             expressionEvaluator.cook(key.getExpression());
                         } catch (CompileException e) {
                             throw new InvalidProgramException(
-                                    "Expression cannot be compiled. This is a 
bug. Please file an issue.\nExpression: "
-                                            + key.getExpression(),
+                                    String.format(
+                                            "Expression cannot be compiled. 
This is a bug. Please file an issue.\nExpression: %s\nColumn name map: %s",
+                                            key.getExpression(), 
key.getColumnNameMap()),
                                     e);
                         }
                         return expressionEvaluator;
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionKey.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionKey.java
index 0cbf1f827..927ab2711 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionKey.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionKey.java
@@ -19,6 +19,7 @@ package org.apache.flink.cdc.runtime.operators.transform;
 
 import java.io.Serializable;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 /**
@@ -31,6 +32,8 @@ import java.util.Objects;
  *   <li>argumentNames: a list for the argument names in expression.
  *   <li>argumentClasses: a list for the argument classes in expression.
  *   <li>returnClass: a class for the return class in expression
+ *   <li>columnNameMap: a map whose key is the original column name and value 
is the mapped column
+ *       name
  * </ul>
  */
 public class TransformExpressionKey implements Serializable {
@@ -39,16 +42,19 @@ public class TransformExpressionKey implements Serializable 
{
     private final List<String> argumentNames;
     private final List<Class<?>> argumentClasses;
     private final Class<?> returnClass;
+    private final Map<String, String> columnNameMap;
 
     private TransformExpressionKey(
             String expression,
             List<String> argumentNames,
             List<Class<?>> argumentClasses,
-            Class<?> returnClass) {
+            Class<?> returnClass,
+            Map<String, String> columnNameMap) {
         this.expression = expression;
         this.argumentNames = argumentNames;
         this.argumentClasses = argumentClasses;
         this.returnClass = returnClass;
+        this.columnNameMap = columnNameMap;
     }
 
     public String getExpression() {
@@ -67,12 +73,18 @@ public class TransformExpressionKey implements Serializable 
{
         return returnClass;
     }
 
+    public Map<String, String> getColumnNameMap() {
+        return columnNameMap;
+    }
+
     public static TransformExpressionKey of(
             String expression,
             List<String> argumentNames,
             List<Class<?>> argumentClasses,
-            Class<?> returnClass) {
-        return new TransformExpressionKey(expression, argumentNames, 
argumentClasses, returnClass);
+            Class<?> returnClass,
+            Map<String, String> columnNameMap) {
+        return new TransformExpressionKey(
+                expression, argumentNames, argumentClasses, returnClass, 
columnNameMap);
     }
 
     @Override
@@ -87,11 +99,12 @@ public class TransformExpressionKey implements Serializable 
{
         return expression.equals(that.expression)
                 && argumentNames.equals(that.argumentNames)
                 && argumentClasses.equals(that.argumentClasses)
-                && returnClass.equals(that.returnClass);
+                && returnClass.equals(that.returnClass)
+                && columnNameMap.equals(that.columnNameMap);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(expression, argumentNames, argumentClasses, 
returnClass);
+        return Objects.hash(expression, argumentNames, argumentClasses, 
returnClass, columnNameMap);
     }
 }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilter.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilter.java
index 4f1aee57e..f12880d41 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilter.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilter.java
@@ -22,6 +22,7 @@ import org.apache.flink.cdc.runtime.parser.TransformParser;
 
 import java.io.Serializable;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 /**
@@ -41,11 +42,17 @@ public class TransformFilter implements Serializable {
     private final String expression;
     private final String scriptExpression;
     private final List<String> columnNames;
+    private final Map<String, String> columnNameMap;
 
-    public TransformFilter(String expression, String scriptExpression, List<String> columnNames) {
+    public TransformFilter(
+            String expression,
+            String scriptExpression,
+            List<String> columnNames,
+            Map<String, String> columnNameMap) {
         this.expression = expression;
         this.scriptExpression = scriptExpression;
         this.columnNames = columnNames;
+        this.columnNameMap = columnNameMap;
     }
 
     public String getExpression() {
@@ -60,19 +67,26 @@ public class TransformFilter implements Serializable {
         return columnNames;
     }
 
+    public Map<String, String> getColumnNameMap() {
+        return columnNameMap;
+    }
+
     public static Optional<TransformFilter> of(
             String filterExpression, List<UserDefinedFunctionDescriptor> udfDescriptors) {
         if (StringUtils.isNullOrWhitespaceOnly(filterExpression)) {
             return Optional.empty();
         }
         List<String> columnNames = TransformParser.parseFilterColumnNameList(filterExpression);
+        Map<String, String> columnNameMap = TransformParser.generateColumnNameMap(columnNames);
         String scriptExpression =
                 TransformParser.translateFilterExpressionToJaninoExpression(
-                        filterExpression, udfDescriptors);
-        return Optional.of(new TransformFilter(filterExpression, scriptExpression, columnNames));
+                        filterExpression, udfDescriptors, columnNameMap);
+        return Optional.of(
+                new TransformFilter(
+                        filterExpression, scriptExpression, columnNames, columnNameMap));
     }
 
-    public boolean isVaild() {
+    public boolean isValid() {
         return !columnNames.isEmpty();
     }
 }
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
index 220f61109..d84209e78 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
@@ -96,45 +96,46 @@ public class TransformFilterProcessor {
                     expressionEvaluator.evaluate(generateParams(record, 
epochTime, opType, meta));
         } catch (InvocationTargetException e) {
             LOG.error(
-                    "Table:{} filter:{} execute failed. {}",
+                    "Table:{} filter:{} column name map:{} execute failed. {}",
                     tableInfo.getName(),
                     transformFilter.getExpression(),
+                    transformFilter.getColumnNameMap(),
                     e);
             throw new RuntimeException(e);
         }
     }
 
-    private Tuple2<List<String>, List<Class<?>>> generateArguments() {
+    private Tuple2<List<String>, List<Class<?>>> generateArguments(boolean 
mapColumnNames) {
         List<String> argNames = new ArrayList<>();
         List<Class<?>> argTypes = new ArrayList<>();
-        String scriptExpression = transformFilter.getScriptExpression();
+        String expression = transformFilter.getExpression();
         List<Column> columns = 
tableInfo.getPreTransformedSchema().getColumns();
+        Map<String, String> columnNameMap = transformFilter.getColumnNameMap();
         LinkedHashSet<String> columnNames = new 
LinkedHashSet<>(transformFilter.getColumnNames());
         for (String columnName : columnNames) {
             for (Column column : columns) {
                 if (column.getName().equals(columnName)) {
-                    argNames.add(columnName);
+                    argNames.add(mapColumnNames ? 
columnNameMap.get(columnName) : columnName);
                     
argTypes.add(DataTypeConverter.convertOriginalClass(column.getType()));
                     break;
                 }
             }
         }
 
-        METADATA_COLUMNS.stream()
-                .forEach(
-                        col -> {
-                            if (scriptExpression.contains(col.f0) && 
!argNames.contains(col.f0)) {
-                                argNames.add(col.f0);
-                                argTypes.add(col.f2);
-                            }
-                        });
+        METADATA_COLUMNS.forEach(
+                col -> {
+                    if (expression.contains(col.f0) && 
!argNames.contains(col.f0)) {
+                        argNames.add(mapColumnNames ? 
columnNameMap.get(col.f0) : col.f0);
+                        argTypes.add(col.f2);
+                    }
+                });
 
         supportedMetadataColumns
                 .keySet()
                 .forEach(
                         colName -> {
-                            if (scriptExpression.contains(colName) && 
!argNames.contains(colName)) {
-                                argNames.add(colName);
+                            if (expression.contains(colName) && 
!argNames.contains(colName)) {
+                                argNames.add(mapColumnNames ? 
columnNameMap.get(colName) : colName);
                                 
argTypes.add(supportedMetadataColumns.get(colName).getJavaClass());
                             }
                         });
@@ -147,7 +148,7 @@ public class TransformFilterProcessor {
         List<Column> columns = 
tableInfo.getPreTransformedSchema().getColumns();
 
         // 1 - Add referenced columns
-        Tuple2<List<String>, List<Class<?>>> args = generateArguments();
+        Tuple2<List<String>, List<Class<?>>> args = generateArguments(false);
         RecordData.FieldGetter[] fieldGetters = 
tableInfo.getPreTransformedFieldGetters();
         for (String columnName : args.f0) {
             switch (columnName) {
@@ -191,7 +192,7 @@ public class TransformFilterProcessor {
     }
 
     private TransformExpressionKey generateTransformExpressionKey() {
-        Tuple2<List<String>, List<Class<?>>> args = generateArguments();
+        Tuple2<List<String>, List<Class<?>>> args = generateArguments(true);
 
         args.f0.add(JaninoCompiler.DEFAULT_TIME_ZONE);
         args.f1.add(String.class);
@@ -202,6 +203,7 @@ public class TransformFilterProcessor {
                 
JaninoCompiler.loadSystemFunction(transformFilter.getScriptExpression()),
                 args.f0,
                 args.f1,
-                Boolean.class);
+                Boolean.class,
+                transformFilter.getColumnNameMap());
     }
 }
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
index d9f644b0b..c5bcb3139 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
@@ -42,6 +42,7 @@ import org.codehaus.janino.Java;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 /**
@@ -105,8 +106,11 @@ public class JaninoCompiler {
     }
 
     public static String translateSqlNodeToJaninoExpression(
-            SqlNode transform, List<UserDefinedFunctionDescriptor> 
udfDescriptors) {
-        Java.Rvalue rvalue = translateSqlNodeToJaninoRvalue(transform, 
udfDescriptors);
+            SqlNode transform,
+            List<UserDefinedFunctionDescriptor> udfDescriptors,
+            Map<String, String> columnNameMap) {
+        Java.Rvalue rvalue =
+                translateSqlNodeToJaninoRvalue(transform, udfDescriptors, 
columnNameMap);
         if (rvalue != null) {
             return rvalue.toString();
         }
@@ -114,20 +118,23 @@ public class JaninoCompiler {
     }
 
     public static Java.Rvalue translateSqlNodeToJaninoRvalue(
-            SqlNode transform, List<UserDefinedFunctionDescriptor> 
udfDescriptors) {
+            SqlNode transform,
+            List<UserDefinedFunctionDescriptor> udfDescriptors,
+            Map<String, String> columnNameMap) {
         if (transform instanceof SqlIdentifier) {
-            return translateSqlIdentifier((SqlIdentifier) transform);
+            return translateSqlIdentifier((SqlIdentifier) transform, 
columnNameMap);
         } else if (transform instanceof SqlBasicCall) {
-            return translateSqlBasicCall((SqlBasicCall) transform, 
udfDescriptors);
+            return translateSqlBasicCall((SqlBasicCall) transform, 
udfDescriptors, columnNameMap);
         } else if (transform instanceof SqlCase) {
-            return translateSqlCase((SqlCase) transform, udfDescriptors);
+            return translateSqlCase((SqlCase) transform, udfDescriptors, 
columnNameMap);
         } else if (transform instanceof SqlLiteral) {
             return translateSqlSqlLiteral((SqlLiteral) transform);
         }
         return null;
     }
 
-    private static Java.Rvalue translateSqlIdentifier(SqlIdentifier 
sqlIdentifier) {
+    private static Java.Rvalue translateSqlIdentifier(
+            SqlIdentifier sqlIdentifier, Map<String, String> columnNameMap) {
         String columnName = sqlIdentifier.names.get(sqlIdentifier.names.size() 
- 1);
         if 
(TIMEZONE_FREE_TEMPORAL_FUNCTIONS.contains(columnName.toUpperCase())) {
             return generateTimezoneFreeTemporalFunctionOperation(columnName);
@@ -139,7 +146,9 @@ public class JaninoCompiler {
                 columnName.toUpperCase())) {
             return 
generateTimezoneRequiredTemporalConversionFunctionOperation(columnName);
         } else {
-            return new Java.AmbiguousName(Location.NOWHERE, new String[] 
{columnName});
+            return new Java.AmbiguousName(
+                    Location.NOWHERE,
+                    new String[] {columnNameMap.getOrDefault(columnName, 
columnName)});
         }
     }
 
@@ -166,11 +175,13 @@ public class JaninoCompiler {
     }
 
     private static Java.Rvalue translateSqlBasicCall(
-            SqlBasicCall sqlBasicCall, List<UserDefinedFunctionDescriptor> 
udfDescriptors) {
+            SqlBasicCall sqlBasicCall,
+            List<UserDefinedFunctionDescriptor> udfDescriptors,
+            Map<String, String> columnNameMap) {
         List<SqlNode> operandList = sqlBasicCall.getOperandList();
         List<Java.Rvalue> atoms = new ArrayList<>();
         for (SqlNode sqlNode : operandList) {
-            translateSqlNodeToAtoms(sqlNode, atoms, udfDescriptors);
+            translateSqlNodeToAtoms(sqlNode, atoms, udfDescriptors, 
columnNameMap);
         }
         if (TIMEZONE_FREE_TEMPORAL_FUNCTIONS.contains(
                 sqlBasicCall.getOperator().getName().toUpperCase())) {
@@ -188,19 +199,22 @@ public class JaninoCompiler {
     }
 
     private static Java.Rvalue translateSqlCase(
-            SqlCase sqlCase, List<UserDefinedFunctionDescriptor> 
udfDescriptors) {
+            SqlCase sqlCase,
+            List<UserDefinedFunctionDescriptor> udfDescriptors,
+            Map<String, String> columnNameMap) {
         SqlNodeList whenOperands = sqlCase.getWhenOperands();
         SqlNodeList thenOperands = sqlCase.getThenOperands();
         SqlNode elseOperand = sqlCase.getElseOperand();
         List<Java.Rvalue> whenAtoms = new ArrayList<>();
         for (SqlNode sqlNode : whenOperands) {
-            translateSqlNodeToAtoms(sqlNode, whenAtoms, udfDescriptors);
+            translateSqlNodeToAtoms(sqlNode, whenAtoms, udfDescriptors, 
columnNameMap);
         }
         List<Java.Rvalue> thenAtoms = new ArrayList<>();
         for (SqlNode sqlNode : thenOperands) {
-            translateSqlNodeToAtoms(sqlNode, thenAtoms, udfDescriptors);
+            translateSqlNodeToAtoms(sqlNode, thenAtoms, udfDescriptors, 
columnNameMap);
         }
-        Java.Rvalue elseAtoms = translateSqlNodeToJaninoRvalue(elseOperand, 
udfDescriptors);
+        Java.Rvalue elseAtoms =
+                translateSqlNodeToJaninoRvalue(elseOperand, udfDescriptors, 
columnNameMap);
         Java.Rvalue sqlCaseRvalueTemp = elseAtoms;
         for (int i = whenAtoms.size() - 1; i >= 0; i--) {
             sqlCaseRvalueTemp =
@@ -216,19 +230,20 @@ public class JaninoCompiler {
     private static void translateSqlNodeToAtoms(
             SqlNode sqlNode,
             List<Java.Rvalue> atoms,
-            List<UserDefinedFunctionDescriptor> udfDescriptors) {
+            List<UserDefinedFunctionDescriptor> udfDescriptors,
+            Map<String, String> columnNameMap) {
         if (sqlNode instanceof SqlIdentifier) {
-            atoms.add(translateSqlIdentifier((SqlIdentifier) sqlNode));
+            atoms.add(translateSqlIdentifier((SqlIdentifier) sqlNode, 
columnNameMap));
         } else if (sqlNode instanceof SqlLiteral) {
             atoms.add(translateSqlSqlLiteral((SqlLiteral) sqlNode));
         } else if (sqlNode instanceof SqlBasicCall) {
-            atoms.add(translateSqlBasicCall((SqlBasicCall) sqlNode, 
udfDescriptors));
+            atoms.add(translateSqlBasicCall((SqlBasicCall) sqlNode, 
udfDescriptors, columnNameMap));
         } else if (sqlNode instanceof SqlNodeList) {
             for (SqlNode node : (SqlNodeList) sqlNode) {
-                translateSqlNodeToAtoms(node, atoms, udfDescriptors);
+                translateSqlNodeToAtoms(node, atoms, udfDescriptors, 
columnNameMap);
             }
         } else if (sqlNode instanceof SqlCase) {
-            atoms.add(translateSqlCase((SqlCase) sqlNode, udfDescriptors));
+            atoms.add(translateSqlCase((SqlCase) sqlNode, udfDescriptors, 
columnNameMap));
         }
     }
 
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
index ee29bbf2c..1fc3101ae 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
@@ -95,6 +95,8 @@ public class TransformParser {
     private static final Logger LOG = 
LoggerFactory.getLogger(TransformParser.class);
     private static final String DEFAULT_SCHEMA = "default_schema";
     private static final String DEFAULT_TABLE = "TB";
+    private static final String MAPPED_COLUMN_NAME_PREFIX = "$";
+    private static final String MAPPED_SINGLE_COLUMN_NAME = 
MAPPED_COLUMN_NAME_PREFIX + "0";
 
     private static SqlParser getCalciteParser(String sql) {
         return SqlParser.create(
@@ -334,6 +336,8 @@ public class TransformParser {
                                     columnName,
                                     supportedMetadataColumns);
                 } else {
+                    List<String> originalColumnNames = 
parseColumnNameList(exprNode);
+                    Map<String, String> columnNameMap = 
generateColumnNameMap(originalColumnNames);
                     projectionColumn =
                             ProjectionColumn.ofCalculated(
                                     columnName,
@@ -341,8 +345,9 @@ public class TransformParser {
                                             relDataTypeMap.get(columnName)),
                                     exprNode.toString(),
                                     
JaninoCompiler.translateSqlNodeToJaninoExpression(
-                                            exprNode, udfDescriptors),
-                                    parseColumnNameList(exprNode));
+                                            exprNode, udfDescriptors, 
columnNameMap),
+                                    originalColumnNames,
+                                    columnNameMap);
                 }
             }
             // ... or an existing column's name identifier.
@@ -384,6 +389,8 @@ public class TransformParser {
             String identifier,
             String projectedColumnName,
             SupportedMetadataColumn[] supportedMetadataColumns) {
+        Map<String, String> columnNameMap =
+                Collections.singletonMap(identifier, 
MAPPED_SINGLE_COLUMN_NAME);
         if (isMetadataColumn(identifier, supportedMetadataColumns)) {
             // For a metadata column, we simply generate a projection column 
with the same
             return ProjectionColumn.ofCalculated(
@@ -393,8 +400,9 @@ public class TransformParser {
                                     relDataTypeMap.get(projectedColumnName))
                             .notNull(),
                     identifier,
-                    identifier,
-                    Collections.singletonList(identifier));
+                    columnNameMap.get(identifier),
+                    Collections.singletonList(identifier),
+                    columnNameMap);
         }
 
         Preconditions.checkArgument(
@@ -404,14 +412,17 @@ public class TransformParser {
 
         Column column = originalColumnMap.get(identifier);
         if (Objects.equals(identifier, projectedColumnName)) {
-            return ProjectionColumn.ofForwarded(column);
+            return ProjectionColumn.ofForwarded(column, 
MAPPED_SINGLE_COLUMN_NAME);
         } else {
-            return ProjectionColumn.ofAliased(column, projectedColumnName);
+            return ProjectionColumn.ofAliased(
+                    column, projectedColumnName, MAPPED_SINGLE_COLUMN_NAME);
         }
     }
 
     public static String translateFilterExpressionToJaninoExpression(
-            String filterExpression, List<UserDefinedFunctionDescriptor> 
udfDescriptors) {
+            String filterExpression,
+            List<UserDefinedFunctionDescriptor> udfDescriptors,
+            Map<String, String> columnNameMap) {
         if (isNullOrWhitespaceOnly(filterExpression)) {
             return "";
         }
@@ -420,7 +431,8 @@ public class TransformParser {
             return "";
         }
         SqlNode where = sqlSelect.getWhere();
-        return JaninoCompiler.translateSqlNodeToJaninoExpression(where, 
udfDescriptors);
+        return JaninoCompiler.translateSqlNodeToJaninoExpression(
+                where, udfDescriptors, columnNameMap);
     }
 
     public static List<String> parseComputedColumnNames(
@@ -642,4 +654,16 @@ public class TransformParser {
             return false;
         }
     }
+
+    public static Map<String, String> generateColumnNameMap(List<String> 
originalColumnNames) {
+        int i = 0;
+        Map<String, String> columnNameMap = new HashMap<>();
+        for (String columnName : originalColumnNames) {
+            if (!columnNameMap.containsKey(columnName)) {
+                columnNameMap.put(columnName, MAPPED_COLUMN_NAME_PREFIX + i);
+                i++;
+            }
+        }
+        return columnNameMap;
+    }
 }
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
index 07ae5369b..cf84a8214 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
@@ -362,6 +362,20 @@ public class PostTransformOperatorTest {
                     .primaryKey("id")
                     .build();
 
+    private static final TableId COL_NAME_MAPPING_TABLEID =
+            TableId.tableId("my_company", "my_branch", 
"col_name_mapping_table");
+    private static final Schema COL_NAME_MAPPING_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("foo", DataTypes.INT())
+                    .physicalColumn("bar", DataTypes.INT())
+                    .physicalColumn("foo-bar", DataTypes.INT())
+                    .physicalColumn("bar-foo", DataTypes.INT())
+                    .physicalColumn("class", DataTypes.STRING())
+                    .physicalColumn("f0", DataTypes.INT())
+                    .physicalColumn("f1", DataTypes.INT())
+                    .physicalColumn("f2", DataTypes.INT())
+                    .build();
+
     @Test
     void testDataChangeEventTransform() throws Exception {
         PostTransformOperator transform =
@@ -3249,4 +3263,100 @@ public class PostTransformOperatorTest {
                 .isEqualTo(new StreamRecord<>(updateEventExpect));
         transformFunctionEventEventOperatorTestHarness.close();
     }
+
+    @Test
+    void testColumnNameMapping() throws Exception {
+        PostTransformOperator transform =
+                PostTransformOperator.newBuilder()
+                        .addTransform(
+                                COL_NAME_MAPPING_TABLEID.identifier(),
+                                "*, class, foo-bar AS f0, bar-foo AS f1, 
`foo-bar`-`bar-foo` AS f2",
+                                "`foo-bar`-`bar-foo` <> 0")
+                        .build();
+        RegularEventOperatorTestHarness<PostTransformOperator, Event>
+                transformFunctionEventEventOperatorTestHarness =
+                        RegularEventOperatorTestHarness.with(transform, 1);
+        // Initialization
+        transformFunctionEventEventOperatorTestHarness.open();
+        // Create table
+        CreateTableEvent createTableEvent =
+                new CreateTableEvent(COL_NAME_MAPPING_TABLEID, 
COL_NAME_MAPPING_SCHEMA);
+        BinaryRecordDataGenerator recordDataGenerator =
+                new BinaryRecordDataGenerator(((RowType) 
COL_NAME_MAPPING_SCHEMA.toRowDataType()));
+        // Insert
+        DataChangeEvent insertEvent =
+                DataChangeEvent.insertEvent(
+                        COL_NAME_MAPPING_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    1,
+                                    2,
+                                    3,
+                                    4,
+                                    BinaryStringData.fromString("class0"),
+                                    null,
+                                    null,
+                                    null
+                                }));
+        DataChangeEvent insertEventExpect =
+                DataChangeEvent.insertEvent(
+                        COL_NAME_MAPPING_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    1, 2, 3, 4, 
BinaryStringData.fromString("class0"), -1, 1, -1
+                                }));
+        // Update
+        DataChangeEvent updateEvent =
+                DataChangeEvent.updateEvent(
+                        COL_NAME_MAPPING_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    1,
+                                    2,
+                                    3,
+                                    4,
+                                    BinaryStringData.fromString("class0"),
+                                    null,
+                                    null,
+                                    null
+                                }),
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    2,
+                                    4,
+                                    6,
+                                    8,
+                                    BinaryStringData.fromString("class1"),
+                                    null,
+                                    null,
+                                    null
+                                }));
+        DataChangeEvent updateEventExpect =
+                DataChangeEvent.updateEvent(
+                        COL_NAME_MAPPING_TABLEID,
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    1, 2, 3, 4, 
BinaryStringData.fromString("class0"), -1, 1, -1
+                                }),
+                        recordDataGenerator.generate(
+                                new Object[] {
+                                    2, 4, 6, 8, 
BinaryStringData.fromString("class1"), -2, 2, -2
+                                }));
+
+        transform.processElement(new StreamRecord<>(createTableEvent));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(
+                        new StreamRecord<>(
+                                new CreateTableEvent(
+                                        COL_NAME_MAPPING_TABLEID, 
COL_NAME_MAPPING_SCHEMA)));
+        transform.processElement(new StreamRecord<>(insertEvent));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(insertEventExpect));
+        transform.processElement(new StreamRecord<>(updateEvent));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(updateEventExpect));
+    }
 }
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java
index 441d6f86c..763dee7c4 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java
@@ -182,6 +182,24 @@ public class PreTransformOperatorTest {
                     .primaryKey("id")
                     .build();
 
+    private static final Schema COL_NAME_MAPPING_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("foo", DataTypes.INT())
+                    .physicalColumn("bar", DataTypes.INT())
+                    .physicalColumn("foo-bar", DataTypes.INT())
+                    .physicalColumn("bar-foo", DataTypes.INT())
+                    .physicalColumn("class", DataTypes.INT())
+                    .build();
+
+    private static final Schema EXPECTED_COL_NAME_MAPPING_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("foo", DataTypes.INT())
+                    .physicalColumn("bar", DataTypes.INT())
+                    .physicalColumn("foo-bar", DataTypes.INT())
+                    .physicalColumn("bar-foo", DataTypes.INT())
+                    .physicalColumn("class", DataTypes.INT())
+                    .build();
+
     @Test
     void testEventTransform() throws Exception {
         PreTransformOperator transform =
@@ -681,4 +699,33 @@ public class PreTransformOperatorTest {
                                 new CreateTableEvent(CUSTOMERS_TABLEID, 
MULTITRANSFORM_SCHEMA)));
         transformFunctionEventEventOperatorTestHarness.close();
     }
+
+    @Test
+    void testColumnNameMapping() throws Exception {
+        PreTransformOperator transform =
+                PreTransformOperator.newBuilder()
+                        .addTransform(
+                                CUSTOMERS_TABLEID.identifier(),
+                                "foo, `foo-bar`, foo-bar AS f0, `bar-foo` AS 
f1, class",
+                                " `foo-bar` > 1 and foo-bar > 1 and class > 1")
+                        .build();
+
+        RegularEventOperatorTestHarness<PreTransformOperator, Event>
+                transformFunctionEventEventOperatorTestHarness =
+                        RegularEventOperatorTestHarness.with(transform, 1);
+        // Initialization
+        transformFunctionEventEventOperatorTestHarness.open();
+        // Create table
+        CreateTableEvent createTableEvent =
+                new CreateTableEvent(CUSTOMERS_TABLEID, 
COL_NAME_MAPPING_SCHEMA);
+        transform.processElement(new StreamRecord<>(createTableEvent));
+
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(
+                        new StreamRecord<>(
+                                new CreateTableEvent(
+                                        CUSTOMERS_TABLEID, 
EXPECTED_COL_NAME_MAPPING_SCHEMA)));
+        transformFunctionEventEventOperatorTestHarness.close();
+    }
 }
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformOperatorWithSchemaEvolveTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformOperatorWithSchemaEvolveTest.java
index 50521e2a5..3c0af3d39 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformOperatorWithSchemaEvolveTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformOperatorWithSchemaEvolveTest.java
@@ -21,6 +21,7 @@ import org.apache.flink.cdc.common.data.binary.BinaryStringData;
 import org.apache.flink.cdc.common.event.AddColumnEvent;
 import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.DropColumnEvent;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.RenameColumnEvent;
@@ -29,7 +30,10 @@ import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
 import 
org.apache.flink.cdc.runtime.testutils.operators.RegularEventOperatorTestHarness;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
@@ -72,6 +76,10 @@ public class TransformOperatorWithSchemaEvolveTest {
         private PreTransformOperator preTransformOperator;
         private PostTransformOperator postTransformOperator;
 
+        private BinaryRecordDataGenerator sourceRecordGenerator;
+        private BinaryRecordDataGenerator preTransformedRecordGenerator;
+        private BinaryRecordDataGenerator postTransformedRecordGenerator;
+
         private RegularEventOperatorTestHarness<PreTransformOperator, Event>
                 preTransformOperatorHarness;
         private RegularEventOperatorTestHarness<PostTransformOperator, Event>
@@ -101,6 +109,16 @@ public class TransformOperatorWithSchemaEvolveTest {
 
         public TransformWithSchemaEvolveTestCase 
evolveFromSource(SchemaChangeEvent event) {
             sourceEvents.add(event);
+            sourceSchema = SchemaUtils.applySchemaChangeEvent(sourceSchema, 
event);
+            sourceRecordGenerator =
+                    new BinaryRecordDataGenerator((RowType) 
sourceSchema.toRowDataType());
+            return this;
+        }
+
+        public TransformWithSchemaEvolveTestCase insertSource(Object... 
record) {
+            sourceEvents.add(
+                    DataChangeEvent.insertEvent(
+                            tableId, 
sourceRecordGenerator.generate(stringify(record))));
             return this;
         }
 
@@ -111,6 +129,16 @@ public class TransformOperatorWithSchemaEvolveTest {
 
         public TransformWithSchemaEvolveTestCase 
expectInPreTransformed(SchemaChangeEvent event) {
             preTransformedEvents.add(event);
+            preTransformedSchema = 
SchemaUtils.applySchemaChangeEvent(preTransformedSchema, event);
+            preTransformedRecordGenerator =
+                    new BinaryRecordDataGenerator((RowType) 
preTransformedSchema.toRowDataType());
+            return this;
+        }
+
+        public TransformWithSchemaEvolveTestCase 
expectInPreTransformed(Object... record) {
+            preTransformedEvents.add(
+                    DataChangeEvent.insertEvent(
+                            tableId, 
preTransformedRecordGenerator.generate(stringify(record))));
             return this;
         }
 
@@ -121,6 +149,17 @@ public class TransformOperatorWithSchemaEvolveTest {
 
         public TransformWithSchemaEvolveTestCase 
expectInPostTransformed(SchemaChangeEvent event) {
             postTransformedEvents.add(event);
+            postTransformedSchema =
+                    SchemaUtils.applySchemaChangeEvent(postTransformedSchema, 
event);
+            postTransformedRecordGenerator =
+                    new BinaryRecordDataGenerator((RowType) 
postTransformedSchema.toRowDataType());
+            return this;
+        }
+
+        public TransformWithSchemaEvolveTestCase 
expectInPostTransformed(Object... event) {
+            postTransformedEvents.add(
+                    DataChangeEvent.insertEvent(
+                            tableId, 
postTransformedRecordGenerator.generate(stringify(event))));
             return this;
         }
 
@@ -139,6 +178,13 @@ public class TransformOperatorWithSchemaEvolveTest {
             this.preTransformedSchema = preTransformedSchema;
             this.postTransformedSchema = postTransformedSchema;
 
+            this.sourceRecordGenerator =
+                    new BinaryRecordDataGenerator((RowType) 
sourceSchema.toRowDataType());
+            this.preTransformedRecordGenerator =
+                    new BinaryRecordDataGenerator((RowType) 
preTransformedSchema.toRowDataType());
+            this.postTransformedRecordGenerator =
+                    new BinaryRecordDataGenerator((RowType) 
postTransformedSchema.toRowDataType());
+
             this.sourceEvents = new ArrayList<>();
             this.preTransformedEvents = new ArrayList<>();
             this.postTransformedEvents = new ArrayList<>();
@@ -793,4 +839,87 @@ public class TransformOperatorWithSchemaEvolveTest {
                                                 "name"))))
                 .runTests("inserting columns at last");
     }
+
+    /** This case tests column name map when schema evolution happens. */
+    @Test
+    public void testSchemaChangeWithColumnNameMap() throws Exception {
+        TableId tableId = TableId.tableId("my_company", "my_branch", 
"data_changes");
+        TransformWithSchemaEvolveTestCase.of(
+                        tableId,
+                        "*, foo-bar as computed",
+                        "class <> 'class0'",
+                        Schema.newBuilder()
+                                .physicalColumn("foo", DataTypes.INT())
+                                .physicalColumn("bar", DataTypes.INT())
+                                .physicalColumn("foo-bar", DataTypes.INT())
+                                .physicalColumn("class", DataTypes.STRING())
+                                .build(),
+                        Schema.newBuilder()
+                                .physicalColumn("foo", DataTypes.INT())
+                                .physicalColumn("bar", DataTypes.INT())
+                                .physicalColumn("foo-bar", DataTypes.INT())
+                                .physicalColumn("class", DataTypes.STRING())
+                                .build(),
+                        Schema.newBuilder()
+                                .physicalColumn("foo", DataTypes.INT())
+                                .physicalColumn("bar", DataTypes.INT())
+                                .physicalColumn("foo-bar", DataTypes.INT())
+                                .physicalColumn("class", DataTypes.STRING())
+                                .physicalColumn("computed", DataTypes.INT())
+                                .build())
+                .initializeHarness()
+                .runTests("initializing table")
+                .insertSource(0, 0, 0, "class0")
+                .expectInPreTransformed(0, 0, 0, "class0")
+                .expectNothingInPostTransformed()
+                .insertSource(1, 2, 3, "class1")
+                .expectInPreTransformed(1, 2, 3, "class1")
+                .expectInPostTransformed(1, 2, 3, "class1", -1)
+                .evolveFromSource(
+                        new AddColumnEvent(
+                                tableId,
+                                Collections.singletonList(
+                                        new AddColumnEvent.ColumnWithPosition(
+                                                
Column.physicalColumn("bar-foo", DataTypes.INT()),
+                                                
AddColumnEvent.ColumnPosition.FIRST,
+                                                null))))
+                .expectInPreTransformed(
+                        new AddColumnEvent(
+                                tableId,
+                                Collections.singletonList(
+                                        new AddColumnEvent.ColumnWithPosition(
+                                                
Column.physicalColumn("bar-foo", DataTypes.INT()),
+                                                
AddColumnEvent.ColumnPosition.BEFORE,
+                                                "foo"))))
+                .expectInPostTransformed(
+                        new AddColumnEvent(
+                                tableId,
+                                Collections.singletonList(
+                                        new AddColumnEvent.ColumnWithPosition(
+                                                
Column.physicalColumn("bar-foo", DataTypes.INT()),
+                                                
AddColumnEvent.ColumnPosition.BEFORE,
+                                                "foo"))))
+                .insertSource(10, 2, 4, 6, "class2")
+                .expectInPreTransformed(10, 2, 4, 6, "class2")
+                .expectInPostTransformed(10, 2, 4, 6, "class2", -2)
+                .insertSource(20, 2, 4, 6, "class0")
+                .expectInPreTransformed(20, 2, 4, 6, "class0")
+                .expectNothingInPostTransformed()
+                .evolveFromSource(
+                        new RenameColumnEvent(
+                                tableId, Collections.singletonMap("bar-foo", 
"package")))
+                .expectInPreTransformed(
+                        new RenameColumnEvent(
+                                tableId, Collections.singletonMap("bar-foo", 
"package")))
+                .expectInPostTransformed(
+                        new RenameColumnEvent(
+                                tableId, Collections.singletonMap("bar-foo", 
"package")))
+                .insertSource(30, 3, 6, 9, "class3")
+                .expectInPreTransformed(30, 3, 6, 9, "class3")
+                .expectInPostTransformed(30, 3, 6, 9, "class3", -3)
+                .insertSource(40, 3, 6, 9, "class0")
+                .expectInPreTransformed(40, 3, 6, 9, "class0")
+                .expectNothingInPostTransformed()
+                .runTests("schema evolution with mapped column names");
+    }
 }
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java
index 59abfa06a..04d64b6c1 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java
@@ -1164,4 +1164,38 @@ public class UnifiedTransformOperatorTest {
                 .runTests()
                 .destroyHarness();
     }
+
+    @Test
+    public void testTransformWithColumnNameMap() throws Exception {
+        TableId tableId = TableId.tableId("my_company", "my_branch", 
"column_name_map");
+        UnifiedTransformTestCase.of(
+                        tableId,
+                        "foo-bar AS f0, `foo-bar`, foo-bar-`foo-bar` AS f1, 
class",
+                        "foo-bar <> 0",
+                        Schema.newBuilder()
+                                .physicalColumn("foo", DataTypes.INT())
+                                .physicalColumn("bar", DataTypes.INT())
+                                .physicalColumn("foo-bar", DataTypes.INT())
+                                .physicalColumn("bar-foo", DataTypes.INT())
+                                .physicalColumn("class", DataTypes.STRING())
+                                .build(),
+                        Schema.newBuilder()
+                                .physicalColumn("foo", DataTypes.INT())
+                                .physicalColumn("bar", DataTypes.INT())
+                                .physicalColumn("foo-bar", DataTypes.INT())
+                                .physicalColumn("class", DataTypes.STRING())
+                                .build(),
+                        Schema.newBuilder()
+                                .physicalColumn("f0", DataTypes.INT())
+                                .physicalColumn("foo-bar", DataTypes.INT())
+                                .physicalColumn("f1", DataTypes.INT())
+                                .physicalColumn("class", DataTypes.STRING())
+                                .build())
+                .initializeHarness()
+                .insertSource(1, 2, 3, 4, "class")
+                .insertPreTransformed(1, 2, 3, "class")
+                .insertPostTransformed(-1, 3, -4, "class")
+                .runTests()
+                .destroyHarness();
+    }
 }
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
index 9f6e0ecdf..47b99e92d 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.io.ParseException;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
+import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.runtime.operators.transform.ProjectionColumn;
 import 
org.apache.flink.cdc.runtime.operators.transform.UserDefinedFunctionDescriptor;
@@ -422,14 +423,18 @@ public class TransformParserTest {
         Assertions.assertThatThrownBy(
                         () -> {
                             
TransformParser.translateFilterExpressionToJaninoExpression(
-                                    "TIMESTAMPDIFF(SECONDS, dt1, dt2)", 
Collections.emptyList());
+                                    "TIMESTAMPDIFF(SECONDS, dt1, dt2)",
+                                    Collections.emptyList(),
+                                    Collections.emptyMap());
                         })
                 .isExactlyInstanceOf(ParseException.class)
                 .hasMessage("Statements can not be parsed.");
         Assertions.assertThatThrownBy(
                         () -> {
                             
TransformParser.translateFilterExpressionToJaninoExpression(
-                                    "TIMESTAMPDIFF(QUARTER, dt1, dt2)", 
Collections.emptyList());
+                                    "TIMESTAMPDIFF(QUARTER, dt1, dt2)",
+                                    Collections.emptyList(),
+                                    Collections.emptyMap());
                         })
                 .isExactlyInstanceOf(ParseException.class)
                 .hasMessage(
@@ -437,14 +442,18 @@ public class TransformParserTest {
         Assertions.assertThatThrownBy(
                         () -> {
                             
TransformParser.translateFilterExpressionToJaninoExpression(
-                                    "TIMESTAMPADD(SECONDS, dt1, dt2)", 
Collections.emptyList());
+                                    "TIMESTAMPADD(SECONDS, dt1, dt2)",
+                                    Collections.emptyList(),
+                                    Collections.emptyMap());
                         })
                 .isExactlyInstanceOf(ParseException.class)
                 .hasMessage("Statements can not be parsed.");
         Assertions.assertThatThrownBy(
                         () -> {
                             
TransformParser.translateFilterExpressionToJaninoExpression(
-                                    "TIMESTAMPADD(QUARTER, dt1, dt2)", 
Collections.emptyList());
+                                    "TIMESTAMPADD(QUARTER, dt1, dt2)",
+                                    Collections.emptyList(),
+                                    Collections.emptyMap());
                         })
                 .isExactlyInstanceOf(ParseException.class)
                 .hasMessage(
@@ -474,13 +483,13 @@ public class TransformParserTest {
 
         List<String> expected =
                 Arrays.asList(
-                        "ProjectionColumn{column=`id` INT 'id', 
expression='id', scriptExpression='id', originalColumnNames=[id], 
transformExpressionKey=null}",
-                        "ProjectionColumn{column=`name` STRING, 
expression='UPPER(`TB`.`name`)', scriptExpression='upper(name)', 
originalColumnNames=[name], transformExpressionKey=null}",
-                        "ProjectionColumn{column=`newage` INT, 
expression='`TB`.`age` + 1', scriptExpression='age + 1', 
originalColumnNames=[age], transformExpressionKey=null}",
-                        "ProjectionColumn{column=`newCreateTime` TIMESTAMP(3) 
'newCreateTime', expression='createTime', scriptExpression='createTime', 
originalColumnNames=[createTime], transformExpressionKey=null}",
-                        "ProjectionColumn{column=`newAddress` VARCHAR(50) 
'newAddress', expression='address', scriptExpression='address', 
originalColumnNames=[address], transformExpressionKey=null}",
-                        "ProjectionColumn{column=`deposits` DECIMAL(10, 2) 
'deposit', expression='deposit', scriptExpression='deposit', 
originalColumnNames=[deposit], transformExpressionKey=null}",
-                        "ProjectionColumn{column=`bmi` DOUBLE, 
expression='`TB`.`weight` / (`TB`.`height` * `TB`.`height`)', 
scriptExpression='weight / height * height', originalColumnNames=[weight, 
height, height], transformExpressionKey=null}");
+                        "ProjectionColumn{column=`id` INT 'id', 
expression='id', scriptExpression='$0', originalColumnNames=[id], 
columnNameMap={id=$0}}",
+                        "ProjectionColumn{column=`name` STRING, 
expression='UPPER(`TB`.`name`)', scriptExpression='upper($0)', 
originalColumnNames=[name], columnNameMap={name=$0}}",
+                        "ProjectionColumn{column=`newage` INT, 
expression='`TB`.`age` + 1', scriptExpression='$0 + 1', 
originalColumnNames=[age], columnNameMap={age=$0}}",
+                        "ProjectionColumn{column=`newCreateTime` TIMESTAMP(3) 
'newCreateTime', expression='createTime', scriptExpression='$0', 
originalColumnNames=[createTime], columnNameMap={createTime=$0}}",
+                        "ProjectionColumn{column=`newAddress` VARCHAR(50) 
'newAddress', expression='address', scriptExpression='$0', 
originalColumnNames=[address], columnNameMap={address=$0}}",
+                        "ProjectionColumn{column=`deposits` DECIMAL(10, 2) 
'deposit', expression='deposit', scriptExpression='$0', 
originalColumnNames=[deposit], columnNameMap={deposit=$0}}",
+                        "ProjectionColumn{column=`bmi` DOUBLE, 
expression='`TB`.`weight` / (`TB`.`height` * `TB`.`height`)', 
scriptExpression='$0 / $1 * $1', originalColumnNames=[weight, height, height], 
columnNameMap={weight=$0, height=$1}}");
         Assertions.assertThat(result).hasToString("[" + String.join(", ", 
expected) + "]");
 
         List<ProjectionColumn> metadataResult =
@@ -492,17 +501,17 @@ public class TransformParserTest {
 
         List<String> metadataExpected =
                 Arrays.asList(
-                        "ProjectionColumn{column=`id` INT 'id', 
expression='id', scriptExpression='id', originalColumnNames=[id], 
transformExpressionKey=null}",
-                        "ProjectionColumn{column=`name` STRING 'name', 
expression='name', scriptExpression='name', originalColumnNames=[name], 
transformExpressionKey=null}",
-                        "ProjectionColumn{column=`age` INT 'age', 
expression='age', scriptExpression='age', originalColumnNames=[age], 
transformExpressionKey=null}",
-                        "ProjectionColumn{column=`createTime` TIMESTAMP(3) 
'newCreateTime', expression='createTime', scriptExpression='createTime', 
originalColumnNames=[createTime], transformExpressionKey=null}",
-                        "ProjectionColumn{column=`address` VARCHAR(50) 
'newAddress', expression='address', scriptExpression='address', 
originalColumnNames=[address], transformExpressionKey=null}",
-                        "ProjectionColumn{column=`deposit` DECIMAL(10, 2) 
'deposit', expression='deposit', scriptExpression='deposit', 
originalColumnNames=[deposit], transformExpressionKey=null}",
-                        "ProjectionColumn{column=`weight` DOUBLE 'weight', 
expression='weight', scriptExpression='weight', originalColumnNames=[weight], 
transformExpressionKey=null}",
-                        "ProjectionColumn{column=`height` DOUBLE 'height', 
expression='height', scriptExpression='height', originalColumnNames=[height], 
transformExpressionKey=null}",
-                        "ProjectionColumn{column=`__namespace_name__` STRING 
NOT NULL, expression='__namespace_name__', 
scriptExpression='__namespace_name__', 
originalColumnNames=[__namespace_name__], transformExpressionKey=null}",
-                        "ProjectionColumn{column=`__schema_name__` STRING NOT 
NULL, expression='__schema_name__', scriptExpression='__schema_name__', 
originalColumnNames=[__schema_name__], transformExpressionKey=null}",
-                        "ProjectionColumn{column=`__table_name__` STRING NOT 
NULL, expression='__table_name__', scriptExpression='__table_name__', 
originalColumnNames=[__table_name__], transformExpressionKey=null}");
+                        "ProjectionColumn{column=`id` INT 'id', 
expression='id', scriptExpression='$0', originalColumnNames=[id], 
columnNameMap={id=$0}}",
+                        "ProjectionColumn{column=`name` STRING 'name', 
expression='name', scriptExpression='$0', originalColumnNames=[name], 
columnNameMap={name=$0}}",
+                        "ProjectionColumn{column=`age` INT 'age', 
expression='age', scriptExpression='$0', originalColumnNames=[age], 
columnNameMap={age=$0}}",
+                        "ProjectionColumn{column=`createTime` TIMESTAMP(3) 
'newCreateTime', expression='createTime', scriptExpression='$0', 
originalColumnNames=[createTime], columnNameMap={createTime=$0}}",
+                        "ProjectionColumn{column=`address` VARCHAR(50) 
'newAddress', expression='address', scriptExpression='$0', 
originalColumnNames=[address], columnNameMap={address=$0}}",
+                        "ProjectionColumn{column=`deposit` DECIMAL(10, 2) 
'deposit', expression='deposit', scriptExpression='$0', 
originalColumnNames=[deposit], columnNameMap={deposit=$0}}",
+                        "ProjectionColumn{column=`weight` DOUBLE 'weight', 
expression='weight', scriptExpression='$0', originalColumnNames=[weight], 
columnNameMap={weight=$0}}",
+                        "ProjectionColumn{column=`height` DOUBLE 'height', 
expression='height', scriptExpression='$0', originalColumnNames=[height], 
columnNameMap={height=$0}}",
+                        "ProjectionColumn{column=`__namespace_name__` STRING 
NOT NULL, expression='__namespace_name__', scriptExpression='$0', 
originalColumnNames=[__namespace_name__], 
columnNameMap={__namespace_name__=$0}}",
+                        "ProjectionColumn{column=`__schema_name__` STRING NOT 
NULL, expression='__schema_name__', scriptExpression='$0', 
originalColumnNames=[__schema_name__], columnNameMap={__schema_name__=$0}}",
+                        "ProjectionColumn{column=`__table_name__` STRING NOT 
NULL, expression='__table_name__', scriptExpression='$0', 
originalColumnNames=[__table_name__], columnNameMap={__table_name__=$0}}");
         Assertions.assertThat(metadataResult)
                 .map(ProjectionColumn::toString)
                 .containsExactlyElementsOf(metadataExpected);
@@ -544,15 +553,15 @@ public class TransformParserTest {
 
         List<String> expected =
                 Arrays.asList(
-                        "ProjectionColumn{column=`id` INT 'id', 
expression='id', scriptExpression='id', originalColumnNames=[id], 
transformExpressionKey=null}",
-                        "ProjectionColumn{column=`name2` STRING, 
expression='UPPER(`TB`.`name`)', scriptExpression='upper(name)', 
originalColumnNames=[name], transformExpressionKey=null}",
-                        "ProjectionColumn{column=`sex2` STRING, 
expression='UPPER(`TB`.`sex`)', scriptExpression='upper(sex)', 
originalColumnNames=[sex], transformExpressionKey=null}",
-                        "ProjectionColumn{column=`address2` BINARY(50), 
expression='CASE WHEN `TB`.`address` IS NOT NULL THEN `TB`.`address` ELSE 
`TB`.`address` END', scriptExpression='(null != address ? address : address)', 
originalColumnNames=[address, address, address], transformExpressionKey=null}",
-                        "ProjectionColumn{column=`phone2` VARBINARY(50), 
expression='CASE WHEN `TB`.`phone` IS NOT NULL THEN `TB`.`phone` ELSE 
`TB`.`phone` END', scriptExpression='(null != phone ? phone : phone)', 
originalColumnNames=[phone, phone, phone], transformExpressionKey=null}",
-                        "ProjectionColumn{column=`deposit2` DECIMAL(10, 2), 
expression='CASE WHEN `TB`.`deposit` IS NOT NULL THEN `TB`.`deposit` ELSE 
`TB`.`deposit` END', scriptExpression='(null != deposit ? deposit : deposit)', 
originalColumnNames=[deposit, deposit, deposit], transformExpressionKey=null}",
-                        "ProjectionColumn{column=`birthday2` TIMESTAMP(3), 
expression='CASE WHEN `TB`.`birthday` IS NOT NULL THEN `TB`.`birthday` ELSE 
`TB`.`birthday` END', scriptExpression='(null != birthday ? birthday : 
birthday)', originalColumnNames=[birthday, birthday, birthday], 
transformExpressionKey=null}",
-                        "ProjectionColumn{column=`birthday_ltz2` 
TIMESTAMP_LTZ(3), expression='CASE WHEN `TB`.`birthday_ltz` IS NOT NULL THEN 
`TB`.`birthday_ltz` ELSE `TB`.`birthday_ltz` END', scriptExpression='(null != 
birthday_ltz ? birthday_ltz : birthday_ltz)', 
originalColumnNames=[birthday_ltz, birthday_ltz, birthday_ltz], 
transformExpressionKey=null}",
-                        "ProjectionColumn{column=`update_time2` TIME(3), 
expression='CASE WHEN `TB`.`update_time` IS NOT NULL THEN `TB`.`update_time` 
ELSE `TB`.`update_time` END', scriptExpression='(null != update_time ? 
update_time : update_time)', originalColumnNames=[update_time, update_time, 
update_time], transformExpressionKey=null}");
+                        "ProjectionColumn{column=`id` INT 'id', 
expression='id', scriptExpression='$0', originalColumnNames=[id], 
columnNameMap={id=$0}}",
+                        "ProjectionColumn{column=`name2` STRING, 
expression='UPPER(`TB`.`name`)', scriptExpression='upper($0)', 
originalColumnNames=[name], columnNameMap={name=$0}}",
+                        "ProjectionColumn{column=`sex2` STRING, 
expression='UPPER(`TB`.`sex`)', scriptExpression='upper($0)', 
originalColumnNames=[sex], columnNameMap={sex=$0}}",
+                        "ProjectionColumn{column=`address2` BINARY(50), 
expression='CASE WHEN `TB`.`address` IS NOT NULL THEN `TB`.`address` ELSE 
`TB`.`address` END', scriptExpression='(null != $0 ? $0 : $0)', 
originalColumnNames=[address, address, address], columnNameMap={address=$0}}",
+                        "ProjectionColumn{column=`phone2` VARBINARY(50), 
expression='CASE WHEN `TB`.`phone` IS NOT NULL THEN `TB`.`phone` ELSE 
`TB`.`phone` END', scriptExpression='(null != $0 ? $0 : $0)', 
originalColumnNames=[phone, phone, phone], columnNameMap={phone=$0}}",
+                        "ProjectionColumn{column=`deposit2` DECIMAL(10, 2), 
expression='CASE WHEN `TB`.`deposit` IS NOT NULL THEN `TB`.`deposit` ELSE 
`TB`.`deposit` END', scriptExpression='(null != $0 ? $0 : $0)', 
originalColumnNames=[deposit, deposit, deposit], columnNameMap={deposit=$0}}",
+                        "ProjectionColumn{column=`birthday2` TIMESTAMP(3), 
expression='CASE WHEN `TB`.`birthday` IS NOT NULL THEN `TB`.`birthday` ELSE 
`TB`.`birthday` END', scriptExpression='(null != $0 ? $0 : $0)', 
originalColumnNames=[birthday, birthday, birthday], 
columnNameMap={birthday=$0}}",
+                        "ProjectionColumn{column=`birthday_ltz2` 
TIMESTAMP_LTZ(3), expression='CASE WHEN `TB`.`birthday_ltz` IS NOT NULL THEN 
`TB`.`birthday_ltz` ELSE `TB`.`birthday_ltz` END', scriptExpression='(null != 
$0 ? $0 : $0)', originalColumnNames=[birthday_ltz, birthday_ltz, birthday_ltz], 
columnNameMap={birthday_ltz=$0}}",
+                        "ProjectionColumn{column=`update_time2` TIME(3), 
expression='CASE WHEN `TB`.`update_time` IS NOT NULL THEN `TB`.`update_time` 
ELSE `TB`.`update_time` END', scriptExpression='(null != $0 ? $0 : $0)', 
originalColumnNames=[update_time, update_time, update_time], 
columnNameMap={update_time=$0}}");
         Assertions.assertThat(result).hasToString("[" + String.join(", ", 
expected) + "]");
     }
 
@@ -655,6 +664,55 @@ public class TransformParserTest {
                 
"greaterThan(__instanceOfAddOneFunctionClass.eval(__instanceOfAddOneFunctionClass.eval(id)),
 4) || !valueEquals(__instanceOfTypeOfFunctionClass.eval(id), \"bool\") && 
!valueEquals(__instanceOfFormatFunctionClass.eval(\"from %s to %s is %s\", 
\"a\", \"z\", \"lie\"), \"\")");
     }
 
+    @Test
+    public void testTranslateUdfFilterToJaninoExpressionWithColumnNameMap() {
+        Map<String, String> columnNameMap = new HashMap<>();
+        columnNameMap.put("a", "$0");
+        columnNameMap.put("b", "$1");
+        columnNameMap.put("a-b", "$2");
+
+        testFilterExpressionWithUdf(
+                "format(upper(a))",
+                "__instanceOfFormatFunctionClass.eval(upper($0))",
+                columnNameMap);
+        testFilterExpressionWithUdf(
+                "format(lower(b))",
+                "__instanceOfFormatFunctionClass.eval(lower($1))",
+                columnNameMap);
+        testFilterExpressionWithUdf(
+                "format(concat(a,b))",
+                "__instanceOfFormatFunctionClass.eval(concat($0, $1))",
+                columnNameMap);
+        testFilterExpressionWithUdf(
+                "format(SUBSTR(`a-b`,1))",
+                "__instanceOfFormatFunctionClass.eval(substr($2, 1))",
+                columnNameMap);
+        testFilterExpressionWithUdf(
+                "typeof(`a-b` like '^[a-zA-Z]')",
+                "__instanceOfTypeOfFunctionClass.eval(like($2, 
\"^[a-zA-Z]\"))",
+                columnNameMap);
+        testFilterExpressionWithUdf(
+                "typeof(`a-b` not like '^[a-zA-Z]')",
+                "__instanceOfTypeOfFunctionClass.eval(notLike($2, 
\"^[a-zA-Z]\"))",
+                columnNameMap);
+        testFilterExpressionWithUdf(
+                "typeof(a-b-`a-b`)",
+                "__instanceOfTypeOfFunctionClass.eval($0 - $1 - $2)",
+                columnNameMap);
+        testFilterExpressionWithUdf(
+                "typeof(a-b-2)",
+                "__instanceOfTypeOfFunctionClass.eval($0 - $1 - 2)",
+                columnNameMap);
+        testFilterExpressionWithUdf(
+                "addone(addone(`a-b`)) > 4 OR typeof(a-b) <> 'bool' AND 
format('from %s to %s is %s', 'a', 'z', 'lie') <> ''",
+                
"greaterThan(__instanceOfAddOneFunctionClass.eval(__instanceOfAddOneFunctionClass.eval($2)),
 4) || !valueEquals(__instanceOfTypeOfFunctionClass.eval($0 - $1), \"bool\") && 
!valueEquals(__instanceOfFormatFunctionClass.eval(\"from %s to %s is %s\", 
\"a\", \"z\", \"lie\"), \"\")",
+                columnNameMap);
+        testFilterExpressionWithUdf(
+                "ADDONE(ADDONE(`a-b`)) > 4 OR TYPEOF(a-b) <> 'bool' AND 
FORMAT('from %s to %s is %s', 'a', 'z', 'lie') <> ''",
+                
"greaterThan(__instanceOfAddOneFunctionClass.eval(__instanceOfAddOneFunctionClass.eval($2)),
 4) || !valueEquals(__instanceOfTypeOfFunctionClass.eval($0 - $1), \"bool\") && 
!valueEquals(__instanceOfFormatFunctionClass.eval(\"from %s to %s is %s\", 
\"a\", \"z\", \"lie\"), \"\")",
+                columnNameMap);
+    }
+
     @Test
     void testLargeNumericalLiterals() {
         // For literals within [-2147483648, 2147483647] range, plain Integers 
are OK
@@ -671,26 +729,64 @@ public class TransformParserTest {
         Assertions.assertThatThrownBy(
                         () ->
                                 
TransformParser.translateFilterExpressionToJaninoExpression(
-                                        "id > 9223372036854775808", 
Collections.emptyList()))
+                                        "id > 9223372036854775808",
+                                        Collections.emptyList(),
+                                        Collections.emptyMap()))
                 .isExactlyInstanceOf(CalciteContextException.class)
                 .hasMessageContaining("Numeric literal '9223372036854775808' 
out of range");
 
         Assertions.assertThatThrownBy(
                         () ->
                                 
TransformParser.translateFilterExpressionToJaninoExpression(
-                                        "id < -9223372036854775809", 
Collections.emptyList()))
+                                        "id < -9223372036854775809",
+                                        Collections.emptyList(),
+                                        Collections.emptyMap()))
                 .isExactlyInstanceOf(CalciteContextException.class)
                 .hasMessageContaining("Numeric literal '-9223372036854775809' 
out of range");
     }
 
+    @Test
+    public void testProjectionColumnsWithColumnNameMap() {
+        List<Column> testColumns =
+                Arrays.asList(
+                        Column.physicalColumn("a", DataTypes.INT(), "a"),
+                        Column.physicalColumn("b", DataTypes.INT(), "b"),
+                        Column.physicalColumn("a-b", DataTypes.DOUBLE(), 
"`a-b`"));
+
+        List<ProjectionColumn> result =
+                TransformParser.generateProjectionColumns(
+                        "a, b, a-b as c, `a-b`, `a-b` AS d, `a-b`-1 AS e, 
a-b+`a-b` AS f, `test-meta-col`, `test-meta-col`-a-b AS g",
+                        testColumns,
+                        Collections.emptyList(),
+                        new SupportedMetadataColumn[] {new 
TestMetadataColumn()});
+
+        List<String> expected =
+                Arrays.asList(
+                        "ProjectionColumn{column=`a` INT 'a', expression='a', 
scriptExpression='$0', originalColumnNames=[a], columnNameMap={a=$0}}",
+                        "ProjectionColumn{column=`b` INT 'b', expression='b', 
scriptExpression='$0', originalColumnNames=[b], columnNameMap={b=$0}}",
+                        "ProjectionColumn{column=`c` INT, expression='`TB`.`a` 
- `TB`.`b`', scriptExpression='$0 - $1', originalColumnNames=[a, b], 
columnNameMap={a=$0, b=$1}}",
+                        "ProjectionColumn{column=`a-b` DOUBLE '`a-b`', 
expression='a-b', scriptExpression='$0', originalColumnNames=[a-b], 
columnNameMap={a-b=$0}}",
+                        "ProjectionColumn{column=`d` DOUBLE '`a-b`', 
expression='a-b', scriptExpression='$0', originalColumnNames=[a-b], 
columnNameMap={a-b=$0}}",
+                        "ProjectionColumn{column=`e` DOUBLE, 
expression='`TB`.`a-b` - 1', scriptExpression='$0 - 1', 
originalColumnNames=[a-b], columnNameMap={a-b=$0}}",
+                        "ProjectionColumn{column=`f` DOUBLE, 
expression='`TB`.`a` - `TB`.`b` + `TB`.`a-b`', scriptExpression='$0 - $1 + $2', 
originalColumnNames=[a, b, a-b], columnNameMap={a=$0, b=$1, a-b=$2}}",
+                        "ProjectionColumn{column=`test-meta-col` INT NOT NULL, 
expression='test-meta-col', scriptExpression='$0', 
originalColumnNames=[test-meta-col], columnNameMap={test-meta-col=$0}}",
+                        "ProjectionColumn{column=`g` INT, 
expression='`TB`.`test-meta-col` - `TB`.`a` - `TB`.`b`', scriptExpression='$0 - 
$1 - $2', originalColumnNames=[test-meta-col, a, b], columnNameMap={a=$1, b=$2, 
test-meta-col=$0}}");
+        Assertions.assertThat(result).hasToString("[" + String.join(", ", 
expected) + "]");
+    }
+
     private void testFilterExpression(String expression, String 
expressionExpect) {
         String janinoExpression =
                 TransformParser.translateFilterExpressionToJaninoExpression(
-                        expression, Collections.emptyList());
+                        expression, Collections.emptyList(), 
Collections.emptyMap());
         Assertions.assertThat(janinoExpression).isEqualTo(expressionExpect);
     }
 
     private void testFilterExpressionWithUdf(String expression, String 
expressionExpect) {
+        testFilterExpressionWithUdf(expression, expressionExpect, 
Collections.emptyMap());
+    }
+
+    private void testFilterExpressionWithUdf(
+            String expression, String expressionExpect, Map<String, String> 
columnNameMap) {
         String janinoExpression =
                 TransformParser.translateFilterExpressionToJaninoExpression(
                         expression,
@@ -703,7 +799,32 @@ public class TransformParserTest {
                                         
"org.apache.flink.cdc.udf.examples.java.AddOneFunctionClass"),
                                 new UserDefinedFunctionDescriptor(
                                         "typeof",
-                                        
"org.apache.flink.cdc.udf.examples.java.TypeOfFunctionClass")));
+                                        
"org.apache.flink.cdc.udf.examples.java.TypeOfFunctionClass")),
+                        columnNameMap);
         Assertions.assertThat(janinoExpression).isEqualTo(expressionExpect);
     }
+
+    /** Test metadata column. */
+    private static class TestMetadataColumn implements SupportedMetadataColumn 
{
+        @Override
+        public String getName() {
+            // Column name contains '-', which can be used to test column name 
map.
+            return "test-meta-col";
+        }
+
+        @Override
+        public DataType getType() {
+            return DataTypes.INT();
+        }
+
+        @Override
+        public Class<?> getJavaClass() {
+            return Integer.class;
+        }
+
+        @Override
+        public Object read(Map<String, String> metadata) {
+            return 0;
+        }
+    }
 }

Reply via email to