lukecwik commented on a change in pull request #12836: URL: https://github.com/apache/beam/pull/12836#discussion_r491185424
########## File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java ########## @@ -1613,7 +1613,15 @@ public TimerUpdateBuilder setTimer(TimerData setTimer) { "Got a timer for after the end of time (%s), got %s", BoundedWindow.TIMESTAMP_MAX_VALUE, setTimer.getTimestamp()); - deletedTimers.remove(setTimer); + // When constructing TimerData for deleted timer, we are using Instant.EPOCH for timestamp + // and output timestamp. Review comment: ```suggestion // Use a fake timestamp (EPOCH) for the fire and output timestamps since they are ignored. ``` ########## File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ########## @@ -4609,6 +4609,350 @@ public void onTimer( pipeline.run(); } + @Test + @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class}) + public void testClearProcessingTimeTimerWithinSameBundle() { + + final String timerId = "processing-timer"; + + DoFn<KV<String, Integer>, Integer> fn = Review comment: Dependent on the timer discussion I started on dev@: Let's create a test for the timer loop where the timer is set by an initial element, is always set again during its firing, and is cleared by a second element. We should set up the test stream such that the timer is eligible to fire at the same time an element is eligible to be processed that would clear the timer. This will require us to keep track of whether a timer has been cleared within a bundle prior to its firing. 
########## File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ########## @@ -4609,6 +4609,350 @@ public void onTimer( pipeline.run(); } + @Test + @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class}) + public void testClearProcessingTimeTimerWithinSameBundle() { + + final String timerId = "processing-timer"; + + DoFn<KV<String, Integer>, Integer> fn = + new DoFn<KV<String, Integer>, Integer>() { + + @TimerId(timerId) + private final TimerSpec spec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + + @ProcessElement + public void processElement(@TimerId(timerId) Timer timer, OutputReceiver<Integer> r) { + timer.offset(Duration.standardSeconds(1)).setRelative(); + timer.clear(); + r.output(3); + } + + @OnTimer(timerId) + public void onTimer(TimeDomain timeDomain, OutputReceiver<Integer> r) { + r.output(42); + } + }; + + TestStream<KV<String, Integer>> stream = + TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())) + .addElements(KV.of("hello", 37)) + .advanceProcessingTime( + Duration.millis( + DateTimeUtils.currentTimeMillis() / 1000 * 1000) // round to seconds + .plus(Duration.standardMinutes(2))) + .advanceWatermarkToInfinity(); + + PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn)); + PAssert.that(output).containsInAnyOrder(3); + pipeline.run(); + } + + @Test + @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class}) + public void testClearEventTimeTimerWithinSameBundle() { + final String timerId = "event-timer"; + + DoFn<KV<String, Integer>, Integer> fn = + new DoFn<KV<String, Integer>, Integer>() { + + @TimerId(timerId) + private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @ProcessElement + public void processElement(@TimerId(timerId) Timer timer, OutputReceiver<Integer> r) { + timer.offset(Duration.standardSeconds(1)).setRelative(); + timer.clear(); + r.output(3); + } + + @OnTimer(timerId) + public void 
onTimer(OutputReceiver<Integer> r) { + r.output(42); + } + }; + + TestStream<KV<String, Integer>> stream = + TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())) + .advanceWatermarkTo(new Instant(0)) + .addElements(KV.of("hello", 37)) + .advanceWatermarkTo(new Instant(0).plus(Duration.standardSeconds(1))) + .advanceWatermarkToInfinity(); + + PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn)); + PAssert.that(output).containsInAnyOrder(3); + pipeline.run(); + } + + @Test + @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class}) + public void testClearUnsetProcessingTimeTimer() { + final String timerId = "processing-timer"; + + DoFn<KV<String, Integer>, Integer> fn = + new DoFn<KV<String, Integer>, Integer>() { + + @TimerId(timerId) + private final TimerSpec spec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + + @ProcessElement + public void processElement(@TimerId(timerId) Timer timer, OutputReceiver<Integer> r) { + timer.clear(); + r.output(3); + } + + @OnTimer(timerId) + public void onTimer(TimeDomain timeDomain, OutputReceiver<Integer> r) { + r.output(42); + } + }; + + TestStream<KV<String, Integer>> stream = + TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())) + .addElements(KV.of("hello", 37)) + .advanceProcessingTime( + Duration.millis( + DateTimeUtils.currentTimeMillis() / 1000 * 1000) // round to seconds + .plus(Duration.standardMinutes(4))) + .advanceWatermarkToInfinity(); + + PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn)); + PAssert.that(output).containsInAnyOrder(3); + pipeline.run(); + } + + @Test + @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class}) + public void testClearUnsetEventTimeTimer() { + final String timerId = "event-timer"; + + DoFn<KV<String, Integer>, Integer> fn = + new DoFn<KV<String, Integer>, Integer>() { + + @TimerId(timerId) + private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + 
@ProcessElement + public void processElement(@TimerId(timerId) Timer timer, OutputReceiver<Integer> r) { + timer.clear(); + r.output(3); + } + + @OnTimer(timerId) + public void onTimer(OutputReceiver<Integer> r) { + r.output(42); + } + }; + + TestStream<KV<String, Integer>> stream = + TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())) + .advanceWatermarkTo(new Instant(0)) + .addElements(KV.of("hello", 37)) + .advanceWatermarkTo(new Instant(0).plus(Duration.standardSeconds(1))) + .advanceWatermarkToInfinity(); + + PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn)); + PAssert.that(output).containsInAnyOrder(3); + pipeline.run(); + } + + @Test + @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class}) + public void testClearProcessingTimeTimerMaybeFire() { + final String timerId = "processing-timer"; + final String clearTimerId = "clear-timer"; + + DoFn<KV<String, Integer>, Integer> fn = + new DoFn<KV<String, Integer>, Integer>() { + + @TimerId(timerId) + private final TimerSpec spec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + + @TimerId(clearTimerId) + private final TimerSpec clearTimerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + + @ProcessElement + public void processElement( + @TimerId(timerId) Timer timer, + @TimerId(clearTimerId) Timer clearTimer, + OutputReceiver<Integer> r) { + timer.offset(Duration.standardSeconds(1)).setRelative(); + clearTimer.offset(Duration.standardSeconds(1)).setRelative(); + timer.offset(Duration.standardMinutes(2)).setRelative(); Review comment: Why are we setting `timer` twice? The second set will override the first. 
########## File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ########## @@ -4609,6 +4609,350 @@ public void onTimer( pipeline.run(); } + @Test + @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class}) + public void testClearProcessingTimeTimerWithinSameBundle() { + + final String timerId = "processing-timer"; + + DoFn<KV<String, Integer>, Integer> fn = + new DoFn<KV<String, Integer>, Integer>() { + + @TimerId(timerId) + private final TimerSpec spec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + + @ProcessElement + public void processElement(@TimerId(timerId) Timer timer, OutputReceiver<Integer> r) { + timer.offset(Duration.standardSeconds(1)).setRelative(); + timer.clear(); + r.output(3); + } + + @OnTimer(timerId) + public void onTimer(TimeDomain timeDomain, OutputReceiver<Integer> r) { + r.output(42); + } + }; + + TestStream<KV<String, Integer>> stream = + TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())) + .addElements(KV.of("hello", 37)) + .advanceProcessingTime( + Duration.millis( + DateTimeUtils.currentTimeMillis() / 1000 * 1000) // round to seconds + .plus(Duration.standardMinutes(2))) + .advanceWatermarkToInfinity(); + + PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn)); + PAssert.that(output).containsInAnyOrder(3); + pipeline.run(); + } + + @Test + @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class}) + public void testClearEventTimeTimerWithinSameBundle() { + final String timerId = "event-timer"; + + DoFn<KV<String, Integer>, Integer> fn = + new DoFn<KV<String, Integer>, Integer>() { + + @TimerId(timerId) + private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @ProcessElement + public void processElement(@TimerId(timerId) Timer timer, OutputReceiver<Integer> r) { + timer.offset(Duration.standardSeconds(1)).setRelative(); + timer.clear(); + r.output(3); + } + + @OnTimer(timerId) + public void 
onTimer(OutputReceiver<Integer> r) { + r.output(42); + } + }; + + TestStream<KV<String, Integer>> stream = + TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())) + .advanceWatermarkTo(new Instant(0)) + .addElements(KV.of("hello", 37)) + .advanceWatermarkTo(new Instant(0).plus(Duration.standardSeconds(1))) + .advanceWatermarkToInfinity(); + + PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn)); + PAssert.that(output).containsInAnyOrder(3); + pipeline.run(); + } + + @Test + @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class}) + public void testClearUnsetProcessingTimeTimer() { + final String timerId = "processing-timer"; + + DoFn<KV<String, Integer>, Integer> fn = + new DoFn<KV<String, Integer>, Integer>() { + + @TimerId(timerId) + private final TimerSpec spec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + + @ProcessElement + public void processElement(@TimerId(timerId) Timer timer, OutputReceiver<Integer> r) { + timer.clear(); + r.output(3); + } + + @OnTimer(timerId) + public void onTimer(TimeDomain timeDomain, OutputReceiver<Integer> r) { + r.output(42); + } + }; + + TestStream<KV<String, Integer>> stream = + TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())) + .addElements(KV.of("hello", 37)) + .advanceProcessingTime( + Duration.millis( + DateTimeUtils.currentTimeMillis() / 1000 * 1000) // round to seconds + .plus(Duration.standardMinutes(4))) + .advanceWatermarkToInfinity(); + + PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn)); + PAssert.that(output).containsInAnyOrder(3); + pipeline.run(); + } + + @Test + @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class}) + public void testClearUnsetEventTimeTimer() { + final String timerId = "event-timer"; + + DoFn<KV<String, Integer>, Integer> fn = + new DoFn<KV<String, Integer>, Integer>() { + + @TimerId(timerId) + private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + 
@ProcessElement + public void processElement(@TimerId(timerId) Timer timer, OutputReceiver<Integer> r) { + timer.clear(); + r.output(3); + } + + @OnTimer(timerId) + public void onTimer(OutputReceiver<Integer> r) { + r.output(42); + } + }; + + TestStream<KV<String, Integer>> stream = + TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())) + .advanceWatermarkTo(new Instant(0)) + .addElements(KV.of("hello", 37)) + .advanceWatermarkTo(new Instant(0).plus(Duration.standardSeconds(1))) + .advanceWatermarkToInfinity(); + + PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn)); + PAssert.that(output).containsInAnyOrder(3); + pipeline.run(); + } + + @Test + @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class}) + public void testClearProcessingTimeTimerMaybeFire() { + final String timerId = "processing-timer"; + final String clearTimerId = "clear-timer"; + + DoFn<KV<String, Integer>, Integer> fn = + new DoFn<KV<String, Integer>, Integer>() { + + @TimerId(timerId) + private final TimerSpec spec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + + @TimerId(clearTimerId) + private final TimerSpec clearTimerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + + @ProcessElement + public void processElement( + @TimerId(timerId) Timer timer, + @TimerId(clearTimerId) Timer clearTimer, + OutputReceiver<Integer> r) { + timer.offset(Duration.standardSeconds(1)).setRelative(); + clearTimer.offset(Duration.standardSeconds(1)).setRelative(); + timer.offset(Duration.standardMinutes(2)).setRelative(); + r.output(3); + } + + @OnTimer(timerId) + public void onTimer(OutputReceiver<Integer> r) { + r.output(42); + } + + @OnTimer(clearTimerId) + public void clearTimer(@TimerId(timerId) Timer timer) { + timer.clear(); + } + }; + + TestStream<KV<String, Integer>> stream = + TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())) + .addElements(KV.of("hello", 37)) + .advanceProcessingTime( + Duration.millis( + 
DateTimeUtils.currentTimeMillis() / 1000 * 1000) // round to seconds + .plus(Duration.standardMinutes(2))) + .advanceProcessingTime( + Duration.millis( + DateTimeUtils.currentTimeMillis() / 1000 * 1000) // round to seconds + .plus(Duration.standardMinutes(4))) + .advanceWatermarkToInfinity(); + + PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn)); + PAssert.that(output) + .satisfies( + (Iterable<Integer> input) -> { + assertEquals(1, Iterables.frequency(input, 3)); + // When setting timer and clearing timer happens on different bundles, whether the + // timer gets cleared depends on runner implementation. + assertTrue( + Iterables.frequency(input, 42) == 1 || Iterables.frequency(input, 42) == 2); + return null; + }); + pipeline.run(); + } + + @Test + @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class}) + @Ignore("https://issues.apache.org/jira/browse/BEAM-2791") + public void testClearEventTimeTimerMaybeFire() { + final String timerId = "event-timer"; + final String clearTimerId = "clear-timer"; + + DoFn<KV<String, Integer>, Integer> fn = + new DoFn<KV<String, Integer>, Integer>() { + + @TimerId(timerId) + private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @TimerId(clearTimerId) + private final TimerSpec clearSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @ProcessElement + public void processElement( + @TimerId(timerId) Timer timer, + @TimerId(clearTimerId) Timer clearTimer, + OutputReceiver<Integer> r) { + timer.offset(Duration.standardSeconds(1)).setRelative(); + clearTimer.offset(Duration.standardSeconds(1)).setRelative(); + timer.offset(Duration.standardSeconds(3)).setRelative(); Review comment: ditto for setting this timer twice, second set will override the first. ########## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java ########## @@ -81,6 +81,9 @@ */ void setRelative(); + /** Clears the timer. 
*/ Review comment: State and timers are partitioned by key and window, so a timer can't be cleared in parallel while it is being set from a different bundle. We should ensure that a cleared timer from a previous bundle will not be fired for a future bundle. For timers within the same bundle, there is a question as to whether we should try to prevent a timer from firing if it was cleared within `@ProcessElement` or during a different timer callback. For example, supporting clearing a timer loop makes a lot of sense: ``` @ProcessElement process(ProcessContext c) { if (initialCondition) { setTimer(); } else { clearTimer(); } } @OnTimer onTimer(...) { do some side effect set timer to fire again in the future } ``` This logic now has a race condition where the condition has to be stored and checked within the timer loop; otherwise, we may clear the timer and then set the timer again, since the timer fires after the element is processed within the same bundle. I started a [discussion on dev@](https://lists.apache.org/thread.html/r172e81586527cdccd94983c4348fc063c8cd37355c02bb441df2fa23%40%3Cdev.beam.apache.org%3E) to get clarity as to what behavior we want. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org