johnyangk commented on a change in pull request #172: [NEMO-270] Test different triggers in GroupByKeyAndWindowDoFnTransformTest
URL: https://github.com/apache/incubator-nemo/pull/172#discussion_r238538386
 
 

 ##########
 File path: 
compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
 ##########
 @@ -248,4 +252,120 @@ public void test() {
 
     doFnTransform.close();
   }
+
+  /**
+   * Test complex triggers that emit early and late firing.
+   */
+  @Test
+  public void eventTimeTriggerTest() {
+    final Duration lateness = Duration.standardSeconds(1);
+    final AfterWatermark.AfterWatermarkEarlyAndLate trigger = 
AfterWatermark.pastEndOfWindow()
+      // early firing
+      .withEarlyFirings(
+        AfterProcessingTime
+          .pastFirstElementInPane()
+          // early firing 1 sec after receiving an element
+          .plusDelayOf(Duration.millis(1000)))
+      // late firing: Fire on any late data.
+      .withLateFirings(AfterPane.elementCountAtLeast(1));
+
+    final FixedWindows window = (FixedWindows) Window.into(
+      FixedWindows.of(Duration.standardSeconds(5)))
+      // lateness
+      .withAllowedLateness(lateness)
+      .triggering(trigger)
+      .accumulatingFiredPanes().getWindowFn();
+
+    final TupleTag<String> outputTag = new TupleTag<>("main-output");
+    final GroupByKeyAndWindowDoFnTransform<String, String> doFnTransform =
+      new GroupByKeyAndWindowDoFnTransform(
+        NULL_OUTPUT_CODERS,
+        outputTag,
+        WindowingStrategy.of(window).withTrigger(trigger)
+          .withMode(ACCUMULATING_FIRED_PANES)
+        .withAllowedLateness(lateness),
+        PipelineOptionsFactory.as(NemoPipelineOptions.class),
+        SystemReduceFn.buffering(NULL_INPUT_CODER),
+        DisplayData.none());
+
+
+    final Transform.Context context = mock(Transform.Context.class);
+    final TestOutputCollector<KV<String, Iterable<String>>> oc = new 
TestOutputCollector();
+    doFnTransform.prepare(context, oc);
+
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("1", "hello"), new Instant(1), window.assignWindow(new 
Instant(1)), PaneInfo.NO_FIRING));
+
+    // early firing is not related to the watermark progress
+    doFnTransform.onWatermark(new Watermark(2));
+    assertEquals(1, oc.outputs.size());
+    assertEquals(EARLY, oc.outputs.get(0).getPane().getTiming());
+    LOG.info("Output: {}", oc.outputs.get(0));
+    oc.outputs.clear();
+
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("1", "world"), new Instant(3), window.assignWindow(new 
Instant(3)), PaneInfo.NO_FIRING));
+    // EARLY firing... waiting >= 1 sec
+    try {
+      Thread.sleep(2000);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+
+    doFnTransform.onWatermark(new Watermark(5));
+    assertEquals(1, oc.outputs.size());
+    assertEquals(EARLY, oc.outputs.get(0).getPane().getTiming());
+    // ACCUMULATION MODE
+    checkOutput(KV.of("1", Arrays.asList("hello", "world")), 
oc.outputs.get(0).getValue());
+    LOG.info("Output: {}", oc.outputs.get(0));
+    oc.outputs.clear();
+
+    // ON TIME
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("1", "!!"), new Instant(3), window.assignWindow(new Instant(3)), 
PaneInfo.NO_FIRING));
+    doFnTransform.onWatermark(new Watermark(5001));
+    assertEquals(1, oc.outputs.size());
+    assertEquals(ON_TIME, oc.outputs.get(0).getPane().getTiming());
+    LOG.info("Output: {}", oc.outputs.get(0));
+    // ACCUMULATION MODE
+    checkOutput(KV.of("1", Arrays.asList("hello", "world", "!!")), 
oc.outputs.get(0).getValue());
+    oc.outputs.clear();
+
+    // LATE DATA
+    // actual window: [0-5000)
+    // allowed lateness: 1000 (ms)
+    // current watermark: 5001
+    // data: 4500
+    // the data timestamp + allowed lateness > current watermark,
+    // so it should be accumulated to the prev window
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("1", "hello!"), new Instant(4500),
+      window.assignWindow(new Instant(4500)), PaneInfo.NO_FIRING));
+    doFnTransform.onWatermark(new Watermark(6000));
+    assertEquals(1, oc.outputs.size());
+    assertEquals(LATE, oc.outputs.get(0).getPane().getTiming());
+    LOG.info("Output: {}", oc.outputs.get(0));
+    // The data should  be accumulated to the previous window because it 
allows 1 second lateness
+    checkOutput(KV.of("1", Arrays.asList("hello", "world", "!!", "hello!")), 
oc.outputs.get(0).getValue());
+    oc.outputs.clear();
+
+    // LATE DATA
+    // data timestamp: 4800
+    // current watermark: 6000
+    // data timestamp + allowed lateness < current watermark
+    // It should not be accumulated to the prev window
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("1", "hello!"), new Instant(4800),
 
 Review comment:
   Nit: hello -> bye, i.e. change the value "hello!" on this line to "bye".
   
   😄 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to