Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3040#discussion_r105023568
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
 ---
    @@ -124,8 +127,26 @@ class DataSetCalc(
           body,
           returnType)
     
    +    def getForwardIndices = {
    +      // get (input, output) indices of operands,
    +      // filter modified operands and specify forwarding
    +      val inputFields = extractRefInputFields(calcProgram)
    +      calcProgram.getProjectList
    +        .map(_.getIndex)
    +        .zipWithIndex
    +        .filter(tup => inputFields.contains(tup._1))
    --- End diff --
    
    This position based mapping is making some assumptions about the internal 
organization of a `RexProgram`, i.e., that the first `n` fields of the 
expressions list are filled in order by the `n` fields of the input.
    Can we change this to iterate over the projection list and looking up the 
expression and check whether it is a `RexInputRef`. Basically something like:
    ```
    calcProgram.getProjectList.zipWithIndex.map { case (p, out) =>
      val expr = calcProgram.getExprList.get(p.getIndex)
      expr match {
        case i: RexInputRef => Some((i.getIndex, out))
        case _ => None
      }
    }.flatten
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to