[ https://issues.apache.org/jira/browse/FLINK-3087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15029935#comment-15029935 ]
ASF GitHub Bot commented on FLINK-3087:
---------------------------------------
Github user aljoscha commented on the pull request:
https://github.com/apache/flink/pull/1414#issuecomment-160153032
Hi,
it was implemented this way to eliminate common subexpressions. For
example, if you wrote:
```
select(a.count, b.count)
```
the resulting expansion would be
```
intermediate = select(1.count as intermediate.1)
agg = intermediate.aggregate(intermediate.1, sum)
result = agg.select(intermediate.1, intermediate.1)
```
Note that the `1` is aggregated only once here, even though both counts use it.
The problem is that selecting `intermediate.1` twice is not allowed, because
field names in a selection have to be unique.
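The following minimal, self-contained Scala sketch makes the clash concrete. It is a toy model of the behavior described above, not Flink's code. `AggCall`, `expand` and `namesUnique` are hypothetical names, and each aggregation is reduced to a pair of (intermediate expression, basic aggregation). Both counts map to the pair (`1`, `sum`), as in the expansion above. They therefore share one intermediate field but produce duplicate output names.

```scala
import scala.collection.mutable

object OldExpansion {
  // Hypothetical toy stand-in, not a Flink class: an aggregation reduced to
  // the intermediate expression it needs and the basic aggregation applied.
  final case class AggCall(intermediateExpr: String, basicAgg: String)

  // Returns (intermediate field names, field names of the final select).
  // Each distinct (expr, agg) pair gets exactly one intermediate field, and
  // the final select refers to it by that same intermediate name.
  def expand(calls: List[AggCall]): (List[String], List[String]) = {
    val intermediates = mutable.LinkedHashMap.empty[AggCall, String]
    val outputs = calls.map { call =>
      // The default is evaluated only for a pair that has not been seen yet.
      intermediates.getOrElseUpdate(call, s"intermediate.${intermediates.size + 1}")
    }
    (intermediates.values.toList, outputs)
  }

  // A selection is only valid if all of its output field names are distinct.
  def namesUnique(names: List[String]): Boolean = names.distinct.size == names.size

  def main(args: Array[String]): Unit = {
    // select(a.count, b.count): both counts reduce to sum over the literal 1.
    val (intermediates, outputs) = expand(List(AggCall("1", "sum"), AggCall("1", "sum")))
    println(intermediates)        // List(intermediate.1)
    println(outputs)              // List(intermediate.1, intermediate.1)
    println(namesUnique(outputs)) // false
  }
}
```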
Changing `ExpandAggregations` to this should do the trick:
```
// Inside ExpandAggregations: selection, aggregations, intermediateFields and
// aggregationIntermediates come from the enclosing scope.
var intermediateCount = 0
var resultCount = 0
selection foreach { f =>
  f.transformPre {
    case agg: Aggregation =>
      val intermediateReferences =
        agg.getIntermediateFields.zip(agg.getAggregations) map {
          case (expr, basicAgg) =>
            aggregations.get((expr, basicAgg)) match {
              case Some(intermediateName) =>
                // Already computed: reuse the intermediate field, but give
                // this reference its own unique result name.
                resultCount = resultCount + 1
                val resultName = s"result.$resultCount"
                Naming(ResolvedFieldReference(intermediateName, expr.typeInfo), resultName)
              case None =>
                // First occurrence: register a new intermediate field.
                intermediateCount = intermediateCount + 1
                val intermediateName = s"intermediate.$intermediateCount"
                intermediateFields += Naming(expr, intermediateName)
                aggregations((expr, basicAgg)) = intermediateName
                resultCount = resultCount + 1
                val resultName = s"result.$resultCount"
                Naming(ResolvedFieldReference(intermediateName, expr.typeInfo), resultName)
            }
        }
      aggregationIntermediates(agg) = intermediateReferences
      // Return a NOP so that we don't add the children of the aggregation
      // to intermediate fields. We already added the necessary fields to the
      // list of intermediate fields.
      NopExpression()
    case fa: ResolvedFieldReference =>
      if (!fa.name.startsWith("intermediate")) {
        intermediateFields += Naming(fa, fa.name)
      }
      fa
  }
}
```
Now the aggregations will be expanded to:
```
intermediate = select(1.count as intermediate.1)
agg = intermediate.aggregate(intermediate.1, sum)
result = agg.select(intermediate.1 as result.1, intermediate.1 as result.2)
```
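The effect of the change can be sketched as a self-contained Scala toy model. `AggCall`, `Aliased` and `expand` are hypothetical illustrative names, not Flink classes. The sketch mirrors the two counters in the patch. Shared (expr, aggregation) pairs still reuse a single intermediate field, while every reference gets its own `result.N` alias.

```scala
import scala.collection.mutable

object NewExpansion {
  // Hypothetical toy model: intermediate expression plus basic aggregation.
  final case class AggCall(intermediateExpr: String, basicAgg: String)

  // One field of the final select: the intermediate it reads and its alias.
  final case class Aliased(source: String, alias: String) {
    override def toString: String = s"$source as $alias"
  }

  def expand(calls: List[AggCall]): (List[String], List[Aliased]) = {
    val intermediates = mutable.LinkedHashMap.empty[AggCall, String]
    var resultCount = 0
    val outputs = calls.map { call =>
      // Common subexpressions still share one intermediate field...
      val source = intermediates.getOrElseUpdate(call, s"intermediate.${intermediates.size + 1}")
      // ...but each reference is published under a fresh result.N alias.
      resultCount += 1
      Aliased(source, s"result.$resultCount")
    }
    (intermediates.values.toList, outputs)
  }

  def main(args: Array[String]): Unit = {
    val (intermediates, outputs) = expand(List(AggCall("1", "sum"), AggCall("1", "sum")))
    println(intermediates) // List(intermediate.1)
    println(outputs)       // List(intermediate.1 as result.1, intermediate.1 as result.2)
    val aliases = outputs.map(_.alias)
    println(aliases.distinct.size == aliases.size) // true
  }
}
```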
This also required changing `ExpressionCodeGenerator.scala`, line 111 to:
```
def cleanExpr(e: Expression): Expression = {
e match {
case expressions.Naming(namedExpr, _) => cleanExpr(namedExpr)
case _ => e
}
}
```
because there is another bug that doesn't allow nested Namings à la
`Naming(Naming(expr, intermediate.1), result.1)`. The recursive `cleanExpr`
call unwraps every nested level.
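The following toy Scala sketch shows why the recursion matters. With two layers of `Naming`, unwrapping only the outermost layer still leaves a `Naming`, while the recursive version reaches the underlying expression. `Expr`, `Field`, `Naming`, `cleanOnce` and `cleanExpr` here are hypothetical stand-ins, not Flink's expression classes. `cleanOnce` is only an illustrative contrast, not necessarily what line 111 contained before.

```scala
object CleanExpr {
  // Hypothetical toy ADT with the same shape as the Table API naming wrapper.
  sealed trait Expr
  final case class Field(name: String) extends Expr
  final case class Naming(child: Expr, name: String) extends Expr

  // Recursive version, as in the suggested fix: strips every Naming layer.
  def cleanExpr(e: Expr): Expr = e match {
    case Naming(named, _) => cleanExpr(named)
    case _ => e
  }

  // Illustrative non-recursive contrast: strips only the outermost layer.
  def cleanOnce(e: Expr): Expr = e match {
    case Naming(named, _) => named
    case _ => e
  }

  def main(args: Array[String]): Unit = {
    val nested = Naming(Naming(Field("1"), "intermediate.1"), "result.1")
    println(cleanOnce(nested)) // Naming(Field(1),intermediate.1)
    println(cleanExpr(nested)) // Field(1)
  }
}
```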
> Table API do not support multi count in aggregation.
> ----------------------------------------------------
>
> Key: FLINK-3087
> URL: https://issues.apache.org/jira/browse/FLINK-3087
> Project: Flink
> Issue Type: Bug
> Components: Table API
> Affects Versions: 0.10.0
> Reporter: Chengxiang Li
> Assignee: Chengxiang Li
>
> Multi {{count}} in aggregation is not supported, for example:
> {code:java}
> table.select("a.count", "b.count")
> {code}
> It's valid in grammar; besides, {{a.count}} and {{b.count}} may actually have
> different values if NULL value handling is enabled.
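The following small Scala sketch illustrates the NULL-handling point. It assumes SQL-style semantics, where `count` on a column skips NULL values (modeled here as `None`). `Row` and `countNonNull` are hypothetical names used only for illustration.

```scala
object NullCounts {
  // Hypothetical row type: None models a NULL value.
  final case class Row(a: Option[Int], b: Option[Int])

  // Assumed SQL-style count(col): only non-NULL values are counted.
  def countNonNull(rows: Seq[Row])(col: Row => Option[Int]): Long =
    rows.count(r => col(r).isDefined).toLong

  def main(args: Array[String]): Unit = {
    val rows = List(
      Row(Some(1), None),
      Row(Some(2), Some(5)),
      Row(None, Some(7)),
      Row(Some(3), None)
    )
    println(countNonNull(rows)(_.a)) // 3
    println(countNonNull(rows)(_.b)) // 2
  }
}
```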
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)