Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1414#issuecomment-160153032
  
    Hi,
    the reason it was implemented this way is to eliminate common subexpressions. For example, if you wrote:
    ```
    select(a.count, b.count)
    ```
    
    the resulting expansion would be
    ```
    intermediate = select(1.count as intermediate.1)
    agg = intermediate.aggregate(intermediate.1, sum)
    result = agg.select(intermediate.1, intermediate.1)
    ```
    
    You see, the `1` would only be aggregated once here.
    
    The problem was that selecting `intermediate.1` twice is not allowed, since the field names have to be unique.
    
    Changing `ExpandAggregations` to this should do the trick:
    ```
    var intermediateCount = 0
    var resultCount = 0
    selection foreach {  f =>
      f.transformPre {
        case agg: Aggregation =>
          val intermediateReferences =
            agg.getIntermediateFields.zip(agg.getAggregations) map {
            case (expr, basicAgg) =>
              aggregations.get((expr, basicAgg)) match {
                case Some(intermediateName) =>
                  resultCount = resultCount + 1
                  val resultName = s"result.$resultCount"
                  Naming(ResolvedFieldReference(intermediateName, expr.typeInfo), resultName)
                case None =>
                  intermediateCount = intermediateCount + 1
                  val intermediateName = s"intermediate.$intermediateCount"
                  intermediateFields += Naming(expr, intermediateName)
                  aggregations((expr, basicAgg)) = intermediateName
                  resultCount = resultCount + 1
                  val resultName = s"result.$resultCount"
                  Naming(ResolvedFieldReference(intermediateName, expr.typeInfo), resultName)
              }
          }
    
          aggregationIntermediates(agg) = intermediateReferences
          // Return a NOP so that we don't add the children of the aggregation
          // to intermediate fields. We already added the necessary fields to
          // the list of intermediate fields.
          NopExpression()
    
        case fa: ResolvedFieldReference =>
          if (!fa.name.startsWith("intermediate")) {
            intermediateFields += Naming(fa, fa.name)
          }
          fa
      }
    }
    ```
    
    Now the aggregations will be expanded to:
    ```
    intermediate = select(1.count as intermediate.1)
    agg = intermediate.aggregate(intermediate.1, sum)
    result = agg.select(intermediate.1 as result.1, intermediate.1 as result.2)
    ```
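    
    The renaming scheme can be modelled in isolation. The following is only a rough sketch, not the actual Flink code: `ExpandSketch`, `AggKey`, `Expansion` and `expand` are made-up names, and expressions and aggregations are represented as plain strings. It shows that two identical aggregations share one intermediate field while each selected column still gets its own unique result name:
    ```scala
    import scala.collection.mutable

    object ExpandSketch {
      // Hypothetical key: (intermediate expression, basic aggregation), both as strings.
      type AggKey = (String, String)

      final case class Expansion(
        intermediateFields: List[(String, String)], // (expression, intermediate name)
        resultFields: List[(String, String)])       // (intermediate name, result name)

      def expand(selection: Seq[AggKey]): Expansion = {
        // Insertion-ordered map so intermediate fields come out in first-seen order.
        val aggregations = mutable.LinkedHashMap.empty[AggKey, String]
        val results = selection.zipWithIndex.map { case (key, i) =>
          // Reuse the intermediate name if this aggregation was already seen,
          // otherwise allocate the next "intermediate.N".
          val intermediateName =
            aggregations.getOrElseUpdate(key, s"intermediate.${aggregations.size + 1}")
          // Every selected column gets a fresh, unique result name.
          (intermediateName, s"result.${i + 1}")
        }
        Expansion(
          aggregations.toList.map { case ((expr, _), name) => (expr, name) },
          results.toList)
      }
    }
    ```
    
    Calling `ExpandSketch.expand(Seq(("1", "sum"), ("1", "sum")))` models the two-count example: one intermediate field, two distinct result names.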
    
    This also required changing `ExpressionCodeGenerator.scala`, line 111 to:
    ```
    def cleanExpr(e: Expression): Expression = {
      e match {
        case expressions.Naming(namedExpr, _) => cleanExpr(namedExpr)
        case _ => e
      }
    }
    ```
    
    because there is another bug that doesn't allow having nested Namings à la `Naming(Naming(expr, intermediate.1), result.1)`.
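    
    To illustrate why the recursive call matters, here is a minimal stand-alone sketch. `CleanExprSketch`, `Expression`, `Literal` and this `Naming` are simplified stand-ins invented for the example, not the real Flink classes. Stripping only one level would leave the inner `Naming` in place, whereas recursing unwraps all of them:
    ```scala
    object CleanExprSketch {
      // Hypothetical, simplified expression tree (not Flink's actual types).
      sealed trait Expression
      final case class Literal(value: Int) extends Expression
      final case class Naming(child: Expression, name: String) extends Expression

      // Keep unwrapping Naming nodes until a non-Naming expression is reached.
      @scala.annotation.tailrec
      def cleanExpr(e: Expression): Expression = e match {
        case Naming(namedExpr, _) => cleanExpr(namedExpr)
        case other => other
      }
    }
    ```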
    


