[ 
https://issues.apache.org/jira/browse/FLINK-4604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15551505#comment-15551505
 ] 

Anton Mushin commented on FLINK-4604:
-------------------------------------

I have run into a problem.
I added the rule to 
{{org.apache.flink.api.table.plan.rules.FlinkRuleSets#DATASET_OPT_RULES}}
{code}
.....
    // aggregate union rule
    AggregateUnionAggregateRule.INSTANCE,

    AggregateReduceFunctionsRule.INSTANCE,
......
{code}
I created case classes in 
{{org.apache.flink.api.table.expressions.aggregations.scala}} that look like this:
{code}
case class StddevPop(child: Expression) extends Aggregation {
  override def toString = s"stddev_pop($child)"

  override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
    relBuilder.aggregateCall(
      SqlStdOperatorTable.STDDEV_POP, false, null, name, child.toRexNode)
  }

  override private[flink] def resultType = child.resultType

  override private[flink] def validateInput =
    TypeCheckUtils.assertNumericExpr(child.resultType, "stddev_pop")
}
{code}

and registered the new functions in 
{{org.apache.flink.api.table.validate.FunctionCatalog#builtInFunctions}}
{code}
......
    "sum" -> classOf[Sum],
    "stddev_pop" -> classOf[StddevPop],
    "stddev_samp" -> classOf[StddevSamp],
    "var_pop" -> classOf[VarPop],
    "var_samp" -> classOf[VarSamp],
.......
{code}
and in 
{{org.apache.flink.api.table.validate.BasicOperatorTable#builtInSqlOperators}}
{code}
.........
// AGGREGATE OPERATORS
........
    SqlStdOperatorTable.AVG,
    SqlStdOperatorTable.STDDEV_POP,
    SqlStdOperatorTable.STDDEV_SAMP,
    SqlStdOperatorTable.VAR_POP,
    SqlStdOperatorTable.VAR_SAMP,
........
{code}
I also added the functions to 
{{org.apache.flink.api.table.expressions.ExpressionParser}}, like this:
{code}
lazy val STDDEV_POP: Keyword = Keyword("stddev_pop")
.....
lazy val suffixStddevPop: PackratParser[Expression] =
    composite <~ "." ~ STDDEV_POP ~ opt("()") ^^ { e => StddevPop(e) }

......
lazy val prefixStddevPop: PackratParser[Expression] =
    STDDEV_POP ~ "(" ~> expression <~ ")" ^^ { e => StddevPop(e) }

// and they were added to ExpressionParser#functionIdent, ExpressionParser#suffixed
// and ExpressionParser#prefixed, as for the other aggregate functions
{code}
and I added them to {{org.apache.flink.api.scala.table.expressionDsl.scala}}:
{code}
......
  def stddev_pop = StddevPop(expr)
  def stddev_samp = StddevSamp(expr)
  def var_pop = VarPop(expr)
  def var_samp = VarSamp(expr)
{code}


Then I execute the following SQL query:
{{SELECT STDDEV_POP(_1), STDDEV_SAMP(_1), VAR_SAMP(_1), VAR_POP(_1) FROM table}}
In {{org.apache.flink.api.table.BatchTableEnvironment#translate}} I get this 
dataset plan:
{noformat}
LogicalAggregate(group=[{}], EXPR$0=[STDDEV_POP($0)], EXPR$1=[STDDEV_SAMP($0)], EXPR$2=[VAR_SAMP($0)], EXPR$3=[VAR_POP($0)])
  LogicalProject(_1=[$0])
    LogicalTableScan(table=[[_DataSetTable_0]])
{noformat}

When 
{{org.apache.flink.api.table.plan.nodes.dataset.DataSetAggregate#translateToPlan}}
 runs, {{org.apache.flink.api.table.runtime.aggregate.AvgAggregate}} is 
called for each of the new functions, so an average is computed for every one of them.
Could you suggest what I might be missing?
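
For reference, the issue description says that {{AggregateReduceFunctionsRule}} converts these functions to sum/count functions. The sketch below shows that arithmetic in plain Scala, with no Flink or Calcite dependencies, so the reduced result can be compared against what the query returns. The object, case class and method names are hypothetical and exist only for this illustration; the sketch does not handle empty input or SQL NULL semantics.

```scala
// Plain-Scala sketch (not Flink or Calcite code): variance and standard
// deviation expressed only through SUM(x), SUM(x * x) and COUNT(x).
object ReducedAggregates {

  // The three partial aggregates a sum/count based plan would compute.
  final case class Partials(sum: Double, sumSq: Double, count: Long)

  def partials(xs: Seq[Double]): Partials =
    Partials(xs.sum, xs.map(x => x * x).sum, xs.size.toLong)

  // VAR_POP(x) = (SUM(x * x) - SUM(x) * SUM(x) / COUNT(x)) / COUNT(x)
  def varPop(p: Partials): Double =
    (p.sumSq - p.sum * p.sum / p.count) / p.count

  // VAR_SAMP(x) has the same numerator but divides by COUNT(x) - 1.
  def varSamp(p: Partials): Double =
    (p.sumSq - p.sum * p.sum / p.count) / (p.count - 1)

  // The standard deviations are the square roots of the variances.
  def stddevPop(p: Partials): Double = math.sqrt(varPop(p))

  def stddevSamp(p: Partials): Double = math.sqrt(varSamp(p))
}
```

For the values 2, 4, 4, 4, 5, 5, 7, 9 the partial aggregates are SUM = 40, SUM of squares = 232 and COUNT = 8, which gives VAR_POP = 4 and STDDEV_POP = 2. An average of the same values is 5, so a result of 5 for every column would be consistent with {{AvgAggregate}} being used for all four functions.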

> Add support for standard deviation/variance
> -------------------------------------------
>
>                 Key: FLINK-4604
>                 URL: https://issues.apache.org/jira/browse/FLINK-4604
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Anton Mushin
>
> Calcite's {{AggregateReduceFunctionsRule}} can convert SQL {{AVG, STDDEV_POP, 
> STDDEV_SAMP, VAR_POP, VAR_SAMP}} to sum/count functions. We should add, test 
> and document this rule. 
> Whether we also want to add these aggregates to the Table API is up for discussion.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
