This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c2eac7ec85b [FLINK-34160][table] Migration of FlinkCalcMergeRule to
java
c2eac7ec85b is described below
commit c2eac7ec85bef93fe2b61c028984e704c5a9d126
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Mon Feb 19 23:30:28 2024 +0100
[FLINK-34160][table] Migration of FlinkCalcMergeRule to java
---
.../plan/rules/logical/FlinkCalcMergeRule.java | 117 +++++++++++++++++++++
.../plan/rules/logical/FlinkCalcMergeRule.scala | 83 ---------------
.../PushFilterInCalcIntoTableSourceRuleTest.java | 2 +-
3 files changed, 118 insertions(+), 84 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkCalcMergeRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkCalcMergeRule.java
new file mode 100644
index 00000000000..f82a1bcf188
--- /dev/null
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkCalcMergeRule.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCalc;
+import org.apache.flink.table.planner.plan.utils.FlinkRelUtil;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.core.Calc;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rex.RexOver;
+import org.apache.calcite.rex.RexProgram;
+import org.immutables.value.Value;
+
+/**
+ * This rule is copied from Calcite's {@link org.apache.calcite.rel.rules.CalcMergeRule}.
+ *
+ * <p>Modification:
+ *
+ * <ul>
+ *   <li>Condition in the merged program will be simplified if it exists.
+ *   <li>If the two {@link Calc} can merge into one, each non-deterministic {@link
+ *       org.apache.calcite.rex.RexNode} of bottom {@link Calc} should appear at most once in the
+ *       project list and filter list of top {@link Calc}.
+ * </ul>
+ *
+ * <p>Planner rule that merges a {@link Calc} onto a {@link Calc}.
+ *
+ * <p>The resulting {@link Calc} has the same project list as the upper {@link Calc}, but expressed
+ * in terms of the lower {@link Calc}'s inputs.
+ */
+@Value.Enclosing
+public class FlinkCalcMergeRule extends RelRule<FlinkCalcMergeRule.FlinkCalcMergeRuleConfig> {
+
+    /** Instance that fires on any {@link Calc} whose input is another {@link Calc}. */
+    public static final FlinkCalcMergeRule INSTANCE = FlinkCalcMergeRuleConfig.DEFAULT.toRule();
+
+    /**
+     * Instance that fires only on a {@link StreamPhysicalCalc} whose input is another {@link
+     * StreamPhysicalCalc}.
+     */
+    public static final FlinkCalcMergeRule STREAM_PHYSICAL_INSTANCE =
+            FlinkCalcMergeRuleConfig.STREAM_PHYSICAL.toRule();
+
+    /**
+     * Creates the rule from the given configuration.
+     *
+     * @param config configuration carrying the operand pattern and description of the rule
+     */
+    protected FlinkCalcMergeRule(FlinkCalcMergeRuleConfig config) {
+        super(config);
+    }
+
+    /**
+     * Decides whether the matched pair of calcs may be merged.
+     *
+     * @param call rule call whose rel 0 is the top {@link Calc} and rel 1 is the bottom {@link
+     *     Calc}
+     * @return {@code false} if the top program contains a windowed aggregate, otherwise the
+     *     result of {@code FlinkRelUtil.isMergeable(topCalc, bottomCalc)}
+     */
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+        Calc topCalc = call.rel(0);
+        Calc bottomCalc = call.rel(1);
+
+        // Don't merge a calc which contains windowed aggregates onto a
+        // calc. That would effectively be pushing a windowed aggregate down
+        // through a filter.
+        RexProgram topProgram = topCalc.getProgram();
+        if (RexOver.containsOver(topProgram)) {
+            return false;
+        }
+
+        return FlinkRelUtil.isMergeable(topCalc, bottomCalc);
+    }
+
+    /**
+     * Replaces the matched pair of calcs with the single calc produced by {@code
+     * FlinkRelUtil.merge(topCalc, bottomCalc)}.
+     *
+     * @param call rule call whose rel 0 is the top {@link Calc} and rel 1 is the bottom {@link
+     *     Calc}
+     */
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        Calc topCalc = call.rel(0);
+        Calc bottomCalc = call.rel(1);
+
+        Calc newCalc = FlinkRelUtil.merge(topCalc, bottomCalc);
+        // Digests are Strings and must be compared by value. The Scala original used `==`,
+        // which is value equality in Scala; in Java `==` compares references and would
+        // practically never be true for two separately built digests.
+        if (newCalc.getDigest().equals(bottomCalc.getDigest())) {
+            // newCalc is equivalent to bottomCalc,
+            // which means that topCalc
+            // must be trivial. Take it out of the game.
+            call.getPlanner().prune(topCalc);
+        }
+        call.transformTo(newCalc);
+    }
+
+    /** Rule configuration. */
+    @Value.Immutable(singleton = false)
+    public interface FlinkCalcMergeRuleConfig extends RelRule.Config {
+        /** Matches a {@link Calc} whose input is another {@link Calc} with any inputs. */
+        FlinkCalcMergeRule.FlinkCalcMergeRuleConfig DEFAULT =
+                ImmutableFlinkCalcMergeRule.FlinkCalcMergeRuleConfig.builder()
+                        .build()
+                        .withOperandSupplier(
+                                b0 ->
+                                        b0.operand(Calc.class)
+                                                .inputs(b1 -> b1.operand(Calc.class).anyInputs()))
+                        .withRelBuilderFactory(RelFactories.LOGICAL_BUILDER)
+                        .withDescription("FlinkCalcMergeRule");
+
+        /**
+         * Matches a {@link StreamPhysicalCalc} whose input is another {@link StreamPhysicalCalc}
+         * with any inputs.
+         */
+        FlinkCalcMergeRule.FlinkCalcMergeRuleConfig STREAM_PHYSICAL =
+                ImmutableFlinkCalcMergeRule.FlinkCalcMergeRuleConfig.builder()
+                        .build()
+                        .withOperandSupplier(
+                                b0 ->
+                                        b0.operand(StreamPhysicalCalc.class)
+                                                .inputs(
+                                                        b1 ->
+                                                                b1.operand(StreamPhysicalCalc.class)
+                                                                        .anyInputs()))
+                        .withRelBuilderFactory(RelFactories.LOGICAL_BUILDER)
+                        .withDescription("FlinkCalcMergeRule");
+
+        /** Builds a {@link FlinkCalcMergeRule} backed by this configuration. */
+        @Override
+        default FlinkCalcMergeRule toRule() {
+            return new FlinkCalcMergeRule(this);
+        }
+    }
+}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkCalcMergeRule.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkCalcMergeRule.scala
deleted file mode 100644
index da5533b2a6e..00000000000
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkCalcMergeRule.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.plan.rules.logical
-
-import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCalc
-import org.apache.flink.table.planner.plan.utils.FlinkRelUtil
-
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.calcite.plan.RelOptRule.{any, operand}
-import org.apache.calcite.rel.core.{Calc, RelFactories}
-import org.apache.calcite.rex.{RexNode, RexOver}
-
-/**
- * This rule is copied from Calcite's
[[org.apache.calcite.rel.rules.CalcMergeRule]].
- *
- * Modification:
- * - Condition in the merged program will be simplified if it exists.
- * - If the two [[Calc]] can merge into one, each non-deterministic
[[RexNode]] of bottom [[Calc]]
- * should appear at most once in the project list and filter list of top
[[Calc]].
- */
-
-/**
- * Planner rule that merges a [[Calc]] onto a [[Calc]].
- *
- * <p>The resulting [[Calc]] has the same project list as the upper [[Calc]], but expressed in terms
- * of the lower [[Calc]]'s inputs.
- *
- * The operand pattern matches a `calcClass` node whose input is another `calcClass` node. A match
- * is rejected when the top program contains a windowed aggregate; otherwise the decision is
- * delegated to `FlinkRelUtil.isMergeable`, and the replacement node is built by
- * `FlinkRelUtil.merge`. When the merged node has the same digest as the bottom node, the top node
- * is pruned from the planner before the transformation is registered.
- *
- * @param calcClass
- *   class that both the top node and its input must be an instance of
- * @tparam C
- *   the [[Calc]] subtype described by `calcClass`
- */
-class FlinkCalcMergeRule[C <: Calc](calcClass: Class[C])
- extends RelOptRule(
- operand(calcClass, operand(calcClass, any)),
- RelFactories.LOGICAL_BUILDER,
- "FlinkCalcMergeRule") {
-
- override def matches(call: RelOptRuleCall): Boolean = {
- val topCalc: Calc = call.rel(0)
- val bottomCalc: Calc = call.rel(1)
-
- // Don't merge a calc which contains windowed aggregates onto a
- // calc. That would effectively be pushing a windowed aggregate down
- // through a filter.
- val topProgram = topCalc.getProgram
- if (RexOver.containsOver(topProgram)) {
- return false
- }
-
- FlinkRelUtil.isMergeable(topCalc, bottomCalc)
- }
-
- override def onMatch(call: RelOptRuleCall): Unit = {
- val topCalc: Calc = call.rel(0)
- val bottomCalc: Calc = call.rel(1)
-
- val newCalc = FlinkRelUtil.merge(topCalc, bottomCalc)
- if (newCalc.getDigest == bottomCalc.getDigest) {
- // newCalc is equivalent to bottomCalc,
- // which means that topCalc
- // must be trivial. Take it out of the game.
- call.getPlanner.prune(topCalc)
- }
- call.transformTo(newCalc)
- }
-
-}
-
-object FlinkCalcMergeRule { // companion object holding the shared rule instances
- val INSTANCE = new FlinkCalcMergeRule(classOf[Calc]) // matches a Calc whose input is a Calc
- val STREAM_PHYSICAL_INSTANCE = new
FlinkCalcMergeRule(classOf[StreamPhysicalCalc]) // matches only StreamPhysicalCalc pairs
-}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceRuleTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceRuleTest.java
index 1d929b751ae..3e3bbb76fa7 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceRuleTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceRuleTest.java
@@ -52,7 +52,7 @@ class PushFilterInCalcIntoTableSourceRuleTest extends
PushFilterIntoTableSourceS
RuleSets.ofList(
CoreRules.PROJECT_TO_CALC,
CoreRules.FILTER_TO_CALC,
- FlinkCalcMergeRule$.MODULE$.INSTANCE(),
+ FlinkCalcMergeRule.INSTANCE,
FlinkLogicalCalc.CONVERTER(),
FlinkLogicalTableSourceScan.CONVERTER(),
FlinkLogicalWatermarkAssigner.CONVERTER()))