wuchong commented on a change in pull request #10680:
[FLINK-15125][table-planner-blink] PROCTIME() computed column defined…
URL: https://github.com/apache/flink/pull/10680#discussion_r361305629
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalWindowAggregateRuleBase.scala
##########
@@ -138,6 +143,86 @@ abstract class
LogicalWindowAggregateRuleBase(description: String)
call.transformTo(result)
}
+ /** Trim out the HepRelVertex wrapper and get current relational expression.
*/
+ private def trimHep(node: RelNode): RelNode = {
+ node match {
+ case hepRelVertex: HepRelVertex =>
+ hepRelVertex.getCurrentRel
+ case _ => node
+ }
+ }
+
+ /**
+ * Rewrite plan with PROCTIME() as window call operand: rewrite the window
call to
+ * reference the input instead of invoke the PROCTIME() directly, in order
to simplify the
+ * subsequent rewrite logic.
+ *
+ * For example, plan
+ * <pre>
+ * LogicalProject($f0=[TUMBLE(PROCTIME(), 1000:INTERVAL SECOND)], a=[$0],
b=[$1])
+ * +- LogicalTableScan
+ * </pre>
+ *
+ * would be rewritten to
+ * <pre>
+ * LogicalProject($f0=[TUMBLE($2, 1000:INTERVAL SECOND)], a=[$0], b=[$1])
+ * +- LogicalProject(a=[$0], b=[$1], $f2=[PROCTIME()])
+ * +- LogicalTableScan
+ * </pre>
+ */
+ private def rewriteProctimeTumbling(
Review comment:
I think we can improve the method implementation a bit, what do you think
about the following way:
```scala
private def rewriteProctimeWindows(
project: LogicalProject,
relBuilder: RelBuilder): LogicalProject = {
val projectInput = trimHep(project.getInput)
var hasWindowOnProctimeCall: Boolean = false
val newProjectExprs = project.getChildExps.map {
case call: RexCall if isWindowCall(call) &&
isProctimeCall(call.getOperands.head) =>
hasWindowOnProctimeCall = true
// update the window call to reference a RexInputRef instead of a
PROCTIME() call.
call.accept(
new RexShuttle {
override def visitCall(call: RexCall): RexNode = {
if (isProctimeCall(call)) {
relBuilder.getRexBuilder.makeInputRef(
call.getType,
// we will apply a project to have an additional
PROCTIME() call at the end
projectInput.getRowType.getFieldCount)
} else {
super.visitCall(call)
}
}
})
case rex: RexNode => rex
}
if (hasWindowOnProctimeCall) {
val newInput = relBuilder
.push(projectInput)
// project plus the PROCTIME() call.
.projectPlus(relBuilder.call(FlinkSqlOperatorTable.PROCTIME))
.build()
// we have to use project factory, because RelBuilder will simplify
redundant projects
RelFactories
.DEFAULT_PROJECT_FACTORY
.createProject(newInput, newProjectExprs,
project.getRowType.getFieldNames)
.asInstanceOf[LogicalProject]
} else {
project
}
}
def isProctimeCall(rexNode: RexNode): Boolean = {
rexNode match {
case call: RexCall =>
call.getOperator == FlinkSqlOperatorTable.PROCTIME
case _ => false
}
}
def isWindowCall(call: RexCall): Boolean = call.getOperator match {
case FlinkSqlOperatorTable.SESSION |
FlinkSqlOperatorTable.HOP |
FlinkSqlOperatorTable.TUMBLE => true
case _ => false
}
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services