[FLINK-2074] Fix erroneous emission of Sliding Time PreReducer

Before this, a sliding time window would keep emitting the last result
because the number of elements per pre-aggregation result was not
correctly reset on eviction.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6bc6dbec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6bc6dbec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6bc6dbec

Branch: refs/heads/master
Commit: 6bc6dbec6878f58500370b2e6912ad5022c5bf78
Parents: 4de2353
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Fri May 22 16:30:58 2015 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Tue May 26 10:21:31 2015 +0200

----------------------------------------------------------------------
 .../windowbuffer/SlidingGroupedPreReducer.java  |   1 +
 .../windowbuffer/SlidingTimePreReducer.java     |   1 +
 .../SlidingTimeGroupedPreReducerTest.java       | 168 +++++++++++++++----
 .../windowbuffer/SlidingTimePreReducerTest.java | 116 ++++++++++---
 4 files changed, 230 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6dbec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java
index 0872c6e..09fadf9 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java
@@ -143,6 +143,7 @@ public abstract class SlidingGroupedPreReducer<T> extends 
SlidingPreReducer<T> {
        @Override
        protected void resetCurrent() {
                currentReducedMap = null;
+               elementsSinceLastPreAggregate = 0;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6dbec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
index 7652d81..d84505c 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java
@@ -89,6 +89,7 @@ public class SlidingTimePreReducer<T> extends 
SlidingPreReducer<T> {
 
                if (toRemove > 0 && lastPreAggregateSize == null) {
                        currentReduced = null;
+                       elementsSinceLastPreAggregate = 0;
                        toRemove = 0;
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6dbec/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
index 3438f42..18a4748 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
@@ -23,10 +23,14 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import 
org.apache.flink.streaming.api.operators.windowing.WindowIntegrationTest;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.streaming.api.windowing.helper.Timestamp;
@@ -37,61 +41,136 @@ import org.junit.Test;
 public class SlidingTimeGroupedPreReducerTest {
 
        TypeSerializer<Integer> serializer = 
TypeExtractor.getForObject(1).createSerializer(null);
+       TypeInformation<Tuple2<Integer,Integer>> tupleType = 
TypeInfoParser.parse("Tuple2<Integer,Integer>");
+
 
        ReduceFunction<Integer> reducer = new SumReducer();
+       ReduceFunction<Tuple2<Integer, Integer>> tupleReducer = new 
TupleSumReducer();
+
 
        KeySelector<Integer, ?> key = new WindowIntegrationTest.ModKey(2);
+       KeySelector<Tuple2<Integer, Integer>, ?> tupleKey = new TupleModKey(2);
 
        @Test
        public void testPreReduce1() throws Exception {
-               TestCollector<StreamWindow<Integer>> collector = new 
TestCollector<StreamWindow<Integer>>();
+               // This ensures that the buffer is properly cleared after a 
burst of elements by
+               // replaying the same sequence of elements with a later 
timestamp and expecting the same
+               // result.
 
-               SlidingTimeGroupedPreReducer<Integer> preReducer = new 
SlidingTimeGroupedPreReducer<Integer>(
-                               reducer, serializer, key, 3, 2, new 
TimestampWrapper<Integer>(
-                                               new Timestamp<Integer>() {
+               TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector 
= new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
 
-                                                       private static final 
long serialVersionUID = 1L;
+               SlidingTimeGroupedPreReducer<Tuple2<Integer, Integer>> 
preReducer = new SlidingTimeGroupedPreReducer<Tuple2<Integer, 
Integer>>(tupleReducer,
+                               tupleType.createSerializer(new 
ExecutionConfig()), tupleKey, 3, 2, new TimestampWrapper<Tuple2<Integer, 
Integer>>(new Timestamp<Tuple2<Integer, Integer>>() {
 
-                                                       @Override
-                                                       public long 
getTimestamp(Integer value) {
-                                                               return value;
-                                                       }
-                                               }, 1));
+                       private static final long serialVersionUID = 1L;
 
-               preReducer.store(1);
-               preReducer.store(2);
+                       @Override
+                       public long getTimestamp(Tuple2<Integer, Integer> 
value) {
+                               return value.f0;
+                       }
+               }, 1));
+
+               int timeOffset = 0;
+
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 1, 
1));
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 2, 
2));
                preReducer.emitWindow(collector);
-               preReducer.store(3);
-               preReducer.store(4);
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 3, 
3));
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 4, 
4));
                preReducer.evict(1);
                preReducer.emitWindow(collector);
                preReducer.evict(2);
