[FLINK-4994] Don't Clear Trigger State and Merging Window Set When Purging

Previously, when a Trigger returned TriggerResult.PURGE from any of its
on*() methods, the WindowOperator cleared all state of that window
(window contents, merging window set) and called Trigger.clear() so that the
Trigger could clean up its state/timers.

This was problematic in some cases. For example, with merging windows (session
windows) it meant that a late-arriving element was not put into the
session that had previously been built up but was instead put into a completely
new session that contained only this one element.

The new behaviour is this:
 * Only clean window contents on PURGE
 * Register cleanup timer for any window, don't delete this on PURGE
 * When the cleanup timer fires: clean window state, clean merging window set,
   and call Trigger.clear() to allow the Trigger to clean up its state/timers


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0b331a42
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0b331a42
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0b331a42

Branch: refs/heads/master
Commit: 0b331a421267a541d91e94f2713534704ed32bed
Parents: bcca3fe
Author: Aljoscha Krettek <[email protected]>
Authored: Wed Nov 2 11:51:07 2016 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Tue Jan 24 10:42:34 2017 +0100

----------------------------------------------------------------------
 .../windowing/EvictingWindowOperator.java       | 119 +++++++-------
 .../operators/windowing/WindowOperator.java     | 155 +++++++++++--------
 .../operators/windowing/WindowOperatorTest.java |  11 +-
 3 files changed, 158 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0b331a42/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index d9c977a..45fea14 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -1,4 +1,4 @@
-/**
+/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
@@ -57,20 +57,20 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * @param <W> The type of {@code Window} that the {@code WindowAssigner} 
assigns.
  */
 @Internal
-public class EvictingWindowOperator<K, IN, OUT, W extends Window> 
+public class EvictingWindowOperator<K, IN, OUT, W extends Window>
                extends WindowOperator<K, IN, Iterable<IN>, OUT, W> {
 
        private static final long serialVersionUID = 1L;
 
        // 
------------------------------------------------------------------------
-       // these fields are set by the API stream graph builder to configure 
the operator 
-       
+       // these fields are set by the API stream graph builder to configure 
the operator
+
        private final Evictor<? super IN, ? super W> evictor;
 
        private final StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> 
evictingWindowStateDescriptor;
 
        // 
------------------------------------------------------------------------
-       // the fields below are instantiated once the operator runs in the 
runtime 
+       // the fields below are instantiated once the operator runs in the 
runtime
 
        private transient EvictorContext evictorContext;
 
@@ -146,7 +146,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window>
                                if (stateWindow == null) {
                                        throw new IllegalStateException("Window 
" + window + " is not in in-flight window set.");
                                }
-                               
+
                                
evictingWindowState.setCurrentNamespace(stateWindow);
                                evictingWindowState.add(element);
 
@@ -163,14 +163,13 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window>
                                                // if we have no state, there 
is nothing to do
                                                continue;
                                        }
-                                       fire(actualWindow, contents, 
evictingWindowState);
+                                       emitWindowContents(actualWindow, 
contents, evictingWindowState);
                                }
 
                                if (triggerResult.isPurge()) {
-                                       cleanup(actualWindow, 
evictingWindowState, mergingWindows);
-                               } else {
-                                       registerCleanupTimer(actualWindow);
+                                       evictingWindowState.clear();
                                }
+                               registerCleanupTimer(actualWindow);
                        }
 
                        mergingWindows.persist();
@@ -198,14 +197,13 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window>
                                                // if we have no state, there 
is nothing to do
                                                continue;
                                        }
-                                       fire(window, contents, 
evictingWindowState);
+                                       emitWindowContents(window, contents, 
evictingWindowState);
                                }
 
                                if (triggerResult.isPurge()) {
-                                       cleanup(window, evictingWindowState, 
null);
-                               } else {
-                                       registerCleanupTimer(window);
+                                       evictingWindowState.clear();
                                }
+                               registerCleanupTimer(window);
                        }
                }
        }
@@ -218,37 +216,42 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window>
                evictorContext.key = timer.getKey();
                evictorContext.window = timer.getNamespace();
 
-               ListState<StreamRecord<IN>> windowState;
                MergingWindowSet<W> mergingWindows = null;
 
                if (windowAssigner instanceof MergingWindowAssigner) {
                        mergingWindows = getMergingWindowSet();
                        W stateWindow = 
mergingWindows.getStateWindow(context.window);
                        if (stateWindow == null) {
-                               // then the window is already purged and this 
is a cleanup
-                               // timer set due to allowed lateness that has 
nothing to clean,
-                               // so it is safe to just ignore
+                               // Timer firing for non-existent window, this 
can only happen if a
+                               // trigger did not clean up timers. We have 
already cleared the merging
+                               // window and therefore the Trigger state, 
however, so nothing to do.
                                return;
+                       } else {
+                               
evictingWindowState.setCurrentNamespace(stateWindow);
                        }
-                       
-                       evictingWindowState.setCurrentNamespace(stateWindow);
                } else {
                        evictingWindowState.setCurrentNamespace(context.window);
                }
 
                Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
