[ 
https://issues.apache.org/jira/browse/FLINK-4604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15554768#comment-15554768
 ] 

Anton Mushin commented on FLINK-4604:
-------------------------------------

I changed the method
{{org.apache.flink.api.table.plan.nodes.dataset.DataSetAggregate#computeSelfCost}}.
This is a temporary implementation, for example:
{code}
/**
 * Estimates the cost of this aggregation node.
 *
 * The base cost is built from:
 *  - the input row count,
 *  - CPU equal to rows * number of aggregate calls,
 *  - I/O equal to rows * estimated row size.
 *
 * Each STDDEV_POP, STDDEV_SAMP, VAR_POP or VAR_SAMP call adds that same base cost once more.
 * Presumably the penalty exists so that plans where these aggregates were reduced to
 * SUM/COUNT (e.g. by Calcite's AggregateReduceFunctionsRule) are preferred. This is a
 * temporary heuristic; confirm before relying on it.
 *
 * @param planner  planner whose cost factory creates the cost objects
 * @param metadata metadata query used to estimate the input row count
 * @return the estimated cost of this node
 */
override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
  val child = this.getInput
  val rowCnt = metadata.getRowCount(child)
  val rowSize = this.estimateRowSize(child.getRowType)
  val aggCnt = this.namedAggregates.size

  // Every cost term (base and penalty) has the same shape, so it is built in one place.
  // It is a `def`, so each use creates a fresh cost object.
  def baseCost: RelOptCost =
    planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize)

  // Aggregate kinds that are charged one extra base cost per occurrence.
  val penalizedKinds =
    Set(SqlKind.STDDEV_POP, SqlKind.STDDEV_SAMP, SqlKind.VAR_SAMP, SqlKind.VAR_POP)

  val penalizedCnt = this.namedAggregates.count { namedAgg =>
    penalizedKinds.contains(namedAgg.getKey.getAggregation.getKind)
  }

  // Start from the base cost and add one base cost per penalized aggregate.
  // This gives the same result as the original imperative accumulation.
  (0 until penalizedCnt).foldLeft(baseCost)((cost, _) => cost.plus(baseCost))
}
{code}
and I got the following plan:
{noformat}
DataSetCalc(select=[CAST(POWER(/(-($f0, /(*($f1, $f1), $f2)), $f2), 0.5)) AS 
$f0, CAST(POWER(/(-($f3, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null, -($f2, 
1))), 0.5)) AS $f1, CAST(/(-($f4, /(*($f1, $f1), $f2)), CASE(=($f2, 1), null, 
-($f2, 1)))) AS $f2, CAST(/(-($f5, /(*($f1, $f1), $f2)), $f2)) AS $f3])
  DataSetAggregate(select=[SUM($f6) AS $f0, SUM(_1) AS $f1, COUNT(_1) AS $f2, 
SUM($f7) AS $f3, SUM($f8) AS $f4, SUM($f9) AS $f5])
    DataSetCalc(select=[_1, _2, _3, _4, _5, _6])
      DataSetScan(table=[[_DataSetTable_0]])

LogicalAggregate(group=[{}], EXPR$0=[STDDEV_POP($0)], EXPR$1=[STDDEV_SAMP($0)], 
EXPR$2=[VAR_SAMP($0)], EXPR$3=[VAR_POP($0)])
  LogicalProject(_1=[$0])
    LogicalTableScan(table=[[_DataSetTable_0]])
{noformat}

> Add support for standard deviation/variance
> -------------------------------------------
>
>                 Key: FLINK-4604
>                 URL: https://issues.apache.org/jira/browse/FLINK-4604
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Anton Mushin
>         Attachments: 1.jpg
>
>
> Calcite's {{AggregateReduceFunctionsRule}} can convert SQL {{AVG, STDDEV_POP, 
> STDDEV_SAMP, VAR_POP, VAR_SAMP}} to sum/count functions. We should add, test 
> and document this rule. 
> Whether we also want to add these aggregates to the Table API is up for discussion.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to