This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 835c21090c8 [FLINK-34674][table] Migrate CalcSnapshotTransposeRule to java
835c21090c8 is described below
commit 835c21090c8f3f65c5c749160797158a5cd1fedd
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Fri Dec 6 22:20:33 2024 +0100
[FLINK-34674][table] Migrate CalcSnapshotTransposeRule to java
---
.../rules/logical/CalcSnapshotTransposeRule.java | 81 ++++++++++++++++++++++
.../rules/logical/CalcSnapshotTransposeRule.scala | 49 -------------
2 files changed, 81 insertions(+), 49 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/CalcSnapshotTransposeRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/CalcSnapshotTransposeRule.java
new file mode 100644
index 00000000000..2948ac242bd
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/CalcSnapshotTransposeRule.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSnapshot;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.core.Calc;
+import org.apache.calcite.rel.core.Snapshot;
+import org.apache.calcite.rex.RexOver;
+import org.immutables.value.Value;
+
+/** Transpose {@link FlinkLogicalCalc} past into {@link FlinkLogicalSnapshot}. */
+@Value.Enclosing
+public class CalcSnapshotTransposeRule
+ extends RelRule<CalcSnapshotTransposeRule.CalcSnapshotTransposeRuleConfig> {
+
+ public static final CalcSnapshotTransposeRule INSTANCE =
+ CalcSnapshotTransposeRule.CalcSnapshotTransposeRuleConfig.DEFAULT.toRule();
+
+ protected CalcSnapshotTransposeRule(CalcSnapshotTransposeRuleConfig config) {
+ super(config);
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ FlinkLogicalCalc calc = call.rel(0);
+ // Don't push a calc which contains windowed aggregates into a snapshot for now.
+ return !RexOver.containsOver(calc.getProgram());
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ FlinkLogicalCalc calc = call.rel(0);
+ FlinkLogicalSnapshot snapshot = call.rel(1);
+ Calc newClac = calc.copy(calc.getTraitSet(), snapshot.getInputs());
+ Snapshot newSnapshot = snapshot.copy(snapshot.getTraitSet(), newClac, snapshot.getPeriod());
+ call.transformTo(newSnapshot);
+ }
+
+ /** Rule configuration. */
+ @Value.Immutable(singleton = false)
+ public interface CalcSnapshotTransposeRuleConfig extends RelRule.Config {
+ CalcSnapshotTransposeRule.CalcSnapshotTransposeRuleConfig DEFAULT =
+ ImmutableCalcSnapshotTransposeRule.CalcSnapshotTransposeRuleConfig.builder()
+ .operandSupplier(
+ b0 ->
+ b0.operand(FlinkLogicalCalc.class)
+ .oneInput(
+ b1 ->
+ b1.operand(
+ FlinkLogicalSnapshot
+ .class)
+ .anyInputs()))
+ .description("CalcSnapshotTransposeRule")
+ .build();
+
+ @Override
+ default CalcSnapshotTransposeRule toRule() {
+ return new CalcSnapshotTransposeRule(this);
+ }
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CalcSnapshotTransposeRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CalcSnapshotTransposeRule.scala
deleted file mode 100644
index 80026ff4fab..00000000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CalcSnapshotTransposeRule.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.plan.rules.logical
-
-import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalSnapshot}
-
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.calcite.plan.RelOptRule.{any, operand}
-import org.apache.calcite.rex.RexOver
-
-/** Transpose [[FlinkLogicalCalc]] past into [[FlinkLogicalSnapshot]]. */
-class CalcSnapshotTransposeRule
- extends RelOptRule(
- operand(classOf[FlinkLogicalCalc], operand(classOf[FlinkLogicalSnapshot], any())),
- "CalcSnapshotTransposeRule") {
-
- override def matches(call: RelOptRuleCall): Boolean = {
- val calc = call.rel[FlinkLogicalCalc](0)
- // Don't push a calc which contains windowed aggregates into a snapshot for now.
- !RexOver.containsOver(calc.getProgram)
- }
-
- override def onMatch(call: RelOptRuleCall): Unit = {
- val calc = call.rel[FlinkLogicalCalc](0)
- val snapshot = call.rel[FlinkLogicalSnapshot](1)
- val newClac = calc.copy(calc.getTraitSet, snapshot.getInputs)
- val newSnapshot = snapshot.copy(snapshot.getTraitSet, newClac, snapshot.getPeriod)
- call.transformTo(newSnapshot)
- }
-}
-
-object CalcSnapshotTransposeRule {
- val INSTANCE = new CalcSnapshotTransposeRule
-}