wuchong commented on a change in pull request #10680: 
[FLINK-15125][table-planner-blink] PROCTIME() computed column defined…
URL: https://github.com/apache/flink/pull/10680#discussion_r361305629
 
 

 ##########
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalWindowAggregateRuleBase.scala
 ##########
 @@ -138,6 +143,86 @@ abstract class 
LogicalWindowAggregateRuleBase(description: String)
     call.transformTo(result)
   }
 
+  /**
+   * Trim out the HepRelVertex wrapper and get current relational expression.
+   *
+   * @param node the node to unwrap
+   * @return the vertex's current rel if `node` is a HepRelVertex, otherwise `node` itself
+   */
+  private def trimHep(node: RelNode): RelNode = {
+    node match {
+      case hepRelVertex: HepRelVertex =>
+        hepRelVertex.getCurrentRel
+      case _ => node
+    }
+  }
+
+  /**
+   * Rewrite plan with PROCTIME() as window call operand: rewrite the window call to
+   * reference the input instead of invoking PROCTIME() directly, in order to simplify the
+   * subsequent rewrite logic.
+   *
+   * For example, plan
+   * <pre>
+   * LogicalProject($f0=[TUMBLE(PROCTIME(), 1000:INTERVAL SECOND)], a=[$0], 
b=[$1])
+   *   +- LogicalTableScan
+   * </pre>
+   *
+   * would be rewritten to
+   * <pre>
+   * LogicalProject($f0=[TUMBLE($2, 1000:INTERVAL SECOND)], a=[$0], b=[$1])
+   *   +- LogicalProject(a=[$0], b=[$1], $f2=[PROCTIME()])
+   *     +- LogicalTableScan
+   * </pre>
+   */
+  private def rewriteProctimeTumbling(
 
 Review comment:
   I think we can improve the method implementation a bit. What do you think 
about the following approach?
   
   ```scala
   /**
    * Rewrites a projection whose window calls (as recognized by `isWindowCall`) take a
    * PROCTIME() call as their first operand, so that they reference an input field instead.
    *
    * When at least one such call exists, a project that appends a PROCTIME() call as the
    * last field is placed on top of the original input, and every PROCTIME() call inside the
    * matching window calls is replaced by an input reference to that appended field.
    * Otherwise the given project is returned unchanged.
    *
    * @param project    the projection to inspect
    * @param relBuilder builder used to create the new input project and the input references
    * @return the rewritten project, or `project` itself when no window call matched
    */
   private def rewriteProctimeWindows(
         project: LogicalProject,
         relBuilder: RelBuilder): LogicalProject = {
       val projectInput = trimHep(project.getInput)
       // we will apply a project to have an additional PROCTIME() call at the end,
       // so its index equals the current field count of the input
       val proctimeIndex = projectInput.getRowType.getFieldCount

       // true for a window call whose first operand is a PROCTIME() call
       def isWindowOnProctime(rex: RexNode): Boolean = rex match {
         case call: RexCall =>
           isWindowCall(call) &&
             !call.getOperands.isEmpty &&
             isProctimeCall(call.getOperands.get(0))
         case _ => false
       }

       // replaces every PROCTIME() call it visits with a reference to the appended field
       val proctimeToInputRef = new RexShuttle {
         override def visitCall(call: RexCall): RexNode = {
           if (isProctimeCall(call)) {
             relBuilder.getRexBuilder.makeInputRef(call.getType, proctimeIndex)
           } else {
             super.visitCall(call)
           }
         }
       }

       if (!project.getChildExps.exists(isWindowOnProctime)) {
         project
       } else {
         // update the window calls to reference a RexInputRef instead of a PROCTIME() call.
         val newProjectExprs = project.getChildExps.map {
           case call: RexCall if isWindowOnProctime(call) => call.accept(proctimeToInputRef)
           case rex: RexNode => rex
         }
         val newInput = relBuilder
           .push(projectInput)
           // project plus the PROCTIME() call.
           .projectPlus(relBuilder.call(FlinkSqlOperatorTable.PROCTIME))
           .build()
         // we have to use project factory, because RelBuilder will simplify redundant projects
         RelFactories
           .DEFAULT_PROJECT_FACTORY
           .createProject(newInput, newProjectExprs, project.getRowType.getFieldNames)
           .asInstanceOf[LogicalProject]
       }
     }
   
     /**
      * Tells whether the given expression is a call to the PROCTIME operator.
      *
      * @param rexNode the expression to test
      * @return true only if `rexNode` is a RexCall whose operator equals
      *         FlinkSqlOperatorTable.PROCTIME
      */
     def isProctimeCall(rexNode: RexNode): Boolean = rexNode match {
       case proctime: RexCall if proctime.getOperator == FlinkSqlOperatorTable.PROCTIME => true
       case _ => false
     }
   
     /**
      * Tells whether the given call uses one of the group window operators
      * SESSION, HOP or TUMBLE of FlinkSqlOperatorTable.
      *
      * @param call the call to test
      * @return true if the call's operator equals one of the three window operators
      */
     def isWindowCall(call: RexCall): Boolean = {
       val operator = call.getOperator
       FlinkSqlOperatorTable.SESSION == operator ||
         FlinkSqlOperatorTable.HOP == operator ||
         FlinkSqlOperatorTable.TUMBLE == operator
     }
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to