This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 17d21a957 [FLINK-35984][cdc-runtime] Fix bug that metadata column name 
can not be used in transform rule
17d21a957 is described below

commit 17d21a957a4e73648788c5abe493bf7be6b63a5b
Author: MOBIN <[email protected]>
AuthorDate: Mon Aug 12 14:34:10 2024 +0800

    [FLINK-35984][cdc-runtime] Fix bug that metadata column name can not be 
used in transform rule
    
    This closes  #3528.
    
    Co-authored-by: yuxiqian <[email protected]>
---
 .../transform/ProjectionColumnProcessor.java       | 30 ++++++-------
 .../flink/cdc/runtime/parser/TransformParser.java  | 26 +++--------
 .../transform/UnifiedTransformOperatorTest.java    | 51 ++++++++++++++++++++++
 3 files changed, 72 insertions(+), 35 deletions(-)

diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
index a27af2370..cbe290dcb 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
@@ -156,22 +156,22 @@ public class ProjectionColumnProcessor {
                 }
             }
         }
-        if (scriptExpression.contains(TransformParser.DEFAULT_NAMESPACE_NAME)
-                && 
!argumentNames.contains(TransformParser.DEFAULT_NAMESPACE_NAME)) {
-            argumentNames.add(TransformParser.DEFAULT_NAMESPACE_NAME);
-            paramTypes.add(String.class);
-        }
-
-        if (scriptExpression.contains(TransformParser.DEFAULT_SCHEMA_NAME)
-                && 
!argumentNames.contains(TransformParser.DEFAULT_SCHEMA_NAME)) {
-            argumentNames.add(TransformParser.DEFAULT_SCHEMA_NAME);
-            paramTypes.add(String.class);
-        }
 
