This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0412329dea41d8f1325b6eefc2ab0914142be007
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Tue Sep 6 23:29:16 2022 +0200

    [FLINK-29214][Table API/SQL] Remove usages of deprecated Aggregate#indicator
---
 .../planner/plan/rules/logical/FlinkAggregateJoinTransposeRule.java   | 1 -
 .../table/planner/plan/rules/logical/FlinkAggregateRemoveRule.java    | 4 +---
 .../rules/physical/stream/StreamPhysicalPythonGroupAggregateRule.java | 2 +-
 .../physical/stream/StreamPhysicalPythonGroupWindowAggregateRule.java | 2 +-
 .../planner/plan/rules/logical/AggregateReduceGroupingRule.scala      | 3 +--
 .../planner/plan/rules/logical/LogicalWindowAggregateRuleBase.scala   | 3 ++-
 .../table/planner/plan/rules/logical/PruneAggregateCallRule.scala     | 3 +--
 .../planner/plan/rules/physical/batch/BatchPhysicalHashAggRule.scala  | 4 ----
 .../plan/rules/physical/batch/BatchPhysicalWindowAggregateRule.scala  | 2 +-
 .../plan/rules/physical/stream/StreamPhysicalGroupAggregateRule.scala | 2 +-
 .../physical/stream/StreamPhysicalGroupWindowAggregateRule.scala      | 3 +--
 .../rules/physical/stream/StreamPhysicalWindowAggregateRule.scala     | 2 +-
 12 files changed, 11 insertions(+), 20 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateJoinTransposeRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateJoinTransposeRule.java
