This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit de4e9e7382e79b6b1ac0f17ae87d5c84fbe7820c
Author: xuyang <[email protected]>
AuthorDate: Fri Jan 19 17:16:27 2024 +0800

    [FLINK-34100][table] Extract a WindowTableFunctionOperatorBase from WindowTableFunctionOperator to prepare for introducing unaligned window table function
    
    This closes #24162
---
 .../exec/common/CommonExecWindowTableFunction.java |  6 +-
 .../AlignedWindowTableFunctionOperator.java        | 70 ++++++++++++++++++++++
 ...r.java => WindowTableFunctionOperatorBase.java} | 56 +++++++----------
 ...=> AlignedWindowTableFunctionOperatorTest.java} | 11 ++--
 4 files changed, 100 insertions(+), 43 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecWindowTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecWindowTableFunction.java
index 2d9924e9f97..404f24c3cb9 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecWindowTableFunction.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecWindowTableFunction.java
@@ -37,7 +37,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.planner.utils.TableConfigUtils;
 import org.apache.flink.table.runtime.operators.window.TimeWindow;
 import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner;
-import org.apache.flink.table.runtime.operators.window.tvf.operator.WindowTableFunctionOperator;
+import org.apache.flink.table.runtime.operators.window.tvf.operator.AlignedWindowTableFunctionOperator;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.runtime.util.TimeWindowUtil;
 import org.apache.flink.table.types.logical.RowType;
@@ -92,8 +92,8 @@ public abstract class CommonExecWindowTableFunction extends ExecNodeBase<RowData
                 TimeWindowUtil.getShiftTimeZone(
                         windowingStrategy.getTimeAttributeType(),
                         TableConfigUtils.getLocalTimeZone(config));
