yuxiqian commented on code in PR #3738:
URL: https://github.com/apache/flink-cdc/pull/3738#discussion_r1846538740
##########
flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java:
##########
@@ -688,6 +1326,195 @@ void testPostAsteriskWithSchemaEvolution() throws
Exception {
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[15
-> Oops, 12th, 15, Oops], after=[], op=DELETE, meta=()}");
}
+ @Test
+ void testTransformWithFilterButNoProjection() throws Exception {
+ FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+ // Setup value source
+ Configuration sourceConfig = new Configuration();
+
+ sourceConfig.set(
+ ValuesDataSourceOptions.EVENT_SET_ID,
+ ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
+
+ TableId tableId = TableId.tableId("default_namespace",
"default_schema", "mytable1");
+ List<Event> events = generateSchemaEvolutionEvents(tableId);
+
+
ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
+
+ SourceDef sourceDef =
+ new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source",
sourceConfig);
+
+ // Setup value sink
+ Configuration sinkConfig = new Configuration();
+ SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value
Sink", sinkConfig);
+
+ // Setup pipeline
+ Configuration pipelineConfig = new Configuration();
+ pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR,
SchemaChangeBehavior.EVOLVE);
+ PipelineDef pipelineDef =
+ new PipelineDef(
+ sourceDef,
+ sinkDef,
+ Collections.emptyList(),
+ Collections.singletonList(
+ new TransformDef(
+
"default_namespace.default_schema.\\.*",
+ null,
+ "id > 1",
+ null,
+ null,
+ null,
+ null)),
+ Collections.emptyList(),
+ pipelineConfig);
+
+ // Execute the pipeline
+ PipelineExecution execution = composer.compose(pipelineDef);
+ execution.execute();
+
+ // Check the order and content of all received events
+ String[] outputEvents = outCaptor.toString().trim().split("\n");
+
+ assertThat(outputEvents)
+ .containsExactly(
+ // Initial stage
+
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1,
schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[],
after=[2, Barcarolle, 22], op=INSERT, meta=()}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[],
after=[3, Cecily, 23], op=INSERT, meta=()}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3,
Cecily, 23], after=[3, Colin, 24], op=UPDATE, meta=()}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2,
Barcarolle, 22], after=[], op=DELETE, meta=()}",
+
+ // Add column stage
+
"AddColumnEvent{tableId=default_namespace.default_schema.mytable1,
addedColumns=[ColumnWithPosition{column=`rank` STRING, position=BEFORE,
existedColumnName=id}, ColumnWithPosition{column=`gender` TINYINT,
position=AFTER, existedColumnName=age}]}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[],
after=[1st, 4, Derrida, 24, 0], op=INSERT, meta=()}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[],
after=[2nd, 5, Eve, 25, 1], op=INSERT, meta=()}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1,
before=[2nd, 5, Eve, 25, 1], after=[2nd, 5, Eva, 20, 2], op=UPDATE, meta=()}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[],
after=[3rd, 6, Fiona, 26, 3], op=INSERT, meta=()}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1,
before=[3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}",
+
+ // Alter column type stage
+
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1,
typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE},
oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}",
Review Comment:
```suggestion
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1,
nameMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}}",
```
This a typo in AlterColumnType release-3.2.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]