[ https://issues.apache.org/jira/browse/FLINK-4604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15551505#comment-15551505 ]
Anton Mushin commented on FLINK-4604:
-------------------------------------

I have run into a problem. I added the rule to {{org.apache.flink.api.table.plan.rules.FlinkRuleSets#DATASET_OPT_RULES}}:
{code}
.....
// aggregate union rule
AggregateUnionAggregateRule.INSTANCE,
AggregateReduceFunctionsRule.INSTANCE,
......
{code}
I created case classes in {{org.apache.flink.api.table.expressions.aggregations.scala}} that look like this:
{code}
case class StddevPop(child: Expression) extends Aggregation {
  override def toString = s"stddev_pop($child)"

  override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
    relBuilder.aggregateCall(SqlStdOperatorTable.STDDEV_POP, false, null, name, child.toRexNode)
  }

  override private[flink] def resultType = child.resultType

  override private[flink] def validateInput =
    TypeCheckUtils.assertNumericExpr(child.resultType, "stddev_pop")
}
{code}
I registered the new functions in {{org.apache.flink.api.table.validate.FunctionCatalog#builtInFunctions}}:
{code}
......
"sum" -> classOf[Sum],
"stddev_pop" -> classOf[StddevPop],
"stddev_samp" -> classOf[StddevSamp],
"var_pop" -> classOf[VarPop],
"var_samp" -> classOf[VarSamp],
.......
{code}
and in {{org.apache.flink.api.table.validate.BasicOperatorTable#builtInSqlOperators}}:
{code}
.........
// AGGREGATE OPERATORS
........
SqlStdOperatorTable.AVG,
SqlStdOperatorTable.STDDEV_POP,
SqlStdOperatorTable.STDDEV_SAMP,
SqlStdOperatorTable.VAR_POP,
SqlStdOperatorTable.VAR_SAMP,
........
{code}
I also declared the functions in {{org.apache.flink.api.table.expressions.ExpressionParser}}, like this:
{code}
lazy val STDDEV_POP: Keyword = Keyword("stddev_pop")
.....
lazy val suffixStddevPop: PackratParser[Expression] =
  composite <~ "." ~ STDDEV_POP ~ opt("()") ^^ { e => StddevPop(e) }
......
lazy val prefixStddevPop: PackratParser[Expression] =
  STDDEV_POP ~ "(" ~> expression <~ ")" ^^ { e => StddevPop(e) }
// these were also added to ExpressionParser#functionIdent, ExpressionParser#suffixed
// and ExpressionParser#prefixed, as for the other aggregate functions
{code}
and I added them to {{org.apache.flink.api.scala.table.expressionDsl.scala}}:
{code}
......
def stddev_pop = StddevPop(expr)
def stddev_samp = StddevSamp(expr)
def var_pop = VarPop(expr)
def var_samp = VarSamp(expr)
{code}
Then I execute the following SQL query:
{{SELECT STDDEV_POP(_1),STDDEV_SAMP(_1),VAR_SAMP(_1),VAR_POP(_1) FROM table}}

In {{org.apache.flink.api.table.BatchTableEnvironment#translate}} I get this dataset plan:
{noformat}
LogicalAggregate(group=[{}], EXPR$0=[STDDEV_POP($0)], EXPR$1=[STDDEV_SAMP($0)], EXPR$2=[VAR_SAMP($0)], EXPR$3=[VAR_POP($0)])
  LogicalProject(_1=[$0])
    LogicalTableScan(table=[[_DataSetTable_0]])
{noformat}
When {{org.apache.flink.api.table.plan.nodes.dataset.DataSetAggregate#translateToPlan}} runs, {{org.apache.flink.api.table.runtime.aggregate.AvgAggregate}} is called for each of the new functions, so an average is calculated for every one of them instead of the standard deviation or variance.

Could you suggest what I might be missing?

> Add support for standard deviation/variance
> -------------------------------------------
>
>                 Key: FLINK-4604
>                 URL: https://issues.apache.org/jira/browse/FLINK-4604
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Anton Mushin
>
> Calcite's {{AggregateReduceFunctionsRule}} can convert SQL {{AVG, STDDEV_POP,
> STDDEV_SAMP, VAR_POP, VAR_SAMP}} to sum/count functions. We should add, test
> and document this rule.
> Whether we also want to add these aggregates to the Table API is up for discussion.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
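
For reference, here is a minimal sketch of the sum/count reduction that the issue description mentions. It is plain Scala with no Flink or Calcite dependencies. The names {{ReducedAggregates}} and {{Moments}} are hypothetical and exist only for this illustration. The formulas are the usual textbook identities and are not copied from the rule's source, so treat this as an approximation of what the rewritten plan computes.

```scala
// Illustration only: expresses AVG, VAR_POP, VAR_SAMP, STDDEV_POP and
// STDDEV_SAMP through COUNT(x), SUM(x) and SUM(x * x).
// ReducedAggregates and Moments are hypothetical names, not Flink or Calcite APIs.
object ReducedAggregates {

  // The three primitive aggregates that every reduced form is built from.
  final case class Moments(count: Long, sum: Double, sumSq: Double)

  def moments(xs: Seq[Double]): Moments =
    Moments(xs.size.toLong, xs.sum, xs.map(x => x * x).sum)

  // AVG(x) = SUM(x) / COUNT(x)
  def avg(m: Moments): Double = m.sum / m.count

  // VAR_POP(x) = (SUM(x * x) - SUM(x) * SUM(x) / COUNT(x)) / COUNT(x)
  // Note: an empty input yields NaN here, where SQL would return NULL.
  def varPop(m: Moments): Double =
    (m.sumSq - m.sum * m.sum / m.count) / m.count

  // VAR_SAMP divides by COUNT(x) - 1 and is undefined for fewer than two rows.
  def varSamp(m: Moments): Option[Double] =
    if (m.count < 2) None
    else Some((m.sumSq - m.sum * m.sum / m.count) / (m.count - 1))

  // The standard deviations are the square roots of the variances.
  def stddevPop(m: Moments): Double = math.sqrt(varPop(m))

  def stddevSamp(m: Moments): Option[Double] = varSamp(m).map(math.sqrt)
}
```

For the input {{Seq(2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0)}} the moments are count = 8, sum = 40 and sumSq = 232, so {{avg}} gives 5.0, {{varPop}} gives 4.0 and {{stddevPop}} gives 2.0. If the reduction were applied to the plan above, the aggregate node would be expected to contain only SUM and COUNT calls rather than STDDEV_POP and the others.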