[
https://issues.apache.org/jira/browse/FLINK-21923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17306820#comment-17306820
]
tartarus commented on FLINK-21923:
----------------------------------
[~jark] Please assign it to me, thanks.
> SplitAggregateRule will be abnormal, when the sum/count and avg in SQL at the
> same time
> ---------------------------------------------------------------------------------------
>
> Key: FLINK-21923
> URL: https://issues.apache.org/jira/browse/FLINK-21923
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.10.0
> Reporter: tartarus
> Priority: Major
> Fix For: 1.13.0
>
>
> SplitAggregateRule rewrites a one-level aggregation into a two-level
> aggregation to improve performance under data skew.
> In the partial phase, AVG is decomposed into SUM and COUNT. If the original
> SQL already contains the same COUNT (or SUM), `relBuilder.aggregate` removes
> the duplicate call and adds a Project on top of the aggregate to compute and
> restore the original output fields.
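A minimal, self-contained Scala model of the partial-phase rewrite described above may help. It is a sketch under simplifying assumptions, not Flink or Calcite code: `AvgSplitModel`, `AggCall`, `expandAvg` and `dedup` are hypothetical names, an aggregate call is reduced to a function name plus an argument column, and FILTER and DISTINCT are ignored. For `COUNT(b), AVG(b), SUM(b)` it shows that splitting AVG produces duplicate calls, and that restoring the original fields after deduplication needs an extra projection.

{code:scala}
// Hypothetical model of the partial-phase rewrite; not Flink's implementation.
object AvgSplitModel {
  // An aggregate call identified only by function name and argument column.
  final case class AggCall(fn: String, arg: Int)

  // Expand AVG(x) into SUM(x) and COUNT(x); keep all other calls unchanged.
  def expandAvg(calls: Seq[AggCall]): Seq[AggCall] =
    calls.flatMap {
      case AggCall("AVG", arg) => Seq(AggCall("SUM", arg), AggCall("COUNT", arg))
      case other               => Seq(other)
    }

  // Remove duplicate calls and return, for every original position, the index
  // of the surviving call. This index list stands in for the restoring Project.
  def dedup(calls: Seq[AggCall]): (Seq[AggCall], Seq[Int]) = {
    val distinct = calls.distinct
    val mapping  = calls.map(c => distinct.indexOf(c))
    (distinct, mapping)
  }

  def main(args: Array[String]): Unit = {
    // COUNT(b), AVG(b), SUM(b), with column b at index 1
    val original = Seq(AggCall("COUNT", 1), AggCall("AVG", 1), AggCall("SUM", 1))
    val expanded = expandAvg(original)
    val (distinct, mapping) = dedup(expanded)
    println(expanded) // List(AggCall(COUNT,1), AggCall(SUM,1), AggCall(COUNT,1), AggCall(SUM,1))
    println(distinct) // List(AggCall(COUNT,1), AggCall(SUM,1))
    println(mapping)  // List(0, 1, 0, 1)
  }
}
{code}

Only two distinct calls survive, but four fields are expected downstream, so something must sit on top of the aggregate to map the two results back to four columns. That extra node is why the top of the builder stack is no longer an aggregate.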
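> {code:scala}
> relBuilder.aggregate(
>   relBuilder.groupKey(fullGroupSet,
>     ImmutableList.of[ImmutableBitSet](fullGroupSet)),
>   newPartialAggCalls)
> // Assumes the top of the builder stack is the new aggregate, which no longer
> // holds once a Project has been added on top of it.
> relBuilder.peek().asInstanceOf[FlinkLogicalAggregate]
>   .setPartialFinalType(PartialFinalType.PARTIAL)
> {code}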
> As a result, `relBuilder.peek()` returns a `FlinkLogicalCalc` instead of a
> `FlinkLogicalAggregate`, and the cast throws an exception like the following:
> {code:java}
> java.lang.ClassCastException: org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc cannot be cast to org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate
>     at org.apache.flink.table.planner.plan.rules.logical.SplitAggregateRule.onMatch(SplitAggregateRule.scala:286)
>     at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
>     at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
>     at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
>     at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
>     at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>     at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
>     at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>     at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>     at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
>     at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
>     at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>     at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:284)
>     at org.apache.flink.table.planner.utils.TableTestUtilBase.assertPlanEquals(TableTestBase.scala:889)
>     at org.apache.flink.table.planner.utils.TableTestUtilBase.doVerifyPlan(TableTestBase.scala:780)
>     at org.apache.flink.table.planner.utils.TableTestUtilBase.verifyPlan(TableTestBase.scala:283)
>     at org.apache.flink.table.planner.plan.rules.logical.SplitAggregateRuleTest.testAggWithFilterClause2(SplitAggregateRuleTest.scala:205)
> {code}
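The failing cast can be reproduced in miniature with a second hypothetical model. These are not Flink classes: `PeekModel`, `Aggregate`, `Calc`, `buildAggregate`, `unsafeAggregate` and `findAggregate` are illustrative names. `buildAggregate` stands in for a builder that wraps the aggregate in a projection only when duplicate calls were removed; a blind `asInstanceOf` on the top node then fails, while a pattern match that looks through the projection still finds the aggregate. This is one possible defensive pattern, not necessarily how the issue is fixed in Flink.

{code:scala}
// Hypothetical, simplified plan-node model; these are not Flink classes.
object PeekModel {
  sealed trait Node
  final case class Aggregate(calls: Seq[String]) extends Node
  // A projection over its input, standing in for FlinkLogicalCalc.
  final case class Calc(input: Node, fields: Seq[Int]) extends Node

  // Deduplicates the calls and, only when duplicates were removed, wraps the
  // aggregate in a projection that restores the original field positions.
  def buildAggregate(calls: Seq[String]): Node = {
    val distinct = calls.distinct
    val agg = Aggregate(distinct)
    if (distinct.size == calls.size) agg
    else Calc(agg, calls.map(c => distinct.indexOf(c)))
  }

  // Mirrors the unconditional cast: throws ClassCastException on a Calc.
  def unsafeAggregate(top: Node): Aggregate = top.asInstanceOf[Aggregate]

  // Defensive alternative: look through projections to reach the aggregate.
  def findAggregate(top: Node): Option[Aggregate] = top match {
    case a: Aggregate   => Some(a)
    case Calc(input, _) => findAggregate(input)
  }

  def main(args: Array[String]): Unit = {
    val top = buildAggregate(Seq("COUNT(b)", "SUM(b)", "COUNT(b)", "SUM(b)"))
    println(findAggregate(top)) // Some(Aggregate(List(COUNT(b), SUM(b))))
    try {
      unsafeAggregate(top)
      println("cast succeeded")
    } catch {
      case _: ClassCastException => println("ClassCastException")
    }
  }
}
{code}

Without duplicate calls, `buildAggregate` returns the aggregate itself and both lookups succeed, which matches the report that the problem only appears when AVG is combined with COUNT or SUM on the same column.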
> The problem can be reproduced reliably by adding the following test case to
> `SplitAggregateRuleTest`:
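> {code:scala}
> // Requires: import org.apache.flink.table.api.config.OptimizerConfigOptions
> @Test
> def testAggBothWithAvgAndCount(): Unit = {
>   // Enable distinct-aggregation splitting so that SplitAggregateRule fires
>   util.tableEnv.getConfig.getConfiguration.setBoolean(
>     OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
>   // AVG(b) is split into SUM(b) and COUNT(b), duplicating count(b) and sum(b)
>   val sqlQuery =
>     s"""
>        |SELECT
>        |  COUNT(DISTINCT b) FILTER (WHERE NOT b = 2),
>        |  SUM(b) FILTER (WHERE NOT b = 5),
>        |  count(b),
>        |  AVG(b),
>        |  sum(b)
>        |FROM MyTable
>        |GROUP BY a
>        """.stripMargin
>   util.verifyPlan(sqlQuery)
> }
> {code}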