lukecwik commented on a change in pull request #15540:
URL: https://github.com/apache/beam/pull/15540#discussion_r727598137



##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1897,14 +1898,26 @@ private Instant minTargetAndGcTime(Instant target) {
       return Timer.cleared(userKey, dynamicTimerTag, 
Collections.singletonList(boundedWindow));
     }
 
+    @SuppressWarnings("deprecation") // Allowed Skew is deprecated for users, 
but must be respected
     private Timer<K> getTimerForTime(Instant scheduledTime) {
       if (outputTimestamp != null) {
-        checkArgument(
-            !outputTimestamp.isBefore(elementTimestampOrTimerHoldTimestamp),
-            "output timestamp %s should be after input message timestamp or 
output timestamp of"
-                + " firing timers %s",
-            outputTimestamp,
-            elementTimestampOrTimerHoldTimestamp);
+        Instant lowerBound;
+        Instant upperBound = BoundedWindow.TIMESTAMP_MAX_VALUE;
+        try {
+          lowerBound = 
elementTimestampOrTimerHoldTimestamp.minus(doFn.getAllowedTimestampSkew());
+        } catch (ArithmeticException e) {
+          lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE;
+        }
+        if (outputTimestamp.isBefore(lowerBound) || 
outputTimestamp.isAfter(upperBound)) {

Review comment:
       ```suggestion
           try {
             lowerBound = 
elementTimestampOrTimerHoldTimestamp.minus(doFn.getAllowedTimestampSkew());
           } catch (ArithmeticException e) {
             lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE;
           }
           if (outputTimestamp.isBefore(lowerBound) || 
outputTimestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
   ```

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1999,6 +2012,28 @@ public void set(String dynamicTimerTag, Instant 
absoluteTime) {
     }
   }
 
+  @SuppressWarnings("deprecation") // Allowed Skew is deprecated for users, 
but must be respected
+  private void checkTimestamp(Instant timestamp) {
+    Instant lowerBound;
+    Instant upperBound = BoundedWindow.TIMESTAMP_MAX_VALUE;
+    try {
+      lowerBound = 
currentElement.getTimestamp().minus(doFn.getAllowedTimestampSkew());
+    } catch (ArithmeticException e) {
+      lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE;
+    }
+    if (timestamp.isBefore(lowerBound) || timestamp.isAfter(upperBound)) {

Review comment:
       ```suggestion
       try {
         lowerBound = 
currentElement.getTimestamp().minus(doFn.getAllowedTimestampSkew());
       } catch (ArithmeticException e) {
         lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE;
       }
       if (timestamp.isBefore(lowerBound) || 
timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
   ```

##########
File path: 
runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
##########
@@ -1190,13 +1195,24 @@ public Timer withOutputTimestamp(Instant 
outputTimestamp) {
      * </ul>
      */
     private void setAndVerifyOutputTimestamp() {
-
       if (outputTimestamp != null) {
-        checkArgument(
-            !outputTimestamp.isBefore(elementInputTimestamp),
-            "output timestamp %s should be after input message timestamp or 
output timestamp of firing timers %s",
-            outputTimestamp,
-            elementInputTimestamp);
+        Instant lowerBound;
+        Instant upperBound = BoundedWindow.TIMESTAMP_MAX_VALUE;
+        try {
+          lowerBound = 
elementInputTimestamp.minus(fn.getAllowedTimestampSkew());
+        } catch (ArithmeticException e) {
+          lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE;
+        }
+        if (outputTimestamp.isBefore(lowerBound) || 
outputTimestamp.isAfter(upperBound)) {

Review comment:
       ```suggestion
           try {
             lowerBound = 
elementInputTimestamp.minus(fn.getAllowedTimestampSkew());
           } catch (ArithmeticException e) {
             lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE;
           }
           if (outputTimestamp.isBefore(lowerBound) || 
outputTimestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
   ```

##########
File path: 
runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
##########
@@ -386,6 +397,186 @@ public void testInfiniteSkew() {
             BoundedWindow.TIMESTAMP_MAX_VALUE));
   }
 
+  @Test
+  @Category({ValidatesRunner.class, UsesTimersInParDo.class})
+  public void testRunnerNoSkew() {
+    PCollection<Duration> input =
+        p.apply(
+            "create",
+            Create.timestamped(
+                Arrays.asList(new Duration(0L), new Duration(1L)), 
Arrays.asList(0L, 1L)));
+    input.apply(ParDo.of(new SkewingDoFn(Duration.ZERO)));
+    // The errors differ between runners but at least check that the output 
timestamp is printed.
+    thrown.expectMessage(String.format("%s", new Instant(0L)));
+    thrown.expectMessage("output");
+    p.run();
+  }
+
+  @Test
+  @Category({UsesTimersInParDo.class, ValidatesRunner.class})
+  public void testRunnerAllowedSkew() {
+    PCollection<Duration> input =
+        p.apply(
+            "create",
+            Create.timestamped(
+                Arrays.asList(new Duration(0L), new Duration(1L)), 
Arrays.asList(0L, 1L)));
+    input.apply(ParDo.of(new SkewingDoFn(new Duration(2L))));
+    p.run();
+  }
+
+  /**
+   * Demonstrates that attempting to set a timer with an output timestamp 
before the timestamp of
+   * the current element with zero {@link DoFn#getAllowedTimestampSkew() 
allowed timestamp skew}
+   * throws.
+   */
+  @Test
+  public void testTimerBackwardsInTimeNoSkew() {
+    TimerSkewingDoFn fn = new TimerSkewingDoFn(Duration.ZERO);
+    DoFnRunner<KV<String, Duration>, Duration> runner =
+        new SimpleDoFnRunner<>(
+            null,
+            fn,
+            NullSideInputReader.empty(),
+            new ListOutputManager(),
+            new TupleTag<>(),
+            Collections.emptyList(),
+            mockStepContext,
+            null,
+            Collections.emptyMap(),
+            WindowingStrategy.of(new GlobalWindows()),
+            DoFnSchemaInformation.create(),
+            Collections.emptyMap());
+
+    runner.startBundle();
+    // A timer with output timestamp at the current timestamp is fine.
+    runner.processElement(
+        WindowedValue.timestampedValueInGlobalWindow(KV.of("1", 
Duration.ZERO), new Instant(0)));
+
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(isA(IllegalArgumentException.class));
+    thrown.expectMessage("output timestamp of firing timers");
+    thrown.expectMessage(
+        String.format("output timestamp %s", new 
Instant(0).minus(Duration.millis(1L))));
+    thrown.expectMessage(
+        String.format(
+            "allowed skew %s", 
PeriodFormat.getDefault().print(Duration.ZERO.toPeriod())));
+
+    // A timer with output timestamp before (current time - skew) is forbidden
+    runner.processElement(
+        WindowedValue.timestampedValueInGlobalWindow(
+            KV.of("2", Duration.millis(1L)), new Instant(0)));
+  }
+
+  /**
+   * Demonstrates that attempting to have a timer with output timestamp before 
the timestamp of the
+   * current element plus the value of {@link DoFn#getAllowedTimestampSkew()} 
throws, but between
+   * that value and the current timestamp succeeds.
+   */
+  @Test
+  public void testTimerSkew() {
+    TimerSkewingDoFn fn = new TimerSkewingDoFn(Duration.standardMinutes(10L));
+    DoFnRunner<KV<String, Duration>, Duration> runner =
+        new SimpleDoFnRunner<>(
+            null,
+            fn,
+            NullSideInputReader.empty(),
+            new ListOutputManager(),
+            new TupleTag<>(),
+            Collections.emptyList(),
+            mockStepContext,
+            null,
+            Collections.emptyMap(),
+            WindowingStrategy.of(new GlobalWindows()),
+            DoFnSchemaInformation.create(),
+            Collections.emptyMap());
+
+    runner.startBundle();
+    // Timer with output timestamp between "now" and "now - allowed skew" 
succeeds.
+    runner.processElement(
+        WindowedValue.timestampedValueInGlobalWindow(
+            KV.of("1", Duration.standardMinutes(5L)), new Instant(0)));
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(isA(IllegalArgumentException.class));
+    thrown.expectMessage("output timestamp of firing timers");
+    thrown.expectMessage(
+        String.format("output timestamp %s", new 
Instant(0).minus(Duration.standardHours(1L))));
+    thrown.expectMessage(
+        String.format(
+            "allowed skew %s",
+            
PeriodFormat.getDefault().print(Duration.standardMinutes(10L).toPeriod())));
+    // Timer with output timestamp before "now - allowed skew" fails.
+    runner.processElement(
+        WindowedValue.timestampedValueInGlobalWindow(
+            KV.of("2", Duration.standardHours(1L)), new Instant(0)));
+  }
+
+  /**
+   * Demonstrates that attempting to output an element with a timestamp before 
the current one
+   * always succeeds when {@link DoFn#getAllowedTimestampSkew()} is equal to 
{@link Long#MAX_VALUE}
+   * milliseconds.
+   */
+  @Test
+  public void testTimerInfiniteSkew() {
+    TimerSkewingDoFn fn = new 
TimerSkewingDoFn(Duration.millis(Long.MAX_VALUE));
+    DoFnRunner<KV<String, Duration>, Duration> runner =
+        new SimpleDoFnRunner<>(
+            null,
+            fn,
+            NullSideInputReader.empty(),
+            new ListOutputManager(),
+            new TupleTag<>(),
+            Collections.emptyList(),
+            mockStepContext,
+            null,
+            Collections.emptyMap(),
+            WindowingStrategy.of(new GlobalWindows()),
+            DoFnSchemaInformation.create(),
+            Collections.emptyMap());
+
+    runner.startBundle();
+    runner.processElement(
+        WindowedValue.timestampedValueInGlobalWindow(
+            KV.of("1", Duration.millis(1L)), new Instant(0)));
+    runner.processElement(
+        WindowedValue.timestampedValueInGlobalWindow(
+            KV.of("2", Duration.millis(1L)),
+            BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1))));
+    runner.processElement(
+        WindowedValue.timestampedValueInGlobalWindow(
+            KV.of(
+                "3",
+                // This is the maximum amount a timestamp in beam can move 
(from the maximum
+                // timestamp
+                // to the minimum timestamp).
+                Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())
+                    
.minus(Duration.millis(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()))),
+            BoundedWindow.TIMESTAMP_MAX_VALUE));
+  }
+
+  @Test
+  @Category({UsesTimersInParDo.class, ValidatesRunner.class})
+  public void testRunnerTimerNoSkew() {
+    List<KV<String, Duration>> durations =
+        Arrays.asList(KV.of("0", new Duration(0L)), KV.of("2", new 
Duration(1L)));
+    PCollection<KV<String, Duration>> input =
+        p.apply("create", Create.timestamped(durations, Arrays.asList(0L, 
2L)));
+    input.apply(ParDo.of(new TimerSkewingDoFn(Duration.ZERO)));
+    // The errors differ between runners but at least check that the output 
timestamp is printed.
+    thrown.expectMessage(String.format("%s", new Instant(1L)));

Review comment:
       I don't think this will work for Dataflow streaming.
   
   This passes on Jenkins because Jenkins does not run all of these tests in both 
streaming and batch modes, but it will not work when we run the tests inside Google.

##########
File path: 
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
##########
@@ -3878,6 +3885,182 @@ public void 
testProcessElementForWindowedTruncateAndSizeRestriction() throws Exc
     }
   }
 
+  @RunWith(JUnit4.class)
+  public static class ExceptionThrowingExecutionTest {
+    @Rule public final ExpectedException thrown = ExpectedException.none();
+
+    public static final String TEST_TRANSFORM_ID = "pTransformId";
+
+    /**
+     * A {@link DoFn} that outputs elements with timestamp equal to the input 
timestamp minus the
+     * input element.
+     */
+    private static class SkewingDoFn extends DoFn<String, String> {
+      private final Duration allowedSkew;
+
+      private SkewingDoFn(Duration allowedSkew) {
+        this.allowedSkew = allowedSkew;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext context) {
+        Duration duration = new Duration(Long.valueOf(context.element()));
+        context.outputWithTimestamp(context.element(), 
context.timestamp().minus(duration));
+      }
+
+      @Override
+      public Duration getAllowedTimestampSkew() {
+        return allowedSkew;
+      }
+    }
+
+    @Test
+    public void testDoFnSkewNotAllowed() throws Exception {
+      Pipeline p = Pipeline.create();
+      PCollection<String> valuePCollection = p.apply(Create.of("0", "1"));
+      PCollection<String> outputPCollection =
+          valuePCollection.apply(TEST_TRANSFORM_ID, ParDo.of(new 
SkewingDoFn(Duration.ZERO)));
+
+      SdkComponents sdkComponents = SdkComponents.create(p.getOptions());
+      RunnerApi.Pipeline pProto = PipelineTranslation.toProto(p, 
sdkComponents);
+      String inputPCollectionId = 
sdkComponents.registerPCollection(valuePCollection);
+      String outputPCollectionId = 
sdkComponents.registerPCollection(outputPCollection);
+      RunnerApi.PTransform pTransform =
+          pProto
+              .getComponents()
+              .getTransformsOrThrow(
+                  pProto
+                      .getComponents()
+                      .getTransformsOrThrow(TEST_TRANSFORM_ID)
+                      .getSubtransforms(0));
+
+      List<WindowedValue<String>> mainOutputValues = new ArrayList<>();
+      MetricsContainerStepMap metricsContainerRegistry = new 
MetricsContainerStepMap();
+      PCollectionConsumerRegistry consumers =
+          new PCollectionConsumerRegistry(
+              metricsContainerRegistry, mock(ExecutionStateTracker.class));
+
+      consumers.register(
+          outputPCollectionId,
+          TEST_TRANSFORM_ID,
+          (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) 
mainOutputValues::add,
+          StringUtf8Coder.of());
+      PTransformFunctionRegistry startFunctionRegistry =
+          new PTransformFunctionRegistry(
+              mock(MetricsContainerStepMap.class), 
mock(ExecutionStateTracker.class), "start");
+      PTransformFunctionRegistry finishFunctionRegistry =
+          new PTransformFunctionRegistry(
+              mock(MetricsContainerStepMap.class), 
mock(ExecutionStateTracker.class), "finish");
+      List<ThrowingRunnable> teardownFunctions = new ArrayList<>();
+
+      new FnApiDoFnRunner.Factory<>()
+          .createRunnerForPTransform(
+              PipelineOptionsFactory.create(),
+              null /* beamFnDataClient */,
+              null /* beamFnStateClient */,
+              null /* beamFnTimerClient */,
+              TEST_TRANSFORM_ID,
+              pTransform,
+              Suppliers.ofInstance("57L")::get,
+              pProto.getComponents().getPcollectionsMap(),
+              pProto.getComponents().getCodersMap(),
+              pProto.getComponents().getWindowingStrategiesMap(),
+              consumers,
+              startFunctionRegistry,
+              finishFunctionRegistry,
+              null /* addResetFunction */,
+              teardownFunctions::add,
+              null /* addProgressRequestCallback */,
+              null /* splitListener */,
+              null /* bundleFinalizer */);
+
+      thrown.expect(UserCodeException.class);
+      thrown.expectMessage(String.format("timestamp %s", new 
Instant(0).minus(new Duration(1L))));
+      thrown.expectMessage(
+          String.format(
+              "allowed skew (%s)", 
PeriodFormat.getDefault().print(Duration.ZERO.toPeriod())));
+
+      Iterables.getOnlyElement(startFunctionRegistry.getFunctions()).run();
+      mainOutputValues.clear();
+
+      FnDataReceiver<WindowedValue<?>> mainInput =
+          consumers.getMultiplexingConsumer(inputPCollectionId);
+      mainInput.accept(valueInGlobalWindow("0"));
+      mainInput.accept(timestampedValueInGlobalWindow("1", new Instant(0L)));

Review comment:
       Please switch to `assertThrows`. `thrown.expect` is deprecated 
because it doesn't do a good job of showing which line of code is causing the 
exception.

##########
File path: 
runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
##########
@@ -472,6 +663,37 @@ public Duration getAllowedTimestampSkew() {
     }
   }
 
+  /**
+   * A {@link DoFn} that creates/sets a timer with an output timestamp equal 
to the input timestamp
+   * minus the input element's value. Keys are ignored but required for timers.
+   */
+  private static class TimerSkewingDoFn extends DoFn<KV<String, Duration>, 
Duration> {
+    static final String TIMER_ID = "testTimerId";
+    private final Duration allowedSkew;
+
+    @TimerId(TIMER_ID)
+    private static final TimerSpec timer = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+    private TimerSkewingDoFn(Duration allowedSkew) {
+      this.allowedSkew = allowedSkew;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext context, @TimerId(TIMER_ID) 
Timer timer) {
+      timer
+          
.withOutputTimestamp(context.timestamp().minus(context.element().getValue()))
+          .set(new Instant(0));
+    }
+
+    @OnTimer(TIMER_ID)
+    public void onTimer() {}

Review comment:
       It would be nice to be able to test the onTimer variant as well and not 
just when timers are set within processElement.

##########
File path: 
runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
##########
@@ -386,6 +399,180 @@ public void testInfiniteSkew() {
             BoundedWindow.TIMESTAMP_MAX_VALUE));
   }
 
+  @Test
+  @Category({ValidatesRunner.class})
+  public void testRunnerNoSkew() {
+    PCollection<Duration> input =
+        p.apply("create", Create.timestamped(Arrays.asList(new Duration(0L), 
new Duration(1L)), Arrays.asList(0L, 1L)));
+    input.apply(ParDo.of(new SkewingDoFn(Duration.ZERO)));
+    // The errors differ between runners but at least check that the output 
timestamp is printed.
+    thrown.expectMessage(String.format("%s", new Instant(0L)));

Review comment:
       The issue is that `thrown.expect` is going to rely on Pipeline.run 
throwing the underlying cause for the job failure. I don't think all runners 
properly propagate the error (e.g. Dataflow streaming) that caused the pipeline 
to fail. The other failure tests in `ParDoTest.java` use `NeedsRunner`, which 
runs them with the DirectRunner only and not with all runners. So if you want to 
test only the DirectRunner, mark the test as `NeedsRunner` instead of `ValidatesRunner`.

##########
File path: 
runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
##########
@@ -437,19 +437,24 @@ public Instant timestamp() {
 
     @SuppressWarnings("deprecation") // Allowed Skew is deprecated for users, 
but must be respected
     private void checkTimestamp(Instant timestamp) {
-      // The documentation of getAllowedTimestampSkew explicitly permits 
Long.MAX_VALUE to be used
-      // for infinite skew. Defend against underflow in that case for 
timestamps before the epoch
-      if (fn.getAllowedTimestampSkew().getMillis() != Long.MAX_VALUE
-          && 
timestamp.isBefore(elem.getTimestamp().minus(fn.getAllowedTimestampSkew()))) {
+      Instant lowerBound;
+      Instant upperBound = BoundedWindow.TIMESTAMP_MAX_VALUE;
+      try {
+        lowerBound = elem.getTimestamp().minus(fn.getAllowedTimestampSkew());
+      } catch (ArithmeticException e) {
+        lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      }
+      if (timestamp.isBefore(lowerBound) || timestamp.isAfter(upperBound)) {

Review comment:
       ```suggestion
         try {
           lowerBound = elem.getTimestamp().minus(fn.getAllowedTimestampSkew());
         } catch (ArithmeticException e) {
           lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE;
         }
         if (timestamp.isBefore(lowerBound) || 
timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
   ```

##########
File path: 
runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
##########
@@ -437,19 +437,24 @@ public Instant timestamp() {
 
     @SuppressWarnings("deprecation") // Allowed Skew is deprecated for users, 
but must be respected
     private void checkTimestamp(Instant timestamp) {
-      // The documentation of getAllowedTimestampSkew explicitly permits 
Long.MAX_VALUE to be used
-      // for infinite skew. Defend against underflow in that case for 
timestamps before the epoch
-      if (fn.getAllowedTimestampSkew().getMillis() != Long.MAX_VALUE
-          && 
timestamp.isBefore(elem.getTimestamp().minus(fn.getAllowedTimestampSkew()))) {
+      Instant lowerBound;
+      Instant upperBound = BoundedWindow.TIMESTAMP_MAX_VALUE;
+      try {
+        lowerBound = elem.getTimestamp().minus(fn.getAllowedTimestampSkew());
+      } catch (ArithmeticException e) {
+        lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      }
+      if (timestamp.isBefore(lowerBound) || timestamp.isAfter(upperBound)) {
         throw new IllegalArgumentException(
             String.format(
                 "Cannot output with timestamp %s. Output timestamps must be no 
earlier than the "
-                    + "timestamp of the current input (%s) minus the allowed 
skew (%s). See the "
-                    + "DoFn#getAllowedTimestampSkew() Javadoc for details on 
changing the allowed "
-                    + "skew.",
+                    + "timestamp of the current input (%s) minus the allowed 
skew (%s) and no "
+                    + "later than %s. See the DoFn#getAllowedTimestampSkew() 
Javadoc for details "
+                    + "on changing the allowed skew.",
                 timestamp,
                 elem.getTimestamp(),
-                
PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod())));
+                
PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()),
+                upperBound));

Review comment:
       ```suggestion
                   BoundedWindow.TIMESTAMP_MAX_VALUE));
   ```

##########
File path: 
runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
##########
@@ -1190,13 +1195,24 @@ public Timer withOutputTimestamp(Instant 
outputTimestamp) {
      * </ul>
      */
     private void setAndVerifyOutputTimestamp() {
-
       if (outputTimestamp != null) {
-        checkArgument(
-            !outputTimestamp.isBefore(elementInputTimestamp),
-            "output timestamp %s should be after input message timestamp or 
output timestamp of firing timers %s",
-            outputTimestamp,
-            elementInputTimestamp);
+        Instant lowerBound;
+        Instant upperBound = BoundedWindow.TIMESTAMP_MAX_VALUE;
+        try {
+          lowerBound = 
elementInputTimestamp.minus(fn.getAllowedTimestampSkew());
+        } catch (ArithmeticException e) {
+          lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE;
+        }
+        if (outputTimestamp.isBefore(lowerBound) || 
outputTimestamp.isAfter(upperBound)) {
+          throw new IllegalArgumentException(
+              String.format(
+                  "output timestamp %s (allowed skew %s) should be after input 
message timestamp or"
+                      + " output timestamp of firing timers %s and before %s",
+                  outputTimestamp,
+                  
PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()),
+                  elementInputTimestamp,
+                  upperBound));

Review comment:
       The previous message isn't great. Can we improve this to be similar 
to the other message, like:
   ```
   Cannot output timer with timestamp %s. Output timestamps must be no earlier 
than the timestamp of the current input (%s) minus the allowed skew (%s) and no 
later than %s. See the DoFn#getAllowedTimestampSkew() Javadoc for details on 
changing the allowed skew.
   ```
   
   Ditto here and in `FnApiDoFnRunner.java`

##########
File path: 
runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
##########
@@ -386,6 +397,186 @@ public void testInfiniteSkew() {
             BoundedWindow.TIMESTAMP_MAX_VALUE));
   }
 
+  @Test
+  @Category({ValidatesRunner.class, UsesTimersInParDo.class})
+  public void testRunnerNoSkew() {

Review comment:
       Please move the `ValidatesRunner` tests to `ParDoTest.java` since that 
is where the bulk of the other ParDo-specific `ValidatesRunner` and 
`NeedsRunner` tests exist.

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1999,6 +2012,28 @@ public void set(String dynamicTimerTag, Instant 
absoluteTime) {
     }
   }
 
+  @SuppressWarnings("deprecation") // Allowed Skew is deprecated for users, 
but must be respected
+  private void checkTimestamp(Instant timestamp) {
+    Instant lowerBound;
+    Instant upperBound = BoundedWindow.TIMESTAMP_MAX_VALUE;
+    try {
+      lowerBound = 
currentElement.getTimestamp().minus(doFn.getAllowedTimestampSkew());
+    } catch (ArithmeticException e) {
+      lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE;
+    }
+    if (timestamp.isBefore(lowerBound) || timestamp.isAfter(upperBound)) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Cannot output with timestamp %s. Output timestamps must be no 
earlier than the "
+                  + "timestamp of the current input (%s) minus the allowed 
skew (%s). See the "

Review comment:
       This message differs from the one in SimpleDoFnRunner as it is missing 
the upper bound.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to