This is an automated email from the ASF dual-hosted git repository.
gustavodemorais pushed a commit to branch release-2.3
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.3 by this push:
new 1bfbc35817d [FLINK-39156][docs] Document ON CONFLICT (#28237)
1bfbc35817d is described below
commit 1bfbc35817d43e9f7fb110be3fa957f99f0e5289
Author: Gustavo de Morais <[email protected]>
AuthorDate: Mon May 25 18:39:20 2026 +0200
[FLINK-39156][docs] Document ON CONFLICT (#28237)
This closes #27683.
Co-authored-by: Dawid Wysakowicz <[email protected]>
---
docs/content.zh/docs/sql/reference/dml/insert.md | 172 +++++++++++++++++++++++
docs/content/docs/sql/reference/dml/insert.md | 172 +++++++++++++++++++++++
2 files changed, 344 insertions(+)
diff --git a/docs/content.zh/docs/sql/reference/dml/insert.md b/docs/content.zh/docs/sql/reference/dml/insert.md
index 53a3fa74107..0d879c1c91a 100644
--- a/docs/content.zh/docs/sql/reference/dml/insert.md
+++ b/docs/content.zh/docs/sql/reference/dml/insert.md
@@ -302,4 +302,176 @@ INSERT INTO students
END;
```
+## ON CONFLICT clause
+
+When a query produces a changelog stream with an upsert key that differs from
the sink table's primary key, multiple records with different upsert keys may
map to the same primary key. The `ON CONFLICT` clause specifies how to resolve
these primary key conflicts at the sink.
+
+### When is ON CONFLICT required?
+
+By default, Flink requires an explicit `ON CONFLICT` clause whenever the
query's upsert key differs from the sink table's primary key. Without it, the
query fails at planning time. This forces you to consider whether your query
genuinely has a conflict scenario or whether there is a logic issue (e.g., a
missing `GROUP BY`).
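+
+As an illustration of the missing `GROUP BY` case, the sketch below uses the `orders` and `product_orders` tables defined in the Examples section and assumes the intent is to keep the highest order ID per product. Grouping by `product_name` should give the query an upsert key equal to the sink's primary key, in which case no `ON CONFLICT` clause is needed.
+
+```sql
+-- Assumption: the intended result is one row per product.
+-- The GROUP BY key (product_name) matches the primary key of product_orders.
+INSERT INTO product_orders
+SELECT o.product_name, MAX(o.order_id)
+FROM orders o
+GROUP BY o.product_name;
+```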
+
+This check is controlled by the configuration option
`table.exec.sink.require-on-conflict` (default: `true`). Setting it to `false`
restores the legacy behavior where no `ON CONFLICT` clause was required, but
may lead to non-deterministic results.
+
+Alternatively, if you do not need consistency guarantees for conflicting keys,
you can disable the sink upsert materializer entirely by setting
`table.exec.sink.upsert-materialize` to `NONE`. This removes the materializer
operator from the pipeline, so no buffering, compaction, or conflict resolution
is performed. Records are passed directly to the sink as they arrive.
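+
+Either option can be set for the session before the `INSERT` statement is submitted. A minimal sketch, assuming the statements run in the SQL Client or another context that accepts `SET`; the option names and values are the ones described above, and normally only one of the two would be used:
+
+```sql
+-- Option 1: restore the legacy behavior (no ON CONFLICT clause required).
+SET 'table.exec.sink.require-on-conflict' = 'false';
+
+-- Option 2: remove the sink upsert materializer from the pipeline.
+SET 'table.exec.sink.upsert-materialize' = 'NONE';
+```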
+
+### Syntax
+
+```sql
+[EXECUTE] INSERT INTO [catalog_name.][db_name.]table_name
+ select_statement
+ ON CONFLICT conflict_action
+
+conflict_action:
+ DO NOTHING
+ | DO ERROR
+ | DO DEDUPLICATE
+```
+
+### Strategies
+
+#### DO ERROR
+
+Throws an exception at runtime if multiple records with different upsert keys
map to the same primary key. Use this when you believe no real conflict exists
— for example, the planner could not prove that the upsert key matches the
primary key, but you know they are logically equivalent.
+
+Buffered records are compacted on watermark progression before conflict
checking, so transient disorder from changelog reordering does not cause false
errors.
+
+```sql
+INSERT INTO product_orders
+SELECT p.name, o.order_id
+FROM orders o JOIN products p ON o.product_name = p.name
+ON CONFLICT DO ERROR;
+```
+
+#### DO NOTHING
+
+Keeps the first record that arrives for a given primary key and silently
discards subsequent conflicting records. Use this when it is acceptable to drop
duplicate primary key values from different upsert keys.
+
+Like `DO ERROR`, this strategy uses watermark-based compaction before applying
conflict resolution.
+
+```sql
+INSERT INTO product_orders
+SELECT p.name, o.order_id
+FROM orders o JOIN products p ON o.product_name = p.name
+ON CONFLICT DO NOTHING;
+```
+
+#### DO DEDUPLICATE
+
+{{< hint warning >}}
+`DO DEDUPLICATE` maintains the full history of changes per primary key in
state to support rollback on retraction. This results in significantly higher
state usage compared to `DO ERROR` and `DO NOTHING`.
+{{< /hint >}}
+
+Maintains the full history of changes per primary key so that retractions can
be correctly rolled back. Use this strategy when genuine multi-source updates
to the same primary key occur and correctness cannot be sacrificed.
+
+```sql
+INSERT INTO product_orders
+SELECT p.name, o.order_id
+FROM orders o JOIN products p ON o.product_name = p.name
+ON CONFLICT DO DEDUPLICATE;
+```
+
+### How conflicts happen
+
+A conflict can occur whenever the query's upsert key differs from the sink
table's primary key. For example, a join may produce a result whose upsert key
is derived from the join condition, while the target table declares a different
primary key. Records with different upstream upsert keys can then collide on
the same primary key in the sink.
+
+Because retraction (`-U`) and update (`+U`) messages may travel different
paths through the pipeline, they can arrive at the sink out of order. `DO
ERROR` and `DO NOTHING` use watermark-based compaction to wait for a consistent
set of changes before applying conflict resolution, preventing false positives
from transient reordering.
+
+### Watermark-based compaction
+
+Changelog messages produced by operators such as joins can arrive at the sink
out of order. A retraction (`-U`) for a row may arrive after a new insert
(`+I`) for a different row that shares the same primary key, making it look
like two active records exist for that key — a false conflict.
+
+Watermark-based compaction solves this by buffering incoming records keyed by
their primary key and upsert key. When a watermark advances, all buffered
records with timestamps up to that watermark are compacted: matching insert and
retraction pairs for the same upsert key cancel each other out (for example,
`+I` and `-D`, or `-U` and `+U` pairs).
+
+**Example.** Using the `orders JOIN products` query from above, suppose order
1 changes its product from `Laptop` to `Phone` while order 3 is also for
`Laptop`. The join emits these changelog records:
+
+```
++I[Laptop, 1] -- upsert key: order_id=1
++I[Laptop, 3] -- upsert key: order_id=3
+-U[Laptop, 1] -- upsert key: order_id=1 (retraction for order 1's old product)
++U[Phone, 1] -- upsert key: order_id=1 (order 1 now maps to Phone)
+```
+
+Without compaction, after the first two `+I` records arrive the operator sees
two active records for PK `Laptop` with different upsert keys (`order_id=1` and
`order_id=3`) — a false conflict. With compaction, the operator waits for the
watermark. The retraction `-U[Laptop, 1]` then cancels the earlier `+I[Laptop,
1]` (same upsert key `order_id=1`), leaving only `+I[Laptop, 3]` for PK
`Laptop` — no conflict.
+
+After compaction, if zero or one record remains per primary key, there is no
conflict. If multiple records with different upsert keys still remain, a
genuine conflict exists and is resolved by the chosen strategy (`DO ERROR` or
`DO NOTHING`). `DO DEDUPLICATE` does not use watermark-based compaction;
instead, it maintains the full history of changes in state to support correct
rollback on retraction.
+
+### Examples
+
+```sql
+-- Source and dimension tables
+CREATE TABLE orders (
+ order_id BIGINT,
+ product_name STRING,
+ quantity INT,
+ PRIMARY KEY(order_id) NOT ENFORCED
+) WITH (...);
+
+CREATE TABLE products (
+ name STRING,
+ PRIMARY KEY(name) NOT ENFORCED
+) WITH (...);
+
+-- Sink table
+CREATE TABLE product_orders (
+ product_name STRING,
+ last_order_id BIGINT,
+ PRIMARY KEY(product_name) NOT ENFORCED
+) WITH (...);
+
+-- This join produces an upsert key that may differ from the sink's PK,
+-- so ON CONFLICT is required.
+INSERT INTO product_orders
+SELECT p.name, o.order_id
+FROM orders o JOIN products p ON o.product_name = p.name
+ON CONFLICT DO NOTHING;
+```
+
+Given the following data in the source tables:
+
+```
+orders: products:
++----------+--------------+----------+ +--------+
+| order_id | product_name | quantity | | name |
++----------+--------------+----------+ +--------+
+| 1 | Laptop | 2 | | Laptop |
+| 2 | Phone | 1 | | Phone |
+| 3 | Laptop | 5 | +--------+
++----------+--------------+----------+
+```
+
+The join produces these changelog records for `product_orders`:
+
+```
++I[Laptop, 1] -- upsert key: order_id=1
++I[Phone, 2] -- upsert key: order_id=2
++I[Laptop, 3] -- upsert key: order_id=3 ← conflicts with order_id=1 on PK 'Laptop'
+```
+
+Two records with different upsert keys (`order_id=1` and `order_id=3`) target
the same primary key (`product_name='Laptop'`). Each strategy resolves this
conflict differently:
+
+- **`DO ERROR`** — throws a runtime exception because two distinct upsert keys
map to the same primary key.
+- **`DO NOTHING`** — keeps the first record and discards the conflict:
+
+ | product_name | last_order_id |
+ |:-------------|:--------------|
+ | Laptop | 1 |
+ | Phone | 2 |
+
+- **`DO DEDUPLICATE`** — accepts both; the last arriving value is visible:
+
+ | product_name | last_order_id |
+ |:-------------|:--------------|
+ | Laptop | 3 |
+ | Phone | 2 |
+
+**What happens on retraction?** If order 3 is later deleted from the source,
the join
+emits a retraction `-D[Laptop, 3]`:
+
+- **`DO NOTHING`** — the retraction has no effect because `(Laptop, 3)` was
never written.
+ The Laptop row remains with `last_order_id=1`.
+- **`DO DEDUPLICATE`** — rolls back to the previous value. Laptop falls back
to order 1,
+ producing `{(Laptop, 1), (Phone, 2)}`. The full history kept in state
enables this
+ correct rollback.
+
{{< top >}}
diff --git a/docs/content/docs/sql/reference/dml/insert.md b/docs/content/docs/sql/reference/dml/insert.md
index 4213b915479..11bc66632fe 100644
--- a/docs/content/docs/sql/reference/dml/insert.md
+++ b/docs/content/docs/sql/reference/dml/insert.md
@@ -312,5 +312,177 @@ INSERT INTO students
END;
```
+## ON CONFLICT clause
+
+When a query produces an updating table with an upsert key that differs from
the sink table's primary key, multiple records with different upsert keys may
map to the same primary key. The `ON CONFLICT` clause specifies how to resolve
these primary key conflicts at the sink.
+
+### When is ON CONFLICT required?
+
+By default, Flink requires an explicit `ON CONFLICT` clause whenever the
query's upsert key differs from the sink table's primary key. Without it, the
query fails at planning time. This forces you to consider whether your query
genuinely has a conflict scenario or whether there is a logic issue (e.g., a
missing `GROUP BY`).
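+
+As an illustration of the missing `GROUP BY` case, the sketch below uses the `orders` and `product_orders` tables defined in the Examples section and assumes the intent is to keep the highest order ID per product. Grouping by `product_name` should give the query an upsert key equal to the sink's primary key, in which case no `ON CONFLICT` clause is needed.
+
+```sql
+-- Assumption: the intended result is one row per product.
+-- The GROUP BY key (product_name) matches the primary key of product_orders.
+INSERT INTO product_orders
+SELECT o.product_name, MAX(o.order_id)
+FROM orders o
+GROUP BY o.product_name;
+```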
+
+This check is controlled by the configuration option
`table.exec.sink.require-on-conflict` (default: `true`). Setting it to `false`
restores the legacy behavior where no `ON CONFLICT` clause was required, but
may lead to non-deterministic results.
+
+Alternatively, if you do not need consistency guarantees for conflicting keys,
you can disable the sink upsert materializer entirely by setting
`table.exec.sink.upsert-materialize` to `NONE`. This removes the materializer
operator from the pipeline, so no buffering, compaction, or conflict resolution
is performed. Records are passed directly to the sink as they arrive.
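+
+Either option can be set for the session before the `INSERT` statement is submitted. A minimal sketch, assuming the statements run in the SQL Client or another context that accepts `SET`; the option names and values are the ones described above, and normally only one of the two would be used:
+
+```sql
+-- Option 1: restore the legacy behavior (no ON CONFLICT clause required).
+SET 'table.exec.sink.require-on-conflict' = 'false';
+
+-- Option 2: remove the sink upsert materializer from the pipeline.
+SET 'table.exec.sink.upsert-materialize' = 'NONE';
+```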
+
+### Syntax
+
+```sql
+[EXECUTE] INSERT INTO [catalog_name.][db_name.]table_name
+ select_statement
+ ON CONFLICT conflict_action
+
+conflict_action:
+ DO NOTHING
+ | DO ERROR
+ | DO DEDUPLICATE
+```
+
+### Strategies
+
+#### DO ERROR
+
+Throws an exception at runtime if multiple records with different upsert keys
map to the same primary key. Use this when you believe no real conflict exists
— for example, the planner could not prove that the upsert key matches the
primary key, but you know they are logically equivalent.
+
+Buffered records are compacted on watermark progression before conflict
checking, so transient disorder from changelog reordering does not cause false
errors.
+
+```sql
+INSERT INTO product_orders
+SELECT p.name, o.order_id
+FROM orders o JOIN products p ON o.product_name = p.name
+ON CONFLICT DO ERROR;
+```
+
+#### DO NOTHING
+
+Keeps the first record that arrives for a given primary key and silently
discards subsequent conflicting records. Use this when it is acceptable to drop
duplicate primary key values from different upsert keys.
+
+Like `DO ERROR`, this strategy uses watermark-based compaction before applying
conflict resolution.
+
+```sql
+INSERT INTO product_orders
+SELECT p.name, o.order_id
+FROM orders o JOIN products p ON o.product_name = p.name
+ON CONFLICT DO NOTHING;
+```
+
+#### DO DEDUPLICATE
+
+{{< hint warning >}}
+`DO DEDUPLICATE` maintains the full history of changes per primary key in
state to support rollback on retraction. This results in significantly higher
state usage compared to `DO ERROR` and `DO NOTHING`.
+{{< /hint >}}
+
+Maintains the full history of changes per primary key so that retractions can
be correctly rolled back. Use this strategy when genuine multi-source updates
to the same primary key occur and correctness cannot be sacrificed.
+
+```sql
+INSERT INTO product_orders
+SELECT p.name, o.order_id
+FROM orders o JOIN products p ON o.product_name = p.name
+ON CONFLICT DO DEDUPLICATE;
+```
+
+### How conflicts happen
+
+A conflict can occur whenever the query's upsert key differs from the sink
table's primary key. For example, a join may produce a result whose upsert key
is derived from the join condition, while the target table declares a different
primary key. Records with different upstream upsert keys can then collide on
the same primary key in the sink.
+
+Because retraction (`-U`) and update (`+U`) messages may travel different
paths through the pipeline, they can arrive at the sink out of order. `DO
ERROR` and `DO NOTHING` use watermark-based compaction to wait for a consistent
set of changes before applying conflict resolution, preventing false positives
from transient reordering.
+
+### Watermark-based compaction
+
+Changelog messages produced by operators such as joins can arrive at the sink
out of order. A retraction (`-U`) for a row may arrive after a new insert
(`+I`) for a different row that shares the same primary key, making it look
like two active records exist for that key — a false conflict.
+
+Watermark-based compaction solves this by buffering incoming records keyed by
their primary key and upsert key. When a watermark advances, all buffered
records with timestamps up to that watermark are compacted: matching insert and
retraction pairs for the same upsert key cancel each other out (for example,
`+I` and `-D`, or `-U` and `+U` pairs).
+
+**Example.** Using the `orders JOIN products` query from above, suppose order
1 changes its product from `Laptop` to `Phone` while order 3 is also for
`Laptop`. The join emits these changelog records:
+
+```
++I[Laptop, 1] -- upsert key: order_id=1
++I[Laptop, 3] -- upsert key: order_id=3
+-U[Laptop, 1] -- upsert key: order_id=1 (retraction for order 1's old product)
++U[Phone, 1] -- upsert key: order_id=1 (order 1 now maps to Phone)
+```
+
+Without compaction, after the first two `+I` records arrive the operator sees
two active records for PK `Laptop` with different upsert keys (`order_id=1` and
`order_id=3`) — a false conflict. With compaction, the operator waits for the
watermark. The retraction `-U[Laptop, 1]` then cancels the earlier `+I[Laptop,
1]` (same upsert key `order_id=1`), leaving only `+I[Laptop, 3]` for PK
`Laptop` — no conflict.
+
+After compaction, if zero or one record remains per primary key, there is no
conflict. If multiple records with different upsert keys still remain, a
genuine conflict exists and is resolved by the chosen strategy (`DO ERROR` or
`DO NOTHING`). `DO DEDUPLICATE` does not use watermark-based compaction;
instead, it maintains the full history of changes in state to support correct
rollback on retraction.
+
+### Examples
+
+```sql
+-- Source and dimension tables
+CREATE TABLE orders (
+ order_id BIGINT,
+ product_name STRING,
+ quantity INT,
+ PRIMARY KEY(order_id) NOT ENFORCED
+) WITH (...);
+
+CREATE TABLE products (
+ name STRING,
+ PRIMARY KEY(name) NOT ENFORCED
+) WITH (...);
+
+-- Sink table
+CREATE TABLE product_orders (
+ product_name STRING,
+ last_order_id BIGINT,
+ PRIMARY KEY(product_name) NOT ENFORCED
+) WITH (...);
+
+-- This join produces an upsert key that may differ from the sink's PK,
+-- so ON CONFLICT is required.
+INSERT INTO product_orders
+SELECT p.name, o.order_id
+FROM orders o JOIN products p ON o.product_name = p.name
+ON CONFLICT DO NOTHING;
+```
+
+Given the following data in the source tables:
+
+```
+orders: products:
++----------+--------------+----------+ +--------+
+| order_id | product_name | quantity | | name |
++----------+--------------+----------+ +--------+
+| 1 | Laptop | 2 | | Laptop |
+| 2 | Phone | 1 | | Phone |
+| 3 | Laptop | 5 | +--------+
++----------+--------------+----------+
+```
+
+The join produces these changelog records for `product_orders`:
+
+```
++I[Laptop, 1] -- upsert key: order_id=1
++I[Phone, 2] -- upsert key: order_id=2
++I[Laptop, 3] -- upsert key: order_id=3 ← conflicts with order_id=1 on PK 'Laptop'
+```
+
+Two records with different upsert keys (`order_id=1` and `order_id=3`) target
the same primary key (`product_name='Laptop'`). Each strategy resolves this
conflict differently:
+
+- **`DO ERROR`** — throws a runtime exception because two distinct upsert keys
map to the same primary key.
+- **`DO NOTHING`** — keeps the first record and discards the conflict:
+
+ | product_name | last_order_id |
+ |:-------------|:--------------|
+ | Laptop | 1 |
+ | Phone | 2 |
+
+- **`DO DEDUPLICATE`** — accepts both; the last arriving value is visible:
+
+ | product_name | last_order_id |
+ |:-------------|:--------------|
+ | Laptop | 3 |
+ | Phone | 2 |
+
+**What happens on retraction?** If order 3 is later deleted from the source,
the join
+emits a retraction `-D[Laptop, 3]`:
+
+- **`DO NOTHING`** — the retraction has no effect because `(Laptop, 3)` was
never written.
+ The Laptop row remains with `last_order_id=1`.
+- **`DO DEDUPLICATE`** — rolls back to the previous value. Laptop falls back
to order 1,
+ producing `{(Laptop, 1), (Phone, 2)}`. The full history kept in state
enables this
+ correct rollback.
+
{{< top >}}