gustavodemorais commented on code in PR #28199:
URL: https://github.com/apache/flink/pull/28199#discussion_r3267731755


##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -369,6 +381,13 @@ When `PARTITION BY` is provided, **the output schema changes**. The partition ke
 
 Prefer row semantics, when possible. `PARTITION BY` is only necessary when downstream operators are keyed on that column and you want to co-locate rows for the same key in the same parallel operator instance.
 
+#### Avoiding ChangelogNormalize for upsert sources
+
+When the input is an upsert source (emits `UPDATE_AFTER` but no `UPDATE_BEFORE`), the planner inserts a `ChangelogNormalize` operator by default to materialize `UPDATE_BEFORE` rows and complete `DELETE` payloads. This operator is stateful (it keeps the latest row for every key) and can be expensive. When `PARTITION BY` is provided, the planner skips `ChangelogNormalize` if `op_mapping` does not emit the corresponding kinds:
+
+* Omit `UPDATE_BEFORE` from `op_mapping` to skip `UPDATE_BEFORE` materialization.
+* If the source emits partial `DELETE` events (only the key fields are set, which is common with Flink's `upsert-kafka` connector and other key-compacted topics), also omit `DELETE` from `op_mapping` to skip the full-`DELETE` materialization step that `ChangelogNormalize` performs.

Review Comment:
   I agree this is advanced, but I wanted to have this information somewhere and can't think of a better place yet. Maybe a new section called "Performance optimization" at the bottom, so people only get to it if necessary? I can imagine advanced users wanting to tune their jobs to avoid ChangelogNormalize, and this explains how.
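   To make the trade-off concrete, here is a rough Python model (not Flink code) of what `ChangelogNormalize` does for an upsert input and of the skip rule the new paragraph describes. The function names `normalize_upsert` and `needs_changelog_normalize`, the `(kind, key, row)` event tuples, and the string row-kind names are all hypothetical; the real operator is more involved, and the rule below only encodes what the proposed text states.

```python
from typing import Dict, Iterable, Iterator, Optional, Set, Tuple

# Hypothetical event shape: (row_kind, key, row). For a partial DELETE only
# the key is known, so row is None.
Event = Tuple[str, str, Optional[dict]]


def normalize_upsert(events: Iterable[Event]) -> Iterator[Event]:
    """Simplified model of ChangelogNormalize on an upsert input.

    Keeps the latest row per key in state so it can emit UPDATE_BEFORE rows
    and turn key-only DELETEs into full DELETE rows.
    """
    state: Dict[str, dict] = {}
    for kind, key, row in events:
        if kind in ("INSERT", "UPDATE_AFTER"):
            previous = state.get(key)
            if previous is None:
                # First time we see this key: it is a plain insert.
                yield ("INSERT", key, row)
            else:
                # Materialize UPDATE_BEFORE from the stored previous row.
                yield ("UPDATE_BEFORE", key, previous)
                yield ("UPDATE_AFTER", key, row)
            state[key] = row
        elif kind == "DELETE":
            previous = state.pop(key, None)
            # Complete the DELETE payload from state; drop deletes for unknown keys.
            if previous is not None:
                yield ("DELETE", key, previous)


def needs_changelog_normalize(
    has_partition_by: bool,
    op_mapping_kinds: Set[str],
    source_has_partial_deletes: bool,
) -> bool:
    """Model of the skip rule from the proposed docs paragraph."""
    if not has_partition_by:
        return True  # default: an upsert source is always normalized
    if "UPDATE_BEFORE" in op_mapping_kinds:
        return True  # UPDATE_BEFORE rows can only come from state
    if source_has_partial_deletes and "DELETE" in op_mapping_kinds:
        return True  # key-only DELETEs must be completed from state
    return False


if __name__ == "__main__":
    events = [
        ("UPDATE_AFTER", "k1", {"v": 1}),
        ("UPDATE_AFTER", "k1", {"v": 2}),
        ("DELETE", "k1", None),  # partial delete: key only
    ]
    for e in normalize_upsert(events):
        print(e)
    # With PARTITION BY, no UPDATE_BEFORE in op_mapping and DELETE omitted,
    # the model says normalization can be skipped even for partial deletes.
    print(needs_changelog_normalize(True, {"INSERT", "UPDATE_AFTER"}, True))
```

   The state dictionary is the cost the docs warn about: it grows with the number of distinct keys, which is why dropping `UPDATE_BEFORE` (and, for partial deletes, `DELETE`) from `op_mapping` matters.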



-- 
This is an automated message from the Apache Git Service.