raminqaf commented on code in PR #28166:
URL: https://github.com/apache/flink/pull/28166#discussion_r3252510897
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -149,6 +149,47 @@ Prefer row semantics, when possible. `PARTITION BY` is
only necessary when downs
If you are producing an upsert table — that is, you are emitting
`UPDATE_AFTER` but no `UPDATE_BEFORE` from your CDC input stream — the
partition key you select here will be considered both the primary key and the
upsert key by the engine. Make sure the `PARTITION BY` key matches your primary
key exactly.
+#### Ordering CDC events with ORDER BY
+
+CDC streams can deliver events out of order. For example, a key's
`UPDATE_AFTER` may arrive before its matching `UPDATE_BEFORE` when events are
partitioned across upstream brokers. If the source itself does not guarantee
ordering, applying such a changelog directly produces incorrect state.
+
+`FROM_CHANGELOG` accepts an [ORDER BY clause]({{< ref
"docs/dev/table/functions/ptfs" >}}#ordering) that sorts events within each
partition before they are processed. The framework buffers events per partition
and flushes them to the function in sorted order once the watermark advances.
Late events (arriving after the watermark) are dropped.
Review Comment:
Added a section describing the behavior in retract and upsert modes.
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -149,6 +149,47 @@ Prefer row semantics, when possible. `PARTITION BY` is
only necessary when downs
If you are producing an upsert table — that is, you are emitting
`UPDATE_AFTER` but no `UPDATE_BEFORE` from your CDC input stream — the
partition key you select here will be considered both the primary key and the
upsert key by the engine. Make sure the `PARTITION BY` key matches your primary
key exactly.
+#### Ordering CDC events with ORDER BY
+
+CDC streams can deliver events out of order. For example, a key's
`UPDATE_AFTER` may arrive before its matching `UPDATE_BEFORE` when events are
partitioned across upstream brokers. If the source itself does not guarantee
ordering, applying such a changelog directly produces incorrect state.
+
+`FROM_CHANGELOG` accepts an [ORDER BY clause]({{< ref
"docs/dev/table/functions/ptfs" >}}#ordering) that sorts events within each
partition before they are processed. The framework buffers events per partition
and flushes them to the function in sorted order once the watermark advances.
Late events (arriving after the watermark) are dropped.
+
+Requirements:
+
+* The input table must declare a `WATERMARK` on the time attribute used in
`ORDER BY`.
+* The first `ORDER BY` column must be that time attribute in `ASC` order.
+* `ORDER BY` requires `PARTITION BY` (set semantics). It cannot be combined
with row semantics.
+
+```sql
+-- Source declares a watermarked event time
+CREATE TABLE cdc_stream (
+ id INT,
+ op STRING,
+ name STRING,
+ event_time TIMESTAMP_LTZ(3),
+ WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
Review Comment:
Thanks for pointing this out. Addressed.
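
For readers following the thread, here is a minimal sketch of the buffer-and-flush behavior that the new section describes. It is a plain Python simulation, not Flink code: `WatermarkSorter`, `on_event`, and `on_watermark` are hypothetical names, and the lateness rule (an event is late when its time is not after the current watermark) is an assumption about how "arriving after the watermark" would be interpreted.

```python
from collections import defaultdict
import heapq


class WatermarkSorter:
    """Toy model: buffer events per key, release them in time order on watermark."""

    def __init__(self):
        # key -> min-heap of (event_time, seq, payload)
        self.buffers = defaultdict(list)
        self.watermark = float("-inf")
        # Tie-breaker so events with equal times keep their arrival order.
        self.seq = 0

    def on_event(self, key, event_time, payload):
        # Assumption: an event is late if its time is not after the watermark.
        if event_time <= self.watermark:
            return False  # late event is dropped
        heapq.heappush(self.buffers[key], (event_time, self.seq, payload))
        self.seq += 1
        return True

    def on_watermark(self, watermark):
        # Watermarks only move forward; emit everything at or below the new one.
        self.watermark = max(self.watermark, watermark)
        emitted = []
        for key in sorted(self.buffers):
            heap = self.buffers[key]
            while heap and heap[0][0] <= self.watermark:
                event_time, _, payload = heapq.heappop(heap)
                emitted.append((key, event_time, payload))
        return emitted


sorter = WatermarkSorter()
sorter.on_event(1, 20, "UPDATE_AFTER")   # arrives first but happened later
sorter.on_event(1, 10, "UPDATE_BEFORE")
flushed = sorter.on_watermark(25)        # released in event-time order
late_accepted = sorter.on_event(1, 5, "INSERT")  # behind the watermark
```

In this toy run the `UPDATE_BEFORE` at time 10 is released ahead of the `UPDATE_AFTER` at time 20 even though it arrived second, and the event at time 5 is rejected because the watermark has already passed 25. The real operator may differ in details such as state handling and tie-breaking.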