xuyangzhong commented on code in PR #25075:
URL: https://github.com/apache/flink/pull/25075#discussion_r1675358942
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala:
##########
@@ -385,35 +384,97 @@ object WindowUtil {
}
}
- private def existNeighbourWindowTableFunc(rel: RelNode): Boolean = {
+ private def isValidRowtimeWindow(windowProperties: RelWindowProperties):
Boolean = {
+ // rowtime tvf window can support calculation on window columns even
before aggregation
+ windowProperties.isRowtime
+ }
- @tailrec
- def find(rel: RelNode): Unit = {
- rel match {
- case rss: RelSubset =>
- val innerRel = Option.apply(rss.getBest).getOrElse(rss.getOriginal)
- find(innerRel)
+ /**
+ * If the middle Calc(s) contains call(s) on window columns, we should not
convert the Aggregate
+ * into WindowAggregate but GroupAggregate instead.
+ *
+ * The valid plan structure is like:
+ *
+ * {{{
+ * Aggregate
+ * |
+ * Calc (should not contain call on window columns)
+ * |
+ * WindowTableFunctionScan
+ * }}}
+ *
+ * and unlike:
+ *
+ * {{{
+ * Aggregate
+ * |
+ * Calc
+ * |
+ * Aggregate
+ * |
+ * Calc
+ * |
+ * WindowTableFunctionScan
+ * }}}
+ */
+ private def isValidProcTimeWindow(
+ windowProperties: RelWindowProperties,
+ fmq: FlinkRelMetadataQuery,
+ agg: FlinkLogicalAggregate): Boolean = {
+ var existNeighbourWindowTableFunc = false
+ val calcMatcher = new CalcWindowFunctionScanMatcher
+ try {
+ calcMatcher.go(agg.getInput(0))
+ } catch {
+ case r: Util.FoundOne =>
+ r.getNode match {
+ case _: Some[_] =>
+ existNeighbourWindowTableFunc = true
+ case _ => // do nothing
+ }
+ }
+ var existCallOnWindowColumns = calcMatcher.calcNodes.nonEmpty &&
+ calcMatcher.calcNodes.exists(calc =>
calcContainsCallsOnWindowColumns(calc, fmq))
+
+ // aggregate call shouldn't be on window columns
+ val aggInputWindowProps = windowProperties.getWindowColumns
+ existCallOnWindowColumns = existCallOnWindowColumns ||
!agg.getAggCallList.forall {
+ call =>
aggInputWindowProps.intersect(ImmutableBitSet.of(call.getArgList)).isEmpty
+ }
+ // proctime window can's support calculation on window columns before
aggregation,
+ // and need to check if there is a neighbour windowTableFunctionCall
+ !existCallOnWindowColumns && existNeighbourWindowTableFunc
+ }
+
+ private class CalcWindowFunctionScanMatcher extends RelVisitor {
+ val calcNodes: ListBuffer[Calc] = ListBuffer[Calc]()
+
+ override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
+ node match {
+ case calc: Calc =>
+ calcNodes += calc
+ // continue to visit children
+ super.visit(calc, 0, parent)
case scan: FlinkLogicalTableFunctionScan =>
if (WindowUtil.isWindowTableFunctionCall(scan.getCall)) {
- throw new Util.FoundOne
+ throw new Util.FoundOne(Some(0))
}
- find(scan.getInput(0))
-
- // proctime attribute comes from these operators can not be used
directly for proctime
- // window aggregate, so further traversal of child nodes is unnecessary
- case _: FlinkLogicalAggregate | _: FlinkLogicalRank | _:
FlinkLogicalJoin =>
-
- case sr: SingleRel => find(sr.getInput)
+ case rss: RelSubset =>
+ val innerRel = Option.apply(rss.getBest).getOrElse(rss.getOriginal)
+ // special case doesn't call super.visit for RelSubSet because it
has no children
+ visit(innerRel, 0, rss)
+ case _: FlinkLogicalAggregate | _: FlinkLogicalCorrelate | _:
FlinkLogicalIntersect |
+ _: FlinkLogicalJoin | _: FlinkLogicalMatch | _: FlinkLogicalMinus |
+ _: FlinkLogicalOverAggregate | _: FlinkLogicalRank | _:
FlinkLogicalUnion =>
+ // proctime attribute comes from these operators can not be used
directly for proctime
+ // window aggregate, so further traversal of child nodes is
unnecessary
+ throw new Util.FoundOne(Option.empty)
Review Comment:
We can simply return here to stop further traversal, instead of throwing `Util.FoundOne(Option.empty)`.
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala:
##########
@@ -385,35 +384,97 @@ object WindowUtil {
}
}
- private def existNeighbourWindowTableFunc(rel: RelNode): Boolean = {
+ private def isValidRowtimeWindow(windowProperties: RelWindowProperties):
Boolean = {
+ // rowtime tvf window can support calculation on window columns even
before aggregation
+ windowProperties.isRowtime
+ }
- @tailrec
- def find(rel: RelNode): Unit = {
- rel match {
- case rss: RelSubset =>
- val innerRel = Option.apply(rss.getBest).getOrElse(rss.getOriginal)
- find(innerRel)
+ /**
+ * If the middle Calc(s) contains call(s) on window columns, we should not
convert the Aggregate
+ * into WindowAggregate but GroupAggregate instead.
+ *
+ * The valid plan structure is like:
+ *
+ * {{{
+ * Aggregate
+ * |
+ * Calc (should not contain call on window columns)
+ * |
+ * WindowTableFunctionScan
+ * }}}
+ *
+ * and unlike:
+ *
+ * {{{
+ * Aggregate
+ * |
+ * Calc
+ * |
+ * Aggregate
+ * |
+ * Calc
+ * |
+ * WindowTableFunctionScan
+ * }}}
+ */
+ private def isValidProcTimeWindow(
+ windowProperties: RelWindowProperties,
+ fmq: FlinkRelMetadataQuery,
+ agg: FlinkLogicalAggregate): Boolean = {
+ var existNeighbourWindowTableFunc = false
+ val calcMatcher = new CalcWindowFunctionScanMatcher
+ try {
+ calcMatcher.go(agg.getInput(0))
+ } catch {
+ case r: Util.FoundOne =>
+ r.getNode match {
+ case _: Some[_] =>
+ existNeighbourWindowTableFunc = true
+ case _ => // do nothing
+ }
+ }
+ var existCallOnWindowColumns = calcMatcher.calcNodes.nonEmpty &&
+ calcMatcher.calcNodes.exists(calc =>
calcContainsCallsOnWindowColumns(calc, fmq))
+
+ // aggregate call shouldn't be on window columns
+ val aggInputWindowProps = windowProperties.getWindowColumns
+ existCallOnWindowColumns = existCallOnWindowColumns ||
!agg.getAggCallList.forall {
+ call =>
aggInputWindowProps.intersect(ImmutableBitSet.of(call.getArgList)).isEmpty
+ }
+ // proctime window can's support calculation on window columns before
aggregation,
+ // and need to check if there is a neighbour windowTableFunctionCall
+ !existCallOnWindowColumns && existNeighbourWindowTableFunc
+ }
+
+ private class CalcWindowFunctionScanMatcher extends RelVisitor {
+ val calcNodes: ListBuffer[Calc] = ListBuffer[Calc]()
+
+ override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
+ node match {
+ case calc: Calc =>
+ calcNodes += calc
+ // continue to visit children
+ super.visit(calc, 0, parent)
case scan: FlinkLogicalTableFunctionScan =>
if (WindowUtil.isWindowTableFunctionCall(scan.getCall)) {
- throw new Util.FoundOne
+ throw new Util.FoundOne(Some(0))
Review Comment:
It seems that we can unify the handling of `calcNodes` and
`existNeighbourWindowTableFunc` instead of throwing an exception here. What do you think?
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala:
##########
@@ -385,35 +384,97 @@ object WindowUtil {
}
}
- private def existNeighbourWindowTableFunc(rel: RelNode): Boolean = {
+ private def isValidRowtimeWindow(windowProperties: RelWindowProperties):
Boolean = {
+ // rowtime tvf window can support calculation on window columns even
before aggregation
+ windowProperties.isRowtime
+ }
- @tailrec
- def find(rel: RelNode): Unit = {
- rel match {
- case rss: RelSubset =>
- val innerRel = Option.apply(rss.getBest).getOrElse(rss.getOriginal)
- find(innerRel)
+ /**
+ * If the middle Calc(s) contains call(s) on window columns, we should not
convert the Aggregate
+ * into WindowAggregate but GroupAggregate instead.
+ *
+ * The valid plan structure is like:
+ *
+ * {{{
+ * Aggregate
+ * |
+ * Calc (should not contain call on window columns)
+ * |
+ * WindowTableFunctionScan
+ * }}}
+ *
+ * and unlike:
+ *
+ * {{{
+ * Aggregate
+ * |
+ * Calc
+ * |
+ * Aggregate
+ * |
+ * Calc
+ * |
+ * WindowTableFunctionScan
+ * }}}
+ */
+ private def isValidProcTimeWindow(
+ windowProperties: RelWindowProperties,
+ fmq: FlinkRelMetadataQuery,
+ agg: FlinkLogicalAggregate): Boolean = {
+ var existNeighbourWindowTableFunc = false
+ val calcMatcher = new CalcWindowFunctionScanMatcher
+ try {
+ calcMatcher.go(agg.getInput(0))
+ } catch {
+ case r: Util.FoundOne =>
+ r.getNode match {
+ case _: Some[_] =>
+ existNeighbourWindowTableFunc = true
+ case _ => // do nothing
+ }
+ }
+ var existCallOnWindowColumns = calcMatcher.calcNodes.nonEmpty &&
+ calcMatcher.calcNodes.exists(calc =>
calcContainsCallsOnWindowColumns(calc, fmq))
+
+ // aggregate call shouldn't be on window columns
+ val aggInputWindowProps = windowProperties.getWindowColumns
+ existCallOnWindowColumns = existCallOnWindowColumns ||
!agg.getAggCallList.forall {
+ call =>
aggInputWindowProps.intersect(ImmutableBitSet.of(call.getArgList)).isEmpty
+ }
+ // proctime window can's support calculation on window columns before
aggregation,
Review Comment:
nit: typo, `can's` should be `can't`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]