yuxiqian opened a new pull request, #3285: URL: https://github.com/apache/flink-cdc/pull/3285
This closes [FLINK-35272](https://issues.apache.org/jira/browse/FLINK-35272).

Currently, pipeline jobs with transform rules (projection and filtering) are constructed with the following topology:

```
SchemaTransformOp --> DataTransformOp --> SchemaOp
```

where schema projections are applied in `SchemaTransformOp`, and data projection and filtering are applied in `DataTransformOp`. The idea is that `SchemaTransformOp` might be embedded in `Sources` in the future to reduce the size of the payload transferred within the Flink job.

However, the current implementation has a known defect: it drops unused columns too early, so columns that downstream operators rely on are already gone by the time records arrive at `DataTransformOp`. See the following example:

```
# Schema is (ID INT NOT NULL, NAME STRING, AGE INT)
transform:
  - source-table: employee
    projection: id, upper(name) as newname
    filter: age > 18
```

Such transformation rules fail because the `name` and `age` columns are removed in `SchemaTransformOp`, so their values cannot be retrieved in `DataTransformOp`, where the actual expression evaluation and filtering take place.

This PR introduces a new design and renames the transform topology as follows:

```
PreTransformOp --> PostTransformOp --> SchemaOp
```

where `PreTransformOp` filters out a column only if both of the following hold:

* The column is not present in the projection rules
* The column is not indirectly referenced by calculation or filtering expressions

If a column is explicitly written down, it is passed downstream as-is. Referenced columns are also kept, but a special prefix is added to their names. In the example above, a schema like `[id, newname, __PREFIX__name, __PREFIX__age]` is sent downstream. Note that expression evaluation and filtering do not take effect at this stage (in `PreTransformOp`), so a `DataChangeEvent` would look like `[1, null, 'Alice', 19]`.

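
The column-retention rule described above can be sketched roughly as follows. This is a minimal illustration, not the operator code from this PR: the class `PreTransformSketch`, its method names, and its parameters are hypothetical, and `__PREFIX__` is the placeholder used in this description rather than the actual prefix string. It also assumes that "referenced" means a source column read inside a calculation or filter expression, and that referenced columns are appended in source order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; not the operator code from this PR.
final class PreTransformSketch {
    // Placeholder from the description; the actual prefix string is not stated there.
    static final String PREFIX = "__PREFIX__";

    /** Projection outputs first, then every referenced source column under a prefixed name. */
    static List<String> preTransformSchema(
            List<String> sourceColumns, List<String> projectionOutputs, Set<String> referenced) {
        List<String> schema = new ArrayList<>(projectionOutputs);
        for (String column : sourceColumns) {
            // Columns that are neither projected nor referenced are never added, i.e. dropped.
            if (referenced.contains(column)) {
                schema.add(PREFIX + column);
            }
        }
        return schema;
    }

    /** Builds the row for that schema; calculated columns stay null at this stage. */
    static List<Object> preTransformRow(
            Map<String, Object> sourceRow,
            List<String> sourceColumns,
            List<String> projectionOutputs,
            Set<String> calculatedOutputs,
            Set<String> referenced) {
        List<Object> row = new ArrayList<>();
        for (String output : projectionOutputs) {
            // Pass-through columns keep their value; calculated ones are filled in later.
            row.add(calculatedOutputs.contains(output) ? null : sourceRow.get(output));
        }
        for (String column : sourceColumns) {
            if (referenced.contains(column)) {
                row.add(sourceRow.get(column));
            }
        }
        return row;
    }

    public static void main(String[] args) {
        // projection: id, upper(name) as newname; filter: age > 18
        List<String> source = Arrays.asList("id", "name", "age");
        List<String> outputs = Arrays.asList("id", "newname");
        Set<String> calculated = new HashSet<>(Arrays.asList("newname"));
        Set<String> referenced = new HashSet<>(Arrays.asList("name", "age"));

        Map<String, Object> sourceRow = new HashMap<>();
        sourceRow.put("id", 1);
        sourceRow.put("name", "Alice");
        sourceRow.put("age", 19);

        // prints [id, newname, __PREFIX__name, __PREFIX__age]
        System.out.println(preTransformSchema(source, outputs, referenced));
        // prints [1, null, Alice, 19]
        System.out.println(preTransformRow(sourceRow, source, outputs, calculated, referenced));
    }
}
```

For the `employee` example, the sketch yields the same schema and row shape quoted above: `newname` stays `null` because nothing is evaluated yet, while `name` and `age` survive under prefixed names for later use.
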
The prefix is meant to deal with cases like this:

```
# Schema is (ID INT NOT NULL, NAME STRING, AGE INT)
transform:
  - source-table: employee
    projection: id, upper(name) as name
    filter: age > 18
```

Here we need to distinguish the calculated column `(new) name` from the referenced original column `(old) name`. After name mangling, the schema would look like `[id, name, __PREFIX__name, __PREFIX__age]`.

Filtering is still done in `PostTransformOp`, since a user could write a filter expression that references a calculated column, and the values of calculated columns are not available until `PostTransformOp` has evaluated them. This also means that in the following somewhat ambiguous case:

```
# Schema is (ID INT NOT NULL, NAME STRING, AGE INT)
transform:
  - source-table: employee
    projection: id, age * 2 as age
    filter: age > 18
```

the filtering expression is applied to the calculated `age` column (doubled!) instead of the original one.
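
To make the evaluation order in `PostTransformOp` concrete, here is a small sketch of the `age * 2 as age` case. Everything in it is an assumption for illustration: the class `PostTransformSketch`, the lambda-based representation of calculations and filters, and the name-resolution order (calculated or projected column first, then the prefixed original) are hypothetical and only mirror the behaviour described above, not the real operator.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical illustration only; not the operator code from this PR.
final class PostTransformSketch {
    // Placeholder from the description; the actual prefix string is not stated there.
    static final String PREFIX = "__PREFIX__";

    /** Evaluates calculated columns, then filters on the resulting values. */
    static Optional<Map<String, Object>> apply(
            Map<String, Object> preRow,
            Map<String, Function<Map<String, Object>, Object>> calculations,
            Predicate<Function<String, Object>> filter) {
        Map<String, Object> out = new LinkedHashMap<>();
        // Keep only the user-visible columns; prefixed ones are inputs, not outputs.
        for (Map.Entry<String, Object> e : preRow.entrySet()) {
            if (!e.getKey().startsWith(PREFIX)) {
                out.put(e.getKey(), e.getValue());
            }
        }
        // Step 1: fill in calculated columns from the prefixed originals.
        for (Map.Entry<String, Function<Map<String, Object>, Object>> c : calculations.entrySet()) {
            out.put(c.getKey(), c.getValue().apply(preRow));
        }
        // Step 2: filter. A name resolves to the output column if one exists,
        // otherwise to the prefixed original column.
        Function<String, Object> resolver =
                name -> out.containsKey(name) ? out.get(name) : preRow.get(PREFIX + name);
        return filter.test(resolver) ? Optional.of(out) : Optional.empty();
    }

    /** Builds a pre-transform row for the schema [id, age, __PREFIX__age]. */
    static Map<String, Object> preRow(int id, int originalAge) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", id);
        row.put("age", null); // calculated column, not yet evaluated
        row.put(PREFIX + "age", originalAge);
        return row;
    }

    public static void main(String[] args) {
        // projection: id, age * 2 as age; filter: age > 18
        Map<String, Function<Map<String, Object>, Object>> calculations = new LinkedHashMap<>();
        calculations.put("age", row -> (Integer) row.get(PREFIX + "age") * 2);
        Predicate<Function<String, Object>> filter = r -> (Integer) r.apply("age") > 18;

        // Original age 10 is not > 18, but the doubled value 20 is, so the row passes.
        System.out.println(apply(preRow(1, 10), calculations, filter)); // prints Optional[{id=1, age=20}]
        // Original age 5 doubles to 10, which fails the filter.
        System.out.println(apply(preRow(2, 5), calculations, filter)); // prints Optional.empty
    }
}
```

A row with an original `age` of 10 passes the filter because the comparison sees the doubled value 20, which is the behaviour the ambiguous case above describes.
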