-        WindowTableFunctionOperator windowTableFunctionOperator =
-                new WindowTableFunctionOperator(
+        AlignedWindowTableFunctionOperator windowTableFunctionOperator =
+                new AlignedWindowTableFunctionOperator(
                         windowAssigner, windowingStrategy.getTimeAttributeIndex(), shiftTimeZone);
         return ExecNodeUtil.createOneInputTransformation(
                 inputTransform,
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperator.java
new file mode 100644
index 00000000000..deb2c59200c
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.tvf.operator;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner;
+
+import java.time.ZoneId;
+import java.util.Collection;
+
+import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills;
+
+/**
+ * The operator for aligned window table function.
+ *
+ * <p>See more details about aligned window and unaligned window in {@link
+ * org.apache.flink.table.runtime.operators.window.tvf.common.WindowOperatorBase}.
+ *
+ * <p>Note: The operator only applies for Window TVF with row semantics (e.g TUMBLE/HOP/CUMULATE)
+ * instead of set semantics (e.g SESSION).
+ *
+ * <p>The operator emits result per record instead of at the end of window.
+ */
+public class AlignedWindowTableFunctionOperator extends WindowTableFunctionOperatorBase {
+
+    private static final long serialVersionUID = 1L;
+
+    public AlignedWindowTableFunctionOperator(
+            GroupWindowAssigner<TimeWindow> windowAssigner,
+            int rowtimeIndex,
+            ZoneId shiftTimeZone) {
+        super(windowAssigner, rowtimeIndex, shiftTimeZone);
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        RowData inputRow = element.getValue();
+        long timestamp;
+        if (windowAssigner.isEventTime()) {
+            if (inputRow.isNullAt(rowtimeIndex)) {
+                // null timestamp would be dropped
+                return;
+            }
+            timestamp = inputRow.getTimestamp(rowtimeIndex, 3).getMillisecond();
+        } else {
+            timestamp = getProcessingTimeService().getCurrentProcessingTime();
+        }
+        timestamp = toUtcTimestampMills(timestamp, shiftTimeZone);
+        Collection<TimeWindow> elementWindows = windowAssigner.assignWindows(inputRow, timestamp);
+        collect(inputRow, elementWindows);
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorBase.java
similarity index 67%
rename from flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperator.java
rename to flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorBase.java
index a812c3a0cb7..d2494267516 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperator.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorBase.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.runtime.operators.window.tvf.operator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.TimestampData;
@@ -34,33 +33,26 @@ import java.time.ZoneId;
 import java.util.Collection;
 
 import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMills;
-import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
- * The operator acts as a table-valued function to assign windows for input row. Output row includes
- * the original columns as well additional 3 columns named {@code window_start}, {@code window_end},
- * {@code window_time} to indicate the assigned window.
- *
- * <p>Note: The operator only applies for Window TVF with row semantics (e.g TUMBLE/HOP/CUMULATE)
- * instead of set semantics (e.g Session).
- *
- * <p>The operator emits result per record instead of at the end of window.
+ * The {@link WindowTableFunctionOperatorBase} acts as a table-valued function to assign windows for
+ * input row. Output row includes the original columns as well additional 3 columns named {@code
+ * window_start}, {@code window_end}, {@code window_time} to indicate the assigned window.
  */
-public class WindowTableFunctionOperator extends TableStreamOperator<RowData>
+public abstract class WindowTableFunctionOperatorBase extends TableStreamOperator<RowData>
         implements OneInputStreamOperator<RowData, RowData> {
 
-    private static final long serialVersionUID = 1L;
-
-    private final GroupWindowAssigner<TimeWindow> windowAssigner;
-    private final int rowtimeIndex;
-
     /**
      * The shift timezone of the window, if the proctime or rowtime type is TIMESTAMP_LTZ, the shift
      * timezone is the timezone user configured in TableConfig, other cases the timezone is UTC
      * which means never shift when assigning windows.
      */
-    private final ZoneId shiftTimeZone;
+    protected final ZoneId shiftTimeZone;
+
+    protected final int rowtimeIndex;
+
+    protected final GroupWindowAssigner<TimeWindow> windowAssigner;
 
     /** This is used for emitting elements with a given timestamp. */
     private transient TimestampedCollector<RowData> collector;
@@ -68,14 +60,14 @@ public class WindowTableFunctionOperator extends TableStreamOperator<RowData>
     private transient JoinedRowData outRow;
     private transient GenericRowData windowProperties;
 
-    public WindowTableFunctionOperator(
+    public WindowTableFunctionOperatorBase(
             GroupWindowAssigner<TimeWindow> windowAssigner,
             int rowtimeIndex,
             ZoneId shiftTimeZone) {
-        checkArgument(!windowAssigner.isEventTime() || rowtimeIndex >= 0);
-        this.windowAssigner = windowAssigner;
-        this.rowtimeIndex = rowtimeIndex;
         this.shiftTimeZone = shiftTimeZone;
+        this.rowtimeIndex = rowtimeIndex;
+        this.windowAssigner = windowAssigner;
+        checkArgument(!windowAssigner.isEventTime() || rowtimeIndex >= 0);
 
         setChainingStrategy(ChainingStrategy.ALWAYS);
     }
@@ -91,21 +83,15 @@ public class WindowTableFunctionOperator extends TableStreamOperator<RowData>
     }
 
     @Override
-    public void processElement(StreamRecord<RowData> element) throws Exception {
-        RowData inputRow = element.getValue();
-        long timestamp;
-        if (windowAssigner.isEventTime()) {
-            if (inputRow.isNullAt(rowtimeIndex)) {
-                // null timestamp would be dropped
-                return;
-            }
-            timestamp = inputRow.getTimestamp(rowtimeIndex, 3).getMillisecond();
-        } else {
-            timestamp = getProcessingTimeService().getCurrentProcessingTime();
+    public void close() throws Exception {
+        super.close();
+        if (collector != null) {
+            collector.close();
         }
-        timestamp = toUtcTimestampMills(timestamp, shiftTimeZone);
-        Collection<TimeWindow> elementWindows = windowAssigner.assignWindows(inputRow, timestamp);
-        for (TimeWindow window : elementWindows) {
+    }
+
+    protected void collect(RowData inputRow, Collection<TimeWindow> allWindows) {
+        for (TimeWindow window : allWindows) {
             windowProperties.setField(0, TimestampData.fromEpochMillis(window.getStart()));
             windowProperties.setField(1, TimestampData.fromEpochMillis(window.getEnd()));
             windowProperties.setField(
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperatorTest.java
similarity index 97%
rename from flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorTest.java
rename to flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperatorTest.java
index a1464e58156..bcf511c9408 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperatorTest.java
@@ -51,15 +51,15 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import static org.apache.flink.table.runtime.util.StreamRecordUtils.row;
 import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills;
 
-/** Tests for {@link WindowTableFunctionOperator}. */
+/** Tests for {@link AlignedWindowTableFunctionOperator}. */
 @RunWith(Parameterized.class)
-public class WindowTableFunctionOperatorTest {
+public class AlignedWindowTableFunctionOperatorTest {
 
     private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
     private static final ZoneId SHANGHAI_ZONE_ID = ZoneId.of("Asia/Shanghai");
     private final ZoneId shiftTimeZone;
 
-    public WindowTableFunctionOperatorTest(ZoneId shiftTimeZone) {
+    public AlignedWindowTableFunctionOperatorTest(ZoneId shiftTimeZone) {
         this.shiftTimeZone = shiftTimeZone;
     }
 
@@ -306,8 +306,9 @@ public class WindowTableFunctionOperatorTest {
 
     private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(
             GroupWindowAssigner<TimeWindow> windowAssigner, ZoneId shiftTimeZone) throws Exception {
-        WindowTableFunctionOperator operator =
-                new WindowTableFunctionOperator(windowAssigner, ROW_TIME_INDEX, shiftTimeZone);
+        AlignedWindowTableFunctionOperator operator =
+                new AlignedWindowTableFunctionOperator(
+                        windowAssigner, ROW_TIME_INDEX, shiftTimeZone);
         return new OneInputStreamOperatorTestHarness<>(operator, INPUT_ROW_SER);
     }
 

Reply via email to