This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 35d1d8ab8a04a98906fd37f6e62aa289d9ed63bd
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Sun Feb 22 20:18:30 2026 +0100

    [FLINK-38624][table] Convert FlinkLogicalOverAggregate to java
---
 .../nodes/logical/FlinkLogicalOverAggregate.java   | 135 +++++++++++++++++++++
 .../nodes/logical/FlinkLogicalOverAggregate.scala  | 121 ------------------
 2 files changed, 135 insertions(+), 121 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalOverAggregate.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalOverAggregate.java
new file mode 100644
index 00000000000..429d24f1f25
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalOverAggregate.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.logical;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.metadata.RelMdCollation;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.sql.SqlRankFunction;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.util.List;
+
+/**
+ * Sub-class of {@link Window} that is a relational expression which 
represents a set of over window
+ * aggregates in Flink.
+ */
+public class FlinkLogicalOverAggregate extends Window implements 
FlinkLogicalRel {
+    public static final ConverterRule CONVERTER =
+            new FlinkLogicalOverAggregateConverter(
+                    ConverterRule.Config.INSTANCE.withConversion(
+                            LogicalWindow.class,
+                            Convention.NONE,
+                            FlinkConventions.LOGICAL(),
+                            "FlinkLogicalOverAggregateConverter"));
+
+    protected FlinkLogicalOverAggregate(
+            RelOptCluster cluster,
+            RelTraitSet traitSet,
+            List<RelHint> hints,
+            RelNode input,
+            List<RexLiteral> constants,
+            RelDataType rowType,
+            List<Group> groups) {
+        super(cluster, traitSet, hints, input, constants, rowType, groups);
+    }
+
+    public FlinkLogicalOverAggregate(
+            RelOptCluster cluster,
+            RelTraitSet traitSet,
+            RelNode input,
+            List<RexLiteral> constants,
+            RelDataType rowType,
+            List<Group> groups) {
+        super(cluster, traitSet, input, constants, rowType, groups);
+    }
+
+    @Override
+    public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+        return copy(traitSet, inputs, rowType, groups);
+    }
+
+    public RelNode copy(
+            RelTraitSet traitSet,
+            List<RelNode> inputs,
+            RelDataType rowType,
+            List<Window.Group> groups) {
+        return new FlinkLogicalOverAggregate(
+                getCluster(), traitSet, inputs.get(0), constants, rowType, 
groups);
+    }
+
+    private static class FlinkLogicalOverAggregateConverter extends 
ConverterRule {
+
+        protected FlinkLogicalOverAggregateConverter(Config config) {
+            super(config);
+        }
+
+        @Override
+        public @Nullable RelNode convert(RelNode rel) {
+            LogicalWindow window = (LogicalWindow) rel;
+            RelMetadataQuery mq = rel.getCluster().getMetadataQuery();
+            RelTraitSet traitSet =
+                    rel.getCluster()
+                            .traitSetOf(FlinkConventions.LOGICAL())
+                            .replaceIfs(
+                                    RelCollationTraitDef.INSTANCE,
+                                    () ->
+                                            RelMdCollation.window(
+                                                    mq, window.getInput(), 
window.groups))
+                            .simplify();
+            RelNode newInput = RelOptRule.convert(window.getInput(), 
FlinkConventions.LOGICAL());
+
+            window.groups.forEach(
+                    group -> {
+                        final int orderKeySize = 
group.orderKeys.getFieldCollations().size();
+                        group.aggCalls.forEach(
+                                winAggCall -> {
+                                    if (orderKeySize == 0
+                                            && winAggCall.op instanceof 
SqlRankFunction) {
+                                        throw new ValidationException(
+                                                "Over Agg: The window rank 
function requires order by clause with non-constant fields. "
+                                                        + "please re-check the 
over window statement.");
+                                    }
+                                });
+                    });
+
+            return new FlinkLogicalOverAggregate(
+                    rel.getCluster(),
+                    traitSet,
+                    newInput,
+                    window.constants,
+                    window.getRowType(),
+                    window.groups);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalOverAggregate.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalOverAggregate.scala
deleted file mode 100644
index 61a35b2fb00..00000000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalOverAggregate.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.plan.nodes.logical
-
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.planner.plan.nodes.FlinkConventions
-
-import org.apache.calcite.plan._
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.{RelCollation, RelCollationTraitDef, RelNode}
-import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.convert.ConverterRule.Config
-import org.apache.calcite.rel.core.Window
-import org.apache.calcite.rel.logical.LogicalWindow
-import org.apache.calcite.rel.metadata.RelMdCollation
-import org.apache.calcite.rex.RexLiteral
-import org.apache.calcite.sql.SqlRankFunction
-
-import java.util
-import java.util.{List => JList}
-import java.util.function.Supplier
-
-import scala.collection.JavaConversions._
-
-/**
- * Sub-class of [[Window]] that is a relational expression which represents a 
set of over window
- * aggregates in Flink.
- */
-class FlinkLogicalOverAggregate(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    input: RelNode,
-    windowConstants: JList[RexLiteral],
-    rowType: RelDataType,
-    windowGroups: JList[Window.Group])
-  extends Window(cluster, traitSet, input, windowConstants, rowType, 
windowGroups)
-  with FlinkLogicalRel {
-
-  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
-    copy(traitSet, inputs, rowType, windowGroups)
-  }
-
-  def copy(
-      traitSet: RelTraitSet,
-      inputs: JList[RelNode],
-      rowType: RelDataType,
-      groups: JList[Window.Group]): RelNode = {
-    new FlinkLogicalOverAggregate(
-      cluster,
-      traitSet,
-      inputs.get(0),
-      windowConstants,
-      rowType,
-      groups)
-  }
-}
-
-class FlinkLogicalOverAggregateConverter(config: Config) extends 
ConverterRule(config) {
-
-  override def convert(rel: RelNode): RelNode = {
-    val window = rel.asInstanceOf[LogicalWindow]
-    val mq = rel.getCluster.getMetadataQuery
-    val traitSet = rel.getCluster
-      .traitSetOf(FlinkConventions.LOGICAL)
-      .replaceIfs(
-        RelCollationTraitDef.INSTANCE,
-        new Supplier[util.List[RelCollation]]() {
-          def get: util.List[RelCollation] = {
-            RelMdCollation.window(mq, window.getInput(), window.groups)
-          }
-        }
-      )
-      .simplify()
-    val newInput = RelOptRule.convert(window.getInput, 
FlinkConventions.LOGICAL)
-
-    window.groups.foreach {
-      group =>
-        val orderKeySize = group.orderKeys.getFieldCollations.size()
-        group.aggCalls.foreach {
-          winAggCall =>
-            if (orderKeySize == 0 && 
winAggCall.op.isInstanceOf[SqlRankFunction]) {
-              throw new ValidationException(
-                "Over Agg: The window rank function requires order by clause 
with non-constant fields. " +
-                  "please re-check the over window statement.")
-            }
-        }
-    }
-
-    new FlinkLogicalOverAggregate(
-      rel.getCluster,
-      traitSet,
-      newInput,
-      window.constants,
-      window.getRowType,
-      window.groups)
-  }
-}
-
-object FlinkLogicalOverAggregate {
-  val CONVERTER: ConverterRule = new FlinkLogicalOverAggregateConverter(
-    Config.INSTANCE.withConversion(
-      classOf[LogicalWindow],
-      Convention.NONE,
-      FlinkConventions.LOGICAL,
-      "FlinkLogicalOverAggregateConverter"))
-}

Reply via email to