johnyangk closed pull request #172: [NEMO-270] Test different triggers in GroupByKeyAndWindowDoFnTransformTest
URL: https://github.com/apache/incubator-nemo/pull/172
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not display it otherwise:

diff --git 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
index 06dde5861..a4315082a 100644
--- 
a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
+++ 
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
@@ -24,6 +24,7 @@
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
@@ -240,15 +241,6 @@ private void triggerTimers(final K key,
       // The DoFnRunner interface requires WindowedValue,
       // but this windowed value is actually not used in the ReduceFnRunner 
internal.
       
getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(timerWorkItem));
-
-      // output watermark
-      // we set output watermark to the minimum of the timer data
-      long keyOutputTimestamp = Long.MAX_VALUE;
-      for (final TimerInternals.TimerData timer : timerDataList) {
-        keyOutputTimestamp = Math.min(keyOutputTimestamp, 
timer.getTimestamp().getMillis());
-      }
-
-      timerInternals.advanceOutputWatermark(new Instant(keyOutputTimestamp));
     }
   }
 
@@ -349,14 +341,21 @@ public TimerInternals timerInternalsForKey(final K key) {
 
     @Override
     public void emit(final WindowedValue<KV<K, Iterable<InputT>>> output) {
-      // adds the output timestamp to the watermark hold of each key
-      // +1 to the output timestamp because if the window is [0-5000), the 
timestamp is 4999
-      // TODO #270: consider early firing
-      // TODO #270: This logic may not be applied to early firing outputs
-      keyAndWatermarkHoldMap.put(output.getValue().getKey(),
-        new Watermark(output.getTimestamp().getMillis() + 1));
+
+      // The watermark advances only in ON_TIME
+      if (output.getPane().getTiming().equals(PaneInfo.Timing.ON_TIME)) {
+        final K key = output.getValue().getKey();
+        final InMemoryTimerInternals timerInternals = (InMemoryTimerInternals)
+          inMemoryTimerInternalsFactory.timerInternalsForKey(key);
+        keyAndWatermarkHoldMap.put(key,
+          // adds the output timestamp to the watermark hold of each key
+          // +1 to the output timestamp because if the window is [0-5000), the 
timestamp is 4999
+          new Watermark(output.getTimestamp().getMillis() + 1));
+        timerInternals.advanceOutputWatermark(new 
Instant(output.getTimestamp().getMillis() + 1));
+      }
       outputCollector.emit(output);
     }
+
     @Override
     public void emitWatermark(final Watermark watermark) {
       outputCollector.emitWatermark(watermark);
diff --git 
a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
 
b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
index 474c79c0d..f0749c0c0 100644
--- 
a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
+++ 
b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
@@ -33,16 +33,20 @@
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.*;
 
-import static java.util.Collections.emptyList;
+import static org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing.EARLY;
+import static org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing.LATE;
+import static org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing.ON_TIME;
+import static 
org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 
-// TODO #270: Test different triggers
 public final class GroupByKeyAndWindowDoFnTransformTest {
-
+  private static final Logger LOG = 
LoggerFactory.getLogger(GroupByKeyAndWindowDoFnTransformTest.class.getName());
   private final static Coder NULL_INPUT_CODER = null;
   private final static Map<TupleTag<?>, Coder<?>> NULL_OUTPUT_CODERS = null;
 
@@ -248,4 +252,123 @@ public void test() {
 
     doFnTransform.close();
   }
+
+  /**
+   * Test complex triggers that emit early and late firing.
+   */
+  @Test
+  public void eventTimeTriggerTest() {
+    final Duration lateness = Duration.standardSeconds(1);
+    final AfterWatermark.AfterWatermarkEarlyAndLate trigger = 
AfterWatermark.pastEndOfWindow()
+      // early firing
+      .withEarlyFirings(
+        AfterProcessingTime
+          .pastFirstElementInPane()
+          // early firing 1 sec after receiving an element
+          .plusDelayOf(Duration.millis(1000)))
+      // late firing: Fire on any late data.
+      .withLateFirings(AfterPane.elementCountAtLeast(1));
+
+    final FixedWindows window = (FixedWindows) Window.into(
+      FixedWindows.of(Duration.standardSeconds(5)))
+      // lateness
+      .withAllowedLateness(lateness)
+      .triggering(trigger)
+      // TODO #308: Test discarding of refinements
+      .accumulatingFiredPanes().getWindowFn();
+
+    final TupleTag<String> outputTag = new TupleTag<>("main-output");
+    final GroupByKeyAndWindowDoFnTransform<String, String> doFnTransform =
+      new GroupByKeyAndWindowDoFnTransform(
+        NULL_OUTPUT_CODERS,
+        outputTag,
+        WindowingStrategy.of(window).withTrigger(trigger)
+          .withMode(ACCUMULATING_FIRED_PANES)
+        .withAllowedLateness(lateness),
+        PipelineOptionsFactory.as(NemoPipelineOptions.class),
+        SystemReduceFn.buffering(NULL_INPUT_CODER),
+        DisplayData.none());
+
+
+    final Transform.Context context = mock(Transform.Context.class);
+    final TestOutputCollector<KV<String, Iterable<String>>> oc = new 
TestOutputCollector();
+    doFnTransform.prepare(context, oc);
+
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("1", "hello"), new Instant(1), window.assignWindow(new 
Instant(1)), PaneInfo.NO_FIRING));
+
+    // early firing is not related to the watermark progress
+    doFnTransform.onWatermark(new Watermark(2));
+    assertEquals(1, oc.outputs.size());
+    assertEquals(EARLY, oc.outputs.get(0).getPane().getTiming());
+    LOG.info("Output: {}", oc.outputs.get(0));
+    oc.outputs.clear();
+
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("1", "world"), new Instant(3), window.assignWindow(new 
Instant(3)), PaneInfo.NO_FIRING));
+    // EARLY firing... waiting >= 1 sec
+    try {
+      Thread.sleep(2000);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+
+    // GBKTransform emits data when receiving watermark
+    // TODO #250: element-wise processing
+    doFnTransform.onWatermark(new Watermark(5));
+    assertEquals(1, oc.outputs.size());
+    assertEquals(EARLY, oc.outputs.get(0).getPane().getTiming());
+    // ACCUMULATION MODE
+    checkOutput(KV.of("1", Arrays.asList("hello", "world")), 
oc.outputs.get(0).getValue());
+    LOG.info("Output: {}", oc.outputs.get(0));
+    oc.outputs.clear();
+
+    // ON TIME
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("1", "!!"), new Instant(3), window.assignWindow(new Instant(3)), 
PaneInfo.NO_FIRING));
+    doFnTransform.onWatermark(new Watermark(5001));
+    assertEquals(1, oc.outputs.size());
+    assertEquals(ON_TIME, oc.outputs.get(0).getPane().getTiming());
+    LOG.info("Output: {}", oc.outputs.get(0));
+    // ACCUMULATION MODE
+    checkOutput(KV.of("1", Arrays.asList("hello", "world", "!!")), 
oc.outputs.get(0).getValue());
+    oc.outputs.clear();
+
+    // LATE DATA
+    // actual window: [0-5000)
+    // allowed lateness: 1000 (ms)
+    // current watermark: 5001
+    // data: 4500
+    // the data timestamp + allowed lateness > current watermark,
+    // so it should be accumulated to the prev window
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("1", "bye!"), new Instant(4500),
+      window.assignWindow(new Instant(4500)), PaneInfo.NO_FIRING));
+    doFnTransform.onWatermark(new Watermark(6000));
+    assertEquals(1, oc.outputs.size());
+    assertEquals(LATE, oc.outputs.get(0).getPane().getTiming());
+    LOG.info("Output: {}", oc.outputs.get(0));
+    // The data should  be accumulated to the previous window because it 
allows 1 second lateness
+    checkOutput(KV.of("1", Arrays.asList("hello", "world", "!!", "bye!")), 
oc.outputs.get(0).getValue());
+    oc.outputs.clear();
+
+    // LATE DATA
+    // data timestamp: 4800
+    // current watermark: 6000
+    // data timestamp + allowed lateness < current watermark
+    // It should not be accumulated to the prev window
+    doFnTransform.onData(WindowedValue.of(
+      KV.of("1", "hello again!"), new Instant(4800),
+      window.assignWindow(new Instant(4800)), PaneInfo.NO_FIRING));
+    doFnTransform.onWatermark(new Watermark(6300));
+    assertEquals(1, oc.outputs.size());
+    assertEquals(LATE, oc.outputs.get(0).getPane().getTiming());
+    LOG.info("Output: {}", oc.outputs.get(0));
+    checkOutput(KV.of("1", Arrays.asList("hello again!")), 
oc.outputs.get(0).getValue());
+    oc.outputs.clear();
+
+
+    doFnTransform.close();
+
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to