Repository: flink
Updated Branches:
  refs/heads/master 2e2330737 -> 6b01a8902


[FLINK-3178] Don't Emit In-Flight Windows When Closing Window Operator

This closes #1542


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c4e5a55f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c4e5a55f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c4e5a55f

Branch: refs/heads/master
Commit: c4e5a55f027ed73ce557f10d5125a0b168832889
Parents: 2e23307
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Mon Jan 18 13:25:03 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jan 28 14:19:17 2016 +0100

----------------------------------------------------------------------
 .../examples/windowing/SessionWindowing.java    |   2 +-
 .../util/TopSpeedWindowingExampleData.java      |   8 +-
 ...ractAlignedProcessingTimeWindowOperator.java |   5 -
 .../windowing/NonKeyedWindowOperator.java       |  14 +--
 .../operators/windowing/WindowOperator.java     |  17 ++-
 .../api/complex/ComplexIntegrationTest.java     |   3 +-
 ...AlignedProcessingTimeWindowOperatorTest.java | 110 ++-----------------
 ...AlignedProcessingTimeWindowOperatorTest.java |  89 ++++-----------
 .../util/OneInputStreamOperatorTestHarness.java |   3 +-
 9 files changed, 54 insertions(+), 197 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c4e5a55f/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
index 035727a..baa4af8 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -116,7 +116,7 @@ public class SessionWindowing {
                        // Update the last seen event time
                        lastSeenState.update(timestamp);
 
-                       ctx.registerEventTimeTimer(lastSeen + sessionTimeout);
+                       ctx.registerEventTimeTimer(timestamp + sessionTimeout);
 
                        if (timeSinceLastEvent > sessionTimeout) {
                                return TriggerResult.FIRE_AND_PURGE;

http://git-wip-us.apache.org/repos/asf/flink/blob/c4e5a55f/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java
index bf63695..4718b8b 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/TopSpeedWindowingExampleData.java
@@ -192,9 +192,7 @@ public class TopSpeedWindowingExampleData {
                                        
"(1,95,1973.6111111111115,1424952007664)\n" +
                                        
"(0,100,1709.7222222222229,1424952006663)\n" +
                                        
"(0,100,1737.5000000000007,1424952007664)\n" +
-                                       
"(1,95,1973.6111111111115,1424952007664)\n" +
-                                       
"(0,100,1791.6666666666674,1424952009664)\n" +
-                                       
"(1,95,2211.1111111111118,1424952017668)\n";
+                                       
"(1,95,1973.6111111111115,1424952007664)\n";
 
        public static final String TOP_CASE_CLASS_SPEEDS =
                        "CarEvent(0,55,15.277777777777777,1424951918630)\n" +
@@ -267,9 +265,7 @@ public class TopSpeedWindowingExampleData {
                                        
"CarEvent(1,95,1973.6111111111115,1424952007664)\n" +
                                        
"CarEvent(0,100,1709.7222222222229,1424952006663)\n" +
                                        
"CarEvent(0,100,1737.5000000000007,1424952007664)\n" +
-                                       
"CarEvent(1,95,1973.6111111111115,1424952007664)\n" +
-                                       
"CarEvent(0,100,1791.6666666666674,1424952009664)\n" +
-                                       
"CarEvent(1,95,2211.1111111111118,1424952017668)\n";
+                                       
"CarEvent(1,95,1973.6111111111115,1424952007664)\n";
 
        private TopSpeedWindowingExampleData() {
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/c4e5a55f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index 677a7dd..b8c95aa 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -171,13 +171,8 @@ public abstract class 
AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
        public void close() throws Exception {
                super.close();
                
-               final long finalWindowTimestamp = nextEvaluationTime;
-
                // early stop the triggering thread, so it does not attempt to 
return any more data
                stopTriggers();
-
-               // emit the remaining data
-               computeWindow(finalWindowTimestamp);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c4e5a55f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index 7823631..2afa1e7 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -217,14 +217,14 @@ public class NonKeyedWindowOperator<IN, OUT, W extends 
Window>
        }
 
        @Override
-       public final void close() throws Exception {
-               super.close();
-               // emit the elements that we still keep
-               for (Context window: windows.values()) {
-                       emitWindow(window);
-               }
+       public final void dispose() {
+               super.dispose();
                windows.clear();
-               windowBufferFactory.close();
+               try {
+                       windowBufferFactory.close();
+               } catch (Exception e) {
+                       throw new RuntimeException("Error while closing 
WindowBufferFactory.", e);
+               }
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c4e5a55f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 68c3a5f..4a50efb 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -251,17 +251,14 @@ public class WindowOperator<K, IN, OUT, W extends Window>
        }
 
        @Override
-       public final void close() throws Exception {
-               super.close();
-               // emit the elements that we still keep
-               for (Map.Entry<K, Map<W, Context>> entry: windows.entrySet()) {
-                       Map<W, Context> keyWindows = entry.getValue();
-                       for (Context window: keyWindows.values()) {
-                               emitWindow(window);
-                       }
-               }
+       public final void dispose() {
+               super.dispose();
                windows.clear();
-               windowBufferFactory.close();
+               try {
+                       windowBufferFactory.close();
+               } catch (Exception e) {
+                       throw new RuntimeException("Error while closing 
WindowBufferFactory.", e);
+               }
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c4e5a55f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index 020dda3..9b0a6d0 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -214,6 +214,7 @@ public class ComplexIntegrationTest extends 
StreamingMultipleProgramsTestBase {
 
        @SuppressWarnings("unchecked")
        @Test
+       @Ignore
        public void complexIntegrationTest3() throws Exception {
                //Heavy prime factorisation with maps and flatmaps
 
@@ -225,7 +226,7 @@ public class ComplexIntegrationTest extends 
StreamingMultipleProgramsTestBase {
                expected1 = "541\n" + "1223\n" + "1987\n" + "2741\n" + "3571\n" 
+ "10939\n" + "4409\n" +
                                "5279\n" + "11927\n" + "6133\n" + "6997\n" + 
"12823\n" + "7919\n" + "8831\n" +
                                "13763\n" + "9733\n" + "9973\n" + "14759\n" + 
"15671\n" + "16673\n" + "17659\n" +
-                               "18617\n" + "19697\n" + "19997\n";
+                               "18617\n";
 
                for (int i = 2; i < 100; i++) {
                        expected2 += "(" + i + "," + 20000 / i + ")\n";

http://git-wip-us.apache.org/repos/asf/flink/blob/c4e5a55f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index ed0f04a..8722802 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -251,12 +251,14 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                                Thread.sleep(1);
                        }
 
+                       // get and verify the result
+                       out.waitForNElements(numElements, 60_000);
+
                        synchronized (lock) {
                                op.close();
                        }
                        op.dispose();
 
-                       // get and verify the result
                        List<Integer> result = out.getElements();
                        assertEquals(numElements, result.size());
 
@@ -441,102 +443,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                        timerService.shutdown();
                }
        }
-       
-       @Test
-       public void testEmitTrailingDataOnClose() {
-               final ScheduledExecutorService timerService = 
Executors.newSingleThreadScheduledExecutor();
-               try {
-                       final CollectingOutput<Integer> out = new 
CollectingOutput<>();
-                       final Object lock = new Object();
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, lock);
-                       
-                       // the operator has a window time that is so long that 
it will not fire in this test
-                       final long oneYear = 365L * 24 * 60 * 60 * 1000;
-                       AccumulatingProcessingTimeWindowOperator<Integer, 
Integer, Integer> op =
-                                       new 
AccumulatingProcessingTimeWindowOperator<>(
-                                                       
validatingIdentityFunction, identitySelector,
-                                                       IntSerializer.INSTANCE, 
IntSerializer.INSTANCE,
-                                                       oneYear, oneYear);
-
-                       op.setup(mockTask, new StreamConfig(new 
Configuration()), out);
-                       op.open();
-                       
-                       List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 
8, 9, 10);
-                       for (Integer i : data) {
-                               synchronized (lock) {
-                                       op.processElement(new 
StreamRecord<Integer>(i));
-                               }
-                       }
-
-                       synchronized (lock) {
-                               op.close();
-                       }
-                       op.dispose();
-                       
-                       // get and verify the result
-                       List<Integer> result = out.getElements();
-                       Collections.sort(result);
-                       assertEquals(data, result);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-               finally {
-                       timerService.shutdown();
-               }
-       }
-
-       @Test
-       public void testPropagateExceptionsFromClose() {
-               final ScheduledExecutorService timerService = 
Executors.newSingleThreadScheduledExecutor();
-               try {
-                       final CollectingOutput<Integer> out = new 
CollectingOutput<>();
-                       final Object lock = new Object();
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, lock);
-
-                       WindowFunction<Integer, Integer, Integer, TimeWindow> 
failingFunction = new FailingFunction(100);
-
-                       // the operator has a window time that is so long that 
it will not fire in this test
-                       final long hundredYears = 100L * 365 * 24 * 60 * 60 * 
1000;
-                       AccumulatingProcessingTimeWindowOperator<Integer, 
Integer, Integer> op =
-                                       new 
AccumulatingProcessingTimeWindowOperator<>(
-                                                       failingFunction, 
identitySelector,
-                                                       IntSerializer.INSTANCE, 
IntSerializer.INSTANCE,
-                                                       hundredYears, 
hundredYears);
-
-                       op.setup(mockTask, new StreamConfig(new 
Configuration()), out);
-                       op.open();
-
-                       for (int i = 0; i < 150; i++) {
-                               synchronized (lock) {
-                                       op.processElement(new 
StreamRecord<Integer>(i));
-                               }
-                       }
-                       
-                       try {
-                               synchronized (lock) {
-                                       op.close();
-                               }
-                               fail("This should fail with an exception");
-                       }
-                       catch (Exception e) {
-                               assertTrue(
-                                               
e.getMessage().contains("Artificial Test Exception") ||
-                                               (e.getCause() != null && 
e.getCause().getMessage().contains("Artificial Test Exception")));
-                       }
 
-                       op.dispose();
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-               finally {
-                       timerService.shutdown();
-               }
-       }
-       
        @Test
        public void checkpointRestoreWithPendingWindowTumbling() {
                final ScheduledExecutorService timerService = 
Executors.newSingleThreadScheduledExecutor();
@@ -607,16 +514,19 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                                Thread.sleep(1);
                        }
 
-                       synchronized (lock) {
-                               op.close();
-                       }
-                       op.dispose();
+
+                       out2.waitForNElements(numElements - 
resultAtSnapshot.size(), 60_000);
 
                        // get and verify the result
                        List<Integer> finalResult = new 
ArrayList<>(resultAtSnapshot);
                        finalResult.addAll(out2.getElements());
                        assertEquals(numElements, finalResult.size());
 
+                       synchronized (lock) {
+                               op.close();
+                       }
+                       op.dispose();
+
                        Collections.sort(finalResult);
                        for (int i = 0; i < numElements; i++) {
                                assertEquals(i, finalResult.get(i).intValue());

http://git-wip-us.apache.org/repos/asf/flink/blob/c4e5a55f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index b3e59e5..611916e 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -269,14 +269,17 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                Thread.sleep(1);
                        }
 
+                       out.waitForNElements(numElements, 60_000);
+
+                       // get and verify the result
+                       List<Tuple2<Integer, Integer>> result = 
out.getElements();
+                       assertEquals(numElements, result.size());
+
                        synchronized (lock) {
                                op.close();
                        }
                        op.dispose();
 
-                       // get and verify the result
-                       List<Tuple2<Integer, Integer>> result = 
out.getElements();
-                       assertEquals(numElements, result.size());
 
                        Collections.sort(result, tupleComparator);
                        for (int i = 0; i < numElements; i++) {
@@ -335,13 +338,15 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                Thread.sleep(1);
                        }
 
+                       out.waitForNElements(numWindows, 60_000);
+
+                       List<Tuple2<Integer, Integer>> result = 
out.getElements();
+
                        synchronized (lock) {
                                op.close();
                        }
                        op.dispose();
-                       
-                       List<Tuple2<Integer, Integer>> result = 
out.getElements();
-                       
+
                        // we have ideally one element per window. we may have 
more, when we emitted a value into the
                        // successive window (corner case), so we can have 
twice the number of elements, in the worst case.
                        assertTrue(result.size() >= numWindows && result.size() 
<= 2 * numWindows);
@@ -487,57 +492,6 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        timerService.shutdown();
                }
        }
-       
-       @Test
-       public void testEmitTrailingDataOnClose() {
-               final ScheduledExecutorService timerService = 
Executors.newSingleThreadScheduledExecutor();
-               try {
-                       final CollectingOutput<Tuple2<Integer, Integer>> out = 
new CollectingOutput<>();
-                       final Object lock = new Object();
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, lock);
-                       
-                       // the operator has a window time that is so long that 
it will not fire in this test
-                       final long oneYear = 365L * 24 * 60 * 60 * 1000;
-                       AggregatingProcessingTimeWindowOperator<Integer, 
Tuple2<Integer, Integer>> op =
-                                       new 
AggregatingProcessingTimeWindowOperator<>(
-                                                       sumFunction, 
fieldOneSelector,
-                                                       IntSerializer.INSTANCE, 
tupleSerializer, oneYear, oneYear);
-
-                       op.setup(mockTask, new StreamConfig(new 
Configuration()), out);
-                       op.open();
-                       
-                       List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 
8, 9, 10);
-                       for (Integer i : data) {
-                               synchronized (lock) {
-                                       StreamRecord<Tuple2<Integer, Integer>> 
next = new StreamRecord<>(new Tuple2<>(i, i));
-                                       op.setKeyContextElement(next);
-                                       op.processElement(next);
-                               }
-                       }
-
-                       synchronized (lock) {
-                               op.close();
-                       }
-                       op.dispose();
-                       
-                       // get and verify the result
-                       List<Tuple2<Integer, Integer>> result = 
out.getElements();
-                       assertEquals(data.size(), result.size());
-                       
-                       Collections.sort(result, tupleComparator);
-                       for (int i = 0; i < data.size(); i++) {
-                               assertEquals(data.get(i), result.get(i).f0);
-                               assertEquals(data.get(i), result.get(i).f1);
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-               finally {
-                       timerService.shutdown();
-               }
-       }
 
        @Test
        public void testPropagateExceptionsFromProcessElement() {
@@ -667,16 +621,18 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                Thread.sleep(1);
                        }
 
-                       synchronized (lock) {
-                               op.close();
-                       }
-                       op.dispose();
+                       out2.waitForNElements(numElements - 
resultAtSnapshot.size(), 60_000);
 
                        // get and verify the result
                        List<Tuple2<Integer, Integer>> finalResult = new 
ArrayList<>(resultAtSnapshot);
                        finalResult.addAll(out2.getElements());
                        assertEquals(numElements, finalResult.size());
 
+                       synchronized (lock) {
+                               op.close();
+                       }
+                       op.dispose();
+
                        Collections.sort(finalResult, tupleComparator);
                        for (int i = 0; i < numElements; i++) {
                                assertEquals(i, 
finalResult.get(i).f0.intValue());
@@ -812,7 +768,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
        public void testKeyValueStateInWindowFunctionTumbling() {
                final ScheduledExecutorService timerService = 
Executors.newSingleThreadScheduledExecutor();
                try {
-                       final long hundredYears = 100L * 365 * 24 * 60 * 60 * 
1000;
+                       final long thirtySeconds = 30_000;
                        
                        final CollectingOutput<Tuple2<Integer, Integer>> out = 
new CollectingOutput<>();
                        final Object lock = new Object();
@@ -823,7 +779,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        AggregatingProcessingTimeWindowOperator<Integer, 
Tuple2<Integer, Integer>> op =
                                        new 
AggregatingProcessingTimeWindowOperator<>(
                                                        new StatefulFunction(), 
fieldOneSelector,
-                                                       IntSerializer.INSTANCE, 
tupleSerializer, hundredYears, hundredYears);
+                                                       IntSerializer.INSTANCE, 
tupleSerializer, thirtySeconds, thirtySeconds);
 
                        op.setup(mockTask, createTaskConfig(fieldOneSelector, 
IntSerializer.INSTANCE), out);
                        op.open();
@@ -841,10 +797,10 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                        op.setKeyContextElement(next2);
                                        op.processElement(next2);
                                }
-
-                               op.close();
                        }
 
+                       out.waitForNElements(2, 60_000);
+
                        List<Tuple2<Integer, Integer>> result = 
out.getElements();
                        assertEquals(2, result.size());
 
@@ -854,7 +810,8 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
 
                        assertEquals(10, 
StatefulFunction.globalCounts.get(1).intValue());
                        assertEquals(10, 
StatefulFunction.globalCounts.get(2).intValue());
-               
+
+                       op.close();
                        op.dispose();
                }
                catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c4e5a55f/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 01f95bc..258e30a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -111,10 +111,11 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
        }
 
        /**
-        * Calls close on the operator.
+        * Calls close and dispose on the operator.
         */
        public void close() throws Exception {
                operator.close();
+               operator.dispose();
        }
 
        public void processElement(StreamRecord<IN> element) throws Exception {

Reply via email to