-        if (scriptExpression.contains(TransformParser.DEFAULT_TABLE_NAME)
-                && 
!argumentNames.contains(TransformParser.DEFAULT_TABLE_NAME)) {
-            argumentNames.add(TransformParser.DEFAULT_TABLE_NAME);
-            paramTypes.add(String.class);
+        for (String originalColumnName : originalColumnNames) {
+            switch (originalColumnName) {
+                case TransformParser.DEFAULT_NAMESPACE_NAME:
+                    argumentNames.add(TransformParser.DEFAULT_NAMESPACE_NAME);
+                    paramTypes.add(String.class);
+                    break;
+                case TransformParser.DEFAULT_SCHEMA_NAME:
+                    argumentNames.add(TransformParser.DEFAULT_SCHEMA_NAME);
+                    paramTypes.add(String.class);
+                    break;
+                case TransformParser.DEFAULT_TABLE_NAME:
+                    argumentNames.add(TransformParser.DEFAULT_TABLE_NAME);
+                    paramTypes.add(String.class);
+                    break;
+            }
         }
 
         argumentNames.add(JaninoCompiler.DEFAULT_TIME_ZONE);
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
index 6260bf7b0..ae598b7da 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
@@ -109,7 +109,7 @@ public class TransformParser {
             List<Column> columns,
             SqlNode sqlNode,
             List<UserDefinedFunctionDescriptor> udfDescriptors) {
-        List<Column> columnsWithMetadata = 
copyFillMetadataColumn(sqlNode.toString(), columns);
+        List<Column> columnsWithMetadata = copyFillMetadataColumn(columns);
         CalciteSchema rootSchema = CalciteSchema.createRootSchema(true);
         SchemaPlus schema = rootSchema.plus();
         Map<String, Object> operand = new HashMap<>();
@@ -498,29 +498,15 @@ public class TransformParser {
         return parseSelect(statement.toString());
     }
 
-    private static List<Column> copyFillMetadataColumn(
-            String transformStatement, List<Column> columns) {
+    private static List<Column> copyFillMetadataColumn(List<Column> columns) {
+        // Add metaColumn for SQLValidator.validate
         List<Column> columnsWithMetadata = new ArrayList<>(columns);
-        if (transformStatement.contains(DEFAULT_NAMESPACE_NAME)
-                && !containsMetadataColumn(columnsWithMetadata, 
DEFAULT_NAMESPACE_NAME)) {
-            columnsWithMetadata.add(
-                    Column.physicalColumn(DEFAULT_NAMESPACE_NAME, 
DataTypes.STRING()));
-        }
-        if (transformStatement.contains(DEFAULT_SCHEMA_NAME)
-                && !containsMetadataColumn(columnsWithMetadata, 
DEFAULT_SCHEMA_NAME)) {
-            columnsWithMetadata.add(Column.physicalColumn(DEFAULT_SCHEMA_NAME, 
DataTypes.STRING()));
-        }
-        if (transformStatement.contains(DEFAULT_TABLE_NAME)
-                && !containsMetadataColumn(columnsWithMetadata, 
DEFAULT_TABLE_NAME)) {
-            columnsWithMetadata.add(Column.physicalColumn(DEFAULT_TABLE_NAME, 
DataTypes.STRING()));
-        }
+        columnsWithMetadata.add(Column.physicalColumn(DEFAULT_NAMESPACE_NAME, 
DataTypes.STRING()));
+        columnsWithMetadata.add(Column.physicalColumn(DEFAULT_SCHEMA_NAME, 
DataTypes.STRING()));
+        columnsWithMetadata.add(Column.physicalColumn(DEFAULT_TABLE_NAME, 
DataTypes.STRING()));
         return columnsWithMetadata;
     }
 
-    private static boolean containsMetadataColumn(List<Column> columns, String 
columnName) {
-        return columns.stream().anyMatch(column -> 
column.getName().equals(columnName));
-    }
-
     private static boolean isMetadataColumn(String columnName) {
         return DEFAULT_TABLE_NAME.equals(columnName)
                 || DEFAULT_SCHEMA_NAME.equals(columnName)
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java
index 009d700fe..43a364e51 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java
@@ -917,6 +917,57 @@ public class UnifiedTransformOperatorTest {
                 .destroyHarness();
     }
 
+    @Test
+    public void testMetadataTransformIncludeMetaColumnString() throws 
Exception {
+        TableId tableId = TableId.tableId("my_company", "my_branch", 
"schema_nullability");
+        UnifiedTransformTestCase.of(
+                        tableId,
+                        "id, name, age, id + age as computed, 
__namespace_name__ as metaColNameSpaceName,  __schema_name__ as 
metaColSchemaName, __table_name__ as metaColNameTableName, "
+                                + "UPPER(__schema_name__) as 
metaColSchemaNameUpper, '__table_name__' as metaColStr1, 
'__namespace__name__schema__name__table__name__' as metaColStr2",
+                        "id > 100",
+                        Schema.newBuilder()
+                                .physicalColumn("id", DataTypes.INT())
+                                .physicalColumn("name", 
DataTypes.STRING().notNull())
+                                .physicalColumn("age", 
DataTypes.INT().notNull())
+                                .primaryKey("id")
+                                .build(),
+                        Schema.newBuilder()
+                                .physicalColumn("id", DataTypes.INT())
+                                .physicalColumn("name", 
DataTypes.STRING().notNull())
+                                .physicalColumn("age", 
DataTypes.INT().notNull())
+                                .primaryKey("id")
+                                .build(),
+                        Schema.newBuilder()
+                                .physicalColumn("id", DataTypes.INT())
+                                .physicalColumn("name", 
DataTypes.STRING().notNull())
+                                .physicalColumn("age", 
DataTypes.INT().notNull())
+                                .physicalColumn("computed", DataTypes.INT())
+                                .physicalColumn("metaColNameSpaceName", 
DataTypes.STRING())
+                                .physicalColumn("metaColSchemaName", 
DataTypes.STRING())
+                                .physicalColumn("metaColNameTableName", 
DataTypes.STRING())
+                                .physicalColumn("metaColSchemaNameUpper", 
DataTypes.STRING())
+                                .physicalColumn("metaColStr1", 
DataTypes.STRING())
+                                .physicalColumn("metaColStr2", 
DataTypes.STRING())
+                                .primaryKey("id")
+                                .build())
+                .initializeHarness()
+                .insertSource(1000, "Alice", 17)
+                .insertPreTransformed(1000, "Alice", 17)
+                .insertPostTransformed(
+                        1000,
+                        "Alice",
+                        17,
+                        1017,
+                        "my_company",
+                        "my_branch",
+                        "schema_nullability",
+                        "MY_BRANCH",
+                        "__table_name__",
+                        "__namespace__name__schema__name__table__name__")
+                .runTests()
+                .destroyHarness();
+    }
+
     @Test
     public void testTransformWithCast() throws Exception {
         TableId tableId = TableId.tableId("my_company", "my_branch", 
"transform_with_cast");

Reply via email to