This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0412329dea41d8f1325b6eefc2ab0914142be007 Author: Sergey Nuyanzin <[email protected]> AuthorDate: Tue Sep 6 23:29:16 2022 +0200 [FLINK-29214][Table API/SQL] Remove usages of deprecated Aggregate#indicator --- .../planner/plan/rules/logical/FlinkAggregateJoinTransposeRule.java | 1 - .../table/planner/plan/rules/logical/FlinkAggregateRemoveRule.java | 4 +--- .../rules/physical/stream/StreamPhysicalPythonGroupAggregateRule.java | 2 +- .../physical/stream/StreamPhysicalPythonGroupWindowAggregateRule.java | 2 +- .../planner/plan/rules/logical/AggregateReduceGroupingRule.scala | 3 +-- .../planner/plan/rules/logical/LogicalWindowAggregateRuleBase.scala | 3 ++- .../table/planner/plan/rules/logical/PruneAggregateCallRule.scala | 3 +-- .../planner/plan/rules/physical/batch/BatchPhysicalHashAggRule.scala | 4 ---- .../plan/rules/physical/batch/BatchPhysicalWindowAggregateRule.scala | 2 +- .../plan/rules/physical/stream/StreamPhysicalGroupAggregateRule.scala | 2 +- .../physical/stream/StreamPhysicalGroupWindowAggregateRule.scala | 3 +-- .../rules/physical/stream/StreamPhysicalWindowAggregateRule.scala | 2 +- 12 files changed, 11 insertions(+), 20 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateJoinTransposeRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateJoinTransposeRule.java index 72e2b3e94e9..3f9415ea0c0 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateJoinTransposeRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateJoinTransposeRule.java @@ -488,7 +488,6 @@ public class FlinkAggregateJoinTransposeRule extends RelOptRule { aggregate.copy( aggregate.getTraitSet(), aggregate.getInput(), - aggregate.indicator, newGroupSet, com.google.common.collect.ImmutableList.of(newGroupSet), aggCalls); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateRemoveRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateRemoveRule.java index 820156c8261..8f9227cd4b6 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateRemoveRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateRemoveRule.java @@ -73,9 +73,7 @@ public class FlinkAggregateRemoveRule extends RelOptRule { public boolean matches(RelOptRuleCall call) { final Aggregate aggregate = call.rel(0); final RelNode input = call.rel(1); - if (aggregate.getGroupCount() == 0 - || aggregate.indicator - || aggregate.getGroupType() != Aggregate.Group.SIMPLE) { + if (aggregate.getGroupCount() == 0 || aggregate.getGroupType() != Aggregate.Group.SIMPLE) { return false; } for (AggregateCall aggCall : aggregate.getAggCallList()) { diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupAggregateRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupAggregateRule.java index acae431f2a7..0bebe9aca32 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupAggregateRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupAggregateRule.java @@ -62,7 +62,7 @@ public class StreamPhysicalPythonGroupAggregateRule extends ConverterRule { FlinkLogicalAggregate agg = call.rel(0); // check if we have grouping sets - if (agg.getGroupType() != Aggregate.Group.SIMPLE || agg.indicator) { + if (agg.getGroupType() != Aggregate.Group.SIMPLE) { throw new TableException("GROUPING SETS are currently not supported."); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupWindowAggregateRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupWindowAggregateRule.java index cdb2f54d3cb..8d2d662ea31 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupWindowAggregateRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupWindowAggregateRule.java @@ -69,7 +69,7 @@ public class StreamPhysicalPythonGroupWindowAggregateRule extends ConverterRule List<AggregateCall> aggCalls = agg.getAggCallList(); // check if we have grouping sets - if (agg.getGroupType() != Aggregate.Group.SIMPLE || agg.indicator) { + if (agg.getGroupType() != Aggregate.Group.SIMPLE) { throw new TableException("GROUPING SETS are currently not supported."); } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRule.scala index d1845c685af..6ee02c5e1da 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRule.scala @@ -44,7 +44,7 @@ class AggregateReduceGroupingRule(relBuilderFactory: RelBuilderFactory) override def matches(call: RelOptRuleCall): Boolean = { val agg: Aggregate = call.rel(0) - agg.getGroupCount > 1 && agg.getGroupType == Group.SIMPLE && !agg.indicator + agg.getGroupCount > 1 && agg.getGroupType == Group.SIMPLE } override def onMatch(call: RelOptRuleCall): Unit = { @@ -106,7 +106,6 @@ class AggregateReduceGroupingRule(relBuilderFactory: RelBuilderFactory) val newAgg = agg.copy( agg.getTraitSet, input, - agg.indicator, // always false here newGrouping, ImmutableList.of(newGrouping), newAggCalls diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalWindowAggregateRuleBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalWindowAggregateRuleBase.scala index 901c136cfe1..33a1862142f 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalWindowAggregateRuleBase.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalWindowAggregateRuleBase.scala @@ -36,6 +36,7 @@ import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.core.{Aggregate, AggregateCall, RelFactories} import org.apache.calcite.rel.core.Aggregate.Group +import org.apache.calcite.rel.hint.RelHint import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalProject} import org.apache.calcite.rex._ import org.apache.calcite.sql.`type`.SqlTypeUtil @@ -98,7 +99,7 @@ abstract class LogicalWindowAggregateRuleBase(description: String) // we don't use the builder here because it uses RelMetadataQuery which affects the plan val newAgg = LogicalAggregate.create( newProject, - agg.indicator, + ImmutableList.of[RelHint](), newGroupSet, ImmutableList.of(newGroupSet), finalCalls) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PruneAggregateCallRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PruneAggregateCallRule.scala index ee378cc4ffb..bd7c479fea3 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PruneAggregateCallRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PruneAggregateCallRule.scala @@ -45,7 +45,7 @@ abstract class PruneAggregateCallRule[T <: RelNode](topClass: Class[T]) val relOnAgg: T = call.rel(0) val agg: Aggregate = call.rel(1) if ( - agg.indicator || agg.getGroupType != Group.SIMPLE || agg.getAggCallList.isEmpty || + agg.getGroupType != Group.SIMPLE || agg.getAggCallList.isEmpty || // at least output one column (agg.getGroupCount == 0 && agg.getAggCallList.size() == 1) ) { @@ -91,7 +91,6 @@ abstract class PruneAggregateCallRule[T <: RelNode](topClass: Class[T]) val newAgg = agg.copy( agg.getTraitSet, agg.getInput, - agg.indicator, agg.getGroupSet, ImmutableList.of(agg.getGroupSet), newAggCalls diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalHashAggRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalHashAggRule.scala index f6ad3a06f89..355c1d0779a 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalHashAggRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalHashAggRule.scala @@ -79,10 +79,6 @@ class BatchPhysicalHashAggRule val input: RelNode = call.rel(1) val inputRowType = input.getRowType - if (agg.indicator) { - throw new UnsupportedOperationException("Not support group sets aggregate now.") - } - val groupSet = agg.getGroupSet.toArray val (auxGroupSet, aggCallsWithoutAuxGroupCalls) = AggregateUtil.checkAndSplitAggCalls(agg) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowAggregateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowAggregateRule.scala index 13ddd5c12c6..0402b0bd861 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowAggregateRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowAggregateRule.scala @@ -84,7 +84,7 @@ class BatchPhysicalWindowAggregateRule // check if we have grouping sets val groupSets = agg.getGroupType != Group.SIMPLE - if (groupSets || agg.indicator) { + if (groupSets) { throw new TableException("GROUPING SETS are currently not supported.") } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupAggregateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupAggregateRule.scala index a7f71c2e515..13ad78c5850 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupAggregateRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupAggregateRule.scala @@ -41,7 +41,7 @@ class StreamPhysicalGroupAggregateRule(config: Config) extends ConverterRule(con val agg: FlinkLogicalAggregate = call.rel(0) // check if we have grouping sets - if (agg.getGroupType != Group.SIMPLE || agg.indicator) { + if (agg.getGroupType != Group.SIMPLE) { throw new TableException("GROUPING SETS are currently not supported.") } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupWindowAggregateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupWindowAggregateRule.scala index 9bfaebdd07c..13998116eac 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupWindowAggregateRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupWindowAggregateRule.scala @@ -43,8 +43,7 @@ class StreamPhysicalGroupWindowAggregateRule(config: Config) extends ConverterRu val agg: FlinkLogicalWindowAggregate = call.rel(0) // check if we have grouping sets - val groupSets = agg.getGroupType != Group.SIMPLE - if (groupSets || agg.indicator) { + if (agg.getGroupType != Group.SIMPLE) { throw new TableException("GROUPING SETS are currently not supported.") } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowAggregateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowAggregateRule.scala index 4778c4c9c0a..aec1b4dd32d 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowAggregateRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowAggregateRule.scala @@ -46,7 +46,7 @@ class StreamPhysicalWindowAggregateRule(config: Config) extends ConverterRule(co val agg: FlinkLogicalAggregate = call.rel(0) // check if we have grouping sets - if (agg.getGroupType != Group.SIMPLE || agg.indicator) { + if (agg.getGroupType != Group.SIMPLE) { throw new TableException("GROUPING SETS are currently not supported.") }
