xuyangzhong commented on code in PR #23470:
URL: https://github.com/apache/flink/pull/23470#discussion_r1578935653
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala:
##########
@@ -46,11 +47,14 @@ import scala.collection.JavaConversions._
class StreamCommonSubGraphBasedOptimizer(planner: StreamPlanner)
extends CommonSubGraphBasedOptimizer {
- override protected def doOptimize(roots: Seq[RelNode]): Seq[RelNodeBlock] = {
- val tableConfig = planner.getTableConfig
- // build RelNodeBlock plan
- val sinkBlocks = RelNodeBlockPlanBuilder.buildRelNodeBlockPlan(roots,
tableConfig)
- // infer trait properties for sink block
+ private def optimizeSinkBlocks(
+ origMiniBatchEnabled: Boolean,
+ tableConfig: TableConfig,
+ sinkBlocks: Seq[RelNodeBlock]): Seq[RelNodeBlock] = {
+ if (origMiniBatchEnabled)
Review Comment:
nit: How about moving this `if` block from the function `optimizeSinkBlocks` into
`doOptimize`? Confining all minibatch-related concepts to one function seems
cleaner.
Something like this:
```
override protected def doOptimize(roots: Seq[RelNode]): Seq[RelNodeBlock] = {
val tableConfig = planner.getTableConfig
// build RelNodeBlock plan
val sinkBlocks = RelNodeBlockPlanBuilder.buildRelNodeBlockPlan(roots,
tableConfig)
// infer trait properties for sink block
val origMiniBatchEnabled =
tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED)
try {
if (origMiniBatchEnabled) {
tableConfig.set(
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED,
Boolean.box(!shouldSkipMiniBatch(sinkBlocks)))
}
optimizeSinkBlocks(tableConfig, sinkBlocks)
} finally {
tableConfig.getConfiguration.set(
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED,
origMiniBatchEnabled)
}
}
private def optimizeSinkBlocks(
tableConfig: TableConfig,
sinkBlocks: Seq[RelNodeBlock]): Seq[RelNodeBlock] = {
sinkBlocks.foreach {
sinkBlock =>
......
```
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala:
##########
@@ -83,25 +87,42 @@ class StreamCommonSubGraphBasedOptimizer(planner:
StreamPlanner)
isSinkBlock = true)
block.setOptimizedPlan(optimizedTree)
return sinkBlocks
+ } else {
Review Comment:
nit: It seems that reverting this part of the changes won't be an issue. How
about reverting it?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]