Fixes the GroupAlsoByWindowTest.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/52614ea3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/52614ea3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/52614ea3

Branch: refs/heads/master
Commit: 52614ea36a7431d83f907d99d3fb251c2f2b3551
Parents: 69f7623
Author: kl0u <[email protected]>
Authored: Wed Mar 2 16:09:53 2016 +0100
Committer: Davor Bonaci <[email protected]>
Committed: Fri Mar 4 10:04:23 2016 -0800

----------------------------------------------------------------------
 .../streaming/GroupAlsoByWindowTest.java        | 70 ++++++++++----------
 1 file changed, 36 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/52614ea3/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
index 434f827..01f9c32 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
@@ -116,7 +116,7 @@ public class GroupAlsoByWindowTest {
                                                new Instant(initialTime + 1),
                                                new IntervalWindow(new 
Instant(0), new Instant(2000)),
                                                PaneInfo.createPane(true, 
false, PaneInfo.Timing.ON_TIME, 0, 0))
-                               , initialTime));
+                               , initialTime + 1));
                expectedOutput.add(new Watermark(initialTime + 2000));
 
                expectedOutput.add(new StreamRecord<>(
@@ -124,14 +124,15 @@ public class GroupAlsoByWindowTest {
                                                new Instant(initialTime + 1999),
                                                new IntervalWindow(new 
Instant(0), new Instant(2000)),
                                                PaneInfo.createPane(false, 
false, PaneInfo.Timing.LATE, 1, 1))
-                               , initialTime));
+                               , initialTime + 1999));
+
 
                expectedOutput.add(new StreamRecord<>(
                                WindowedValue.of(KV.of("key1", 6),
                                                new Instant(initialTime + 1999),
                                                new IntervalWindow(new 
Instant(0), new Instant(2000)),
                                                PaneInfo.createPane(false, 
false, PaneInfo.Timing.LATE, 2, 2))
-                               , initialTime));
+                               , initialTime + 1999));
                expectedOutput.add(new Watermark(initialTime + 4000));
 
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
@@ -181,7 +182,7 @@ public class GroupAlsoByWindowTest {
                                                new Instant(initialTime + 1),
                                                new IntervalWindow(new 
Instant(1), new Instant(5700)),
                                                PaneInfo.createPane(true, 
false, PaneInfo.Timing.ON_TIME, 0, 0))
-                               , initialTime));
+                               , initialTime + 1));
                expectedOutput.add(new Watermark(initialTime + 6000));
 
                expectedOutput.add(new StreamRecord<>(
@@ -189,7 +190,7 @@ public class GroupAlsoByWindowTest {
                                                new Instant(initialTime + 6700),
                                                new IntervalWindow(new 
Instant(1), new Instant(10900)),
                                                PaneInfo.createPane(true, 
false, PaneInfo.Timing.ON_TIME, 0, 0))
-                               , initialTime));
+                               , initialTime + 6700));
                expectedOutput.add(new Watermark(initialTime + 12000));
 
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
@@ -210,13 +211,13 @@ public class GroupAlsoByWindowTest {
                                                new Instant(initialTime + 5000),
                                                new IntervalWindow(new 
Instant(0), new Instant(10000)),
                                                PaneInfo.createPane(true, true, 
PaneInfo.Timing.ON_TIME))
-                               , initialTime));
+                               , initialTime + 5000));
                expectedOutput.add(new StreamRecord<>(
                                WindowedValue.of(KV.of("key1", 6),
                                                new Instant(initialTime + 1),
                                                new IntervalWindow(new 
Instant(-5000), new Instant(5000)),
                                                PaneInfo.createPane(true, true, 
PaneInfo.Timing.ON_TIME))
-                               , initialTime));
+                               , initialTime + 1));
                expectedOutput.add(new Watermark(initialTime + 10000));
 
                expectedOutput.add(new StreamRecord<>(
@@ -224,19 +225,19 @@ public class GroupAlsoByWindowTest {
                                                new Instant(initialTime + 
15000),
                                                new IntervalWindow(new 
Instant(10000), new Instant(20000)),
                                                PaneInfo.createPane(true, true, 
PaneInfo.Timing.ON_TIME))
-                               , initialTime));
+                               , initialTime + 15000));
                expectedOutput.add(new StreamRecord<>(
                                WindowedValue.of(KV.of("key1", 3),
                                                new Instant(initialTime + 
10000),
                                                new IntervalWindow(new 
Instant(5000), new Instant(15000)),
                                                PaneInfo.createPane(true, true, 
PaneInfo.Timing.ON_TIME))
-                               , initialTime));
+                               , initialTime + 10000));
                expectedOutput.add(new StreamRecord<>(
                                WindowedValue.of(KV.of("key2", 1),
                                                new Instant(initialTime + 
19500),
                                                new IntervalWindow(new 
Instant(10000), new Instant(20000)),
                                                PaneInfo.createPane(true, true, 
PaneInfo.Timing.ON_TIME))
-                               , initialTime));
+                               , initialTime + 19500));
                expectedOutput.add(new Watermark(initialTime + 20000));
 
                expectedOutput.add(new StreamRecord<>(
@@ -250,13 +251,13 @@ public class GroupAlsoByWindowTest {
                                                 */
                                                new IntervalWindow(new 
Instant(15000), new Instant(25000)),
                                                PaneInfo.createPane(true, true, 
PaneInfo.Timing.ON_TIME))
-                               , initialTime));
+                               , initialTime + 20000));
                expectedOutput.add(new StreamRecord<>(
                                WindowedValue.of(KV.of("key1", 8),
                                                new Instant(initialTime + 
20000),
                                                new IntervalWindow(new 
Instant(15000), new Instant(25000)),
                                                PaneInfo.createPane(true, true, 
PaneInfo.Timing.ON_TIME))
-                               , initialTime));
+                               , initialTime + 20000));
                expectedOutput.add(new Watermark(initialTime + 25000));
 
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
@@ -272,13 +273,13 @@ public class GroupAlsoByWindowTest {
                ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
 
                expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 6),
