gustavodemorais commented on code in PR #28199: URL: https://github.com/apache/flink/pull/28199#discussion_r3267731755
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -369,6 +381,13 @@ When `PARTITION BY` is provided, **the output schema changes**. The partition ke
 Prefer row semantics, when possible. `PARTITION BY` is only necessary when downstream operators are keyed on that column and you want to co-locate rows for the same key in the same parallel operator instance.
+#### Avoiding ChangelogNormalize for upsert sources
+
+When the input is an upsert source (emits `UPDATE_AFTER` but no `UPDATE_BEFORE`), the planner inserts a `ChangelogNormalize` operator by default to materialize `UPDATE_BEFORE` rows and complete `DELETE` payloads. This operator is stateful and can be expensive. When `PARTITION BY` is provided, the planner skips `ChangelogNormalize` if `op_mapping` does not emit the corresponding kinds:
+
+* Omit `UPDATE_BEFORE` from `op_mapping` to skip `UPDATE_BEFORE` materialization.
+* If the source emits partial `DELETE` events (only the keys flow through, common with Flink's `upsert-kafka` connector or other key-compacted topics), it's necessary to omit `DELETE` from `op_mapping` to skip the full-`DELETE` materialization step that also happens in `ChangelogNormalize`.

Review Comment:
I agree this is advanced, but I wanted to have this information somewhere and can't think of a better place yet. Maybe a new section called "Performance optimization" at the bottom? That way, people only get to it if necessary. I can imagine advanced users wanting to tune their jobs to avoid ChangelogNormalize, and this explains how.
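To make the cost being discussed concrete, here is a rough model of the work the hunk attributes to `ChangelogNormalize`. It is a minimal Python sketch, not Flink's operator. `normalize_upsert`, the `(kind, key, value)` tuple layout, and the `emit_update_before` / `materialize_delete` flags are hypothetical. Emitting `INSERT` for the first row of a key and dropping deletes for unknown keys are also assumptions of this sketch.

```python
from typing import Dict, Iterable, List, Optional, Tuple

# Simplified change event: (kind, key, value).
# For an upsert source, kind is "INSERT", "UPDATE_AFTER" or "DELETE".
# A partial DELETE carries only the key, modeled here as value=None.
Event = Tuple[str, str, Optional[str]]


def normalize_upsert(
    events: Iterable[Event],
    emit_update_before: bool = True,
    materialize_delete: bool = True,
) -> List[Event]:
    """Hypothetical model of ChangelogNormalize-style processing.

    Keeps the last value per key so it can emit UPDATE_BEFORE rows and
    fill in DELETE payloads. If neither output is needed, it keeps no
    state and returns the events unchanged.
    """
    if not emit_update_before and not materialize_delete:
        # Nothing to materialize: stateless pass-through.
        return list(events)

    state: Dict[str, Optional[str]] = {}  # last seen value per key
    out: List[Event] = []
    for kind, key, value in events:
        if kind in ("INSERT", "UPDATE_AFTER"):
            if key not in state:
                # First row for this key: there is no before image
                # (emitting INSERT here is an assumption of this sketch).
                out.append(("INSERT", key, value))
            else:
                if emit_update_before:
                    # Retract the previously stored row for this key.
                    out.append(("UPDATE_BEFORE", key, state[key]))
                out.append(("UPDATE_AFTER", key, value))
            state[key] = value
        elif kind == "DELETE":
            if key not in state:
                continue  # unknown key: nothing to retract
            previous = state.pop(key)
            # Fill in the full row from state, or forward the partial payload.
            out.append(("DELETE", key, previous if materialize_delete else value))
        else:
            raise ValueError(f"unexpected kind for an upsert source: {kind}")
    return out


events = [
    ("UPDATE_AFTER", "k1", "a"),
    ("UPDATE_AFTER", "k1", "b"),
    ("DELETE", "k1", None),  # partial DELETE: key only
]
for event in normalize_upsert(events):
    print(event)
```

With both flags at their defaults, the example prints `('INSERT', 'k1', 'a')`, `('UPDATE_BEFORE', 'k1', 'a')`, `('UPDATE_AFTER', 'k1', 'b')` and `('DELETE', 'k1', 'b')`. The per-key `state` dict is what makes this kind of operator stateful. The early return mirrors, in spirit, the hunk's point that the step can be skipped when neither `UPDATE_BEFORE` rows nor full `DELETE` payloads are needed downstream.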