-               if (contents == null) {
-                       // if we have no state, there is nothing to do
-                       return;
+
+               if (contents != null) {
+                       TriggerResult triggerResult = 
context.onEventTime(timer.getTimestamp());
+                       if (triggerResult.isFire()) {
+                               emitWindowContents(context.window, contents, 
evictingWindowState);
+                       }
+                       if (triggerResult.isPurge()) {
+                               evictingWindowState.clear();
+                       }
                }
 
-               TriggerResult triggerResult = 
context.onEventTime(timer.getTimestamp());
-               if (triggerResult.isFire()) {
-                       fire(context.window, contents, evictingWindowState);
+               if (windowAssigner.isEventTime() && 
isCleanupTime(context.window, timer.getTimestamp())) {
+                       clearAllState(context.window, evictingWindowState, 
mergingWindows);
                }
 
-               if (triggerResult.isPurge() || (windowAssigner.isEventTime() && 
isCleanupTime(context.window, timer.getTimestamp()))) {
-                       cleanup(context.window, evictingWindowState, 
mergingWindows);
+               if (mergingWindows != null) {
+                       // need to make sure to update the merging state in 
state
+                       mergingWindows.persist();
                }
        }
 
@@ -259,40 +262,46 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window>
                evictorContext.key = timer.getKey();
                evictorContext.window = timer.getNamespace();
 
-               ListState<StreamRecord<IN>> windowState;
                MergingWindowSet<W> mergingWindows = null;
 
                if (windowAssigner instanceof MergingWindowAssigner) {
                        mergingWindows = getMergingWindowSet();
                        W stateWindow = 
mergingWindows.getStateWindow(context.window);
                        if (stateWindow == null) {
-                               // then the window is already purged and this 
is a cleanup
-                               // timer set due to allowed lateness that has 
nothing to clean,
-                               // so it is safe to just ignore
+                               // Timer firing for non-existent window, this 
can only happen if a
+                               // trigger did not clean up timers. We have 
already cleared the merging
+                               // window and therefore the Trigger state, 
however, so nothing to do.
                                return;
+                       } else {
+                               
evictingWindowState.setCurrentNamespace(stateWindow);
                        }
-                       evictingWindowState.setCurrentNamespace(stateWindow);
                } else {
                        evictingWindowState.setCurrentNamespace(context.window);
                }
 
                Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
-               if (contents == null) {
-                       // if we have no state, there is nothing to do
-                       return;
+
+               if (contents != null) {
+                       TriggerResult triggerResult = 
context.onProcessingTime(timer.getTimestamp());
+                       if (triggerResult.isFire()) {
+                               emitWindowContents(context.window, contents, 
evictingWindowState);
+                       }
+                       if (triggerResult.isPurge()) {
+                               evictingWindowState.clear();
+                       }
                }
 
-               TriggerResult triggerResult = 
context.onProcessingTime(timer.getTimestamp());
-               if (triggerResult.isFire()) {
-                       fire(context.window, contents, evictingWindowState);
+               if (!windowAssigner.isEventTime() && 
isCleanupTime(context.window, timer.getTimestamp())) {
+                       clearAllState(context.window, evictingWindowState, 
mergingWindows);
                }
 
-               if (triggerResult.isPurge() || (!windowAssigner.isEventTime() 
&& isCleanupTime(context.window, timer.getTimestamp()))) {
-                       cleanup(context.window, evictingWindowState, 
mergingWindows);
+               if (mergingWindows != null) {
+                       // need to make sure to update the merging state in 
state
+                       mergingWindows.persist();
                }
        }
 
-       private void fire(W window, Iterable<StreamRecord<IN>> contents, 
ListState<StreamRecord<IN>> windowState) throws Exception {
+       private void emitWindowContents(W window, Iterable<StreamRecord<IN>> 
contents, ListState<StreamRecord<IN>> windowState) throws Exception {
                
timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
 
                // Work around type system restrictions...
@@ -326,6 +335,18 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window>
                }
        }
 
+       private void clearAllState(
+                       W window,
+                       ListState<StreamRecord<IN>> windowState,
+                       MergingWindowSet<W> mergingWindows) throws Exception {
+
+               windowState.clear();
+               context.clear();
+               if (mergingWindows != null) {
+                       mergingWindows.retireWindow(window);
+                       mergingWindows.persist();
+               }
+       }
 
        /**
         * {@code EvictorContext} is a utility for handling {@code Evictor} 
invocations. It can be reused
@@ -372,24 +393,12 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window>
                }
        }
 
-       private void cleanup(W window,
-                                               ListState<StreamRecord<IN>> 
windowState,
-                                               MergingWindowSet<W> 
mergingWindows) throws Exception {
-
-               windowState.clear();
-               if (mergingWindows != null) {
-                       mergingWindows.retireWindow(window);
-                       mergingWindows.persist();
-               }
-               context.clear();
-       }
-
        @Override
        public void open() throws Exception {
                super.open();
 
                evictorContext = new EvictorContext(null,null);
-               evictingWindowState = (InternalListState<W, StreamRecord<IN>>) 
+               evictingWindowState = (InternalListState<W, StreamRecord<IN>>)
                                getOrCreateKeyedState(windowSerializer, 
evictingWindowStateDescriptor);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0b331a42/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 0dbaffd..3c4f397 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -139,8 +139,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
        /** The state in which the window contents is stored. Each window is a 
namespace */
        private transient InternalAppendingState<W, IN, ACC> windowState;
 
-       /** The {@link #windowState}, typed to merging state for merging 
windows.
-        * Null if the window state is not mergeable */
+       /**
+        * The {@link #windowState}, typed to merging state for merging windows.
+        * Null if the window state is not mergeable.
+        */
        private transient InternalMergingState<W, IN, ACC> windowMergingState;
 
        /** The state that holds the merging window metadata (the sets that 
describe what is merged) */
@@ -292,7 +294,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                                        new 
ListStateDescriptor<>("merging-window-set", tupleSerializer);
 
                        // get the state that stores the merging sets
-                       mergingSetsState = (InternalListState<VoidNamespace, 
Tuple2<W, W>>) 
+                       mergingSetsState = (InternalListState<VoidNamespace, 
Tuple2<W, W>>)
                                        
getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, 
mergingSetsStateDescriptor);
                        
mergingSetsState.setCurrentNamespace(VoidNamespace.INSTANCE);
                }
@@ -320,7 +322,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
        public void processElement(StreamRecord<IN> element) throws Exception {
                final Collection<W> elementWindows = 
windowAssigner.assignWindows(
                        element.getValue(), element.getTimestamp(), 
windowAssignerContext);
-               
+
                final K key = this.<K>getKeyedStateBackend().getCurrentKey();
 
                if (windowAssigner instanceof MergingWindowAssigner) {
@@ -376,14 +378,13 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                                        if (contents == null) {
                                                continue;
                                        }
-                                       fire(actualWindow, contents);
+                                       emitWindowContents(actualWindow, 
contents);
                                }
 
                                if (triggerResult.isPurge()) {
-                                       cleanup(actualWindow, windowState, 
mergingWindows);
-                               } else {
-                                       registerCleanupTimer(actualWindow);
+                                       windowState.clear();
                                }
+                               registerCleanupTimer(actualWindow);
                        }
 
                        // need to make sure to update the merging state in 
state
@@ -409,14 +410,13 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                                        if (contents == null) {
                                                continue;
                                        }
-                                       fire(window, contents);
+                                       emitWindowContents(window, contents);
                                }
 
                                if (triggerResult.isPurge()) {
-                                       cleanup(window, windowState, null);
-                               } else {
-                                       registerCleanupTimer(window);
+                                       windowState.clear();
                                }
+                               registerCleanupTimer(window);
                        }
                }
        }
