laraschmidt commented on a change in pull request #15540:
URL: https://github.com/apache/beam/pull/15540#discussion_r731267716



##########
File path: 
runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
##########
@@ -386,6 +397,186 @@ public void testInfiniteSkew() {
             BoundedWindow.TIMESTAMP_MAX_VALUE));
   }
 
+  @Test
+  @Category({ValidatesRunner.class, UsesTimersInParDo.class})
+  public void testRunnerNoSkew() {
+    PCollection<Duration> input =
+        p.apply(
+            "create",
+            Create.timestamped(
+                Arrays.asList(new Duration(0L), new Duration(1L)), 
Arrays.asList(0L, 1L)));
+    input.apply(ParDo.of(new SkewingDoFn(Duration.ZERO)));
+    // The errors differ between runners but at least check that the output 
timestamp is printed.
+    thrown.expectMessage(String.format("%s", new Instant(0L)));
+    thrown.expectMessage("output");
+    p.run();
+  }
+
+  @Test
+  @Category({UsesTimersInParDo.class, ValidatesRunner.class})
+  public void testRunnerAllowedSkew() {
+    PCollection<Duration> input =
+        p.apply(
+            "create",
+            Create.timestamped(
+                Arrays.asList(new Duration(0L), new Duration(1L)), 
Arrays.asList(0L, 1L)));
+    input.apply(ParDo.of(new SkewingDoFn(new Duration(2L))));
+    p.run();
+  }
+
+  /**
+   * Demonstrates that attempting to set a timer with an output timestamp 
before the timestamp of
+   * the current element with zero {@link DoFn#getAllowedTimestampSkew() 
allowed timestamp skew}
+   * throws.
+   */
+  @Test
+  public void testTimerBackwardsInTimeNoSkew() {
+    TimerSkewingDoFn fn = new TimerSkewingDoFn(Duration.ZERO);
+    DoFnRunner<KV<String, Duration>, Duration> runner =
+        new SimpleDoFnRunner<>(
+            null,
+            fn,
+            NullSideInputReader.empty(),
+            new ListOutputManager(),
+            new TupleTag<>(),
+            Collections.emptyList(),
+            mockStepContext,
+            null,
+            Collections.emptyMap(),
+            WindowingStrategy.of(new GlobalWindows()),
+            DoFnSchemaInformation.create(),
+            Collections.emptyMap());
+
+    runner.startBundle();
+    // A timer with output timestamp at the current timestamp is fine.
+    runner.processElement(
+        WindowedValue.timestampedValueInGlobalWindow(KV.of("1", 
Duration.ZERO), new Instant(0)));
+
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(isA(IllegalArgumentException.class));
+    thrown.expectMessage("output timestamp of firing timers");
+    thrown.expectMessage(
+        String.format("output timestamp %s", new 
Instant(0).minus(Duration.millis(1L))));
+    thrown.expectMessage(
+        String.format(
+            "allowed skew %s", 
PeriodFormat.getDefault().print(Duration.ZERO.toPeriod())));
+
+    // A timer with output timestamp before (current time - skew) is forbidden
+    runner.processElement(
+        WindowedValue.timestampedValueInGlobalWindow(
+            KV.of("2", Duration.millis(1L)), new Instant(0)));
+  }
+
+  /**
+   * Demonstrates that attempting to have a timer with output timestamp before 
the timestamp of the
+   * current element plus the value of {@link DoFn#getAllowedTimestampSkew()} 
throws, but between
+   * that value and the current timestamp succeeds.
+   */
+  @Test
+  public void testTimerSkew() {
+    TimerSkewingDoFn fn = new TimerSkewingDoFn(Duration.standardMinutes(10L));
+    DoFnRunner<KV<String, Duration>, Duration> runner =
+        new SimpleDoFnRunner<>(
+            null,
+            fn,
+            NullSideInputReader.empty(),
+            new ListOutputManager(),
+            new TupleTag<>(),
+            Collections.emptyList(),
+            mockStepContext,
+            null,
+            Collections.emptyMap(),
+            WindowingStrategy.of(new GlobalWindows()),
+            DoFnSchemaInformation.create(),
+            Collections.emptyMap());
+
+    runner.startBundle();
+    // Timer with output timestamp between "now" and "now - allowed skew" 
succeeds.
+    runner.processElement(
+        WindowedValue.timestampedValueInGlobalWindow(
+            KV.of("1", Duration.standardMinutes(5L)), new Instant(0)));
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(isA(IllegalArgumentException.class));
+    thrown.expectMessage("output timestamp of firing timers");
+    thrown.expectMessage(
+        String.format("output timestamp %s", new 
Instant(0).minus(Duration.standardHours(1L))));
+    thrown.expectMessage(
+        String.format(
+            "allowed skew %s",
+            
PeriodFormat.getDefault().print(Duration.standardMinutes(10L).toPeriod())));
+    // Timer with output timestamp before "now - allowed skew" fails.
+    runner.processElement(
+        WindowedValue.timestampedValueInGlobalWindow(
+            KV.of("2", Duration.standardHours(1L)), new Instant(0)));
+  }
+
+  /**
+   * Demonstrates that attempting to output an element with a timestamp before 
the current one
+   * always succeeds when {@link DoFn#getAllowedTimestampSkew()} is equal to 
{@link Long#MAX_VALUE}
+   * milliseconds.
+   */
+  @Test
+  public void testTimerInfiniteSkew() {
+    TimerSkewingDoFn fn = new 
TimerSkewingDoFn(Duration.millis(Long.MAX_VALUE));
+    DoFnRunner<KV<String, Duration>, Duration> runner =
+        new SimpleDoFnRunner<>(
+            null,
+            fn,
+            NullSideInputReader.empty(),
+            new ListOutputManager(),
+            new TupleTag<>(),
+            Collections.emptyList(),
+            mockStepContext,
+            null,
+            Collections.emptyMap(),
+            WindowingStrategy.of(new GlobalWindows()),
+            DoFnSchemaInformation.create(),
+            Collections.emptyMap());
+
+    runner.startBundle();
+    runner.processElement(
+        WindowedValue.timestampedValueInGlobalWindow(
+            KV.of("1", Duration.millis(1L)), new Instant(0)));
+    runner.processElement(
+        WindowedValue.timestampedValueInGlobalWindow(
+            KV.of("2", Duration.millis(1L)),
+            BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1))));
+    runner.processElement(
+        WindowedValue.timestampedValueInGlobalWindow(
+            KV.of(
+                "3",
+                // This is the maximum amount a timestamp in beam can move 
(from the maximum
+                // timestamp
+                // to the minimum timestamp).
+                Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())
+                    
.minus(Duration.millis(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()))),
+            BoundedWindow.TIMESTAMP_MAX_VALUE));
+  }
+
+  @Test
+  @Category({UsesTimersInParDo.class, ValidatesRunner.class})
+  public void testRunnerTimerNoSkew() {
+    List<KV<String, Duration>> durations =
+        Arrays.asList(KV.of("0", new Duration(0L)), KV.of("2", new 
Duration(1L)));
+    PCollection<KV<String, Duration>> input =
+        p.apply("create", Create.timestamped(durations, Arrays.asList(0L, 
2L)));
+    input.apply(ParDo.of(new TimerSkewingDoFn(Duration.ZERO)));
+    // The errors differ between runners but at least check that the output 
timestamp is printed.
+    thrown.expectMessage(String.format("%s", new Instant(1L)));

Review comment:
       Swapped to using PAssert and checking the PCollection output for the 
validates runner tests.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to