[ https://issues.apache.org/jira/browse/FLINK-8903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16402335#comment-16402335 ]
ASF GitHub Bot commented on FLINK-8903:
---------------------------------------
GitHub user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5706#discussion_r175181100
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala ---
@@ -103,6 +106,19 @@ class FlinkLogicalWindowAggregateConverter
FlinkConventions.LOGICAL,
"FlinkLogicalWindowAggregateConverter") {
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
+
+    // we do not support these functions natively
+    // they have to be converted using the WindowAggregateReduceFunctionsRule
+    val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
+      case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => false
--- End diff --
Replacing the current code with `SqlKind.AVG_AGG_FUNCTIONS.contains()` led
to several test failures, because that set also contains `SqlKind.AVG`. These
tests expected an `AVG` aggregation function, which was now replaced by
`SUM / COUNT`.
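To make the difference between the two predicates concrete, here is a minimal, self-contained sketch. It does not use Calcite: `Kind` and its case objects are hypothetical stand-ins for `SqlKind`, `AvgAggFunctions` mirrors `SqlKind.AVG_AGG_FUNCTIONS` (which holds `AVG` together with the four variance/standard-deviation kinds), and the `case _ => true` fallback is an assumption about how the truncated match in the diff continues. The explicit match accepts a plan that contains `AVG`, while the set-based check rejects it, so `AVG` would also be sent through the reduce rule.

```scala
// Hypothetical stand-in for Calcite's SqlKind: only the members needed here.
sealed trait Kind
case object AVG extends Kind
case object SUM extends Kind
case object COUNT extends Kind
case object STDDEV_POP extends Kind
case object STDDEV_SAMP extends Kind
case object VAR_POP extends Kind
case object VAR_SAMP extends Kind

object MatchesSketch {
  // Mirrors SqlKind.AVG_AGG_FUNCTIONS, which also contains AVG itself.
  val AvgAggFunctions: Set[Kind] = Set(AVG, STDDEV_POP, STDDEV_SAMP, VAR_POP, VAR_SAMP)

  // Predicate as in the diff: only the variance/stddev kinds are rejected
  // (the fallback case is assumed, since the diff excerpt is truncated).
  def supportedExplicit(kinds: Seq[Kind]): Boolean = kinds.forall {
    case STDDEV_POP | STDDEV_SAMP | VAR_POP | VAR_SAMP => false
    case _ => true
  }

  // Alternative discussed in the review: it rejects AVG as well.
  def supportedViaAvgSet(kinds: Seq[Kind]): Boolean =
    kinds.forall(k => !AvgAggFunctions.contains(k))

  def main(args: Array[String]): Unit = {
    val plan = Seq(SUM, AVG)
    println(supportedExplicit(plan))  // true
    println(supportedViaAvgSet(plan)) // false
  }
}
```

With the set-based check, a window aggregate that contains only `AVG` no longer matches the converter, which is consistent with the tests now seeing `SUM / COUNT` instead of `AVG`.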
> Built-in agg functions VAR_POP, VAR_SAMP, STDEV_POP, STDEV_SAMP are broken in Group Windows
> -------------------------------------------------------------------------------------------
>
> Key: FLINK-8903
> URL: https://issues.apache.org/jira/browse/FLINK-8903
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.3.2, 1.5.0, 1.4.2
> Reporter: lilizhao
> Assignee: Fabian Hueske
> Priority: Critical
> Fix For: 1.5.0
>
> Attachments: QQ图片20180312180143.jpg, TableAndSQLTest.java
>
>
> The built-in aggregation functions VAR_POP, VAR_SAMP, STDDEV_POP, STDDEV_SAMP
> are translated into regular AVG functions if they are applied in the context
> of a Group Window aggregation ({{GROUP BY TUMBLE/HOP/SESSION}}).
> The reason is that these functions are internally represented as
> {{SqlAvgAggFunction}}, but with a different {{SqlKind}}. When translating
> Calcite aggregation functions to Flink Table agg functions, we only look at
> the class of the function, not at the value of its {{kind}} field. We did not
> notice this before because in all other cases (regular {{GROUP BY}} without
> windows, or {{OVER}} windows) we have a translation rule,
> {{AggregateReduceFunctionsRule}}, that decomposes the more complex functions
> into expressions of {{COUNT}} and {{SUM}} functions, so that we never
> execute an {{AVG}} Flink function. That rule can only be applied to
> {{LogicalAggregate}}; however, we represent group windows as
> {{LogicalWindowAggregate}}, so the rule does not match.
> We should fix this by:
> 1. restricting the translation to Flink avg functions in {{AggregateUtil}} to
> {{SqlKind.AVG}}, and
> 2. implementing a rule (hopefully based on {{AggregateReduceFunctionsRule}})
> that decomposes the complex agg functions into {{SUM}} and {{COUNT}} functions.
> Step 1 is an easy, quick fix, but we would then get an "Unsupported
> Function" exception if {{VAR_POP}} is used in a {{GROUP BY}} window.
> Step 2 might be more involved, depending on how difficult it is to port the
> rule.
>
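Step 2 relies on the fact that all four functions can be derived from SUM(x), SUM(x * x) and COUNT(x). The sketch below evaluates those identities over plain Scala doubles so they can be checked by hand. It is not the planner rule itself, which would emit equivalent expressions into the plan rather than compute values. `Partials` and the function names are hypothetical, and returning `None` for VAR_SAMP/STDDEV_SAMP over fewer than two rows is an assumption standing in for SQL NULL.

```scala
object VarianceReductionSketch {
  // Hypothetical per-window partial aggregates: SUM(x), SUM(x * x), COUNT(x).
  final case class Partials(sum: Double, sumSq: Double, count: Long)

  def partials(xs: Seq[Double]): Partials =
    Partials(xs.sum, xs.map(x => x * x).sum, xs.size.toLong)

  // VAR_POP(x) = (SUM(x * x) - SUM(x) * SUM(x) / COUNT(x)) / COUNT(x)
  def varPop(p: Partials): Option[Double] =
    if (p.count == 0) None
    else Some((p.sumSq - p.sum * p.sum / p.count) / p.count)

  // VAR_SAMP divides by COUNT(x) - 1; treated as undefined (None) below two rows.
  def varSamp(p: Partials): Option[Double] =
    if (p.count < 2) None
    else Some((p.sumSq - p.sum * p.sum / p.count) / (p.count - 1))

  // The standard deviations are the square roots of the variances.
  def stddevPop(p: Partials): Option[Double] = varPop(p).map(math.sqrt)
  def stddevSamp(p: Partials): Option[Double] = varSamp(p).map(math.sqrt)

  def main(args: Array[String]): Unit = {
    val p = partials(Seq(2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0))
    println(varPop(p))    // Some(4.0)
    println(stddevPop(p)) // Some(2.0)
  }
}
```

Computing the variance as "sum of squares minus squared sum" can lose precision when values are large and their spread is small; the sketch accepts that to keep the identity visible.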