jeyhunkarimov commented on code in PR #23470:
URL: https://github.com/apache/flink/pull/23470#discussion_r1451764950
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala:
##########
@@ -50,14 +51,23 @@ class StreamCommonSubGraphBasedOptimizer(planner: StreamPlanner)
val tableConfig = planner.getTableConfig
// build RelNodeBlock plan
val sinkBlocks = RelNodeBlockPlanBuilder.buildRelNodeBlockPlan(roots, tableConfig)
+ val miniBatchRequirementChecker = {
+ (node: RelNode) =>
+ node.isInstanceOf[Filter] ||
Review Comment:
Hi @xuyangzhong, thanks for your comment. Good point.
There are two main reasons why I think we should keep this logic in
`StreamCommonSubGraphBasedOptimizer` rather than move it elsewhere:
- Moving the logic to `MiniBatchIntervalInferRule` would subtly change its
behavior. In `MiniBatchIntervalInferRule`, `miniBatchEnabled` is computed
from the global configuration:
```scala
val miniBatchEnabled =
  tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED)
```
Therefore, its value is deterministic: it does not depend on which part of
the plan the rule is applied to. If we computed the mini-batch skipping logic
in `MiniBatchIntervalInferRule`, the result would not be deterministic,
because the optimizer invokes `MiniBatchIntervalInferRule::onMatch` on
different parts of the query plan, not only on its root.
- Also, many parts of the code decide whether mini-batch is enabled by
checking the global configuration variable. Therefore, it might be better to
unset the global configuration inside `StreamCommonSubGraphBasedOptimizer`
and restore the original value at the end. I used `try...finally` for that.
WDYT?
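A minimal, self-contained sketch of the two points above, assuming a toy plan tree (`PlanNode`, `Scan`, `Filter`, `Project`, `Aggregate`) and a plain mutable map standing in for `TableConfig`. None of these names are Flink or Calcite APIs, and `miniBatchAgnostic` is only a hypothetical guess at the kind of check the truncated `miniBatchRequirementChecker` performs. It shows (1) that a decision taken on a subtree, as `onMatch` might see it, can differ from one taken on the whole plan, and (2) the `try...finally` pattern that overrides the mini-batch flag and restores the caller's original value afterwards.

```scala
import scala.collection.mutable

// Hypothetical, simplified plan nodes; NOT Calcite's RelNode classes.
sealed trait PlanNode { def inputs: Seq[PlanNode] }
final case class Scan(table: String) extends PlanNode { val inputs: Seq[PlanNode] = Nil }
final case class Filter(input: PlanNode) extends PlanNode { val inputs: Seq[PlanNode] = Seq(input) }
final case class Project(input: PlanNode) extends PlanNode { val inputs: Seq[PlanNode] = Seq(input) }
final case class Aggregate(input: PlanNode) extends PlanNode { val inputs: Seq[PlanNode] = Seq(input) }

object MiniBatchSketch {
  // Key string used as a plain map key here (stand-in for TABLE_EXEC_MINIBATCH_ENABLED).
  val MiniBatchEnabledKey = "table.exec.mini-batch.enabled"

  // Hypothetical predicate: node kinds assumed to gain nothing from mini-batching.
  def miniBatchAgnostic(node: PlanNode): Boolean = node match {
    case _: Scan | _: Filter | _: Project => true
    case _                                => false
  }

  // Checks the whole tree below `node`, so the answer depends on where you start.
  def canSkipMiniBatch(node: PlanNode): Boolean =
    miniBatchAgnostic(node) && node.inputs.forall(canSkipMiniBatch)

  // Decides once from the roots, temporarily disables the flag if possible,
  // and restores the original value (or absence) even if optimization throws.
  def optimize[T](roots: Seq[PlanNode], config: mutable.Map[String, String])(
      runOptimizer: mutable.Map[String, String] => T): T = {
    val original = config.get(MiniBatchEnabledKey)
    if (roots.forall(canSkipMiniBatch)) config(MiniBatchEnabledKey) = "false"
    try runOptimizer(config)
    finally {
      original match {
        case Some(v) => config(MiniBatchEnabledKey) = v
        case None    => config -= MiniBatchEnabledKey
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val plan = Aggregate(Filter(Scan("orders")))
    println(canSkipMiniBatch(plan))       // false: the whole plan contains an Aggregate
    println(canSkipMiniBatch(plan.input)) // true: a subtree-only view answers differently

    val config = mutable.Map(MiniBatchEnabledKey -> "true")
    val seenDuring = optimize(Seq(Filter(Scan("orders"))), config)(cfg => cfg(MiniBatchEnabledKey))
    println(seenDuring)                   // false: flag overridden during optimization
    println(config(MiniBatchEnabledKey))  // true: original value restored
  }
}
```

Computing the decision once from the roots, rather than per rule invocation, is what keeps the result independent of the order in which the planner visits subtrees; the `finally` block ensures later code that reads the global flag sees the user's setting again.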
--
This is an automated message from the Apache Git Service.