This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ded9980df21 [FLINK-34596][table] Migrate
RemoveRedundantLocalHashAggRule to java
ded9980df21 is described below
commit ded9980df2126538b999ee08183d354e00529a5f
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Thu Jan 1 23:02:10 2026 +0100
[FLINK-34596][table] Migrate RemoveRedundantLocalHashAggRule to java
---
.../batch/RemoveRedundantLocalHashAggRule.java | 101 +++++++++++++++++++++
.../batch/RemoveRedundantLocalHashAggRule.scala | 63 -------------
2 files changed, 101 insertions(+), 63 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRule.java
new file mode 100644
index 00000000000..25b1a4c94da
--- /dev/null
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRule.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.batch;
+
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalHashAggregate;
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalLocalHashAggregate;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.RelNode;
+import org.immutables.value.Value;
+
+/**
+ * There may exist a subtree like localHashAggregate -> globalHashAggregate in which the middle
+ * shuffle has been removed. This rule removes the redundant localHashAggregate node.
+ */
+@Value.Enclosing
+public class RemoveRedundantLocalHashAggRule
+ extends
RelRule<RemoveRedundantLocalHashAggRule.RemoveRedundantLocalHashAggRuleConfig> {
+
+ public static final RemoveRedundantLocalHashAggRule INSTANCE =
+
RemoveRedundantLocalHashAggRule.RemoveRedundantLocalHashAggRuleConfig.DEFAULT.toRule();
+
+ protected
RemoveRedundantLocalHashAggRule(RemoveRedundantLocalHashAggRuleConfig config) {
+ super(config);
+ }
+
+ public void onMatch(RelOptRuleCall call) {
+ BatchPhysicalHashAggregate globalAgg = call.rel(0);
+ BatchPhysicalLocalHashAggregate localAgg = call.rel(1);
+ RelNode inputOfLocalAgg = localAgg.getInput();
+ BatchPhysicalHashAggregate newGlobalAgg =
+ new BatchPhysicalHashAggregate(
+ globalAgg.getCluster(),
+ globalAgg.getTraitSet(),
+ inputOfLocalAgg,
+ globalAgg.getRowType(),
+ inputOfLocalAgg.getRowType(),
+ inputOfLocalAgg.getRowType(),
+ localAgg.grouping(),
+ localAgg.auxGrouping(),
+ // Use the localAgg agg calls because the filters of the global agg
+ // calls were removed,
+ // see BatchPhysicalHashAggRule for details.
+ localAgg.getAggCallToAggFunction(),
+ false);
+ call.transformTo(newGlobalAgg);
+ }
+
+ /** Configuration for {@link RemoveRedundantLocalHashAggRule}. */
+ @Value.Immutable(singleton = false)
+ public interface RemoveRedundantLocalHashAggRuleConfig extends
RelRule.Config {
+ RemoveRedundantLocalHashAggRule.RemoveRedundantLocalHashAggRuleConfig
DEFAULT =
+
ImmutableRemoveRedundantLocalHashAggRule.RemoveRedundantLocalHashAggRuleConfig
+ .builder()
+ .build()
+ .withOperandSupplier(
+ b0 ->
+
b0.operand(BatchPhysicalHashAggregate.class)
+ .oneInput(
+ b1 ->
+ b1.operand(
+
BatchPhysicalLocalHashAggregate
+
.class)
+
.oneInput(
+
b2 ->
+
b2.operand(
+
RelNode
+
.class)
+
.oneInput(
+
b3 ->
+
b3.operand(
+
FlinkConventions
+
.BATCH_PHYSICAL()
+
.getInterface())
+
.noInputs()))))
+ .withDescription("RemoveRedundantLocalHashAggRule");
+
+ @Override
+ default RemoveRedundantLocalHashAggRule toRule() {
+ return new RemoveRedundantLocalHashAggRule(this);
+ }
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRule.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRule.scala
deleted file mode 100644
index 7378a71b29b..00000000000
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/RemoveRedundantLocalHashAggRule.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.plan.rules.physical.batch
-
-import org.apache.flink.table.planner.plan.nodes.FlinkConventions
-import
org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalHashAggregate,
BatchPhysicalLocalHashAggregate}
-
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.calcite.plan.RelOptRule._
-import org.apache.calcite.rel.RelNode
-
-/**
- * There maybe exist a subTree like localHashAggregate -> globalHashAggregate
which the middle
- * shuffle is removed. The rule could remove redundant localHashAggregate node.
- */
-class RemoveRedundantLocalHashAggRule
- extends RelOptRule(
- operand(
- classOf[BatchPhysicalHashAggregate],
- operand(
- classOf[BatchPhysicalLocalHashAggregate],
- operand(classOf[RelNode], FlinkConventions.BATCH_PHYSICAL, any))),
- "RemoveRedundantLocalHashAggRule") {
-
- override def onMatch(call: RelOptRuleCall): Unit = {
- val globalAgg: BatchPhysicalHashAggregate = call.rel(0)
- val localAgg: BatchPhysicalLocalHashAggregate = call.rel(1)
- val inputOfLocalAgg = localAgg.getInput
- val newGlobalAgg = new BatchPhysicalHashAggregate(
- globalAgg.getCluster,
- globalAgg.getTraitSet,
- inputOfLocalAgg,
- globalAgg.getRowType,
- inputOfLocalAgg.getRowType,
- inputOfLocalAgg.getRowType,
- localAgg.grouping,
- localAgg.auxGrouping,
- // Use the localAgg agg calls because the global agg call filters was
removed,
- // see BatchPhysicalHashAggRule for details.
- localAgg.getAggCallToAggFunction,
- isMerge = false)
- call.transformTo(newGlobalAgg)
- }
-}
-
-object RemoveRedundantLocalHashAggRule {
- val INSTANCE = new RemoveRedundantLocalHashAggRule
-}