[ 
https://issues.apache.org/jira/browse/FLINK-21923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tartarus updated FLINK-21923:
-----------------------------
    Description: 
SplitAggregateRule rewrites a one-layer aggregation into a two-layer aggregation 
to improve computing performance under data skew.
In the partial phase, avg is translated into count and sum. If a count 
already exists in the original SQL, the engine removes the duplicate count 
and then adds a Project on top of the aggregate to restore the result value of 
the removed count.
{code:java}
    relBuilder.aggregate(
      relBuilder.groupKey(fullGroupSet, 
ImmutableList.of[ImmutableBitSet](fullGroupSet)),
      newPartialAggCalls)
    relBuilder.peek().asInstanceOf[FlinkLogicalAggregate]
      .setPartialFinalType(PartialFinalType.PARTIAL)
{code}
As a result, `relBuilder.peek()` returns a `FlinkLogicalCalc` instead of a 
`FlinkLogicalAggregate`,
and the cast throws an exception like: 
{code:java}
java.lang.ClassCastException: 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc cannot be 
cast to org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate

        at 
org.apache.flink.table.planner.plan.rules.logical.SplitAggregateRule.onMatch(SplitAggregateRule.scala:286)
        at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
        at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
        at 
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
        at 
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
        at 
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
        at 
org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
        at 
org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
        at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
        at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
        at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:284)
        at 
org.apache.flink.table.planner.utils.TableTestUtilBase.assertPlanEquals(TableTestBase.scala:889)
        at 
org.apache.flink.table.planner.utils.TableTestUtilBase.doVerifyPlan(TableTestBase.scala:780)
        at 
org.apache.flink.table.planner.utils.TableTestUtilBase.verifyPlan(TableTestBase.scala:283)
        at 
org.apache.flink.table.planner.plan.rules.logical.SplitAggregateRuleTest.testAggWithFilterClause2(SplitAggregateRuleTest.scala:205)
{code}
The failure can be reproduced reliably with the following test case in `SplitAggregateRuleTest`:

{code:java}
 @Test
  def testAggFilterClauseBothWithAvgAndCount(): Unit = {
    util.tableEnv.getConfig.getConfiguration.setBoolean(
      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
    val sqlQuery =
      s"""
         |SELECT
         |  a,
         |  COUNT(DISTINCT b) FILTER (WHERE NOT b = 2),
         |  SUM(b) FILTER (WHERE NOT b = 5),
         |  COUNT(b),
         |  AVG(b),
         |  SUM(b)
         |FROM MyTable
         |GROUP BY a
         |""".stripMargin
    util.verifyRelPlan(sqlQuery)
  }
{code}



  was:
SplitAggregateRule optimizes one-layer aggregation to two-layer aggregation to 
improve computing performance under data skew.
In the partial phase, avg will be translated into count and sum. If count 
already exists in the original SQL at this time, the engine will remove the 
duplicate count, and then add Project to calculate and restore the optimized 
count result value.
{code:java}
    relBuilder.aggregate(
      relBuilder.groupKey(fullGroupSet, 
ImmutableList.of[ImmutableBitSet](fullGroupSet)),
      newPartialAggCalls)
    relBuilder.peek().asInstanceOf[FlinkLogicalAggregate]
      .setPartialFinalType(PartialFinalType.PARTIAL)
{code}
so `relBuilder.peek()` will return `FlinkLogicalCalc` not 
`FlinkLogicalAggregate`,
then will throw exception like 
{code:java}
java.lang.ClassCastException: 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc cannot be 
cast to org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate

        at 
org.apache.flink.table.planner.plan.rules.logical.SplitAggregateRule.onMatch(SplitAggregateRule.scala:286)
        at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
        at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
        at 
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
        at 
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
        at 
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
        at 
org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
        at 
org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
        at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
        at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
        at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:284)
        at 
org.apache.flink.table.planner.utils.TableTestUtilBase.assertPlanEquals(TableTestBase.scala:889)
        at 
org.apache.flink.table.planner.utils.TableTestUtilBase.doVerifyPlan(TableTestBase.scala:780)
        at 
