[ 
https://issues.apache.org/jira/browse/FLINK-3087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15029935#comment-15029935
 ] 

ASF GitHub Bot commented on FLINK-3087:
---------------------------------------

Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1414#issuecomment-160153032
  
    Hi,
    the original implementation was written that way to eliminate common
    subexpressions. For example, if you wrote:
    ```
    select(a.count, b.count)
    ```
    
    the resulting expansion would be
    ```
    intermediate = select(1.count as intermediate.1)
    agg = intermediate.aggregate(intermediate.1, sum)
    result = agg.select(intermediate.1, intermediate.1)
    ```
    
    As you can see, both counts map to the same intermediate field, so the `1`
    is only aggregated once here.
    
    The problem was that selecting `intermediate.1` twice is not allowed, since
    the field names have to be unique.
    
    Changing `ExpandAggregations` to this should do the trick:
    ```
    var intermediateCount = 0
    var resultCount = 0
    selection foreach {  f =>
      f.transformPre {
        case agg: Aggregation =>
          val intermediateReferences = agg.getIntermediateFields.zip(agg.getAggregations) map {
            case (expr, basicAgg) =>
              aggregations.get((expr, basicAgg)) match {
                case Some(intermediateName) =>
                  resultCount = resultCount + 1
                  val resultName = s"result.$resultCount"
                  Naming(ResolvedFieldReference(intermediateName, expr.typeInfo), resultName)
                case None =>
                  intermediateCount = intermediateCount + 1
                  val intermediateName = s"intermediate.$intermediateCount"
                  intermediateFields += Naming(expr, intermediateName)
                  aggregations((expr, basicAgg)) = intermediateName
                  resultCount = resultCount + 1
                  val resultName = s"result.$resultCount"
                  Naming(ResolvedFieldReference(intermediateName, expr.typeInfo), resultName)
              }
          }
    
          aggregationIntermediates(agg) = intermediateReferences
          // Return a NOP so that we don't add the children of the aggregation
          // to intermediate fields. We already added the necessary fields to
          // the list of intermediate fields.
          NopExpression()
    
        case fa: ResolvedFieldReference =>
          if (!fa.name.startsWith("intermediate")) {
            intermediateFields += Naming(fa, fa.name)
          }
          fa
      }
    }
    ```
    
    Now the aggregations will be expanded to:
    ```
    intermediate = select(1.count as intermediate.1)
    agg = intermediate.aggregate(intermediate.1, sum)
    result = agg.select(intermediate.1 as result.1, intermediate.1 as result.2)
    ```
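    
    To make the naming scheme easier to follow in isolation, here is a minimal,
    self-contained sketch of the same idea. The types `Expr`, `Literal`,
    `FieldRef`, `Naming` and `AggRequest` and the `expand` helper are
    hypothetical stand-ins, not the actual Table API classes: identical
    (expression, aggregation) pairs share one intermediate field, while every
    requested aggregation still gets its own unique result name.
    ```scala
    import scala.collection.mutable

    object ExpandSketch {
      // Hypothetical, simplified stand-ins for the Table API expression classes.
      sealed trait Expr
      case class Literal(value: Int) extends Expr
      case class FieldRef(name: String) extends Expr
      case class Naming(child: Expr, name: String) extends Expr

      // One requested aggregation: the input expression and the basic aggregation.
      case class AggRequest(input: Expr, basicAgg: String)

      // Returns (intermediate fields, result fields).
      def expand(requests: Seq[AggRequest]): (Seq[Naming], Seq[Naming]) = {
        // Maps (input expression, basic aggregation) to its intermediate field name.
        val aggregations = mutable.Map.empty[(Expr, String), String]
        val intermediateFields = mutable.ArrayBuffer.empty[Naming]
        var intermediateCount = 0
        var resultCount = 0

        val resultFields = requests.map { req =>
          val key = (req.input, req.basicAgg)
          val intermediateName = aggregations.get(key) match {
            case Some(existing) =>
              existing // common subexpression: reuse the intermediate field
            case None =>
              intermediateCount += 1
              val name = s"intermediate.$intermediateCount"
              intermediateFields += Naming(req.input, name)
              aggregations(key) = name
              name
          }
          // Every request gets a fresh result name, so names stay unique.
          resultCount += 1
          Naming(FieldRef(intermediateName), s"result.$resultCount")
        }
        (intermediateFields.toSeq, resultFields)
      }
    }
    ```
    
    Under these assumptions, two requests for summing the literal `1` yield a
    single intermediate field `intermediate.1` and two result fields,
    `result.1` and `result.2`, both referencing it.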
    
    This also required changing `ExpressionCodeGenerator.scala`, line 111 to:
    ```
    def cleanExpr(e: Expression): Expression = {
      e match {
        case expressions.Naming(namedExpr, _) => cleanExpr(namedExpr)
        case _ => e
      }
    }
    ```
    
    because there is another bug that doesn't allow nested Namings such as
    `Naming(Naming(expr, intermediate.1), result.1)`.
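    
    As a rough illustration of why the recursion matters, the following sketch
    uses a hypothetical two-case `Expression` hierarchy (not the real Flink
    classes) to show a recursive `cleanExpr` stripping every `Naming` layer,
    where a single pattern match would only remove the outermost one.
    ```scala
    import scala.annotation.tailrec

    object CleanExprSketch {
      // Hypothetical, simplified stand-ins for the Table API expression classes.
      sealed trait Expression
      case class FieldReference(name: String) extends Expression
      case class Naming(child: Expression, name: String) extends Expression

      // Recursively unwraps Naming nodes until a non-Naming expression remains.
      @tailrec
      def cleanExpr(e: Expression): Expression = e match {
        case Naming(namedExpr, _) => cleanExpr(namedExpr)
        case _ => e
      }
    }
    ```
    
    With these definitions,
    `cleanExpr(Naming(Naming(FieldReference("a"), "intermediate.1"), "result.1"))`
    evaluates to `FieldReference("a")`.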
    



> Table API do not support multi count in aggregation.
> ----------------------------------------------------
>
>                 Key: FLINK-3087
>                 URL: https://issues.apache.org/jira/browse/FLINK-3087
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API
>    Affects Versions: 0.10.0
>            Reporter: Chengxiang Li
>            Assignee: Chengxiang Li
>
> Multi {{count}} in aggregation is not supported, for example:
> {code:java}
> table.select("a.count", "b.count")
> {code}
> It's valid in grammar, besides, {{a.count}} and {{b.count}} may have 
> different values actually if NULL value handling is enabled.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