-               preReducer.store(5);
-               preReducer.store(6);
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 5, 
5));
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 6, 
6));
                preReducer.emitWindow(collector);
                preReducer.evict(2);
-               preReducer.store(7);
-               preReducer.store(8);
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 7, 
7));
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 8, 
8));
                preReducer.emitWindow(collector);
                preReducer.evict(2);
-               preReducer.store(9);
-               preReducer.store(10);
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 9, 
9));
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 10, 
10));
                preReducer.emitWindow(collector);
                preReducer.evict(2);
-               preReducer.store(11);
-               preReducer.store(12);
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 11, 
11));
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 12, 
12));
                preReducer.emitWindow(collector);
-               preReducer.store(13);
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 13, 
13));
 
-               List<StreamWindow<Integer>> expected = new 
ArrayList<StreamWindow<Integer>>();
-               expected.add(StreamWindow.fromElements(1, 2));
-               expected.add(StreamWindow.fromElements(3, 6));
-               expected.add(StreamWindow.fromElements(5, 10));
-               expected.add(StreamWindow.fromElements(7, 14));
-               expected.add(StreamWindow.fromElements(9, 18));
-               expected.add(StreamWindow.fromElements(11, 22));
+               // ensure that everything is cleared out
+               preReducer.evict(100);
 
-               checkResults(expected, collector.getCollected());
+
+               timeOffset = 25; // a little while later...
+
+               // Repeat the same sequence, this should produce the same result
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 1, 
1));
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 2, 
2));
+               preReducer.emitWindow(collector);
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 3, 
3));
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 4, 
4));
+               preReducer.evict(1);
+               preReducer.emitWindow(collector);
+               preReducer.evict(2);
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 5, 
5));
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 6, 
6));
+               preReducer.emitWindow(collector);
+               preReducer.evict(2);
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 7, 
7));
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 8, 
8));
+               preReducer.emitWindow(collector);
+               preReducer.evict(2);
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 9, 
9));
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 10, 
10));
+               preReducer.emitWindow(collector);
+               preReducer.evict(2);
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 11, 
11));
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 12, 
12));
+               preReducer.emitWindow(collector);
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 13, 
13));
+
+               List<StreamWindow<Tuple2<Integer, Integer>>> expected = new 
ArrayList<StreamWindow<Tuple2<Integer, Integer>>>();
+               timeOffset = 0; // rewind ...
+               expected.add(StreamWindow.fromElements(
+                               new Tuple2<Integer, Integer>(timeOffset + 2, 2),
+                               new Tuple2<Integer, Integer>(timeOffset + 1, 
1)));
+               expected.add(StreamWindow.fromElements(
+                               new Tuple2<Integer, Integer>(timeOffset + 2, 6),
+                               new Tuple2<Integer, Integer>(timeOffset + 3, 
3)));
+               expected.add(StreamWindow.fromElements(
+                               new Tuple2<Integer, Integer>(timeOffset + 4, 
10),
+                               new Tuple2<Integer, Integer>(timeOffset + 5, 
5)));
+               expected.add(StreamWindow.fromElements(
+                               new Tuple2<Integer, Integer>(timeOffset + 6, 
14),
+                               new Tuple2<Integer, Integer>(timeOffset + 7, 
7)));
+               expected.add(StreamWindow.fromElements(
+                               new Tuple2<Integer, Integer>(timeOffset + 8, 
18),
+                               new Tuple2<Integer, Integer>(timeOffset + 9, 
9)));
+               expected.add(StreamWindow.fromElements(
+                               new Tuple2<Integer, Integer>(timeOffset + 10, 
22),
+                               new Tuple2<Integer, Integer>(timeOffset + 11, 
11)));
+
+               timeOffset = 25; // and back to the future ...
+               expected.add(StreamWindow.fromElements(
+                               new Tuple2<Integer, Integer>(timeOffset + 2, 2),
+                               new Tuple2<Integer, Integer>(timeOffset + 1, 
1)));
+               expected.add(StreamWindow.fromElements(
+                               new Tuple2<Integer, Integer>(timeOffset + 2, 6),
+                               new Tuple2<Integer, Integer>(timeOffset + 3, 
3)));
+               expected.add(StreamWindow.fromElements(
+                               new Tuple2<Integer, Integer>(timeOffset + 4, 
10),
+                               new Tuple2<Integer, Integer>(timeOffset + 5, 
5)));
+               expected.add(StreamWindow.fromElements(
+                               new Tuple2<Integer, Integer>(timeOffset + 6, 
14),
+                               new Tuple2<Integer, Integer>(timeOffset + 7, 
7)));
+               expected.add(StreamWindow.fromElements(
+                               new Tuple2<Integer, Integer>(timeOffset + 8, 
18),
+                               new Tuple2<Integer, Integer>(timeOffset + 9, 
9)));
+               expected.add(StreamWindow.fromElements(
+                               new Tuple2<Integer, Integer>(timeOffset + 10, 
22),
+                               new Tuple2<Integer, Integer>(timeOffset + 11, 
11)));
+
+               assertEquals(expected, collector.getCollected());
        }
 
        protected static void checkResults(List<StreamWindow<Integer>> expected,
@@ -277,4 +356,31 @@ public class SlidingTimeGroupedPreReducerTest {
                }
 
        }
