jeyhunkarimov commented on code in PR #23470:
URL: https://github.com/apache/flink/pull/23470#discussion_r1451764950


##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala:
##########
@@ -50,14 +51,23 @@ class StreamCommonSubGraphBasedOptimizer(planner: StreamPlanner)
     val tableConfig = planner.getTableConfig
     // build RelNodeBlock plan
     val sinkBlocks = RelNodeBlockPlanBuilder.buildRelNodeBlockPlan(roots, tableConfig)
+    val miniBatchRequirementChecker = {
+      (node: RelNode) =>
+        node.isInstanceOf[Filter] ||

Review Comment:
   Hi @xuyangzhong, thanks for your comment. Good point.
   There are two main reasons I think we should keep this logic in
`StreamCommonSubGraphBasedOptimizer` rather than move it elsewhere:
   
    - The logic would behave slightly differently if we moved it to
`MiniBatchIntervalInferRule`.
   In `MiniBatchIntervalInferRule`, `miniBatchEnabled` is read from the
global configuration:
   
   ```
       val miniBatchEnabled = tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED)
   ```
   
   Therefore, that check is deterministic: it gives the same answer no matter
which node the rule is matching. If we computed the mini-batch skipping logic
in `MiniBatchIntervalInferRule`, the result would not be deterministic, because
the optimizer invokes `MiniBatchIntervalInferRule::onMatch` on different parts
of the query plan (not necessarily only on its root).
   
   - Also, many parts of the code decide whether mini-batch is enabled by
checking the global configuration variable. Therefore, it might be better to
unset the global configuration inside `StreamCommonSubGraphBasedOptimizer` and
restore the original value at the end. I used `try...finally` for that. WDYT?


