tartarus created FLINK-21923:
--------------------------------
Summary: SplitAggregateRule throws a ClassCastException when sum/count
and avg appear in the same SQL query
Key: FLINK-21923
URL: https://issues.apache.org/jira/browse/FLINK-21923
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.10.0
Reporter: tartarus
Fix For: 1.13.0
SplitAggregateRule rewrites a one-layer aggregation into a two-layer
(partial/final) aggregation to improve performance under data skew.
In the partial phase, avg is translated into count and sum. If the original
SQL already contains the same count (or sum), the engine removes the
duplicate aggregate call and then adds a Project on top of the aggregate to
restore the removed result column. The rule, however, still casts the top of
the builder stack to `FlinkLogicalAggregate`:
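The deduplication described above can be illustrated with a small standalone sketch. It is only a simplified model and not the Calcite or Flink implementation; `AggCall`, `DedupSketch` and `dedupe` are hypothetical names introduced for this example.

{code:scala}
// Hypothetical, simplified model of an aggregate call: function name plus argument column.
final case class AggCall(func: String, arg: String)

object DedupSketch {
  // Returns the distinct calls and, for every original call, the index of the
  // distinct call that a restoring projection would have to read from.
  def dedupe(calls: Seq[AggCall]): (Seq[AggCall], Seq[Int]) = {
    val distinctCalls = calls.distinct
    val mapping = calls.map(c => distinctCalls.indexOf(c))
    (distinctCalls, mapping)
  }

  def main(args: Array[String]): Unit = {
    // count(b), AVG(b) expanded into sum(b) and count(b), and sum(b)
    val partialCalls = Seq(
      AggCall("COUNT", "b"), // user's count(b)
      AggCall("SUM", "b"),   // from AVG(b)
      AggCall("COUNT", "b"), // from AVG(b), duplicates count(b)
      AggCall("SUM", "b"))   // user's sum(b), duplicates the SUM from AVG(b)
    val (distinctCalls, mapping) = dedupe(partialCalls)
    println(distinctCalls)
    println(mapping)
  }
}
{code}

In this model the four calls collapse to two, and the mapping (0, 1, 0, 1) plays the role of the extra Project that ends up on top of the aggregate.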
{code:java}
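// Build the partial aggregate; duplicate agg calls may cause an extra node on top.
relBuilder.aggregate(
  relBuilder.groupKey(fullGroupSet,
    ImmutableList.of[ImmutableBitSet](fullGroupSet)),
  newPartialAggCalls)
// Unchecked cast: assumes the top of the builder stack is still the aggregate.
relBuilder.peek().asInstanceOf[FlinkLogicalAggregate]
  .setPartialFinalType(PartialFinalType.PARTIAL)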
{code}
Because of the added Project, `relBuilder.peek()` returns a `FlinkLogicalCalc`
instead of a `FlinkLogicalAggregate`, and the cast fails with an exception like:
{code:java}
java.lang.ClassCastException: org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc cannot be cast to org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate
    at org.apache.flink.table.planner.plan.rules.logical.SplitAggregateRule.onMatch(SplitAggregateRule.scala:286)
    at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
    at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
    at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
    at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
    at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
    at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
    at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
    at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
    at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:284)
    at org.apache.flink.table.planner.utils.TableTestUtilBase.assertPlanEquals(TableTestBase.scala:889)
    at org.apache.flink.table.planner.utils.TableTestUtilBase.doVerifyPlan(TableTestBase.scala:780)
    at org.apache.flink.table.planner.utils.TableTestUtilBase.verifyPlan(TableTestBase.scala:283)
    at org.apache.flink.table.planner.plan.rules.logical.SplitAggregateRuleTest.testAggWithFilterClause2(SplitAggregateRuleTest.scala:205)
{code}
The failure can be reproduced reliably by adding the following test case to
`SplitAggregateRuleTest`:
{code:java}
@Test
def testAggBothWithAvgAndCount(): Unit = {
  util.tableEnv.getConfig.getConfiguration.setBoolean(
    OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
  val sqlQuery =
    s"""
       |SELECT
       |  COUNT(DISTINCT b) FILTER (WHERE NOT b = 2),
       |  SUM(b) FILTER (WHERE NOT b = 5),
       |  count(b),
       |  AVG(b),
       |  sum(b)
       |FROM MyTable
       |GROUP BY a
     """.stripMargin
  util.verifyPlan(sqlQuery)
}
{code}
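One possible way to avoid the unchecked cast is to pattern match on the node returned by `peek()` and look through a Calc if one is present. The sketch below uses stand-in classes (`Node`, `AggNode`, `CalcNode`, all hypothetical) rather than the real Flink node types, and it is not necessarily the fix that should be applied to SplitAggregateRule.

{code:scala}
// Stand-in node types; these are NOT FlinkLogicalAggregate / FlinkLogicalCalc.
sealed trait Node
final class AggNode extends Node {
  var partialFinalType: String = "NONE"
}
final case class CalcNode(input: Node) extends Node

object PeekSketch {
  // Finds the aggregate at the top of the tree, looking through one Calc if present.
  def findTopAggregate(top: Node): Option[AggNode] = top match {
    case agg: AggNode           => Some(agg)
    case CalcNode(agg: AggNode) => Some(agg)
    case _                      => None
  }

  def main(args: Array[String]): Unit = {
    val agg = new AggNode
    // Models what peek() returns once a restoring Calc sits on top of the aggregate.
    val top: Node = CalcNode(agg)
    findTopAggregate(top).foreach(a => a.partialFinalType = "PARTIAL")
    println(agg.partialFinalType)
  }
}
{code}

Returning an `Option` forces the caller to handle the case where no aggregate is found, so the failure is not deferred to a runtime `ClassCastException`.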