index 72e2b3e94e9..3f9415ea0c0 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateJoinTransposeRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateJoinTransposeRule.java
@@ -488,7 +488,6 @@ public class FlinkAggregateJoinTransposeRule extends 
RelOptRule {
                     aggregate.copy(
                             aggregate.getTraitSet(),
                             aggregate.getInput(),
-                            aggregate.indicator,
                             newGroupSet,
                             
com.google.common.collect.ImmutableList.of(newGroupSet),
                             aggCalls);
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateRemoveRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateRemoveRule.java
index 820156c8261..8f9227cd4b6 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateRemoveRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateRemoveRule.java
@@ -73,9 +73,7 @@ public class FlinkAggregateRemoveRule extends RelOptRule {
     public boolean matches(RelOptRuleCall call) {
         final Aggregate aggregate = call.rel(0);
         final RelNode input = call.rel(1);
-        if (aggregate.getGroupCount() == 0
-                || aggregate.indicator
-                || aggregate.getGroupType() != Aggregate.Group.SIMPLE) {
+        if (aggregate.getGroupCount() == 0 || aggregate.getGroupType() != 
Aggregate.Group.SIMPLE) {
             return false;
         }
         for (AggregateCall aggCall : aggregate.getAggCallList()) {
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupAggregateRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupAggregateRule.java
index acae431f2a7..0bebe9aca32 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupAggregateRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupAggregateRule.java
@@ -62,7 +62,7 @@ public class StreamPhysicalPythonGroupAggregateRule extends 
ConverterRule {
         FlinkLogicalAggregate agg = call.rel(0);
 
         // check if we have grouping sets
-        if (agg.getGroupType() != Aggregate.Group.SIMPLE || agg.indicator) {
+        if (agg.getGroupType() != Aggregate.Group.SIMPLE) {
             throw new TableException("GROUPING SETS are currently not 
supported.");
         }
 
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupWindowAggregateRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupWindowAggregateRule.java
index cdb2f54d3cb..8d2d662ea31 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupWindowAggregateRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupWindowAggregateRule.java
@@ -69,7 +69,7 @@ public class StreamPhysicalPythonGroupWindowAggregateRule 
extends ConverterRule
         List<AggregateCall> aggCalls = agg.getAggCallList();
 
         // check if we have grouping sets
-        if (agg.getGroupType() != Aggregate.Group.SIMPLE || agg.indicator) {
+        if (agg.getGroupType() != Aggregate.Group.SIMPLE) {
             throw new TableException("GROUPING SETS are currently not 
supported.");
         }
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRule.scala
index d1845c685af..6ee02c5e1da 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRule.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRule.scala
@@ -44,7 +44,7 @@ class AggregateReduceGroupingRule(relBuilderFactory: 
RelBuilderFactory)
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val agg: Aggregate = call.rel(0)
-    agg.getGroupCount > 1 && agg.getGroupType == Group.SIMPLE && !agg.indicator
+    agg.getGroupCount > 1 && agg.getGroupType == Group.SIMPLE
   }
 
   override def onMatch(call: RelOptRuleCall): Unit = {
@@ -106,7 +106,6 @@ class AggregateReduceGroupingRule(relBuilderFactory: 
RelBuilderFactory)
     val newAgg = agg.copy(
       agg.getTraitSet,
       input,
-      agg.indicator, // always false here
       newGrouping,
       ImmutableList.of(newGrouping),
       newAggCalls
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalWindowAggregateRuleBase.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalWindowAggregateRuleBase.scala
index 901c136cfe1..33a1862142f 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalWindowAggregateRuleBase.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalWindowAggregateRuleBase.scala
@@ -36,6 +36,7 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.core.{Aggregate, AggregateCall, RelFactories}
 import org.apache.calcite.rel.core.Aggregate.Group
+import org.apache.calcite.rel.hint.RelHint
 import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalProject}
 import org.apache.calcite.rex._
 import org.apache.calcite.sql.`type`.SqlTypeUtil
@@ -98,7 +99,7 @@ abstract class LogicalWindowAggregateRuleBase(description: 
String)
     // we don't use the builder here because it uses RelMetadataQuery which 
affects the plan
     val newAgg = LogicalAggregate.create(
       newProject,
-      agg.indicator,
+      ImmutableList.of[RelHint](),
       newGroupSet,
       ImmutableList.of(newGroupSet),
       finalCalls)
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PruneAggregateCallRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PruneAggregateCallRule.scala
index ee378cc4ffb..bd7c479fea3 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PruneAggregateCallRule.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PruneAggregateCallRule.scala
@@ -45,7 +45,7 @@ abstract class PruneAggregateCallRule[T <: RelNode](topClass: 
Class[T])
     val relOnAgg: T = call.rel(0)
     val agg: Aggregate = call.rel(1)
     if (
-      agg.indicator || agg.getGroupType != Group.SIMPLE || 
agg.getAggCallList.isEmpty ||
+      agg.getGroupType != Group.SIMPLE || agg.getAggCallList.isEmpty ||
       // at least output one column
       (agg.getGroupCount == 0 && agg.getAggCallList.size() == 1)
     ) {
@@ -91,7 +91,6 @@ abstract class PruneAggregateCallRule[T <: RelNode](topClass: 
Class[T])
     val newAgg = agg.copy(
       agg.getTraitSet,
       agg.getInput,
-      agg.indicator,
       agg.getGroupSet,
       ImmutableList.of(agg.getGroupSet),
       newAggCalls
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalHashAggRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalHashAggRule.scala
index f6ad3a06f89..355c1d0779a 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalHashAggRule.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalHashAggRule.scala
@@ -79,10 +79,6 @@ class BatchPhysicalHashAggRule
     val input: RelNode = call.rel(1)
     val inputRowType = input.getRowType
 
-    if (agg.indicator) {
-      throw new UnsupportedOperationException("Not support group sets 
aggregate now.")
-    }
-
     val groupSet = agg.getGroupSet.toArray
     val (auxGroupSet, aggCallsWithoutAuxGroupCalls) = 
AggregateUtil.checkAndSplitAggCalls(agg)
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowAggregateRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowAggregateRule.scala
index 13ddd5c12c6..0402b0bd861 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowAggregateRule.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowAggregateRule.scala
@@ -84,7 +84,7 @@ class BatchPhysicalWindowAggregateRule
 
     // check if we have grouping sets
     val groupSets = agg.getGroupType != Group.SIMPLE
-    if (groupSets || agg.indicator) {
+    if (groupSets) {
       throw new TableException("GROUPING SETS are currently not supported.")
     }
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupAggregateRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupAggregateRule.scala
index a7f71c2e515..13ad78c5850 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupAggregateRule.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupAggregateRule.scala
@@ -41,7 +41,7 @@ class StreamPhysicalGroupAggregateRule(config: Config) 
extends ConverterRule(con
     val agg: FlinkLogicalAggregate = call.rel(0)
 
     // check if we have grouping sets
-    if (agg.getGroupType != Group.SIMPLE || agg.indicator) {
+    if (agg.getGroupType != Group.SIMPLE) {
       throw new TableException("GROUPING SETS are currently not supported.")
     }
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupWindowAggregateRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupWindowAggregateRule.scala
index 9bfaebdd07c..13998116eac 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupWindowAggregateRule.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalGroupWindowAggregateRule.scala
@@ -43,8 +43,7 @@ class StreamPhysicalGroupWindowAggregateRule(config: Config) 
extends ConverterRu
     val agg: FlinkLogicalWindowAggregate = call.rel(0)
 
     // check if we have grouping sets
-    val groupSets = agg.getGroupType != Group.SIMPLE
-    if (groupSets || agg.indicator) {
+    if (agg.getGroupType != Group.SIMPLE) {
       throw new TableException("GROUPING SETS are currently not supported.")
     }
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowAggregateRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowAggregateRule.scala
index 4778c4c9c0a..aec1b4dd32d 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowAggregateRule.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalWindowAggregateRule.scala
@@ -46,7 +46,7 @@ class StreamPhysicalWindowAggregateRule(config: Config) 
extends ConverterRule(co
     val agg: FlinkLogicalAggregate = call.rel(0)
 
     // check if we have grouping sets
-    if (agg.getGroupType != Group.SIMPLE || agg.indicator) {
+    if (agg.getGroupType != Group.SIMPLE) {
       throw new TableException("GROUPING SETS are currently not supported.")
     }
 

Reply via email to