-                               new Instant(initialTime + 1), null, 
PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime));
+                               new Instant(initialTime + 1), null, 
PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 1));
                expectedOutput.add(new Watermark(initialTime + 10000));
 
                expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 11),
-                               new Instant(initialTime + 10000), null, 
PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime));
+                               new Instant(initialTime + 10000), null, 
PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 
10000));
                expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
-                               new Instant(initialTime + 19500), null, 
PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime));
+                               new Instant(initialTime + 19500), null, 
PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 
19500));
                expectedOutput.add(new Watermark(initialTime + 20000));
 
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
@@ -295,13 +296,13 @@ public class GroupAlsoByWindowTest {
                ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
 
                expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
-                               new Instant(initialTime + 1), null, 
PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime));
+                               new Instant(initialTime + 1), null, 
PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime + 1));
                expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
-                               new Instant(initialTime + 10000), null, 
PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime));
+                               new Instant(initialTime + 10000), null, 
PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime + 10000));
                expectedOutput.add(new Watermark(initialTime + 10000));
 
                expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
-                               new Instant(initialTime + 19500), null, 
PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME, 0, 0)), initialTime));
+                               new Instant(initialTime + 19500), null, 
PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME, 0, 0)), initialTime + 
19500));
                expectedOutput.add(new Watermark(initialTime + 20000));
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
 
@@ -326,21 +327,21 @@ public class GroupAlsoByWindowTest {
                 * */
 
                expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
-                               new Instant(initialTime + 1), null, 
PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime));
+                               new Instant(initialTime + 1), null, 
PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 1));
                expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
-                               new Instant(initialTime + 10000), null, 
PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime));
+                               new Instant(initialTime + 10000), null, 
PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 10000));
                expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
-                               new Instant(initialTime + 19500), null, 
PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime));
+                               new Instant(initialTime + 19500), null, 
PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime + 
19500));
 
                expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1),
-                               new Instant(initialTime + 1200), null, 
PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime));
+                               new Instant(initialTime + 1200), null, 
PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime + 
1200));
 
                expectedOutput.add(new Watermark(initialTime + 10000));
 
                expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1),
-                               new Instant(initialTime + 19500), null, 
PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime));
+                               new Instant(initialTime + 19500), null, 
PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime + 
19500));
                expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
-                               new Instant(initialTime + 19500), null, 
PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime));
+                               new Instant(initialTime + 19500), null, 
PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 
19500));
 
                expectedOutput.add(new Watermark(initialTime + 20000));
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
@@ -348,7 +349,7 @@ public class GroupAlsoByWindowTest {
                testHarness.close();
        }
 
-       // Disabled
+       @Test
        public void testCompoundAccumulatingPanesProgram() throws Exception {
                WindowingStrategy strategy = 
fixedWindowWithCompoundTriggerStrategyAcc;
                long initialTime = 0L;
@@ -357,21 +358,21 @@ public class GroupAlsoByWindowTest {
                ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
 
                expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
-                               new Instant(initialTime + 1), null, 
PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime));
+                               new Instant(initialTime + 1), null, 
PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 1));
                expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
-                               new Instant(initialTime + 10000), null, 
PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime));
+                               new Instant(initialTime + 10000), null, 
PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 10000));
                expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 10),
-                               new Instant(initialTime + 19500), null, 
PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime));
+                               new Instant(initialTime + 19500), null, 
PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime + 
19500));
 
                expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 6),
-                               new Instant(initialTime + 1200), null, 
PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime));
+                               new Instant(initialTime + 1200), null, 
PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime + 
1200));
 
                expectedOutput.add(new Watermark(initialTime + 10000));
 
                expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 11),
-                               new Instant(initialTime + 19500), null, 
PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime));
+                               new Instant(initialTime + 19500), null, 
PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime + 
19500));
                expectedOutput.add(new 
StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
-                               new Instant(initialTime + 19500), null, 
PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime));
+                               new Instant(initialTime + 19500), null, 
PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 
19500));
 
                expectedOutput.add(new Watermark(initialTime + 20000));
                TestHarnessUtil.assertOutputEqualsSorted("Output was not 
correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
@@ -434,11 +435,12 @@ public class GroupAlsoByWindowTest {
                                StreamRecord<WindowedValue<KV<String, 
Integer>>> sr0 = (StreamRecord<WindowedValue<KV<String, Integer>>>) o1;
                                StreamRecord<WindowedValue<KV<String, 
Integer>>> sr1 = (StreamRecord<WindowedValue<KV<String, Integer>>>) o2;
 
-                               if (sr0.getTimestamp() != sr1.getTimestamp()) {
-                                       return (int) (sr0.getTimestamp() - 
sr1.getTimestamp());
+                               int comparison = (int) 
(sr0.getValue().getTimestamp().getMillis() - 
sr1.getValue().getTimestamp().getMillis());
+                               if (comparison != 0) {
+                                       return comparison;
                                }
 
-                               int comparison = 
sr0.getValue().getValue().getKey().compareTo(sr1.getValue().getValue().getKey());
+                               comparison = 
sr0.getValue().getValue().getKey().compareTo(sr1.getValue().getValue().getKey());
                                if(comparison == 0) {
                                        comparison = Integer.compare(
                                                        
sr0.getValue().getValue().getValue(),

Reply via email to