This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f85d3074ec [FLINK-38971][table] Migrate 
ExpandWindowTableFunctionTransposeRule to java
4f85d3074ec is described below

commit 4f85d3074eccfe628e2926269ec7e943c61d2a9c
Author: Roman <[email protected]>
AuthorDate: Mon Mar 2 20:41:49 2026 +0100

    [FLINK-38971][table] Migrate ExpandWindowTableFunctionTransposeRule to java
---
 .../ExpandWindowTableFunctionTransposeRule.java    | 363 +++++++++++++++++++++
 ...WindowTableFunctionIntoWindowAggregateRule.java |   4 +-
 .../ExpandWindowTableFunctionTransposeRule.scala   | 283 ----------------
 .../table/planner/plan/utils/WindowUtil.scala      |   3 +-
 4 files changed, 366 insertions(+), 287 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/ExpandWindowTableFunctionTransposeRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/ExpandWindowTableFunctionTransposeRule.java
new file mode 100644
index 00000000000..ddba3c17591
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/ExpandWindowTableFunctionTransposeRule.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.stream;
+
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.functions.sql.SqlWindowTableFunction;
+import 
org.apache.flink.table.planner.plan.logical.TimeAttributeWindowingStrategy;
+import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery;
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCalc;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExpand;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregate;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowTableFunction;
+import org.apache.flink.table.planner.plan.trait.RelWindowProperties;
+import org.apache.flink.table.planner.plan.utils.WindowUtil;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.immutables.value.Value;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import scala.Tuple4;
+
+/**
+ * This rule transposes {@link StreamPhysicalExpand} past {@link 
StreamPhysicalWindowTableFunction}
+ * to make {@link PullUpWindowTableFunctionIntoWindowAggregateRule} can match 
the rel tree pattern
+ * and optimize them into {@link StreamPhysicalWindowAggregate}.
+ *
+ * <p>Example:
+ *
+ * <p>MyTable: a INT, c STRING, rowtime TIMESTAMP(3)
+ *
+ * <p>SQL:
+ *
+ * <pre>{@code
+ * SELECT
+ *    window_start,
+ *    window_end,
+ *    count(distinct a),
+ *    count(distinct c)
+ * FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
+ * GROUP BY window_start, window_end
+ * }</pre>
+ *
+ * <p>We will get part of the initial physical plan like following:
+ *
+ * <pre>{@code
+ * WindowAggregate(groupBy=[$f4, $f5], window=[TUMBLE(win_start=[window_start],
+ * win_end=[window_end], size=[15 min])], select=[$f4, $f5, COUNT(DISTINCT a) 
FILTER $g_1 AS $f2,
+ * COUNT(DISTINCT c) FILTER $g_2 AS $f3, start('w$) AS window_start, end('w$) 
AS window_end])
+ * +- Exchange(distribution=[hash[$f4, $f5]])
+ *    +- Calc(select=[window_start, window_end, a, c, $f4, $f5, =($e, 1) AS 
$g_1, =($e, 2) AS $g_2])
+ *       +- Expand(projects=[{window_start, window_end, a, c, $f4, null AS 
$f5, 1 AS $e},
+ *       {window_start, window_end, a, c, null AS $f4, $f5, 2 AS $e}])
+ *          +- Calc(select=[window_start, window_end, a, c,
+ *          MOD(HASH_CODE(a), 1024) AS $f4, MOD(HASH_CODE(c), 1024) AS $f5])
+ *             +- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], 
size=[15 min])])
+ * }</pre>
+ *
+ * <p>However, it can't match {@link 
PullUpWindowTableFunctionIntoWindowAggregateRule}, because
+ * {@link StreamPhysicalWindowTableFunction} is not near {@link 
StreamPhysicalWindowAggregate}. So
+ * we need to transpose {@link StreamPhysicalExpand} past {@link 
StreamPhysicalWindowTableFunction}
+ * to make the part of rel tree like this which can be matched by {@link
+ * PullUpWindowTableFunctionIntoWindowAggregateRule}.
+ *
+ * <pre>{@code
+ * WindowAggregate(groupBy=[$f4, $f5], window=[TUMBLE(win_start=[window_start],
+ * win_end=[window_end], size=[15 min])], select=[$f4, $f5, COUNT(DISTINCT a) 
FILTER $g_1 AS $f2,
+ * COUNT(DISTINCT c) FILTER $g_2 AS $f3, start('w$) AS window_start, end('w$) 
AS window_end])
+ * +- Exchange(distribution=[hash[$f4, $f5]])
+ *   +- Calc(select=[window_start, window_end, a, c, $f4, $f5, ($e = 1) AS 
$g_1, ($e = 2) AS $g_2])
+ *     +- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], size=[15 
min])])
+ *       +- Expand(...)
+ * }</pre>
+ */
[email protected]
+public class ExpandWindowTableFunctionTransposeRule
+        extends RelRule<
+                ExpandWindowTableFunctionTransposeRule
+                        .ExpandWindowTableFunctionTransposeRuleConfig> {
+
+    public static final ExpandWindowTableFunctionTransposeRule INSTANCE =
+            
ExpandWindowTableFunctionTransposeRule.ExpandWindowTableFunctionTransposeRuleConfig
+                    .DEFAULT
+                    .toRule();
+
+    protected ExpandWindowTableFunctionTransposeRule(
+            ExpandWindowTableFunctionTransposeRuleConfig config) {
+        super(config);
+    }
+
+    @Override
+    public boolean matches(RelOptRuleCall call) {
+        StreamPhysicalExpand expand = call.rel(0);
+        StreamPhysicalCalc calc = call.rel(1);
+        FlinkRelMetadataQuery fmq =
+                
FlinkRelMetadataQuery.reuseOrCreate(calc.getCluster().getMetadataQuery());
+
+        // condition and projection of Calc shouldn't contain calls on window 
columns,
+        // otherwise, we can't transpose WindowTVF and Calc
+        if (WindowUtil.calcContainsCallsOnWindowColumns(calc, fmq)) {
+            return false;
+        }
+
+        // we only transpose WindowTVF when expand propagate window_start and 
window_end,
+        // otherwise, it's meaningless to transpose
+        RelWindowProperties expandWindowProps = 
fmq.getRelWindowProperties(expand);
+        return expandWindowProps != null
+                && !expandWindowProps.getWindowStartColumns().isEmpty()
+                && !expandWindowProps.getWindowEndColumns().isEmpty();
+    }
+
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        StreamPhysicalExpand expand = call.rel(0);
+        StreamPhysicalCalc calc = call.rel(1);
+        StreamPhysicalWindowTableFunction windowTVF = call.rel(2);
+        RelOptCluster cluster = expand.getCluster();
+        FlinkRelMetadataQuery fmq = 
FlinkRelMetadataQuery.reuseOrCreate(cluster.getMetadataQuery());
+        FlinkTypeFactory typeFactory = (FlinkTypeFactory) 
cluster.getTypeFactory();
+        RelNode input = windowTVF.getInput();
+        RelDataType inputRowType = input.getRowType();
+
+        RelTraitSet requiredInputTraitSet =
+                
input.getTraitSet().replace(FlinkConventions.STREAM_PHYSICAL());
+        RelNode newInput = RelOptRule.convert(input, requiredInputTraitSet);
+
+        // 
-------------------------------------------------------------------------
+        //  1. transpose Calc and WindowTVF, build the new Calc node (the top 
node)
+        // 
-------------------------------------------------------------------------
+        ImmutableBitSet windowColumns = 
fmq.getRelWindowProperties(windowTVF).getWindowColumns();
+        Tuple4<RexProgram, int[], Integer, Boolean> programInfo =
+                WindowUtil.buildNewProgramWithoutWindowColumns(
+                        cluster.getRexBuilder(),
+                        calc.getProgram(),
+                        inputRowType,
+                        windowTVF.windowing().getTimeAttributeIndex(),
+                        windowColumns.toArray());
+        RexProgram newProgram = programInfo._1();
+        int[] fieldShifting = programInfo._2();
+        int newTimeField = programInfo._3();
+        boolean timeFieldAdded = programInfo._4();
+
+        StreamPhysicalCalc newCalc =
+                new StreamPhysicalCalc(
+                        cluster,
+                        calc.getTraitSet(),
+                        newInput,
+                        newProgram,
+                        newProgram.getOutputRowType());
+
+        // 
-------------------------------------------------------------------------
+        //  2. Adjust input ref index in Expand, append time attribute ref if 
needed
+        // 
-------------------------------------------------------------------------
+        StreamPhysicalExpand newExpand =
+                buildNewExpand(expand, newCalc, fieldShifting, newTimeField, 
timeFieldAdded);
+
+        // 
-------------------------------------------------------------------------
+        //  3. Apply WindowTVF on the new Expand node
+        // 
-------------------------------------------------------------------------
+        RelDataType newOutputType =
+                SqlWindowTableFunction.inferRowType(
+                        typeFactory,
+                        newExpand.getRowType(),
+                        typeFactory.createFieldTypeFromLogicalType(
+                                windowTVF.windowing().getTimeAttributeType()));
+        // the time attribute ref is appended
+        int timeAttributeOnExpand =
+                timeFieldAdded ? newExpand.getRowType().getFieldCount() - 1 : 
newTimeField;
+        TimeAttributeWindowingStrategy newWindowing =
+                new TimeAttributeWindowingStrategy(
+                        windowTVF.windowing().getWindow(),
+                        windowTVF.windowing().getTimeAttributeType(),
+                        timeAttributeOnExpand);
+        StreamPhysicalWindowTableFunction newWindowTVF =
+                new StreamPhysicalWindowTableFunction(
+                        cluster, windowTVF.getTraitSet(), newExpand, 
newOutputType, newWindowing);
+
+        // 
-------------------------------------------------------------------------
+        //  4. Apply Calc on the new WindowTVF to adjust the fields mapping
+        // 
-------------------------------------------------------------------------
+        int[] projectionMapping = getProjectionMapping(fmq, expand, 
newWindowTVF);
+        List<RexNode> projectExprs =
+                Arrays.stream(projectionMapping)
+                        .mapToObj(index -> RexInputRef.of(index, 
newWindowTVF.getRowType()))
+                        .collect(Collectors.toList());
+        RexProgram topRexProgram =
+                RexProgram.create(
+                        newWindowTVF.getRowType(),
+                        projectExprs,
+                        null,
+                        expand.getRowType(),
+                        cluster.getRexBuilder());
+        StreamPhysicalCalc topCalc =
+                new StreamPhysicalCalc(
+                        cluster,
+                        expand.getTraitSet(),
+                        newWindowTVF,
+                        topRexProgram,
+                        topRexProgram.getOutputRowType());
+
+        // 
-------------------------------------------------------------------------
+        //  5. Finish
+        // 
-------------------------------------------------------------------------
+        call.transformTo(topCalc);
+    }
+
+    private StreamPhysicalExpand buildNewExpand(
+            StreamPhysicalExpand expand,
+            StreamPhysicalCalc newCalc,
+            int[] inputFieldShifting,
+            int newTimeField,
+            boolean timeFieldAdded) {
+        RelDataType newInputRowType = newCalc.getRowType();
+        int expandIdIndex = expand.expandIdIndex();
+        int newExpandIdIndex = -1;
+        List<List<RexNode>> newProjects = new ArrayList<>();
+
+        for (List<RexNode> exprs : expand.projects()) {
+            List<RexNode> newExprs = new ArrayList<>();
+            int baseOffset = 0;
+            for (int exprIndex = 0; exprIndex < exprs.size(); exprIndex++) {
+                RexNode expr = exprs.get(exprIndex);
+                if (expr instanceof RexInputRef) {
+                    int shiftedIndex = inputFieldShifting[((RexInputRef) 
expr).getIndex()];
+                    if (shiftedIndex < 0) {
+                        // skip the window columns
+                        continue;
+                    }
+                    newExprs.add(RexInputRef.of(shiftedIndex, 
newInputRowType));
+                    // we only use the type from input ref instead of literal
+                    baseOffset++;
+                } else if (expr instanceof RexLiteral) {
+                    newExprs.add(expr);
+                    if (exprIndex == expandIdIndex) {
+                        // this is the expand id, we should remember the new 
index of expand id
+                        // and update type for this expr
+                        newExpandIdIndex = baseOffset;
+                    }
+                    baseOffset++;
+                } else {
+                    throw new IllegalArgumentException(
+                            "Expand node should only contain RexInputRef and 
RexLiteral, but got "
+                                    + expr);
+                }
+            }
+            if (timeFieldAdded) {
+                // append time attribute reference if needed
+                newExprs.add(RexInputRef.of(newTimeField, newInputRowType));
+            }
+            newProjects.add(newExprs);
+        }
+
+        return new StreamPhysicalExpand(
+                expand.getCluster(), expand.getTraitSet(), newCalc, 
newProjects, newExpandIdIndex);
+    }
+
+    private int[] getProjectionMapping(
+            FlinkRelMetadataQuery fmq,
+            StreamPhysicalExpand oldExpand,
+            StreamPhysicalWindowTableFunction newWindowTVF) {
+        RelWindowProperties windowProps = 
fmq.getRelWindowProperties(oldExpand);
+        Set<Integer> startColumns =
+                Arrays.stream(windowProps.getWindowStartColumns().toArray())
+                        .boxed()
+                        .collect(Collectors.toSet());
+        Set<Integer> endColumns =
+                Arrays.stream(windowProps.getWindowEndColumns().toArray())
+                        .boxed()
+                        .collect(Collectors.toSet());
+        Set<Integer> timeColumns =
+                Arrays.stream(windowProps.getWindowTimeColumns().toArray())
+                        .boxed()
+                        .collect(Collectors.toSet());
+        int newWindowTimePos = newWindowTVF.getRowType().getFieldCount() - 1;
+        int newWindowEndPos = newWindowTVF.getRowType().getFieldCount() - 2;
+        int newWindowStartPos = newWindowTVF.getRowType().getFieldCount() - 3;
+        int numWindowColumns = 0;
+        List<Integer> projectMapping = new ArrayList<>();
+
+        for (int index = 0; index < oldExpand.getRowType().getFieldCount(); 
index++) {
+            if (startColumns.contains(index)) {
+                projectMapping.add(newWindowStartPos);
+                numWindowColumns++;
+            } else if (endColumns.contains(index)) {
+                projectMapping.add(newWindowEndPos);
+                numWindowColumns++;
+            } else if (timeColumns.contains(index)) {
+                projectMapping.add(newWindowTimePos);
+                numWindowColumns++;
+            } else {
+                projectMapping.add(index - numWindowColumns);
+            }
+        }
+
+        return projectMapping.stream().mapToInt(Integer::intValue).toArray();
+    }
+
+    /** Configuration for {@link ExpandWindowTableFunctionTransposeRule}. */
+    @Value.Immutable(singleton = false)
+    public interface ExpandWindowTableFunctionTransposeRuleConfig extends 
RelRule.Config {
+        
ExpandWindowTableFunctionTransposeRule.ExpandWindowTableFunctionTransposeRuleConfig
+                DEFAULT =
+                        ImmutableExpandWindowTableFunctionTransposeRule
+                                
.ExpandWindowTableFunctionTransposeRuleConfig.builder()
+                                .build()
+                                .withOperandSupplier(
+                                        b0 ->
+                                                
b0.operand(StreamPhysicalExpand.class)
+                                                        .oneInput(
+                                                                b1 ->
+                                                                        
b1.operand(
+                                                                               
         StreamPhysicalCalc
+                                                                               
                 .class)
+                                                                               
 .oneInput(
+                                                                               
         b2 ->
+                                                                               
                 b2.operand(
+                                                                               
                                 StreamPhysicalWindowTableFunction
+                                                                               
                                         .class)
+                                                                               
                         .anyInputs())))
+                                
.withDescription("ExpandWindowTableFunctionTransposeRule")
+                                .as(
+                                        ExpandWindowTableFunctionTransposeRule
+                                                
.ExpandWindowTableFunctionTransposeRuleConfig
+                                                .class);
+
+        @Override
+        default ExpandWindowTableFunctionTransposeRule toRule() {
+            return new ExpandWindowTableFunctionTransposeRule(this);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/PullUpWindowTableFunctionIntoWindowAggregateRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/PullUpWindowTableFunctionIntoWindowAggregateRule.java
index b0aaaa460ff..798afe4e58c 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/PullUpWindowTableFunctionIntoWindowAggregateRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/PullUpWindowTableFunctionIntoWindowAggregateRule.java
@@ -130,7 +130,7 @@ public class 
PullUpWindowTableFunctionIntoWindowAggregateRule
         //  1. transpose Calc and WindowTVF, build the new Calc node
         // 
-------------------------------------------------------------------------
         ImmutableBitSet windowColumns = 
fmq.getRelWindowProperties(windowTVF).getWindowColumns();
-        Tuple4<RexProgram, int[], Object, Object> programInfo =
+        Tuple4<RexProgram, int[], Integer, Boolean> programInfo =
                 WindowUtil.buildNewProgramWithoutWindowColumns(
                         cluster.getRexBuilder(),
                         calc.getProgram(),
@@ -139,7 +139,7 @@ public class 
PullUpWindowTableFunctionIntoWindowAggregateRule
                         windowColumns.toArray());
         RexProgram newProgram = programInfo._1();
         int[] aggInputFieldsShift = programInfo._2();
-        int timeAttributeIndex = (int) programInfo._3();
+        int timeAttributeIndex = programInfo._3();
 
         StreamPhysicalCalc newCalc =
                 new StreamPhysicalCalc(
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ExpandWindowTableFunctionTransposeRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ExpandWindowTableFunctionTransposeRule.scala
deleted file mode 100644
index 96688fbd232..00000000000
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ExpandWindowTableFunctionTransposeRule.scala
+++ /dev/null
@@ -1,283 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.plan.rules.physical.stream
-
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory
-import org.apache.flink.table.planner.functions.sql.SqlWindowTableFunction
-import 
org.apache.flink.table.planner.plan.logical.TimeAttributeWindowingStrategy
-import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
-import org.apache.flink.table.planner.plan.nodes.FlinkConventions
-import 
org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalCalc, 
StreamPhysicalExpand, StreamPhysicalWindowAggregate, 
StreamPhysicalWindowTableFunction}
-import org.apache.flink.table.planner.plan.utils.WindowUtil
-import 
org.apache.flink.table.planner.plan.utils.WindowUtil.buildNewProgramWithoutWindowColumns
-
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.calcite.plan.RelOptRule.{any, operand}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rex.{RexInputRef, RexLiteral, RexNode, RexProgram}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
-/**
- * This rule transposes [[StreamPhysicalExpand]] past 
[[StreamPhysicalWindowTableFunction]] to make
- * [[PullUpWindowTableFunctionIntoWindowAggregateRule]] can match the rel tree 
pattern and optimize
- * them into [[StreamPhysicalWindowAggregate]].
- *
- * Example:
- *
- * MyTable: a INT, c STRING, rowtime TIMESTAMP(3)
- *
- * SQL:
- * {{{
- * SELECT
- *    window_start,
- *    window_end,
- *    count(distinct a),
- *    count(distinct c)
- * FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
- * GROUP BY window_start, window_end
- * }}}
- *
- * We will get part of the initial physical plan like following:
- * {{{
- * WindowAggregate(groupBy=[$f4, $f5], window=[TUMBLE(win_start=[window_start],
- * win_end=[window_end], size=[15 min])], select=[$f4, $f5, COUNT(DISTINCT a) 
FILTER $g_1 AS $f2,
- * COUNT(DISTINCT c) FILTER $g_2 AS $f3, start('w$) AS window_start, end('w$) 
AS window_end])
- * +- Exchange(distribution=[hash[$f4, $f5]])
- *    +- Calc(select=[window_start, window_end, a, c, $f4, $f5, =($e, 1) AS 
$g_1, =($e, 2) AS $g_2])
- *       +- Expand(projects=[{window_start, window_end, a, c, $f4, null AS 
$f5, 1 AS $e},
- *       {window_start, window_end, a, c, null AS $f4, $f5, 2 AS $e}])
- *          +- Calc(select=[window_start, window_end, a, c,
- *          MOD(HASH_CODE(a), 1024) AS $f4, MOD(HASH_CODE(c), 1024) AS $f5])
- *             +- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], 
size=[15 min])])
- * }}}
- *
- * However, it can't match 
[[PullUpWindowTableFunctionIntoWindowAggregateRule]], because
- * [[StreamPhysicalWindowTableFunction]] is not near 
[[StreamPhysicalWindowAggregate]]. So we need
- * to transpose [[StreamPhysicalExpand]] past 
[[StreamPhysicalWindowTableFunction]] to make the part
- * of rel tree like this which can be matched by
- * [[PullUpWindowTableFunctionIntoWindowAggregateRule]].
- *
- * {{{
- * WindowAggregate(groupBy=[$f4, $f5], window=[TUMBLE(win_start=[window_start],
- * win_end=[window_end], size=[15 min])], select=[$f4, $f5, COUNT(DISTINCT a) 
FILTER $g_1 AS $f2,
- * COUNT(DISTINCT c) FILTER $g_2 AS $f3, start('w$) AS window_start, end('w$) 
AS window_end])
- * +- Exchange(distribution=[hash[$f4, $f5]])
- *   +- Calc(select=[window_start, window_end, a, c, $f4, $f5, ($e = 1) AS 
$g_1, ($e = 2) AS $g_2])
- *     +- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], size=[15 
min])])
- *       +- Expand(...)
- * }}}
- * </pre>
- */
-class ExpandWindowTableFunctionTransposeRule
-  extends RelOptRule(
-    operand(
-      classOf[StreamPhysicalExpand],
-      operand(
-        classOf[StreamPhysicalCalc],
-        operand(classOf[StreamPhysicalWindowTableFunction], any()))),
-    "ExpandWindowTableFunctionTransposeRule") {
-
-  override def matches(call: RelOptRuleCall): Boolean = {
-    val expand: StreamPhysicalExpand = call.rel(0)
-    val calc: StreamPhysicalCalc = call.rel(1)
-    val fmq = 
FlinkRelMetadataQuery.reuseOrCreate(calc.getCluster.getMetadataQuery)
-
-    // condition and projection of Calc shouldn't contain calls on window 
columns,
-    // otherwise, we can't transpose WindowTVF and Calc
-    if (WindowUtil.calcContainsCallsOnWindowColumns(calc, fmq)) {
-      return false
-    }
-
-    // we only transpose WindowTVF when expand propagate window_start and 
window_end,
-    // otherwise, it's meaningless to transpose
-    val expandWindowProps = fmq.getRelWindowProperties(expand)
-    expandWindowProps != null &&
-    !expandWindowProps.getWindowStartColumns.isEmpty &&
-    !expandWindowProps.getWindowEndColumns.isEmpty
-  }
-
-  override def onMatch(call: RelOptRuleCall): Unit = {
-    val expand: StreamPhysicalExpand = call.rel(0)
-    val calc: StreamPhysicalCalc = call.rel(1)
-    val windowTVF: StreamPhysicalWindowTableFunction = call.rel(2)
-    val cluster = expand.getCluster
-    val fmq = FlinkRelMetadataQuery.reuseOrCreate(cluster.getMetadataQuery)
-    val typeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-    val input = windowTVF.getInput
-    val inputRowType = input.getRowType
-
-    val requiredInputTraitSet = 
input.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
-    val newInput: RelNode = RelOptRule.convert(input, requiredInputTraitSet)
-
-    // 
-------------------------------------------------------------------------
-    //  1. transpose Calc and WindowTVF, build the new Calc node (the top node)
-    // 
-------------------------------------------------------------------------
-    val windowColumns = fmq.getRelWindowProperties(windowTVF).getWindowColumns
-    val (newProgram, fieldShifting, newTimeField, timeFieldAdded) =
-      buildNewProgramWithoutWindowColumns(
-        cluster.getRexBuilder,
-        calc.getProgram,
-        inputRowType,
-        windowTVF.windowing.getTimeAttributeIndex,
-        windowColumns.toArray)
-    val newCalc = new StreamPhysicalCalc(
-      cluster,
-      calc.getTraitSet,
-      newInput,
-      newProgram,
-      newProgram.getOutputRowType)
-
-    // 
-------------------------------------------------------------------------
-    //  2. Adjust input ref index in Expand, append time attribute ref if 
needed
-    // 
-------------------------------------------------------------------------
-    val newExpand = buildNewExpand(expand, newCalc, fieldShifting, 
newTimeField, timeFieldAdded)
-
-    // 
-------------------------------------------------------------------------
-    //  3. Apply WindowTVF on the new Expand node
-    // 
-------------------------------------------------------------------------
-    val newOutputType = SqlWindowTableFunction.inferRowType(
-      typeFactory,
-      newExpand.getRowType,
-      
typeFactory.createFieldTypeFromLogicalType(windowTVF.windowing.getTimeAttributeType))
-    val timeAttributeOnExpand = if (timeFieldAdded) {
-      // the time attribute ref is appended
-      newExpand.getRowType.getFieldCount - 1
-    } else {
-      newTimeField
-    }
-    val newWindowing = new TimeAttributeWindowingStrategy(
-      windowTVF.windowing.getWindow,
-      windowTVF.windowing.getTimeAttributeType,
-      timeAttributeOnExpand)
-    val newWindowTVF = new StreamPhysicalWindowTableFunction(
-      cluster,
-      windowTVF.getTraitSet,
-      newExpand,
-      newOutputType,
-      newWindowing)
-
-    // 
-------------------------------------------------------------------------
-    //  4. Apply Calc on the new WindowTVF to adjust the fields mapping
-    // 
-------------------------------------------------------------------------
-    val projectionMapping = getProjectionMapping(fmq, expand, newWindowTVF)
-    val projectExprs = projectionMapping.map(RexInputRef.of(_, 
newWindowTVF.getRowType))
-    val topRexProgram = RexProgram.create(
-      newWindowTVF.getRowType,
-      projectExprs.toList.asJava,
-      null, // no filter
-      expand.getRowType,
-      cluster.getRexBuilder)
-    val topCalc = new StreamPhysicalCalc(
-      cluster,
-      expand.getTraitSet,
-      newWindowTVF,
-      topRexProgram,
-      topRexProgram.getOutputRowType)
-
-    // 
-------------------------------------------------------------------------
-    //  5. Finish
-    // 
-------------------------------------------------------------------------
-    call.transformTo(topCalc)
-  }
-
-  private def buildNewExpand(
-      expand: StreamPhysicalExpand,
-      newCalc: StreamPhysicalCalc,
-      inputFieldShifting: Array[Int],
-      newTimeField: Int,
-      timeFieldAdded: Boolean): StreamPhysicalExpand = {
-    val newInputRowType = newCalc.getRowType
-    val expandIdIndex = expand.expandIdIndex
-    var newExpandIdIndex = -1
-    val newProjects = expand.projects.asScala.map {
-      exprs =>
-        val newExprs = ArrayBuffer[RexNode]()
-        var baseOffset = 0
-        exprs.asScala.zipWithIndex.foreach {
-          case (ref: RexInputRef, _) if inputFieldShifting(ref.getIndex) < 0 =>
-          // skip the window columns
-          case (ref: RexInputRef, _) =>
-            val newInputIndex = inputFieldShifting(ref.getIndex)
-            newExprs += RexInputRef.of(newInputIndex, newInputRowType)
-            // we only use the type from input ref instead of literal
-            baseOffset += 1
-          case (lit: RexLiteral, exprIndex) =>
-            newExprs += lit
-            if (exprIndex == expandIdIndex) {
-              // this is the expand id, we should remember the new index of 
expand id
-              // and update type for this expr
-              newExpandIdIndex = baseOffset
-            }
-            baseOffset += 1
-          case exp @ _ =>
-            throw new IllegalArgumentException(
-              "Expand node should only contain RexInputRef and RexLiteral, but 
got " + exp)
-        }
-        if (timeFieldAdded) {
-          // append time attribute reference if needed
-          newExprs += RexInputRef.of(newTimeField, newInputRowType)
-        }
-        newExprs.asJava
-    }
-
-    new StreamPhysicalExpand(
-      expand.getCluster,
-      expand.getTraitSet,
-      newCalc,
-      newProjects.asJava,
-      newExpandIdIndex
-    )
-  }
-
-  private def getProjectionMapping(
-      fmq: FlinkRelMetadataQuery,
-      oldExpand: StreamPhysicalExpand,
-      newWindowTVF: StreamPhysicalWindowTableFunction): Array[Int] = {
-    val windowProps = fmq.getRelWindowProperties(oldExpand)
-    val startColumns = windowProps.getWindowStartColumns.toArray
-    val endColumns = windowProps.getWindowEndColumns.toArray
-    val timeColumns = windowProps.getWindowTimeColumns.toArray
-    val newWindowTimePos = newWindowTVF.getRowType.getFieldCount - 1
-    val newWindowEndPos = newWindowTVF.getRowType.getFieldCount - 2
-    val newWindowStartPos = newWindowTVF.getRowType.getFieldCount - 3
-    var numWindowColumns = 0
-    val projectMapping = ArrayBuffer[Int]()
-    (0 until oldExpand.getRowType.getFieldCount).foreach {
-      index =>
-        if (startColumns.contains(index)) {
-          projectMapping += newWindowStartPos
-          numWindowColumns += 1
-        } else if (endColumns.contains(index)) {
-          projectMapping += newWindowEndPos
-          numWindowColumns += 1
-        } else if (timeColumns.contains(index)) {
-          projectMapping += newWindowTimePos
-          numWindowColumns += 1
-        } else {
-          projectMapping += index - numWindowColumns
-        }
-    }
-    projectMapping.toArray
-  }
-}
-
-object ExpandWindowTableFunctionTransposeRule {
-  val INSTANCE = new ExpandWindowTableFunctionTransposeRule
-}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
index 9102f68528d..fd965e3ef5f 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
@@ -43,7 +43,6 @@ import org.apache.calcite.rel.{BiRel, RelNode, RelVisitor}
 import org.apache.calcite.rel.core._
 import org.apache.calcite.rex._
 import org.apache.calcite.sql.`type`.SqlTypeFamily
-import org.apache.calcite.sql.{SqlKind, SqlUtil}
 import org.apache.calcite.util.{ImmutableBitSet, Util}
 
 import java.time.Duration
@@ -123,7 +122,7 @@ object WindowUtil {
       oldProgram: RexProgram,
       inputRowType: RelDataType,
       inputTimeAttributeIndex: Int,
-      windowColumns: Array[Int]): (RexProgram, Array[Int], Int, Boolean) = {
+      windowColumns: Array[Int]): (RexProgram, Array[Int], Integer, 
java.lang.Boolean) = {
     val programBuilder = new RexProgramBuilder(inputRowType, rexBuilder)
     // mapping from original field index to new field index
     var containsTimeAttribute = false


Reply via email to