This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new d83ccbeeb [FLINK-36754][transform] Projection should be treated as an 
asterisk when projection expression is empty or null
d83ccbeeb is described below

commit d83ccbeeb45bc0d9a430f6c1988691b6eb636314
Author: MOBIN <18814118...@163.com>
AuthorDate: Thu Jan 16 15:32:26 2025 +0800

    [FLINK-36754][transform] Projection should be treated as an asterisk when 
projection expression is empty or null
    
    This closes #3749
---
 .../cdc/composer/definition/TransformDef.java      |  13 +-
 .../flink/translator/TransformTranslator.java      |   8 +-
 .../flink/FlinkPipelineTransformITCase.java        | 156 ++++++++++++++++++++-
 .../operators/transform/PreTransformOperator.java  |  13 +-
 .../runtime/operators/transform/TransformRule.java |   3 +-
 5 files changed, 165 insertions(+), 28 deletions(-)

diff --git 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/TransformDef.java
 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/TransformDef.java
index ff54eb173..c081178b8 100644
--- 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/TransformDef.java
+++ 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/TransformDef.java
@@ -20,7 +20,6 @@ package org.apache.flink.cdc.composer.definition;
 import org.apache.flink.cdc.common.utils.StringUtils;
 
 import java.util.Objects;
-import java.util.Optional;
 
 /**
  * Definition of a transformation.
@@ -75,24 +74,24 @@ public class TransformDef {
         return sourceTable;
     }
 
-    public Optional<String> getProjection() {
-        return Optional.ofNullable(projection);
+    public String getProjection() {
+        return projection;
     }
 
     public boolean isValidProjection() {
         return !StringUtils.isNullOrWhitespaceOnly(projection);
     }
 
-    public Optional<String> getFilter() {
-        return Optional.ofNullable(filter);
+    public String getFilter() {
+        return filter;
     }
 
     public boolean isValidFilter() {
         return !StringUtils.isNullOrWhitespaceOnly(filter);
     }
 
-    public Optional<String> getDescription() {
-        return Optional.ofNullable(description);
+    public String getDescription() {
+        return description;
     }
 
     public String getPrimaryKeys() {
diff --git 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java
 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java
index c7fb15541..b49cdc114 100644
--- 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java
+++ 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java
@@ -59,8 +59,8 @@ public class TransformTranslator {
         for (TransformDef transform : transforms) {
             preTransformFunctionBuilder.addTransform(
                     transform.getSourceTable(),
-                    transform.getProjection().orElse(null),
-                    transform.getFilter().orElse(null),
+                    transform.getProjection(),
+                    transform.getFilter(),
                     transform.getPrimaryKeys(),
                     transform.getPartitionKeys(),
                     transform.getTableOptions(),
@@ -98,8 +98,8 @@ public class TransformTranslator {
             if (transform.isValidProjection() || transform.isValidFilter()) {
                 postTransformFunctionBuilder.addTransform(
                         transform.getSourceTable(),
-                        transform.isValidProjection() ? 
transform.getProjection().get() : null,
-                        transform.isValidFilter() ? 
transform.getFilter().get() : null,
+                        transform.getProjection(),
+                        transform.getFilter(),
                         transform.getPrimaryKeys(),
                         transform.getPartitionKeys(),
                         transform.getTableOptions(),
diff --git 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
index 6364b6981..2ac394add 100644
--- 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
+++ 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -77,6 +77,7 @@ import java.util.stream.Stream;
 
 import static 
org.apache.flink.configuration.CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Integration test for {@link FlinkPipelineComposer}. */
 class FlinkPipelineTransformITCase {
@@ -196,8 +197,8 @@ class FlinkPipelineTransformITCase {
                                 null,
                                 null)),
                 Arrays.asList(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` 
STRING}, primaryKeys=id, options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT}, primaryKeys=id, 
options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` 
TINYINT,`description` STRING}, primaryKeys=id, options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[3, Carol, 15, student], op=INSERT, meta=()}"));
     }
 
@@ -218,9 +219,9 @@ class FlinkPipelineTransformITCase {
                                 null,
                                 null)),
                 Arrays.asList(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT}, primaryKeys=id, 
options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[1, Alice, 18], op=INSERT, meta=()}",
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` 
STRING}, primaryKeys=id, options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` 
TINYINT,`description` STRING}, primaryKeys=id, options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[3, Carol, 15, student], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[4, Derrida, 25, student], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, 
Derrida, 25, student], after=[], op=DELETE, meta=()}"));
@@ -370,6 +371,151 @@ class FlinkPipelineTransformITCase {
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, 
DERRIDA, 25, student], after=[], op=DELETE, meta=()}"));
     }
 
