fhueske commented on code in PR #28166:
URL: https://github.com/apache/flink/pull/28166#discussion_r3249957448
##########
docs/content/docs/dev/table/functions/ptfs.md:
##########
@@ -1095,6 +1095,8 @@ watermark for the given input table advances, at which
point all buffered rows w
less than or equal to the watermark are delivered to the eval() method in
sorted order. Late
events (arriving after the watermark) are dropped to maintain the ordering
guarantee.
+This pattern is particularly useful for the built-in [`FROM_CHANGELOG`]({{<
ref "docs/sql/reference/queries/changelog" >}}#from_changelog) PTF, which
converts an append-only CDC stream into a changelog. Combining `PARTITION BY`
on the primary key with `ORDER BY` on the event-time column reorders
out-of-order CDC events per key before the operations are applied.
Review Comment:
```suggestion
This pattern is particularly useful for the built-in [`FROM_CHANGELOG`]({{<
ref "docs/sql/reference/queries/changelog" >}}#from_changelog) PTF, which
converts an append-only CDC stream into a changelog. Combining `PARTITION BY`
on the primary key with `ORDER BY` on the event-time column reorders
out-of-order CDC events per key before the conversion operations are applied.
```
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -149,6 +149,47 @@ Prefer row semantics, when possible. `PARTITION BY` is
only necessary when downs
If you are producing an upsert table — that is, you are emitting
`UPDATE_AFTER` but no `UPDATE_BEFORE` from your CDC input stream — the
partition key you select here will be considered both the primary key and the
upsert key by the engine. Make sure the `PARTITION BY` key matches your primary
key exactly.
+#### Ordering CDC events with ORDER BY
+
+CDC streams can deliver events out of order. For example, a key's
`UPDATE_AFTER` may arrive before its matching `UPDATE_BEFORE` when events are
partitioned across upstream brokers. If the source itself does not guarantee
ordering, applying such a changelog directly produces incorrect state.
+
+`FROM_CHANGELOG` accepts an [ORDER BY clause]({{< ref
"docs/dev/table/functions/ptfs" >}}#ordering) that sorts events within each
partition before they are processed. The framework buffers events per partition
and flushes them to the function in sorted order once the watermark advances.
Late events (arriving after the watermark) are dropped.
Review Comment:
Please explain what happens to a record whose matching record was dropped.
For example, what happens to an `UPDATE_AFTER` that was not late if its
matching `UPDATE_BEFORE` is dropped?
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -149,6 +149,47 @@ Prefer row semantics, when possible. `PARTITION BY` is
only necessary when downs
If you are producing an upsert table — that is, you are emitting
`UPDATE_AFTER` but no `UPDATE_BEFORE` from your CDC input stream — the
partition key you select here will be considered both the primary key and the
upsert key by the engine. Make sure the `PARTITION BY` key matches your primary
key exactly.
+#### Ordering CDC events with ORDER BY
+
+CDC streams can deliver events out of order. For example, a key's
`UPDATE_AFTER` may arrive before its matching `UPDATE_BEFORE` when events are
partitioned across upstream brokers. If the source itself does not guarantee
ordering, applying such a changelog directly produces incorrect state.
+
+`FROM_CHANGELOG` accepts an [ORDER BY clause]({{< ref
"docs/dev/table/functions/ptfs" >}}#ordering) that sorts events within each
partition before they are processed. The framework buffers events per partition
and flushes them to the function in sorted order once the watermark advances.
Late events (arriving after the watermark) are dropped.
+
+Requirements:
+
+* The input table must declare a `WATERMARK` on the time attribute used in
`ORDER BY`.
+* The first `ORDER BY` column must be that time attribute in `ASC` order.
+* `ORDER BY` requires `PARTITION BY` (set semantics). It cannot be combined
with row semantics.
+
+```sql
+-- Source declares a watermarked event time
+CREATE TABLE cdc_stream (
+ id INT,
+ op STRING,
+ name STRING,
+ event_time TIMESTAMP_LTZ(3),
+ WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
+) WITH (...);
+
+SELECT * FROM FROM_CHANGELOG(
+ input => TABLE cdc_stream
+ PARTITION BY id
+ ORDER BY event_time
+)
+```
+
+**How buffering and watermarks interact**
+
+Assume the watermark strategy advances the watermark `5` minutes behind the
largest observed `event_time`, and the current watermark is `10:00`:
+
+| Incoming row | Current watermark | Outcome |
+|---|---|---|
+| `+I[id: 6, op: 'INSERT', event_time: '10:05']` | `10:00` | Buffered. Emitted
later when the watermark passes `10:05`. |
+| `+I[id: 5, op: 'INSERT', event_time: '09:55']` | `10:00` | Dropped.
Timestamp is below the current watermark. |
Review Comment:
```suggestion
| `+I[id: 5, op: 'INSERT', event_time: '09:57']` | `10:00` | Dropped.
Timestamp is below the current watermark. |
```
Otherwise, readers might think that the 5-minute difference between `09:55` and
`10:00` has some relevance.
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -149,6 +149,47 @@ Prefer row semantics, when possible. `PARTITION BY` is
only necessary when downs
If you are producing an upsert table — that is, you are emitting
`UPDATE_AFTER` but no `UPDATE_BEFORE` from your CDC input stream — the
partition key you select here will be considered both the primary key and the
upsert key by the engine. Make sure the `PARTITION BY` key matches your primary
key exactly.
+#### Ordering CDC events with ORDER BY
+
+CDC streams can deliver events out of order. For example, a key's
`UPDATE_AFTER` may arrive before its matching `UPDATE_BEFORE` when events are
partitioned across upstream brokers. If the source itself does not guarantee
ordering, applying such a changelog directly produces incorrect state.
+
+`FROM_CHANGELOG` accepts an [ORDER BY clause]({{< ref
"docs/dev/table/functions/ptfs" >}}#ordering) that sorts events within each
partition before they are processed. The framework buffers events per partition
and flushes them to the function in sorted order once the watermark advances.
Late events (arriving after the watermark) are dropped.
+
+Requirements:
+
+* The input table must declare a `WATERMARK` on the time attribute used in
`ORDER BY`.
+* The first `ORDER BY` column must be that time attribute in `ASC` order.
+* `ORDER BY` requires `PARTITION BY` (set semantics). It cannot be combined
with row semantics.
+
+```sql
+-- Source declares a watermarked event time
+CREATE TABLE cdc_stream (
+ id INT,
+ op STRING,
+ name STRING,
+ event_time TIMESTAMP_LTZ(3),
+ WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
+) WITH (...);
+
+SELECT * FROM FROM_CHANGELOG(
+ input => TABLE cdc_stream
+ PARTITION BY id
+ ORDER BY event_time
+)
+```
+
+**How buffering and watermarks interact**
+
+Assume the watermark strategy advances the watermark `5` minutes behind the
largest observed `event_time`, and the current watermark is `10:00`:
+
+| Incoming row | Current watermark | Outcome |
+|---|---|---|
+| `+I[id: 6, op: 'INSERT', event_time: '10:05']` | `10:00` | Buffered. Emitted
later when the watermark passes `10:05`. |
+| `+I[id: 5, op: 'INSERT', event_time: '09:55']` | `10:00` | Dropped.
Timestamp is below the current watermark. |
+| `+I[id: 7, op: 'INSERT', event_time: '10:11']` | `10:06` | Record `id=6` is
emitted; this row is buffered until the watermark passes `10:11`. |
Review Comment:
To be precise, the watermark only advances after the record has been
processed, so "Current watermark" shouldn't show `10:06` yet.
It advances when a new watermark message is received and processed by the PTF.
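
The ordering described in this comment can be sketched as a small Python simulation. This is not Flink code: the class `OrderedBuffer` and its methods `on_record` and `on_watermark` are hypothetical names for illustration, times are plain minutes since midnight, and treating a row whose timestamp equals the watermark as late is an assumption.

```python
from dataclasses import dataclass, field


def hhmm(hours: int, minutes: int) -> int:
    """Encode a wall-clock time as minutes since midnight."""
    return hours * 60 + minutes


@dataclass
class OrderedBuffer:
    """Hypothetical model of per-partition buffering; not Flink's implementation."""

    watermark: int
    buffered: list = field(default_factory=list)
    emitted: list = field(default_factory=list)
    dropped: list = field(default_factory=list)

    def on_record(self, row_id: int, event_time: int) -> None:
        # Processing a record never advances the watermark.
        if event_time <= self.watermark:  # assumption: equal counts as late
            self.dropped.append(row_id)
        else:
            self.buffered.append((event_time, row_id))

    def on_watermark(self, new_watermark: int) -> None:
        # Only a watermark message advances the watermark and flushes rows.
        self.watermark = max(self.watermark, new_watermark)
        ready = sorted(r for r in self.buffered if r[0] <= self.watermark)
        self.buffered = [r for r in self.buffered if r[0] > self.watermark]
        self.emitted.extend(row_id for _, row_id in ready)


buf = OrderedBuffer(watermark=hhmm(10, 0))
buf.on_record(6, hhmm(10, 5))    # buffered
buf.on_record(5, hhmm(9, 57))    # dropped, below the watermark
buf.on_record(7, hhmm(10, 11))   # buffered, watermark is still 10:00
emitted_before_watermark = list(buf.emitted)
buf.on_watermark(hhmm(10, 6))    # watermark message arrives, id=6 is flushed
```

In this model, the row for `id=7` is processed while the watermark is still `10:00`; `id=6` is emitted only once the separate watermark message for `10:06` arrives.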
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -149,6 +149,47 @@ Prefer row semantics, when possible. `PARTITION BY` is
only necessary when downs
If you are producing an upsert table — that is, you are emitting
`UPDATE_AFTER` but no `UPDATE_BEFORE` from your CDC input stream — the
partition key you select here will be considered both the primary key and the
upsert key by the engine. Make sure the `PARTITION BY` key matches your primary
key exactly.
+#### Ordering CDC events with ORDER BY
+
+CDC streams can deliver events out of order. For example, a key's
`UPDATE_AFTER` may arrive before its matching `UPDATE_BEFORE` when events are
partitioned across upstream brokers. If the source itself does not guarantee
ordering, applying such a changelog directly produces incorrect state.
+
+`FROM_CHANGELOG` accepts an [ORDER BY clause]({{< ref
"docs/dev/table/functions/ptfs" >}}#ordering) that sorts events within each
partition before they are processed. The framework buffers events per partition
and flushes them to the function in sorted order once the watermark advances.
Late events (arriving after the watermark) are dropped.
+
+Requirements:
+
+* The input table must declare a `WATERMARK` on the time attribute used in
`ORDER BY`.
+* The first `ORDER BY` column must be that time attribute in `ASC` order.
+* `ORDER BY` requires `PARTITION BY` (set semantics). It cannot be combined
with row semantics.
+
+```sql
+-- Source declares a watermarked event time
+CREATE TABLE cdc_stream (
+ id INT,
+ op STRING,
+ name STRING,
+ event_time TIMESTAMP_LTZ(3),
+ WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
Review Comment:
```suggestion
WATERMARK FOR event_time AS event_time - INTERVAL '5' MINUTE
```
This would keep it aligned with the `5`-minute watermark delay in the example below.