[FLINK-2074] Fix erroneous emission of Sliding Time PreReducer

Before this fix, a sliding time window would keep emitting the last result, because the number of elements per pre-aggregation result was not correctly reset on eviction.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6bc6dbec Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6bc6dbec Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6bc6dbec Branch: refs/heads/master Commit: 6bc6dbec6878f58500370b2e6912ad5022c5bf78 Parents: 4de2353 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Fri May 22 16:30:58 2015 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Tue May 26 10:21:31 2015 +0200 ---------------------------------------------------------------------- .../windowbuffer/SlidingGroupedPreReducer.java | 1 + .../windowbuffer/SlidingTimePreReducer.java | 1 + .../SlidingTimeGroupedPreReducerTest.java | 168 +++++++++++++++---- .../windowbuffer/SlidingTimePreReducerTest.java | 116 ++++++++++--- 4 files changed, 230 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6dbec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java index 0872c6e..09fadf9 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingGroupedPreReducer.java @@ -143,6 +143,7 @@ public abstract class SlidingGroupedPreReducer<T> extends 
SlidingPreReducer<T> { @Override protected void resetCurrent() { currentReducedMap = null; + elementsSinceLastPreAggregate = 0; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6dbec/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java index 7652d81..d84505c 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducer.java @@ -89,6 +89,7 @@ public class SlidingTimePreReducer<T> extends SlidingPreReducer<T> { if (toRemove > 0 && lastPreAggregateSize == null) { currentReduced = null; + elementsSinceLastPreAggregate = 0; toRemove = 0; } } http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6dbec/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java index 3438f42..18a4748 100644 --- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java @@ -23,10 +23,14 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.streaming.api.operators.windowing.WindowIntegrationTest; import org.apache.flink.streaming.api.windowing.StreamWindow; import org.apache.flink.streaming.api.windowing.helper.Timestamp; @@ -37,61 +41,136 @@ import org.junit.Test; public class SlidingTimeGroupedPreReducerTest { TypeSerializer<Integer> serializer = TypeExtractor.getForObject(1).createSerializer(null); + TypeInformation<Tuple2<Integer,Integer>> tupleType = TypeInfoParser.parse("Tuple2<Integer,Integer>"); + ReduceFunction<Integer> reducer = new SumReducer(); + ReduceFunction<Tuple2<Integer, Integer>> tupleReducer = new TupleSumReducer(); + KeySelector<Integer, ?> key = new WindowIntegrationTest.ModKey(2); + KeySelector<Tuple2<Integer, Integer>, ?> tupleKey = new TupleModKey(2); @Test public void testPreReduce1() throws Exception { - TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>(); + // This ensures that the buffer is properly cleared after a burst of elements by + // replaying the same sequence of elements with a later timestamp and expecting the same + // result. 
- SlidingTimeGroupedPreReducer<Integer> preReducer = new SlidingTimeGroupedPreReducer<Integer>( - reducer, serializer, key, 3, 2, new TimestampWrapper<Integer>( - new Timestamp<Integer>() { + TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>(); - private static final long serialVersionUID = 1L; + SlidingTimeGroupedPreReducer<Tuple2<Integer, Integer>> preReducer = new SlidingTimeGroupedPreReducer<Tuple2<Integer, Integer>>(tupleReducer, + tupleType.createSerializer(new ExecutionConfig()), tupleKey, 3, 2, new TimestampWrapper<Tuple2<Integer, Integer>>(new Timestamp<Tuple2<Integer, Integer>>() { - @Override - public long getTimestamp(Integer value) { - return value; - } - }, 1)); + private static final long serialVersionUID = 1L; - preReducer.store(1); - preReducer.store(2); + @Override + public long getTimestamp(Tuple2<Integer, Integer> value) { + return value.f0; + } + }, 1)); + + int timeOffset = 0; + + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 1, 1)); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 2, 2)); preReducer.emitWindow(collector); - preReducer.store(3); - preReducer.store(4); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 3, 3)); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 4, 4)); preReducer.evict(1); preReducer.emitWindow(collector); preReducer.evict(2); - preReducer.store(5); - preReducer.store(6); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 5, 5)); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 6, 6)); preReducer.emitWindow(collector); preReducer.evict(2); - preReducer.store(7); - preReducer.store(8); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 7, 7)); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 8, 8)); preReducer.emitWindow(collector); preReducer.evict(2); - preReducer.store(9); - preReducer.store(10); + preReducer.store(new Tuple2<Integer, 
Integer>(timeOffset + 9, 9)); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 10, 10)); preReducer.emitWindow(collector); preReducer.evict(2); - preReducer.store(11); - preReducer.store(12); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 11, 11)); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 12, 12)); preReducer.emitWindow(collector); - preReducer.store(13); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 13, 13)); - List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>(); - expected.add(StreamWindow.fromElements(1, 2)); - expected.add(StreamWindow.fromElements(3, 6)); - expected.add(StreamWindow.fromElements(5, 10)); - expected.add(StreamWindow.fromElements(7, 14)); - expected.add(StreamWindow.fromElements(9, 18)); - expected.add(StreamWindow.fromElements(11, 22)); + // ensure that everything is cleared out + preReducer.evict(100); - checkResults(expected, collector.getCollected()); + + timeOffset = 25; // a little while later... 
+ + // Repeat the same sequence, this should produce the same result + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 1, 1)); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 2, 2)); + preReducer.emitWindow(collector); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 3, 3)); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 4, 4)); + preReducer.evict(1); + preReducer.emitWindow(collector); + preReducer.evict(2); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 5, 5)); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 6, 6)); + preReducer.emitWindow(collector); + preReducer.evict(2); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 7, 7)); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 8, 8)); + preReducer.emitWindow(collector); + preReducer.evict(2); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 9, 9)); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 10, 10)); + preReducer.emitWindow(collector); + preReducer.evict(2); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 11, 11)); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 12, 12)); + preReducer.emitWindow(collector); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 13, 13)); + + List<StreamWindow<Tuple2<Integer, Integer>>> expected = new ArrayList<StreamWindow<Tuple2<Integer, Integer>>>(); + timeOffset = 0; // rewind ... 
+ expected.add(StreamWindow.fromElements( + new Tuple2<Integer, Integer>(timeOffset + 2, 2), + new Tuple2<Integer, Integer>(timeOffset + 1, 1))); + expected.add(StreamWindow.fromElements( + new Tuple2<Integer, Integer>(timeOffset + 2, 6), + new Tuple2<Integer, Integer>(timeOffset + 3, 3))); + expected.add(StreamWindow.fromElements( + new Tuple2<Integer, Integer>(timeOffset + 4, 10), + new Tuple2<Integer, Integer>(timeOffset + 5, 5))); + expected.add(StreamWindow.fromElements( + new Tuple2<Integer, Integer>(timeOffset + 6, 14), + new Tuple2<Integer, Integer>(timeOffset + 7, 7))); + expected.add(StreamWindow.fromElements( + new Tuple2<Integer, Integer>(timeOffset + 8, 18), + new Tuple2<Integer, Integer>(timeOffset + 9, 9))); + expected.add(StreamWindow.fromElements( + new Tuple2<Integer, Integer>(timeOffset + 10, 22), + new Tuple2<Integer, Integer>(timeOffset + 11, 11))); + + timeOffset = 25; // and back to the future ... + expected.add(StreamWindow.fromElements( + new Tuple2<Integer, Integer>(timeOffset + 2, 2), + new Tuple2<Integer, Integer>(timeOffset + 1, 1))); + expected.add(StreamWindow.fromElements( + new Tuple2<Integer, Integer>(timeOffset + 2, 6), + new Tuple2<Integer, Integer>(timeOffset + 3, 3))); + expected.add(StreamWindow.fromElements( + new Tuple2<Integer, Integer>(timeOffset + 4, 10), + new Tuple2<Integer, Integer>(timeOffset + 5, 5))); + expected.add(StreamWindow.fromElements( + new Tuple2<Integer, Integer>(timeOffset + 6, 14), + new Tuple2<Integer, Integer>(timeOffset + 7, 7))); + expected.add(StreamWindow.fromElements( + new Tuple2<Integer, Integer>(timeOffset + 8, 18), + new Tuple2<Integer, Integer>(timeOffset + 9, 9))); + expected.add(StreamWindow.fromElements( + new Tuple2<Integer, Integer>(timeOffset + 10, 22), + new Tuple2<Integer, Integer>(timeOffset + 11, 11))); + + assertEquals(expected, collector.getCollected()); } protected static void checkResults(List<StreamWindow<Integer>> expected, @@ -277,4 +356,31 @@ public class 
SlidingTimeGroupedPreReducerTest { } } + + private static class TupleSumReducer implements ReduceFunction<Tuple2<Integer, Integer>> { + + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception { + return new Tuple2<Integer, Integer>(value1.f0, value1.f1 + value2.f1); + } + + } + + public static class TupleModKey implements KeySelector<Tuple2<Integer, Integer>, Integer> { + + private static final long serialVersionUID = 1L; + + private int m; + + public TupleModKey(int m) { + this.m = m; + } + + @Override + public Integer getKey(Tuple2<Integer, Integer> value) throws Exception { + return value.f1 % m; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6dbec/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java index bc3b13b..a48bc0c 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java @@ -22,9 +22,13 @@ import static org.junit.Assert.assertEquals; import java.util.ArrayList; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; import 
org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.streaming.api.windowing.StreamWindow; import org.apache.flink.streaming.api.windowing.helper.Timestamp; import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; @@ -34,56 +38,106 @@ import org.junit.Test; public class SlidingTimePreReducerTest { TypeSerializer<Integer> serializer = TypeExtractor.getForObject(1).createSerializer(null); + TypeInformation<Tuple2<Integer,Integer>> tupleType = TypeInfoParser.parse("Tuple2<Integer,Integer>"); ReduceFunction<Integer> reducer = new SumReducer(); + ReduceFunction<Tuple2<Integer, Integer>> tupleReducer = new TupleSumReducer(); @Test public void testPreReduce1() throws Exception { - TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>(); + // This ensures that the buffer is properly cleared after a burst of elements by + // replaying the same sequence of elements with a later timestamp and expecting the same + // result. 
- SlidingTimePreReducer<Integer> preReducer = new SlidingTimePreReducer<Integer>(reducer, - serializer, 3, 2, new TimestampWrapper<Integer>(new Timestamp<Integer>() { + TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>(); + + SlidingTimePreReducer<Tuple2<Integer, Integer>> preReducer = new SlidingTimePreReducer<Tuple2<Integer, Integer>>(tupleReducer, + tupleType.createSerializer(new ExecutionConfig()), 3, 2, new TimestampWrapper<Tuple2<Integer, Integer>>(new Timestamp<Tuple2<Integer, Integer>>() { private static final long serialVersionUID = 1L; @Override - public long getTimestamp(Integer value) { - return value; + public long getTimestamp(Tuple2<Integer, Integer> value) { + return value.f0; } }, 1)); - preReducer.store(1); - preReducer.store(2); + int timeOffset = 0; + + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 1, 1)); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 2, 2)); preReducer.emitWindow(collector); - preReducer.store(3); - preReducer.store(4); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 3, 3)); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 4, 4)); preReducer.evict(1); preReducer.emitWindow(collector); preReducer.evict(2); - preReducer.store(5); - preReducer.store(6); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 5, 5)); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 6, 6)); preReducer.emitWindow(collector); preReducer.evict(2); - preReducer.store(7); - preReducer.store(8); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 7, 7)); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 8, 8)); preReducer.emitWindow(collector); preReducer.evict(2); - preReducer.store(9); - preReducer.store(10); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 9, 9)); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 10, 10)); preReducer.emitWindow(collector); 
preReducer.evict(2); - preReducer.store(11); - preReducer.store(12); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 11, 11)); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 12, 12)); preReducer.emitWindow(collector); - preReducer.store(13); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 13, 13)); + + // ensure that everything is cleared out + preReducer.evict(100); + + + timeOffset = 25; // a little while later... + + // Repeat the same sequence, this should produce the same result + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 1, 1)); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 2, 2)); + preReducer.emitWindow(collector); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 3, 3)); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 4, 4)); + preReducer.evict(1); + preReducer.emitWindow(collector); + preReducer.evict(2); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 5, 5)); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 6, 6)); + preReducer.emitWindow(collector); + preReducer.evict(2); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 7, 7)); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 8, 8)); + preReducer.emitWindow(collector); + preReducer.evict(2); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 9, 9)); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 10, 10)); + preReducer.emitWindow(collector); + preReducer.evict(2); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 11, 11)); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 12, 12)); + preReducer.emitWindow(collector); + preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 13, 13)); + + List<StreamWindow<Tuple2<Integer, Integer>>> expected = new ArrayList<StreamWindow<Tuple2<Integer, Integer>>>(); + timeOffset = 0; // rewind ... 
+ expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 1, 3))); + expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 2, 9))); + expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 4, 15))); + expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 6, 21))); + expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 8, 27))); + expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 10, 33))); + + timeOffset = 25; // and back to the future ... + expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 1, 3))); + expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 2, 9))); + expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 4, 15))); + expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 6, 21))); + expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 8, 27))); + expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 10, 33))); - List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>(); - expected.add(StreamWindow.fromElements(3)); - expected.add(StreamWindow.fromElements(9)); - expected.add(StreamWindow.fromElements(15)); - expected.add(StreamWindow.fromElements(21)); - expected.add(StreamWindow.fromElements(27)); - expected.add(StreamWindow.fromElements(33)); assertEquals(expected, collector.getCollected()); } @@ -254,4 +308,16 @@ public class SlidingTimePreReducerTest { } } + + private static class TupleSumReducer implements ReduceFunction<Tuple2<Integer, Integer>> { + + private static final long serialVersionUID = 1L; + + @Override + public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception { + return new Tuple2<Integer, Integer>(value1.f0, value1.f1 + 
value2.f1); + } + + } + }