[ 
https://issues.apache.org/jira/browse/FLINK-21923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-21923:
-------------------------------
    Fix Version/s:     (was: 1.13.0)
                   1.13.1

> SplitAggregateRule will be abnormal, when the sum/count and avg in SQL at the 
> same time
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-21923
>                 URL: https://issues.apache.org/jira/browse/FLINK-21923
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.10.0
>            Reporter: tartarus
>            Assignee: tartarus
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.14.0, 1.13.1
>
>
> SplitAggregateRule rewrites a one-layer aggregation into a two-layer 
> aggregation to improve computing performance under data skew.
> In the partial phase, avg is translated into count and sum. If the same count 
> already exists in the original SQL, the engine removes the duplicate count 
> and then adds a Project on top of the aggregate to restore the removed count 
> column in the result.
> {code:java}
>     relBuilder.aggregate(
>       relBuilder.groupKey(fullGroupSet, 
> ImmutableList.of[ImmutableBitSet](fullGroupSet)),
>       newPartialAggCalls)
>     relBuilder.peek().asInstanceOf[FlinkLogicalAggregate]
>       .setPartialFinalType(PartialFinalType.PARTIAL)
> {code}
> As a result, `relBuilder.peek()` returns a `FlinkLogicalCalc` instead of a 
> `FlinkLogicalAggregate`, and the cast to `FlinkLogicalAggregate` throws an 
> exception like:
> {code:java}
> java.lang.ClassCastException: 
> org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc cannot be 
> cast to 
> org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate
>       at 
> org.apache.flink.table.planner.plan.rules.logical.SplitAggregateRule.onMatch(SplitAggregateRule.scala:286)
>       at 
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
>       at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
>       at 
> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
>       at 
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
>       at 
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>       at 
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
>       at 
> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>       at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>       at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>       at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>       at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>       at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
>       at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
>       at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>       at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:284)
>       at 
> org.apache.flink.table.planner.utils.TableTestUtilBase.assertPlanEquals(TableTestBase.scala:889)
>       at 
> org.apache.flink.table.planner.utils.TableTestUtilBase.doVerifyPlan(TableTestBase.scala:780)
>       at 
> org.apache.flink.table.planner.utils.TableTestUtilBase.verifyPlan(TableTestBase.scala:283)
>       at 
> org.apache.flink.table.planner.plan.rules.logical.SplitAggregateRuleTest.testAggWithFilterClause2(SplitAggregateRuleTest.scala:205)
> {code}
> The failure can be reproduced reliably by adding the following test case to 
> `SplitAggregateRuleTest`:
> {code:java}
>   @Test
>   def testAggFilterClauseBothWithAvgAndCount(): Unit = {
>     util.tableEnv.getConfig.getConfiguration.setBoolean(
>       OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
>     val sqlQuery =
>       s"""
>          |SELECT
>          |  a,
>          |  COUNT(DISTINCT b) FILTER (WHERE NOT b = 2),
>          |  SUM(b) FILTER (WHERE NOT b = 5),
>          |  COUNT(b),
>          |  AVG(b),
>          |  SUM(b)
>          |FROM MyTable
>          |GROUP BY a
>          |""".stripMargin
>     util.verifyRelPlan(sqlQuery)
>   }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to