yuxiqian opened a new pull request, #3285:
URL: https://github.com/apache/flink-cdc/pull/3285

   This closes [FLINK-35272](https://issues.apache.org/jira/browse/FLINK-35272).
   
   Currently, pipeline jobs with transform (including projection and filtering) 
are constructed with the following topology:
   
   ```
   SchemaTransformOp --> DataTransformOp --> SchemaOp
   ```
   
   where schema projections are applied in `SchemaTransformOp` and data 
projection & filtering are applied in `DataTransformOp`. The idea is 
`SchemaTransformOp` might be embedded in `Sources` in the future to reduce 
payload data size transferred in Flink Job.
   
   However, the current implementation has a known defect: it omits unused 
columns too early, so columns that downstream operators rely on are already 
removed by the time events arrive at `DataTransformOp`. See an example as follows:
   
   ```
   # Schema is (ID INT NOT NULL, NAME STRING, AGE INT)
   transform:
     - source-table: employee
       projection: id, upper(name) as newname
       filter: age > 18
   ```
   
   Such transformation rules will fail since the `name` and `age` columns are 
removed in `SchemaTransformOp`, so their values can no longer be retrieved in 
`DataTransformOp`, where the actual expression evaluation and filtering take 
effect.
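   The failure mode can be sketched in a few lines (hypothetical Python; the 
function and variable names are illustrative, not actual Flink CDC classes):

   ```python
   # Sketch of the old SchemaTransformOp -> DataTransformOp split.

   def schema_transform(row, projected_columns):
       # Old behavior: prune every column not named in the projection.
       return {col: val for col, val in row.items() if col in projected_columns}

   def data_transform(row):
       # Downstream filter `age > 18` -- but `age` was already pruned upstream.
       return row if row.get("age", 0) > 18 else None

   row = {"id": 1, "name": "Alice", "age": 19}
   pruned = schema_transform(row, ["id", "newname"])  # {"id": 1} -- name/age lost
   # data_transform(pruned) now wrongly drops the row: it cannot see age == 19.
   ```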
   
   This PR introduces a new design, restructuring and renaming the transform 
topology as follows:
   
   ```
   PreTransformOp --> PostTransformOp --> SchemaOp
   ```
   
   where `PreTransformOp` filters out a column only if both of the following hold:
   * The column is not present in any projection rule
   * The column is not referenced, directly or indirectly, by any calculation or 
filtering expression
   
   If a column is explicitly written down in the projection, it is passed 
downstream as-is. Referenced-but-not-projected columns are forwarded with a 
special prefix added to their names. In the example above, a schema like `[id, 
newname, __PREFIX__name, __PREFIX__age]` will be sent downstream. Note that 
expression evaluation and filtering do not take effect at this stage, so a 
`DataChangeEvent` would look like `[1, null, 'Alice', 19]`.
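   A minimal sketch of this pre-transform step (hypothetical Python; `PREFIX` 
and the function name stand in for the actual internals):

   ```python
   PREFIX = "__PREFIX__"  # illustrative; stands in for the real internal prefix

   def pre_transform(row, projected, referenced):
       # Projected columns pass through as-is (calculated ones stay empty for
       # now); referenced-only columns are carried under a prefixed name.
       out = {col: row.get(col) for col in projected}
       for col in referenced:
           out[PREFIX + col] = row[col]
       return out

   row = {"id": 1, "name": "Alice", "age": 19}
   pre_transform(row, ["id", "newname"], ["name", "age"])
   # -> {"id": 1, "newname": None, "__PREFIX__name": "Alice", "__PREFIX__age": 19}
   ```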
   
   The prefix is needed to handle cases like this:
   
   ```
   # Schema is (ID INT NOT NULL, NAME STRING, AGE INT)
   transform:
     - source-table: employee
       projection: id, upper(name) as name
       filter: age > 18
   ```
   
   Here we need to distinguish the calculated column (the new `name`) from the 
referenced original column (the old `name`). After the name-mangling process, 
the schema would be `[id, name, __PREFIX__name]`.
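   The disambiguation can be sketched as follows (hypothetical Python; the 
prefix and function names are illustrative only):

   ```python
   PREFIX = "__PREFIX__"  # illustrative placeholder for the internal prefix

   def post_transform(row):
       # `upper(name) as name` is evaluated against the original value carried
       # under the prefixed key, so the old and new `name` never collide; the
       # prefixed helper columns are then dropped from the output.
       if not row[PREFIX + "age"] > 18:
           return None  # filtered out
       return {"id": row["id"], "name": row[PREFIX + "name"].upper()}

   pre_row = {"id": 1, "name": None, "__PREFIX__name": "Alice", "__PREFIX__age": 19}
   post_transform(pre_row)  # -> {"id": 1, "name": "ALICE"}
   ```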
   
   Also, the filtering process is still done in `PostTransformOp`, since users 
may write filter expressions that reference calculated columns, whose values are 
not available until `PostTransformOp` evaluates them. This also means that in 
the following somewhat ambiguous case:
   
   ```
   # Schema is (ID INT NOT NULL, NAME STRING, AGE INT)
   transform:
     - source-table: employee
       projection: id, age * 2 as age
       filter: age > 18
   ```
   
   the filter expression is applied to the calculated `age` column (doubled!) 
instead of the original one.
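   Concretely, the evaluation order can be sketched like this (hypothetical 
Python; names are illustrative, not actual framework code):

   ```python
   def post_transform(row):
       # The projection `age * 2 as age` is evaluated before the filter, so
       # `age > 18` tests the doubled value, not the original one.
       new_age = row["__PREFIX__age"] * 2
       return {"id": row["id"], "age": new_age} if new_age > 18 else None

   post_transform({"id": 1, "__PREFIX__age": 10})  # kept: the filter sees 20, not 10
   post_transform({"id": 2, "__PREFIX__age": 9})   # dropped: the filter sees 18
   ```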
   


