This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch release-3.2
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git

commit da653566ee6541a91f6ddae1f971a081fe8d100c
Author: yuxiqian <34335406+yuxiq...@users.noreply.github.com>
AuthorDate: Thu Nov 14 12:04:55 2024 +0800

    [FLINK-36596][transform] Fix unable to schema evolve with project-less transform rules
    
    This closes #3665.
---
 .../flink/FlinkPipelineTransformITCase.java        | 94 ++++++++++++++++++++++
 .../flink/cdc/runtime/parser/TransformParser.java  |  3 +-
 2 files changed, 96 insertions(+), 1 deletion(-)
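In short: a transform rule that defines a filter but no projection ("project-less"), like the sketch below, previously could not take part in schema evolution. The test added below reproduces this, and the one-line TransformParser change at the end of the patch fixes it. (Sketch only; the argument order follows the TransformDef usage in the new test.)

    // A project-less transform rule: filter present, projection absent.
    new TransformDef(
            "default_namespace.default_schema.\\.*", // table name matcher
            null,      // projection: empty, i.e. project-less
            "id > 1",  // filter expression
            null, null, null, null);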

diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
index abea0c943..5c19ae484 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -1227,6 +1227,100 @@ class FlinkPipelineTransformITCase {
                         "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[15 -> Oops, 12th, 15, Oops], after=[], op=DELETE, meta=()}");
     }
 
+    @Test
+    void testTransformWithFilterButNoProjection() throws Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup value source
+        Configuration sourceConfig = new Configuration();
+
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
+
+        TableId tableId = TableId.tableId("default_namespace", "default_schema", "mytable1");
+        List<Event> events = generateSchemaEvolutionEvents(tableId);
+
+        ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
+
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+        // Setup pipeline
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        Collections.emptyList(),
+                        Collections.singletonList(
+                                new TransformDef(
+                                        "default_namespace.default_schema.\\.*",
+                                        null,
+                                        "id > 1",
+                                        null,
+                                        null,
+                                        null,
+                                        null)),
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        // Execute the pipeline
+        PipelineExecution execution = composer.compose(pipelineDef);
+        execution.execute();
+
+        // Check the order and content of all received events
+        String[] outputEvents = outCaptor.toString().trim().split("\n");
+
+        assertThat(outputEvents)
+                .containsExactly(
+                        // Initial stage
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Barcarolle, 22], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, Cecily, 23], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3, Cecily, 23], after=[3, Colin, 24], op=UPDATE, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Barcarolle, 22], after=[], op=DELETE, meta=()}",
+
+                        // Add column stage
+                        "AddColumnEvent{tableId=default_namespace.default_schema.mytable1, addedColumns=[ColumnWithPosition{column=`rank` STRING, position=BEFORE, existedColumnName=id}, ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1st, 4, Derrida, 24, 0], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2nd, 5, Eve, 25, 1], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2nd, 5, Eve, 25, 1], after=[2nd, 5, Eva, 20, 2], op=UPDATE, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3rd, 6, Fiona, 26, 3], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}",
+
+                        // Alter column type stage
+                        "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2], after=[5th, 8, Harry, 18.0, -3], op=UPDATE, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[6th, 9, IINA, 17.0, 0], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[6th, 9, IINA, 17.0, 0], after=[], op=DELETE, meta=()}",
+
+                        // Rename column stage
+                        "RenameColumnEvent{tableId=default_namespace.default_schema.mytable1, nameMapping={gender=biological_sex, age=toshi}}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[7th, 10, Julia, 24.0, 1], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[8th, 11, Kalle, 23.0, 0], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[8th, 11, Kalle, 23.0, 0], after=[8th, 11, Kella, 18.0, 0], op=UPDATE, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[9th, 12, Lynx, 17.0, 0], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[9th, 12, Lynx, 17.0, 0], after=[], op=DELETE, meta=()}",
+
+                        // Drop column stage
+                        "DropColumnEvent{tableId=default_namespace.default_schema.mytable1, droppedColumnNames=[biological_sex, toshi]}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[10th, 13, Munroe], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[11th, 14, Neko], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[11th, 14, Neko], after=[11th, 14, Nein], op=UPDATE, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[12th, 15, Oops], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[12th, 15, Oops], after=[], op=DELETE, meta=()}");
+    }
+
     @Test
     void testTransformUnmatchedSchemaEvolution() throws Exception {
         FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
index 38121f35d..547d9cd87 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
@@ -588,7 +588,8 @@ public class TransformParser {
 
     public static boolean hasAsterisk(@Nullable String projection) {
         if (isNullOrWhitespaceOnly(projection)) {
-            return false;
+            // Providing an empty projection expression is equivalent to writing `*` explicitly.
+            return true;
         }
         return parseProjectionExpression(projection).getOperandList().stream()
                 .anyMatch(TransformParser::hasAsterisk);
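
In effect, the one-line change makes a missing projection behave like an explicit `*`, so downstream schema-evolution logic treats a filter-only rule as passing all columns through. A minimal illustration of the predicate after the fix (not part of the patch; the "id, name" argument is an assumed example):

    // Before this commit, hasAsterisk(null) returned false, so filter-only
    // transform rules were not recognized as wildcard projections.
    TransformParser.hasAsterisk(null);       // now true: empty projection == implicit "*"
    TransformParser.hasAsterisk("*");        // true: explicit asterisk
    TransformParser.hasAsterisk("id, name"); // expected false: concrete column list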
