This is an automated email from the ASF dual-hosted git repository.

gustavodemorais pushed a commit to branch release-2.3
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.3 by this push:
     new 1bfbc35817d [FLINK-39156][docs] Document ON CONFLICT (#28237)
1bfbc35817d is described below

commit 1bfbc35817d43e9f7fb110be3fa957f99f0e5289
Author: Gustavo de Morais <[email protected]>
AuthorDate: Mon May 25 18:39:20 2026 +0200

    [FLINK-39156][docs] Document ON CONFLICT (#28237)
    
    This closes #27683.
    
    Co-authored-by: Dawid Wysakowicz <[email protected]>
---
 docs/content.zh/docs/sql/reference/dml/insert.md | 172 +++++++++++++++++++++++
 docs/content/docs/sql/reference/dml/insert.md    | 172 +++++++++++++++++++++++
 2 files changed, 344 insertions(+)

diff --git a/docs/content.zh/docs/sql/reference/dml/insert.md 
b/docs/content.zh/docs/sql/reference/dml/insert.md
index 53a3fa74107..0d879c1c91a 100644
--- a/docs/content.zh/docs/sql/reference/dml/insert.md
+++ b/docs/content.zh/docs/sql/reference/dml/insert.md
@@ -302,4 +302,176 @@ INSERT INTO students
 END;
 ```
 
+## ON CONFLICT clause
+
+When a query produces a changelog stream with an upsert key that differs from 
the sink table's primary key, multiple records with different upsert keys may 
map to the same primary key. The `ON CONFLICT` clause specifies how to resolve 
these primary key conflicts at the sink.
+
+### When is ON CONFLICT required?
+
+By default, Flink requires an explicit `ON CONFLICT` clause whenever the 
query's upsert key differs from the sink table's primary key. Without it, the 
query fails at planning time. This forces you to consider whether your query 
genuinely has a conflict scenario or whether there is a logic issue (e.g., a 
missing `GROUP BY`).
+
+This check is controlled by the configuration option 
`table.exec.sink.require-on-conflict` (default: `true`). Setting it to `false` 
restores the legacy behavior where no `ON CONFLICT` clause was required, but 
may lead to non-deterministic results.
+
+Alternatively, if you do not need consistency guarantees for conflicting keys, 
you can disable the sink upsert materializer entirely by setting 
`table.exec.sink.upsert-materialize` to `NONE`. This removes the materializer 
operator from the pipeline, so no buffering, compaction, or conflict resolution 
is performed. Records are passed directly to the sink as they arrive.
+
+### Syntax
+
+```sql
+[EXECUTE] INSERT INTO [catalog_name.][db_name.]table_name
+    select_statement
+    ON CONFLICT conflict_action
+
+conflict_action:
+    DO NOTHING
+  | DO ERROR
+  | DO DEDUPLICATE
+```
+
+### Strategies
+
+#### DO ERROR
+
+Throws an exception at runtime if multiple records with different upsert keys 
map to the same primary key. Use this when you believe no real conflict exists 
— for example, the planner could not prove that the upsert key matches the 
primary key, but you know they are logically equivalent.
+
+Buffered records are compacted on watermark progression before conflict 
checking, so transient disorder from changelog reordering does not cause false 
errors.
+
+```sql
+INSERT INTO product_orders
+SELECT p.name, o.order_id
+FROM orders o JOIN products p ON o.product_name = p.name
+ON CONFLICT DO ERROR;
+```
+
+#### DO NOTHING
+
+Keeps the first record that arrives for a given primary key and silently 
discards subsequent conflicting records. Use this when it is acceptable to drop 
duplicate primary key values from different upsert keys.
+
+Like `DO ERROR`, this strategy uses watermark-based compaction before applying 
conflict resolution.
+
+```sql
+INSERT INTO product_orders
+SELECT p.name, o.order_id
+FROM orders o JOIN products p ON o.product_name = p.name
+ON CONFLICT DO NOTHING;
+```
+
+#### DO DEDUPLICATE
+
+{{< hint warning >}}
+`DO DEDUPLICATE` maintains the full history of changes per primary key in 
state to support rollback on retraction. This results in significantly higher 
state usage compared to `DO ERROR` and `DO NOTHING`.
+{{< /hint >}}
+
+Maintains the full history of changes per primary key so that retractions can 
be correctly rolled back. This is the most correct strategy when true 
multi-source updates to the same primary key occur and correctness cannot be 
sacrificed.
+
+```sql
+INSERT INTO product_orders
+SELECT p.name, o.order_id
+FROM orders o JOIN products p ON o.product_name = p.name
+ON CONFLICT DO DEDUPLICATE;
+```
+
+### How conflicts happen
+
+A conflict occurs when the query's upsert key differs from the sink table's 
primary key. For example, consider a join whose result has an upsert key 
derived from the join condition, but the target table has a different primary 
key. Records from different upstream upsert keys can then collide on the same 
primary key in the sink.
+
+Because retraction (`-U`) and update (`+U`) messages may travel different 
paths through the pipeline, they can arrive at the sink out of order. `DO 
ERROR` and `DO NOTHING` use watermark-based compaction to wait for a consistent 
set of changes before applying conflict resolution, preventing false positives 
from transient reordering.
+
+### Watermark-based compaction
+
+Changelog messages produced by operators such as joins can arrive at the sink 
out of order. A retraction (`-U`) for a row may arrive after a new insert 
(`+I`) for a different row that shares the same primary key, making it look 
like two active records exist for that key — a false conflict.
+
+Watermark-based compaction solves this by buffering incoming records keyed by 
their primary key and upsert key. When a watermark advances, all buffered 
records with timestamps up to that watermark are compacted: matching insert and 
retraction pairs for the same upsert key cancel each other out (for example, 
`+I` and `-D`, or `-U` and `+U` pairs).
+
+**Example.** Using the `orders JOIN products` query from above, suppose order 
1 changes its product from `Laptop` to `Phone` while order 3 is also for 
`Laptop`. The join emits these changelog records:
+
+```
++I[Laptop, 1]   -- upsert key: order_id=1
++I[Laptop, 3]   -- upsert key: order_id=3
+-U[Laptop, 1]   -- upsert key: order_id=1  (retraction for order 1's old 
product)
++U[Phone,  1]   -- upsert key: order_id=1  (order 1 now maps to Phone)
+```
+
+Without compaction, after the first two `+I` records arrive the operator sees 
two active records for PK `Laptop` with different upsert keys (`order_id=1` and 
`order_id=3`) — a false conflict. With compaction, the operator waits for the 
watermark. The retraction `-U[Laptop, 1]` then cancels the earlier `+I[Laptop, 
1]` (same upsert key `order_id=1`), leaving only `+I[Laptop, 3]` for PK 
`Laptop` — no conflict.
+
+After compaction, if zero or one record remains per primary key, there is no 
conflict. If multiple records with different upsert keys still remain, a 
genuine conflict exists and is resolved by the chosen strategy (`DO ERROR` or 
`DO NOTHING`). `DO DEDUPLICATE` does not use watermark-based compaction; 
instead, it maintains the full history of changes in state to support correct 
rollback on retraction.
+
+### Examples
+
+```sql
+-- Source and dimension tables
+CREATE TABLE orders (
+    order_id BIGINT,
+    product_name STRING,
+    quantity INT,
+    PRIMARY KEY(order_id) NOT ENFORCED
+) WITH (...);
+
+CREATE TABLE products (
+    name STRING,
+    PRIMARY KEY(name) NOT ENFORCED
+) WITH (...);
+
+-- Sink table
+CREATE TABLE product_orders (
+    product_name STRING,
+    last_order_id BIGINT,
+    PRIMARY KEY(product_name) NOT ENFORCED
+) WITH (...);
+
+-- This join produces an upsert key that may differ from the sink's PK,
+-- so ON CONFLICT is required.
+INSERT INTO product_orders
+SELECT p.name, o.order_id
+FROM orders o JOIN products p ON o.product_name = p.name
+ON CONFLICT DO NOTHING;
+```
+
+Given the following data in the source tables:
+
+```
+orders:                                    products:
++----------+--------------+----------+    +--------+
+| order_id | product_name | quantity |    | name   |
++----------+--------------+----------+    +--------+
+| 1        | Laptop       | 2        |    | Laptop |
+| 2        | Phone        | 1        |    | Phone  |
+| 3        | Laptop       | 5        |    +--------+
++----------+--------------+----------+
+```
+
+The join produces these changelog records for `product_orders`:
+
+```
++I[Laptop, 1]  -- upsert key: order_id=1
++I[Phone,  2]  -- upsert key: order_id=2
++I[Laptop, 3]  -- upsert key: order_id=3  ← conflicts with order_id=1 on PK 
'Laptop'
+```
+
+Two records with different upsert keys (`order_id=1` and `order_id=3`) target 
the same
+primary key (`product_name='Laptop'`). This is the conflict each strategy 
resolves differently:
+
+- **`DO ERROR`** — throws a runtime exception because two distinct upsert keys 
map to the same primary key.
+- **`DO NOTHING`** — keeps the first record and discards the conflict:
+
+  | product_name | last_order_id |
+  |:-------------|:--------------|
+  | Laptop       | 1             |
+  | Phone        | 2             |
+
+- **`DO DEDUPLICATE`** — accepts both; the last arriving value is visible:
+
+  | product_name | last_order_id |
+  |:-------------|:--------------|
+  | Laptop       | 3             |
+  | Phone        | 2             |
+
+**What happens on retraction?** If order 3 is later deleted from the source, 
the join
+emits a retraction `-D[Laptop, 3]`:
+
+- **`DO NOTHING`** — the retraction has no effect because `(Laptop, 3)` was 
never written.
+  The Laptop row remains with `last_order_id=1`.
+- **`DO DEDUPLICATE`** — rolls back to the previous value. Laptop falls back 
to order 1,
+  producing `{(Laptop, 1), (Phone, 2)}`. The full history kept in state 
enables this
+  correct rollback.
+
 {{< top >}}
diff --git a/docs/content/docs/sql/reference/dml/insert.md 
b/docs/content/docs/sql/reference/dml/insert.md
index 4213b915479..11bc66632fe 100644
--- a/docs/content/docs/sql/reference/dml/insert.md
+++ b/docs/content/docs/sql/reference/dml/insert.md
@@ -312,5 +312,177 @@ INSERT INTO students
 END;
 ```
 
+## ON CONFLICT clause
+
+When a query produces an updating table with an upsert key that differs from 
the sink table's primary key, multiple records with different upsert keys may 
map to the same primary key. The `ON CONFLICT` clause specifies how to resolve 
these primary key conflicts at the sink.
+
+### When is ON CONFLICT required?
+
+By default, Flink requires an explicit `ON CONFLICT` clause whenever the 
query's upsert key differs from the sink table's primary key. Without it, the 
query fails at planning time. This forces you to consider whether your query 
genuinely has a conflict scenario or whether there is a logic issue (e.g., a 
missing `GROUP BY`).
+
+This check is controlled by the configuration option 
`table.exec.sink.require-on-conflict` (default: `true`). Setting it to `false` 
restores the legacy behavior where no `ON CONFLICT` clause was required, but 
may lead to non-deterministic results.
+
+Alternatively, if you do not need consistency guarantees for conflicting keys, 
you can disable the sink upsert materializer entirely by setting 
`table.exec.sink.upsert-materialize` to `NONE`. This removes the materializer 
operator from the pipeline, so no buffering, compaction, or conflict resolution 
is performed. Records are passed directly to the sink as they arrive.
+
+### Syntax
+
+```sql
+[EXECUTE] INSERT INTO [catalog_name.][db_name.]table_name
+    select_statement
+    ON CONFLICT conflict_action
+
+conflict_action:
+    DO NOTHING
+  | DO ERROR
+  | DO DEDUPLICATE
+```
+
+### Strategies
+
+#### DO ERROR
+
+Throws an exception at runtime if multiple records with different upsert keys 
map to the same primary key. Use this when you believe no real conflict exists 
— for example, the planner could not prove that the upsert key matches the 
primary key, but you know they are logically equivalent.
+
+Buffered records are compacted on watermark progression before conflict 
checking, so transient disorder from changelog reordering does not cause false 
errors.
+
+```sql
+INSERT INTO product_orders
+SELECT p.name, o.order_id
+FROM orders o JOIN products p ON o.product_name = p.name
+ON CONFLICT DO ERROR;
+```
+
+#### DO NOTHING
+
+Keeps the first record that arrives for a given primary key and silently 
discards subsequent conflicting records. Use this when it is acceptable to drop 
duplicate primary key values from different upsert keys.
+
+Like `DO ERROR`, this strategy uses watermark-based compaction before applying 
conflict resolution.
+
+```sql
+INSERT INTO product_orders
+SELECT p.name, o.order_id
+FROM orders o JOIN products p ON o.product_name = p.name
+ON CONFLICT DO NOTHING;
+```
+
+#### DO DEDUPLICATE
+
+{{< hint warning >}}
+`DO DEDUPLICATE` maintains the full history of changes per primary key in 
state to support rollback on retraction. This results in significantly higher 
state usage compared to `DO ERROR` and `DO NOTHING`.
+{{< /hint >}}
+
+Maintains the full history of changes per primary key so that retractions can 
be correctly rolled back. This is the most correct strategy when true 
multi-source updates to the same primary key occur and correctness cannot be 
sacrificed.
+
+```sql
+INSERT INTO product_orders
+SELECT p.name, o.order_id
+FROM orders o JOIN products p ON o.product_name = p.name
+ON CONFLICT DO DEDUPLICATE;
+```
+
+### How conflicts happen
+
+A conflict occurs when the query's upsert key differs from the sink table's 
primary key. For example, consider a join whose result has an upsert key 
derived from the join condition, but the target table has a different primary 
key. Records from different upstream upsert keys can then collide on the same 
primary key in the sink.
+
+Because retraction (`-U`) and update (`+U`) messages may travel different 
paths through the pipeline, they can arrive at the sink out of order. `DO 
ERROR` and `DO NOTHING` use watermark-based compaction to wait for a consistent 
set of changes before applying conflict resolution, preventing false positives 
from transient reordering.
+
+### Watermark-based compaction
+
+Changelog messages produced by operators such as joins can arrive at the sink 
out of order. A retraction (`-U`) for a row may arrive after a new insert 
(`+I`) for a different row that shares the same primary key, making it look 
like two active records exist for that key — a false conflict.
+
+Watermark-based compaction solves this by buffering incoming records keyed by 
their primary key and upsert key. When a watermark advances, all buffered 
records with timestamps up to that watermark are compacted: matching insert and 
retraction pairs for the same upsert key cancel each other out (for example, 
`+I` and `-D`, or `-U` and `+U` pairs).
+
+**Example.** Using the `orders JOIN products` query from above, suppose order 
1 changes its product from `Laptop` to `Phone` while order 3 is also for 
`Laptop`. The join emits these changelog records:
+
+```
++I[Laptop, 1]   -- upsert key: order_id=1
++I[Laptop, 3]   -- upsert key: order_id=3
+-U[Laptop, 1]   -- upsert key: order_id=1  (retraction for order 1's old 
product)
++U[Phone,  1]   -- upsert key: order_id=1  (order 1 now maps to Phone)
+```
+
+Without compaction, after the first two `+I` records arrive the operator sees 
two active records for PK `Laptop` with different upsert keys (`order_id=1` and 
`order_id=3`) — a false conflict. With compaction, the operator waits for the 
watermark. The retraction `-U[Laptop, 1]` then cancels the earlier `+I[Laptop, 
1]` (same upsert key `order_id=1`), leaving only `+I[Laptop, 3]` for PK 
`Laptop` — no conflict.
+
+After compaction, if zero or one record remains per primary key, there is no 
conflict. If multiple records with different upsert keys still remain, a 
genuine conflict exists and is resolved by the chosen strategy (`DO ERROR` or 
`DO NOTHING`). `DO DEDUPLICATE` does not use watermark-based compaction; 
instead, it maintains the full history of changes in state to support correct 
rollback on retraction.
+
+### Examples
+
+```sql
+-- Source and dimension tables
+CREATE TABLE orders (
+    order_id BIGINT,
+    product_name STRING,
+    quantity INT,
+    PRIMARY KEY(order_id) NOT ENFORCED
+) WITH (...);
+
+CREATE TABLE products (
+    name STRING,
+    PRIMARY KEY(name) NOT ENFORCED
+) WITH (...);
+
+-- Sink table
+CREATE TABLE product_orders (
+    product_name STRING,
+    last_order_id BIGINT,
+    PRIMARY KEY(product_name) NOT ENFORCED
+) WITH (...);
+
+-- This join produces an upsert key that may differ from the sink's PK,
+-- so ON CONFLICT is required.
+INSERT INTO product_orders
+SELECT p.name, o.order_id
+FROM orders o JOIN products p ON o.product_name = p.name
+ON CONFLICT DO NOTHING;
+```
+
+Given the following data in the source tables:
+
+```
+orders:                                    products:
++----------+--------------+----------+    +--------+
+| order_id | product_name | quantity |    | name   |
++----------+--------------+----------+    +--------+
+| 1        | Laptop       | 2        |    | Laptop |
+| 2        | Phone        | 1        |    | Phone  |
+| 3        | Laptop       | 5        |    +--------+
++----------+--------------+----------+
+```
+
+The join produces these changelog records for `product_orders`:
+
+```
++I[Laptop, 1]  -- upsert key: order_id=1
++I[Phone,  2]  -- upsert key: order_id=2
++I[Laptop, 3]  -- upsert key: order_id=3  ← conflicts with order_id=1 on PK 
'Laptop'
+```
+
+Two records with different upsert keys (`order_id=1` and `order_id=3`) target 
the same
+primary key (`product_name='Laptop'`). This is the conflict each strategy 
resolves differently:
+
+- **`DO ERROR`** — throws a runtime exception because two distinct upsert keys 
map to the same primary key.
+- **`DO NOTHING`** — keeps the first record and discards the conflict:
+
+  | product_name | last_order_id |
+  |:-------------|:--------------|
+  | Laptop       | 1             |
+  | Phone        | 2             |
+
+- **`DO DEDUPLICATE`** — accepts both; the last arriving value is visible:
+
+  | product_name | last_order_id |
+  |:-------------|:--------------|
+  | Laptop       | 3             |
+  | Phone        | 2             |
+
+**What happens on retraction?** If order 3 is later deleted from the source, 
the join
+emits a retraction `-D[Laptop, 3]`:
+
+- **`DO NOTHING`** — the retraction has no effect because `(Laptop, 3)` was 
never written.
+  The Laptop row remains with `last_order_id=1`.
+- **`DO DEDUPLICATE`** — rolls back to the previous value. Laptop falls back 
to order 1,
+  producing `{(Laptop, 1), (Phone, 2)}`. The full history kept in state 
enables this
+  correct rollback.
+
 {{< top >}}
 

Reply via email to