This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit de4e9e7382e79b6b1ac0f17ae87d5c84fbe7820c Author: xuyang <[email protected]> AuthorDate: Fri Jan 19 17:16:27 2024 +0800 [FLINK-34100][table] Extract a WindowTableFunctionOperatorBase from WindowTableFunctionOperator to prepare for introducing unaligned window table function This closes #24162 --- .../exec/common/CommonExecWindowTableFunction.java | 6 +- .../AlignedWindowTableFunctionOperator.java | 70 ++++++++++++++++++++++ ...r.java => WindowTableFunctionOperatorBase.java} | 56 +++++++---------- ...=> AlignedWindowTableFunctionOperatorTest.java} | 11 ++-- 4 files changed, 100 insertions(+), 43 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecWindowTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecWindowTableFunction.java index 2d9924e9f97..404f24c3cb9 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecWindowTableFunction.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecWindowTableFunction.java @@ -37,7 +37,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil; import org.apache.flink.table.planner.utils.TableConfigUtils; import org.apache.flink.table.runtime.operators.window.TimeWindow; import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner; -import org.apache.flink.table.runtime.operators.window.tvf.operator.WindowTableFunctionOperator; +import org.apache.flink.table.runtime.operators.window.tvf.operator.AlignedWindowTableFunctionOperator; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.runtime.util.TimeWindowUtil; import org.apache.flink.table.types.logical.RowType; @@ -92,8 +92,8 @@ public abstract class CommonExecWindowTableFunction extends ExecNodeBase<RowData TimeWindowUtil.getShiftTimeZone( windowingStrategy.getTimeAttributeType(), TableConfigUtils.getLocalTimeZone(config)); - WindowTableFunctionOperator windowTableFunctionOperator = - new WindowTableFunctionOperator( + AlignedWindowTableFunctionOperator windowTableFunctionOperator = + new AlignedWindowTableFunctionOperator( windowAssigner, windowingStrategy.getTimeAttributeIndex(), shiftTimeZone); return ExecNodeUtil.createOneInputTransformation( inputTransform, diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperator.java new file mode 100644 index 00000000000..deb2c59200c --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperator.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.window.tvf.operator; + +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.operators.window.TimeWindow; +import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner; + +import java.time.ZoneId; +import java.util.Collection; + +import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills; + +/** + * The operator for aligned window table function. + * + * <p>See more details about aligned window and unaligned window in {@link + * org.apache.flink.table.runtime.operators.window.tvf.common.WindowOperatorBase}. + * + * <p>Note: The operator only applies for Window TVF with row semantics (e.g TUMBLE/HOP/CUMULATE) + * instead of set semantics (e.g SESSION). + * + * <p>The operator emits result per record instead of at the end of window. + */ +public class AlignedWindowTableFunctionOperator extends WindowTableFunctionOperatorBase { + + private static final long serialVersionUID = 1L; + + public AlignedWindowTableFunctionOperator( + GroupWindowAssigner<TimeWindow> windowAssigner, + int rowtimeIndex, + ZoneId shiftTimeZone) { + super(windowAssigner, rowtimeIndex, shiftTimeZone); + } + + @Override + public void processElement(StreamRecord<RowData> element) throws Exception { + RowData inputRow = element.getValue(); + long timestamp; + if (windowAssigner.isEventTime()) { + if (inputRow.isNullAt(rowtimeIndex)) { + // null timestamp would be dropped + return; + } + timestamp = inputRow.getTimestamp(rowtimeIndex, 3).getMillisecond(); + } else { + timestamp = getProcessingTimeService().getCurrentProcessingTime(); + } + timestamp = toUtcTimestampMills(timestamp, shiftTimeZone); + Collection<TimeWindow> elementWindows = windowAssigner.assignWindows(inputRow, timestamp); + collect(inputRow, elementWindows); + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorBase.java similarity index 67% rename from flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperator.java rename to flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorBase.java index a812c3a0cb7..d2494267516 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorBase.java @@ -21,7 +21,6 @@ package org.apache.flink.table.runtime.operators.window.tvf.operator; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.TimestampedCollector; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.TimestampData; @@ -34,33 +33,26 @@ import java.time.ZoneId; import java.util.Collection; import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMills; -import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills; import static org.apache.flink.util.Preconditions.checkArgument; /** - * The operator acts as a table-valued function to assign windows for input row. Output row includes - * the original columns as well additional 3 columns named {@code window_start}, {@code window_end}, - * {@code window_time} to indicate the assigned window. - * - * <p>Note: The operator only applies for Window TVF with row semantics (e.g TUMBLE/HOP/CUMULATE) - * instead of set semantics (e.g Session). - * - * <p>The operator emits result per record instead of at the end of window. + * The {@link WindowTableFunctionOperatorBase} acts as a table-valued function to assign windows for + * input row. Output row includes the original columns as well additional 3 columns named {@code + * window_start}, {@code window_end}, {@code window_time} to indicate the assigned window. */ -public class WindowTableFunctionOperator extends TableStreamOperator<RowData> +public abstract class WindowTableFunctionOperatorBase extends TableStreamOperator<RowData> implements OneInputStreamOperator<RowData, RowData> { - private static final long serialVersionUID = 1L; - - private final GroupWindowAssigner<TimeWindow> windowAssigner; - private final int rowtimeIndex; - /** * The shift timezone of the window, if the proctime or rowtime type is TIMESTAMP_LTZ, the shift * timezone is the timezone user configured in TableConfig, other cases the timezone is UTC * which means never shift when assigning windows. */ - private final ZoneId shiftTimeZone; + protected final ZoneId shiftTimeZone; + + protected final int rowtimeIndex; + + protected final GroupWindowAssigner<TimeWindow> windowAssigner; /** This is used for emitting elements with a given timestamp. */ private transient TimestampedCollector<RowData> collector; @@ -68,14 +60,14 @@ public class WindowTableFunctionOperator extends TableStreamOperator<RowData> private transient JoinedRowData outRow; private transient GenericRowData windowProperties; - public WindowTableFunctionOperator( + public WindowTableFunctionOperatorBase( GroupWindowAssigner<TimeWindow> windowAssigner, int rowtimeIndex, ZoneId shiftTimeZone) { - checkArgument(!windowAssigner.isEventTime() || rowtimeIndex >= 0); - this.windowAssigner = windowAssigner; - this.rowtimeIndex = rowtimeIndex; this.shiftTimeZone = shiftTimeZone; + this.rowtimeIndex = rowtimeIndex; + this.windowAssigner = windowAssigner; + checkArgument(!windowAssigner.isEventTime() || rowtimeIndex >= 0); setChainingStrategy(ChainingStrategy.ALWAYS); } @@ -91,21 +83,15 @@ public class WindowTableFunctionOperator extends TableStreamOperator<RowData> } @Override - public void processElement(StreamRecord<RowData> element) throws Exception { - RowData inputRow = element.getValue(); - long timestamp; - if (windowAssigner.isEventTime()) { - if (inputRow.isNullAt(rowtimeIndex)) { - // null timestamp would be dropped - return; - } - timestamp = inputRow.getTimestamp(rowtimeIndex, 3).getMillisecond(); - } else { - timestamp = getProcessingTimeService().getCurrentProcessingTime(); + public void close() throws Exception { + super.close(); + if (collector != null) { + collector.close(); } - timestamp = toUtcTimestampMills(timestamp, shiftTimeZone); - Collection<TimeWindow> elementWindows = windowAssigner.assignWindows(inputRow, timestamp); - for (TimeWindow window : elementWindows) { + } + + protected void collect(RowData inputRow, Collection<TimeWindow> allWindows) { + for (TimeWindow window : allWindows) { windowProperties.setField(0, TimestampData.fromEpochMillis(window.getStart())); windowProperties.setField(1, TimestampData.fromEpochMillis(window.getEnd())); windowProperties.setField( diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperatorTest.java similarity index 97% rename from flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorTest.java rename to flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperatorTest.java index a1464e58156..bcf511c9408 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperatorTest.java @@ -51,15 +51,15 @@ import java.util.concurrent.ConcurrentLinkedQueue; import static org.apache.flink.table.runtime.util.StreamRecordUtils.row; import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills; -/** Tests for {@link WindowTableFunctionOperator}. */ +/** Tests for {@link AlignedWindowTableFunctionOperator}. */ @RunWith(Parameterized.class) -public class WindowTableFunctionOperatorTest { +public class AlignedWindowTableFunctionOperatorTest { private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC"); private static final ZoneId SHANGHAI_ZONE_ID = ZoneId.of("Asia/Shanghai"); private final ZoneId shiftTimeZone; - public WindowTableFunctionOperatorTest(ZoneId shiftTimeZone) { + public AlignedWindowTableFunctionOperatorTest(ZoneId shiftTimeZone) { this.shiftTimeZone = shiftTimeZone; } @@ -306,8 +306,9 @@ public class WindowTableFunctionOperatorTest { private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness( GroupWindowAssigner<TimeWindow> windowAssigner, ZoneId shiftTimeZone) throws Exception { - WindowTableFunctionOperator operator = - new WindowTableFunctionOperator(windowAssigner, ROW_TIME_INDEX, shiftTimeZone); + AlignedWindowTableFunctionOperator operator = + new AlignedWindowTableFunctionOperator( + windowAssigner, ROW_TIME_INDEX, shiftTimeZone); return new OneInputStreamOperatorTestHarness<>(operator, INPUT_ROW_SER); }
