This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9a22ff7e961cb5d8b52401e567e59954bff35d50
Author: xuyang <[email protected]>
AuthorDate: Fri Jan 19 17:48:46 2024 +0800

    [FLINK-34100][table] Introduce UnalignedWindowTableFunctionOperator for 
unaligned window
    
    This closes #24162
---
 .../processors/UnsliceWindowAggProcessor.java      |  10 +-
 .../internal/MergingWindowProcessFunction.java     |  84 ++-
 .../groupwindow/operator/WindowOperator.java       |  16 +-
 .../AlignedWindowTableFunctionOperator.java        |   1 +
 .../UnalignedWindowTableFunctionOperator.java      | 606 +++++++++++++++++++++
 .../operator/WindowTableFunctionOperatorBase.java  |  20 +
 .../AlignedWindowTableFunctionOperatorTest.java    |  79 +--
 .../UnalignedWindowTableFunctionOperatorTest.java  | 336 ++++++++++++
 .../WindowTableFunctionOperatorTestBase.java       | 102 ++++
 9 files changed, 1175 insertions(+), 79 deletions(-)

diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/UnsliceWindowAggProcessor.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/UnsliceWindowAggProcessor.java
index 33be17388db..00eda35222e 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/UnsliceWindowAggProcessor.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/UnsliceWindowAggProcessor.java
@@ -31,7 +31,6 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.util.RowDataUtil;
 import 
org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
 import org.apache.flink.table.runtime.operators.window.TimeWindow;
-import 
org.apache.flink.table.runtime.operators.window.groupwindow.internal.InternalWindowProcessFunction;
 import 
org.apache.flink.table.runtime.operators.window.groupwindow.internal.MergingWindowProcessFunction;
 import 
org.apache.flink.table.runtime.operators.window.groupwindow.triggers.EventTimeTriggers;
 import 
org.apache.flink.table.runtime.operators.window.groupwindow.triggers.ProcessingTimeTriggers;
@@ -41,6 +40,7 @@ import 
org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnsliceAssi
 import 
org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingWindowProcessor;
 import 
org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingWindowTimerServiceImpl;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.BiConsumerWithException;
 
 import java.time.ZoneId;
 import java.util.Collection;
@@ -198,7 +198,7 @@ public class UnsliceWindowAggProcessor extends 
AbstractWindowAggProcessor<TimeWi
     }
 
     private class WindowContextImpl
