This is an automated email from the ASF dual-hosted git repository.

xuyangzhong pushed a commit to branch release-2.2
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.2 by this push:
     new 71fb41984cf [FLINK-38611][doc] Add doc for delta join (#27225)
71fb41984cf is described below

commit 71fb41984cf8af69828dcd6fc70c1ce7d023d281
Author: Xuyang
AuthorDate: Fri Nov 14 13:32:13 2025 +0800

    [FLINK-38611][doc] Add doc for delta join (#27225)
    
    * [FLINK-38611][doc] Add doc for delta join
    
    (cherry picked from commit bcd8d7f5d743b5344be354fd88b18f1e3d5a5f39)
    
    * [FLINK-38625][doc] Fix broken anchor links about the table options in Performance Tuning page
    
    (cherry picked from commit 1029df0f6bc85b892abdd0453101d037b4a6c9b4)
---
 docs/content.zh/docs/dev/table/config.md       |  10 +++++
 docs/content.zh/docs/dev/table/tuning.md       |  57 +++++++++++++++++++++++++
 docs/content/docs/dev/table/tuning.md          |  54 +++++++++++++++++++++++
 docs/static/fig/table-streaming/delta_join.png | Bin 0 -> 89632 bytes
 4 files changed, 121 insertions(+)

diff --git a/docs/content.zh/docs/dev/table/config.md b/docs/content.zh/docs/dev/table/config.md
index 1748fde1b72..258993043e1 100644
--- a/docs/content.zh/docs/dev/table/config.md
+++ b/docs/content.zh/docs/dev/table/config.md
@@ -116,30 +116,40 @@ Flink SQL> SET 'table.exec.mini-batch.size' = '5000';
 {{< /tab >}}
 {{< /tabs >}}
 
+<a name="execution-options" />
+
 ### Execution Options
 
 The following options can be used to tune the performance of query execution.
 
 {{< generated/execution_config_configuration >}}
 
+<a name="optimizer-options" />
+
 ### Optimizer Options
 
 The following options can be used to adjust the behavior of the query optimizer to get a better execution plan.
 
 {{< generated/optimizer_config_configuration >}}
 
+<a name="table-options" />
+
 ### Planner Options
 
 The following options can be used to adjust the behavior of the planner.
 
 {{< generated/table_config_configuration >}}
 
+<a name="materialized-table-options" />
+
 ### Materialized Table Options
 
 The following options can be used to adjust the behavior of Materialized Tables.
 
 {{< generated/materialized_table_config_configuration >}}
 
+<a name="sql-client-options" />
+
 ### SQL Client Options
 
 The following options can be used to adjust the behavior of the SQL client.
diff --git a/docs/content.zh/docs/dev/table/tuning.md b/docs/content.zh/docs/dev/table/tuning.md
index 7166f449572..e8ff6d8d2c9 100644
--- a/docs/content.zh/docs/dev/table/tuning.md
+++ b/docs/content.zh/docs/dev/table/tuning.md
@@ -289,3 +289,60 @@ ON a.id = b.id
 By default, the mini-batch optimization is disabled for the regular join operator. To enable it, set the options `table.exec.mini-batch.enabled`, `table.exec.mini-batch.allow-latency`, and `table.exec.mini-batch.size`. Please see the [Configuration]({{< ref "docs/dev/table/config" >}}#execution-options) page for more details.
 
 {{< top >}}
+
+## Delta Joins
+
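+In streaming jobs, regular joins maintain all historical data from both inputs to ensure correct results. Over time, this causes the state to grow continuously, increasing resource usage and affecting job stability.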
+
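+To address these challenges, Flink introduces the delta join operator. The key idea is to replace the large state maintained by regular joins with a bidirectional lookup join that directly reuses data from the source tables. Compared to traditional regular joins, delta joins significantly reduce state size, improve job stability, and lower overall resource consumption.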
+
+This feature is enabled by default. A regular join is automatically optimized into a delta join when all of the following conditions are met:
+
+1. The job topology satisfies the optimization criteria. For details, see [Supported Features and Limitations]({{< ref "docs/dev/table/tuning" >}}#supported-features-and-limitations).
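+2. The external storage system of the source table provides index information that delta joins can use for fast lookups. Currently, [Apache Fluss (Incubating)](https://fluss.apache.org/blog/fluss-open-source/) provides table-level index information to Flink, so its tables can be used as source tables for delta joins. For details, see the [Fluss documentation](https://fluss.apache.org/docs/0.8/engine-flink/delta-joins/#flink-version-support).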
+
+### Working Principle
+
+In Flink, regular joins store all input records from both sides in state so that matching records can be found correctly when data arrives from the opposite side.
+
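+In contrast, delta joins leverage the indexing capabilities of the external storage system. Instead of performing state lookups, they issue efficient, index-based queries directly against the external storage to retrieve matching records. This approach eliminates redundant data storage between Flink state and the external system.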
+
+{{< img src="/fig/table-streaming/delta_join.png" width="70%" height="70%" >}}
+
+### Important Configurations
+
+Delta join optimization is enabled by default. You can disable it manually by setting the following configuration:
+
+```sql
+SET 'table.optimizer.delta-join.strategy' = 'NONE';
+```
+
+Please see the [Configuration]({{< ref "docs/dev/table/config" >}}#optimizer-options) page for more details.
+
+To fine-tune the performance of delta joins, you can also configure the following parameters:
+
+- `table.exec.delta-join.cache-enabled`
+- `table.exec.delta-join.left.cache-size`
+- `table.exec.delta-join.right.cache-size`
+
+Please see the [Configuration]({{< ref "docs/dev/table/config" >}}#execution-options) page for more details.
+
+<a name="supported-features-and-limitations" />
+
+### Supported Features and Limitations
+
+Delta joins are still evolving. The current version supports the following features:
+
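+1. Support for **INSERT-only** tables as source tables.
+2. Support for **CDC** tables without **DELETE operations** as source tables.
+3. Support for **project** and **filter** operators between the source tables and the delta join.
+4. Support for **caching** within the delta join operator.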
+
+However, delta joins also have several **limitations**. A job can be optimized into a delta join only if it satisfies all of the following conditions:
+
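+1. The **index key** of the table must be included in the join's **equality conditions**.
+2. Only **INNER JOIN** is currently supported.
+3. The **downstream operators** must be able to handle **duplicate changes**, for example a sink running in **UPSERT mode** without `upsertMaterialize`.
+4. When consuming a **CDC stream**, the **join key** must be part of the **primary key**.
+5. When consuming a **CDC stream**, all **filters** must be applied on the **upsert key**.
+6. **Non-deterministic functions** are not allowed in any projection or filter.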
diff --git a/docs/content/docs/dev/table/tuning.md b/docs/content/docs/dev/table/tuning.md
index ae372f08296..81aaa020944 100644
--- a/docs/content/docs/dev/table/tuning.md
+++ b/docs/content/docs/dev/table/tuning.md
@@ -368,3 +368,57 @@ FROM TenantKafka t
          LEFT JOIN InventoryKafka i ON t.tenant_id = i.tenant_id AND ...;
 ```
 
+## Delta Joins
+
+In streaming jobs, regular joins keep all historical data from both inputs to ensure correct results. Over time, this causes the state to grow continuously, increasing resource usage and affecting job stability.
+
+To mitigate these challenges, Flink introduces the delta join operator. The key idea is to replace the large state maintained by regular joins with a bidirectional lookup-based join that directly reuses data from the source tables. Compared to traditional regular joins, delta joins substantially reduce state size, enhance job stability, and lower overall resource consumption.
+
+This feature is enabled by default. A regular join is automatically optimized into a delta join when all of the following conditions are met:
+
+1. The SQL pattern satisfies the optimization criteria. For details, please refer to [Supported Features and Limitations]({{< ref "docs/dev/table/tuning" >}}#supported-features-and-limitations).
+2. The external storage system of the source table provides index information that delta joins can use for fast lookups. Currently, [Apache Fluss (Incubating)](https://fluss.apache.org/blog/fluss-open-source/) provides table-level index information to Flink, so its tables can be used as source tables for delta joins. Please refer to the [Fluss documentation](https://fluss.apache.org/docs/0.8/engine-flink/delta-joins/#flink-version-support) for more details.
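+
+One way to check whether a particular join was rewritten is to inspect its optimized plan with `EXPLAIN`. The following is a minimal sketch: `orders` and `customers` are hypothetical tables assumed to meet the conditions above, and the exact name of the delta join node in the printed plan may differ between Flink versions.
+
+```sql
+-- Hypothetical tables: `orders` and `customers` are assumed to be Fluss
+-- tables whose index keys consist of `customer_id`.
+EXPLAIN PLAN FOR
+SELECT o.order_id, o.customer_id, c.name
+FROM orders AS o
+INNER JOIN customers AS c
+  ON o.customer_id = c.customer_id;
+-- If the optimization applies, the optimized physical plan is expected to
+-- contain a delta join node instead of a regular join node.
+```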
+
+### Working Principle
+
+In Flink, regular joins store all incoming records from both input sides in state to ensure that corresponding records can be matched correctly when data arrives from the opposite side.
+
+In contrast, delta joins leverage the indexing capabilities of external storage systems. Instead of performing state lookups, delta joins issue efficient index-based queries directly against the external storage to retrieve matching records. This approach eliminates redundant data storage between Flink state and the external system.
+
+{{< img src="/fig/table-streaming/delta_join.png" width="70%" height="70%" >}}
+
+### Important Configurations
+
+Delta join optimization is enabled by default. You can disable this feature manually by setting the following configuration:
+
+```sql
+SET 'table.optimizer.delta-join.strategy' = 'NONE';
+```
+
+Please see the [Configuration]({{< ref "docs/dev/table/config" >}}#optimizer-options) page for more details.
+
+To fine-tune the performance of delta joins, you can also configure the following parameters:
+
+- `table.exec.delta-join.cache-enabled`
+- `table.exec.delta-join.left.cache-size`
+- `table.exec.delta-join.right.cache-size`
+
+Please see the [Configuration]({{< ref "docs/dev/table/config" >}}#execution-options) page for more details.
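+
+The statements below sketch how these parameters might be set in the SQL client. The cache sizes are illustrative values chosen for this sketch, not recommended defaults, and are assumed to be counts of cached entries; check the Configuration page for the actual types and default values.
+
+```sql
+-- Enable the cache inside the delta join operator.
+SET 'table.exec.delta-join.cache-enabled' = 'true';
+-- Illustrative sizes only (assumed to be entry counts); tune them for your workload.
+SET 'table.exec.delta-join.left.cache-size' = '10000';
+SET 'table.exec.delta-join.right.cache-size' = '10000';
+```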
+
+### Supported Features and Limitations
+
+Delta joins are still evolving. The current version supports the following features:
+
+1. Support for **INSERT-only** tables as source tables.
+2. Support for **CDC** tables without **DELETE operations** as source tables.
+3. Support for **projection** and **filter** operations between the source tables and the delta join.
+4. Support for **caching** within the delta join operator.
+
+However, delta joins also have several **limitations**. A job can be optimized into a delta join only if it satisfies all of the following conditions:
+
+1. The **index key** of the table must be included in the join's **equality conditions**.
+2. Only **INNER JOIN** is currently supported.
+3. The **downstream operator** must be able to handle **duplicate changes**, such as a sink operating in **UPSERT mode** without `upsertMaterialize`.
+4. When consuming a **CDC stream**, the **join key** must be part of the **primary key**.
+5. When consuming a **CDC stream**, all **filters** must be applied on the **upsert key**.
+6. **Non-deterministic functions** are not allowed in filters or projections.
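+
+To make these conditions concrete, the following sketch contrasts a join that may be optimized into a delta join with one that cannot. All table names are hypothetical: `orders` is assumed to be an INSERT-only Fluss table, `customers` a Fluss table with primary key `customer_id` and no DELETE changes, both with index keys consisting of `customer_id`, and `enriched_orders` a sink running in UPSERT mode. Whether the planner actually applies the optimization still depends on the full table and sink definitions.
+
+```sql
+-- May be optimized: INNER JOIN, an equality condition on the index key,
+-- and a deterministic filter on the INSERT-only input.
+INSERT INTO enriched_orders
+SELECT o.order_id, o.customer_id, c.name
+FROM orders AS o
+INNER JOIN customers AS c
+  ON o.customer_id = c.customer_id
+WHERE o.amount > 0;
+
+-- Cannot be optimized: LEFT JOIN is not supported, and RAND() is a
+-- non-deterministic function.
+INSERT INTO enriched_orders
+SELECT o.order_id, o.customer_id, c.name
+FROM orders AS o
+LEFT JOIN customers AS c
+  ON o.customer_id = c.customer_id
+WHERE RAND() < 0.5;
+```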
\ No newline at end of file
diff --git a/docs/static/fig/table-streaming/delta_join.png b/docs/static/fig/table-streaming/delta_join.png
new file mode 100644
index 00000000000..6645cef4364
Binary files /dev/null and b/docs/static/fig/table-streaming/delta_join.png differ
