[
https://issues.apache.org/jira/browse/FLINK-4604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15568664#comment-15568664
]
Anton Mushin commented on FLINK-4604:
-------------------------------------
I tried adding the check to
{{org.apache.flink.api.table.plan.rules.dataSet.DataSetAggregateRule#matches}},
but something went wrong :)
This is what I did:
{code:title=org.apache.flink.api.table.plan.rules.dataSet.DataSetAggregateRule}
override def matches(call: RelOptRuleCall): Boolean = {
val agg: LogicalAggregate = call.rel(0).asInstanceOf[LogicalAggregate]
// check if we have distinct aggregates
val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
if (distinctAggs) {
throw new TableException("DISTINCT aggregates are currently not
supported.")
}
// check if we have grouping sets
val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) !=
agg.getGroupSet
if (groupSets || agg.indicator) {
throw new TableException("GROUPING SETS are currently not supported.")
}
(!distinctAggs && !groupSets && !agg.indicator) &&
!AggregateReduceFunctionsRule.INSTANCE.matches(call)
}
{code}
And I got the following plan and exception:
{noformat}
DataSetCalc(select=[CAST(/(-(CASE(=($f1, 0), null, $f0), /(*(CASE(=($f3, 0),
null, $f2), CASE(=($f3, 0), null, $f2)), $f3)), CASE(=($f3, 1), null, -($f3,
1)))) AS $f0, CAST(/(-(CASE(=($f5, 0), null, $f4), /(*(CASE(=($f7, 0), null,
$f6), CASE(=($f7, 0), null, $f6)), $f7)), CASE(=($f7, 1), null, -($f7, 1)))) AS
$f1, CAST(/(-(CASE(=($f9, 0), null, $f8), /(*(CASE(=($f11, 0), null, $f10),
CASE(=($f11, 0), null, $f10)), $f11)), CASE(=($f11, 1), null, -($f11, 1)))) AS
$f2, CAST(/(-(CASE(=($f13, 0), null, $f12), /(*(CASE(=($f15, 0), null, $f14),
CASE(=($f15, 0), null, $f14)), $f15)), CASE(=($f15, 1), null, -($f15, 1)))) AS
$f3, CAST(/(-(CASE(=($f17, 0), null, $f16), /(*(CASE(=($f19, 0), null, $f18),
CASE(=($f19, 0), null, $f18)), $f19)), CASE(=($f19, 1), null, -($f19, 1)))) AS
$f4, CAST(/(-(CASE(=($f21, 0), null, $f20), /(*(CASE(=($f23, 0), null, $f22),
CASE(=($f23, 0), null, $f22)), $f23)), CASE(=($f23, 1), null, -($f23, 1)))) AS
$f5])
DataSetAggregate(select=[$SUM0($f6) AS $f0, COUNT($f6) AS $f1, $SUM0(_1) AS
$f2, COUNT(_1) AS $f3, $SUM0($f7) AS $f4, COUNT($f7) AS $f5, $SUM0(_2) AS $f6,
COUNT(_2) AS $f7, $SUM0($f8) AS $f8, COUNT($f8) AS $f9, $SUM0(_3) AS $f10,
COUNT(_3) AS $f11, $SUM0($f9) AS $f12, COUNT($f9) AS $f13, $SUM0(_4) AS $f14,
COUNT(_4) AS $f15, $SUM0($f10) AS $f16, COUNT($f10) AS $f17, $SUM0(_5) AS $f18,
COUNT(_5) AS $f19, $SUM0($f11) AS $f20, COUNT($f11) AS $f21, $SUM0(_6) AS $f22,
COUNT(_6) AS $f23])
DataSetCalc(select=[_1, _2, _3, _4, _5, _6])
DataSetScan(table=[[_DataSetTable_0]])
{noformat}
{noformat}
org.apache.flink.api.table.TableException: Type NULL is not supported. Null
values must have a supported type.
at
org.apache.flink.api.table.FlinkTypeFactory$.toTypeInfo(FlinkTypeFactory.scala:128)
at
org.apache.flink.api.table.codegen.CodeGenerator.visitLiteral(CodeGenerator.scala:553)
at
org.apache.flink.api.table.codegen.CodeGenerator.visitLiteral(CodeGenerator.scala:56)
at org.apache.calcite.rex.RexLiteral.accept(RexLiteral.java:658)
at
org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675)
at
org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at
org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:675)
at
org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:56)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:108)
at
org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675)
at
org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at
org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:675)
at
org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:56)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:108)
at
org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675)
at
org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at
org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:675)
at
org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:56)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:108)
at
org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675)
at
org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$14.apply(CodeGenerator.scala:675)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at
org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:675)
at
org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:56)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:108)
at
org.apache.flink.api.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:181)
at
org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:300)
at
org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:300)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at
org.apache.flink.api.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:300)
at
org.apache.flink.api.table.plan.nodes.FlinkCalc$class.functionBody(FlinkCalc.scala:52)
at
org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.functionBody(DataSetCalc.scala:39)
at
org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:108)
at
org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:274)
at
org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139)
at
org.apache.flink.api.scala.table.TableConversions.toDataSet(TableConversions.scala:41)
at
org.apache.flink.api.scala.batch.sql.AggregationsITCase.testVarSampAggregate(AggregationsITCase.scala:369)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.junit.runners.Suite.runChild(Suite.java:127)
at org.junit.runners.Suite.runChild(Suite.java:26)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
{noformat}
When I remove {{!AggregateReduceFunctionsRule.INSTANCE.matches(call)}} and
go back to [this
code|https://issues.apache.org/jira/browse/FLINK-4604?focusedCommentId=15554768&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15554768],
the tests pass.
I am looking for a solution to this problem. Do you have any ideas about it?
> Add support for standard deviation/variance
> -------------------------------------------
>
> Key: FLINK-4604
> URL: https://issues.apache.org/jira/browse/FLINK-4604
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Anton Mushin
> Attachments: 1.jpg
>
>
> Calcite's {{AggregateReduceFunctionsRule}} can convert SQL {{AVG, STDDEV_POP,
> STDDEV_SAMP, VAR_POP, VAR_SAMP}} to sum/count functions. We should add, test
> and document this rule.
> Whether we also want to add these aggregates to the Table API is up for discussion.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)