@@ -432,31 +432,40 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                        mergingWindows = getMergingWindowSet();
                        W stateWindow = 
mergingWindows.getStateWindow(context.window);
                        if (stateWindow == null) {
-                               // then the window is already purged and this 
is a cleanup
-                               // timer set due to allowed lateness that has 
nothing to clean,
-                               // so it is safe to just ignore
+                               // Timer firing for non-existent window, this 
can only happen if a
+                               // trigger did not clean up timers. We have 
already cleared the merging
+                               // window and therefore the Trigger state, 
however, so nothing to do.
                                return;
+                       } else {
+                               windowState.setCurrentNamespace(stateWindow);
                        }
-                       
-                       windowState.setCurrentNamespace(stateWindow);
                } else {
                        windowState.setCurrentNamespace(context.window);
                        mergingWindows = null;
                }
 
-               ACC contents = windowState.get();
-               if (contents == null) {
-                       // if we have no state, there is nothing to do
-                       return;
+               ACC contents = null;
+               if (windowState != null) {
+                       contents = windowState.get();
                }
 
-               TriggerResult triggerResult = 
context.onEventTime(timer.getTimestamp());
-               if (triggerResult.isFire()) {
-                       fire(context.window, contents);
+               if (contents != null) {
+                       TriggerResult triggerResult = 
context.onEventTime(timer.getTimestamp());
+                       if (triggerResult.isFire()) {
+                               emitWindowContents(context.window, contents);
+                       }
+                       if (triggerResult.isPurge()) {
+                               windowState.clear();
+                       }
                }
 
-               if (triggerResult.isPurge() || (windowAssigner.isEventTime() && 
isCleanupTime(context.window, timer.getTimestamp()))) {
-                       cleanup(context.window, windowState, mergingWindows);
+               if (windowAssigner.isEventTime() && 
isCleanupTime(context.window, timer.getTimestamp())) {
+                       clearAllState(context.window, windowState, 
mergingWindows);
+               }
+
+               if (mergingWindows != null) {
+                       // need to make sure to update the merging state in 
state
+                       mergingWindows.persist();
                }
        }
 
@@ -471,55 +480,67 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                        mergingWindows = getMergingWindowSet();
                        W stateWindow = 
mergingWindows.getStateWindow(context.window);
                        if (stateWindow == null) {
-                               // then the window is already purged and this 
is a cleanup
-                               // timer set due to allowed lateness that has 
nothing to clean,
-                               // so it is safe to just ignore
+                               // Timer firing for non-existent window, this 
can only happen if a
+                               // trigger did not clean up timers. We have 
already cleared the merging
+                               // window and therefore the Trigger state, 
however, so nothing to do.
                                return;
+                       } else {
+                               windowState.setCurrentNamespace(stateWindow);
                        }
-                       windowState.setCurrentNamespace(stateWindow);
                } else {
                        windowState.setCurrentNamespace(context.window);
                        mergingWindows = null;
                }
 
-               ACC contents = windowState.get();
-               if (contents == null) {
-                       // if we have no state, there is nothing to do
-                       return;
+               ACC contents = null;
+               if (windowState != null) {
+                       contents = windowState.get();
+               }
+
+               if (contents != null) {
+                       TriggerResult triggerResult = 
context.onProcessingTime(timer.getTimestamp());
+                       if (triggerResult.isFire()) {
+                               emitWindowContents(context.window, contents);
+                       }
+                       if (triggerResult.isPurge()) {
+                               windowState.clear();
+                       }
                }
 
-               TriggerResult triggerResult = 
context.onProcessingTime(timer.getTimestamp());
-               if (triggerResult.isFire()) {
-                       fire(context.window, contents);
+               if (!windowAssigner.isEventTime() && 
isCleanupTime(context.window, timer.getTimestamp())) {
+                       clearAllState(context.window, windowState, 
mergingWindows);
                }
 
-               if (triggerResult.isPurge() || (!windowAssigner.isEventTime() 
&& isCleanupTime(context.window, timer.getTimestamp()))) {
-                       cleanup(context.window, windowState, mergingWindows);
+               if (mergingWindows != null) {
+                       // need to make sure to update the merging state in 
state
+                       mergingWindows.persist();
                }
        }
 
        /**
-        * Cleans up the window state if the provided {@link TriggerResult} 
requires so, or if it
-        * is time to do so (see {@link #isCleanupTime(Window, long)}). The 
caller must ensure that the
+        * Drops all state for the given window and calls
+        * {@link Trigger#clear(Window, Trigger.TriggerContext)}.
+        *
+        * <p>The caller must ensure that the
         * correct key is set in the state backend and the context object.
         */
-       private void cleanup(W window,
-                                               AppendingState<IN, ACC> 
windowState,
-                                               MergingWindowSet<W> 
mergingWindows) throws Exception {
+       private void clearAllState(
+                       W window,
+                       AppendingState<IN, ACC> windowState,
+                       MergingWindowSet<W> mergingWindows) throws Exception {
                windowState.clear();
+               context.clear();
                if (mergingWindows != null) {
                        mergingWindows.retireWindow(window);
                        mergingWindows.persist();
                }
-               context.clear();
        }
 
        /**
-        * Triggers the window computation if the provided {@link 
TriggerResult} requires so.
-        * The caller must ensure that the correct key is set in the state 
backend and the context object.
+        * Emits the contents of the given window using the {@link 
InternalWindowFunction}.
         */
        @SuppressWarnings("unchecked")
-       private void fire(W window, ACC contents) throws Exception {
+       private void emitWindowContents(W window, ACC contents) throws 
Exception {
                
timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
                userFunction.apply(context.key, context.window, contents, 
timestampedCollector);
        }
@@ -538,12 +559,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
        }
 
        /**
-        * Decides if a window is currently late or not, based on the current
-        * watermark, i.e. the current event time, and the allowed lateness.
-        * @param window
-        *                                      The collection of windows 
returned by the {@link WindowAssigner}.
-        * @return The windows (among the {@code eligibleWindows}) for which 
the element should still be
-        *                                      considered when triggering.
+        * Returns {@code true} if the watermark is after the end timestamp 
plus the allowed lateness
+        * of the given window.
         */
        protected boolean isLate(W window) {
                return (windowAssigner.isEventTime() && (cleanupTime(window) <= 
internalTimerService.currentWatermark()));
@@ -556,6 +573,11 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
         */
        protected void registerCleanupTimer(W window) {
                long cleanupTime = cleanupTime(window);
+               if (cleanupTime == Long.MAX_VALUE) {
+                       // don't set a GC timer for "end of time"
+                       return;
+               }
+
                if (windowAssigner.isEventTime()) {
                        context.registerEventTimeTimer(cleanupTime);
                } else {
@@ -570,6 +592,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
         */
        protected void deleteCleanupTimer(W window) {
                long cleanupTime = cleanupTime(window);
+               if (cleanupTime == Long.MAX_VALUE) {
+                       // no need to clean up because we didn't set one
+                       return;
+               }
                if (windowAssigner.isEventTime()) {
                        context.deleteEventTimeTimer(cleanupTime);
                } else {
@@ -587,24 +613,19 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
         * @param window the window whose cleanup time we are computing.
         */
        private long cleanupTime(W window) {
-               long cleanupTime = window.maxTimestamp() + allowedLateness;
-               return cleanupTime >= window.maxTimestamp() ? cleanupTime : 
Long.MAX_VALUE;
+               if (windowAssigner.isEventTime()) {
+                       long cleanupTime = window.maxTimestamp() + 
allowedLateness;
+                       return cleanupTime >= window.maxTimestamp() ? 
cleanupTime : Long.MAX_VALUE;
+               } else {
+                       return window.maxTimestamp();
+               }
        }
 
        /**
-        * Decides if it is time to clean up the window state.
-        * Clean up time for a window is:
-        *              <li> if it is event time, after the watermark passes 
the end of the window plus the user-specified allowed lateness
-        *              <li> if it is processing time, after the processing 
time at the node passes the end of the window.
-        *      @param window
-        *                                      the window to clean
-        *  @param time
-        *                              the current time (event or processing 
depending on the {@link WindowAssigner}
-        *  @return {@code true} if it is time to clean up the window state, 
{@code false} otherwise.
+        * Returns {@code true} if the given time is the cleanup time for the 
given window.
         */
        protected final boolean isCleanupTime(W window, long time) {
-               long cleanupTime = cleanupTime(window);
-               return  cleanupTime == time;
+               return time == cleanupTime(window);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/0b331a42/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 2faa506..6238e6c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -1601,20 +1601,21 @@ public class WindowOperatorTest extends TestLogger {
                expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 11600L, 
14600L), 14599));
                expected.add(new Watermark(14600));
 
-               // dropped as late
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 10000));
 
+               expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 10000L, 
14600L), 14599));
+
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 14500));
                testHarness.processWatermark(new Watermark(20000));
 
-               expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 14500L, 
17500L), 17499));
+               expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 10000L, 
17500L), 17499));
                expected.add(new Watermark(20000));
 
                testHarness.processWatermark(new Watermark(100000));
                expected.add(new Watermark(100000));
 
                ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
-               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expected, actual, new Tuple2ResultSortComparator());
+               TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expected, actual, new Tuple3ResultSortComparator());
                testHarness.close();
        }
 
@@ -1780,7 +1781,7 @@ public class WindowOperatorTest extends TestLogger {
 
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 10000));
 
-               expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 10000L, 
13000L), 12999));
+               expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 1000L, 
14600L), 14599));
 
                ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expected, actual, new Tuple3ResultSortComparator());
@@ -1788,7 +1789,7 @@ public class WindowOperatorTest extends TestLogger {
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 14500));
                testHarness.processWatermark(new Watermark(20000));
 
-               expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 14500L, 
17500L), 17499));
+               expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 1000L, 
17500L), 17499));
                expected.add(new Watermark(20000));
 
                testHarness.processWatermark(new Watermark(100000));

Reply via email to