This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4f85d3074ec [FLINK-38971][table] Migrate
ExpandWindowTableFunctionTransposeRule to java
4f85d3074ec is described below
commit 4f85d3074eccfe628e2926269ec7e943c61d2a9c
Author: Roman <[email protected]>
AuthorDate: Mon Mar 2 20:41:49 2026 +0100
[FLINK-38971][table] Migrate ExpandWindowTableFunctionTransposeRule to java
---
.../ExpandWindowTableFunctionTransposeRule.java | 363 +++++++++++++++++++++
...WindowTableFunctionIntoWindowAggregateRule.java | 4 +-
.../ExpandWindowTableFunctionTransposeRule.scala | 283 ----------------
.../table/planner/plan/utils/WindowUtil.scala | 3 +-
4 files changed, 366 insertions(+), 287 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/ExpandWindowTableFunctionTransposeRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/ExpandWindowTableFunctionTransposeRule.java
new file mode 100644
index 00000000000..ddba3c17591
--- /dev/null
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/ExpandWindowTableFunctionTransposeRule.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.stream;
+
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.functions.sql.SqlWindowTableFunction;
+import
org.apache.flink.table.planner.plan.logical.TimeAttributeWindowingStrategy;
+import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery;
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCalc;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExpand;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregate;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowTableFunction;
+import org.apache.flink.table.planner.plan.trait.RelWindowProperties;
+import org.apache.flink.table.planner.plan.utils.WindowUtil;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelRule;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.immutables.value.Value;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import scala.Tuple4;
+
+/**
+ * This rule transposes {@link StreamPhysicalExpand} past {@link
StreamPhysicalWindowTableFunction}
+ * so that {@link PullUpWindowTableFunctionIntoWindowAggregateRule} can match
the rel tree pattern
+ * and optimize them into {@link StreamPhysicalWindowAggregate}.
+ *
+ * <p>Example:
+ *
+ * <p>MyTable: a INT, c STRING, rowtime TIMESTAMP(3)
+ *
+ * <p>SQL:
+ *
+ * <pre>{@code
+ * SELECT
+ * window_start,
+ * window_end,
+ * count(distinct a),
+ * count(distinct c)
+ * FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
+ * GROUP BY window_start, window_end
+ * }</pre>
+ *
+ * <p>We will get part of the initial physical plan like following:
+ *
+ * <pre>{@code
+ * WindowAggregate(groupBy=[$f4, $f5], window=[TUMBLE(win_start=[window_start],
+ * win_end=[window_end], size=[15 min])], select=[$f4, $f5, COUNT(DISTINCT a)
FILTER $g_1 AS $f2,
+ * COUNT(DISTINCT c) FILTER $g_2 AS $f3, start('w$) AS window_start, end('w$)
AS window_end])
+ * +- Exchange(distribution=[hash[$f4, $f5]])
+ * +- Calc(select=[window_start, window_end, a, c, $f4, $f5, =($e, 1) AS
$g_1, =($e, 2) AS $g_2])
+ * +- Expand(projects=[{window_start, window_end, a, c, $f4, null AS
$f5, 1 AS $e},
+ * {window_start, window_end, a, c, null AS $f4, $f5, 2 AS $e}])
+ * +- Calc(select=[window_start, window_end, a, c,
+ * MOD(HASH_CODE(a), 1024) AS $f4, MOD(HASH_CODE(c), 1024) AS $f5])
+ * +- WindowTableFunction(window=[TUMBLE(time_col=[rowtime],
size=[15 min])])
+ * }</pre>
+ *
+ * <p>However, it can't match {@link
PullUpWindowTableFunctionIntoWindowAggregateRule}, because
+ * {@link StreamPhysicalWindowTableFunction} is not near {@link
StreamPhysicalWindowAggregate}. So
+ * we need to transpose {@link StreamPhysicalExpand} past {@link
StreamPhysicalWindowTableFunction}
+ * so that this part of the rel tree looks like the following, which can be matched by {@link
+ * PullUpWindowTableFunctionIntoWindowAggregateRule}.
+ *
+ * <pre>{@code
+ * WindowAggregate(groupBy=[$f4, $f5], window=[TUMBLE(win_start=[window_start],
+ * win_end=[window_end], size=[15 min])], select=[$f4, $f5, COUNT(DISTINCT a)
FILTER $g_1 AS $f2,
+ * COUNT(DISTINCT c) FILTER $g_2 AS $f3, start('w$) AS window_start, end('w$)
AS window_end])
+ * +- Exchange(distribution=[hash[$f4, $f5]])
+ * +- Calc(select=[window_start, window_end, a, c, $f4, $f5, ($e = 1) AS
$g_1, ($e = 2) AS $g_2])
+ * +- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], size=[15
min])])
+ * +- Expand(...)
+ * }</pre>
+ */
+@Value.Enclosing
+public class ExpandWindowTableFunctionTransposeRule
+ extends RelRule<
+ ExpandWindowTableFunctionTransposeRule
+ .ExpandWindowTableFunctionTransposeRuleConfig> {
+
+ public static final ExpandWindowTableFunctionTransposeRule INSTANCE =
+
ExpandWindowTableFunctionTransposeRule.ExpandWindowTableFunctionTransposeRuleConfig
+ .DEFAULT
+ .toRule();
+
+ protected ExpandWindowTableFunctionTransposeRule(
+ ExpandWindowTableFunctionTransposeRuleConfig config) {
+ super(config);
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ StreamPhysicalExpand expand = call.rel(0);
+ StreamPhysicalCalc calc = call.rel(1);
+ FlinkRelMetadataQuery fmq =
+
FlinkRelMetadataQuery.reuseOrCreate(calc.getCluster().getMetadataQuery());
+
+ // condition and projection of Calc shouldn't contain calls on window
columns,
+ // otherwise, we can't transpose WindowTVF and Calc
+ if (WindowUtil.calcContainsCallsOnWindowColumns(calc, fmq)) {
+ return false;
+ }
+
+ // we only transpose WindowTVF when Expand propagates window_start and
window_end,
+ // otherwise, it's meaningless to transpose
+ RelWindowProperties expandWindowProps =
fmq.getRelWindowProperties(expand);
+ return expandWindowProps != null
+ && !expandWindowProps.getWindowStartColumns().isEmpty()
+ && !expandWindowProps.getWindowEndColumns().isEmpty();
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ StreamPhysicalExpand expand = call.rel(0);
+ StreamPhysicalCalc calc = call.rel(1);
+ StreamPhysicalWindowTableFunction windowTVF = call.rel(2);
+ RelOptCluster cluster = expand.getCluster();
+ FlinkRelMetadataQuery fmq =
FlinkRelMetadataQuery.reuseOrCreate(cluster.getMetadataQuery());
+ FlinkTypeFactory typeFactory = (FlinkTypeFactory)
cluster.getTypeFactory();
+ RelNode input = windowTVF.getInput();
+ RelDataType inputRowType = input.getRowType();
+
+ RelTraitSet requiredInputTraitSet =
+
input.getTraitSet().replace(FlinkConventions.STREAM_PHYSICAL());
+ RelNode newInput = RelOptRule.convert(input, requiredInputTraitSet);
+
+ //
-------------------------------------------------------------------------
+ // 1. transpose Calc and WindowTVF, build the new Calc node (the bottom
node)
+ //
-------------------------------------------------------------------------
+ ImmutableBitSet windowColumns =
fmq.getRelWindowProperties(windowTVF).getWindowColumns();
+ Tuple4<RexProgram, int[], Integer, Boolean> programInfo =
+ WindowUtil.buildNewProgramWithoutWindowColumns(
+ cluster.getRexBuilder(),
+ calc.getProgram(),
+ inputRowType,
+ windowTVF.windowing().getTimeAttributeIndex(),
+ windowColumns.toArray());
+ RexProgram newProgram = programInfo._1();
+ int[] fieldShifting = programInfo._2();
+ int newTimeField = programInfo._3();
+ boolean timeFieldAdded = programInfo._4();
+
+ StreamPhysicalCalc newCalc =
+ new StreamPhysicalCalc(
+ cluster,
+ calc.getTraitSet(),
+ newInput,
+ newProgram,
+ newProgram.getOutputRowType());
+
+ //
-------------------------------------------------------------------------
+ // 2. Adjust input ref index in Expand, append time attribute ref if
needed
+ //
-------------------------------------------------------------------------
+ StreamPhysicalExpand newExpand =
+ buildNewExpand(expand, newCalc, fieldShifting, newTimeField,
timeFieldAdded);
+
+ //
-------------------------------------------------------------------------
+ // 3. Apply WindowTVF on the new Expand node
+ //
-------------------------------------------------------------------------
+ RelDataType newOutputType =
+ SqlWindowTableFunction.inferRowType(
+ typeFactory,
+ newExpand.getRowType(),
+ typeFactory.createFieldTypeFromLogicalType(
+ windowTVF.windowing().getTimeAttributeType()));
+ // if the time attribute ref was appended, it is the last field of the new Expand
+ int timeAttributeOnExpand =
+ timeFieldAdded ? newExpand.getRowType().getFieldCount() - 1 :
newTimeField;
+ TimeAttributeWindowingStrategy newWindowing =
+ new TimeAttributeWindowingStrategy(
+ windowTVF.windowing().getWindow(),
+ windowTVF.windowing().getTimeAttributeType(),
+ timeAttributeOnExpand);
+ StreamPhysicalWindowTableFunction newWindowTVF =
+ new StreamPhysicalWindowTableFunction(
+ cluster, windowTVF.getTraitSet(), newExpand,
newOutputType, newWindowing);
+
+ //
-------------------------------------------------------------------------
+ // 4. Apply Calc on the new WindowTVF to adjust the fields mapping
+ //
-------------------------------------------------------------------------
+ int[] projectionMapping = getProjectionMapping(fmq, expand,
newWindowTVF);
+ List<RexNode> projectExprs =
+ Arrays.stream(projectionMapping)
+ .mapToObj(index -> RexInputRef.of(index,
newWindowTVF.getRowType()))
+ .collect(Collectors.toList());
+ RexProgram topRexProgram =
+ RexProgram.create(
+ newWindowTVF.getRowType(),
+ projectExprs,
+ null,
+ expand.getRowType(),
+ cluster.getRexBuilder());
+ StreamPhysicalCalc topCalc =
+ new StreamPhysicalCalc(
+ cluster,
+ expand.getTraitSet(),
+ newWindowTVF,
+ topRexProgram,
+ topRexProgram.getOutputRowType());
+
+ //
-------------------------------------------------------------------------
+ // 5. Finish
+ //
-------------------------------------------------------------------------
+ call.transformTo(topCalc);
+ }
+
+ private StreamPhysicalExpand buildNewExpand(
+ StreamPhysicalExpand expand,
+ StreamPhysicalCalc newCalc,
+ int[] inputFieldShifting,
+ int newTimeField,
+ boolean timeFieldAdded) {
+ RelDataType newInputRowType = newCalc.getRowType();
+ int expandIdIndex = expand.expandIdIndex();
+ int newExpandIdIndex = -1;
+ List<List<RexNode>> newProjects = new ArrayList<>();
+
+ for (List<RexNode> exprs : expand.projects()) {
+ List<RexNode> newExprs = new ArrayList<>();
+ int baseOffset = 0;
+ for (int exprIndex = 0; exprIndex < exprs.size(); exprIndex++) {
+ RexNode expr = exprs.get(exprIndex);
+ if (expr instanceof RexInputRef) {
+ int shiftedIndex = inputFieldShifting[((RexInputRef)
expr).getIndex()];
+ if (shiftedIndex < 0) {
+ // skip the window columns
+ continue;
+ }
+ newExprs.add(RexInputRef.of(shiftedIndex,
newInputRowType));
+ // we only use the type from input ref instead of literal
+ baseOffset++;
+ } else if (expr instanceof RexLiteral) {
+ newExprs.add(expr);
+ if (exprIndex == expandIdIndex) {
+ // this is the expand id, we should remember the new
index of expand id
+ // and update type for this expr
+ newExpandIdIndex = baseOffset;
+ }
+ baseOffset++;
+ } else {
+ throw new IllegalArgumentException(
+ "Expand node should only contain RexInputRef and
RexLiteral, but got "
+ + expr);
+ }
+ }
+ if (timeFieldAdded) {
+ // append time attribute reference if needed
+ newExprs.add(RexInputRef.of(newTimeField, newInputRowType));
+ }
+ newProjects.add(newExprs);
+ }
+
+ return new StreamPhysicalExpand(
+ expand.getCluster(), expand.getTraitSet(), newCalc,
newProjects, newExpandIdIndex);
+ }
+
+ private int[] getProjectionMapping(
+ FlinkRelMetadataQuery fmq,
+ StreamPhysicalExpand oldExpand,
+ StreamPhysicalWindowTableFunction newWindowTVF) {
+ RelWindowProperties windowProps =
fmq.getRelWindowProperties(oldExpand);
+ Set<Integer> startColumns =
+ Arrays.stream(windowProps.getWindowStartColumns().toArray())
+ .boxed()
+ .collect(Collectors.toSet());
+ Set<Integer> endColumns =
+ Arrays.stream(windowProps.getWindowEndColumns().toArray())
+ .boxed()
+ .collect(Collectors.toSet());
+ Set<Integer> timeColumns =
+ Arrays.stream(windowProps.getWindowTimeColumns().toArray())
+ .boxed()
+ .collect(Collectors.toSet());
+ int newWindowTimePos = newWindowTVF.getRowType().getFieldCount() - 1;
+ int newWindowEndPos = newWindowTVF.getRowType().getFieldCount() - 2;
+ int newWindowStartPos = newWindowTVF.getRowType().getFieldCount() - 3;
+ int numWindowColumns = 0;
+ List<Integer> projectMapping = new ArrayList<>();
+
+ for (int index = 0; index < oldExpand.getRowType().getFieldCount();
index++) {
+ if (startColumns.contains(index)) {
+ projectMapping.add(newWindowStartPos);
+ numWindowColumns++;
+ } else if (endColumns.contains(index)) {
+ projectMapping.add(newWindowEndPos);
+ numWindowColumns++;
+ } else if (timeColumns.contains(index)) {
+ projectMapping.add(newWindowTimePos);
+ numWindowColumns++;
+ } else {
+ projectMapping.add(index - numWindowColumns);
+ }
+ }
+
+ return projectMapping.stream().mapToInt(Integer::intValue).toArray();
+ }
+
+ /** Configuration for {@link ExpandWindowTableFunctionTransposeRule}. */
+ @Value.Immutable(singleton = false)
+ public interface ExpandWindowTableFunctionTransposeRuleConfig extends
RelRule.Config {
+
ExpandWindowTableFunctionTransposeRule.ExpandWindowTableFunctionTransposeRuleConfig
+ DEFAULT =
+ ImmutableExpandWindowTableFunctionTransposeRule
+
.ExpandWindowTableFunctionTransposeRuleConfig.builder()
+ .build()
+ .withOperandSupplier(
+ b0 ->
+
b0.operand(StreamPhysicalExpand.class)
+ .oneInput(
+ b1 ->
+
b1.operand(
+
StreamPhysicalCalc
+
.class)
+
.oneInput(
+
b2 ->
+
b2.operand(
+
StreamPhysicalWindowTableFunction
+
.class)
+
.anyInputs())))
+
.withDescription("ExpandWindowTableFunctionTransposeRule")
+ .as(
+ ExpandWindowTableFunctionTransposeRule
+
.ExpandWindowTableFunctionTransposeRuleConfig
+ .class);
+
+ @Override
+ default ExpandWindowTableFunctionTransposeRule toRule() {
+ return new ExpandWindowTableFunctionTransposeRule(this);
+ }
+ }
+}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/PullUpWindowTableFunctionIntoWindowAggregateRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/PullUpWindowTableFunctionIntoWindowAggregateRule.java
index b0aaaa460ff..798afe4e58c 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/PullUpWindowTableFunctionIntoWindowAggregateRule.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/PullUpWindowTableFunctionIntoWindowAggregateRule.java
@@ -130,7 +130,7 @@ public class
PullUpWindowTableFunctionIntoWindowAggregateRule
// 1. transpose Calc and WindowTVF, build the new Calc node
//
-------------------------------------------------------------------------
ImmutableBitSet windowColumns =
fmq.getRelWindowProperties(windowTVF).getWindowColumns();
- Tuple4<RexProgram, int[], Object, Object> programInfo =
+ Tuple4<RexProgram, int[], Integer, Boolean> programInfo =
WindowUtil.buildNewProgramWithoutWindowColumns(
cluster.getRexBuilder(),
calc.getProgram(),
@@ -139,7 +139,7 @@ public class
PullUpWindowTableFunctionIntoWindowAggregateRule
windowColumns.toArray());
RexProgram newProgram = programInfo._1();
int[] aggInputFieldsShift = programInfo._2();
- int timeAttributeIndex = (int) programInfo._3();
+ int timeAttributeIndex = programInfo._3();
StreamPhysicalCalc newCalc =
new StreamPhysicalCalc(
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ExpandWindowTableFunctionTransposeRule.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ExpandWindowTableFunctionTransposeRule.scala
deleted file mode 100644
index 96688fbd232..00000000000
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ExpandWindowTableFunctionTransposeRule.scala
+++ /dev/null
@@ -1,283 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.planner.plan.rules.physical.stream
-
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory
-import org.apache.flink.table.planner.functions.sql.SqlWindowTableFunction
-import
org.apache.flink.table.planner.plan.logical.TimeAttributeWindowingStrategy
-import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
-import org.apache.flink.table.planner.plan.nodes.FlinkConventions
-import
org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalCalc,
StreamPhysicalExpand, StreamPhysicalWindowAggregate,
StreamPhysicalWindowTableFunction}
-import org.apache.flink.table.planner.plan.utils.WindowUtil
-import
org.apache.flink.table.planner.plan.utils.WindowUtil.buildNewProgramWithoutWindowColumns
-
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.calcite.plan.RelOptRule.{any, operand}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rex.{RexInputRef, RexLiteral, RexNode, RexProgram}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
-/**
- * This rule transposes [[StreamPhysicalExpand]] past
[[StreamPhysicalWindowTableFunction]] to make
- * [[PullUpWindowTableFunctionIntoWindowAggregateRule]] can match the rel tree
pattern and optimize
- * them into [[StreamPhysicalWindowAggregate]].
- *
- * Example:
- *
- * MyTable: a INT, c STRING, rowtime TIMESTAMP(3)
- *
- * SQL:
- * {{{
- * SELECT
- * window_start,
- * window_end,
- * count(distinct a),
- * count(distinct c)
- * FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
- * GROUP BY window_start, window_end
- * }}}
- *
- * We will get part of the initial physical plan like following:
- * {{{
- * WindowAggregate(groupBy=[$f4, $f5], window=[TUMBLE(win_start=[window_start],
- * win_end=[window_end], size=[15 min])], select=[$f4, $f5, COUNT(DISTINCT a)
FILTER $g_1 AS $f2,
- * COUNT(DISTINCT c) FILTER $g_2 AS $f3, start('w$) AS window_start, end('w$)
AS window_end])
- * +- Exchange(distribution=[hash[$f4, $f5]])
- * +- Calc(select=[window_start, window_end, a, c, $f4, $f5, =($e, 1) AS
$g_1, =($e, 2) AS $g_2])
- * +- Expand(projects=[{window_start, window_end, a, c, $f4, null AS
$f5, 1 AS $e},
- * {window_start, window_end, a, c, null AS $f4, $f5, 2 AS $e}])
- * +- Calc(select=[window_start, window_end, a, c,
- * MOD(HASH_CODE(a), 1024) AS $f4, MOD(HASH_CODE(c), 1024) AS $f5])
- * +- WindowTableFunction(window=[TUMBLE(time_col=[rowtime],
size=[15 min])])
- * }}}
- *
- * However, it can't match
[[PullUpWindowTableFunctionIntoWindowAggregateRule]], because
- * [[StreamPhysicalWindowTableFunction]] is not near
[[StreamPhysicalWindowAggregate]]. So we need
- * to transpose [[StreamPhysicalExpand]] past
[[StreamPhysicalWindowTableFunction]] to make the part
- * of rel tree like this which can be matched by
- * [[PullUpWindowTableFunctionIntoWindowAggregateRule]].
- *
- * {{{
- * WindowAggregate(groupBy=[$f4, $f5], window=[TUMBLE(win_start=[window_start],
- * win_end=[window_end], size=[15 min])], select=[$f4, $f5, COUNT(DISTINCT a)
FILTER $g_1 AS $f2,
- * COUNT(DISTINCT c) FILTER $g_2 AS $f3, start('w$) AS window_start, end('w$)
AS window_end])
- * +- Exchange(distribution=[hash[$f4, $f5]])
- * +- Calc(select=[window_start, window_end, a, c, $f4, $f5, ($e = 1) AS
$g_1, ($e = 2) AS $g_2])
- * +- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], size=[15
min])])
- * +- Expand(...)
- * }}}
- * </pre>
- */
-class ExpandWindowTableFunctionTransposeRule
- extends RelOptRule(
- operand(
- classOf[StreamPhysicalExpand],
- operand(
- classOf[StreamPhysicalCalc],
- operand(classOf[StreamPhysicalWindowTableFunction], any()))),
- "ExpandWindowTableFunctionTransposeRule") {
-
- override def matches(call: RelOptRuleCall): Boolean = {
- val expand: StreamPhysicalExpand = call.rel(0)
- val calc: StreamPhysicalCalc = call.rel(1)
- val fmq =
FlinkRelMetadataQuery.reuseOrCreate(calc.getCluster.getMetadataQuery)
-
- // condition and projection of Calc shouldn't contain calls on window
columns,
- // otherwise, we can't transpose WindowTVF and Calc
- if (WindowUtil.calcContainsCallsOnWindowColumns(calc, fmq)) {
- return false
- }
-
- // we only transpose WindowTVF when expand propagate window_start and
window_end,
- // otherwise, it's meaningless to transpose
- val expandWindowProps = fmq.getRelWindowProperties(expand)
- expandWindowProps != null &&
- !expandWindowProps.getWindowStartColumns.isEmpty &&
- !expandWindowProps.getWindowEndColumns.isEmpty
- }
-
- override def onMatch(call: RelOptRuleCall): Unit = {
- val expand: StreamPhysicalExpand = call.rel(0)
- val calc: StreamPhysicalCalc = call.rel(1)
- val windowTVF: StreamPhysicalWindowTableFunction = call.rel(2)
- val cluster = expand.getCluster
- val fmq = FlinkRelMetadataQuery.reuseOrCreate(cluster.getMetadataQuery)
- val typeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
- val input = windowTVF.getInput
- val inputRowType = input.getRowType
-
- val requiredInputTraitSet =
input.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
- val newInput: RelNode = RelOptRule.convert(input, requiredInputTraitSet)
-
- //
-------------------------------------------------------------------------
- // 1. transpose Calc and WindowTVF, build the new Calc node (the top node)
- //
-------------------------------------------------------------------------
- val windowColumns = fmq.getRelWindowProperties(windowTVF).getWindowColumns
- val (newProgram, fieldShifting, newTimeField, timeFieldAdded) =
- buildNewProgramWithoutWindowColumns(
- cluster.getRexBuilder,
- calc.getProgram,
- inputRowType,
- windowTVF.windowing.getTimeAttributeIndex,
- windowColumns.toArray)
- val newCalc = new StreamPhysicalCalc(
- cluster,
- calc.getTraitSet,
- newInput,
- newProgram,
- newProgram.getOutputRowType)
-
- //
-------------------------------------------------------------------------
- // 2. Adjust input ref index in Expand, append time attribute ref if
needed
- //
-------------------------------------------------------------------------
- val newExpand = buildNewExpand(expand, newCalc, fieldShifting,
newTimeField, timeFieldAdded)
-
- //
-------------------------------------------------------------------------
- // 3. Apply WindowTVF on the new Expand node
- //
-------------------------------------------------------------------------
- val newOutputType = SqlWindowTableFunction.inferRowType(
- typeFactory,
- newExpand.getRowType,
-
typeFactory.createFieldTypeFromLogicalType(windowTVF.windowing.getTimeAttributeType))
- val timeAttributeOnExpand = if (timeFieldAdded) {
- // the time attribute ref is appended
- newExpand.getRowType.getFieldCount - 1
- } else {
- newTimeField
- }
- val newWindowing = new TimeAttributeWindowingStrategy(
- windowTVF.windowing.getWindow,
- windowTVF.windowing.getTimeAttributeType,
- timeAttributeOnExpand)
- val newWindowTVF = new StreamPhysicalWindowTableFunction(
- cluster,
- windowTVF.getTraitSet,
- newExpand,
- newOutputType,
- newWindowing)
-
- //
-------------------------------------------------------------------------
- // 4. Apply Calc on the new WindowTVF to adjust the fields mapping
- //
-------------------------------------------------------------------------
- val projectionMapping = getProjectionMapping(fmq, expand, newWindowTVF)
- val projectExprs = projectionMapping.map(RexInputRef.of(_,
newWindowTVF.getRowType))
- val topRexProgram = RexProgram.create(
- newWindowTVF.getRowType,
- projectExprs.toList.asJava,
- null, // no filter
- expand.getRowType,
- cluster.getRexBuilder)
- val topCalc = new StreamPhysicalCalc(
- cluster,
- expand.getTraitSet,
- newWindowTVF,
- topRexProgram,
- topRexProgram.getOutputRowType)
-
- //
-------------------------------------------------------------------------
- // 5. Finish
- //
-------------------------------------------------------------------------
- call.transformTo(topCalc)
- }
-
- private def buildNewExpand(
- expand: StreamPhysicalExpand,
- newCalc: StreamPhysicalCalc,
- inputFieldShifting: Array[Int],
- newTimeField: Int,
- timeFieldAdded: Boolean): StreamPhysicalExpand = {
- val newInputRowType = newCalc.getRowType
- val expandIdIndex = expand.expandIdIndex
- var newExpandIdIndex = -1
- val newProjects = expand.projects.asScala.map {
- exprs =>
- val newExprs = ArrayBuffer[RexNode]()
- var baseOffset = 0
- exprs.asScala.zipWithIndex.foreach {
- case (ref: RexInputRef, _) if inputFieldShifting(ref.getIndex) < 0 =>
- // skip the window columns
- case (ref: RexInputRef, _) =>
- val newInputIndex = inputFieldShifting(ref.getIndex)
- newExprs += RexInputRef.of(newInputIndex, newInputRowType)
- // we only use the type from input ref instead of literal
- baseOffset += 1
- case (lit: RexLiteral, exprIndex) =>
- newExprs += lit
- if (exprIndex == expandIdIndex) {
- // this is the expand id, we should remember the new index of
expand id
- // and update type for this expr
- newExpandIdIndex = baseOffset
- }
- baseOffset += 1
- case exp @ _ =>
- throw new IllegalArgumentException(
- "Expand node should only contain RexInputRef and RexLiteral, but
got " + exp)
- }
- if (timeFieldAdded) {
- // append time attribute reference if needed
- newExprs += RexInputRef.of(newTimeField, newInputRowType)
- }
- newExprs.asJava
- }
-
- new StreamPhysicalExpand(
- expand.getCluster,
- expand.getTraitSet,
- newCalc,
- newProjects.asJava,
- newExpandIdIndex
- )
- }
-
- private def getProjectionMapping(
- fmq: FlinkRelMetadataQuery,
- oldExpand: StreamPhysicalExpand,
- newWindowTVF: StreamPhysicalWindowTableFunction): Array[Int] = {
- val windowProps = fmq.getRelWindowProperties(oldExpand)
- val startColumns = windowProps.getWindowStartColumns.toArray
- val endColumns = windowProps.getWindowEndColumns.toArray
- val timeColumns = windowProps.getWindowTimeColumns.toArray
- val newWindowTimePos = newWindowTVF.getRowType.getFieldCount - 1
- val newWindowEndPos = newWindowTVF.getRowType.getFieldCount - 2
- val newWindowStartPos = newWindowTVF.getRowType.getFieldCount - 3
- var numWindowColumns = 0
- val projectMapping = ArrayBuffer[Int]()
- (0 until oldExpand.getRowType.getFieldCount).foreach {
- index =>
- if (startColumns.contains(index)) {
- projectMapping += newWindowStartPos
- numWindowColumns += 1
- } else if (endColumns.contains(index)) {
- projectMapping += newWindowEndPos
- numWindowColumns += 1
- } else if (timeColumns.contains(index)) {
- projectMapping += newWindowTimePos
- numWindowColumns += 1
- } else {
- projectMapping += index - numWindowColumns
- }
- }
- projectMapping.toArray
- }
-}
-
-object ExpandWindowTableFunctionTransposeRule {
- val INSTANCE = new ExpandWindowTableFunctionTransposeRule
-}
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
index 9102f68528d..fd965e3ef5f 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
@@ -43,7 +43,6 @@ import org.apache.calcite.rel.{BiRel, RelNode, RelVisitor}
import org.apache.calcite.rel.core._
import org.apache.calcite.rex._
import org.apache.calcite.sql.`type`.SqlTypeFamily
-import org.apache.calcite.sql.{SqlKind, SqlUtil}
import org.apache.calcite.util.{ImmutableBitSet, Util}
import java.time.Duration
@@ -123,7 +122,7 @@ object WindowUtil {
oldProgram: RexProgram,
inputRowType: RelDataType,
inputTimeAttributeIndex: Int,
- windowColumns: Array[Int]): (RexProgram, Array[Int], Int, Boolean) = {
+ windowColumns: Array[Int]): (RexProgram, Array[Int], Integer,
java.lang.Boolean) = {
val programBuilder = new RexProgramBuilder(inputRowType, rexBuilder)
// mapping from original field index to new field index
var containsTimeAttribute = false