This is an automated email from the ASF dual-hosted git repository.
ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1ebcaf5b348 [FLINK-34256][docs] Add documentation for mini-batch regular join
1ebcaf5b348 is described below
commit 1ebcaf5b34860026bfcf0ad78eaaf2847cce275c
Author: yeming <[email protected]>
AuthorDate: Wed Jan 31 14:07:35 2024 +0800
[FLINK-34256][docs] Add documentation for mini-batch regular join
This closes #24240
---
docs/content.zh/docs/dev/table/tuning.md | 30 ++++++++++++++++-
docs/content/docs/dev/table/tuning.md | 36 ++++++++++++++++++++-
docs/static/fig/table-streaming/minibatch_join.png | Bin 0 -> 249117 bytes
3 files changed, 64 insertions(+), 2 deletions(-)
diff --git a/docs/content.zh/docs/dev/table/tuning.md b/docs/content.zh/docs/dev/table/tuning.md
index 0fe999a57bd..7166f449572 100644
--- a/docs/content.zh/docs/dev/table/tuning.md
+++ b/docs/content.zh/docs/dev/table/tuning.md
@@ -29,7 +29,7 @@ under the License.
SQL 是数据分析中使用最广泛的语言。Flink Table API 和 SQL 使用户能够以更少的时间和精力定义高效的流分析应用程序。此外,Flink
Table API 和 SQL
是高效优化过的,它集成了许多查询优化和算子优化。但并不是所有的优化都是默认开启的,因此对于某些工作负载,可以通过打开某些选项来提高性能。
-在这一页,我们将介绍一些实用的优化选项以及流式聚合的内部原理,它们在某些情况下能带来很大的提升。
+在这一页,我们将介绍一些实用的优化选项以及流式聚合和普通连接的内部原理,它们在某些情况下能带来很大的提升。
{{< hint info >}}
目前 [分组聚合] ({{< ref "docs/dev/table/sql/queries/group-agg" >}}) 和
[窗口表值函数聚合]({{< ref "docs/dev/table/sql/queries/window-agg" >}})
(会话窗口表值函数聚合除外)都支持本页提到的流式聚合优化。
@@ -259,5 +259,33 @@ GROUP BY day
Flink SQL 优化器可以识别相同的 distinct key 上的不同过滤器参数。例如,在上面的示例中,三个 COUNT DISTINCT 都在
`user_id` 一列上。Flink 可以只使用一个共享状态实例,而不是三个状态实例,以减少状态访问和状态大小。在某些工作负载下,可以获得显著的性能提升。
+## MiniBatch Regular Joins
+
+默认情况下,regular join 算子是逐条处理输入的记录,即:(1)根据当前输入记录的 join key
关联对方状态中的记录,(2)根据当前记录写入或者撤回状态中的记录,(3)根据当前的输入记录和关联到的记录输出结果。
+这种处理模式可能会增加 StateBackend 的开销(尤其是对于 RocksDB StateBackend
)。除此之外,这会导致严重的中间结果放大。尤其在多级级联 join 的场景,会产生很多的中间结果从而导致性能降低。
+
+MiniBatch join 主要解决 regular join 存在的中间结果放大和 StateBackend
开销较大的问题。其核心思想是将一组输入的数据缓存在 join 算子内部的缓冲区中,一旦达到时间阈值或者缓存容量阈值,就触发 join 执行流程。
+这有两个主要的优化点:
+
+1) 在缓存中折叠数据,以此减少 join 的次数。
+2) 尽最大可能在处理数据时抑制冗余数据下发。
+
+以 left join 为例子,左右流的输入都是 join key 包含 unique key 的情况。假设 `id` 为 join key 和
unique key (数字代表 `id`, 字母代表 `content`), 具体 SQL 如下:
+
+```sql
+SET 'table.exec.mini-batch.enabled' = 'true';
+SET 'table.exec.mini-batch.allow-latency' = '5S';
+SET 'table.exec.mini-batch.size' = '5000';
+
+SELECT a.id as a_id, a.a_content, b.id as b_id, b.b_content
+FROM a LEFT JOIN b
+ON a.id = b.id
+```
+
+针对上述场景,mini-batch join 算子的具体处理过程如下图所示。
+
+{{< img src="/fig/table-streaming/minibatch_join.png" width="70%" height="70%" >}}
+
+默认情况下,对于 regular join 算子来说,mini-batch 优化是被禁用的。开启这项优化,需要设置选项 `table.exec.mini-batch.enabled`、`table.exec.mini-batch.allow-latency` 和 `table.exec.mini-batch.size`。更多详细信息请参见[配置]({{< ref "docs/dev/table/config" >}}#execution-options)页面。
{{< top >}}
diff --git a/docs/content/docs/dev/table/tuning.md b/docs/content/docs/dev/table/tuning.md
index b78c4687c36..d6c6c2e9f2c 100644
--- a/docs/content/docs/dev/table/tuning.md
+++ b/docs/content/docs/dev/table/tuning.md
@@ -28,7 +28,7 @@ under the License.
SQL is the most widely used language for data analytics. Flink's Table API and
SQL enables users to define efficient stream analytics applications in less
time and effort. Moreover, Flink Table API and SQL is effectively optimized, it
integrates a lot of query optimizations and tuned operator implementations. But
not all of the optimizations are enabled by default, so for some workloads, it
is possible to improve performance by turning on some options.
-In this page, we will introduce some useful optimization options and the internals of streaming aggregation which will bring great improvement in some cases.
+In this page, we will introduce some useful optimization options and the internals of streaming aggregation and regular join, which can bring great improvement in some cases.
{{< hint info >}}
The streaming aggregation optimizations mentioned in this page are all supported for [Group Aggregations]({{< ref "docs/dev/table/sql/queries/group-agg" >}}) and [Window TVF Aggregations]({{< ref "docs/dev/table/sql/queries/window-agg" >}}) (except Session Window TVF Aggregation) now.
@@ -266,5 +266,39 @@ GROUP BY day
Flink SQL optimizer can recognize the different filter arguments on the same
distinct key. For example, in the above example, all the three COUNT DISTINCT
are on `user_id` column.
Then Flink can use just one shared state instance instead of three state
instances to reduce state access and state size. In some workloads, this can
get significant performance improvements.
+## MiniBatch Regular Joins
+
+By default, the regular join operator processes input records one by one, i.e., it
+(1) looks up the associated records in the state of the other side based on the join key of the current input record,
+(2) updates the state by adding or retracting the current input record, and
+(3) outputs the join results according to the current record and the associated records.
+This processing pattern may increase the overhead of the StateBackend (especially for the RocksDB StateBackend).
+It can also lead to severe record amplification, especially in cascading join scenarios, where too many intermediate results are generated and performance degrades further.
+
+MiniBatch join seeks to resolve the aforementioned issues. Its core idea is to cache a bundle of input records in a buffer inside the mini-batch join operator.
+Once the buffer reaches a specified size or time threshold, the records are forwarded to the join process.
+There are two core optimizations:
+
+1) Fold records in the buffer to reduce the number of records before the join process.
+2) Suppress redundant output results as far as possible while the records in the buffer are being processed.
+
+For example, consider the following SQL:
+
+```sql
+SET 'table.exec.mini-batch.enabled' = 'true';
+SET 'table.exec.mini-batch.allow-latency' = '5S';
+SET 'table.exec.mini-batch.size' = '5000';
+
+SELECT a.id as a_id, a.a_content, b.id as b_id, b.b_content
+FROM a LEFT JOIN b
+ON a.id = b.id
+```
+
+Both the left and right inputs have a unique key that is contained in the join key, which is `id` (assuming the number represents `id` and the letter represents `content`).
+The execution of the mini-batch join operator is shown in the figure below.
+
+{{< img src="/fig/table-streaming/minibatch_join.png" width="70%" height="70%" >}}
+
+MiniBatch optimization is disabled by default for regular joins. To enable it, set the options `table.exec.mini-batch.enabled`, `table.exec.mini-batch.allow-latency`, and `table.exec.mini-batch.size`. See the [configuration]({{< ref "docs/dev/table/config" >}}#execution-options) page for more details.
{{< top >}}
diff --git a/docs/static/fig/table-streaming/minibatch_join.png b/docs/static/fig/table-streaming/minibatch_join.png
new file mode 100644
index 00000000000..e4a60d6428d
Binary files /dev/null and b/docs/static/fig/table-streaming/minibatch_join.png
differ
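
As an illustration of the first optimization described in the new MiniBatch Regular Joins section (folding records in the buffer before the join runs), here is a minimal Python sketch. It is not Flink's implementation: the `FoldingBuffer` class, its method names, and the simplified `+`/`-` changelog are all hypothetical, and only the size threshold is modeled, not the time threshold set by `table.exec.mini-batch.allow-latency`.

```python
from collections import OrderedDict

ACC, RET = "+", "-"  # accumulate / retract: a simplified changelog (assumption)


class FoldingBuffer:
    """Hypothetical buffer that folds changes per unique key before the join runs."""

    def __init__(self, max_size):
        self.max_size = max_size   # plays the role of table.exec.mini-batch.size
        self.received = 0          # number of changes received since the last flush
        self.rows = OrderedDict()  # key -> list of pending (kind, content) changes

    def add(self, kind, key, content):
        """Buffer one change; return True once the size threshold is reached."""
        self.received += 1
        pending = self.rows.setdefault(key, [])
        if kind == RET and pending and pending[-1] == (ACC, content):
            pending.pop()  # an insert followed by its own retraction cancels out
        else:
            pending.append((kind, content))
        if not pending:
            del self.rows[key]  # nothing left to join for this key
        return self.received >= self.max_size

    def flush(self):
        """Return the folded changes and reset the buffer."""
        folded = [(kind, key, content)
                  for key, pending in self.rows.items()
                  for kind, content in pending]
        self.rows.clear()
        self.received = 0
        return folded


# Five input changes on ids 1 and 2 fold down to a single change.
buf = FoldingBuffer(max_size=5)
changes = [("+", 1, "a"), ("-", 1, "a"), ("+", 1, "b"), ("+", 2, "c"), ("-", 2, "c")]
for kind, key, content in changes:
    full = buf.add(kind, key, content)
folded = buf.flush()
print(full, folded)  # True [('+', 1, 'b')]
```

In this toy run the join would be triggered once for one record instead of five times, which is the kind of reduction in state access and intermediate results the section describes. The actual folding rules in Flink depend on the join type and on whether the join key contains the unique key.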