+
+       private static class TupleSumReducer implements 
ReduceFunction<Tuple2<Integer, Integer>> {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> 
value1, Tuple2<Integer, Integer> value2) throws Exception {
+                       return new Tuple2<Integer, Integer>(value1.f0, 
value1.f1 + value2.f1);
+               }
+
+       }
+
+       public static class TupleModKey implements KeySelector<Tuple2<Integer, 
Integer>, Integer> {
+
+               private static final long serialVersionUID = 1L;
+
+               private int m;
+
+               public TupleModKey(int m) {
+                       this.m = m;
+               }
+
+               @Override
+               public Integer getKey(Tuple2<Integer, Integer> value) throws 
Exception {
+                       return value.f1 % m;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6dbec/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
index bc3b13b..a48bc0c 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
@@ -22,9 +22,13 @@ import static org.junit.Assert.assertEquals;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.streaming.api.windowing.helper.Timestamp;
 import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
@@ -34,56 +38,106 @@ import org.junit.Test;
 public class SlidingTimePreReducerTest {
 
        TypeSerializer<Integer> serializer = 
TypeExtractor.getForObject(1).createSerializer(null);
+       TypeInformation<Tuple2<Integer,Integer>> tupleType = 
TypeInfoParser.parse("Tuple2<Integer,Integer>");
 
        ReduceFunction<Integer> reducer = new SumReducer();
+       ReduceFunction<Tuple2<Integer, Integer>> tupleReducer = new 
TupleSumReducer();
 
        @Test
        public void testPreReduce1() throws Exception {
-               TestCollector<StreamWindow<Integer>> collector = new 
TestCollector<StreamWindow<Integer>>();
+               // This ensures that the buffer is properly cleared after a 
burst of elements by
+               // replaying the same sequence of elements with a later 
timestamp and expecting the same
+               // result.
 
-               SlidingTimePreReducer<Integer> preReducer = new 
SlidingTimePreReducer<Integer>(reducer,
-                               serializer, 3, 2, new 
TimestampWrapper<Integer>(new Timestamp<Integer>() {
+               TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector 
= new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
+
+               SlidingTimePreReducer<Tuple2<Integer, Integer>> preReducer = 
new SlidingTimePreReducer<Tuple2<Integer, Integer>>(tupleReducer,
+                               tupleType.createSerializer(new 
ExecutionConfig()), 3, 2, new TimestampWrapper<Tuple2<Integer, Integer>>(new 
Timestamp<Tuple2<Integer, Integer>>() {
 
                                        private static final long 
serialVersionUID = 1L;
 
                                        @Override
-                                       public long getTimestamp(Integer value) 
{
-                                               return value;
+                                       public long 
getTimestamp(Tuple2<Integer, Integer> value) {
+                                               return value.f0;
                                        }
                                }, 1));
 
-               preReducer.store(1);
-               preReducer.store(2);
+               int timeOffset = 0;
+
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 1, 
1));
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 2, 
2));
                preReducer.emitWindow(collector);
-               preReducer.store(3);
-               preReducer.store(4);
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 3, 
3));
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 4, 
4));
                preReducer.evict(1);
                preReducer.emitWindow(collector);
                preReducer.evict(2);
