[
https://issues.apache.org/jira/browse/FLINK-4604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15551505#comment-15551505
]
Anton Mushin commented on FLINK-4604:
-------------------------------------
I have run into a problem.
I added the rule to
{{org.apache.flink.api.table.plan.rules.FlinkRuleSets#DATASET_OPT_RULES}}
{code}
.....
// aggregate union rule
AggregateUnionAggregateRule.INSTANCE,
AggregateReduceFunctionsRule.INSTANCE,
......
{code}
I created case classes in
{{org.apache.flink.api.table.expressions.aggregations.scala}} that look like this:
{code}
case class StddevPop(child: Expression) extends Aggregation {
  override def toString = s"stddev_pop($child)"
  override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
    relBuilder.aggregateCall(SqlStdOperatorTable.STDDEV_POP, false, null, name, child.toRexNode)
  }
  override private[flink] def resultType = child.resultType
  override private[flink] def validateInput =
    TypeCheckUtils.assertNumericExpr(child.resultType, "stddev_pop")
}
{code}
and registered the new functions in
{{org.apache.flink.api.table.validate.FunctionCatalog#builtInFunctions}}
{code}
......
"sum" -> classOf[Sum],
"stddev_pop" -> classOf[StddevPop],
"stddev_samp" -> classOf[StddevSamp],
"var_pop" -> classOf[VarPop],
"var_samp" -> classOf[VarSamp],
.......
{code}
and in
{{org.apache.flink.api.table.validate.BasicOperatorTable#builtInSqlOperators}}
{code}
.........
// AGGREGATE OPERATORS
........
SqlStdOperatorTable.AVG,
SqlStdOperatorTable.STDDEV_POP,
SqlStdOperatorTable.STDDEV_SAMP,
SqlStdOperatorTable.VAR_POP,
SqlStdOperatorTable.VAR_SAMP,
........
{code}
I also added the functions to
{{org.apache.flink.api.table.expressions.ExpressionParser}}, like this:
{code}
lazy val STDDEV_POP: Keyword = Keyword("stddev_pop")
.....
lazy val suffixStddevPop: PackratParser[Expression] =
  composite <~ "." ~ STDDEV_POP ~ opt("()") ^^ { e => StddevPop(e) }
......
lazy val prefixStddevPop: PackratParser[Expression] =
  STDDEV_POP ~ "(" ~> expression <~ ")" ^^ { e => StddevPop(e) }
// and they were added to ExpressionParser#functionIdent, ExpressionParser#suffixed,
// ExpressionParser#prefixed, as for the other aggregate functions
{code}
and I added them to {{org.apache.flink.api.scala.table.expressionDsl.scala}}
{code}
......
def stddev_pop = StddevPop(expr)
def stddev_samp = StddevSamp(expr)
def var_pop = VarPop(expr)
def var_samp = VarSamp(expr)
{code}
Then I execute the following SQL query: {{SELECT
STDDEV_POP(_1),STDDEV_SAMP(_1),VAR_SAMP(_1),VAR_POP(_1) FROM table}}
In {{org.apache.flink.api.table.BatchTableEnvironment#translate}} I get the
following dataset plan:
{noformat}
LogicalAggregate(group=[{}], EXPR$0=[STDDEV_POP($0)], EXPR$1=[STDDEV_SAMP($0)], EXPR$2=[VAR_SAMP($0)], EXPR$3=[VAR_POP($0)])
  LogicalProject(_1=[$0])
    LogicalTableScan(table=[[_DataSetTable_0]])
{noformat}
When
{{org.apache.flink.api.table.plan.nodes.dataset.DataSetAggregate#translateToPlan}}
runs, {{org.apache.flink.api.table.runtime.aggregate.AvgAggregate}} is
used for each of the new functions, so every one of them returns the average.
Could you suggest what I might be missing?
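For reference, the sum/count reduction that the issue description mentions can be sketched in plain Scala. This is only an illustration of the arithmetic, assuming the usual textbook formulas; it is not Flink or Calcite code, and the object, case class, and method names are hypothetical.

```scala
// Plain-Scala sketch of the sum/count reduction; all names are hypothetical
// and nothing here is part of the Flink or Calcite API.
object VarianceBySums {
  // The three partial aggregates the reduction relies on.
  final case class Sums(count: Long, sum: Double, sumSq: Double)

  def sums(xs: Seq[Double]): Sums =
    Sums(xs.length.toLong, xs.sum, xs.map(x => x * x).sum)

  // AVG(x) = SUM(x) / COUNT(x)
  def avg(s: Sums): Double = s.sum / s.count

  // VAR_POP(x) = (SUM(x*x) - SUM(x)*SUM(x)/COUNT(x)) / COUNT(x)
  def varPop(s: Sums): Double =
    (s.sumSq - s.sum * s.sum / s.count) / s.count

  // VAR_SAMP(x) divides by COUNT(x) - 1 instead; it is not defined for a single row.
  def varSamp(s: Sums): Double =
    (s.sumSq - s.sum * s.sum / s.count) / (s.count - 1)

  // The standard deviations are the square roots of the variances.
  def stddevPop(s: Sums): Double = math.sqrt(varPop(s))
  def stddevSamp(s: Sums): Double = math.sqrt(varSamp(s))
}
```

For the values 2, 4, 4, 4, 5, 5, 7, 9 this gives AVG = 5, VAR_POP = 4 and STDDEV_POP = 2, so a test on such data would make it obvious when a query falls back to the average.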
> Add support for standard deviation/variance
> -------------------------------------------
>
> Key: FLINK-4604
> URL: https://issues.apache.org/jira/browse/FLINK-4604
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Anton Mushin
>
> Calcite's {{AggregateReduceFunctionsRule}} can convert SQL {{AVG, STDDEV_POP,
> STDDEV_SAMP, VAR_POP, VAR_SAMP}} to sum/count functions. We should add, test
> and document this rule.
> Whether we also want to add these aggregates to the Table API is up for discussion.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)