lsyldliu commented on code in PR #24240:
URL: https://github.com/apache/flink/pull/24240#discussion_r1477184327
##########
docs/content/docs/dev/table/tuning.md:
##########
@@ -266,5 +266,23 @@ GROUP BY day
Flink SQL optimizer can recognize the different filter arguments on the same
distinct key. For example, in the above example, all the three COUNT DISTINCT
are on `user_id` column.
Then Flink can use just one shared state instance instead of three state
instances to reduce state access and state size. In some workloads, this can
get significant performance improvements.
+## MiniBatch Join
Review Comment:
MiniBatch Regular Joins. This optimization only works for regular joins, not for
other joins such as window join, temporal join, or lookup join.
##########
docs/content/docs/dev/table/tuning.md:
##########
@@ -266,5 +266,23 @@ GROUP BY day
Flink SQL optimizer can recognize the different filter arguments on the same
distinct key. For example, in the above example, all the three COUNT DISTINCT
are on `user_id` column.
Then Flink can use just one shared state instance instead of three state
instances to reduce state access and state size. In some workloads, this can
get significant performance improvements.
+## MiniBatch Join
+
+By default, regular join operator processes input records one by one, i.e.,
(1) look up records from state according to joinKey, (2) write or retract input
in state, (3) process the input and joined records. This processing pattern may
increase the overhead of StateBackend (especially for RocksDB StateBackend).
+
+The core idea of mini-batch join is to cache a bundle of inputs in a buffer
inside of the mini-batch join operator. Reduce data in the cache, and then when
the cache is triggered for processing, perform specific optimizations based on
certain scenarios. Some of input records would be folded according to specified
rule illustrated below:
+
+{{< img src="/fig/table-streaming/folded.png" width="70%" height="70%" >}}
+
+When the bundle of inputs is triggered to be processed, the inputs inside of
the bundle are records that could not be folded further. Another optimization
for update records is applied for the bundle. When encountering the pair of -U
and +U records, they would be recognized and redundant records in their output
would be suppressed. The graph below explains the principle here.
+
+{{< img src="/fig/table-streaming/suppress.jpg" width="70%" height="70%" >}}
+
+Besides, the order in which the left and right stream bundles are processed
can also help reduce redundant records, but this only applies to cases where
there is an outer join. The following graph clarifies the principle:
+
+{{< img src="/fig/table-streaming/order.jpg" width="70%" height="70%" >}}
+
+MiniBatch optimization is disabled by default for regular join. In order to
enable this optimization, you should set options
`table.exec.mini-batch.enabled`, `table.exec.mini-batch.allow-latency` and
`table.exec.mini-batch.size`. Please see [configuration]({{< ref
"docs/dev/table/config" >}}#execution-options) page for more details.
+The examples could refer to the part of MiniBatch Aggregation.
Review Comment:
Please make this a link so that readers can click it and jump to the MiniBatch Aggregation section.
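For reference, the join section could reuse the same shape as the MiniBatch Aggregation example; a minimal sketch would be (the option keys are the real options named above, the latency/size values are only illustrative):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class EnableMiniBatchJoin {
    public static void main(String[] args) {
        // instantiate a streaming table environment
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // enable mini-batch optimization; the mini-batch join picks up the same options
        tEnv.getConfig().set("table.exec.mini-batch.enabled", "true");
        // buffer input records for at most 5 seconds ...
        tEnv.getConfig().set("table.exec.mini-batch.allow-latency", "5 s");
        // ... or until 5000 records have been buffered, whichever comes first
        tEnv.getConfig().set("table.exec.mini-batch.size", "5000");
    }
}
```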
##########
docs/content/docs/dev/table/tuning.md:
##########
@@ -266,5 +266,23 @@ GROUP BY day
Flink SQL optimizer can recognize the different filter arguments on the same
distinct key. For example, in the above example, all the three COUNT DISTINCT
are on `user_id` column.
Then Flink can use just one shared state instance instead of three state
instances to reduce state access and state size. In some workloads, this can
get significant performance improvements.
+## MiniBatch Join
+
+By default, regular join operator processes input records one by one, i.e.,
(1) look up records from state according to joinKey, (2) write or retract input
in state, (3) process the input and joined records. This processing pattern may
increase the overhead of StateBackend (especially for RocksDB StateBackend).
+
+The core idea of mini-batch join is to cache a bundle of inputs in a buffer
inside of the mini-batch join operator. Reduce data in the cache, and then when
the cache is triggered for processing, perform specific optimizations based on
certain scenarios. Some of input records would be folded according to specified
rule illustrated below:
+
+{{< img src="/fig/table-streaming/folded.png" width="70%" height="70%" >}}
+
+When the bundle of inputs is triggered to be processed, the inputs inside of
the bundle are records that could not be folded further. Another optimization
for update records is applied for the bundle. When encountering the pair of -U
and +U records, they would be recognized and redundant records in their output
would be suppressed. The graph below explains the principle here.
Review Comment:
Delete this paragraph.
##########
docs/content/docs/dev/table/tuning.md:
##########
@@ -266,5 +266,23 @@ GROUP BY day
Flink SQL optimizer can recognize the different filter arguments on the same
distinct key. For example, in the above example, all the three COUNT DISTINCT
are on `user_id` column.
Then Flink can use just one shared state instance instead of three state
instances to reduce state access and state size. In some workloads, this can
get significant performance improvements.
+## MiniBatch Join
+
+By default, regular join operator processes input records one by one, i.e.,
(1) look up records from state according to joinKey, (2) write or retract input
in state, (3) process the input and joined records. This processing pattern may
increase the overhead of StateBackend (especially for RocksDB StateBackend).
+
+The core idea of mini-batch join is to cache a bundle of inputs in a buffer
inside of the mini-batch join operator. Reduce data in the cache, and then when
the cache is triggered for processing, perform specific optimizations based on
certain scenarios. Some of input records would be folded according to specified
rule illustrated below:
+
+{{< img src="/fig/table-streaming/folded.png" width="70%" height="70%" >}}
+
+When the bundle of inputs is triggered to be processed, the inputs inside of
the bundle are records that could not be folded further. Another optimization
for update records is applied for the bundle. When encountering the pair of -U
and +U records, they would be recognized and redundant records in their output
would be suppressed. The graph below explains the principle here.
+
+{{< img src="/fig/table-streaming/suppress.jpg" width="70%" height="70%" >}}
+
+Besides, the order in which the left and right stream bundles are processed
can also help reduce redundant records, but this only applies to cases where
there is an outer join. The following graph clarifies the principle:
Review Comment:
Delete this paragraph.
##########
docs/content.zh/docs/dev/table/tuning.md:
##########
@@ -259,5 +259,23 @@ GROUP BY day
Flink SQL 优化器可以识别相同的 distinct key 上的不同过滤器参数。例如,在上面的示例中,三个 COUNT DISTINCT 都在
`user_id` 一列上。Flink 可以只使用一个共享状态实例,而不是三个状态实例,以减少状态访问和状态大小。在某些工作负载下,可以获得显著的性能提升。
+## MiniBatch Join
+默认情况下,普通join算子是逐条处理输入的记录,即:(1)从状态中根据joinKey查询记录,(2)将当前记录写到对应状态,(3)处理输入和从状态中查找出的记录。这种处理模式可能会增加
StateBackend 开销(尤其是对于 RocksDB StateBackend )。
Review Comment:
Please pay attention to the code style and keep a space between Chinese and
English.
##########
docs/content/docs/dev/table/tuning.md:
##########
@@ -266,5 +266,23 @@ GROUP BY day
Flink SQL optimizer can recognize the different filter arguments on the same
distinct key. For example, in the above example, all the three COUNT DISTINCT
are on `user_id` column.
Then Flink can use just one shared state instance instead of three state
instances to reduce state access and state size. In some workloads, this can
get significant performance improvements.
+## MiniBatch Join
+
+By default, regular join operator processes input records one by one, i.e.,
(1) look up records from state according to joinKey, (2) write or retract input
in state, (3) process the input and joined records. This processing pattern may
increase the overhead of StateBackend (especially for RocksDB StateBackend).
Review Comment:
```
By default, the regular join operator processes input records one by one, i.e.,
(1) look up associated records from the state of the counterpart side based on
the join key of the current input record, (2) add or retract the current input
record in state, (3) output the join results according to the current record and
the associated records. This processing pattern may increase the overhead of
StateBackend (especially for RocksDB StateBackend). Besides, it can lead to
severe record amplification, especially in cascading join scenarios, generating
too many intermediate results and further degrading performance.
```
You need to highlight the current problem with Regular Join, which is what
the minibatch join is trying to solve.
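To make the cascading-join amplification concrete, the doc could also show a small example along these lines (the table names, columns, and `datagen` setup are hypothetical placeholders; only the shape of the chained regular joins matters):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class CascadingRegularJoin {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // hypothetical source tables; a real job would back them with actual connectors
        tEnv.executeSql("CREATE TABLE orders (order_id BIGINT, user_id BIGINT) WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE TABLE payments (order_id BIGINT, amount DOUBLE) WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE TABLE shipments (order_id BIGINT, status STRING) WITH ('connector' = 'datagen')");

        // Two chained regular joins: without mini-batch, every intermediate +U/-U emitted by
        // the first join is pushed record-by-record into the second join, amplifying the
        // number of intermediate results and state accesses.
        Table result = tEnv.sqlQuery(
                "SELECT o.order_id, p.amount, s.status "
                        + "FROM orders o "
                        + "JOIN payments p ON o.order_id = p.order_id "
                        + "JOIN shipments s ON o.order_id = s.order_id");
        result.execute().print();
    }
}
```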
##########
docs/content/docs/dev/table/tuning.md:
##########
@@ -266,5 +266,23 @@ GROUP BY day
Flink SQL optimizer can recognize the different filter arguments on the same
distinct key. For example, in the above example, all the three COUNT DISTINCT
are on `user_id` column.
Then Flink can use just one shared state instance instead of three state
instances to reduce state access and state size. In some workloads, this can
get significant performance improvements.
+## MiniBatch Join
+
+By default, regular join operator processes input records one by one, i.e.,
(1) look up records from state according to joinKey, (2) write or retract input
in state, (3) process the input and joined records. This processing pattern may
increase the overhead of StateBackend (especially for RocksDB StateBackend).
+
+The core idea of mini-batch join is to cache a bundle of inputs in a buffer
inside of the mini-batch join operator. Reduce data in the cache, and then when
the cache is triggered for processing, perform specific optimizations based on
certain scenarios. Some of input records would be folded according to specified
rule illustrated below:
Review Comment:
Here you can list the core optimization ideas of MiniBatch join in an
ordered list, for example:
1) xxx
2) yyy
3) zzz
Then combine it with a diagram to visualize and introduce the principle.
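To help readers follow the folding rule, the section could also include a small, purely illustrative sketch (not Flink's actual mini-batch join implementation) of how a bundle can cancel an accumulate/retract pair for the same record before any state access:

```java
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Simplified illustration of the folding idea; not the actual operator code. */
public class FoldingBundleSketch {

    // Buffered changelog kinds, grouped by record content (RowKind normalized away).
    private final Map<Row, Deque<RowKind>> bundle = new HashMap<>();

    public void addRecord(Row record) {
        Row content = Row.copy(record);
        content.setKind(RowKind.INSERT); // normalize so +I/-D on the same values collide

        Deque<RowKind> kinds = bundle.computeIfAbsent(content, k -> new ArrayDeque<>());
        boolean retraction =
                record.getKind() == RowKind.DELETE || record.getKind() == RowKind.UPDATE_BEFORE;
        boolean lastIsAccumulate =
                !kinds.isEmpty()
                        && (kinds.peekLast() == RowKind.INSERT
                                || kinds.peekLast() == RowKind.UPDATE_AFTER);

        if (retraction && lastIsAccumulate) {
            // The pair folds away: neither record needs to touch the state backend.
            kinds.pollLast();
        } else {
            kinds.addLast(record.getKind());
        }
    }
}
```

The real operator of course works per join key and covers more cases, but a sketch like this next to the figure would make the folding rule easier to follow.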
##########
docs/content/docs/dev/table/tuning.md:
##########
@@ -266,5 +266,23 @@ GROUP BY day
Flink SQL optimizer can recognize the different filter arguments on the same
distinct key. For example, in the above example, all the three COUNT DISTINCT
are on `user_id` column.
Then Flink can use just one shared state instance instead of three state
instances to reduce state access and state size. In some workloads, this can
get significant performance improvements.
+## MiniBatch Join
Review Comment:
Please also update this sentence, which should contain the regular join:
https://github.com/apache/flink/pull/24240/files#diff-8d75f38adf6c9f25a579a8fd856b1946386d4aae2ecdc8bbdb2c2d548ac2a590R31
```In this page, we will introduce some useful optimization options and the
internals of streaming aggregation, regular join which will bring great
improvement in some cases.```