Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3040#discussion_r105023568
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
---
@@ -124,8 +127,26 @@ class DataSetCalc(
body,
returnType)
+ def getForwardIndices = {
+ // get (input, output) indices of operands,
+ // filter modified operands and specify forwarding
+ val inputFields = extractRefInputFields(calcProgram)
+ calcProgram.getProjectList
+ .map(_.getIndex)
+ .zipWithIndex
+ .filter(tup => inputFields.contains(tup._1))
--- End diff --
This position based mapping is making some assumptions about the internal
organization of a `RexProgram`, i.e., that the first `n` fields of the
expressions list are filled in order by the `n` fields of the input.
Can we change this to iterate over the projection list and looking up the
expression and check whether it is a `RexInputRef`. Basically something like:
```
calcProgram.getProjectList.zipWithIndex.map { case (p, out) =>
val expr = calcProgram.getExprList.get(p.getIndex)
expr match {
case i: RexInputRef => Some((i.getIndex, out))
case _ => None
}
}.flatten
```
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---