-            implements InternalWindowProcessFunction.Context<RowData, 
TimeWindow> {
+            implements MergingWindowProcessFunction.MergingContext<RowData, 
TimeWindow> {
 
         @Override
         public long currentProcessingTime() {
@@ -277,6 +277,12 @@ public class UnsliceWindowAggProcessor extends 
AbstractWindowAggProcessor<TimeWi
         public RowData currentKey() {
             return ctx.getKeyedStateBackend().getCurrentKey();
         }
+
+        @Override
+        public BiConsumerWithException<TimeWindow, Collection<TimeWindow>, 
Throwable>
+                getWindowStateMergingConsumer() {
+            return new 
MergingWindowProcessFunction.DefaultAccMergingConsumer<>(this, aggregator);
+        }
     }
 
     private class TriggerContextImpl implements Trigger.OnMergeContext {
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/internal/MergingWindowProcessFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/internal/MergingWindowProcessFunction.java
index 3ac9dd8f725..b89041d38e5 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/internal/MergingWindowProcessFunction.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/internal/MergingWindowProcessFunction.java
@@ -25,12 +25,14 @@ import org.apache.flink.table.data.RowData;
 import 
org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunctionBase;
 import org.apache.flink.table.runtime.operators.window.Window;
 import 
org.apache.flink.table.runtime.operators.window.groupwindow.assigners.MergingWindowAssigner;
+import org.apache.flink.util.function.BiConsumerWithException;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
 import static 
org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer;
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
  * The implementation of {@link InternalWindowProcessFunction} for {@link 
MergingWindowAssigner}.
@@ -61,13 +63,15 @@ public class MergingWindowProcessFunction<K, W extends 
Window>
 
     @Override
     public void open(Context<K, W> ctx) throws Exception {
+        checkArgument(ctx instanceof MergingContext);
         super.open(ctx);
         MapStateDescriptor<W, W> mappingStateDescriptor =
                 new MapStateDescriptor<>(
                         "session-window-mapping", windowSerializer, 
windowSerializer);
         MapState<W, W> windowMapping = 
ctx.getPartitionedState(mappingStateDescriptor);
         this.mergingWindows = new MergingWindowSet<>(windowAssigner, 
windowMapping);
-        this.mergingFunction = new MergingFunctionImpl();
+        this.mergingFunction =
+                new MergingFunctionImpl(((MergingContext) 
ctx).getWindowStateMergingConsumer());
     }
 
     @Override
@@ -124,8 +128,20 @@ public class MergingWindowProcessFunction<K, W extends 
Window>
         }
     }
 
+    /** Get the state window as the namespace stored acc in the data about 
this actual window. */
+    public W getStateWindow(W window) throws Exception {
+        return mergingWindows.getStateWindow(window);
+    }
+
     private class MergingFunctionImpl implements 
MergingWindowSet.MergeFunction<W> {
 
+        private final BiConsumerWithException<W, Collection<W>, Throwable> 
accMergingConsumer;
+
+        public MergingFunctionImpl(
+                BiConsumerWithException<W, Collection<W>, Throwable> 
accMergingConsumer) {
+            this.accMergingConsumer = accMergingConsumer;
+        }
+
         @Override
         public void merge(
                 W mergeResult,
@@ -166,23 +182,61 @@ public class MergingWindowProcessFunction<K, W extends 
Window>
 
             // merge the merged state windows into the newly resulting state 
window
             if (!stateWindowsToBeMerged.isEmpty()) {
-                RowData targetAcc = 
ctx.getWindowAccumulators(stateWindowResult);
-                if (targetAcc == null) {
-                    targetAcc = windowAggregator.createAccumulators();
+                try {
+                    accMergingConsumer.accept(stateWindowResult, 
stateWindowsToBeMerged);
+                } catch (Throwable e) {
+                    throw new RuntimeException("Should not happen", e);
                 }
-                windowAggregator.setAccumulators(stateWindowResult, targetAcc);
-                for (W w : stateWindowsToBeMerged) {
-                    RowData acc = ctx.getWindowAccumulators(w);
-                    if (acc != null) {
-                        windowAggregator.merge(w, acc);
-                    }
-                    // clear merged window
-                    ctx.clearWindowState(w);
-                    ctx.clearPreviousState(w);
+            }
+        }
+    }
+
+    /**
+     * A default merging consumer that merges the accumulators in state 
windows that waiting to be
+     * merged into the target state window.
+     *
+     * <p>The first parameter is the target state window, the second parameter 
is the windows that
+     * need to be merged.
+     */
+    public static class DefaultAccMergingConsumer<W extends Window>
+            implements BiConsumerWithException<W, Collection<W>, Throwable> {
+
+        private final Context<?, W> ctx;
+
+        private final NamespaceAggsHandleFunctionBase<W> windowAggregator;
+
+        public DefaultAccMergingConsumer(
+                Context<?, W> ctx, NamespaceAggsHandleFunctionBase<W> 
windowAggregator) {
+            this.ctx = ctx;
+            this.windowAggregator = windowAggregator;
+        }
+
+        @Override
+        public void accept(W stateWindowResult, Collection<W> 
stateWindowsToBeMerged)
+                throws Throwable {
+            RowData targetAcc = ctx.getWindowAccumulators(stateWindowResult);
+            if (targetAcc == null) {
+                targetAcc = windowAggregator.createAccumulators();
+            }
+            windowAggregator.setAccumulators(stateWindowResult, targetAcc);
+            for (W w : stateWindowsToBeMerged) {
+                RowData acc = ctx.getWindowAccumulators(w);
+                if (acc != null) {
+                    windowAggregator.merge(w, acc);
                 }
-                targetAcc = windowAggregator.getAccumulators();
-                ctx.setWindowAccumulators(stateWindowResult, targetAcc);
+                // clear merged window
+                ctx.clearWindowState(w);
+                ctx.clearPreviousState(w);
             }
+            targetAcc = windowAggregator.getAccumulators();
+            ctx.setWindowAccumulators(stateWindowResult, targetAcc);
         }
     }
+
+    /** A {@link Context} used for {@link MergingWindowProcessFunction}. */
+    public interface MergingContext<K, W extends Window> extends Context<K, W> 
{
+
+        /** Returns the consumer used to merge window state. */
+        BiConsumerWithException<W, Collection<W>, Throwable> 
getWindowStateMergingConsumer();
+    }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperator.java
index 68f6f7c6c3e..8e7c26c67d4 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperator.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperator.java
@@ -57,6 +57,7 @@ import 
org.apache.flink.table.runtime.operators.window.groupwindow.triggers.Trig
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.runtime.util.TimeWindowUtil;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.function.BiConsumerWithException;
 
 import org.apache.commons.lang3.ArrayUtils;
 
@@ -274,12 +275,13 @@ public abstract class WindowOperator<K, W extends Window> 
extends AbstractStream
 
         compileGeneratedCode();
 
-        WindowContext windowContext = new WindowContext();
         windowAggregator.open(
                 new PerWindowStateDataViewStore(
                         getKeyedStateBackend(), windowSerializer, 
getRuntimeContext()));
 
+        WindowContext windowContext;
         if (windowAssigner instanceof MergingWindowAssigner) {
+            windowContext = new MergingWindowContext();
             this.windowFunction =
                     new MergingWindowProcessFunction<>(
                             (MergingWindowAssigner<W>) windowAssigner,
@@ -287,12 +289,14 @@ public abstract class WindowOperator<K, W extends Window> 
extends AbstractStream
                             windowSerializer,
                             allowedLateness);
         } else if (windowAssigner instanceof PanedWindowAssigner) {
+            windowContext = new WindowContext();
             this.windowFunction =
                     new PanedWindowProcessFunction<>(
                             (PanedWindowAssigner<W>) windowAssigner,
                             windowAggregator,
                             allowedLateness);
         } else {
+            windowContext = new WindowContext();
             this.windowFunction =
                     new GeneralWindowProcessFunction<>(
                             windowAssigner, windowAggregator, allowedLateness);
@@ -541,6 +545,16 @@ public abstract class WindowOperator<K, W extends Window> 
extends AbstractStream
         }
     }
 
+    private class MergingWindowContext extends WindowContext
+            implements MergingWindowProcessFunction.MergingContext<K, W> {
+        @Override
+        public BiConsumerWithException<W, Collection<W>, Throwable>
+                getWindowStateMergingConsumer() {
+            return new 
MergingWindowProcessFunction.DefaultAccMergingConsumer<>(
+                    this, windowAggregator);
+        }
+    }
+
     /**
      * {@code TriggerContext} is a utility for handling {@code Trigger} 
invocations. It can be
      * reused by setting the {@code key} and {@code window} fields. No 
internal state must be kept
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperator.java
index deb2c59200c..17c5a4b5125 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperator.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperator.java
@@ -57,6 +57,7 @@ public class AlignedWindowTableFunctionOperator extends 
WindowTableFunctionOpera
         if (windowAssigner.isEventTime()) {
             if (inputRow.isNullAt(rowtimeIndex)) {
                 // null timestamp would be dropped
+                numNullRowTimeRecordsDropped.inc();
                 return;
             }
             timestamp = inputRow.getTimestamp(rowtimeIndex, 
3).getMillisecond();
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator.java
new file mode 100644
index 00000000000..6f93df0af78
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator.java
@@ -0,0 +1,606 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.tvf.operator;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.runtime.state.internal.InternalMergingState;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.dataview.PerWindowStateDataViewStore;
+import org.apache.flink.table.runtime.dataview.StateDataViewStore;
+import 
org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunctionBase;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import 
org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner;
+import 
org.apache.flink.table.runtime.operators.window.groupwindow.assigners.MergingWindowAssigner;
+import 
org.apache.flink.table.runtime.operators.window.groupwindow.internal.MergingWindowProcessFunction;
+import 
org.apache.flink.table.runtime.operators.window.groupwindow.triggers.EventTimeTriggers;
+import 
org.apache.flink.table.runtime.operators.window.groupwindow.triggers.ProcessingTimeTriggers;
+import 
org.apache.flink.table.runtime.operators.window.groupwindow.triggers.Trigger;
+import org.apache.flink.table.runtime.util.TimeWindowUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.BiConsumerWithException;
+
+import java.time.ZoneId;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The operator for unaligned window table function.
+ *
+ * <p>See more details about aligned window and unaligned window in {@link
+ * 
org.apache.flink.table.runtime.operators.window.tvf.common.WindowOperatorBase}.
+ *
+ * <p>Note: The operator only applies for Window TVF with set semantics (e.g 
SESSION) instead of row
+ * semantics (e.g TUMBLE/HOP/CUMULATE).
+ *
+ * <p>This operator emits result at the end of window instead of per record.
+ *
+ * <p>This operator will not compact changelog records.
+ *
+ * <p>This operator will keep the original order of input records when 
outputting.
+ */
+public class UnalignedWindowTableFunctionOperator extends 
WindowTableFunctionOperatorBase
+        implements Triggerable<RowData, TimeWindow> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String LATE_ELEMENTS_DROPPED_METRIC_NAME = 
"numLateRecordsDropped";
+    private static final String LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME = 
"lateRecordsDroppedRate";
+    private static final String WATERMARK_LATENCY_METRIC_NAME = 
"watermarkLatency";
+
+    private final Trigger<TimeWindow> trigger;
+
+    private final TypeSerializer<RowData> inputSerializer;
+
+    private final TypeSerializer<TimeWindow> windowSerializer;
+
+    private transient InternalTimerService<TimeWindow> internalTimerService;
+
+    // a counter to tag the order of all input streams when entering the 
operator
+    private transient ValueState<Long> counterState;
+
+    private transient InternalMapState<RowData, TimeWindow, Long, RowData> 
windowState;
+
+    private transient TriggerContextImpl triggerContext;
+
+    private transient MergingWindowProcessFunction<RowData, TimeWindow> 
windowFunction;
+
+    private transient NamespaceAggsHandleFunctionBase<TimeWindow> 
windowAggregator;
+
+    // ------------------------------------------------------------------------
+    // Metrics
+    // ------------------------------------------------------------------------
+
+    private transient Counter numLateRecordsDropped;
+    private transient Meter lateRecordsDroppedRate;
+    private transient Gauge<Long> watermarkLatency;
+
+    public UnalignedWindowTableFunctionOperator(
+            GroupWindowAssigner<TimeWindow> windowAssigner,
+            TypeSerializer<TimeWindow> windowSerializer,
+            TypeSerializer<RowData> inputSerializer,
+            int rowtimeIndex,
+            ZoneId shiftTimeZone) {
+        super(windowAssigner, rowtimeIndex, shiftTimeZone);
+        this.trigger = createTrigger(windowAssigner);
+        this.windowSerializer = checkNotNull(windowSerializer);
+        this.inputSerializer = checkNotNull(inputSerializer);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+
+        internalTimerService =
+                getInternalTimerService("session-window-tvf-timers", 
windowSerializer, this);
+
+        triggerContext = new TriggerContextImpl();
+        triggerContext.open();
+
+        ValueStateDescriptor<Long> counterStateDescriptor =
+                new ValueStateDescriptor<>("session-window-tvf-counter", 
LongSerializer.INSTANCE);
+        counterState = getRuntimeContext().getState(counterStateDescriptor);
+
+        MapStateDescriptor<Long, RowData> windowStateDescriptor =
+                new MapStateDescriptor<>(
+                        "session-window-tvf-acc", LongSerializer.INSTANCE, 
inputSerializer);
+
+        windowState =
+                (InternalMapState<RowData, TimeWindow, Long, RowData>)
+                        getOrCreateKeyedState(windowSerializer, 
windowStateDescriptor);
+
+        windowAggregator = new DummyWindowAggregator();
+        windowAggregator.open(
+                new PerWindowStateDataViewStore(
+                        getKeyedStateBackend(), windowSerializer, 
getRuntimeContext()));
+
+        WindowContextImpl windowContext = new WindowContextImpl();
+
+        windowFunction =
+                new MergingWindowProcessFunction<>(
+                        (MergingWindowAssigner<TimeWindow>) windowAssigner,
+                        windowAggregator,
+                        windowSerializer,
+                        0);
+
+        windowFunction.open(windowContext);
+
+        this.numLateRecordsDropped = 
metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
+        this.lateRecordsDroppedRate =
+                metrics.meter(
+                        LATE_ELEMENTS_DROPPED_RATE_METRIC_NAME,
+                        new MeterView(numLateRecordsDropped));
+        this.watermarkLatency =
+                metrics.gauge(
+                        WATERMARK_LATENCY_METRIC_NAME,
+                        () -> {
+                            long watermark = 
internalTimerService.currentWatermark();
+                            if (watermark < 0) {
+                                return 0L;
+                            } else {
+                                return 
internalTimerService.currentProcessingTime() - watermark;
+                            }
+                        });
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (windowAggregator != null) {
+            windowAggregator.close();
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception 
{
+        RowData inputRow = element.getValue();
+
+        long timestamp;
+        if (windowAssigner.isEventTime()) {
+            if (inputRow.isNullAt(rowtimeIndex)) {
+                // null timestamp would be dropped
+                numNullRowTimeRecordsDropped.inc();
+                return;
+            }
+            timestamp = inputRow.getTimestamp(rowtimeIndex, 
3).getMillisecond();
+        } else {
+            timestamp = getProcessingTimeService().getCurrentProcessingTime();
+        }
+
+        // no matter if order exceeds the Long.MAX_VALUE
+        Long order = counterState.value();
+        if (null == order) {
+            order = 0L;
+        }
+        counterState.update(order + 1);
+
+        timestamp = TimeWindowUtil.toUtcTimestampMills(timestamp, 
shiftTimeZone);
+        // the windows which the input row should be placed into
+        Collection<TimeWindow> affectedWindows =
+                windowFunction.assignStateNamespace(inputRow, timestamp);
+        boolean isElementDropped = true;
+        for (TimeWindow window : affectedWindows) {
+            isElementDropped = false;
+            windowState.setCurrentNamespace(window);
+            windowState.put(order, inputRow);
+        }
+
+        // the actual window which the input row is belongs to
+        Collection<TimeWindow> actualWindows =
+                windowFunction.assignActualWindows(inputRow, timestamp);
+        Preconditions.checkArgument(
+                (affectedWindows.isEmpty() && actualWindows.isEmpty())
+                        || (!affectedWindows.isEmpty() && 
!actualWindows.isEmpty()));
+        for (TimeWindow window : actualWindows) {
+            triggerContext.setWindow(window);
+            boolean triggerResult = triggerContext.onElement(inputRow, 
timestamp);
+            if (triggerResult) {
+                emitWindowResult(window);
+            }
+            // clear up state
+            registerCleanupTimer(window);
+        }
+        if (isElementDropped) {
+            // markEvent will increase numLateRecordsDropped
+            lateRecordsDroppedRate.markEvent();
+        }
+    }
+
+    private void registerCleanupTimer(TimeWindow window) {
+        long cleanupTime = getCleanupTime(window);
+        if (cleanupTime == Long.MAX_VALUE) {
+            // no need to clean up because we didn't set one
+            return;
+        }
+        if (windowAssigner.isEventTime()) {
+            triggerContext.registerEventTimeTimer(cleanupTime);
+        } else {
+            triggerContext.registerProcessingTimeTimer(cleanupTime);
+        }
+    }
+
+    private void emitWindowResult(TimeWindow window) throws Exception {
+        TimeWindow stateWindow = windowFunction.getStateWindow(window);
+        windowState.setCurrentNamespace(stateWindow);
+        Iterator<Map.Entry<Long, RowData>> iterator = windowState.iterator();
+        // build a sorted map
+        TreeMap<Long, RowData> sortedMap = new TreeMap<>();
+        while (iterator.hasNext()) {
+            Map.Entry<Long, RowData> entry = iterator.next();
+            sortedMap.put(entry.getKey(), entry.getValue());
+        }
+        // emit the sorted map
+        for (Map.Entry<Long, RowData> entry : sortedMap.entrySet()) {
+            collect(entry.getValue(), Collections.singletonList(window));
+        }
+    }
+
+    @Override
+    public void onEventTime(InternalTimer<RowData, TimeWindow> timer) throws 
Exception {
+        triggerContext.setWindow(timer.getNamespace());
+        if (triggerContext.onEventTime(timer.getTimestamp())) {
+            // fire
+            emitWindowResult(triggerContext.window);
+        }
+
+        if (windowAssigner.isEventTime()) {
+            windowFunction.cleanWindowIfNeeded(triggerContext.window, 
timer.getTimestamp());
+        }
+    }
+
+    @Override
+    public void onProcessingTime(InternalTimer<RowData, TimeWindow> timer) 
throws Exception {
+        triggerContext.setWindow(timer.getNamespace());
+        if (triggerContext.onProcessingTime(timer.getTimestamp())) {
+            // fire
+            emitWindowResult(triggerContext.window);
+        }
+
+        if (!windowAssigner.isEventTime()) {
+            windowFunction.cleanWindowIfNeeded(triggerContext.window, 
timer.getTimestamp());
+        }
+    }
+
+    /**
+     * In case this leads to a value greated than {@link Long#MAX_VALUE} then 
a cleanup time of
+     * {@link Long#MAX_VALUE} is returned.
+     */
+    private long getCleanupTime(TimeWindow window) {
+        // In case this leads to a value greater than Long.MAX_VALUE, then a 
cleanup
+        // time of Long.MAX_VALUE is returned.
+        long cleanupTime = Math.max(0, window.maxTimestamp());
+        cleanupTime = cleanupTime >= window.maxTimestamp() ? cleanupTime : 
Long.MAX_VALUE;
+        return toEpochMillsForTimer(cleanupTime, shiftTimeZone);
+    }
+
+    private static Trigger<TimeWindow> createTrigger(
+            GroupWindowAssigner<TimeWindow> windowAssigner) {
+        if (windowAssigner.isEventTime()) {
+            return EventTimeTriggers.afterEndOfWindow();
+        } else {
+            return ProcessingTimeTriggers.afterEndOfWindow();
+        }
+    }
+
+    private class WindowContextImpl
+            implements MergingWindowProcessFunction.MergingContext<RowData, 
TimeWindow> {
+
+        @Override
+        public void deleteCleanupTimer(TimeWindow window) throws Exception {
+            long cleanupTime = 
UnalignedWindowTableFunctionOperator.this.getCleanupTime(window);
+            if (cleanupTime == Long.MAX_VALUE) {
+                // no need to clean up because we didn't set one
+                return;
+            }
+            if (windowAssigner.isEventTime()) {
+                triggerContext.deleteEventTimeTimer(cleanupTime);
+            } else {
+                triggerContext.deleteProcessingTimeTimer(cleanupTime);
+            }
+        }
+
+        @Override
+        public <S extends State> S getPartitionedState(StateDescriptor<S, ?> 
stateDescriptor)
+                throws Exception {
+            requireNonNull(stateDescriptor, "The state properties must not be 
null");
+            return 
UnalignedWindowTableFunctionOperator.this.getPartitionedState(stateDescriptor);
+        }
+
+        @Override
+        public RowData currentKey() {
+            return (RowData) 
UnalignedWindowTableFunctionOperator.this.getCurrentKey();
+        }
+
+        @Override
+        public long currentProcessingTime() {
+            return internalTimerService.currentProcessingTime();
+        }
+
+        @Override
+        public long currentWatermark() {
+            return internalTimerService.currentWatermark();
+        }
+
+        @Override
+        public ZoneId getShiftTimeZone() {
+            return shiftTimeZone;
+        }
+
+        @Override
+        public void clearWindowState(TimeWindow window) throws Exception {
+            windowState.setCurrentNamespace(window);
+            windowState.clear();
+        }
+
+        @Override
+        public void clearTrigger(TimeWindow window) throws Exception {
+            triggerContext.setWindow(window);
+            triggerContext.clear();
+        }
+
+        @Override
+        public void onMerge(TimeWindow newWindow, Collection<TimeWindow> 
mergedWindows)
+                throws Exception {
+            triggerContext.setWindow(newWindow);
+            triggerContext.setMergedWindows(mergedWindows);
+            triggerContext.onMerge();
+        }
+
+        @Override
+        public void clearPreviousState(TimeWindow window) throws Exception {}
+
+        @Override
+        public RowData getWindowAccumulators(TimeWindow window) throws 
Exception {
+            return null;
+        }
+
+        @Override
+        public void setWindowAccumulators(TimeWindow window, RowData acc) 
throws Exception {}
+
+        @Override
+        public BiConsumerWithException<TimeWindow, Collection<TimeWindow>, 
Throwable>
+                getWindowStateMergingConsumer() {
+            return new MergingConsumer(windowState);
+        }
+    }
+
+    private class TriggerContextImpl implements Trigger.OnMergeContext {
+
+        private TimeWindow window;
+        private Collection<TimeWindow> mergedWindows;
+
+        public void open() throws Exception {
+            trigger.open(this);
+        }
+
+        public boolean onElement(RowData row, long timestamp) throws Exception 
{
+            return trigger.onElement(row, timestamp, window);
+        }
+
+        public boolean onProcessingTime(long time) throws Exception {
+            return trigger.onProcessingTime(time, window);
+        }
+
+        public boolean onEventTime(long time) throws Exception {
+            return trigger.onEventTime(time, window);
+        }
+
+        public void onMerge() throws Exception {
+            trigger.onMerge(window, this);
+        }
+
+        public void setWindow(TimeWindow window) {
+            this.window = window;
+        }
+
+        public void setMergedWindows(Collection<TimeWindow> mergedWindows) {
+            this.mergedWindows = mergedWindows;
+        }
+
+        @Override
+        public long getCurrentProcessingTime() {
+            return internalTimerService.currentProcessingTime();
+        }
+
+        @Override
+        public long getCurrentWatermark() {
+            return internalTimerService.currentWatermark();
+        }
+
+        @Override
+        public void registerProcessingTimeTimer(long time) {
+            internalTimerService.registerProcessingTimeTimer(window, time);
+        }
+
+        @Override
+        public void registerEventTimeTimer(long time) {
+            internalTimerService.registerEventTimeTimer(window, time);
+        }
+
+        @Override
+        public void deleteProcessingTimeTimer(long time) {
+            internalTimerService.deleteProcessingTimeTimer(window, time);
+        }
+
+        @Override
+        public void deleteEventTimeTimer(long time) {
+            internalTimerService.deleteEventTimeTimer(window, time);
+        }
+
+        @Override
+        public ZoneId getShiftTimeZone() {
+            return shiftTimeZone;
+        }
+
+        public void clear() throws Exception {
+            trigger.clear(window);
+        }
+
+        @Override
+        public <S extends MergingState<?, ?>> void mergePartitionedState(
+                StateDescriptor<S, ?> stateDescriptor) {
+            if (mergedWindows != null && !mergedWindows.isEmpty()) {
+                try {
+                    State state =
+                            
UnalignedWindowTableFunctionOperator.this.getOrCreateKeyedState(
+                                    windowSerializer, stateDescriptor);
+                    if (state instanceof InternalMergingState) {
+                        ((InternalMergingState<RowData, TimeWindow, ?, ?, ?>) 
state)
+                                .mergeNamespaces(window, mergedWindows);
+                    } else {
+                        throw new IllegalArgumentException(
+                                "The given state descriptor does not refer to 
a mergeable state (MergingState)");
+                    }
+                } catch (Exception e) {
+                    throw new RuntimeException("Error while merging state.", 
e);
+                }
+            }
+        }
+
+        @Override
+        public MetricGroup getMetricGroup() {
+            return UnalignedWindowTableFunctionOperator.this.getMetricGroup();
+        }
+
+        @Override
+        public <S extends State> S getPartitionedState(StateDescriptor<S, ?> 
stateDescriptor) {
+            try {
+                return 
UnalignedWindowTableFunctionOperator.this.getPartitionedState(
+                        window, windowSerializer, stateDescriptor);
+            } catch (Exception e) {
+                throw new RuntimeException("Could not retrieve state", e);
+            }
+        }
+    }
+
+    private static class MergingConsumer
+            implements BiConsumerWithException<TimeWindow, 
Collection<TimeWindow>, Throwable> {
+
+        private final InternalMapState<RowData, TimeWindow, Long, RowData> 
windowState;
+
+        public MergingConsumer(InternalMapState<RowData, TimeWindow, Long, 
RowData> windowState) {
+            this.windowState = windowState;
+        }
+
+        @Override
+        public void accept(
+                TimeWindow stateWindowResult, Collection<TimeWindow> 
stateWindowsToBeMerged)
+                throws Throwable {
+            for (TimeWindow mergedWindow : stateWindowsToBeMerged) {
+                windowState.setCurrentNamespace(mergedWindow);
+                Iterator<Map.Entry<Long, RowData>> iterator = 
windowState.iterator();
+                windowState.setCurrentNamespace(stateWindowResult);
+                while (iterator.hasNext()) {
+                    Map.Entry<Long, RowData> entry = iterator.next();
+                    windowState.put(entry.getKey(), entry.getValue());
+                }
+            }
+        }
+    }
+
+    /**
+     * A dummy window aggregator is used to reuse the same logic in legacy 
group session window
+     * operator.
+     *
+     * <p>Instead of using window aggregator, we use a custom {@link 
MergingConsumer} to merge the
+     * accumulators in state.
+     */
+    private static class DummyWindowAggregator
+            implements NamespaceAggsHandleFunctionBase<TimeWindow> {
+
+        private final IllegalStateException thrown =
+                new IllegalStateException(
+                        "The function should not be called in 
DummyWindowAggregator");
+
+        @Override
+        public void open(StateDataViewStore store) throws Exception {}
+
+        @Override
+        public void setAccumulators(TimeWindow namespace, RowData 
accumulators) throws Exception {
+            throw thrown;
+        }
+
+        @Override
+        public void accumulate(RowData inputRow) throws Exception {
+            throw thrown;
+        }
+
+        @Override
+        public void retract(RowData inputRow) throws Exception {
+            throw thrown;
+        }
+
+        @Override
+        public void merge(TimeWindow namespace, RowData otherAcc) throws 
Exception {
+            throw thrown;
+        }
+
+        @Override
+        public RowData createAccumulators() throws Exception {
+            throw thrown;
+        }
+
+        @Override
+        public RowData getAccumulators() throws Exception {
+            throw thrown;
+        }
+
+        @Override
+        public void cleanup(TimeWindow namespace) throws Exception {
+            throw thrown;
+        }
+
+        @Override
+        public void close() throws Exception {}
+    }
+
+    @VisibleForTesting
+    public Counter getNumLateRecordsDropped() {
+        return numLateRecordsDropped;
+    }
+
+    @VisibleForTesting
+    public Gauge<Long> getWatermarkLatency() {
+        return watermarkLatency;
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorBase.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorBase.java
index d2494267516..9c60ac9af58 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorBase.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorBase.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.runtime.operators.window.tvf.operator;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
@@ -43,6 +45,9 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
 public abstract class WindowTableFunctionOperatorBase extends 
TableStreamOperator<RowData>
         implements OneInputStreamOperator<RowData, RowData> {
 
+    private static final String NULL_ROW_TIME_ELEMENTS_DROPPED_METRIC_NAME =
+            "numNullRowTimeRecordsDropped";
+
     /**
      * The shift timezone of the window, if the proctime or rowtime type is 
TIMESTAMP_LTZ, the shift
      * timezone is the timezone user configured in TableConfig, other cases 
the timezone is UTC
@@ -60,6 +65,12 @@ public abstract class WindowTableFunctionOperatorBase 
extends TableStreamOperato
     private transient JoinedRowData outRow;
     private transient GenericRowData windowProperties;
 
+    // ------------------------------------------------------------------------
+    // Metrics
+    // ------------------------------------------------------------------------
+
+    protected transient Counter numNullRowTimeRecordsDropped;
+
     public WindowTableFunctionOperatorBase(
             GroupWindowAssigner<TimeWindow> windowAssigner,
             int rowtimeIndex,
@@ -80,6 +91,10 @@ public abstract class WindowTableFunctionOperatorBase 
extends TableStreamOperato
 
         outRow = new JoinedRowData();
         windowProperties = new GenericRowData(3);
+
+        // metrics
+        this.numNullRowTimeRecordsDropped =
+                metrics.counter(NULL_ROW_TIME_ELEMENTS_DROPPED_METRIC_NAME);
     }
 
     @Override
@@ -101,4 +116,9 @@ public abstract class WindowTableFunctionOperatorBase 
extends TableStreamOperato
             collector.collect(outRow.replace(inputRow, windowProperties));
         }
     }
+
+    @VisibleForTesting
+    public Counter getNumNullRowTimeRecordsDropped() {
+        return numNullRowTimeRecordsDropped;
+    }
 }
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperatorTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperatorTest.java
index bcf511c9408..b22c9a3b1b7 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperatorTest.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/AlignedWindowTableFunctionOperatorTest.java
@@ -18,25 +18,14 @@
 
 package org.apache.flink.table.runtime.operators.window.tvf.operator;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.runtime.operators.window.TimeWindow;
 import 
org.apache.flink.table.runtime.operators.window.groupwindow.assigners.CumulativeWindowAssigner;
 import 
org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner;
 import 
org.apache.flink.table.runtime.operators.window.groupwindow.assigners.SlidingWindowAssigner;
 import 
org.apache.flink.table.runtime.operators.window.groupwindow.assigners.TumblingWindowAssigner;
-import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
-import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator;
-import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
-import org.apache.flink.table.types.logical.IntType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.TimestampType;
-import org.apache.flink.table.types.logical.VarCharType;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -48,19 +37,15 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-import static org.apache.flink.table.runtime.util.StreamRecordUtils.row;
-import static 
org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills;
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.binaryRecord;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link AlignedWindowTableFunctionOperator}. */
 @RunWith(Parameterized.class)
-public class AlignedWindowTableFunctionOperatorTest {
-
-    private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
-    private static final ZoneId SHANGHAI_ZONE_ID = ZoneId.of("Asia/Shanghai");
-    private final ZoneId shiftTimeZone;
+public class AlignedWindowTableFunctionOperatorTest extends 
WindowTableFunctionOperatorTestBase {
 
     public AlignedWindowTableFunctionOperatorTest(ZoneId shiftTimeZone) {
-        this.shiftTimeZone = shiftTimeZone;
+        super(shiftTimeZone);
     }
 
     @Parameterized.Parameters(name = "TimeZone = {0}")
@@ -68,33 +53,6 @@ public class AlignedWindowTableFunctionOperatorTest {
         return Arrays.asList(new Object[] {UTC_ZONE_ID}, new Object[] 
{SHANGHAI_ZONE_ID});
     }
 
-    private static final RowType INPUT_ROW_TYPE =
-            new RowType(
-                    Arrays.asList(
-                            new RowType.RowField("f0", new 
VarCharType(Integer.MAX_VALUE)),
-                            new RowType.RowField("f1", new IntType()),
-                            new RowType.RowField("f2", new TimestampType(3))));
-
-    private static final RowDataSerializer INPUT_ROW_SER = new 
RowDataSerializer(INPUT_ROW_TYPE);
-    private static final int ROW_TIME_INDEX = 2;
-
-    private static final LogicalType[] OUTPUT_TYPES =
-            new LogicalType[] {
-                new VarCharType(Integer.MAX_VALUE),
-                new IntType(),
-                new TimestampType(3),
-                new TimestampType(3),
-                new TimestampType(3),
-                new TimestampType(3)
-            };
-
-    private static final TypeSerializer<RowData> OUT_SERIALIZER =
-            new RowDataSerializer(OUTPUT_TYPES);
-
-    private static final RowDataHarnessAssertor ASSERTER =
-            new RowDataHarnessAssertor(
-                    OUTPUT_TYPES, new GenericRowRecordSortComparator(4, new 
TimestampType()));
-
     @Test
     public void testTumblingWindows() throws Exception {
         final TumblingWindowAssigner assigner = 
TumblingWindowAssigner.of(Duration.ofSeconds(3));
@@ -119,9 +77,18 @@ public class AlignedWindowTableFunctionOperatorTest {
 
         // late element would not be dropped
         testHarness.processElement(insertRecord("key2", 1, 80L));
+        // rowtime is null, should be dropped
+        testHarness.processElement(insertRecord("key2", 1, ((Long) null)));
         expectedOutput.add(insertRecord("key2", 1, 80L, localMills(0L), 
localMills(3000L), 2999L));
         ASSERTER.assertOutputEqualsSorted(
                 "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+
+        assertThat(
+                        ((AlignedWindowTableFunctionOperator) 
testHarness.getOperator())
+                                .getNumNullRowTimeRecordsDropped()
+                                .getCount())
+                .isEqualTo(1);
+        testHarness.close();
     }
 
     @Test
@@ -148,6 +115,7 @@ public class AlignedWindowTableFunctionOperatorTest {
                         "key2", 1, Long.MAX_VALUE, localMills(3000L), 
localMills(6000L), 5999L));
         ASSERTER.assertOutputEqualsSorted(
                 "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+        testHarness.close();
     }
 
     @Test
@@ -189,6 +157,7 @@ public class AlignedWindowTableFunctionOperatorTest {
         expectedOutput.add(insertRecord("key2", 1, 80L, localMills(0L), 
localMills(3000L), 2999L));
         ASSERTER.assertOutputEqualsSorted(
                 "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+        testHarness.close();
     }
 
     @Test
@@ -228,6 +197,7 @@ public class AlignedWindowTableFunctionOperatorTest {
                         "key2", 1, Long.MAX_VALUE, localMills(3000L), 
localMills(6000L), 5999L));
         ASSERTER.assertOutputEqualsSorted(
                 "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+        testHarness.close();
     }
 
     @Test
@@ -265,6 +235,7 @@ public class AlignedWindowTableFunctionOperatorTest {
         expectedOutput.add(insertRecord("key2", 1, 80L, localMills(0L), 
localMills(3000L), 2999L));
         ASSERTER.assertOutputEqualsSorted(
                 "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+        testHarness.close();
     }
 
     @Test
@@ -302,6 +273,7 @@ public class AlignedWindowTableFunctionOperatorTest {
                         "key2", 1, Long.MAX_VALUE, localMills(3000L), 
localMills(6000L), 5999L));
         ASSERTER.assertOutputEqualsSorted(
                 "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+        testHarness.close();
     }
 
     private OneInputStreamOperatorTestHarness<RowData, RowData> 
createTestHarness(
@@ -311,19 +283,4 @@ public class AlignedWindowTableFunctionOperatorTest {
                         windowAssigner, ROW_TIME_INDEX, shiftTimeZone);
         return new OneInputStreamOperatorTestHarness<>(operator, 
INPUT_ROW_SER);
     }
-
-    private StreamRecord<RowData> insertRecord(String f0, int f1, Long... f2) {
-        Object[] fields = new Object[2 + f2.length];
-        fields[0] = f0;
-        fields[1] = f1;
-        for (int idx = 0; idx < f2.length; idx++) {
-            fields[2 + idx] = TimestampData.fromEpochMillis(f2[idx]);
-        }
-        return new StreamRecord<>(row(fields));
-    }
-
-    /** Get the timestamp in mills by given epoch mills and timezone. */
-    private long localMills(long epochMills) {
-        return toUtcTimestampMills(epochMills, shiftTimeZone);
-    }
 }
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperatorTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperatorTest.java
new file mode 100644
index 00000000000..3bc2df1bea2
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperatorTest.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.tvf.operator;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.keyselector.EmptyRowDataKeySelector;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import 
org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner;
+import 
org.apache.flink.table.runtime.operators.window.groupwindow.assigners.SessionWindowAssigner;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.utils.HandwrittenSelectorUtil;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.time.Duration;
+import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link UnalignedWindowTableFunctionOperator}. */
+@RunWith(Parameterized.class)
+public class UnalignedWindowTableFunctionOperatorTest extends 
WindowTableFunctionOperatorTestBase {
+
+    public UnalignedWindowTableFunctionOperatorTest(ZoneId shiftTimeZone) {
+        super(shiftTimeZone);
+    }
+
+    @Test
+    public void testEventTimeSessionWindows() throws Exception {
+        final SessionWindowAssigner assigner = 
SessionWindowAssigner.withGap(Duration.ofSeconds(3));
+        UnalignedWindowTableFunctionOperator operator = 
createOperator(assigner, ROW_TIME_INDEX);
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                createTestHarness(operator);
+        testHarness.setup(OUT_SERIALIZER);
+        testHarness.open();
+
+        // process elements
+        ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+        testHarness.processElement(insertRecord("key1", 1, 20L));
+        testHarness.processElement(insertRecord("key2", 1, 3999L));
+        testHarness.processWatermark(new Watermark(999));
+        expectedOutput.add(new Watermark(999));
+
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+
+        testHarness.processWatermark(new Watermark(1999));
+        expectedOutput.add(new Watermark(1999));
+        testHarness.processWatermark(new Watermark(2999));
+        expectedOutput.add(new Watermark(2999));
+
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+
+        testHarness.processWatermark(new Watermark(3999));
+        // append 3 fields: window_start, window_end, window_time
+        expectedOutput.add(insertRecord("key1", 1, 20L, localMills(20L), 
localMills(3020L), 3019L));
+        expectedOutput.add(new Watermark(3999));
+
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+
+        // the window end is 5000L, so it's not a late record
+        testHarness.processElement(insertRecord("key1", 1, 2000L));
+        testHarness.processWatermark(new Watermark(4999));
+        expectedOutput.add(
+                insertRecord("key1", 1, 2000L, localMills(2000L), 
localMills(5000L), 4999L));
+        expectedOutput.add(new Watermark(4999));
+
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+
+        // test out-of-order records
+        testHarness.processElement(insertRecord("key2", 2, 7999L));
+        testHarness.processElement(insertRecord("key2", 3, 5999L));
+        testHarness.processWatermark(new Watermark(5999));
+        expectedOutput.add(new Watermark(5999));
+
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+
+        testHarness.processWatermark(new Watermark(6999));
+        expectedOutput.add(new Watermark(6999));
+        testHarness.processWatermark(new Watermark(7999));
+        expectedOutput.add(new Watermark(7999));
+        testHarness.processWatermark(new Watermark(8999));
+        expectedOutput.add(new Watermark(8999));
+        testHarness.processWatermark(new Watermark(9999));
+        expectedOutput.add(new Watermark(9999));
+
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+
+        // do a snapshot, close and restore again
+        testHarness.prepareSnapshotPreBarrier(0L);
+        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0);
+        testHarness.close();
+
+        expectedOutput.clear();
+        testHarness = createTestHarness(operator);
+        testHarness.setup();
+        testHarness.initializeState(snapshot);
+        testHarness.open();
+
+        testHarness.processWatermark(new Watermark(10999));
+
+        // late record should be dropped
+        testHarness.processElement(insertRecord("key1", 1, 999L));
+
+        // rowtime is null, should be dropped
+        testHarness.processElement(insertRecord("key1", 1, ((Long) null)));
+
+        expectedOutput.add(
+                insertRecord("key2", 1, 3999L, localMills(3999L), 
localMills(10999L), 10998L));
+        expectedOutput.add(
+                insertRecord("key2", 2, 7999L, localMills(3999L), 
localMills(10999L), 10998L));
+        expectedOutput.add(
+                insertRecord("key2", 3, 5999L, localMills(3999L), 
localMills(10999L), 10998L));
+        expectedOutput.add(new Watermark(10999));
+
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+
+        
assertThat(operator.getNumLateRecordsDropped().getCount()).isEqualTo(1);
+        
assertThat(operator.getNumNullRowTimeRecordsDropped().getCount()).isEqualTo(1);
+
+        testHarness.close();
+    }
+
+    @Test
+    public void testProcessTimeSessionWindows() throws Exception {
+        final SessionWindowAssigner assigner =
+                
SessionWindowAssigner.withGap(Duration.ofSeconds(3)).withProcessingTime();
+        UnalignedWindowTableFunctionOperator operator = 
createOperator(assigner, -1);
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                createTestHarness(operator);
+        testHarness.setup(OUT_SERIALIZER);
+        testHarness.open();
+
+        // process elements
+        ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+        // timestamp is ignored in processing time
+        testHarness.setProcessingTime(20L);
+        testHarness.processElement(insertRecord("key1", 1, Long.MAX_VALUE));
+
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+
+        testHarness.setProcessingTime(1999);
+        testHarness.setProcessingTime(2999);
+
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+
+        testHarness.setProcessingTime(3999L);
+        testHarness.processElement(insertRecord("key2", 1, Long.MAX_VALUE));
+
+        // append 3 fields: window_start, window_end, window_time
+        expectedOutput.add(
+                insertRecord("key1", 1, Long.MAX_VALUE, localMills(20L), 
localMills(3020L), 3019L));
+
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+
+        testHarness.setProcessingTime(4999);
+
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+
+        // test records with second field out of order
+        testHarness.setProcessingTime(5999);
+        testHarness.processElement(insertRecord("key2", 3, Long.MAX_VALUE));
+
+        testHarness.setProcessingTime(7999);
+        testHarness.processElement(insertRecord("key2", 2, Long.MAX_VALUE));
+
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+
+        testHarness.setProcessingTime(6999);
+        testHarness.setProcessingTime(7999);
+        testHarness.setProcessingTime(8999);
+        testHarness.setProcessingTime(9999);
+
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+
+        // do a snapshot, close and restore again
+        testHarness.prepareSnapshotPreBarrier(0L);
+        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0);
+        testHarness.close();
+
+        expectedOutput.clear();
+        testHarness = createTestHarness(operator);
+        testHarness.setup();
+        testHarness.initializeState(snapshot);
+        testHarness.open();
+
+        testHarness.setProcessingTime(10999);
+
+        expectedOutput.add(
+                insertRecord(
+                        "key2", 1, Long.MAX_VALUE, localMills(3999L), 
localMills(10999L), 10998L));
+        expectedOutput.add(
+                insertRecord(
+                        "key2", 3, Long.MAX_VALUE, localMills(3999L), 
localMills(10999L), 10998L));
+        expectedOutput.add(
+                insertRecord(
+                        "key2", 2, Long.MAX_VALUE, localMills(3999L), 
localMills(10999L), 10998L));
+
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+
+        
assertThat(operator.getWatermarkLatency().getValue()).isEqualTo(Long.valueOf(0L));
+
+        testHarness.close();
+    }
+
+    @Test
+    public void testSessionWindowsWithoutPartitionKeys() throws Exception {
+        final SessionWindowAssigner assigner = 
SessionWindowAssigner.withGap(Duration.ofSeconds(3));
+        UnalignedWindowTableFunctionOperator operator = 
createOperator(assigner, ROW_TIME_INDEX);
+
+        final EmptyRowDataKeySelector keySelector = 
EmptyRowDataKeySelector.INSTANCE;
+
+        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness =
+                new KeyedOneInputStreamOperatorTestHarness<>(
+                        operator, keySelector, keySelector.getProducedType());
+        testHarness.setup(OUT_SERIALIZER);
+        testHarness.open();
+
+        // process elements
+        ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+        testHarness.processElement(insertRecord("key1", 1, 1999L));
+        testHarness.processElement(insertRecord("key2", 1, 3999L));
+        testHarness.processWatermark(new Watermark(999));
+        expectedOutput.add(new Watermark(999));
+
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+
+        testHarness.processWatermark(new Watermark(1999));
+        expectedOutput.add(new Watermark(1999));
+        testHarness.processWatermark(new Watermark(3999));
+        expectedOutput.add(new Watermark(3999));
+        testHarness.processWatermark(new Watermark(5999));
+        expectedOutput.add(new Watermark(5999));
+
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+
+        // do a snapshot, close and restore again
+        testHarness.prepareSnapshotPreBarrier(0L);
+        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0);
+        testHarness.close();
+
+        expectedOutput.clear();
+        testHarness =
+                new KeyedOneInputStreamOperatorTestHarness<>(
+                        operator, keySelector, keySelector.getProducedType());
+        testHarness.setup();
+        testHarness.initializeState(snapshot);
+        testHarness.open();
+
+        testHarness.processWatermark(new Watermark(6999));
+
+        // append 3 fields: window_start, window_end, window_time
+        expectedOutput.add(
+                insertRecord("key1", 1, 1999L, localMills(1999L), 
localMills(6999L), 6998L));
+        expectedOutput.add(
+                insertRecord("key2", 1, 3999L, localMills(1999L), 
localMills(6999L), 6998L));
+        expectedOutput.add(new Watermark(6999));
+
+        ASSERTER.assertOutputEqualsSorted(
+                "Output was not correct.", expectedOutput, 
testHarness.getOutput());
+
+        testHarness.close();
+    }
+
+    protected static final RowDataKeySelector KEY_SELECTOR =
+            HandwrittenSelectorUtil.getRowDataSelector(
+                    new int[] {0}, INPUT_ROW_TYPE.getChildren().toArray(new 
LogicalType[0]));
+
+    private OneInputStreamOperatorTestHarness<RowData, RowData> 
createTestHarness(
+            UnalignedWindowTableFunctionOperator operator) throws Exception {
+
+        return new KeyedOneInputStreamOperatorTestHarness<>(
+                operator, KEY_SELECTOR, KEY_SELECTOR.getProducedType());
+    }
+
+    private UnalignedWindowTableFunctionOperator createOperator(
+            GroupWindowAssigner<TimeWindow> windowAssigner, int rowTimeIndex) {
+        return new UnalignedWindowTableFunctionOperator(
+                windowAssigner,
+                windowAssigner.getWindowSerializer(new ExecutionConfig()),
+                new RowDataSerializer(INPUT_ROW_TYPE),
+                rowTimeIndex,
+                shiftTimeZone);
+    }
+
+    @Parameterized.Parameters(name = "TimeZone = {0}")
+    public static Collection<Object[]> runMode() {
+        return Arrays.asList(new Object[] {UTC_ZONE_ID}, new Object[] 
{SHANGHAI_ZONE_ID});
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorTestBase.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorTestBase.java
new file mode 100644
index 00000000000..fe25b1025dd
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorTestBase.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.tvf.operator;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator;
+import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.time.ZoneId;
+import java.util.Arrays;
+
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.row;
+import static 
org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills;
+
+/** A test base to test window table function operator . */
+public abstract class WindowTableFunctionOperatorTestBase {
+
+    protected static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
+    protected static final ZoneId SHANGHAI_ZONE_ID = 
ZoneId.of("Asia/Shanghai");
+    protected final ZoneId shiftTimeZone;
+
+    public WindowTableFunctionOperatorTestBase(ZoneId shiftTimeZone) {
+        this.shiftTimeZone = shiftTimeZone;
+    }
+
+    /** Get the timestamp in mills by given epoch mills and timezone. */
+    protected long localMills(long epochMills) {
+        return toUtcTimestampMills(epochMills, shiftTimeZone);
+    }
+
+    // ============================== Utils ==============================
+
+    // ============================== Util Fields 
==============================
+
+    protected static final RowType INPUT_ROW_TYPE =
+            new RowType(
+                    Arrays.asList(
+                            new RowType.RowField("f0", new 
VarCharType(Integer.MAX_VALUE)),
+                            new RowType.RowField("f1", new IntType()),
+                            new RowType.RowField("f2", new TimestampType(3))));
+
+    protected static final RowDataSerializer INPUT_ROW_SER = new 
RowDataSerializer(INPUT_ROW_TYPE);
+    protected static final int ROW_TIME_INDEX = 2;
+
+    protected static final LogicalType[] OUTPUT_TYPES =
+            new LogicalType[] {
+                new VarCharType(Integer.MAX_VALUE),
+                new IntType(),
+                new TimestampType(3),
+                new TimestampType(3),
+                new TimestampType(3),
+                new TimestampType(3)
+            };
+
+    protected static final TypeSerializer<RowData> OUT_SERIALIZER =
+            new RowDataSerializer(OUTPUT_TYPES);
+
+    protected static final RowDataHarnessAssertor ASSERTER =
+            new RowDataHarnessAssertor(
+                    OUTPUT_TYPES, new GenericRowRecordSortComparator(4, new 
TimestampType()));
+
+    // ============================== Util Functions 
==============================
+
+    protected static StreamRecord<RowData> insertRecord(String f0, int f1, 
Long... f2) {
+        Object[] fields = new Object[2 + f2.length];
+        fields[0] = f0;
+        fields[1] = f1;
+        for (int idx = 0; idx < f2.length; idx++) {
+            if (f2[idx] == null) {
+                fields[2 + idx] = null;
+            } else {
+                fields[2 + idx] = TimestampData.fromEpochMillis(f2[idx]);
+            }
+        }
+        return new StreamRecord<>(row(fields));
+    }
+}

Reply via email to