This is an automated email from the ASF dual-hosted git repository.
fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 334e01f0a48 [FLINK-39616][docs] Document ORDER BY usage with FROM_CHANGELOG (#28166)
334e01f0a48 is described below
commit 334e01f0a48d38508abcd1096c44806d9739d88e
Author: Ramin Gharib <[email protected]>
AuthorDate: Mon May 18 09:49:23 2026 +0200
[FLINK-39616][docs] Document ORDER BY usage with FROM_CHANGELOG (#28166)
CDC streams often deliver events out of order. The PTF ORDER BY clause
introduced in FLINK-39256 aligns naturally with FROM_CHANGELOG: partitioning by
the primary key and ordering by event time reorders out-of-order CDC events per
key before the operations are applied.
Adds a new "Ordering CDC events with ORDER BY" subsection to the
FROM_CHANGELOG documentation with the SQL example and the requirements
(watermarked time attribute, ASC on the first order column, set semantics via
PARTITION BY). Extends the Table API examples with the equivalent
partitionBy(...).orderBy(...).fromChangelog() pattern.
Adds a corresponding example in the PartitionedTable#fromChangelog Javadoc.
Adds a cross-reference from the PTF ORDER BY guide to FROM_CHANGELOG so
users discover the pattern from either side.
---
docs/content/docs/dev/table/functions/ptfs.md | 2 +
.../docs/sql/reference/queries/changelog.md | 60 +++++++++++++++++++++-
.../apache/flink/table/api/PartitionedTable.java | 9 ++++
3 files changed, 69 insertions(+), 2 deletions(-)
diff --git a/docs/content/docs/dev/table/functions/ptfs.md b/docs/content/docs/dev/table/functions/ptfs.md
index 8957f36ea88..4bc78dc8a20 100644
--- a/docs/content/docs/dev/table/functions/ptfs.md
+++ b/docs/content/docs/dev/table/functions/ptfs.md
@@ -1095,6 +1095,8 @@ watermark for the given input table advances, at which point all buffered rows w
less than or equal to the watermark are delivered to the eval() method in sorted order. Late
events (arriving after the watermark) are dropped to maintain the ordering guarantee.
+This pattern is particularly useful for the built-in [`FROM_CHANGELOG`]({{< ref "docs/sql/reference/queries/changelog" >}}#from_changelog) PTF, which converts an append-only CDC stream into a changelog. Combining `PARTITION BY` on the primary key with `ORDER BY` on the event-time column reorders out-of-order CDC events per key before the conversion operations are applied.
+
The following example demonstrates ordered processing with secondary sorting.
First, the function implementation:
```java
diff --git a/docs/content/docs/sql/reference/queries/changelog.md b/docs/content/docs/sql/reference/queries/changelog.md
index f1718642b9d..b02215129e5 100644
--- a/docs/content/docs/sql/reference/queries/changelog.md
+++ b/docs/content/docs/sql/reference/queries/changelog.md
@@ -45,7 +45,7 @@ Note: This version requires that your CDC data encodes updates using a full imag
```sql
SELECT * FROM FROM_CHANGELOG(
- input => TABLE source_table [PARTITION BY key_col],
+ input => TABLE source_table [PARTITION BY key_col [ORDER BY time_col]],
[op => DESCRIPTOR(op_column_name),]
[op_mapping => MAP[
'c, r', 'INSERT',
@@ -61,7 +61,7 @@ SELECT * FROM FROM_CHANGELOG(
| Parameter | Required | Description
[...]
|:-------------|:---------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
-| `input` | Yes | The input table. Must be append-only. Use
`PARTITION BY` to ensure rows for the same key are processed together.
|
+| `input` | Yes | The input table. Must be append-only. Use
`PARTITION BY` to ensure rows for the same key are processed together. Add
`ORDER BY` (requires `PARTITION BY`) to reorder out-of-order events per key;
the first sort column must be a watermarked time attribute in `ASC` order.
[...]
| `op` | No | A `DESCRIPTOR` with a single column name for the
operation code column. Defaults to `op`. The column must exist in the input
table and be of type STRING.
[...]
| `op_mapping` | No | A `MAP<STRING, STRING>` mapping user-defined codes
to Flink change operation names. Keys are user-defined codes (e.g., `'c'`,
`'u'`, `'d'`), values are Flink change operation names (`INSERT`,
`UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated
codes to map multiple codes to the same operation (e.g., `'c, r'`). Each change
operation may appear at most once across all entries. |
| `error_handling` | No | Controls behavior when an input row's operation code
is `NULL` or not present in the `op_mapping`. Valid values: `FAIL` (default) —
throw a `TableRuntimeException`, `SKIP` — silently drop the row. |
@@ -149,6 +149,55 @@ Prefer row semantics, when possible. `PARTITION BY` is only necessary when downs
If you are producing an upsert table — that is, you are emitting
`UPDATE_AFTER` but no `UPDATE_BEFORE` from your CDC input stream — the
partition key you select here will be considered both the primary key and the
upsert key by the engine. Make sure the `PARTITION BY` key matches your primary
key exactly.
+#### Ordering CDC events with ORDER BY
+
+CDC streams can deliver events out of order. For example, a key's `UPDATE_AFTER` may arrive before its matching `UPDATE_BEFORE` when events are partitioned across upstream brokers. If the source itself does not guarantee ordering, applying such a changelog directly produces incorrect state.
+
+`FROM_CHANGELOG` accepts an [ORDER BY clause]({{< ref "docs/dev/table/functions/ptfs" >}}#ordering) that sorts events within each partition before they are processed. The framework buffers events per partition and flushes them to the function in sorted order once the watermark advances. Late events (those whose timestamp is already behind the watermark when they arrive) are dropped.
+
+Requirements:
+
+* The input table must declare a `WATERMARK` on the time attribute used in `ORDER BY`.
+* The first `ORDER BY` column must be that time attribute in `ASC` order.
+* `ORDER BY` requires `PARTITION BY` (set semantics). It cannot be combined with row semantics.
+
+```sql
+-- Source declares a watermarked event time
+CREATE TABLE cdc_stream (
+ id INT,
+ op STRING,
+ name STRING,
+ event_time TIMESTAMP_LTZ(3),
+ WATERMARK FOR event_time AS event_time - INTERVAL '5' MINUTE
+) WITH (...);
+
+SELECT * FROM FROM_CHANGELOG(
+ input => TABLE cdc_stream
+ PARTITION BY id
+ ORDER BY event_time
+)
+```
+
+**How buffering and watermarks interact**
+
+Watermarks are separate events that flow through the stream alongside data records. A data record arriving does not advance the watermark on its own; the operator's current watermark advances only when a watermark event is received.
+
+Assume the watermark strategy advances the watermark `5` minutes behind the largest observed `event_time`, and the current watermark at the PTF is `10:00`:
+
+| Event                                          | Current watermark | Outcome                                                    |
+|------------------------------------------------|-------------------|------------------------------------------------------------|
+| `+I[id: 6, op: 'INSERT', event_time: '10:05']` | `10:00`           | Buffered. Emitted later when the watermark passes `10:05`. |
+| `+I[id: 5, op: 'INSERT', event_time: '09:57']` | `10:00`           | Dropped. Timestamp is below the current watermark.         |
+| `+I[id: 7, op: 'INSERT', event_time: '10:11']` | `10:00`           | Buffered.                                                  |
+| Watermark event advances to `10:06`            | `10:06`           | Record `id=6` is emitted.                                  |
+
+Records dropped as late cannot be recovered once the watermark has passed them. Choose the watermark delay (the interval subtracted in the `WATERMARK` declaration, `5` minutes in the example above) based on the upstream's expected out-of-orderness.
+
+`FROM_CHANGELOG` does not synthesize a replacement for a dropped event; downstream consequences depend on the output changelog mode:
+
+* **Retract output**: retract semantics rely on `UPDATE_BEFORE`/`UPDATE_AFTER` pairs. When `ORDER BY` drops a late `UPDATE_BEFORE` but its matching `UPDATE_AFTER` arrives on time, downstream sees the `UPDATE_AFTER` alone. Operators that rely on retract semantics (retract-style aggregations, retract sinks) miss the retraction and may produce incorrect results.
+* **Upsert output** (`PARTITION BY` + `op_mapping` without `UPDATE_BEFORE`): a dropped `INSERT` or `UPDATE_AFTER` is recovered by the next `UPDATE_AFTER` for that key, since upserts are idempotent. A dropped `DELETE` has no recovery path: the row stays in downstream state until a future event for that key overwrites it.
+
#### Invalid operation code handling
Two `error_handling` modes are supported. The job can either fail upon an invalid or unknown op code, or skip the row and continue processing.
@@ -191,6 +240,13 @@ Table result = cdcStream.fromChangelog(
// Set semantics: co-locate rows with the same key in the same parallel operator instance.
// Equivalent to PARTITION BY in SQL. The partition keys are prepended to the output columns.
Table result = cdcStream.partitionBy($("id")).fromChangelog();
+
+// Reorder out-of-order CDC events per key by event time before applying the changelog.
+// The input must declare a WATERMARK on the time attribute used in orderBy().
+Table result = cdcStream
+ .partitionBy($("id"))
+ .orderBy($("event_time").asc())
+ .fromChangelog();
```
## TO_CHANGELOG
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
index 7adfc13fc8d..3f65a22075b 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
@@ -252,6 +252,14 @@ public interface PartitionedTable {
* Table result = cdcStream
* .partitionBy($("id"))
* .fromChangelog(lit("SKIP").asArgument("error_handling"));
+ *
+ * // Reorder out-of-order CDC events per key by event time. The input must declare a
+ * // WATERMARK on the time attribute used in orderBy(); the first order column must be that
+ * // time attribute in ASC order.
+ * Table result = cdcStream
+ * .partitionBy($("id"))
+ * .orderBy($("event_time").asc())
+ * .fromChangelog();
* }</pre>
*
* @param arguments optional named arguments for {@code op}, {@code op_mapping}, and {@code
@@ -259,6 +267,7 @@ public interface PartitionedTable {
* @return a dynamic {@link Table} with output schema {@code [partition_keys,
* non_partition_non_op_input_columns]}
* @see Table#fromChangelog(Expression...)
+ * @see #orderBy(Expression...)
*/
Table fromChangelog(Expression... arguments);
}
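
Note on the documented buffering behaviour: the walkthrough table in the new "How buffering and watermarks interact" section can be modelled with a small standalone Java sketch. This is not Flink's operator code. The class `WatermarkSortBuffer`, its `Event` type, and its method names are hypothetical, and treating a row whose timestamp is at or below the current watermark as late is an assumption made for illustration. Times are minutes since midnight, and per-key partitioning is left out because the watermark comparison is the same for every key.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Illustrative model of watermark-driven sorting; hypothetical, not Flink's implementation. */
final class WatermarkSortBuffer {

    /** A CDC row reduced to the fields used in the walkthrough table. */
    static final class Event {
        final int id;
        final String op;
        final long eventTime; // minutes since midnight

        Event(int id, String op, long eventTime) {
            this.id = id;
            this.op = op;
            this.eventTime = eventTime;
        }
    }

    // Rows waiting for the watermark, smallest event time first.
    private final PriorityQueue<Event> buffer =
            new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.eventTime));
    private long currentWatermark;

    WatermarkSortBuffer(long initialWatermark) {
        this.currentWatermark = initialWatermark;
    }

    /** Returns true if the row was buffered, false if it was dropped as late. */
    boolean onEvent(Event e) {
        // Assumption: a row at or below the current watermark counts as late.
        if (e.eventTime <= currentWatermark) {
            return false;
        }
        buffer.add(e);
        return true;
    }

    /** Advances the watermark and returns the rows it releases, in event-time order. */
    List<Event> onWatermark(long watermark) {
        // A watermark never moves backwards.
        currentWatermark = Math.max(currentWatermark, watermark);
        List<Event> released = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().eventTime <= currentWatermark) {
            released.add(buffer.poll());
        }
        return released;
    }

    /** Converts a wall-clock time such as 10:05 to minutes since midnight. */
    static long minutes(int hour, int minute) {
        return hour * 60L + minute;
    }
}
```

Replaying the four table rows against this model with an initial watermark of 10:00 buffers `id=6` and `id=7`, drops `id=5`, and releases only `id=6` when the watermark reaches 10:06.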