lukecwik commented on a change in pull request #12836:
URL: https://github.com/apache/beam/pull/12836#discussion_r491185424
##########
File path:
runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
##########
@@ -1613,7 +1613,15 @@ public TimerUpdateBuilder setTimer(TimerData setTimer) {
"Got a timer for after the end of time (%s), got %s",
BoundedWindow.TIMESTAMP_MAX_VALUE,
setTimer.getTimestamp());
- deletedTimers.remove(setTimer);
+ // When constructing TimerData for deleted timer, we are using Instant.EPOCH for timestamp
+ // and output timestamp.
Review comment:
```suggestion
// Use a fake timestamp (EPOCH) for the fire and output timestamps since they are ignored.
```
##########
File path:
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -4609,6 +4609,350 @@ public void onTimer(
pipeline.run();
}
+ @Test
+ @Category({NeedsRunner.class, UsesTimersInParDo.class,
UsesTestStream.class})
+ public void testClearProcessingTimeTimerWithinSameBundle() {
+
+ final String timerId = "processing-timer";
+
+ DoFn<KV<String, Integer>, Integer> fn =
Review comment:
Depending on the outcome of the timer discussion I started on dev@:
Let's create a test for the timer loop in which an initial element sets the
timer, the timer always sets itself again when it fires, and a second element
clears the timer. We should set up the test stream so that the timer becomes
eligible to fire at the same time as the element that would clear it becomes
eligible to be processed.
This will require us to keep track of whether a timer has been cleared
within a bundle prior to its firing.
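As a rough illustration of that bookkeeping, here is a minimal plain-Java model of a single bundle. It is only a sketch: `ClearedTimerBundleModel` and all of its methods are hypothetical names made up for this example, not Beam or DirectRunner APIs. It assumes that elements in a bundle are processed before the bundle's eligible timers fire.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of one bundle. None of these names are Beam APIs. */
class ClearedTimerBundleModel {
  /** Timer IDs that were already eligible to fire when the bundle started. */
  private final List<String> eligibleTimers = new ArrayList<>();

  /** Timer IDs cleared by element processing during this bundle. */
  private final Set<String> clearedInBundle = new HashSet<>();

  public void addEligibleTimer(String timerId) {
    eligibleTimers.add(timerId);
  }

  /** Records that element processing cleared the given timer. */
  public void clearTimer(String timerId) {
    clearedInBundle.add(timerId);
  }

  /** Returns the eligible timers that were not cleared earlier in the bundle. */
  public List<String> timersToFire() {
    List<String> result = new ArrayList<>();
    for (String timerId : eligibleTimers) {
      if (!clearedInBundle.contains(timerId)) {
        result.add(timerId);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    ClearedTimerBundleModel bundle = new ClearedTimerBundleModel();
    bundle.addEligibleTimer("loop");
    bundle.addEligibleTimer("other");
    // An element in the same bundle clears "loop" before timers are fired.
    bundle.clearTimer("loop");
    System.out.println(bundle.timersToFire()); // prints [other]
  }
}
```

The actual test would still need to drive this scenario through `TestStream`; the model only shows the check that has to happen before a timer is delivered.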
##########
File path:
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -4609,6 +4609,350 @@ public void onTimer(
pipeline.run();
}
+ @Test
+ @Category({NeedsRunner.class, UsesTimersInParDo.class,
UsesTestStream.class})
+ public void testClearProcessingTimeTimerWithinSameBundle() {
+
+ final String timerId = "processing-timer";
+
+ DoFn<KV<String, Integer>, Integer> fn =
+ new DoFn<KV<String, Integer>, Integer>() {
+
+ @TimerId(timerId)
+ private final TimerSpec spec =
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+ @ProcessElement
+ public void processElement(@TimerId(timerId) Timer timer,
OutputReceiver<Integer> r) {
+ timer.offset(Duration.standardSeconds(1)).setRelative();
+ timer.clear();
+ r.output(3);
+ }
+
+ @OnTimer(timerId)
+ public void onTimer(TimeDomain timeDomain, OutputReceiver<Integer>
r) {
+ r.output(42);
+ }
+ };
+
+ TestStream<KV<String, Integer>> stream =
+ TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+ .addElements(KV.of("hello", 37))
+ .advanceProcessingTime(
+ Duration.millis(
+ DateTimeUtils.currentTimeMillis() / 1000 * 1000) // round to seconds
+ .plus(Duration.standardMinutes(2)))
+ .advanceWatermarkToInfinity();
+
+ PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+ PAssert.that(output).containsInAnyOrder(3);
+ pipeline.run();
+ }
+
+ @Test
+ @Category({NeedsRunner.class, UsesTimersInParDo.class,
UsesTestStream.class})
+ public void testClearEventTimeTimerWithinSameBundle() {
+ final String timerId = "event-timer";
+
+ DoFn<KV<String, Integer>, Integer> fn =
+ new DoFn<KV<String, Integer>, Integer>() {
+
+ @TimerId(timerId)
+ private final TimerSpec spec =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @ProcessElement
+ public void processElement(@TimerId(timerId) Timer timer,
OutputReceiver<Integer> r) {
+ timer.offset(Duration.standardSeconds(1)).setRelative();
+ timer.clear();
+ r.output(3);
+ }
+
+ @OnTimer(timerId)
+ public void onTimer(OutputReceiver<Integer> r) {
+ r.output(42);
+ }
+ };
+
+ TestStream<KV<String, Integer>> stream =
+ TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+ .advanceWatermarkTo(new Instant(0))
+ .addElements(KV.of("hello", 37))
+ .advanceWatermarkTo(new
Instant(0).plus(Duration.standardSeconds(1)))
+ .advanceWatermarkToInfinity();
+
+ PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+ PAssert.that(output).containsInAnyOrder(3);
+ pipeline.run();
+ }
+
+ @Test
+ @Category({NeedsRunner.class, UsesTimersInParDo.class,
UsesTestStream.class})
+ public void testClearUnsetProcessingTimeTimer() {
+ final String timerId = "processing-timer";
+
+ DoFn<KV<String, Integer>, Integer> fn =
+ new DoFn<KV<String, Integer>, Integer>() {
+
+ @TimerId(timerId)
+ private final TimerSpec spec =
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+ @ProcessElement
+ public void processElement(@TimerId(timerId) Timer timer,
OutputReceiver<Integer> r) {
+ timer.clear();
+ r.output(3);
+ }
+
+ @OnTimer(timerId)
+ public void onTimer(TimeDomain timeDomain, OutputReceiver<Integer>
r) {
+ r.output(42);
+ }
+ };
+
+ TestStream<KV<String, Integer>> stream =
+ TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+ .addElements(KV.of("hello", 37))
+ .advanceProcessingTime(
+ Duration.millis(
+ DateTimeUtils.currentTimeMillis() / 1000 * 1000) // round to seconds
+ .plus(Duration.standardMinutes(4)))
+ .advanceWatermarkToInfinity();
+
+ PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+ PAssert.that(output).containsInAnyOrder(3);
+ pipeline.run();
+ }
+
+ @Test
+ @Category({NeedsRunner.class, UsesTimersInParDo.class,
UsesTestStream.class})
+ public void testClearUnsetEventTimeTimer() {
+ final String timerId = "event-timer";
+
+ DoFn<KV<String, Integer>, Integer> fn =
+ new DoFn<KV<String, Integer>, Integer>() {
+
+ @TimerId(timerId)
+ private final TimerSpec spec =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @ProcessElement
+ public void processElement(@TimerId(timerId) Timer timer,
OutputReceiver<Integer> r) {
+ timer.clear();
+ r.output(3);
+ }
+
+ @OnTimer(timerId)
+ public void onTimer(OutputReceiver<Integer> r) {
+ r.output(42);
+ }
+ };
+
+ TestStream<KV<String, Integer>> stream =
+ TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+ .advanceWatermarkTo(new Instant(0))
+ .addElements(KV.of("hello", 37))
+ .advanceWatermarkTo(new
Instant(0).plus(Duration.standardSeconds(1)))
+ .advanceWatermarkToInfinity();
+
+ PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+ PAssert.that(output).containsInAnyOrder(3);
+ pipeline.run();
+ }
+
+ @Test
+ @Category({NeedsRunner.class, UsesTimersInParDo.class,
UsesTestStream.class})
+ public void testClearProcessingTimeTimerMaybeFire() {
+ final String timerId = "processing-timer";
+ final String clearTimerId = "clear-timer";
+
+ DoFn<KV<String, Integer>, Integer> fn =
+ new DoFn<KV<String, Integer>, Integer>() {
+
+ @TimerId(timerId)
+ private final TimerSpec spec =
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+ @TimerId(clearTimerId)
+ private final TimerSpec clearTimerSpec =
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+ @ProcessElement
+ public void processElement(
+ @TimerId(timerId) Timer timer,
+ @TimerId(clearTimerId) Timer clearTimer,
+ OutputReceiver<Integer> r) {
+ timer.offset(Duration.standardSeconds(1)).setRelative();
+ clearTimer.offset(Duration.standardSeconds(1)).setRelative();
+ timer.offset(Duration.standardMinutes(2)).setRelative();
Review comment:
Why are we setting `timer` twice? The second set will override the first.
##########
File path:
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -4609,6 +4609,350 @@ public void onTimer(
pipeline.run();
}
+ @Test
+ @Category({NeedsRunner.class, UsesTimersInParDo.class,
UsesTestStream.class})
+ public void testClearProcessingTimeTimerWithinSameBundle() {
+
+ final String timerId = "processing-timer";
+
+ DoFn<KV<String, Integer>, Integer> fn =
+ new DoFn<KV<String, Integer>, Integer>() {
+
+ @TimerId(timerId)
+ private final TimerSpec spec =
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+ @ProcessElement
+ public void processElement(@TimerId(timerId) Timer timer,
OutputReceiver<Integer> r) {
+ timer.offset(Duration.standardSeconds(1)).setRelative();
+ timer.clear();
+ r.output(3);
+ }
+
+ @OnTimer(timerId)
+ public void onTimer(TimeDomain timeDomain, OutputReceiver<Integer>
r) {
+ r.output(42);
+ }
+ };
+
+ TestStream<KV<String, Integer>> stream =
+ TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+ .addElements(KV.of("hello", 37))
+ .advanceProcessingTime(
+ Duration.millis(
+ DateTimeUtils.currentTimeMillis() / 1000 * 1000) // round to seconds
+ .plus(Duration.standardMinutes(2)))
+ .advanceWatermarkToInfinity();
+
+ PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+ PAssert.that(output).containsInAnyOrder(3);
+ pipeline.run();
+ }
+
+ @Test
+ @Category({NeedsRunner.class, UsesTimersInParDo.class,
UsesTestStream.class})
+ public void testClearEventTimeTimerWithinSameBundle() {
+ final String timerId = "event-timer";
+
+ DoFn<KV<String, Integer>, Integer> fn =
+ new DoFn<KV<String, Integer>, Integer>() {
+
+ @TimerId(timerId)
+ private final TimerSpec spec =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @ProcessElement
+ public void processElement(@TimerId(timerId) Timer timer,
OutputReceiver<Integer> r) {
+ timer.offset(Duration.standardSeconds(1)).setRelative();
+ timer.clear();
+ r.output(3);
+ }
+
+ @OnTimer(timerId)
+ public void onTimer(OutputReceiver<Integer> r) {
+ r.output(42);
+ }
+ };
+
+ TestStream<KV<String, Integer>> stream =
+ TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+ .advanceWatermarkTo(new Instant(0))
+ .addElements(KV.of("hello", 37))
+ .advanceWatermarkTo(new
Instant(0).plus(Duration.standardSeconds(1)))
+ .advanceWatermarkToInfinity();
+
+ PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+ PAssert.that(output).containsInAnyOrder(3);
+ pipeline.run();
+ }
+
+ @Test
+ @Category({NeedsRunner.class, UsesTimersInParDo.class,
UsesTestStream.class})
+ public void testClearUnsetProcessingTimeTimer() {
+ final String timerId = "processing-timer";
+
+ DoFn<KV<String, Integer>, Integer> fn =
+ new DoFn<KV<String, Integer>, Integer>() {
+
+ @TimerId(timerId)
+ private final TimerSpec spec =
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+ @ProcessElement
+ public void processElement(@TimerId(timerId) Timer timer,
OutputReceiver<Integer> r) {
+ timer.clear();
+ r.output(3);
+ }
+
+ @OnTimer(timerId)
+ public void onTimer(TimeDomain timeDomain, OutputReceiver<Integer>
r) {
+ r.output(42);
+ }
+ };
+
+ TestStream<KV<String, Integer>> stream =
+ TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+ .addElements(KV.of("hello", 37))
+ .advanceProcessingTime(
+ Duration.millis(
+ DateTimeUtils.currentTimeMillis() / 1000 * 1000) // round to seconds
+ .plus(Duration.standardMinutes(4)))
+ .advanceWatermarkToInfinity();
+
+ PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+ PAssert.that(output).containsInAnyOrder(3);
+ pipeline.run();
+ }
+
+ @Test
+ @Category({NeedsRunner.class, UsesTimersInParDo.class,
UsesTestStream.class})
+ public void testClearUnsetEventTimeTimer() {
+ final String timerId = "event-timer";
+
+ DoFn<KV<String, Integer>, Integer> fn =
+ new DoFn<KV<String, Integer>, Integer>() {
+
+ @TimerId(timerId)
+ private final TimerSpec spec =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @ProcessElement
+ public void processElement(@TimerId(timerId) Timer timer,
OutputReceiver<Integer> r) {
+ timer.clear();
+ r.output(3);
+ }
+
+ @OnTimer(timerId)
+ public void onTimer(OutputReceiver<Integer> r) {
+ r.output(42);
+ }
+ };
+
+ TestStream<KV<String, Integer>> stream =
+ TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+ .advanceWatermarkTo(new Instant(0))
+ .addElements(KV.of("hello", 37))
+ .advanceWatermarkTo(new
Instant(0).plus(Duration.standardSeconds(1)))
+ .advanceWatermarkToInfinity();
+
+ PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+ PAssert.that(output).containsInAnyOrder(3);
+ pipeline.run();
+ }
+
+ @Test
+ @Category({NeedsRunner.class, UsesTimersInParDo.class,
UsesTestStream.class})
+ public void testClearProcessingTimeTimerMaybeFire() {
+ final String timerId = "processing-timer";
+ final String clearTimerId = "clear-timer";
+
+ DoFn<KV<String, Integer>, Integer> fn =
+ new DoFn<KV<String, Integer>, Integer>() {
+
+ @TimerId(timerId)
+ private final TimerSpec spec =
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+ @TimerId(clearTimerId)
+ private final TimerSpec clearTimerSpec =
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+ @ProcessElement
+ public void processElement(
+ @TimerId(timerId) Timer timer,
+ @TimerId(clearTimerId) Timer clearTimer,
+ OutputReceiver<Integer> r) {
+ timer.offset(Duration.standardSeconds(1)).setRelative();
+ clearTimer.offset(Duration.standardSeconds(1)).setRelative();
+ timer.offset(Duration.standardMinutes(2)).setRelative();
+ r.output(3);
+ }
+
+ @OnTimer(timerId)
+ public void onTimer(OutputReceiver<Integer> r) {
+ r.output(42);
+ }
+
+ @OnTimer(clearTimerId)
+ public void clearTimer(@TimerId(timerId) Timer timer) {
+ timer.clear();
+ }
+ };
+
+ TestStream<KV<String, Integer>> stream =
+ TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+ .addElements(KV.of("hello", 37))
+ .advanceProcessingTime(
+ Duration.millis(
+ DateTimeUtils.currentTimeMillis() / 1000 * 1000) // round to seconds
+ .plus(Duration.standardMinutes(2)))
+ .advanceProcessingTime(
+ Duration.millis(
+ DateTimeUtils.currentTimeMillis() / 1000 * 1000) // round to seconds
+ .plus(Duration.standardMinutes(4)))
+ .advanceWatermarkToInfinity();
+
+ PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+ PAssert.that(output)
+ .satisfies(
+ (Iterable<Integer> input) -> {
+ assertEquals(1, Iterables.frequency(input, 3));
+ // When setting timer and clearing timer happens on different bundles, whether the
+ // timer gets cleared depends on runner implementation.
+ assertTrue(
+ Iterables.frequency(input, 42) == 1 || Iterables.frequency(input, 42) == 2);
+ return null;
+ });
+ pipeline.run();
+ }
+
+ @Test
+ @Category({NeedsRunner.class, UsesTimersInParDo.class,
UsesTestStream.class})
+ @Ignore("https://issues.apache.org/jira/browse/BEAM-2791")
+ public void testClearEventTimeTimerMaybeFire() {
+ final String timerId = "event-timer";
+ final String clearTimerId = "clear-timer";
+
+ DoFn<KV<String, Integer>, Integer> fn =
+ new DoFn<KV<String, Integer>, Integer>() {
+
+ @TimerId(timerId)
+ private final TimerSpec spec =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @TimerId(clearTimerId)
+ private final TimerSpec clearSpec =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @ProcessElement
+ public void processElement(
+ @TimerId(timerId) Timer timer,
+ @TimerId(clearTimerId) Timer clearTimer,
+ OutputReceiver<Integer> r) {
+ timer.offset(Duration.standardSeconds(1)).setRelative();
+ clearTimer.offset(Duration.standardSeconds(1)).setRelative();
+ timer.offset(Duration.standardSeconds(3)).setRelative();
Review comment:
Ditto for setting this timer twice: the second set will override the first.
##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
##########
@@ -81,6 +81,9 @@
*/
void setRelative();
+ /** Clears the timer. */
Review comment:
State and timers are partitioned by key and window, so a timer can't be
cleared in one bundle while it is being set in parallel from a different
bundle. We should ensure that a timer cleared in a previous bundle is not
fired in a future bundle.
For timers within the same bundle, the open question is whether we should try
to prevent a timer from firing if it was cleared within `@ProcessElement` or
during a different timer callback. For example, supporting clearing a timer
loop makes a lot of sense:
```
@ProcessElement
process(ProcessContext c) {
if (initialCondition) {
setTimer();
} else {
clearTimer();
}
}
@OnTimer
onTimer(...) {
do some side effect
set timer to fire again in the future
}
```
This logic has a race condition: because the timer fires after the element is
processed within the same bundle, we may clear the timer and then have the
timer callback set it again. To avoid that today, the condition has to be
stored and checked within the timer loop.
Started a [discussion on
dev@](https://lists.apache.org/thread.html/r172e81586527cdccd94983c4348fc063c8cd37355c02bb441df2fa23%40%3Cdev.beam.apache.org%3E)
to get clarity as to what behavior we want.
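
To make the race concrete, here is a small plain-Java model of one bundle. It is a sketch only: `TimerLoopRaceModel` and its members are hypothetical names invented for this example, not Beam APIs. It assumes the runner still fires a timer that was eligible at the start of the bundle even though an element cleared it earlier in the same bundle, which is exactly the behavior under discussion.

```java
/** Toy model of the timer loop race. None of these names are Beam APIs. */
class TimerLoopRaceModel {
  /** Whether the loop's timer is currently set. It starts out pending. */
  boolean timerSet = true;

  /** Stored condition, standing in for a per-key state cell. */
  boolean stopRequested = false;

  /** Whether the timer callback consults the stored condition. */
  final boolean checkStoredCondition;

  TimerLoopRaceModel(boolean checkStoredCondition) {
    this.checkStoredCondition = checkStoredCondition;
  }

  /** Models the element that clears the timer to stop the loop. */
  void processClearingElement() {
    timerSet = false;
    stopRequested = true;
  }

  /** Models the timer callback that re-arms itself unless told to stop. */
  void onTimer() {
    if (checkStoredCondition && stopRequested) {
      return; // the loop ends here
    }
    timerSet = true; // set the timer to fire again in the future
  }

  /** Runs one bundle: the element first, then the already-eligible timer. */
  static boolean timerSetAfterBundle(boolean checkStoredCondition) {
    TimerLoopRaceModel model = new TimerLoopRaceModel(checkStoredCondition);
    model.processClearingElement();
    model.onTimer();
    return model.timerSet;
  }

  public static void main(String[] args) {
    // Without the stored condition, the cleared timer is set again.
    System.out.println("without check: " + timerSetAfterBundle(false)); // true
    // With the stored condition, the loop really stops.
    System.out.println("with check: " + timerSetAfterBundle(true)); // false
  }
}
```

If the runner instead tracked timers cleared within the bundle and skipped them, the stored condition would not be needed in user code.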