lsyldliu commented on code in PR #24240:
URL: https://github.com/apache/flink/pull/24240#discussion_r1477184327


##########
docs/content/docs/dev/table/tuning.md:
##########
@@ -266,5 +266,23 @@ GROUP BY day
 Flink SQL optimizer can recognize the different filter arguments on the same 
distinct key. For example, in the above example, all the three COUNT DISTINCT 
are on `user_id` column.
 Then Flink can use just one shared state instance instead of three state 
instances to reduce state access and state size. In some workloads, this can 
get significant performance improvements.
 
+## MiniBatch Join

Review Comment:
   Suggest "MiniBatch Regular Joins" as the heading. This optimization only works 
for regular joins, not for other joins such as window joins, temporal joins, or 
lookup joins.



##########
docs/content/docs/dev/table/tuning.md:
##########
@@ -266,5 +266,23 @@ GROUP BY day
 Flink SQL optimizer can recognize the different filter arguments on the same 
distinct key. For example, in the above example, all the three COUNT DISTINCT 
are on `user_id` column.
 Then Flink can use just one shared state instance instead of three state 
instances to reduce state access and state size. In some workloads, this can 
get significant performance improvements.
 
+## MiniBatch Join
+
+By default, regular join operator processes input records one by one, i.e., 
(1) look up records from state according to joinKey, (2) write or retract input 
in state, (3) process the input and joined records. This processing pattern may 
increase the overhead of StateBackend (especially for RocksDB StateBackend).
+
+The core idea of mini-batch join is to cache a bundle of inputs in a buffer 
inside of the mini-batch join operator. Reduce data in the cache, and then when 
the cache is triggered for processing, perform specific optimizations based on 
certain scenarios. Some of input records would be folded according to specified 
rule illustrated below:
+
+{{< img src="/fig/table-streaming/folded.png" width="70%" height="70%" >}}
+
+When the bundle of inputs is triggered to be processed, the inputs inside of 
the bundle are records that could not be folded further. Another optimization 
for update records is applied for the bundle. When encountering the pair of -U 
and +U records, they would be recognized and redundant records in their output 
would be suppressed. The graph below explains the principle here.
+
+{{< img src="/fig/table-streaming/suppress.jpg" width="70%" height="70%" >}}
+
+Besides, the order in which the left and right stream bundles are processed 
can also help reduce redundant records, but this only applies to cases where 
there is an outer join. The following graph clarifies the principle:
+
+{{< img src="/fig/table-streaming/order.jpg" width="70%" height="70%" >}}
+
+MiniBatch optimization is disabled by default for regular join. In order to 
enable this optimization, you should set options 
`table.exec.mini-batch.enabled`, `table.exec.mini-batch.allow-latency` and 
`table.exec.mini-batch.size`. Please see [configuration]({{< ref 
"docs/dev/table/config" >}}#execution-options) page for more details.
+The examples could refer to the part of MiniBatch Aggregation.

Review Comment:
   Please make "MiniBatch Aggregation" a link so that readers can click it to 
jump to that section.
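
For reference, the three options named in the paragraph above could be illustrated with a small example. The following is only a minimal sketch: the values `5 s` and `5000` are illustrative assumptions, not recommendations, and `to_set_statements` is a hypothetical helper that renders the options as SQL client `SET` statements.

```python
# Option keys come from the paragraph under review; the values are
# illustrative assumptions and should be tuned for the actual workload.
MINI_BATCH_OPTIONS = {
    "table.exec.mini-batch.enabled": "true",
    "table.exec.mini-batch.allow-latency": "5 s",
    "table.exec.mini-batch.size": "5000",
}


def to_set_statements(options):
    """Render options as SQL client SET statements (hypothetical helper)."""
    return [f"SET '{key}' = '{value}';" for key, value in options.items()]


for statement in to_set_statements(MINI_BATCH_OPTIONS):
    print(statement)
```

The same key/value pairs could equally be set through the table configuration API, as the MiniBatch Aggregation section does.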



##########
docs/content/docs/dev/table/tuning.md:
##########
@@ -266,5 +266,23 @@ GROUP BY day
 Flink SQL optimizer can recognize the different filter arguments on the same 
distinct key. For example, in the above example, all the three COUNT DISTINCT 
are on `user_id` column.
 Then Flink can use just one shared state instance instead of three state 
instances to reduce state access and state size. In some workloads, this can 
get significant performance improvements.
 
+## MiniBatch Join
+
+By default, regular join operator processes input records one by one, i.e., 
(1) look up records from state according to joinKey, (2) write or retract input 
in state, (3) process the input and joined records. This processing pattern may 
increase the overhead of StateBackend (especially for RocksDB StateBackend).
+
+The core idea of mini-batch join is to cache a bundle of inputs in a buffer 
inside of the mini-batch join operator. Reduce data in the cache, and then when 
the cache is triggered for processing, perform specific optimizations based on 
certain scenarios. Some of input records would be folded according to specified 
rule illustrated below:
+
+{{< img src="/fig/table-streaming/folded.png" width="70%" height="70%" >}}
+
+When the bundle of inputs is triggered to be processed, the inputs inside of 
the bundle are records that could not be folded further. Another optimization 
for update records is applied for the bundle. When encountering the pair of -U 
and +U records, they would be recognized and redundant records in their output 
would be suppressed. The graph below explains the principle here.

Review Comment:
   Delete this paragraph.



##########
docs/content/docs/dev/table/tuning.md:
##########
@@ -266,5 +266,23 @@ GROUP BY day
 Flink SQL optimizer can recognize the different filter arguments on the same 
distinct key. For example, in the above example, all the three COUNT DISTINCT 
are on `user_id` column.
 Then Flink can use just one shared state instance instead of three state 
instances to reduce state access and state size. In some workloads, this can 
get significant performance improvements.
 
+## MiniBatch Join
+
+By default, regular join operator processes input records one by one, i.e., 
(1) look up records from state according to joinKey, (2) write or retract input 
in state, (3) process the input and joined records. This processing pattern may 
increase the overhead of StateBackend (especially for RocksDB StateBackend).
+
+The core idea of mini-batch join is to cache a bundle of inputs in a buffer 
inside of the mini-batch join operator. Reduce data in the cache, and then when 
the cache is triggered for processing, perform specific optimizations based on 
certain scenarios. Some of input records would be folded according to specified 
rule illustrated below:
+
+{{< img src="/fig/table-streaming/folded.png" width="70%" height="70%" >}}
+
+When the bundle of inputs is triggered to be processed, the inputs inside of 
the bundle are records that could not be folded further. Another optimization 
for update records is applied for the bundle. When encountering the pair of -U 
and +U records, they would be recognized and redundant records in their output 
would be suppressed. The graph below explains the principle here.
+
+{{< img src="/fig/table-streaming/suppress.jpg" width="70%" height="70%" >}}
+
+Besides, the order in which the left and right stream bundles are processed 
can also help reduce redundant records, but this only applies to cases where 
there is an outer join. The following graph clarifies the principle:

Review Comment:
   Delete this paragraph.



##########
docs/content.zh/docs/dev/table/tuning.md:
##########
@@ -259,5 +259,23 @@ GROUP BY day
 
 Flink SQL 优化器可以识别相同的 distinct key 上的不同过滤器参数。例如,在上面的示例中,三个 COUNT DISTINCT 都在 
`user_id` 一列上。Flink 可以只使用一个共享状态实例,而不是三个状态实例,以减少状态访问和状态大小。在某些工作负载下,可以获得显著的性能提升。
 
+## MiniBatch Join
+默认情况下,普通join算子是逐条处理输入的记录,即:(1)从状态中根据joinKey查询记录,(2)将当前记录写到对应状态,(3)处理输入和从状态中查找出的记录。这种处理模式可能会增加
 StateBackend 开销(尤其是对于 RocksDB StateBackend )。

Review Comment:
   Please pay attention to the code style and keep a space between Chinese and 
English.



##########
docs/content/docs/dev/table/tuning.md:
##########
@@ -266,5 +266,23 @@ GROUP BY day
 Flink SQL optimizer can recognize the different filter arguments on the same 
distinct key. For example, in the above example, all the three COUNT DISTINCT 
are on `user_id` column.
 Then Flink can use just one shared state instance instead of three state 
instances to reduce state access and state size. In some workloads, this can 
get significant performance improvements.
 
+## MiniBatch Join
+
+By default, regular join operator processes input records one by one, i.e., 
(1) look up records from state according to joinKey, (2) write or retract input 
in state, (3) process the input and joined records. This processing pattern may 
increase the overhead of StateBackend (especially for RocksDB StateBackend).

Review Comment:
   ```
   By default, the regular join operator processes input records one by one, 
i.e., (1) look up associated records from the state of the counterpart based on 
the join key of the current input record, (2) add the current input record to 
state or retract it from state, (3) output the join results according to the 
current record and associated records. This processing pattern may increase the 
overhead of StateBackend (especially for RocksDB StateBackend). Besides, it can 
lead to severe record amplification, especially in cascading join scenarios, 
generating too many intermediate results and further degrading performance.
   ```
   You need to highlight the current problem with Regular Join, which is what 
the minibatch join is trying to solve.



##########
docs/content/docs/dev/table/tuning.md:
##########
@@ -266,5 +266,23 @@ GROUP BY day
 Flink SQL optimizer can recognize the different filter arguments on the same 
distinct key. For example, in the above example, all the three COUNT DISTINCT 
are on `user_id` column.
 Then Flink can use just one shared state instance instead of three state 
instances to reduce state access and state size. In some workloads, this can 
get significant performance improvements.
 
+## MiniBatch Join
+
+By default, regular join operator processes input records one by one, i.e., 
(1) look up records from state according to joinKey, (2) write or retract input 
in state, (3) process the input and joined records. This processing pattern may 
increase the overhead of StateBackend (especially for RocksDB StateBackend).
+
+The core idea of mini-batch join is to cache a bundle of inputs in a buffer 
inside of the mini-batch join operator. Reduce data in the cache, and then when 
the cache is triggered for processing, perform specific optimizations based on 
certain scenarios. Some of input records would be folded according to specified 
rule illustrated below:

Review Comment:
   Here you can list the core optimization ideas of MiniBatch join as an 
ordered list, for example:
   1) xxx
   2) yyy
   3) zzz
   Then combine the list with a diagram to introduce the principle.
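
To make the buffering idea in the quoted paragraph concrete, here is a toy sketch of a bundle buffer, not Flink's actual operator. The class `MiniBatchBuffer`, its methods, and the folding rule it applies (a retraction cancels an earlier buffered accumulation of the same row) are assumptions for illustration; the real rules are the ones shown in the `folded.png` figure.

```python
from collections import defaultdict

# Changelog kinds: +I insert, -D delete, -U/+U update-before/update-after.
ACCUMULATE = {"+I", "+U"}
RETRACT = {"-D", "-U"}


class MiniBatchBuffer:
    """Toy bundle buffer: caches records per join key, folds, then flushes."""

    def __init__(self, max_size):
        self.max_size = max_size
        self.bundle = defaultdict(list)  # join key -> list of (kind, row)
        self.count = 0

    def add(self, key, kind, row):
        """Buffer one record; return the flushed bundle once max_size is hit."""
        records = self.bundle[key]
        # Assumed folding rule: a retraction cancels the most recent buffered
        # accumulation of the same row, so neither record reaches state.
        if kind in RETRACT:
            for i in range(len(records) - 1, -1, -1):
                if records[i][0] in ACCUMULATE and records[i][1] == row:
                    del records[i]
                    self.count -= 1
                    return None
        records.append((kind, row))
        self.count += 1
        if self.count >= self.max_size:
            return self.flush()
        return None

    def flush(self):
        """Hand back every record that could not be folded, then reset."""
        flushed = {k: v for k, v in self.bundle.items() if v}
        self.bundle = defaultdict(list)
        self.count = 0
        return flushed


buf = MiniBatchBuffer(max_size=3)
buf.add("k1", "+I", ("k1", "a"))
buf.add("k1", "-D", ("k1", "a"))  # folds with the +I above
buf.add("k2", "+I", ("k2", "b"))
print(buf.flush())  # {'k2': [('+I', ('k2', 'b'))]}
```

Only the record for `k2` survives to the join step, which is the kind of reduction in state access that the list of core ideas could spell out.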



##########
docs/content/docs/dev/table/tuning.md:
##########
@@ -266,5 +266,23 @@ GROUP BY day
 Flink SQL optimizer can recognize the different filter arguments on the same 
distinct key. For example, in the above example, all the three COUNT DISTINCT 
are on `user_id` column.
 Then Flink can use just one shared state instance instead of three state 
instances to reduce state access and state size. In some workloads, this can 
get significant performance improvements.
 
+## MiniBatch Join

Review Comment:
   Please also update this sentence so that it mentions regular join: 
https://github.com/apache/flink/pull/24240/files#diff-8d75f38adf6c9f25a579a8fd856b1946386d4aae2ecdc8bbdb2c2d548ac2a590R31
   
   ```In this page, we will introduce some useful optimization options and the 
internals of streaming aggregation and regular join, which will bring great 
improvement in some cases.```
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