-               preReducer.store(5);
-               preReducer.store(6);
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 5, 
5));
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 6, 
6));
                preReducer.emitWindow(collector);
                preReducer.evict(2);
-               preReducer.store(7);
-               preReducer.store(8);
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 7, 
7));
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 8, 
8));
                preReducer.emitWindow(collector);
                preReducer.evict(2);
-               preReducer.store(9);
-               preReducer.store(10);
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 9, 
9));
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 10, 
10));
                preReducer.emitWindow(collector);
                preReducer.evict(2);
-               preReducer.store(11);
-               preReducer.store(12);
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 11, 
11));
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 12, 
12));
                preReducer.emitWindow(collector);
-               preReducer.store(13);
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 13, 
13));
+
+               // ensure that everything is cleared out
+               preReducer.evict(100);
+
+
+               timeOffset = 25; // a little while later...
+
+               // Repeat the same sequence, this should produce the same result
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 1, 
1));
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 2, 
2));
+               preReducer.emitWindow(collector);
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 3, 
3));
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 4, 
4));
+               preReducer.evict(1);
+               preReducer.emitWindow(collector);
+               preReducer.evict(2);
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 5, 
5));
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 6, 
6));
+               preReducer.emitWindow(collector);
+               preReducer.evict(2);
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 7, 
7));
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 8, 
8));
+               preReducer.emitWindow(collector);
+               preReducer.evict(2);
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 9, 
9));
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 10, 
10));
+               preReducer.emitWindow(collector);
+               preReducer.evict(2);
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 11, 
11));
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 12, 
12));
+               preReducer.emitWindow(collector);
+               preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 13, 
13));
+
+               List<StreamWindow<Tuple2<Integer, Integer>>> expected = new 
ArrayList<StreamWindow<Tuple2<Integer, Integer>>>();
+               timeOffset = 0; // rewind ...
+               expected.add(StreamWindow.fromElements(new Tuple2<Integer, 
Integer>(timeOffset + 1, 3)));
+               expected.add(StreamWindow.fromElements(new Tuple2<Integer, 
Integer>(timeOffset + 2, 9)));
+               expected.add(StreamWindow.fromElements(new Tuple2<Integer, 
Integer>(timeOffset + 4, 15)));
+               expected.add(StreamWindow.fromElements(new Tuple2<Integer, 
Integer>(timeOffset + 6, 21)));
+               expected.add(StreamWindow.fromElements(new Tuple2<Integer, 
Integer>(timeOffset + 8, 27)));
+               expected.add(StreamWindow.fromElements(new Tuple2<Integer, 
Integer>(timeOffset + 10, 33)));
+
+               timeOffset = 25; // and back to the future ...
+               expected.add(StreamWindow.fromElements(new Tuple2<Integer, 
Integer>(timeOffset + 1, 3)));
+               expected.add(StreamWindow.fromElements(new Tuple2<Integer, 
Integer>(timeOffset + 2, 9)));
+               expected.add(StreamWindow.fromElements(new Tuple2<Integer, 
Integer>(timeOffset + 4, 15)));
+               expected.add(StreamWindow.fromElements(new Tuple2<Integer, 
Integer>(timeOffset + 6, 21)));
+               expected.add(StreamWindow.fromElements(new Tuple2<Integer, 
Integer>(timeOffset + 8, 27)));
+               expected.add(StreamWindow.fromElements(new Tuple2<Integer, 
Integer>(timeOffset + 10, 33)));
 
-               List<StreamWindow<Integer>> expected = new 
ArrayList<StreamWindow<Integer>>();
-               expected.add(StreamWindow.fromElements(3));
-               expected.add(StreamWindow.fromElements(9));
-               expected.add(StreamWindow.fromElements(15));
-               expected.add(StreamWindow.fromElements(21));
-               expected.add(StreamWindow.fromElements(27));
-               expected.add(StreamWindow.fromElements(33));
 
                assertEquals(expected, collector.getCollected());
        }
@@ -254,4 +308,16 @@ public class SlidingTimePreReducerTest {
                }
 
        }
+
+       private static class TupleSumReducer implements 
ReduceFunction<Tuple2<Integer, Integer>> {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> 
value1, Tuple2<Integer, Integer> value2) throws Exception {
+                       return new Tuple2<Integer, Integer>(value1.f0, 
value1.f1 + value2.f1);
+               }
+
+       }
+
 }

Reply via email to