tsreaper commented on a change in pull request #16979:
URL: https://github.com/apache/flink/pull/16979#discussion_r696566068
##########
File path:
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
##########
@@ -85,12 +83,13 @@ class StreamCommonSubGraphBasedOptimizer(planner:
StreamPlanner)
return sinkBlocks
}
- // infer updateAsRetraction property and miniBatchInterval property for
all input blocks
+ // infer modifyKind property for each blocks independently
+ sinkBlocks.foreach(b => optimizeBlock(b, isSinkBlock = true))
+ // infer and propagate updateKind and miniBatchInterval property for each
blocks
sinkBlocks.foreach { b =>
- inferTraits(b, b.isUpdateBeforeRequired, b.getMiniBatchInterval,
isSinkBlock = true)
+ propagateUpdateKindAndMiniBatchInterval(
+ b, b.isUpdateBeforeRequired, b.getMiniBatchInterval, isSinkBlock =
true)
}
- // propagate updateAsRetraction property and miniBatchInterval property to
all input blocks
- sinkBlocks.foreach(propagateTraits(_, isSinkBlock = true))
// clear the intermediate result
sinkBlocks.foreach(resetIntermediateResult)
// optimize recursively RelNodeBlock
Review comment:
This final call to `optimizeBlock` is only updating the intermediate
results and is not for some real optimization. However this will increase the
planning time by 50%. Is it possible to remove this call?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]