taegeonum commented on a change in pull request #151: [NEMO-267] Consider 
watermark holds in GroupByKeyAndWindowDoFnTransform
URL: https://github.com/apache/incubator-nemo/pull/151#discussion_r231362660
 
 

 ##########
 File path: 
compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
 ##########
 @@ -99,76 +106,148 @@ public void test() {
     final Instant ts6 = new Instant(1800);
     final Instant ts7 = new Instant(1900);
     final Watermark watermark3 = new Watermark(2100);
+    final Instant ts8 = new Instant(2200);
+    final Instant ts9 = new Instant(2300);
+    final Watermark watermark4 = new Watermark(3000);
+
+
+    List<IntervalWindow> sortedWindows = new 
ArrayList<>(slidingWindows.assignWindows(ts1));
+    Collections.sort(sortedWindows, new Comparator<IntervalWindow>() {
+      @Override
+      public int compare(IntervalWindow o1, IntervalWindow o2) {
+        return o1.maxTimestamp().compareTo(o2.maxTimestamp());
+      }
+    });
+
+    // [0---1000)
+    final IntervalWindow window0 = sortedWindows.get(0);
+    // [0---2000)
+    final IntervalWindow window1 = sortedWindows.get(1);
+
+    sortedWindows.clear();
+    sortedWindows = new ArrayList<>(slidingWindows.assignWindows(ts4));
+    Collections.sort(sortedWindows, new Comparator<IntervalWindow>() {
+      @Override
+      public int compare(IntervalWindow o1, IntervalWindow o2) {
+        return o1.maxTimestamp().compareTo(o2.maxTimestamp());
+      }
+    });
+
+    // [1000--3000)
+    final IntervalWindow window2 = sortedWindows.get(1);
 
 
     final Transform.Context context = mock(Transform.Context.class);
     final TestOutputCollector<KV<String, Iterable<String>>> oc = new 
TestOutputCollector();
     doFnTransform.prepare(context, oc);
 
     doFnTransform.onData(WindowedValue.of(
-      KV.of("1", "hello"), ts1, fixedwindows.assignWindow(ts1), 
PaneInfo.NO_FIRING));
+      KV.of("1", "hello"), ts1, slidingWindows.assignWindows(ts1), 
PaneInfo.NO_FIRING));
 
     doFnTransform.onData(WindowedValue.of(
-      KV.of("1", "world"), ts2, fixedwindows.assignWindow(ts2), 
PaneInfo.NO_FIRING));
+      KV.of("1", "world"), ts2, slidingWindows.assignWindows(ts2), 
PaneInfo.NO_FIRING));
 
     doFnTransform.onData(WindowedValue.of(
-      KV.of("2", "hello"), ts3, fixedwindows.assignWindow(ts3), 
PaneInfo.NO_FIRING));
+      KV.of("2", "hello"), ts3, slidingWindows.assignWindows(ts3), 
PaneInfo.NO_FIRING));
 
     doFnTransform.onWatermark(watermark);
 
+    // output
+    // 1: ["hello", "world"]
+    // 2: ["hello"]
     Collections.sort(oc.outputs, (o1, o2) -> 
o1.getValue().getKey().compareTo(o2.getValue().getKey()));
 
     // windowed result for key 1
-    assertEquals(Arrays.asList(fixedwindows.assignWindow(ts1)), 
oc.outputs.get(0).getWindows());
+    assertEquals(Arrays.asList(window0), oc.outputs.get(0).getWindows());
     checkOutput(KV.of("1", Arrays.asList("hello", "world")), 
oc.outputs.get(0).getValue());
 
     // windowed result for key 2
-    assertEquals(Arrays.asList(fixedwindows.assignWindow(ts3)), 
oc.outputs.get(1).getWindows());
+    assertEquals(Arrays.asList(window0), oc.outputs.get(1).getWindows());
     checkOutput(KV.of("2", Arrays.asList("hello")), 
oc.outputs.get(1).getValue());
 
+    assertEquals(2, oc.outputs.size());
+    assertEquals(1, oc.watermarks.size());
+
     // check output watermark
-    assertEquals(fixedwindows.assignWindow(ts1).maxTimestamp().getMillis(),
+    assertEquals(1000,
       oc.watermarks.get(0).getTimestamp());
 
     oc.outputs.clear();
     oc.watermarks.clear();
 
     doFnTransform.onData(WindowedValue.of(
-      KV.of("1", "a"), ts4, fixedwindows.assignWindow(ts4), 
PaneInfo.NO_FIRING));
+      KV.of("1", "a"), ts4, slidingWindows.assignWindows(ts4), 
PaneInfo.NO_FIRING));
 
     // do not emit anything
     doFnTransform.onWatermark(watermark2);
     assertEquals(0, oc.outputs.size());
     assertEquals(0, oc.watermarks.size());
 
     doFnTransform.onData(WindowedValue.of(
-      KV.of("2", "a"), ts5, fixedwindows.assignWindow(ts5), 
PaneInfo.NO_FIRING));
+      KV.of("3", "a"), ts5, slidingWindows.assignWindows(ts5), 
PaneInfo.NO_FIRING));
 
     doFnTransform.onData(WindowedValue.of(
-      KV.of("3", "a"), ts6, fixedwindows.assignWindow(ts6), 
PaneInfo.NO_FIRING));
+      KV.of("3", "a"), ts6, slidingWindows.assignWindows(ts6), 
PaneInfo.NO_FIRING));
 
     doFnTransform.onData(WindowedValue.of(
-      KV.of("2", "b"), ts7, fixedwindows.assignWindow(ts7), 
PaneInfo.NO_FIRING));
+      KV.of("3", "b"), ts7, slidingWindows.assignWindows(ts7), 
PaneInfo.NO_FIRING));
 
-    // emit windowed value
+    // emit window1
     doFnTransform.onWatermark(watermark3);
 
+    // output
+    // 1: ["hello", "world", "a"]
+    // 2: ["hello"]
+    // 3: ["a", "a", "b"]
     Collections.sort(oc.outputs, (o1, o2) -> 
o1.getValue().getKey().compareTo(o2.getValue().getKey()));
 
+
     // windowed result for key 1
-    assertEquals(Arrays.asList(fixedwindows.assignWindow(ts4)), 
oc.outputs.get(0).getWindows());
-    checkOutput(KV.of("1", Arrays.asList("a")), oc.outputs.get(0).getValue());
+    assertEquals(Arrays.asList(window1), oc.outputs.get(0).getWindows());
 
 Review comment:
   I will add a TODO for the test.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to