This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ded9980df21 [FLINK-34596][table] Migrate 
RemoveRedundantLocalHashAggRule to java
ded9980df21 is described below

commit ded9980df2126538b999ee08183d354e00529a5f
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Thu Jan 1 23:02:10 2026 +0100

    [FLINK-34596][table] Migrate RemoveRedundantLocalHashAggRule to java
---
 .../batch/RemoveRedundantLocalHashAggRule.java     | 101 +++++++++++++++++++++
 .../batch/RemoveRedundantLocalHashAggRule.scala    |  63 -------------
 2 files changed, 101 insertions(+), 63 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRule.java
new file mode 100644
index 00000000000..25b1a4c94da
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRule.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch;
+
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalHashAggregate;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalLocalHashAggregate;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.immutables.value.Value;
+
+/**
+ * There maybe exist a subTree like localHashAggregate -> globalHashAggregate 
which the middle
+ * shuffle is removed. The rule could remove redundant localHashAggregate node.
+ */
[email protected]
+public class RemoveRedundantLocalHashAggRule
+        extends 
RelRule<RemoveRedundantLocalHashAggRule.RemoveRedundantLocalHashAggRuleConfig> {
+
+    public static final RemoveRedundantLocalHashAggRule INSTANCE =
+            
RemoveRedundantLocalHashAggRule.RemoveRedundantLocalHashAggRuleConfig.DEFAULT.toRule();
+
+    protected 
RemoveRedundantLocalHashAggRule(RemoveRedundantLocalHashAggRuleConfig config) {
+        super(config);
+    }
+
+    public void onMatch(RelOptRuleCall call) {
+        BatchPhysicalHashAggregate globalAgg = call.rel(0);
+        BatchPhysicalLocalHashAggregate localAgg = call.rel(1);
+        RelNode inputOfLocalAgg = localAgg.getInput();
+        BatchPhysicalHashAggregate newGlobalAgg =
+                new BatchPhysicalHashAggregate(
+                        globalAgg.getCluster(),
+                        globalAgg.getTraitSet(),
+                        inputOfLocalAgg,
+                        globalAgg.getRowType(),
+                        inputOfLocalAgg.getRowType(),
+                        inputOfLocalAgg.getRowType(),
+                        localAgg.grouping(),
+                        localAgg.auxGrouping(),
+                        // Use the localAgg agg calls because the global agg 
call filters was
+                        // removed,
+                        // see BatchPhysicalHashAggRule for details.
+                        localAgg.getAggCallToAggFunction(),
+                        false);
+        call.transformTo(newGlobalAgg);
+    }
+
+    /** Configuration for {@link RemoveRedundantLocalHashAggRule}. */
+    @Value.Immutable(singleton = false)
+    public interface RemoveRedundantLocalHashAggRuleConfig extends 
RelRule.Config {
+        RemoveRedundantLocalHashAggRule.RemoveRedundantLocalHashAggRuleConfig 
DEFAULT =
+                
ImmutableRemoveRedundantLocalHashAggRule.RemoveRedundantLocalHashAggRuleConfig
+                        .builder()
+                        .build()
+                        .withOperandSupplier(
+                                b0 ->
+                                        
b0.operand(BatchPhysicalHashAggregate.class)
+                                                .oneInput(
+                                                        b1 ->
+                                                                b1.operand(
+                                                                               
 BatchPhysicalLocalHashAggregate
+                                                                               
         .class)
+                                                                        
.oneInput(
+                                                                               
 b2 ->
+                                                                               
         b2.operand(
+                                                                               
                         RelNode
+                                                                               
                                 .class)
+                                                                               
                 .oneInput(
+                                                                               
                         b3 ->
+                                                                               
                                 b3.operand(
+                                                                               
                                                 FlinkConventions
+                                                                               
                                                         .BATCH_PHYSICAL()
+                                                                               
                                                         .getInterface())
+                                                                               
                                         .noInputs()))))
+                        .withDescription("RemoveRedundantLocalHashAggRule");
+
+        @Override
+        default RemoveRedundantLocalHashAggRule toRule() {
+            return new RemoveRedundantLocalHashAggRule(this);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRule.scala
deleted file mode 100644
index 7378a71b29b..00000000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRule.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.plan.rules.physical.batch
-
-import org.apache.flink.table.planner.plan.nodes.FlinkConventions
-import 
org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalHashAggregate,
 BatchPhysicalLocalHashAggregate}
-
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.calcite.plan.RelOptRule._
-import org.apache.calcite.rel.RelNode
-
-/**
- * There maybe exist a subTree like localHashAggregate -> globalHashAggregate 
which the middle
- * shuffle is removed. The rule could remove redundant localHashAggregate node.
- */
-class RemoveRedundantLocalHashAggRule
-  extends RelOptRule(
-    operand(
-      classOf[BatchPhysicalHashAggregate],
-      operand(
-        classOf[BatchPhysicalLocalHashAggregate],
-        operand(classOf[RelNode], FlinkConventions.BATCH_PHYSICAL, any))),
-    "RemoveRedundantLocalHashAggRule") {
-
-  override def onMatch(call: RelOptRuleCall): Unit = {
-    val globalAgg: BatchPhysicalHashAggregate = call.rel(0)
-    val localAgg: BatchPhysicalLocalHashAggregate = call.rel(1)
-    val inputOfLocalAgg = localAgg.getInput
-    val newGlobalAgg = new BatchPhysicalHashAggregate(
-      globalAgg.getCluster,
-      globalAgg.getTraitSet,
-      inputOfLocalAgg,
-      globalAgg.getRowType,
-      inputOfLocalAgg.getRowType,
-      inputOfLocalAgg.getRowType,
-      localAgg.grouping,
-      localAgg.auxGrouping,
-      // Use the localAgg agg calls because the global agg call filters was 
removed,
-      // see BatchPhysicalHashAggRule for details.
-      localAgg.getAggCallToAggFunction,
-      isMerge = false)
-    call.transformTo(newGlobalAgg)
-  }
-}
-
-object RemoveRedundantLocalHashAggRule {
-  val INSTANCE = new RemoveRedundantLocalHashAggRule
-}

Reply via email to