[ https://issues.apache.org/jira/browse/FLINK-3850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15902057#comment-15902057 ]

ASF GitHub Bot commented on FLINK-3850:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3040#discussion_r105023568
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala ---
    @@ -124,8 +127,26 @@ class DataSetCalc(
           body,
           returnType)
     
    +    def getForwardIndices = {
    +      // get (input, output) indices of operands,
    +      // filter modified operands and specify forwarding
    +      val inputFields = extractRefInputFields(calcProgram)
    +      calcProgram.getProjectList
    +        .map(_.getIndex)
    +        .zipWithIndex
    +        .filter(tup => inputFields.contains(tup._1))
    --- End diff --
    
    This position-based mapping makes assumptions about the internal
    organization of a `RexProgram`, i.e., that the first `n` entries of the
    expression list are the `n` input fields, in order.
    Can we change this to iterate over the projection list, look up each
    expression, and check whether it is a `RexInputRef`? Basically something like:
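    ```
    import scala.collection.JavaConverters._
    import org.apache.calcite.rex.RexInputRef

    // For each output position `out`, look up the projected expression and
    // keep (inputIndex, out) only if it is a plain input reference, i.e. a
    // field that is forwarded unmodified.
    calcProgram.getProjectList.asScala.zipWithIndex.flatMap { case (p, out) =>
      calcProgram.getExprList.get(p.getIndex) match {
        case i: RexInputRef => Some((i.getIndex, out))
        case _ => None
      }
    }
    ```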
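
To illustrate why the lookup is more robust than the position-based filter, here is a minimal sketch that does not depend on Calcite. `Expr`, `InputRef`, `Call`, `Program`, `ForwardIndices` and `Example` are hypothetical stand-ins (not Calcite or Flink classes): `exprList` plays the role of `RexProgram.getExprList`, and `projectList` holds the indices that `RexLocalRef.getIndex` would return. `byPosition` approximates the filter in the diff, assuming `inputFields` is the set of referenced input indices.

```scala
// Hypothetical, simplified stand-ins for the RexProgram pieces discussed above.
sealed trait Expr
final case class InputRef(index: Int) extends Expr                  // like RexInputRef
final case class Call(op: String, operands: List[Expr]) extends Expr // any computed expression

// exprList: shared expression pool; projectList: one index into exprList per output field.
final case class Program(exprList: Vector[Expr], projectList: Vector[Int])

object ForwardIndices {
  // Lookup-based: resolve each projected expression and keep it only if it
  // is a plain input reference. Yields (inputIndex, outputIndex) pairs.
  def byLookup(p: Program): Seq[(Int, Int)] =
    p.projectList.zipWithIndex.flatMap { case (exprIdx, out) =>
      p.exprList(exprIdx) match {
        case InputRef(in) => Some((in, out))
        case _            => None
      }
    }

  // Position-based: treats an expression index as an input index, which is
  // only correct if exprList starts with InputRef(0), InputRef(1), ... in order.
  def byPosition(p: Program, inputFields: Set[Int]): Seq[(Int, Int)] =
    p.projectList.zipWithIndex.filter { case (exprIdx, _) => inputFields.contains(exprIdx) }
}

object Example {
  // A hypothetical layout in which the input refs are NOT stored in input order.
  val reordered = Program(
    exprList = Vector(InputRef(1), InputRef(0), Call("+", List(InputRef(0), InputRef(1)))),
    projectList = Vector(0, 2, 1))
}
```

For `Example.reordered`, `byLookup` yields `(1,0), (0,2)`, while `byPosition` with `Set(0, 1)` yields `(0,0), (1,2)`, i.e. it reports the wrong input field for both forwarded outputs. When the layout does match the assumption, both functions agree.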


> Add forward field annotations to DataSet operators generated by the Table API
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-3850
>                 URL: https://issues.apache.org/jira/browse/FLINK-3850
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Fabian Hueske
>            Assignee: Nikolay Vasilishin
>
> The DataSet API features semantic annotations [1] that tell the optimizer which
> input fields an operator copies unmodified. This information is valuable for the
> optimizer because it can infer that certain physical properties, such as
> partitioning or sorting, are not destroyed by user functions and can thus
> generate more efficient execution plans.
> The Table API is built on top of the DataSet API and generates DataSet
> programs and code for user-defined functions. Hence, it knows exactly which
> fields are modified and which are not. We should use this information to
> automatically generate forward field annotations and attach them to the
> operators. This can significantly improve the performance of certain
> jobs.
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/index.html#semantic-annotations
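
The following is a rough sketch of the two ideas in the description: turning (input, output) forwarding pairs into a forwarded-fields annotation string, and what the optimizer could infer from it. `ForwardedFieldsSketch`, `toAnnotation` and `preservedPartitioning` are hypothetical names; the string format assumes tuple-style positional field expressions (`"f0; f2->f1"`), and whether those expressions apply to the Table API's row type is an assumption. Such a string could then be handed to the operator's `withForwardedFields`.

```scala
object ForwardedFieldsSketch {
  // Hypothetical helper: renders (inputIndex, outputIndex) pairs in the
  // forwarded-fields syntax, assuming positional "fN" field expressions.
  // A field that stays at the same position is written by name only ("f0").
  def toAnnotation(forwarded: Seq[(Int, Int)]): String =
    forwarded.map {
      case (in, out) if in == out => s"f$in"
      case (in, out)              => s"f$in->f$out"
    }.mkString("; ")

  // Hypothetical illustration of the optimizer's reasoning: if every key the
  // input is partitioned on is forwarded, the output is still partitioned on
  // the corresponding output positions; otherwise the property is lost.
  // Note: if an input field is copied to several outputs, toMap keeps only one.
  def preservedPartitioning(keys: Seq[Int], forwarded: Seq[(Int, Int)]): Option[Seq[Int]] = {
    val mapping = forwarded.toMap
    if (keys.forall(mapping.contains)) Some(keys.map(mapping)) else None
  }
}
```

For example, with forwarding pairs `(0,0), (2,1)` the annotation is `"f0; f2->f1"`; an input partitioned on field 2 stays partitioned on output field 1, while partitioning on field 1 is not preserved because that field is not forwarded.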



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
