godfreyhe commented on a change in pull request #12873:
URL: https://github.com/apache/flink/pull/12873#discussion_r456230725
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkCalcMergeRule.scala
##########
@@ -61,36 +61,43 @@ class FlinkCalcMergeRule(relBuilderFactory:
RelBuilderFactory) extends RelOptRul
return false
}
- // Each bottomCalc's non-deterministic RexNode should appear at most once
in
- // topCalc's project fields and condition field.
+ isMergeable(topCalc, bottomCalc)
+ }
+
+ /**
+ * Return two neighbouring [[Calc]] can merge into one [[Calc]] or not. If
the two [[Calc]] can
+ * merge into one, each non-deterministic [[RexNode]] of bottom [[Calc]]
should appear at most
+ * once in the project list and filter list of top [[Calc]].
+ */
+ private def isMergeable(topCalc: Calc, bottomCalc: Calc): Boolean = {
+ val topProgram = topCalc.getProgram
val bottomProgram = bottomCalc.getProgram
- val topProjectRexNodesInputs = topProgram.getProjectList
+ val topProjectInputIndices = topProgram.getProjectList
.map(r => topProgram.expandLocalRef(r))
.map(r => InputFinder.bits(r).toArray)
- val topFilterRexNodesInputs = if (topProgram.getCondition != null) {
- InputFinder.bits(topProgram.expandLocalRef(topProgram.getCondition))
- .toArray
+ val topFilterInputIndices = if (topProgram.getCondition != null) {
+
InputFinder.bits(topProgram.expandLocalRef(topProgram.getCondition)).toArray
} else {
new Array[Int](0)
}
- val bottomRexList = bottomProgram.getProjectList
+ val bottomProjectList = bottomProgram.getProjectList
.map(r => bottomProgram.expandLocalRef(r))
.toArray
- bottomRexList.zipWithIndex.forall {
- case (rexNode: RexNode, index: Int) => {
+ bottomProjectList.zipWithIndex.forall {
+ case (project: RexNode, index: Int) => {
var nonDeterministicRexRefCnt = 0
- if (!RexUtil.isDeterministic(rexNode)) {
- topProjectRexNodesInputs.foreach(list => list.foreach(
- ref => if (ref == index) {
- nonDeterministicRexRefCnt += 1
- }))
- topFilterRexNodesInputs.foreach(
- ref => if (ref == index) {
- nonDeterministicRexRefCnt += 1
- })
+ if (!RexUtil.isDeterministic(project)) {
+ topProjectInputIndices.add(topFilterInputIndices)
Review comment:
This line should move out-of `forall`, otherwise the
`topProjectInputIndices` will contain many `topFilterInputIndices`s if there is
multiple non-deterministic expression in bottom project.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]