fhueske commented on code in PR #28166:
URL: https://github.com/apache/flink/pull/28166#discussion_r3249957448


##########
docs/content/docs/dev/table/functions/ptfs.md:
##########
@@ -1095,6 +1095,8 @@ watermark for the given input table advances, at which 
point all buffered rows w
 less than or equal to the watermark are delivered to the eval() method in 
sorted order. Late
 events (arriving after the watermark) are dropped to maintain the ordering 
guarantee.
 
+This pattern is particularly useful for the built-in [`FROM_CHANGELOG`]({{< 
ref "docs/sql/reference/queries/changelog" >}}#from_changelog) PTF, which 
converts an append-only CDC stream into a changelog. Combining `PARTITION BY` 
on the primary key with `ORDER BY` on the event-time column reorders 
out-of-order CDC events per key before the operations are applied.

Review Comment:
   ```suggestion
   This pattern is particularly useful for the built-in [`FROM_CHANGELOG`]({{< 
ref "docs/sql/reference/queries/changelog" >}}#from_changelog) PTF, which 
converts an append-only CDC stream into a changelog. Combining `PARTITION BY` 
on the primary key with `ORDER BY` on the event-time column reorders 
out-of-order CDC events per key before the conversion operations are applied.
   ```



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -149,6 +149,47 @@ Prefer row semantics, when possible. `PARTITION BY` is 
only necessary when downs
 
 If you are producing an upsert table — that is, you are emitting 
`UPDATE_AFTER` but no `UPDATE_BEFORE` from your CDC input stream — the 
partition key you select here will be considered both the primary key and the 
upsert key by the engine. Make sure the `PARTITION BY` key matches your primary 
key exactly.
 
+#### Ordering CDC events with ORDER BY
+
+CDC streams can deliver events out of order. For example, a key's 
`UPDATE_AFTER` may arrive before its matching `UPDATE_BEFORE` when events are 
partitioned across upstream brokers. If the source itself does not guarantee 
ordering, applying such a changelog directly produces incorrect state.
+
+`FROM_CHANGELOG` accepts an [ORDER BY clause]({{< ref 
"docs/dev/table/functions/ptfs" >}}#ordering) that sorts events within each 
partition before they are processed. The framework buffers events per partition 
and flushes them to the function in sorted order once the watermark advances. 
Late events (arriving after the watermark) are dropped.

Review Comment:
   Could you explain what happens to a record whose matching record is dropped?
   For example, what happens to an `UPDATE_AFTER` that wasn't late if its
   matching `UPDATE_BEFORE` is dropped?



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -149,6 +149,47 @@ Prefer row semantics, when possible. `PARTITION BY` is 
only necessary when downs
 
 If you are producing an upsert table — that is, you are emitting 
`UPDATE_AFTER` but no `UPDATE_BEFORE` from your CDC input stream — the 
partition key you select here will be considered both the primary key and the 
upsert key by the engine. Make sure the `PARTITION BY` key matches your primary 
key exactly.
 
+#### Ordering CDC events with ORDER BY
+
+CDC streams can deliver events out of order. For example, a key's 
`UPDATE_AFTER` may arrive before its matching `UPDATE_BEFORE` when events are 
partitioned across upstream brokers. If the source itself does not guarantee 
ordering, applying such a changelog directly produces incorrect state.
+
+`FROM_CHANGELOG` accepts an [ORDER BY clause]({{< ref 
"docs/dev/table/functions/ptfs" >}}#ordering) that sorts events within each 
partition before they are processed. The framework buffers events per partition 
and flushes them to the function in sorted order once the watermark advances. 
Late events (arriving after the watermark) are dropped.
+
+Requirements:
+
+* The input table must declare a `WATERMARK` on the time attribute used in 
`ORDER BY`.
+* The first `ORDER BY` column must be that time attribute in `ASC` order.
+* `ORDER BY` requires `PARTITION BY` (set semantics). It cannot be combined 
with row semantics.
+
+```sql
+-- Source declares a watermarked event time
+CREATE TABLE cdc_stream (
+  id INT,
+  op STRING,
+  name STRING,
+  event_time TIMESTAMP_LTZ(3),
+  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
+) WITH (...);
+
+SELECT * FROM FROM_CHANGELOG(
+  input => TABLE cdc_stream
+    PARTITION BY id
+    ORDER BY event_time
+)
+```
+
+**How buffering and watermarks interact**
+
+Assume the watermark strategy advances the watermark `5` minutes behind the 
largest observed `event_time`, and the current watermark is `10:00`:
+
+| Incoming row | Current watermark | Outcome |
+|---|---|---|
+| `+I[id: 6, op: 'INSERT', event_time: '10:05']` | `10:00` | Buffered. Emitted 
later when the watermark passes `10:05`. |
+| `+I[id: 5, op: 'INSERT', event_time: '09:55']` | `10:00` | Dropped. 
Timestamp is below the current watermark. |

Review Comment:
   ```suggestion
   | `+I[id: 5, op: 'INSERT', event_time: '09:57']` | `10:00` | Dropped. 
Timestamp is below the current watermark. |
   ```
   Otherwise, readers might think that the 5-minute difference between `09:55`
   and `10:00` has some relevance.



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -149,6 +149,47 @@ Prefer row semantics, when possible. `PARTITION BY` is 
only necessary when downs
 
 If you are producing an upsert table — that is, you are emitting 
`UPDATE_AFTER` but no `UPDATE_BEFORE` from your CDC input stream — the 
partition key you select here will be considered both the primary key and the 
upsert key by the engine. Make sure the `PARTITION BY` key matches your primary 
key exactly.
 
+#### Ordering CDC events with ORDER BY
+
+CDC streams can deliver events out of order. For example, a key's 
`UPDATE_AFTER` may arrive before its matching `UPDATE_BEFORE` when events are 
partitioned across upstream brokers. If the source itself does not guarantee 
ordering, applying such a changelog directly produces incorrect state.
+
+`FROM_CHANGELOG` accepts an [ORDER BY clause]({{< ref 
"docs/dev/table/functions/ptfs" >}}#ordering) that sorts events within each 
partition before they are processed. The framework buffers events per partition 
and flushes them to the function in sorted order once the watermark advances. 
Late events (arriving after the watermark) are dropped.
+
+Requirements:
+
+* The input table must declare a `WATERMARK` on the time attribute used in 
`ORDER BY`.
+* The first `ORDER BY` column must be that time attribute in `ASC` order.
+* `ORDER BY` requires `PARTITION BY` (set semantics). It cannot be combined 
with row semantics.
+
+```sql
+-- Source declares a watermarked event time
+CREATE TABLE cdc_stream (
+  id INT,
+  op STRING,
+  name STRING,
+  event_time TIMESTAMP_LTZ(3),
+  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
+) WITH (...);
+
+SELECT * FROM FROM_CHANGELOG(
+  input => TABLE cdc_stream
+    PARTITION BY id
+    ORDER BY event_time
+)
+```
+
+**How buffering and watermarks interact**
+
+Assume the watermark strategy advances the watermark `5` minutes behind the 
largest observed `event_time`, and the current watermark is `10:00`:
+
+| Incoming row | Current watermark | Outcome |
+|---|---|---|
+| `+I[id: 6, op: 'INSERT', event_time: '10:05']` | `10:00` | Buffered. Emitted 
later when the watermark passes `10:05`. |
+| `+I[id: 5, op: 'INSERT', event_time: '09:55']` | `10:00` | Dropped. 
Timestamp is below the current watermark. |
+| `+I[id: 7, op: 'INSERT', event_time: '10:11']` | `10:06` | Record `id=6` is 
emitted; this row is buffered until the watermark passes `10:11`. |

Review Comment:
   To be precise, the watermark only advances after the record has been
   processed, so "Current watermark" shouldn't show `10:06` yet.
   The watermark advances when a new watermark message is received and
   processed by the PTF.
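
The point about timing can be illustrated with a toy model. This is only a sketch under stated assumptions, not Flink's implementation: the `SortBuffer` class, its `on_row` and `on_watermark` methods, and the `hm` helper are hypothetical names, times are plain minutes since midnight, and a row is treated as late when its timestamp is less than or equal to the current watermark.

```python
import heapq


def hm(hours, minutes):
    """Hypothetical helper: convert a wall-clock time to minutes since midnight."""
    return hours * 60 + minutes


class SortBuffer:
    """Toy model of a per-partition event-time sort buffer (not Flink code)."""

    def __init__(self, watermark):
        self.watermark = watermark  # current watermark in minutes
        self.heap = []              # buffered (event_time, row_id) pairs

    def on_row(self, event_time, row_id):
        # A row never moves the watermark; it is either dropped or buffered.
        # Assumption: timestamps <= watermark count as late.
        if event_time <= self.watermark:
            return "dropped"
        heapq.heappush(self.heap, (event_time, row_id))
        return "buffered"

    def on_watermark(self, watermark):
        # Only a watermark message advances the watermark and flushes rows.
        self.watermark = max(self.watermark, watermark)
        emitted = []
        while self.heap and self.heap[0][0] <= self.watermark:
            emitted.append(heapq.heappop(self.heap)[1])
        return emitted


buf = SortBuffer(watermark=hm(10, 0))
print(buf.on_row(hm(10, 5), 6))     # buffered
print(buf.on_row(hm(9, 57), 5))     # dropped
print(buf.on_row(hm(10, 11), 7))    # buffered; the watermark is still 10:00
print(buf.on_watermark(hm(10, 6)))  # [6]
```

In this model the row with `event_time` `10:11` is processed while the watermark is still `10:00`; the row with `id=6` is only emitted once the separate watermark message for `10:06` arrives.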



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -149,6 +149,47 @@ Prefer row semantics, when possible. `PARTITION BY` is 
only necessary when downs
 
 If you are producing an upsert table — that is, you are emitting 
`UPDATE_AFTER` but no `UPDATE_BEFORE` from your CDC input stream — the 
partition key you select here will be considered both the primary key and the 
upsert key by the engine. Make sure the `PARTITION BY` key matches your primary 
key exactly.
 
+#### Ordering CDC events with ORDER BY
+
+CDC streams can deliver events out of order. For example, a key's 
`UPDATE_AFTER` may arrive before its matching `UPDATE_BEFORE` when events are 
partitioned across upstream brokers. If the source itself does not guarantee 
ordering, applying such a changelog directly produces incorrect state.
+
+`FROM_CHANGELOG` accepts an [ORDER BY clause]({{< ref 
"docs/dev/table/functions/ptfs" >}}#ordering) that sorts events within each 
partition before they are processed. The framework buffers events per partition 
and flushes them to the function in sorted order once the watermark advances. 
Late events (arriving after the watermark) are dropped.
+
+Requirements:
+
+* The input table must declare a `WATERMARK` on the time attribute used in 
`ORDER BY`.
+* The first `ORDER BY` column must be that time attribute in `ASC` order.
+* `ORDER BY` requires `PARTITION BY` (set semantics). It cannot be combined 
with row semantics.
+
+```sql
+-- Source declares a watermarked event time
+CREATE TABLE cdc_stream (
+  id INT,
+  op STRING,
+  name STRING,
+  event_time TIMESTAMP_LTZ(3),
+  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND

Review Comment:
   ```suggestion
     WATERMARK FOR event_time AS event_time - INTERVAL '5' MINUTE
   ```
   Should this be `MINUTE` to keep it aligned with the example below?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
