This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 835c21090c8 [FLINK-34674][table] Migrate CalcSnapshotTransposeRule to 
java
835c21090c8 is described below

commit 835c21090c8f3f65c5c749160797158a5cd1fedd
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Fri Dec 6 22:20:33 2024 +0100

    [FLINK-34674][table] Migrate CalcSnapshotTransposeRule to java
---
 .../rules/logical/CalcSnapshotTransposeRule.java   | 81 ++++++++++++++++++++++
 .../rules/logical/CalcSnapshotTransposeRule.scala  | 49 -------------
 2 files changed, 81 insertions(+), 49 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/CalcSnapshotTransposeRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/CalcSnapshotTransposeRule.java
new file mode 100644
index 00000000000..2948ac242bd
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/CalcSnapshotTransposeRule.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSnapshot;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.rel.core.Calc;
+import org.apache.calcite.rel.core.Snapshot;
+import org.apache.calcite.rex.RexOver;
+import org.immutables.value.Value;
+
+/** Transpose {@link FlinkLogicalCalc} past into {@link FlinkLogicalSnapshot}. 
*/
[email protected]
+public class CalcSnapshotTransposeRule
+        extends 
RelRule<CalcSnapshotTransposeRule.CalcSnapshotTransposeRuleConfig> {
+
+    public static final CalcSnapshotTransposeRule INSTANCE =
+            
CalcSnapshotTransposeRule.CalcSnapshotTransposeRuleConfig.DEFAULT.toRule();
+
+    protected CalcSnapshotTransposeRule(CalcSnapshotTransposeRuleConfig 
config) {
+        super(config);
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+        FlinkLogicalCalc calc = call.rel(0);
+        // Don't push a calc which contains windowed aggregates into a 
snapshot for now.
+        return !RexOver.containsOver(calc.getProgram());
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        FlinkLogicalCalc calc = call.rel(0);
+        FlinkLogicalSnapshot snapshot = call.rel(1);
+        Calc newClac = calc.copy(calc.getTraitSet(), snapshot.getInputs());
+        Snapshot newSnapshot = snapshot.copy(snapshot.getTraitSet(), newClac, 
snapshot.getPeriod());
+        call.transformTo(newSnapshot);
+    }
+
+    /** Rule configuration. */
+    @Value.Immutable(singleton = false)
+    public interface CalcSnapshotTransposeRuleConfig extends RelRule.Config {
+        CalcSnapshotTransposeRule.CalcSnapshotTransposeRuleConfig DEFAULT =
+                
ImmutableCalcSnapshotTransposeRule.CalcSnapshotTransposeRuleConfig.builder()
+                        .operandSupplier(
+                                b0 ->
+                                        b0.operand(FlinkLogicalCalc.class)
+                                                .oneInput(
+                                                        b1 ->
+                                                                b1.operand(
+                                                                               
 FlinkLogicalSnapshot
+                                                                               
         .class)
+                                                                        
.anyInputs()))
+                        .description("CalcSnapshotTransposeRule")
+                        .build();
+
+        @Override
+        default CalcSnapshotTransposeRule toRule() {
+            return new CalcSnapshotTransposeRule(this);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CalcSnapshotTransposeRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CalcSnapshotTransposeRule.scala
deleted file mode 100644
index 80026ff4fab..00000000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CalcSnapshotTransposeRule.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.plan.rules.logical
-
-import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalSnapshot}
-
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.calcite.plan.RelOptRule.{any, operand}
-import org.apache.calcite.rex.RexOver
-
-/** Transpose [[FlinkLogicalCalc]] past into [[FlinkLogicalSnapshot]]. */
-class CalcSnapshotTransposeRule
-  extends RelOptRule(
-    operand(classOf[FlinkLogicalCalc], operand(classOf[FlinkLogicalSnapshot], 
any())),
-    "CalcSnapshotTransposeRule") {
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val calc = call.rel[FlinkLogicalCalc](0)
-    // Don't push a calc which contains windowed aggregates into a snapshot 
for now.
-    !RexOver.containsOver(calc.getProgram)
-  }
-
-  override def onMatch(call: RelOptRuleCall): Unit = {
-    val calc = call.rel[FlinkLogicalCalc](0)
-    val snapshot = call.rel[FlinkLogicalSnapshot](1)
-    val newClac = calc.copy(calc.getTraitSet, snapshot.getInputs)
-    val newSnapshot = snapshot.copy(snapshot.getTraitSet, newClac, 
snapshot.getPeriod)
-    call.transformTo(newSnapshot)
-  }
-}
-
-object CalcSnapshotTransposeRule {
-  val INSTANCE = new CalcSnapshotTransposeRule
-}

Reply via email to