ibzib commented on a change in pull request #12836:
URL: https://github.com/apache/beam/pull/12836#discussion_r489821074
##########
File path:
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -4609,6 +4609,346 @@ public void onTimer(
pipeline.run();
}
+ @Test
+ @Category({NeedsRunner.class, UsesTimersInParDo.class,
UsesTestStream.class})
+ public void testClearProcessingTimeTimerWithinSameBundle() {
+
+ final String timerId = "processing-timer";
+
+ DoFn<KV<String, Integer>, Integer> fn =
+ new DoFn<KV<String, Integer>, Integer>() {
+
+ @TimerId(timerId)
+ private final TimerSpec spec =
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+ @ProcessElement
+ public void processElement(@TimerId(timerId) Timer timer,
OutputReceiver<Integer> r) {
+ timer.offset(Duration.standardSeconds(1)).setRelative();
+ timer.clear();
+ r.output(3);
+ }
+
+ @OnTimer(timerId)
+ public void onTimer(TimeDomain timeDomain, OutputReceiver<Integer>
r) {
+ r.output(42);
+ }
+ };
+
+ TestStream<KV<String, Integer>> stream =
+ TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+ .addElements(KV.of("hello", 37))
+ .advanceProcessingTime(
+ Duration.millis(
+ DateTimeUtils.currentTimeMillis() / 1000 * 1000) //
round to seconds
+ .plus(Duration.standardMinutes(2)))
+ .advanceWatermarkToInfinity();
+
+ PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+ PAssert.that(output).containsInAnyOrder(3);
+ pipeline.run();
+ }
+
+ @Test
+ @Category({NeedsRunner.class, UsesTimersInParDo.class,
UsesTestStream.class})
+ public void testClearEventTimeTimerWithinSameBundle() {
+ final String timerId = "event-timer";
+
+ DoFn<KV<String, Integer>, Integer> fn =
+ new DoFn<KV<String, Integer>, Integer>() {
+
+ @TimerId(timerId)
+ private final TimerSpec spec =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @ProcessElement
+ public void processElement(@TimerId(timerId) Timer timer,
OutputReceiver<Integer> r) {
+ timer.offset(Duration.standardSeconds(1)).setRelative();
+ timer.clear();
+ r.output(3);
+ }
+
+ @OnTimer(timerId)
+ public void onTimer(OutputReceiver<Integer> r) {
+ r.output(42);
+ }
+ };
+
+ TestStream<KV<String, Integer>> stream =
+ TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+ .advanceWatermarkTo(new Instant(0))
+ .addElements(KV.of("hello", 37))
+ .advanceWatermarkTo(new
Instant(0).plus(Duration.standardSeconds(1)))
+ .advanceWatermarkToInfinity();
+
+ PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+ PAssert.that(output).containsInAnyOrder(3);
+ pipeline.run();
+ }
+
+ @Test
+ @Category({NeedsRunner.class, UsesTimersInParDo.class,
UsesTestStream.class})
+ public void testClearUnsetProcessingTimeTimer() {
+ final String timerId = "processing-timer";
+
+ DoFn<KV<String, Integer>, Integer> fn =
+ new DoFn<KV<String, Integer>, Integer>() {
+
+ @TimerId(timerId)
+ private final TimerSpec spec =
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+ @ProcessElement
+ public void processElement(@TimerId(timerId) Timer timer,
OutputReceiver<Integer> r) {
+ timer.clear();
+ r.output(3);
+ }
+
+ @OnTimer(timerId)
+ public void onTimer(TimeDomain timeDomain, OutputReceiver<Integer>
r) {
+ r.output(42);
+ }
+ };
+
+ TestStream<KV<String, Integer>> stream =
+ TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+ .addElements(KV.of("hello", 37))
+ .advanceProcessingTime(
+ Duration.millis(
+ DateTimeUtils.currentTimeMillis() / 1000 * 1000) //
round to seconds
+ .plus(Duration.standardMinutes(4)))
+ .advanceWatermarkToInfinity();
+
+ PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+ PAssert.that(output).containsInAnyOrder(3);
+ pipeline.run();
+ }
+
+ @Test
+ @Category({NeedsRunner.class, UsesTimersInParDo.class,
UsesTestStream.class})
+ public void testClearUnsetEventTimeTimer() {
+ final String timerId = "event-timer";
+
+ DoFn<KV<String, Integer>, Integer> fn =
+ new DoFn<KV<String, Integer>, Integer>() {
+
+ @TimerId(timerId)
+ private final TimerSpec spec =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @ProcessElement
+ public void processElement(@TimerId(timerId) Timer timer,
OutputReceiver<Integer> r) {
+ timer.clear();
+ r.output(3);
+ }
+
+ @OnTimer(timerId)
+ public void onTimer(OutputReceiver<Integer> r) {
+ r.output(42);
+ }
+ };
+
+ TestStream<KV<String, Integer>> stream =
+ TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+ .advanceWatermarkTo(new Instant(0))
+ .addElements(KV.of("hello", 37))
+ .advanceWatermarkTo(new
Instant(0).plus(Duration.standardSeconds(1)))
+ .advanceWatermarkToInfinity();
+
+ PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+ PAssert.that(output).containsInAnyOrder(3);
+ pipeline.run();
+ }
+
+ @Test
+ @Category({NeedsRunner.class, UsesTimersInParDo.class,
UsesTestStream.class})
+ public void testClearProcessingTimeTimerMaybeFire() {
+ final String timerId = "processing-timer";
+ final String clearTimerId = "clear-timer";
+
+ DoFn<KV<String, Integer>, Integer> fn =
+ new DoFn<KV<String, Integer>, Integer>() {
+
+ @TimerId(timerId)
+ private final TimerSpec spec =
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+ @TimerId(clearTimerId)
+ private final TimerSpec clearTimerSpec =
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+ @ProcessElement
+ public void processElement(
+ @TimerId(timerId) Timer timer,
+ @TimerId(clearTimerId) Timer clearTimer,
+ OutputReceiver<Integer> r) {
+ timer.offset(Duration.standardSeconds(1)).setRelative();
+ clearTimer.offset(Duration.standardSeconds(1)).setRelative();
+ timer.offset(Duration.standardMinutes(2)).setRelative();
+ r.output(3);
+ }
+
+ @OnTimer(timerId)
+ public void onTimer(OutputReceiver<Integer> r) {
+ r.output(42);
+ }
+
+ @OnTimer(clearTimerId)
+ public void clearTimer(@TimerId(timerId) Timer timer) {
+ timer.clear();
+ }
+ };
+
+ TestStream<KV<String, Integer>> stream =
+ TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+ .addElements(KV.of("hello", 37))
+ .advanceProcessingTime(
+ Duration.millis(
+ DateTimeUtils.currentTimeMillis() / 1000 * 1000) //
round to seconds
+ .plus(Duration.standardMinutes(2)))
+ .advanceProcessingTime(
+ Duration.millis(
+ DateTimeUtils.currentTimeMillis() / 1000 * 1000) //
round to seconds
+ .plus(Duration.standardMinutes(4)))
+ .advanceWatermarkToInfinity();
+
+ PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+ PAssert.that(output)
+ .satisfies(
+ (Iterable<Integer> input) -> {
+ assertEquals(1, Iterables.frequency(input, 3));
+ assertTrue(
+ Iterables.frequency(input, 42) == 1 ||
Iterables.frequency(input, 42) == 2);
Review comment:
Can you add a comment explaining why this behavior is not defined?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]