This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 49dc957ac [FLINK-36981][transform] Considering sharding tables with different schema in transform projection
49dc957ac is described below

commit 49dc957ac6e3f8bde4b6f91603a2e7735e5977ec
Author: Wink <809097...@qq.com>
AuthorDate: Thu Jan 9 12:13:20 2025 +0800

    [FLINK-36981][transform] Considering sharding tables with different schema in transform projection
    
    This closes #3826.
---
 .../flink/FlinkPipelineComposerITCase.java         | 237 ++++++++++++++++++++-
 .../operators/transform/PostTransformOperator.java |  37 +++-
 .../operators/transform/TransformProjection.java   |  13 +-
 .../transform/TransformProjectionProcessor.java    |   4 +
 4 files changed, 262 insertions(+), 29 deletions(-)
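
The gist of the fix, for readers skimming the diff below: PostTransformOperator used to cache projection processors in a map keyed by Tuple2<TableId, TransformProjection>, and the TransformProjection part of that key carries mutable, schema-derived projection columns, which made lookups unreliable when sharding tables with different schemas matched one transform rule. The map is now keyed by Tuple2<TableId, String> using the raw projection expression. The following is a minimal, self-contained sketch of that keying idea only; TableId, Key, and the schema strings are simplified stand-ins invented for illustration, not the actual Flink CDC classes.

    import java.util.HashMap;
    import java.util.Map;

    final class ProjectionKeySketch {
        // Simplified stand-ins; the real map is
        // Map<Tuple2<TableId, String>, TransformProjectionProcessor>.
        record TableId(String name) {}
        record Key(TableId table, String projection) {}

        public static void main(String[] args) {
            Map<Key, String> processors = new HashMap<>();
            String projection = "*,'last_name' as last_name";

            // mytable1 and mytable2 match the same transform rule but have
            // different schemas; keying by (table, projection string) yields
            // one processor per sharding table, created lazily as events arrive.
            processors.computeIfAbsent(
                    new Key(new TableId("mytable1"), projection),
                    k -> "processor for [id INT, name STRING, age INT]");
            processors.computeIfAbsent(
                    new Key(new TableId("mytable2"), projection),
                    k -> "processor for [id BIGINT, name VARCHAR(255), age TINYINT, description STRING]");

            System.out.println(processors.size()); // prints 2, one per sharding table
        }
    }
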

diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
index c48f94402..6d77bcfcd 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
@@ -54,7 +54,6 @@ import org.apache.flink.test.junit5.MiniClusterExtension;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
@@ -495,8 +494,9 @@ class FlinkPipelineComposerITCase {
                         "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 22], after=[2, x, 22], op=UPDATE, meta=({op_ts=5})}");
     }
 
-    @Test
-    void testOneToOneRouting() throws Exception {
+    @ParameterizedTest
+    @EnumSource
+    void testOneToOneRouting(ValuesDataSink.SinkApi sinkApi) throws Exception {
         FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
 
         // Setup value source
@@ -510,6 +510,7 @@ class FlinkPipelineComposerITCase {
         // Setup value sink
         Configuration sinkConfig = new Configuration();
         sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
         SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
 
         // Setup route
@@ -570,8 +571,9 @@ class FlinkPipelineComposerITCase {
                         "DataChangeEvent{tableId=default_namespace.default_schema.routed1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}");
     }
 
-    @Test
-    void testIdenticalOneToOneRouting() throws Exception {
+    @ParameterizedTest
+    @EnumSource
+    void testIdenticalOneToOneRouting(ValuesDataSink.SinkApi sinkApi) throws Exception {
         FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
 
         // Setup value source
@@ -585,6 +587,7 @@ class FlinkPipelineComposerITCase {
         // Setup value sink
         Configuration sinkConfig = new Configuration();
         sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
         SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
 
         // Setup route
@@ -645,8 +648,9 @@ class FlinkPipelineComposerITCase {
                         "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}");
     }
 
-    @Test
-    void testMergingWithRoute() throws Exception {
+    @ParameterizedTest
+    @EnumSource
+    void testMergingWithRoute(ValuesDataSink.SinkApi sinkApi) throws Exception {
         FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
 
         // Setup value source
@@ -678,7 +682,7 @@ class FlinkPipelineComposerITCase {
         // Table 1: +I[1, Alice, 18]
         // Table 1: +I[2, Bob, 20]
         // Table 1: -U[2, Bob, 20] +U[2, Bob, 30]
-        // Create table 2 [id, name, age]
+        // Create table 2 [id, name, age, description]
         // Table 2: +I[3, Charlie, 15, student]
         // Table 2: +I[4, Donald, 25, student]
         // Table 2: -D[4, Donald, 25, student]
@@ -782,6 +786,7 @@ class FlinkPipelineComposerITCase {
         // Setup value sink
         Configuration sinkConfig = new Configuration();
         sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
         SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
 
         // Setup route
@@ -841,8 +846,9 @@ class FlinkPipelineComposerITCase {
                         "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[6, Frank, 30, student, null, male], op=INSERT, meta=()}");
     }
 
-    @Test
-    void testTransformMergingWithRoute() throws Exception {
+    @ParameterizedTest
+    @EnumSource
+    void testTransformMergingWithRoute(ValuesDataSink.SinkApi sinkApi) throws Exception {
         FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
 
         // Setup value source
@@ -978,6 +984,7 @@ class FlinkPipelineComposerITCase {
         // Setup value sink
         Configuration sinkConfig = new Configuration();
         sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
         SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
 
         // Setup transform
@@ -1049,6 +1056,216 @@ class FlinkPipelineComposerITCase {
                         "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[6, Frank, 30, last_name, student, male], op=INSERT, meta=()}");
     }
 
+    @ParameterizedTest
+    @EnumSource
+    void testTransformMergingWithRouteChangeOrder(ValuesDataSink.SinkApi sinkApi) throws Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup value source
+        Configuration sourceConfig = new Configuration();
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
+
+        TableId myTable1 = TableId.tableId("default_namespace", "default_schema", "mytable1");
+        TableId myTable2 = TableId.tableId("default_namespace", "default_schema", "mytable2");
+        Schema table1Schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT())
+                        .physicalColumn("name", DataTypes.STRING())
+                        .physicalColumn("age", DataTypes.INT())
+                        .primaryKey("id")
+                        .build();
+        Schema table2Schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.BIGINT())
+                        .physicalColumn("name", DataTypes.VARCHAR(255))
+                        .physicalColumn("age", DataTypes.TINYINT())
+                        .physicalColumn("description", DataTypes.STRING())
+                        .primaryKey("id")
+                        .build();
+
+        // Create test dataset:
+        // Create table 1 [id, name, age]
+        // Create table 2 [id, name, age, description]
+        // Table 1: +I[1, Alice, 18]
+        // Table 1: +I[2, Bob, 20]
+        // Table 1: -U[2, Bob, 20] +U[2, Bob, 30]
+        // Table 2: +I[3, Charlie, 15, student]
+        // Table 2: +I[4, Donald, 25, student]
+        // Table 2: -D[4, Donald, 25, student]
+        // Rename column for table 1: name -> last_name
+        // Add column for table 2: gender
+        // Table 1: +I[5, Eliza, 24]
+        // Table 2: +I[6, Frank, 30, student, male]
+        List<Event> events = new ArrayList<>();
+        BinaryRecordDataGenerator table1dataGenerator =
+                new BinaryRecordDataGenerator(
+                        table1Schema.getColumnDataTypes().toArray(new DataType[0]));
+        BinaryRecordDataGenerator table2dataGenerator =
+                new BinaryRecordDataGenerator(
+                        table2Schema.getColumnDataTypes().toArray(new DataType[0]));
+        events.add(new CreateTableEvent(myTable1, table1Schema));
+        events.add(new CreateTableEvent(myTable2, table2Schema));
+        events.add(
+                DataChangeEvent.insertEvent(
+                        myTable1,
+                        table1dataGenerator.generate(
+                                new Object[] {1, BinaryStringData.fromString("Alice"), 18})));
+        events.add(
+                DataChangeEvent.insertEvent(
+                        myTable1,
+                        table1dataGenerator.generate(
+                                new Object[] {2, BinaryStringData.fromString("Bob"), 20})));
+        events.add(
+                DataChangeEvent.updateEvent(
+                        myTable1,
+                        table1dataGenerator.generate(
+                                new Object[] {2, BinaryStringData.fromString("Bob"), 20}),
+                        table1dataGenerator.generate(
+                                new Object[] {2, BinaryStringData.fromString("Bob"), 30})));
+        events.add(
+                DataChangeEvent.insertEvent(
+                        myTable2,
+                        table2dataGenerator.generate(
+                                new Object[] {
+                                    3L,
+                                    BinaryStringData.fromString("Charlie"),
+                                    (byte) 15,
+                                    BinaryStringData.fromString("student")
+                                })));
+        events.add(
+                DataChangeEvent.insertEvent(
+                        myTable2,
+                        table2dataGenerator.generate(
+                                new Object[] {
+                                    4L,
+                                    BinaryStringData.fromString("Donald"),
+                                    (byte) 25,
+                                    BinaryStringData.fromString("student")
+                                })));
+        events.add(
+                DataChangeEvent.deleteEvent(
+                        myTable2,
+                        table2dataGenerator.generate(
+                                new Object[] {
+                                    4L,
+                                    BinaryStringData.fromString("Donald"),
+                                    (byte) 25,
+                                    BinaryStringData.fromString("student")
+                                })));
+        //        events.add(new RenameColumnEvent(myTable1, ImmutableMap.of("name", "last_name")));
+        events.add(
+                new AddColumnEvent(
+                        myTable2,
+                        Collections.singletonList(
+                                new AddColumnEvent.ColumnWithPosition(
+                                        Column.physicalColumn("gender", DataTypes.STRING())))));
+        events.add(
+                DataChangeEvent.insertEvent(
+                        myTable1,
+                        table1dataGenerator.generate(
+                                new Object[] {5, BinaryStringData.fromString("Eliza"), 24})));
+        events.add(
+                DataChangeEvent.insertEvent(
+                        myTable2,
+                        new BinaryRecordDataGenerator(
+                                        new DataType[] {
+                                            DataTypes.BIGINT(),
+                                            DataTypes.VARCHAR(255),
+                                            DataTypes.TINYINT(),
+                                            DataTypes.STRING(),
+                                            DataTypes.STRING()
+                                        })
+                                .generate(
+                                        new Object[] {
+                                            6L,
+                                            BinaryStringData.fromString("Frank"),
+                                            (byte) 30,
+                                            BinaryStringData.fromString("student"),
+                                            BinaryStringData.fromString("male")
+                                        })));
+
+        ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
+
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+        // Setup transform
+        List<TransformDef> transformDef =
+                Collections.singletonList(
+                        new TransformDef(
+                                "default_namespace.default_schema.mytable[0-9]",
+                                "*,'last_name' as last_name",
+                                null,
+                                null,
+                                null,
+                                null,
+                                "",
+                                null));
+
+        // Setup route
+        TableId mergedTable = TableId.tableId("default_namespace", "default_schema", "merged");
+        List<RouteDef> routeDef =
+                Collections.singletonList(
+                        new RouteDef(
+                                "default_namespace.default_schema.mytable[0-9]",
+                                mergedTable.toString(),
+                                null,
+                                null));
+
+        // Setup pipeline
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        routeDef,
+                        transformDef,
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        // Execute the pipeline
+        PipelineExecution execution = composer.compose(pipelineDef);
+        execution.execute();
+        Schema mergedTableSchema = ValuesDatabase.getTableSchema(mergedTable);
+        assertThat(mergedTableSchema)
+                .isEqualTo(
+                        Schema.newBuilder()
+                                .physicalColumn("id", DataTypes.BIGINT())
+                                .physicalColumn("name", DataTypes.STRING())
+                                .physicalColumn("age", DataTypes.INT())
+                                .physicalColumn("last_name", DataTypes.STRING())
+                                .physicalColumn("description", DataTypes.STRING())
+                                .physicalColumn("gender", DataTypes.STRING())
+                                .primaryKey("id")
+                                .build());
+        String[] outputEvents = outCaptor.toString().trim().split("\n");
+        assertThat(outputEvents)
+                .containsExactly(
+                        "CreateTableEvent{tableId=default_namespace.default_schema.merged, schema=columns={`id` INT,`name` STRING,`age` INT,`last_name` STRING}, primaryKeys=id, options=()}",
+                        "AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`description` STRING, position=AFTER, existedColumnName=last_name}]}",
+                        "AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, typeMapping={id=BIGINT}, oldTypeMapping={id=INT}}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[1, Alice, 18, last_name, null], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[2, Bob, 20, last_name, null], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[2, Bob, 20, last_name, null], after=[2, Bob, 30, last_name, null], op=UPDATE, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[3, Charlie, 15, last_name, student], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[4, Donald, 25, last_name, student], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[4, Donald, 25, last_name, student], after=[], op=DELETE, meta=()}",
+                        "AddColumnEvent{tableId=default_namespace.default_schema.merged, addedColumns=[ColumnWithPosition{column=`gender` STRING, position=AFTER, existedColumnName=description}]}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[5, Eliza, 24, last_name, null, null], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[6, Frank, 30, last_name, student, male], op=INSERT, meta=()}");
+    }
+
     @ParameterizedTest
     @EnumSource
     void testRouteWithReplaceSymbol(ValuesDataSink.SinkApi sinkApi) throws Exception {
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
index c565b5610..c7c69fa6c 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
@@ -81,7 +81,7 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
     private List<UserDefinedFunctionDescriptor> udfDescriptors;
     private transient Map<String, Object> udfFunctionInstances;
 
-    private transient Map<Tuple2<TableId, TransformProjection>, TransformProjectionProcessor>
+    private transient Map<Tuple2<TableId, String>, TransformProjectionProcessor>
             transformProjectionProcessorMap;
     private transient Map<Tuple2<TableId, TransformFilter>, TransformFilterProcessor>
             transformFilterProcessorMap;
@@ -355,12 +355,14 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
         for (PostTransformer transform : transforms) {
             Selectors selectors = transform.getSelectors();
             if (selectors.isMatch(tableId) && transform.getProjection().isPresent()) {
-                TransformProjection transformProjection = transform.getProjection().get();
+                TransformProjection transformProjection =
+                        TransformProjection.of(transform.getProjection().get().getProjection())
+                                .get();
                 if (transformProjection.isValid()) {
                     if (!transformProjectionProcessorMap.containsKey(
-                            Tuple2.of(tableId, transformProjection))) {
+                            Tuple2.of(tableId, transformProjection.getProjection()))) {
                         transformProjectionProcessorMap.put(
-                                Tuple2.of(tableId, transformProjection),
+                                Tuple2.of(tableId, transformProjection.getProjection()),
                                 TransformProjectionProcessor.of(
                                         transformProjection,
                                         timezone,
@@ -370,7 +372,7 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
                     }
                     TransformProjectionProcessor postTransformProcessor =
                             transformProjectionProcessorMap.get(
-                                    Tuple2.of(tableId, transformProjection));
+                                    Tuple2.of(tableId, transformProjection.getProjection()));
                     // update the columns of projection and add the column of projection into Schema
                     newSchemas.add(
                             postTransformProcessor.processSchema(
@@ -434,12 +436,9 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
                         && transformProjectionOptional.get().isValid()) {
                     TransformProjection transformProjection = transformProjectionOptional.get();
                     if (!transformProjectionProcessorMap.containsKey(
-                                    Tuple2.of(tableId, transformProjection))
-                            || !transformProjectionProcessorMap
-                                    .get(Tuple2.of(tableId, transformProjection))
-                                    .hasTableInfo()) {
+                            Tuple2.of(tableId, transformProjection.getProjection()))) {
                         transformProjectionProcessorMap.put(
-                                Tuple2.of(tableId, transformProjection),
+                                Tuple2.of(tableId, transformProjection.getProjection()),
                                 TransformProjectionProcessor.of(
                                         tableInfo,
                                         transformProjection,
@@ -447,10 +446,26 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
                                         udfDescriptors,
                                         getUdfFunctionInstances(),
                                         transform.getSupportedMetadataColumns()));
+                    } else if (!transformProjectionProcessorMap
+                            .get(Tuple2.of(tableId, transformProjection.getProjection()))
+                            .hasTableInfo()) {
+                        TransformProjectionProcessor transformProjectionProcessorWithoutTableInfo =
+                                transformProjectionProcessorMap.get(
+                                        Tuple2.of(tableId, transformProjection.getProjection()));
+                        transformProjectionProcessorMap.put(
+                                Tuple2.of(tableId, transformProjection.getProjection()),
+                                TransformProjectionProcessor.of(
+                                        tableInfo,
+                                        transformProjectionProcessorWithoutTableInfo
+                                                .getTransformProjection(),
+                                        timezone,
+                                        udfDescriptors,
+                                        getUdfFunctionInstances(),
+                                        transform.getSupportedMetadataColumns()));
                     }
                     TransformProjectionProcessor postTransformProcessor =
                             transformProjectionProcessorMap.get(
-                                    Tuple2.of(tableId, transformProjection));
+                                    Tuple2.of(tableId, transformProjection.getProjection()));
                     dataChangeEventOptional =
                             processProjection(
                                     postTransformProcessor,
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjection.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjection.java
index 94abba117..3d15e78aa 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjection.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjection.java
@@ -17,14 +17,12 @@
 
 package org.apache.flink.cdc.runtime.operators.transform;
 
-import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.utils.StringUtils;
 
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
-import java.util.stream.Collectors;
 
 /**
  * The projection of transform applies to describe a projection of filtering tables. Projection
@@ -42,6 +40,11 @@ public class TransformProjection implements Serializable {
     private String projection;
     private List<ProjectionColumn> projectionColumns;
 
+    public TransformProjection(String projection) {
+        this.projection = projection;
+        this.projectionColumns = new ArrayList<>();
+    }
+
     public TransformProjection(String projection, List<ProjectionColumn> projectionColumns) {
         this.projection = projection;
         this.projectionColumns = projectionColumns;
@@ -69,10 +72,4 @@ public class TransformProjection implements Serializable {
         }
         return Optional.of(new TransformProjection(projection, new ArrayList<>()));
     }
-
-    public List<Column> getAllColumnList() {
-        return projectionColumns.stream()
-                .map(ProjectionColumn::getColumn)
-                .collect(Collectors.toList());
-    }
 }
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java
index dc8c099d6..8ac1cbe56 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java
@@ -81,6 +81,10 @@ public class TransformProjectionProcessor {
         return this.postTransformChangeInfo != null;
     }
 
+    public TransformProjection getTransformProjection() {
+        return transformProjection;
+    }
+
     public static TransformProjectionProcessor of(
             PostTransformChangeInfo tableInfo,
             TransformProjection transformProjection,