+    @ParameterizedTest
+    @EnumSource
+    @Disabled("to be fixed in FLINK-37132")
+    void testMultiTransformSchemaColumnsCompatibilityWithNullProjection(
+            ValuesDataSink.SinkApi sinkApi) {
+        TransformDef nullProjection =
+                new TransformDef(
+                        "default_namespace.default_schema.mytable2",
+                        null,
+                        "age < 18",
+                        null,
+                        null,
+                        null,
+                        null,
+                        null);
+
+        assertThatThrownBy(
+                        () ->
+                                runGenericTransformTest(
+                                        sinkApi,
+                                        Arrays.asList(
+                                                nullProjection,
+                                                new TransformDef(
+                                                        
"default_namespace.default_schema.mytable2",
+                                                        // reference part 
column
+                                                        "id,UPPER(name) AS 
name",
+                                                        "age >= 18",
+                                                        null,
+                                                        null,
+                                                        null,
+                                                        null,
+                                                        null)),
+                                        Collections.emptyList()))
+                .rootCause()
+                .isExactlyInstanceOf(IllegalStateException.class)
+                .hasMessage(
+                        "Unable to merge schema columns={`id` BIGINT,`name` 
VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=() "
+                                + "and columns={`id` BIGINT,`name` STRING}, 
primaryKeys=id, options=() with different column counts.");
+    }
+
+    @ParameterizedTest
+    @EnumSource
+    @Disabled("to be fixed in FLINK-37132")
+    void testMultiTransformSchemaColumnsCompatibilityWithEmptyProjection(
+            ValuesDataSink.SinkApi sinkApi) {
+        TransformDef emptyProjection =
+                new TransformDef(
+                        "default_namespace.default_schema.mytable2",
+                        "",
+                        "age < 18",
+                        null,
+                        null,
+                        null,
+                        null,
+                        null);
+
+        assertThatThrownBy(
+                        () ->
+                                runGenericTransformTest(
+                                        sinkApi,
+                                        Arrays.asList(
+                                                emptyProjection,
+                                                new TransformDef(
+                                                        
"default_namespace.default_schema.mytable2",
+                                                        // reference part 
column
+                                                        "id,UPPER(name) AS 
name",
+                                                        "age >= 18",
+                                                        null,
+                                                        null,
+                                                        null,
+                                                        null,
+                                                        null)),
+                                        Collections.emptyList()))
+                .rootCause()
+                .isExactlyInstanceOf(IllegalStateException.class)
+                .hasMessage(
+                        "Unable to merge schema columns={`id` BIGINT,`name` 
VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=() "
+                                + "and columns={`id` BIGINT,`name` STRING}, 
primaryKeys=id, options=() with different column counts.");
+    }
+
+    @ParameterizedTest
+    @EnumSource
+    void 
testMultiTransformWithNullEmptyAsteriskProjections(ValuesDataSink.SinkApi 
sinkApi)
+            throws Exception {
+        TransformDef nullProjection =
+                new TransformDef(
+                        "default_namespace.default_schema.mytable2",
+                        null,
+                        "age < 18",
+                        null,
+                        null,
+                        null,
+                        null,
+                        null);
+
+        TransformDef emptyProjection =
+                new TransformDef(
+                        "default_namespace.default_schema.mytable2",
+                        "",
+                        "age < 18",
+                        null,
+                        null,
+                        null,
+                        null,
+                        null);
+
+        TransformDef asteriskProjection =
+                new TransformDef(
+                        "default_namespace.default_schema.mytable2",
+                        "*",
+                        "age < 18",
+                        null,
+                        null,
+                        null,
+                        null,
+                        null);
+
+        runGenericTransformTest(
+                sinkApi,
+                Arrays.asList(
+                        // Setting projection as null, '', or * should be 
equivalent
+                        nullProjection,
+                        emptyProjection,
+                        asteriskProjection,
+                        new TransformDef(
+                                "default_namespace.default_schema.mytable2",
+                                // reference all column
+                                "id,UPPER(name) AS name,age,description",
+                                "age >= 18",
+                                null,
+                                null,
+                                null,
+                                null,
+                                null)),
+                Arrays.asList(
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[1, Alice, 18], op=INSERT, meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[2, Bob, 20], op=INSERT, meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, 
Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT NOT NULL,`name` STRING,`age` TINYINT,`description` 
STRING}, primaryKeys=id, options=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[3, Carol, 15, student], op=INSERT, meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[4, DERRIDA, 25, student], op=INSERT, meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, 
DERRIDA, 25, student], after=[], op=DELETE, meta=()}"));
+    }
+
     /** This tests if transform generates metadata info correctly. */
     @ParameterizedTest
     @EnumSource
@@ -1447,7 +1593,7 @@ class FlinkPipelineTransformITCase {
         assertThat(outputEvents)
                 .containsExactly(
                         // Initial stage
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT}, primaryKeys=id, 
options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[2, Barcarolle, 22], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[3, Cecily, 23], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3, 
Cecily, 23], after=[3, Colin, 24], op=UPDATE, meta=()}",
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
index fd1d1d8c5..845fd4df1 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
@@ -389,17 +389,8 @@ public class PreTransformOperator extends 
AbstractStreamOperator<Event>
             if (!transform.getSelectors().isMatch(tableId)) {
                 continue;
             }
-            if (!transform.getProjection().isPresent()) {
-                processProjectionTransform(tableId, tableSchema, 
referencedColumnsSet, null);
-                hasMatchTransform = true;
-            } else {
-                TransformProjection transformProjection = 
transform.getProjection().get();
-                if (transformProjection.isValid()) {
-                    processProjectionTransform(
-                            tableId, tableSchema, referencedColumnsSet, 
transform);
-                    hasMatchTransform = true;
-                }
-            }
+            processProjectionTransform(tableId, tableSchema, 
referencedColumnsSet, transform);
+            hasMatchTransform = true;
         }
         if (!hasMatchTransform) {
             processProjectionTransform(tableId, tableSchema, 
referencedColumnsSet, null);
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformRule.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformRule.java
index 31a245257..d4459e5dd 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformRule.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformRule.java
@@ -18,6 +18,7 @@
 package org.apache.flink.cdc.runtime.operators.transform;
 
 import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
+import org.apache.flink.cdc.common.utils.StringUtils;
 
 import javax.annotation.Nullable;
 
@@ -48,7 +49,7 @@ public class TransformRule implements Serializable {
             @Nullable String postTransformConverter,
             SupportedMetadataColumn[] supportedMetadataColumns) {
         this.tableInclusions = tableInclusions;
-        this.projection = projection;
+        this.projection = StringUtils.isNullOrWhitespaceOnly(projection) ? "*" 
: projection;
         this.filter = normalizeFilter(projection, filter);
         this.primaryKey = primaryKey;
         this.partitionKey = partitionKey;

Reply via email to