lukecwik commented on a change in pull request #15540:
URL: https://github.com/apache/beam/pull/15540#discussion_r746960039
##########
File path:
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -1957,6 +2125,84 @@ public Duration getAllowedTimestampSkew() {
pipeline.run();
}
+
+ @Test
+ @Category({ValidatesRunner.class, UsesTimersInParDo.class})
+ public void testProcessElementSkew() {
+ List<KV<String, Duration>> durations =
+ Arrays.asList(KV.of("0", Duration.millis(0L)), KV.of("2",
Duration.millis(1L)));
+ PCollection<KV<String, Duration>> input =
+ pipeline.apply("create", Create.timestamped(durations,
Arrays.asList(0L, 1L)));
+
+ PCollection<String> noSkew =
+ input.apply("noSkew", ParDo.of(new
ProcessElementTimestampSkewingDoFn(Duration.ZERO)));
+ PAssert.that(noSkew)
+ .containsInAnyOrder(
+ TIMER_ELEMENT,
+ OUTPUT_ELEMENT,
+ TIMER_ELEMENT + new Instant(0L),
+ OUTPUT_ELEMENT + new Instant(0L));
+
+ PCollection<String> skew =
+ input.apply(
+ "skew", ParDo.of(new
ProcessElementTimestampSkewingDoFn(Duration.millis(2L))));
+ PAssert.that(skew)
+ .containsInAnyOrder(TIMER_ELEMENT, OUTPUT_ELEMENT, TIMER_ELEMENT,
OUTPUT_ELEMENT);
+ pipeline.run();
+ }
+
+ @Test
+ @Category({UsesTimersInParDo.class, ValidatesRunner.class})
+ public void testOnTimerTimestampSkew() {
+ List<KV<String, String>> durations = Arrays.asList(KV.of("0", "0"));
+ PCollection<KV<String, String>> input =
+ pipeline.apply("create", Create.timestamped(durations,
Arrays.asList(0L)));
+ PCollection<String> noSkew =
+ input.apply(
+ "noskew",
+ ParDo.of(new OnTimerTimestampSkewingDoFn(Duration.millis(0L),
Duration.millis(3L))));
+ PAssert.that(noSkew)
+ .containsInAnyOrder(OUTPUT_ELEMENT + new Instant(-3L), TIMER_ELEMENT
+ new Instant(-3L));
+ PCollection<String> skew =
+ input.apply(
+ "skew",
+ ParDo.of(new OnTimerTimestampSkewingDoFn(Duration.millis(3L),
Duration.millis(2L))));
+ PAssert.that(skew).containsInAnyOrder(OUTPUT_ELEMENT, TIMER_ELEMENT);
+
+ pipeline.run();
+ }
+
+ @Test
+ @Category({UsesOnWindowExpiration.class, UsesTimersInParDo.class,
ValidatesRunner.class})
+ public void testOnWindowTimestampSkew() {
+ Duration windowDuration = Duration.millis(10L);
+ List<KV<String, String>> durations = Arrays.asList(KV.of("key", "0"));
+ PCollection<KV<String, String>> input =
+ pipeline.apply("create", Create.timestamped(durations,
Arrays.asList(0L)));
+
+ PCollection<String> noSkew =
+ input
+ .apply("noSkewWindow",
Window.into(FixedWindows.of(windowDuration)))
+ .apply(
+ "noskew",
+ ParDo.of(
+ new OnWindowExpirationTimestampSkewingDoFn(
+ Duration.millis(0L), Duration.millis(3L))));
+ PAssert.that(noSkew)
+ .containsInAnyOrder(
+ OUTPUT_ELEMENT + new
Instant(windowDuration.minus(3L).minus(1L).getMillis()));
Review comment:
> Task :sdks:java:core:compileTestJava
```
/home/runner/work/beam/beam/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java:2196:
warning: [JodaPlusMinusLong] Use of JodaTime's type.plus(long) or
type.minus(long) is not allowed (where <type> =
{Duration,Instant,DateTime,DateMidnight}). Please use
type.plus(Duration.millis(long)) or type.minus(Duration.millis(long)) instead.
OUTPUT_ELEMENT + new
Instant(windowDuration.minus(3L).minus(1L).getMillis()));
^
(see https://errorprone.info/bugpattern/JodaPlusMinusLong)
Did you mean 'OUTPUT_ELEMENT + new
Instant(windowDuration.minus(Duration.millis(3L)).minus(1L).getMillis()));'?
```
##########
File path:
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -1790,6 +1791,173 @@ public void finishBundle(FinishBundleContext c) {
/** Tests to validate output timestamps. */
@RunWith(JUnit4.class)
public static class TimestampTests extends SharedTestBase implements
Serializable {
+
+ static final String TIMER_ELEMENT = "timer";
+ static final String OUTPUT_ELEMENT = "output";
+
+ /**
+ * Checks that the given message is correct and includes the element
timestamp, allowed skew,
+ * and output timestamp.
+ */
+ static boolean hasExpectedError(
+ IllegalArgumentException e,
+ Duration allowedSkew,
+ Instant elementTimestamp,
+ Instant outputTimestamp) {
+ return e.getMessage().contains("timestamp of the ")
+ && e.getMessage().contains(elementTimestamp.toString())
+ && e.getMessage().contains("timestamp " + outputTimestamp)
+ && e.getMessage()
+ .contains("allowed skew (" +
PeriodFormat.getDefault().print(allowedSkew.toPeriod()))
+ && e.getMessage().contains("getAllowedTimestampSkew");
+ }
+
+ /**
+ * A {@link DoFn} that outputs an element at and creates/sets a timer with
an output timestamp
+ * equal to the input timestamp minus the input element's value. Keys are
ignored but required
+ * for timers.
+ */
+ private static class ProcessElementTimestampSkewingDoFn
+ extends DoFn<KV<String, Duration>, String> {
+
+ static final String TIMER_ID = "testTimerId";
+ private final Duration allowedSkew;
+
+ @TimerId(TIMER_ID)
+ private static final TimerSpec timer =
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+ private ProcessElementTimestampSkewingDoFn(Duration allowedSkew) {
+ this.allowedSkew = allowedSkew;
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext context, @TimerId(TIMER_ID)
Timer timer) {
+ Instant outputTimestamp =
context.timestamp().minus(context.element().getValue());
+ try {
+ context.outputWithTimestamp(OUTPUT_ELEMENT, outputTimestamp);
+ } catch (IllegalArgumentException e) {
+ if (hasExpectedError(e, allowedSkew, context.timestamp(),
outputTimestamp)) {
+ context.output(OUTPUT_ELEMENT + outputTimestamp.toString());
+ }
+ }
+ try {
+ timer.withOutputTimestamp(outputTimestamp).set(new Instant(0));
+ context.output(TIMER_ELEMENT);
+ } catch (IllegalArgumentException e) {
+ if (hasExpectedError(e, allowedSkew, context.timestamp(),
outputTimestamp)) {
+ context.output(TIMER_ELEMENT + outputTimestamp.toString());
+ }
+ }
+ }
+
+ @OnTimer(TIMER_ID)
+ public void onTimer(OnTimerContext context) {}
+
+ @Override
+ public Duration getAllowedTimestampSkew() {
+ return allowedSkew;
+ }
+ }
+
+ /**
+ * A {@link DoFn} that sets a timer that outputs an element and sets a
second timer with an
+ * output timestamp equal to the input timestamp minus the input element's
value. Keys are
+ * ignored but required for timers.
+ */
+ private static class OnTimerTimestampSkewingDoFn extends DoFn<KV<String,
String>, String> {
+
+ static final String FIRST_TIMER_ID = "firstTestTimerId";
+ static final String SECOND_TIMER_ID = "secondTestTimerId";
+ private final Duration allowedSkew;
+ private final Duration outputTimestampSkew;
+
+ @TimerId(FIRST_TIMER_ID)
+ private static final TimerSpec firstTimer =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @TimerId(SECOND_TIMER_ID)
+ private static final TimerSpec secondTimer =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ private OnTimerTimestampSkewingDoFn(Duration allowedSkew, Duration
outputTimestampSkew) {
+ this.allowedSkew = allowedSkew;
+ this.outputTimestampSkew = outputTimestampSkew;
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext context,
@TimerId(FIRST_TIMER_ID) Timer timer) {
+ timer.set(context.timestamp());
+ }
+
+ @OnTimer(SECOND_TIMER_ID)
+ public void onSecondTimer(OnTimerContext context) {}
+
+ @OnTimer(FIRST_TIMER_ID)
+ public void onFirstTimer(OnTimerContext context,
@TimerId(SECOND_TIMER_ID) Timer timer) {
+ Instant outputTimestamp =
context.timestamp().minus(outputTimestampSkew);
+ try {
+ context.outputWithTimestamp(OUTPUT_ELEMENT, outputTimestamp);
+ } catch (IllegalArgumentException e) {
+ if (hasExpectedError(e, allowedSkew, context.timestamp(),
outputTimestamp)) {
+ context.output(OUTPUT_ELEMENT + outputTimestamp);
+ }
+ }
+ try {
+ timer.withOutputTimestamp(outputTimestamp).set(context.timestamp());
+ context.output(TIMER_ELEMENT);
+ } catch (IllegalArgumentException e) {
+ if (hasExpectedError(e, allowedSkew, context.timestamp(),
outputTimestamp)) {
+ context.output(TIMER_ELEMENT + outputTimestamp);
+ }
+ }
+ }
+
+ @Override
+ public Duration getAllowedTimestampSkew() {
+ return allowedSkew;
+ }
+ }
+
+ /**
+ * A {@link DoFn} that on window expiration outputs an element with an
output timestamp equal
+ * output timestamp equal to the input timestamp minus the input element's
value. Keys are
+ * ignored but required for timers.
Review comment:
```suggestion
* A {@link DoFn} that on window expiration outputs an element with an output timestamp equal
* to the input timestamp minus the input element's value. Keys are ignored but required for timers.
```
##########
File path:
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -1957,6 +2125,84 @@ public Duration getAllowedTimestampSkew() {
pipeline.run();
}
+
+ @Test
+ @Category({ValidatesRunner.class, UsesTimersInParDo.class})
+ public void testProcessElementSkew() {
+ List<KV<String, Duration>> durations =
+ Arrays.asList(KV.of("0", Duration.millis(0L)), KV.of("2",
Duration.millis(1L)));
+ PCollection<KV<String, Duration>> input =
+ pipeline.apply("create", Create.timestamped(durations,
Arrays.asList(0L, 1L)));
+
+ PCollection<String> noSkew =
+ input.apply("noSkew", ParDo.of(new
ProcessElementTimestampSkewingDoFn(Duration.ZERO)));
+ PAssert.that(noSkew)
Review comment:
The logic within `testOnTimerTimestampSkew` is much simpler to
understand because the skew and noskew variants are differentiated.
Do you mind splitting `testProcessElementSkew` up similarly?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]