This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f1871fc31aa64e51fb812e6de457bad9e1c020c3 Author: xuyang <[email protected]> AuthorDate: Mon Jan 22 11:31:56 2024 +0800 [FLINK-34100][table] Support session window table function without pulling up with window agg This closes #24162 --- .../exec/batch/BatchExecWindowTableFunction.java | 10 ++ .../exec/common/CommonExecWindowTableFunction.java | 46 +++++--- .../exec/stream/StreamExecWindowAggregateBase.java | 20 ++-- .../exec/stream/StreamExecWindowTableFunction.java | 74 ++++++++++++ .../plan/utils/WindowTableFunctionUtil.java | 10 ++ .../plan/stream/sql/agg/WindowAggregateTest.xml | 130 +++++++++++++++++++++ .../plan/batch/sql/WindowTableFunctionTest.scala | 27 +++++ .../plan/stream/sql/agg/WindowAggregateTest.scala | 10 +- .../runtime/stream/sql/WindowAggregateITCase.scala | 37 ++++++ .../stream/sql/WindowTableFunctionITCase.scala | 104 ++++++++++++++++- .../aggregate/window/WindowAggOperatorBuilder.java | 25 ++-- .../window/tvf/unslicing/UnsliceAssigners.java | 129 ++++++++++++++++++++ 12 files changed, 573 insertions(+), 49 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecWindowTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecWindowTableFunction.java index b00ce302c19..2194474c273 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecWindowTableFunction.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecWindowTableFunction.java @@ -61,4 +61,14 @@ public class BatchExecWindowTableFunction extends CommonExecWindowTableFunction } return super.translateToPlanInternal(planner, config); } + + @Override + protected Transformation<RowData> translateWithUnalignedWindow( + PlannerBase planner, + ExecNodeConfig config, + RowType inputRowType, + Transformation<RowData> inputTransform) { + throw new TableException( + "Unaligned windows like session are not supported in batch mode yet."); + } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecWindowTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecWindowTableFunction.java index 404f24c3cb9..df205561781 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecWindowTableFunction.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecWindowTableFunction.java @@ -20,10 +20,8 @@ package org.apache.flink.table.planner.plan.nodes.exec.common; import org.apache.flink.api.dag.Transformation; import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.table.api.TableException; import org.apache.flink.table.data.RowData; import org.apache.flink.table.planner.delegation.PlannerBase; -import org.apache.flink.table.planner.plan.logical.SessionWindowSpec; import org.apache.flink.table.planner.plan.logical.TimeAttributeWindowingStrategy; import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; @@ -38,6 +36,7 @@ import org.apache.flink.table.planner.utils.TableConfigUtils; import org.apache.flink.table.runtime.operators.window.TimeWindow; import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner; import org.apache.flink.table.runtime.operators.window.tvf.operator.AlignedWindowTableFunctionOperator; +import org.apache.flink.table.runtime.operators.window.tvf.operator.WindowTableFunctionOperatorBase; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.runtime.util.TimeWindowUtil; import org.apache.flink.table.types.logical.RowType; @@ -79,22 +78,22 @@ public abstract class CommonExecWindowTableFunction extends ExecNodeBase<RowData @Override protected Transformation<RowData> translateToPlanInternal( PlannerBase planner, ExecNodeConfig config) { - if (windowingStrategy.getWindow() instanceof SessionWindowSpec) { - // TODO Support session window table function in ExecWindowTableFunction. See - // more at FLINK-34100 - throw new TableException("Session Window TableFunction is not supported yet."); - } final ExecEdge inputEdge = getInputEdges().get(0); final Transformation<RowData> inputTransform = (Transformation<RowData>) inputEdge.translateToPlan(planner); - GroupWindowAssigner<TimeWindow> windowAssigner = createWindowAssigner(windowingStrategy); - final ZoneId shiftTimeZone = - TimeWindowUtil.getShiftTimeZone( - windowingStrategy.getTimeAttributeType(), - TableConfigUtils.getLocalTimeZone(config)); - AlignedWindowTableFunctionOperator windowTableFunctionOperator = - new AlignedWindowTableFunctionOperator( - windowAssigner, windowingStrategy.getTimeAttributeIndex(), shiftTimeZone); + final boolean isAlignedWindow = windowingStrategy.getWindow().isAlignedWindow(); + if (isAlignedWindow) { + return translateWithAlignedWindow(config, inputTransform); + } else { + return translateWithUnalignedWindow( + planner, config, (RowType) inputEdge.getOutputType(), inputTransform); + } + } + + private Transformation<RowData> translateWithAlignedWindow( + ExecNodeConfig config, Transformation<RowData> inputTransform) { + final WindowTableFunctionOperatorBase windowTableFunctionOperator = + createAlignedWindowTableFunctionOperator(config); return ExecNodeUtil.createOneInputTransformation( inputTransform, createTransformationMeta(WINDOW_TRANSFORMATION, config), @@ -103,4 +102,21 @@ public abstract class CommonExecWindowTableFunction extends ExecNodeBase<RowData inputTransform.getParallelism(), false); } + + private WindowTableFunctionOperatorBase createAlignedWindowTableFunctionOperator( + ExecNodeConfig config) { + GroupWindowAssigner<TimeWindow> windowAssigner = createWindowAssigner(windowingStrategy); + final ZoneId shiftTimeZone = + TimeWindowUtil.getShiftTimeZone( + windowingStrategy.getTimeAttributeType(), + TableConfigUtils.getLocalTimeZone(config)); + return new AlignedWindowTableFunctionOperator( + windowAssigner, windowingStrategy.getTimeAttributeIndex(), shiftTimeZone); + } + + protected abstract Transformation<RowData> translateWithUnalignedWindow( + PlannerBase planner, + ExecNodeConfig config, + RowType inputRowType, + Transformation<RowData> inputTransform); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregateBase.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregateBase.java index c3b515dc743..690d1aac035 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregateBase.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregateBase.java @@ -75,16 +75,20 @@ public abstract class StreamExecWindowAggregateBase extends StreamExecAggregateB WindowingStrategy windowingStrategy, ZoneId shiftTimeZone) { WindowSpec windowSpec = windowingStrategy.getWindow(); if (windowingStrategy instanceof WindowAttachedWindowingStrategy) { - checkArgument( - isAlignedWindow(windowSpec), - "UnsliceAssigner with WindowAttachedWindowingStrategy is not supported yet."); - + int windowStartIndex = + ((WindowAttachedWindowingStrategy) windowingStrategy).getWindowStart(); int windowEndIndex = ((WindowAttachedWindowingStrategy) windowingStrategy).getWindowEnd(); - // we don't need time attribute to assign windows, use a magic value in this case - SliceAssigner innerAssigner = - createSliceAssigner(windowSpec, Integer.MAX_VALUE, shiftTimeZone); - return SliceAssigners.windowed(windowEndIndex, innerAssigner); + if (isAlignedWindow(windowSpec)) { + // we don't need time attribute to assign windows, use a magic value in this case + SliceAssigner innerAssigner = + createSliceAssigner(windowSpec, Integer.MAX_VALUE, shiftTimeZone); + return SliceAssigners.windowed(windowEndIndex, innerAssigner); + } else { + UnsliceAssigner<TimeWindow> innerAssigner = + createUnsliceAssigner(windowSpec, windowEndIndex, shiftTimeZone); + return UnsliceAssigners.windowed(windowStartIndex, windowEndIndex, innerAssigner); + } } else if (windowingStrategy instanceof SliceAttachedWindowingStrategy) { checkArgument( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowTableFunction.java index 7728712a6af..28fd8c128ee 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowTableFunction.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowTableFunction.java @@ -19,22 +19,44 @@ package org.apache.flink.table.planner.plan.nodes.exec.stream; import org.apache.flink.FlinkVersion; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.dag.Transformation; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.streaming.api.transformations.OneInputTransformation; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.delegation.PlannerBase; +import org.apache.flink.table.planner.plan.logical.SessionWindowSpec; import org.apache.flink.table.planner.plan.logical.TimeAttributeWindowingStrategy; +import org.apache.flink.table.planner.plan.logical.WindowSpec; import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig; import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext; import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata; import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecWindowTableFunction; +import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil; +import org.apache.flink.table.planner.plan.utils.KeySelectorUtil; +import org.apache.flink.table.planner.utils.TableConfigUtils; +import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; +import org.apache.flink.table.runtime.operators.window.TimeWindow; +import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner; +import org.apache.flink.table.runtime.operators.window.tvf.operator.UnalignedWindowTableFunctionOperator; +import org.apache.flink.table.runtime.operators.window.tvf.operator.WindowTableFunctionOperatorBase; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; +import org.apache.flink.table.runtime.util.TimeWindowUtil; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import java.time.ZoneId; import java.util.Collections; import java.util.List; +import static org.apache.flink.table.planner.plan.utils.WindowTableFunctionUtil.createWindowAssigner; +import static org.apache.flink.util.Preconditions.checkState; + /** * Stream {@link ExecNode} which acts as a table-valued function to assign a window for each row of * the input relation. The return value of the new relation includes all the original columns as @@ -86,4 +108,56 @@ public class StreamExecWindowTableFunction extends CommonExecWindowTableFunction outputType, description); } + + protected Transformation<RowData> translateWithUnalignedWindow( + PlannerBase planner, + ExecNodeConfig config, + RowType inputRowType, + Transformation<RowData> inputTransform) { + final WindowTableFunctionOperatorBase windowTableFunctionOperator = + createUnalignedWindowTableFunctionOperator(config, inputRowType); + final OneInputTransformation<RowData, RowData> transform = + ExecNodeUtil.createOneInputTransformation( + inputTransform, + createTransformationMeta(WINDOW_TRANSFORMATION, config), + windowTableFunctionOperator, + InternalTypeInfo.of(getOutputType()), + inputTransform.getParallelism(), + false); + + final int[] partitionKeys = extractPartitionKeys(windowingStrategy.getWindow()); + // set KeyType and Selector for state + final RowDataKeySelector selector = + KeySelectorUtil.getRowDataSelector( + planner.getFlinkContext().getClassLoader(), + partitionKeys, + InternalTypeInfo.of(inputRowType)); + transform.setStateKeySelector(selector); + transform.setStateKeyType(selector.getProducedType()); + return transform; + } + + private int[] extractPartitionKeys(WindowSpec window) { + checkState( + window instanceof SessionWindowSpec, + "Only support unaligned window with session window now."); + + return ((SessionWindowSpec) window).getPartitionKeyIndices(); + } + + private WindowTableFunctionOperatorBase createUnalignedWindowTableFunctionOperator( + ExecNodeConfig config, RowType inputRowType) { + GroupWindowAssigner<TimeWindow> windowAssigner = createWindowAssigner(windowingStrategy); + final ZoneId shiftTimeZone = + TimeWindowUtil.getShiftTimeZone( + windowingStrategy.getTimeAttributeType(), + TableConfigUtils.getLocalTimeZone(config)); + + return new UnalignedWindowTableFunctionOperator( + windowAssigner, + windowAssigner.getWindowSerializer(new ExecutionConfig()), + new RowDataSerializer(inputRowType), + windowingStrategy.getTimeAttributeIndex(), + shiftTimeZone); + } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/WindowTableFunctionUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/WindowTableFunctionUtil.java index 4f9b66b233c..60e73c28bb0 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/WindowTableFunctionUtil.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/WindowTableFunctionUtil.java @@ -22,12 +22,14 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.table.api.TableException; import org.apache.flink.table.planner.plan.logical.CumulativeWindowSpec; import org.apache.flink.table.planner.plan.logical.HoppingWindowSpec; +import org.apache.flink.table.planner.plan.logical.SessionWindowSpec; import org.apache.flink.table.planner.plan.logical.TimeAttributeWindowingStrategy; import org.apache.flink.table.planner.plan.logical.TumblingWindowSpec; import org.apache.flink.table.planner.plan.logical.WindowSpec; import org.apache.flink.table.runtime.operators.window.TimeWindow; import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.CumulativeWindowAssigner; import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner; +import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.SessionWindowAssigner; import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.SlidingWindowAssigner; import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.TumblingWindowAssigner; @@ -80,6 +82,14 @@ public final class WindowTableFunctionUtil { windowAssigner = windowAssigner.withOffset(cumulativeWindowSpec.getOffset()); } return windowAssigner; + } else if (windowSpec instanceof SessionWindowSpec) { + SessionWindowSpec sessionWindowSpec = (SessionWindowSpec) windowSpec; + SessionWindowAssigner windowAssigner = + SessionWindowAssigner.withGap(sessionWindowSpec.getGap()); + if (isProctime) { + windowAssigner = windowAssigner.withProcessingTime(); + } + return windowAssigner; } else { throw new TableException( String.format( diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml index b5d7fa725cf..4e5c14dd9b1 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml @@ -1186,6 +1186,136 @@ Calc(select=[window_time, start_time, end_time]) +- Calc(select=[CAST(rowtime AS TIMESTAMP(3)) AS rowtime, rowtime AS rowtime_0]) +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[rowtime], metadata=[]]], fields=[rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testGroupKeyLessThanPartitionKeyInSessionWindow[aggPhaseEnforcer=ONE_PHASE]"> + <Resource name="explain"> + <![CDATA[== Abstract Syntax Tree == +LogicalProject(window_start=[$1], window_end=[$2], EXPR$2=[$3], EXPR$3=[$4], EXPR$4=[$5], wAvg=[$6], uv=[$7]) ++- LogicalAggregate(group=[{0, 1, 2}], EXPR$2=[COUNT()], EXPR$3=[SUM($3)], EXPR$4=[MAX($3) FILTER $4], wAvg=[weightedAvg($0, $5)], uv=[COUNT(DISTINCT $6)]) + +- LogicalProject(b=[$1], window_start=[$7], window_end=[$8], d=[$3], $f4=[IS TRUE(>($1, 1000))], e=[$4], c=[$2]) + +- LogicalTableFunctionScan(invocation=[SESSION(PARTITION BY($1, $0), DESCRIPTOR($5), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) + +== Optimized Physical Plan == +Calc(select=[window_start, window_end, EXPR$2, EXPR$3, EXPR$4, wAvg, uv]) ++- GroupAggregate(groupBy=[b, window_start, window_end], select=[b, window_start, window_end, COUNT(*) AS EXPR$2, SUM(d) AS EXPR$3, MAX(d) FILTER $f4 AS EXPR$4, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS uv]) + +- Exchange(distribution=[hash[b, window_start, window_end]]) + +- Calc(select=[b, window_start, window_end, d, IS TRUE(>(b, 1000)) AS $f4, e, c]) + +- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[5 min], partition keys=[b, a])]) + +- Exchange(distribution=[hash[b, a]]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime]) + +== Optimized Execution Plan == +Calc(select=[window_start, window_end, EXPR$2, EXPR$3, EXPR$4, wAvg, uv]) ++- GroupAggregate(groupBy=[b, window_start, window_end], select=[b, window_start, window_end, COUNT(*) AS EXPR$2, SUM(d) AS EXPR$3, MAX(d) FILTER $f4 AS EXPR$4, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS uv]) + +- Exchange(distribution=[hash[b, window_start, window_end]]) + +- Calc(select=[b, window_start, window_end, d, (b > 1000) IS TRUE AS $f4, e, c]) + +- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[5 min], partition keys=[b, a])]) + +- Exchange(distribution=[hash[b, a]]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testGroupKeyLessThanPartitionKeyInSessionWindow[aggPhaseEnforcer=TWO_PHASE]"> + <Resource name="explain"> + <![CDATA[== Abstract Syntax Tree == +LogicalProject(window_start=[$1], window_end=[$2], EXPR$2=[$3], EXPR$3=[$4], EXPR$4=[$5], wAvg=[$6], uv=[$7]) ++- LogicalAggregate(group=[{0, 1, 2}], EXPR$2=[COUNT()], EXPR$3=[SUM($3)], EXPR$4=[MAX($3) FILTER $4], wAvg=[weightedAvg($0, $5)], uv=[COUNT(DISTINCT $6)]) + +- LogicalProject(b=[$1], window_start=[$7], window_end=[$8], d=[$3], $f4=[IS TRUE(>($1, 1000))], e=[$4], c=[$2]) + +- LogicalTableFunctionScan(invocation=[SESSION(PARTITION BY($1, $0), DESCRIPTOR($5), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) + +== Optimized Physical Plan == +Calc(select=[window_start, window_end, EXPR$2, EXPR$3, EXPR$4, wAvg, uv]) ++- GroupAggregate(groupBy=[b, window_start, window_end], select=[b, window_start, window_end, COUNT(*) AS EXPR$2, SUM(d) AS EXPR$3, MAX(d) FILTER $f4 AS EXPR$4, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS uv]) + +- Exchange(distribution=[hash[b, window_start, window_end]]) + +- Calc(select=[b, window_start, window_end, d, IS TRUE(>(b, 1000)) AS $f4, e, c]) + +- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[5 min], partition keys=[b, a])]) + +- Exchange(distribution=[hash[b, a]]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime]) + +== Optimized Execution Plan == +Calc(select=[window_start, window_end, EXPR$2, EXPR$3, EXPR$4, wAvg, uv]) ++- GroupAggregate(groupBy=[b, window_start, window_end], select=[b, window_start, window_end, COUNT(*) AS EXPR$2, SUM(d) AS EXPR$3, MAX(d) FILTER $f4 AS EXPR$4, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS uv]) + +- Exchange(distribution=[hash[b, window_start, window_end]]) + +- Calc(select=[b, window_start, window_end, d, (b > 1000) IS TRUE AS $f4, e, c]) + +- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[5 min], partition keys=[b, a])]) + +- Exchange(distribution=[hash[b, a]]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testGroupKeyMoreThanPartitionKeyInSessionWindow[aggPhaseEnforcer=ONE_PHASE]"> + <Resource name="explain"> + <![CDATA[== Abstract Syntax Tree == +LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)], EXPR$5=[MAX($3) FILTER $4], wAvg=[weightedAvg($5, $6)], uv=[COUNT(DISTINCT $7)]) ++- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3], $f4=[IS TRUE(>($1, 1000))], b=[$1], e=[$4], c=[$2]) + +- LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($5), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) + +== Optimized Physical Plan == +GroupAggregate(groupBy=[a, window_start, window_end], select=[a, window_start, window_end, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, MAX(d) FILTER $f4 AS EXPR$5, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS uv]) ++- Exchange(distribution=[hash[a, window_start, window_end]]) + +- Calc(select=[a, window_start, window_end, d, IS TRUE(>(b, 1000)) AS $f4, b, e, c]) + +- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[5 min])]) + +- Exchange(distribution=[single]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime]) + +== Optimized Execution Plan == +GroupAggregate(groupBy=[a, window_start, window_end], select=[a, window_start, window_end, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, MAX(d) FILTER $f4 AS EXPR$5, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS uv]) ++- Exchange(distribution=[hash[a, window_start, window_end]]) + +- Calc(select=[a, window_start, window_end, d, (b > 1000) IS TRUE AS $f4, b, e, c]) + +- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[5 min])]) + +- Exchange(distribution=[single]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testGroupKeyMoreThanPartitionKeyInSessionWindow[aggPhaseEnforcer=TWO_PHASE]"> + <Resource name="explain"> + <![CDATA[== Abstract Syntax Tree == +LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)], EXPR$5=[MAX($3) FILTER $4], wAvg=[weightedAvg($5, $6)], uv=[COUNT(DISTINCT $7)]) ++- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3], $f4=[IS TRUE(>($1, 1000))], b=[$1], e=[$4], c=[$2]) + +- LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($5), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) + +== Optimized Physical Plan == +GroupAggregate(groupBy=[a, window_start, window_end], select=[a, window_start, window_end, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, MAX(d) FILTER $f4 AS EXPR$5, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS uv]) ++- Exchange(distribution=[hash[a, window_start, window_end]]) + +- Calc(select=[a, window_start, window_end, d, IS TRUE(>(b, 1000)) AS $f4, b, e, c]) + +- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[5 min])]) + +- Exchange(distribution=[single]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime]) + +== Optimized Execution Plan == +GroupAggregate(groupBy=[a, window_start, window_end], select=[a, window_start, window_end, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, MAX(d) FILTER $f4 AS EXPR$5, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS uv]) ++- Exchange(distribution=[hash[a, window_start, window_end]]) + +- Calc(select=[a, window_start, window_end, d, (b > 1000) IS TRUE AS $f4, b, e, c]) + +- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[5 min])]) + +- Exchange(distribution=[single]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime]) ]]> </Resource> </TestCase> diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/WindowTableFunctionTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/WindowTableFunctionTest.scala index b8c67d9f63d..6fbc1d4314e 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/WindowTableFunctionTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/WindowTableFunctionTest.scala @@ -206,4 +206,31 @@ class WindowTableFunctionTest extends TableTestBase { |""".stripMargin util.verifyExecPlan(sql) } + + @Test + def testSessionTVF(): Unit = { + val sql = + """ + |SELECT * + |FROM TABLE( + | SESSION(TABLE MyTable1, DESCRIPTOR(ts), INTERVAL '10' MINUTE)) + |""".stripMargin + assertThatThrownBy(() => util.verifyExplain(sql)) + .hasMessageContaining("Unaligned windows like session are not supported in batch mode yet.") + .isInstanceOf[TableException] + } + + @Test + def testSessionTVFProctime(): Unit = { + val sql = + """ + |SELECT * + |FROM TABLE( + | SESSION(TABLE MyTable2, DESCRIPTOR(c), INTERVAL '10' MINUTE)) + |""".stripMargin + + assertThatThrownBy(() => util.verifyExplain(sql)) + .hasMessageContaining("Processing time Window TableFunction is not supported yet.") + .isInstanceOf[TableException] + } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala index a7b44993a32..d27950c480f 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala @@ -1541,15 +1541,11 @@ class WindowAggregateTest(aggPhaseEnforcer: AggregatePhaseStrategy) extends Tabl |GROUP BY a, window_start, window_end """.stripMargin - assertThatThrownBy(() => util.verifyExplain(sql)) - .hasMessageContaining("Session Window TableFunction is not supported yet.") - .isInstanceOf[TableException] + util.verifyExplain(sql) } @TestTemplate def testGroupKeyLessThanPartitionKeyInSessionWindow(): Unit = { - // TODO Support session window table function in ExecWindowTableFunction. See - // more at FLINK-34100 val sql = { """ |SELECT @@ -1566,9 +1562,7 @@ class WindowAggregateTest(aggPhaseEnforcer: AggregatePhaseStrategy) extends Tabl """.stripMargin } - assertThatThrownBy(() => util.verifyExplain(sql)) - .hasMessageContaining("Session Window TableFunction is not supported yet.") - .isInstanceOf[TableException] + util.verifyExplain(sql) } @TestTemplate diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala index f9c30366dea..118b9c5a22a 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala @@ -1217,6 +1217,43 @@ class WindowAggregateITCase( .isEqualTo(expected.sorted.mkString("\n")) } + @TestTemplate + def testEventTimeSessionWindowWithTVFNotPullUpIntoWindowAgg(): Unit = { + val sql = + """ + |SELECT + | `name`, + | window_start, + | window_end, + | COUNT(*), + | SUM(`bigdec`), + | MAX(`double`), + | MIN(`float`), + | COUNT(DISTINCT `string`), + | concat_distinct_agg(`string`) + |FROM ( + | SELECT * FROM TABLE( + | SESSION(TABLE T1 PARTITION BY `name`, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) + | WHERE window_start > TIMESTAMP '2000-01-01 10:10:00.000' + |) + |GROUP BY `name`, window_start, window_end + """.stripMargin + + val sink = new TestingAppendSink + tEnv.sqlQuery(sql).toDataStream.addSink(sink) + env.execute() + + val expected = Seq( + "a,2020-10-10T00:00:01,2020-10-10T00:00:13,6,19.98,5.0,1.0,3,Hi|Comment#1|Comment#2", + "b,2020-10-10T00:00:06,2020-10-10T00:00:12,2,6.66,6.0,3.0,2,Hello|Hi", + "b,2020-10-10T00:00:16,2020-10-10T00:00:21,1,4.44,4.0,4.0,1,Hi", + "b,2020-10-10T00:00:34,2020-10-10T00:00:39,1,3.33,3.0,3.0,1,Comment#3", + "null,2020-10-10T00:00:32,2020-10-10T00:00:37,1,7.77,7.0,7.0,0,null" + ) + assertThat(sink.getAppendResults.sorted.mkString("\n")) + .isEqualTo(expected.sorted.mkString("\n")) + } + @TestTemplate def testEventTimeSessionWindowWithCDCSource(): Unit = { val sql = diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowTableFunctionITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowTableFunctionITCase.scala index 90f0d610d77..6a55aaa1174 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowTableFunctionITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowTableFunctionITCase.scala @@ -18,14 +18,12 @@ package org.apache.flink.table.planner.runtime.stream.sql import org.apache.flink.api.common.restartstrategy.RestartStrategies -import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.CheckpointingMode import org.apache.flink.table.api.bridge.scala._ import org.apache.flink.table.planner.factories.TestValuesTableFactory import org.apache.flink.table.planner.runtime.utils.{FailingCollectionSource, StreamingWithStateTestBase, TestData, TestingAppendSink} import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.StateBackendMode import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension -import org.apache.flink.types.Row import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.{BeforeEach, TestTemplate} @@ -383,4 +381,106 @@ class WindowTableFunctionITCase(mode: StateBackendMode) extends StreamingWithSta assertThat(sink.getAppendResults.sorted.mkString("\n")) .isEqualTo(expected.sorted.mkString("\n")) } + + @TestTemplate + def testSessionWindow(): Unit = { + val sql = + """ + |SELECT + | TO_TIMESTAMP(`ts`), + | `int`, + | `double`, + | `float`, + | `bigdec`, + | `string`, + | `name`, + | CAST(`rowtime` AS STRING), + | window_start, + | window_end, + | window_time + |FROM TABLE(SESSION(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) + """.stripMargin + + val sink = new TestingAppendSink + tEnv.sqlQuery(sql).toDataStream.addSink(sink) + env.execute() + + val expected = Seq( + "2020-10-10T00:00:01,1,1.0,1.0,1.11,Hi,a,2020-10-10 00:00:01.000," + + "2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999", + "2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000," + + "2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999", + "2020-10-10T00:00:03,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:03.000," + + "2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999", + "2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000," + + "2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999", + "2020-10-10T00:00:04,5,5.0,null,5.55,Hi,a,2020-10-10 00:00:04.000," + + "2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999", + "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000," + + "2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999", + "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000," + + "2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999", + "2020-10-10T00:00:08,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000," + + "2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999", + "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000," + + "2020-10-10T00:00:16,2020-10-10T00:00:21,2020-10-10T00:00:20.999", + "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000," + + "2020-10-10T00:00:32,2020-10-10T00:00:39,2020-10-10T00:00:38.999", + "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000," + + "2020-10-10T00:00:32,2020-10-10T00:00:39,2020-10-10T00:00:38.999" + ) + assertThat(sink.getAppendResults.sorted.mkString("\n")) + .isEqualTo(expected.sorted.mkString("\n")) + } + + @TestTemplate + def testSessionWindowWithPartitionBy(): Unit = { + val sql = + """ + |SELECT + | TO_TIMESTAMP(`ts`), + | `int`, + | `double`, + | `float`, + | `bigdec`, + | `string`, + | `name`, + | CAST(`rowtime` AS STRING), + | window_start, + | window_end, + | window_time + |FROM TABLE(SESSION(TABLE T1 PARTITION BY `name`, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) + """.stripMargin + + val sink = new TestingAppendSink + tEnv.sqlQuery(sql).toDataStream.addSink(sink) + env.execute() + + val expected = Seq( + "2020-10-10T00:00:01,1,1.0,1.0,1.11,Hi,a,2020-10-10 00:00:01.000," + + "2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999", + "2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000," + + "2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999", + "2020-10-10T00:00:03,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:03.000," + + "2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999", + "2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000," + + "2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999", + "2020-10-10T00:00:04,5,5.0,null,5.55,Hi,a,2020-10-10 00:00:04.000," + + "2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999", + "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000," + + "2020-10-10T00:00:06,2020-10-10T00:00:12,2020-10-10T00:00:11.999", + "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000," + + "2020-10-10T00:00:06,2020-10-10T00:00:12,2020-10-10T00:00:11.999", + "2020-10-10T00:00:08,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000," + + "2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999", + "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000," + + "2020-10-10T00:00:16,2020-10-10T00:00:21,2020-10-10T00:00:20.999", + "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000," + + "2020-10-10T00:00:32,2020-10-10T00:00:37,2020-10-10T00:00:36.999", + "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000," + + "2020-10-10T00:00:34,2020-10-10T00:00:39,2020-10-10T00:00:38.999" + ) + assertThat(sink.getAppendResults.sorted.mkString("\n")) + .isEqualTo(expected.sorted.mkString("\n")) + } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorBuilder.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorBuilder.java index 7228a5f2fe1..f6939cffb46 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorBuilder.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorBuilder.java @@ -38,7 +38,6 @@ import org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceUnshared import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowOperator; import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowProcessor; import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnsliceAssigner; -import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnsliceAssigners; import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingWindowOperator; import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer; import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer; @@ -206,20 +205,14 @@ public class WindowAggOperatorBuilder { @SuppressWarnings("unchecked") private WindowOperatorBase<RowData, ?> buildUnslicingWindowOperator() { - - if (assigner instanceof UnsliceAssigners.SessionUnsliceAssigner) { - final UnsliceWindowAggProcessor windowProcessor = - new UnsliceWindowAggProcessor( - (GeneratedNamespaceAggsHandleFunction<TimeWindow>) - generatedAggregateFunction, - (UnsliceAssigner<TimeWindow>) assigner, - accSerializer, - indexOfCountStart, - shiftTimeZone); - return new UnslicingWindowOperator<>(windowProcessor); - } - - throw new UnsupportedOperationException( - "Unsupported unslice assigner: " + assigner.getClass().getCanonicalName()); + final UnsliceWindowAggProcessor windowProcessor = + new UnsliceWindowAggProcessor( + (GeneratedNamespaceAggsHandleFunction<TimeWindow>) + generatedAggregateFunction, + (UnsliceAssigner<TimeWindow>) assigner, + accSerializer, + indexOfCountStart, + shiftTimeZone); + return new UnslicingWindowOperator<>(windowProcessor); } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/unslicing/UnsliceAssigners.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/unslicing/UnsliceAssigners.java index ba34c4e3b67..a7030220ea2 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/unslicing/UnsliceAssigners.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/unslicing/UnsliceAssigners.java @@ -19,17 +19,24 @@ package org.apache.flink.table.runtime.operators.window.tvf.unslicing; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.runtime.operators.window.MergeCallback; import org.apache.flink.table.runtime.operators.window.TimeWindow; +import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.InternalTimeWindowAssigner; import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.MergingWindowAssigner; import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.SessionWindowAssigner; import org.apache.flink.table.runtime.operators.window.groupwindow.internal.MergingWindowProcessFunction; import org.apache.flink.table.runtime.operators.window.tvf.common.ClockService; +import java.io.IOException; import java.time.Duration; import java.time.ZoneId; import java.util.Collection; +import java.util.Collections; +import java.util.NavigableSet; import java.util.Optional; import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills; @@ -146,4 +153,126 @@ public class UnsliceAssigners { return String.format("SessionWindow(gap=%dms)", sessionGap); } } + + /** + * Creates a {@link UnsliceAssigner} that assigns elements which has been attached window start + * and window end timestamp to windows. The assigned windows doesn't need to be merged again. + * + * @param windowStartIndex the index of window start field in the input row, mustn't be a + * negative value. + * @param windowEndIndex the index of window end field in the input row, mustn't be a negative + * value. + */ + public static WindowedUnsliceAssigner windowed( + int windowStartIndex, int windowEndIndex, UnsliceAssigner<TimeWindow> innerAssigner) { + return new WindowedUnsliceAssigner(windowStartIndex, windowEndIndex, innerAssigner); + } + + /** + * The {@link UnsliceAssigner} for elements have been merged into unslicing windows and attached + * window start and end timestamps. + */ + public static class WindowedUnsliceAssigner extends MergingWindowAssigner<TimeWindow> + implements UnsliceAssigner<TimeWindow>, InternalTimeWindowAssigner { + + private static final long serialVersionUID = 1L; + + private final int windowStartIndex; + + private final int windowEndIndex; + + private final UnsliceAssigner<TimeWindow> innerAssigner; + + public WindowedUnsliceAssigner( + int windowStartIndex, + int windowEndIndex, + UnsliceAssigner<TimeWindow> innerAssigner) { + this.windowStartIndex = windowStartIndex; + this.windowEndIndex = windowEndIndex; + this.innerAssigner = innerAssigner; + } + + @Override + public Optional<TimeWindow> assignActualWindow( + RowData element, + ClockService clock, + MergingWindowProcessFunction<?, TimeWindow> windowFunction) + throws Exception { + return innerAssigner.assignActualWindow(element, clock, windowFunction); + } + + @Override + public Optional<TimeWindow> assignStateNamespace( + RowData element, + ClockService clock, + MergingWindowProcessFunction<?, TimeWindow> windowFunction) + throws Exception { + return innerAssigner.assignStateNamespace(element, clock, windowFunction); + } + + @Override + public MergingWindowAssigner<TimeWindow> getMergingWindowAssigner() { + return this; + } + + @Override + public boolean isEventTime() { + // it always works in event-time mode if input row has been attached windows + return true; + } + + @Override + public Collection<TimeWindow> assignWindows(RowData element, long timestamp) + throws IOException { + return Collections.singletonList(createWindow(element)); + } + + private TimeWindow createWindow(RowData element) { + if (element.isNullAt(windowStartIndex) || element.isNullAt(windowEndIndex)) { + throw new RuntimeException("RowTime field should not be null."); + } + // Precision for row timestamp is always 3 + final long windowStartTime = element.getTimestamp(windowStartIndex, 3).getMillisecond(); + final long windowEndTime = element.getTimestamp(windowEndIndex, 3).getMillisecond(); + return new TimeWindow(windowStartTime, windowEndTime); + } + + @Override + public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) { + return new TimeWindow.Serializer(); + } + + @Override + public String toString() { + return getDescription(); + } + + @Override + public String getDescription() { + return String.format( + "WindowedUnsliceWindow(innerAssigner=%s, StartIndex=%d, windowEndIndex=%d)", + innerAssigner.getDescription(), windowStartIndex, windowEndIndex); + } + + @Override + public InternalTimeWindowAssigner withEventTime() { + throw new IllegalStateException( + "Should not call this function on WindowedUnsliceAssigner."); + } + + @Override + public InternalTimeWindowAssigner withProcessingTime() { + throw new IllegalStateException( + "Should not call this function on WindowedUnsliceAssigner."); + } + + @Override + public void mergeWindows( + TimeWindow newWindow, + NavigableSet<TimeWindow> sortedWindows, + MergeCallback<TimeWindow, Collection<TimeWindow>> callback) { + // no need to merge windows because the window tvf operator in upstream has done for + // them + } + } }
