lukecwik commented on a change in pull request #15540:
URL: https://github.com/apache/beam/pull/15540#discussion_r737900079



##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -1791,6 +1791,119 @@ public void finishBundle(FinishBundleContext c) {
   /** Tests to validate output timestamps. */
   @RunWith(JUnit4.class)
   public static class TimestampTests extends SharedTestBase implements Serializable {
+    /**
+     * A {@link DoFn} that outputs elements with timestamp equal to the input timestamp minus the
+     * input element.
+     */
+    private static class SkewingDoFn extends DoFn<Duration, String> {
+      private final Duration allowedSkew;
+      static final String CORRECT_ELEMENT = "element";
+
+      private SkewingDoFn(Duration allowedSkew) {
+        this.allowedSkew = allowedSkew;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext context) {
+        Instant outputTimestamp = context.timestamp().minus(context.element());
+        try {
+          context.outputWithTimestamp(CORRECT_ELEMENT, outputTimestamp);
+        } catch (IllegalArgumentException e) {
+          // Errors differ between runners but at least check that the output timestamp is printed.
+          if (e.getMessage().contains("timestamp " + outputTimestamp)) {
+            context.output(outputTimestamp.toString());
+          }
+        }
+      }
+
+      @Override
+      public Duration getAllowedTimestampSkew() {
+        return allowedSkew;
+      }
+    }
+
+    /**
+     * A {@link DoFn} that creates/sets a timer with an output timestamp equal to the input
+     * timestamp minus the input element's value. Keys are ignored but required for timers.
+     */
+    private static class TimerSkewingDoFn extends DoFn<KV<String, Duration>, String> {
+      static final String TIMER_ID = "testTimerId";
+      private final Duration allowedSkew;
+      static final String CORRECT_ELEMENT = "element";
+
+      @TimerId(TIMER_ID)
+      private static final TimerSpec timer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+      private TimerSkewingDoFn(Duration allowedSkew) {
+        this.allowedSkew = allowedSkew;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext context, @TimerId(TIMER_ID) Timer timer) {
+

Review comment:
       ```suggestion
   ```

##########
File path: runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
##########
@@ -865,6 +871,28 @@ public BundleFinalizer bundleFinalizer() {
       throw new UnsupportedOperationException(
           "Bundle finalization is not supported in non-portable pipelines.");
     }

Review comment:
       We should also be checking the timestamp in `OnWindowExpiration#outputWithTimestamp`:
   
https://github.com/apache/beam/blob/7b0de72a84298f19365d3a2166822105d534c92b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L1053
   
https://github.com/apache/beam/blob/7b0de72a84298f19365d3a2166822105d534c92b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L1063
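
To make the intent concrete, here is a minimal sketch of a single skew check that every `outputWithTimestamp` path (process element, timer, window expiration) could call. It is not the code in `SimpleDoFnRunner`: the class name `SkewCheckSketch`, the method signature, and the message wording are all assumptions, and it uses `java.time` rather than the types in the PR so that it stands alone.

```java
import java.time.DateTimeException;
import java.time.Duration;
import java.time.Instant;

/** Hypothetical stand-in for a runner-side skew check; this is not Beam's code. */
final class SkewCheckSketch {
  private SkewCheckSketch() {}

  /**
   * Rejects an output timestamp that is earlier than the reference timestamp
   * (for the current element, timer, or window expiration) minus the allowed skew.
   */
  static void checkTimestamp(Instant reference, Instant output, Duration allowedSkew) {
    Instant lowerBound;
    try {
      lowerBound = reference.minus(allowedSkew);
    } catch (ArithmeticException | DateTimeException e) {
      // A very large allowed skew falls outside the Instant range; treat it as "no lower bound".
      lowerBound = Instant.MIN;
    }
    if (output.isBefore(lowerBound)) {
      throw new IllegalArgumentException(
          "Cannot output with timestamp " + output
              + ": it is earlier than " + reference
              + " minus the allowed skew (" + allowedSkew + ")");
    }
  }
}
```

Routing all three contexts through one helper like this would also keep the exception text identical no matter where the output happens.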

##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -1791,6 +1791,119 @@ public void finishBundle(FinishBundleContext c) {
   /** Tests to validate output timestamps. */
   @RunWith(JUnit4.class)
   public static class TimestampTests extends SharedTestBase implements Serializable {
+    /**
+     * A {@link DoFn} that outputs elements with timestamp equal to the input timestamp minus the
+     * input element.
+     */
+    private static class SkewingDoFn extends DoFn<Duration, String> {
+      private final Duration allowedSkew;
+      static final String CORRECT_ELEMENT = "element";
+
+      private SkewingDoFn(Duration allowedSkew) {
+        this.allowedSkew = allowedSkew;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext context) {
+        Instant outputTimestamp = context.timestamp().minus(context.element());
+        try {
+          context.outputWithTimestamp(CORRECT_ELEMENT, outputTimestamp);
+        } catch (IllegalArgumentException e) {
+          // Errors differ between runners but at least check that the output timestamp is printed.
+          if (e.getMessage().contains("timestamp " + outputTimestamp)) {
+            context.output(outputTimestamp.toString());
+          }
+        }
+      }
+
+      @Override
+      public Duration getAllowedTimestampSkew() {
+        return allowedSkew;
+      }
+    }
+
+    /**
+     * A {@link DoFn} that creates/sets a timer with an output timestamp equal to the input
+     * timestamp minus the input element's value. Keys are ignored but required for timers.
+     */
+    private static class TimerSkewingDoFn extends DoFn<KV<String, Duration>, String> {

Review comment:
       ```suggestion
       private static class ProcessElementTimerSkewingDoFn extends DoFn<KV<String, Duration>, String> {
   ```

##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -1791,6 +1791,119 @@ public void finishBundle(FinishBundleContext c) {
   /** Tests to validate output timestamps. */
   @RunWith(JUnit4.class)
   public static class TimestampTests extends SharedTestBase implements Serializable {
+    /**
+     * A {@link DoFn} that outputs elements with timestamp equal to the input timestamp minus the
+     * input element.
+     */
+    private static class SkewingDoFn extends DoFn<Duration, String> {

Review comment:
       nit: I would consider combining the DoFns into three variants:
   * ProcessElementTimestampSkewingDoFn
   * OnTimerTimestampSkewingDoFn
   * OnWindowExpirationTimestampSkewingDoFn
   
   Each variant should then test both the normal `outputWithTimestamp` and the timer output timestamp in one go, instead of having a separate DoFn for each. This would also help with the currently confusing naming (including the names I suggested).

##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -1791,6 +1791,119 @@ public void finishBundle(FinishBundleContext c) {
   /** Tests to validate output timestamps. */
   @RunWith(JUnit4.class)
   public static class TimestampTests extends SharedTestBase implements Serializable {
+    /**
+     * A {@link DoFn} that outputs elements with timestamp equal to the input timestamp minus the
+     * input element.
+     */
+    private static class SkewingDoFn extends DoFn<Duration, String> {
+      private final Duration allowedSkew;
+      static final String CORRECT_ELEMENT = "element";
+
+      private SkewingDoFn(Duration allowedSkew) {
+        this.allowedSkew = allowedSkew;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext context) {
+        Instant outputTimestamp = context.timestamp().minus(context.element());
+        try {
+          context.outputWithTimestamp(CORRECT_ELEMENT, outputTimestamp);
+        } catch (IllegalArgumentException e) {
+          // Errors differ between runners but at least check that the output timestamp is printed.
+          if (e.getMessage().contains("timestamp " + outputTimestamp)) {
+            context.output(outputTimestamp.toString());
+          }
+        }
+      }
+
+      @Override
+      public Duration getAllowedTimestampSkew() {
+        return allowedSkew;
+      }
+    }
+
+    /**
+     * A {@link DoFn} that creates/sets a timer with an output timestamp equal to the input
+     * timestamp minus the input element's value. Keys are ignored but required for timers.
+     */
+    private static class TimerSkewingDoFn extends DoFn<KV<String, Duration>, String> {
+      static final String TIMER_ID = "testTimerId";
+      private final Duration allowedSkew;
+      static final String CORRECT_ELEMENT = "element";
+
+      @TimerId(TIMER_ID)
+      private static final TimerSpec timer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+      private TimerSkewingDoFn(Duration allowedSkew) {
+        this.allowedSkew = allowedSkew;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext context, @TimerId(TIMER_ID) Timer timer) {
+
+        Instant outputTimestamp = context.timestamp().minus(context.element().getValue());
+        try {
+          timer.withOutputTimestamp(outputTimestamp).set(new Instant(0));
+          context.output(CORRECT_ELEMENT);
+        } catch (IllegalArgumentException e) {
+          // Errors differ between runners but at least check that the output timestamp is printed.

Review comment:
       Ditto on the error message consolidation.

##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -1791,6 +1791,119 @@ public void finishBundle(FinishBundleContext c) {
   /** Tests to validate output timestamps. */
   @RunWith(JUnit4.class)
   public static class TimestampTests extends SharedTestBase implements Serializable {
+    /**
+     * A {@link DoFn} that outputs elements with timestamp equal to the input timestamp minus the
+     * input element.
+     */
+    private static class SkewingDoFn extends DoFn<Duration, String> {
+      private final Duration allowedSkew;
+      static final String CORRECT_ELEMENT = "element";
+
+      private SkewingDoFn(Duration allowedSkew) {
+        this.allowedSkew = allowedSkew;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext context) {
+        Instant outputTimestamp = context.timestamp().minus(context.element());
+        try {
+          context.outputWithTimestamp(CORRECT_ELEMENT, outputTimestamp);
+        } catch (IllegalArgumentException e) {
+          // Errors differ between runners but at least check that the output timestamp is printed.
+          if (e.getMessage().contains("timestamp " + outputTimestamp)) {
+            context.output(outputTimestamp.toString());
+          }
+        }
+      }
+
+      @Override
+      public Duration getAllowedTimestampSkew() {
+        return allowedSkew;
+      }
+    }
+
+    /**
+     * A {@link DoFn} that creates/sets a timer with an output timestamp equal to the input
+     * timestamp minus the input element's value. Keys are ignored but required for timers.
+     */
+    private static class TimerSkewingDoFn extends DoFn<KV<String, Duration>, String> {
+      static final String TIMER_ID = "testTimerId";
+      private final Duration allowedSkew;
+      static final String CORRECT_ELEMENT = "element";
+
+      @TimerId(TIMER_ID)
+      private static final TimerSpec timer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+      private TimerSkewingDoFn(Duration allowedSkew) {
+        this.allowedSkew = allowedSkew;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext context, @TimerId(TIMER_ID) Timer timer) {
+
+        Instant outputTimestamp = context.timestamp().minus(context.element().getValue());
+        try {
+          timer.withOutputTimestamp(outputTimestamp).set(new Instant(0));
+          context.output(CORRECT_ELEMENT);
+        } catch (IllegalArgumentException e) {
+          // Errors differ between runners but at least check that the output timestamp is printed.
+          if (e.getMessage().contains("timestamp " + outputTimestamp)) {
+            context.output(outputTimestamp.toString());
+          }
+        }
+      }
+
+      @OnTimer(TIMER_ID)
+      public void onTimer(OnTimerContext context) {}
+
+      @Override
+      public Duration getAllowedTimestampSkew() {
+        return allowedSkew;
+      }
+    }
+
+    /**
+     * A {@link DoFn} that creates/sets a timer with an output timestamp equal to the input
+     * timestamp minus the input element's value. Keys are ignored but required for timers.
+     */
+    private static class TimerOutputSkewingDoFn extends DoFn<KV<String, String>, String> {

Review comment:
       ```suggestion
       private static class OnTimerOutputSkewingDoFn extends DoFn<KV<String, String>, String> {
   ```

##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -1791,6 +1791,119 @@ public void finishBundle(FinishBundleContext c) {
   /** Tests to validate output timestamps. */
   @RunWith(JUnit4.class)
   public static class TimestampTests extends SharedTestBase implements Serializable {
+    /**
+     * A {@link DoFn} that outputs elements with timestamp equal to the input timestamp minus the
+     * input element.
+     */
+    private static class SkewingDoFn extends DoFn<Duration, String> {
+      private final Duration allowedSkew;
+      static final String CORRECT_ELEMENT = "element";
+
+      private SkewingDoFn(Duration allowedSkew) {
+        this.allowedSkew = allowedSkew;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext context) {
+        Instant outputTimestamp = context.timestamp().minus(context.element());
+        try {
+          context.outputWithTimestamp(CORRECT_ELEMENT, outputTimestamp);
+        } catch (IllegalArgumentException e) {
+          // Errors differ between runners but at least check that the output timestamp is printed.
+          if (e.getMessage().contains("timestamp " + outputTimestamp)) {
+            context.output(outputTimestamp.toString());
+          }
+        }
+      }
+
+      @Override
+      public Duration getAllowedTimestampSkew() {
+        return allowedSkew;
+      }
+    }
+
+    /**
+     * A {@link DoFn} that creates/sets a timer with an output timestamp equal to the input
+     * timestamp minus the input element's value. Keys are ignored but required for timers.
+     */
+    private static class TimerSkewingDoFn extends DoFn<KV<String, Duration>, String> {
+      static final String TIMER_ID = "testTimerId";
+      private final Duration allowedSkew;
+      static final String CORRECT_ELEMENT = "element";
+
+      @TimerId(TIMER_ID)
+      private static final TimerSpec timer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+      private TimerSkewingDoFn(Duration allowedSkew) {
+        this.allowedSkew = allowedSkew;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext context, @TimerId(TIMER_ID) Timer timer) {
+
+        Instant outputTimestamp = context.timestamp().minus(context.element().getValue());
+        try {
+          timer.withOutputTimestamp(outputTimestamp).set(new Instant(0));
+          context.output(CORRECT_ELEMENT);
+        } catch (IllegalArgumentException e) {
+          // Errors differ between runners but at least check that the output timestamp is printed.
+          if (e.getMessage().contains("timestamp " + outputTimestamp)) {
+            context.output(outputTimestamp.toString());
+          }
+        }
+      }
+
+      @OnTimer(TIMER_ID)
+      public void onTimer(OnTimerContext context) {}
+
+      @Override
+      public Duration getAllowedTimestampSkew() {
+        return allowedSkew;
+      }
+    }
+
+    /**
+     * A {@link DoFn} that creates/sets a timer with an output timestamp equal to the input
+     * timestamp minus the input element's value. Keys are ignored but required for timers.
+     */
+    private static class TimerOutputSkewingDoFn extends DoFn<KV<String, String>, String> {
+      static final String TIMER_ID = "testTimerId";
+      static final String CORRECT_ELEMENT = "element";
+      private final Duration allowedSkew;
+      private final Duration outputTimestampSkew;
+
+      @TimerId(TIMER_ID)
+      private static final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+      private TimerOutputSkewingDoFn(Duration allowedSkew, Duration outputTimestampSkew) {
+        this.allowedSkew = allowedSkew;
+        this.outputTimestampSkew = outputTimestampSkew;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext context, @TimerId(TIMER_ID) Timer timer) {
+        timer.set(context.timestamp());
+      }
+
+      @OnTimer(TIMER_ID)
+      public void onTimer(OnTimerContext context) {
+        Instant outputTimestamp = context.timestamp().minus(outputTimestampSkew);

Review comment:
       We should also have a variant that covers timer skew within the `onTimer` method, and another that checks skew for `OnWindowExpiration`.

##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -1791,6 +1791,119 @@ public void finishBundle(FinishBundleContext c) {
   /** Tests to validate output timestamps. */
   @RunWith(JUnit4.class)
   public static class TimestampTests extends SharedTestBase implements Serializable {
+    /**
+     * A {@link DoFn} that outputs elements with timestamp equal to the input timestamp minus the
+     * input element.
+     */
+    private static class SkewingDoFn extends DoFn<Duration, String> {
+      private final Duration allowedSkew;
+      static final String CORRECT_ELEMENT = "element";
+
+      private SkewingDoFn(Duration allowedSkew) {
+        this.allowedSkew = allowedSkew;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext context) {
+        Instant outputTimestamp = context.timestamp().minus(context.element());
+        try {
+          context.outputWithTimestamp(CORRECT_ELEMENT, outputTimestamp);
+        } catch (IllegalArgumentException e) {
+          // Errors differ between runners but at least check that the output timestamp is printed.

Review comment:
       I was under the impression that we had consolidated all the error messages to be the same. Is this still not the case?
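
For illustration only, this is roughly what I mean by a consolidated message: one formatter that every runner calls, so the test can assert on a known string instead of a substring. The class name `SkewErrorMessages`, the method name, and the wording below are hypothetical and are not taken from Beam.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical shared formatter; the names and the wording are assumptions. */
final class SkewErrorMessages {
  private SkewErrorMessages() {}

  /** Builds the one message that every runner would throw for a too-early output timestamp. */
  static String tooEarly(Instant output, Instant reference, Duration allowedSkew) {
    return String.format(
        "Cannot output with timestamp %s. Output timestamps must be no earlier than %s"
            + " minus the allowed skew (%s).",
        output, reference, allowedSkew);
  }
}
```

If the messages really are identical today, the `catch` block in the test could compare against the full expected message rather than only checking for `"timestamp " + outputTimestamp`.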

##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -1958,6 +2071,87 @@ public Duration getAllowedTimestampSkew() {
 
       pipeline.run();
     }
+
+    @Test
+    @Category({ValidatesRunner.class, UsesTimersInParDo.class})
+    public void testRunnerNoSkew() {

Review comment:
       If we combine the DoFns into the three suggested variants, can we rename these tests to:
   * testProcessElementTimestampYYYSkew()
   * testOnTimerTimestampYYYSkew()
   * testOnWindowExpirationTimestampYYYSkew()
   
   where `YYY` is `Allowed` or `Denied`.
   
   Otherwise, you could use names like `testProcessElementOutputSkewingYYYSkew()`.
   
   Finally, it seems like it would be pretty easy to combine the allowed and denied cases into a single test case. That would help reduce the explosion of cases, since each `ValidatesRunner` test adds minutes of runtime to the Dataflow `ValidatesRunner` suite.

##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -1791,6 +1791,119 @@ public void finishBundle(FinishBundleContext c) {
   /** Tests to validate output timestamps. */
   @RunWith(JUnit4.class)
   public static class TimestampTests extends SharedTestBase implements Serializable {
+    /**
+     * A {@link DoFn} that outputs elements with timestamp equal to the input timestamp minus the
+     * input element.
+     */
+    private static class SkewingDoFn extends DoFn<Duration, String> {

Review comment:
       ```suggestion
       private static class ProcessElementOutputSkewingDoFn extends DoFn<Duration, String> {
   ```