org.apache.flink.table.planner.utils.TableTestUtilBase.verifyPlan(TableTestBase.scala:283)
        at 
org.apache.flink.table.planner.plan.rules.logical.SplitAggregateRuleTest.testAggWithFilterClause2(SplitAggregateRuleTest.scala:205)
{code}
We can reproduce stably and pass the test cases in `SplitAggregateRuleTest`

{code:java}
    @Test
  def testAggFilterClauseBothWithAvgAndCount(): Unit = {
    util.tableEnv.getConfig.getConfiguration.setBoolean(
      OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
    val sqlQuery =
      s"""
         |SELECT
         |  a,
         |  COUNT(DISTINCT b) FILTER (WHERE NOT b = 2),
         |  SUM(b) FILTER (WHERE NOT b = 5),
         |  COUNT(b),
         |  AVG(b),
         |  SUM(b)
         |FROM MyTable
         |GROUP BY a
         |""".stripMargin
    util.verifyRelPlan(sqlQuery)
  }
{code}




> SplitAggregateRule fails when sum/count and avg appear in the same SQL 
> query
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-21923
>                 URL: https://issues.apache.org/jira/browse/FLINK-21923
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.10.0
>            Reporter: tartarus
>            Assignee: tartarus
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.13.0
>
>
> SplitAggregateRule rewrites a one-layer aggregation into a two-layer 
> aggregation to improve computing performance under data skew.
> In the partial phase, avg is translated into count and sum. If a count 
> already exists in the original SQL, the engine removes the duplicate count 
> and then adds a Project on top of the aggregate to restore the result value 
> of the removed count.
> {code:java}
>     relBuilder.aggregate(
>       relBuilder.groupKey(fullGroupSet, 
> ImmutableList.of[ImmutableBitSet](fullGroupSet)),
>       newPartialAggCalls)
>     relBuilder.peek().asInstanceOf[FlinkLogicalAggregate]
>       .setPartialFinalType(PartialFinalType.PARTIAL)
> {code}
> As a result, `relBuilder.peek()` returns a `FlinkLogicalCalc` instead of a 
> `FlinkLogicalAggregate`,
> and the cast throws an exception like: 
> {code:java}
> java.lang.ClassCastException: 
> org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc cannot be 
> cast to 
> org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalAggregate
>       at 
> org.apache.flink.table.planner.plan.rules.logical.SplitAggregateRule.onMatch(SplitAggregateRule.scala:286)
>       at 
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
>       at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
>       at 
> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
>       at 
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
>       at 
> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>       at 
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:202)
>       at 
> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:189)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>       at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>       at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>       at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>       at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>       at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>       at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
>       at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
>       at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>       at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:284)
>       at 
> org.apache.flink.table.planner.utils.TableTestUtilBase.assertPlanEquals(TableTestBase.scala:889)
>       at 
> org.apache.flink.table.planner.utils.TableTestUtilBase.doVerifyPlan(TableTestBase.scala:780)
>       at 
> org.apache.flink.table.planner.utils.TableTestUtilBase.verifyPlan(TableTestBase.scala:283)
>       at 
> org.apache.flink.table.planner.plan.rules.logical.SplitAggregateRuleTest.testAggWithFilterClause2(SplitAggregateRuleTest.scala:205)
> {code}
> The failure can be reproduced reliably with the following test case in `SplitAggregateRuleTest`:
> {code:java}
>  @Test
>   def testAggFilterClauseBothWithAvgAndCount(): Unit = {
>     util.tableEnv.getConfig.getConfiguration.setBoolean(
>       OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true)
>     val sqlQuery =
>       s"""
>          |SELECT
>          |  a,
>          |  COUNT(DISTINCT b) FILTER (WHERE NOT b = 2),
>          |  SUM(b) FILTER (WHERE NOT b = 5),
>          |  COUNT(b),
>          |  AVG(b),
>          |  SUM(b)
>          |FROM MyTable
>          |GROUP BY a
>          |""".stripMargin
>     util.verifyRelPlan(sqlQuery)
>   }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to