Fixes the GroupAlsoByWindowTest.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/52614ea3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/52614ea3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/52614ea3 Branch: refs/heads/master Commit: 52614ea36a7431d83f907d99d3fb251c2f2b3551 Parents: 69f7623 Author: kl0u <[email protected]> Authored: Wed Mar 2 16:09:53 2016 +0100 Committer: Davor Bonaci <[email protected]> Committed: Fri Mar 4 10:04:23 2016 -0800 ---------------------------------------------------------------------- .../streaming/GroupAlsoByWindowTest.java | 70 ++++++++++---------- 1 file changed, 36 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/52614ea3/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java index 434f827..01f9c32 100644 --- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java +++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java @@ -116,7 +116,7 @@ public class GroupAlsoByWindowTest { new Instant(initialTime + 1), new IntervalWindow(new Instant(0), new Instant(2000)), PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0)) - , initialTime)); + , initialTime + 1)); expectedOutput.add(new Watermark(initialTime + 2000)); expectedOutput.add(new StreamRecord<>( @@ -124,14 +124,15 @@ public class GroupAlsoByWindowTest { new Instant(initialTime + 1999), new IntervalWindow(new Instant(0), new Instant(2000)), PaneInfo.createPane(false, false, PaneInfo.Timing.LATE, 1, 1)) - , initialTime)); + , initialTime + 1999)); + expectedOutput.add(new StreamRecord<>( WindowedValue.of(KV.of("key1", 6), new Instant(initialTime + 1999), new IntervalWindow(new Instant(0), new Instant(2000)), PaneInfo.createPane(false, false, PaneInfo.Timing.LATE, 2, 2)) - , initialTime)); + , initialTime + 1999)); expectedOutput.add(new Watermark(initialTime + 4000)); TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); @@ -181,7 +182,7 @@ public class GroupAlsoByWindowTest { new Instant(initialTime + 1), new IntervalWindow(new Instant(1), new Instant(5700)), PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0)) - , initialTime)); + , initialTime + 1)); expectedOutput.add(new Watermark(initialTime + 6000)); expectedOutput.add(new StreamRecord<>( @@ -189,7 +190,7 @@ public class GroupAlsoByWindowTest { new Instant(initialTime + 6700), new IntervalWindow(new Instant(1), new Instant(10900)), PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0)) - , initialTime)); + , initialTime + 6700)); expectedOutput.add(new Watermark(initialTime + 12000)); TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); @@ -210,13 +211,13 @@ public class GroupAlsoByWindowTest { new Instant(initialTime + 5000), new IntervalWindow(new Instant(0), new Instant(10000)), PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)) - , initialTime)); + , initialTime + 5000)); expectedOutput.add(new StreamRecord<>( WindowedValue.of(KV.of("key1", 6), new Instant(initialTime + 1), new IntervalWindow(new Instant(-5000), new Instant(5000)), PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)) - , initialTime)); + , initialTime + 1)); expectedOutput.add(new Watermark(initialTime + 10000)); expectedOutput.add(new StreamRecord<>( @@ -224,19 +225,19 @@ public class GroupAlsoByWindowTest { new Instant(initialTime + 15000), new IntervalWindow(new Instant(10000), new Instant(20000)), PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)) - , initialTime)); + , initialTime + 15000)); expectedOutput.add(new StreamRecord<>( WindowedValue.of(KV.of("key1", 3), new Instant(initialTime + 10000), new IntervalWindow(new Instant(5000), new Instant(15000)), PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)) - , initialTime)); + , initialTime + 10000)); expectedOutput.add(new StreamRecord<>( WindowedValue.of(KV.of("key2", 1), new Instant(initialTime + 19500), new IntervalWindow(new Instant(10000), new Instant(20000)), PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)) - , initialTime)); + , initialTime + 19500)); expectedOutput.add(new Watermark(initialTime + 20000)); expectedOutput.add(new StreamRecord<>( @@ -250,13 +251,13 @@ public class GroupAlsoByWindowTest { */ new IntervalWindow(new Instant(15000), new Instant(25000)), PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)) - , initialTime)); + , initialTime + 20000)); expectedOutput.add(new StreamRecord<>( WindowedValue.of(KV.of("key1", 8), new Instant(initialTime + 20000), new IntervalWindow(new Instant(15000), new Instant(25000)), PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)) - , initialTime)); + , initialTime + 20000)); expectedOutput.add(new Watermark(initialTime + 25000)); TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); @@ -272,13 +273,13 @@ public class GroupAlsoByWindowTest { ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 6), - new Instant(initialTime + 1), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime)); + new Instant(initialTime + 1), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 1)); expectedOutput.add(new Watermark(initialTime + 10000)); expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 11), - new Instant(initialTime + 10000), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime)); + new Instant(initialTime + 10000), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 10000)); expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1), - new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime)); + new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 19500)); expectedOutput.add(new Watermark(initialTime + 20000)); TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); @@ -295,13 +296,13 @@ public class GroupAlsoByWindowTest { ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5), - new Instant(initialTime + 1), null, PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime)); + new Instant(initialTime + 1), null, PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime + 1)); expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5), - new Instant(initialTime + 10000), null, PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime)); + new Instant(initialTime + 10000), null, PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime + 10000)); expectedOutput.add(new Watermark(initialTime + 10000)); expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1), - new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME, 0, 0)), initialTime)); + new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME, 0, 0)), initialTime + 19500)); expectedOutput.add(new Watermark(initialTime + 20000)); TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); @@ -326,21 +327,21 @@ public class GroupAlsoByWindowTest { * */ expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5), - new Instant(initialTime + 1), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime)); + new Instant(initialTime + 1), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 1)); expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5), - new Instant(initialTime + 10000), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime)); + new Instant(initialTime + 10000), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 10000)); expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5), - new Instant(initialTime + 19500), null, PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime)); + new Instant(initialTime + 19500), null, PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime + 19500)); expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), - new Instant(initialTime + 1200), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime)); + new Instant(initialTime + 1200), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime + 1200)); expectedOutput.add(new Watermark(initialTime + 10000)); expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1), - new Instant(initialTime + 19500), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime)); + new Instant(initialTime + 19500), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime + 19500)); expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1), - new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime)); + new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 19500)); expectedOutput.add(new Watermark(initialTime + 20000)); TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); @@ -348,7 +349,7 @@ public class GroupAlsoByWindowTest { testHarness.close(); } - // Disabled + @Test public void testCompoundAccumulatingPanesProgram() throws Exception { WindowingStrategy strategy = fixedWindowWithCompoundTriggerStrategyAcc; long initialTime = 0L; @@ -357,21 +358,21 @@ public class GroupAlsoByWindowTest { ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5), - new Instant(initialTime + 1), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime)); + new Instant(initialTime + 1), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 1)); expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5), - new Instant(initialTime + 10000), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime)); + new Instant(initialTime + 10000), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 10000)); expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 10), - new Instant(initialTime + 19500), null, PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime)); + new Instant(initialTime + 19500), null, PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime + 19500)); expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 6), - new Instant(initialTime + 1200), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime)); + new Instant(initialTime + 1200), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime + 1200)); expectedOutput.add(new Watermark(initialTime + 10000)); expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 11), - new Instant(initialTime + 19500), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime)); + new Instant(initialTime + 19500), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime + 19500)); expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1), - new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime)); + new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 19500)); expectedOutput.add(new Watermark(initialTime + 20000)); TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator()); @@ -434,11 +435,12 @@ public class GroupAlsoByWindowTest { StreamRecord<WindowedValue<KV<String, Integer>>> sr0 = (StreamRecord<WindowedValue<KV<String, Integer>>>) o1; StreamRecord<WindowedValue<KV<String, Integer>>> sr1 = (StreamRecord<WindowedValue<KV<String, Integer>>>) o2; - if (sr0.getTimestamp() != sr1.getTimestamp()) { - return (int) (sr0.getTimestamp() - sr1.getTimestamp()); + int comparison = (int) (sr0.getValue().getTimestamp().getMillis() - sr1.getValue().getTimestamp().getMillis()); + if (comparison != 0) { + return comparison; } - int comparison = sr0.getValue().getValue().getKey().compareTo(sr1.getValue().getValue().getKey()); + comparison = sr0.getValue().getValue().getKey().compareTo(sr1.getValue().getValue().getKey()); if(comparison == 0) { comparison = Integer.compare( sr0.getValue().getValue().getValue(),
