lukecwik commented on a change in pull request #12836:
URL: https://github.com/apache/beam/pull/12836#discussion_r491185424



##########
File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
##########
@@ -1613,7 +1613,15 @@ public TimerUpdateBuilder setTimer(TimerData setTimer) {
             "Got a timer for after the end of time (%s), got %s",
             BoundedWindow.TIMESTAMP_MAX_VALUE,
             setTimer.getTimestamp());
-        deletedTimers.remove(setTimer);
+        // When constructing TimerData for deleted timer, we are using Instant.EPOCH for timestamp
+        // and output timestamp.

Review comment:
       ```suggestion
           // Use a fake timestamp (EPOCH) for the fire and output timestamps since they are ignored.
   ```

##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -4609,6 +4609,350 @@ public void onTimer(
       pipeline.run();
     }
 
+    @Test
+    @Category({NeedsRunner.class, UsesTimersInParDo.class, 
UsesTestStream.class})
+    public void testClearProcessingTimeTimerWithinSameBundle() {
+
+      final String timerId = "processing-timer";
+
+      DoFn<KV<String, Integer>, Integer> fn =

Review comment:
       Dependent on the timer discussion I started on dev@:
   
   Let's create a test for the timer loop in which an initial element sets the timer, every firing of the timer sets it again, and a second element clears it. We should set up the test stream so that the timer becomes eligible to fire at the same time as the element that would clear it becomes eligible to be processed.
   
   This will require us to keep track of whether a timer has been cleared within a bundle prior to its firing.

##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -4609,6 +4609,350 @@ public void onTimer(
       pipeline.run();
     }
 
+    @Test
+    @Category({NeedsRunner.class, UsesTimersInParDo.class, 
UsesTestStream.class})
+    public void testClearProcessingTimeTimerWithinSameBundle() {
+
+      final String timerId = "processing-timer";
+
+      DoFn<KV<String, Integer>, Integer> fn =
+          new DoFn<KV<String, Integer>, Integer>() {
+
+            @TimerId(timerId)
+            private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+            @ProcessElement
+            public void processElement(@TimerId(timerId) Timer timer, 
OutputReceiver<Integer> r) {
+              timer.offset(Duration.standardSeconds(1)).setRelative();
+              timer.clear();
+              r.output(3);
+            }
+
+            @OnTimer(timerId)
+            public void onTimer(TimeDomain timeDomain, OutputReceiver<Integer> 
r) {
+              r.output(42);
+            }
+          };
+
+      TestStream<KV<String, Integer>> stream =
+          TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+              .addElements(KV.of("hello", 37))
+              .advanceProcessingTime(
+                  Duration.millis(
+                          DateTimeUtils.currentTimeMillis() / 1000 * 1000) // round to seconds
+                      .plus(Duration.standardMinutes(2)))
+              .advanceWatermarkToInfinity();
+
+      PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+      PAssert.that(output).containsInAnyOrder(3);
+      pipeline.run();
+    }
+
+    @Test
+    @Category({NeedsRunner.class, UsesTimersInParDo.class, 
UsesTestStream.class})
+    public void testClearEventTimeTimerWithinSameBundle() {
+      final String timerId = "event-timer";
+
+      DoFn<KV<String, Integer>, Integer> fn =
+          new DoFn<KV<String, Integer>, Integer>() {
+
+            @TimerId(timerId)
+            private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+            @ProcessElement
+            public void processElement(@TimerId(timerId) Timer timer, 
OutputReceiver<Integer> r) {
+              timer.offset(Duration.standardSeconds(1)).setRelative();
+              timer.clear();
+              r.output(3);
+            }
+
+            @OnTimer(timerId)
+            public void onTimer(OutputReceiver<Integer> r) {
+              r.output(42);
+            }
+          };
+
+      TestStream<KV<String, Integer>> stream =
+          TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+              .advanceWatermarkTo(new Instant(0))
+              .addElements(KV.of("hello", 37))
+              .advanceWatermarkTo(new 
Instant(0).plus(Duration.standardSeconds(1)))
+              .advanceWatermarkToInfinity();
+
+      PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+      PAssert.that(output).containsInAnyOrder(3);
+      pipeline.run();
+    }
+
+    @Test
+    @Category({NeedsRunner.class, UsesTimersInParDo.class, 
UsesTestStream.class})
+    public void testClearUnsetProcessingTimeTimer() {
+      final String timerId = "processing-timer";
+
+      DoFn<KV<String, Integer>, Integer> fn =
+          new DoFn<KV<String, Integer>, Integer>() {
+
+            @TimerId(timerId)
+            private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+            @ProcessElement
+            public void processElement(@TimerId(timerId) Timer timer, 
OutputReceiver<Integer> r) {
+              timer.clear();
+              r.output(3);
+            }
+
+            @OnTimer(timerId)
+            public void onTimer(TimeDomain timeDomain, OutputReceiver<Integer> 
r) {
+              r.output(42);
+            }
+          };
+
+      TestStream<KV<String, Integer>> stream =
+          TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+              .addElements(KV.of("hello", 37))
+              .advanceProcessingTime(
+                  Duration.millis(
+                          DateTimeUtils.currentTimeMillis() / 1000 * 1000) // round to seconds
+                      .plus(Duration.standardMinutes(4)))
+              .advanceWatermarkToInfinity();
+
+      PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+      PAssert.that(output).containsInAnyOrder(3);
+      pipeline.run();
+    }
+
+    @Test
+    @Category({NeedsRunner.class, UsesTimersInParDo.class, 
UsesTestStream.class})
+    public void testClearUnsetEventTimeTimer() {
+      final String timerId = "event-timer";
+
+      DoFn<KV<String, Integer>, Integer> fn =
+          new DoFn<KV<String, Integer>, Integer>() {
+
+            @TimerId(timerId)
+            private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+            @ProcessElement
+            public void processElement(@TimerId(timerId) Timer timer, 
OutputReceiver<Integer> r) {
+              timer.clear();
+              r.output(3);
+            }
+
+            @OnTimer(timerId)
+            public void onTimer(OutputReceiver<Integer> r) {
+              r.output(42);
+            }
+          };
+
+      TestStream<KV<String, Integer>> stream =
+          TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+              .advanceWatermarkTo(new Instant(0))
+              .addElements(KV.of("hello", 37))
+              .advanceWatermarkTo(new 
Instant(0).plus(Duration.standardSeconds(1)))
+              .advanceWatermarkToInfinity();
+
+      PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+      PAssert.that(output).containsInAnyOrder(3);
+      pipeline.run();
+    }
+
+    @Test
+    @Category({NeedsRunner.class, UsesTimersInParDo.class, 
UsesTestStream.class})
+    public void testClearProcessingTimeTimerMaybeFire() {
+      final String timerId = "processing-timer";
+      final String clearTimerId = "clear-timer";
+
+      DoFn<KV<String, Integer>, Integer> fn =
+          new DoFn<KV<String, Integer>, Integer>() {
+
+            @TimerId(timerId)
+            private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+            @TimerId(clearTimerId)
+            private final TimerSpec clearTimerSpec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+            @ProcessElement
+            public void processElement(
+                @TimerId(timerId) Timer timer,
+                @TimerId(clearTimerId) Timer clearTimer,
+                OutputReceiver<Integer> r) {
+              timer.offset(Duration.standardSeconds(1)).setRelative();
+              clearTimer.offset(Duration.standardSeconds(1)).setRelative();
+              timer.offset(Duration.standardMinutes(2)).setRelative();

Review comment:
       Why are we setting `timer` twice? The second set will override the first.

##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -4609,6 +4609,350 @@ public void onTimer(
       pipeline.run();
     }
 
+    @Test
+    @Category({NeedsRunner.class, UsesTimersInParDo.class, 
UsesTestStream.class})
+    public void testClearProcessingTimeTimerWithinSameBundle() {
+
+      final String timerId = "processing-timer";
+
+      DoFn<KV<String, Integer>, Integer> fn =
+          new DoFn<KV<String, Integer>, Integer>() {
+
+            @TimerId(timerId)
+            private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+            @ProcessElement
+            public void processElement(@TimerId(timerId) Timer timer, 
OutputReceiver<Integer> r) {
+              timer.offset(Duration.standardSeconds(1)).setRelative();
+              timer.clear();
+              r.output(3);
+            }
+
+            @OnTimer(timerId)
+            public void onTimer(TimeDomain timeDomain, OutputReceiver<Integer> 
r) {
+              r.output(42);
+            }
+          };
+
+      TestStream<KV<String, Integer>> stream =
+          TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+              .addElements(KV.of("hello", 37))
+              .advanceProcessingTime(
+                  Duration.millis(
+                          DateTimeUtils.currentTimeMillis() / 1000 * 1000) // round to seconds
+                      .plus(Duration.standardMinutes(2)))
+              .advanceWatermarkToInfinity();
+
+      PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+      PAssert.that(output).containsInAnyOrder(3);
+      pipeline.run();
+    }
+
+    @Test
+    @Category({NeedsRunner.class, UsesTimersInParDo.class, 
UsesTestStream.class})
+    public void testClearEventTimeTimerWithinSameBundle() {
+      final String timerId = "event-timer";
+
+      DoFn<KV<String, Integer>, Integer> fn =
+          new DoFn<KV<String, Integer>, Integer>() {
+
+            @TimerId(timerId)
+            private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+            @ProcessElement
+            public void processElement(@TimerId(timerId) Timer timer, 
OutputReceiver<Integer> r) {
+              timer.offset(Duration.standardSeconds(1)).setRelative();
+              timer.clear();
+              r.output(3);
+            }
+
+            @OnTimer(timerId)
+            public void onTimer(OutputReceiver<Integer> r) {
+              r.output(42);
+            }
+          };
+
+      TestStream<KV<String, Integer>> stream =
+          TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+              .advanceWatermarkTo(new Instant(0))
+              .addElements(KV.of("hello", 37))
+              .advanceWatermarkTo(new 
Instant(0).plus(Duration.standardSeconds(1)))
+              .advanceWatermarkToInfinity();
+
+      PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+      PAssert.that(output).containsInAnyOrder(3);
+      pipeline.run();
+    }
+
+    @Test
+    @Category({NeedsRunner.class, UsesTimersInParDo.class, 
UsesTestStream.class})
+    public void testClearUnsetProcessingTimeTimer() {
+      final String timerId = "processing-timer";
+
+      DoFn<KV<String, Integer>, Integer> fn =
+          new DoFn<KV<String, Integer>, Integer>() {
+
+            @TimerId(timerId)
+            private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+            @ProcessElement
+            public void processElement(@TimerId(timerId) Timer timer, 
OutputReceiver<Integer> r) {
+              timer.clear();
+              r.output(3);
+            }
+
+            @OnTimer(timerId)
+            public void onTimer(TimeDomain timeDomain, OutputReceiver<Integer> 
r) {
+              r.output(42);
+            }
+          };
+
+      TestStream<KV<String, Integer>> stream =
+          TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+              .addElements(KV.of("hello", 37))
+              .advanceProcessingTime(
+                  Duration.millis(
+                          DateTimeUtils.currentTimeMillis() / 1000 * 1000) // round to seconds
+                      .plus(Duration.standardMinutes(4)))
+              .advanceWatermarkToInfinity();
+
+      PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+      PAssert.that(output).containsInAnyOrder(3);
+      pipeline.run();
+    }
+
+    @Test
+    @Category({NeedsRunner.class, UsesTimersInParDo.class, 
UsesTestStream.class})
+    public void testClearUnsetEventTimeTimer() {
+      final String timerId = "event-timer";
+
+      DoFn<KV<String, Integer>, Integer> fn =
+          new DoFn<KV<String, Integer>, Integer>() {
+
+            @TimerId(timerId)
+            private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+            @ProcessElement
+            public void processElement(@TimerId(timerId) Timer timer, 
OutputReceiver<Integer> r) {
+              timer.clear();
+              r.output(3);
+            }
+
+            @OnTimer(timerId)
+            public void onTimer(OutputReceiver<Integer> r) {
+              r.output(42);
+            }
+          };
+
+      TestStream<KV<String, Integer>> stream =
+          TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+              .advanceWatermarkTo(new Instant(0))
+              .addElements(KV.of("hello", 37))
+              .advanceWatermarkTo(new 
Instant(0).plus(Duration.standardSeconds(1)))
+              .advanceWatermarkToInfinity();
+
+      PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+      PAssert.that(output).containsInAnyOrder(3);
+      pipeline.run();
+    }
+
+    @Test
+    @Category({NeedsRunner.class, UsesTimersInParDo.class, 
UsesTestStream.class})
+    public void testClearProcessingTimeTimerMaybeFire() {
+      final String timerId = "processing-timer";
+      final String clearTimerId = "clear-timer";
+
+      DoFn<KV<String, Integer>, Integer> fn =
+          new DoFn<KV<String, Integer>, Integer>() {
+
+            @TimerId(timerId)
+            private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+            @TimerId(clearTimerId)
+            private final TimerSpec clearTimerSpec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+            @ProcessElement
+            public void processElement(
+                @TimerId(timerId) Timer timer,
+                @TimerId(clearTimerId) Timer clearTimer,
+                OutputReceiver<Integer> r) {
+              timer.offset(Duration.standardSeconds(1)).setRelative();
+              clearTimer.offset(Duration.standardSeconds(1)).setRelative();
+              timer.offset(Duration.standardMinutes(2)).setRelative();
+              r.output(3);
+            }
+
+            @OnTimer(timerId)
+            public void onTimer(OutputReceiver<Integer> r) {
+              r.output(42);
+            }
+
+            @OnTimer(clearTimerId)
+            public void clearTimer(@TimerId(timerId) Timer timer) {
+              timer.clear();
+            }
+          };
+
+      TestStream<KV<String, Integer>> stream =
+          TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+              .addElements(KV.of("hello", 37))
+              .advanceProcessingTime(
+                  Duration.millis(
+                          DateTimeUtils.currentTimeMillis() / 1000 * 1000) // round to seconds
+                      .plus(Duration.standardMinutes(2)))
+              .advanceProcessingTime(
+                  Duration.millis(
+                          DateTimeUtils.currentTimeMillis() / 1000 * 1000) // round to seconds
+                      .plus(Duration.standardMinutes(4)))
+              .advanceWatermarkToInfinity();
+
+      PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+      PAssert.that(output)
+          .satisfies(
+              (Iterable<Integer> input) -> {
+                assertEquals(1, Iterables.frequency(input, 3));
+                // When setting timer and clearing timer happens on different bundles, whether the
+                // timer gets cleared depends on runner implementation.
+                assertTrue(
+                    Iterables.frequency(input, 42) == 1 || 
Iterables.frequency(input, 42) == 2);
+                return null;
+              });
+      pipeline.run();
+    }
+
+    @Test
+    @Category({NeedsRunner.class, UsesTimersInParDo.class, 
UsesTestStream.class})
+    @Ignore("https://issues.apache.org/jira/browse/BEAM-2791")
+    public void testClearEventTimeTimerMaybeFire() {
+      final String timerId = "event-timer";
+      final String clearTimerId = "clear-timer";
+
+      DoFn<KV<String, Integer>, Integer> fn =
+          new DoFn<KV<String, Integer>, Integer>() {
+
+            @TimerId(timerId)
+            private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+            @TimerId(clearTimerId)
+            private final TimerSpec clearSpec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+            @ProcessElement
+            public void processElement(
+                @TimerId(timerId) Timer timer,
+                @TimerId(clearTimerId) Timer clearTimer,
+                OutputReceiver<Integer> r) {
+              timer.offset(Duration.standardSeconds(1)).setRelative();
+              clearTimer.offset(Duration.standardSeconds(1)).setRelative();
+              timer.offset(Duration.standardSeconds(3)).setRelative();

Review comment:
       Ditto for setting this timer twice; the second set will override the first.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
##########
@@ -81,6 +81,9 @@
    */
   void setRelative();
 
+  /** Clears the timer. */

Review comment:
       State and timers are partitioned by key and window, so a timer can't be cleared in one bundle while it is being set in parallel from a different bundle. We should ensure that a timer cleared in a previous bundle will not be fired in a future bundle.
   
   For timers within the same bundle, there is a question as to whether we should try to prevent a timer from firing if it was cleared within `@ProcessElement` or during a different timer callback. For example, supporting clearing a timer loop makes a lot of sense:
   ```
   @ProcessElement
   process(ProcessContext c) {
     if (initialCondition) {
       setTimer();
     } else {
       clearTimer();
     }
   }
   
   @OnTimer
   onTimer(...) {
     do some side effect
     set timer to fire again in the future
   }
   ```
   
   This logic now has a race condition: because the timer fires after the element is processed within the same bundle, we may clear the timer and then set it again. To avoid that, the condition has to be stored and checked within the timer loop.
   
   Started a [discussion on dev@](https://lists.apache.org/thread.html/r172e81586527cdccd94983c4348fc063c8cd37355c02bb441df2fa23%40%3Cdev.beam.apache.org%3E) to get clarity as to what behavior we want.
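
As a rough illustration of the race described above, here is a plain-Java toy model of one bundle for a single key and window. It is not Beam's API or any runner's actual code: the class name `TimerLoopSketch`, the timer ID `loop`, and the `trackClears` flag are all made up for this sketch. It assumes the clearing element is processed first and an already-eligible firing of the looping timer is delivered afterwards in the same bundle.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

/** Toy model of a single bundle; hypothetical names, not Beam runner code. */
class TimerLoopSketch {

  /**
   * Plays through one bundle in which the element that clears the timer is
   * processed before an already-eligible firing of the same timer.
   *
   * @param trackClears whether the (hypothetical) runner remembers in-bundle clears
   * @return true if the looping timer is still set when the bundle ends
   */
  static boolean timerSetAfterBundle(boolean trackClears) {
    Set<String> pendingTimers = new HashSet<>();    // timers set for a future firing
    Set<String> clearedInBundle = new HashSet<>();  // timers cleared during this bundle
    Deque<String> eligibleFirings = new ArrayDeque<>();

    // An earlier bundle set "loop"; it became eligible before this bundle started.
    eligibleFirings.add("loop");

    // @ProcessElement for the second element: clear the timer.
    pendingTimers.remove("loop");
    clearedInBundle.add("loop");

    // Timers fire after the element has been processed in the same bundle.
    while (!eligibleFirings.isEmpty()) {
      String timerId = eligibleFirings.poll();
      if (trackClears && clearedInBundle.contains(timerId)) {
        continue; // drop the stale firing instead of invoking @OnTimer
      }
      // @OnTimer: perform the side effect, then set the timer again.
      pendingTimers.add(timerId);
    }
    return pendingTimers.contains("loop");
  }

  public static void main(String[] args) {
    System.out.println("without tracking: " + timerSetAfterBundle(false));
    System.out.println("with tracking: " + timerSetAfterBundle(true));
  }
}
```

Without tracking, the stale firing sets the timer again and the loop survives the clear. With tracking, the firing is dropped and the loop ends. A fuller model would also forget the clear if the timer is set again later in the same bundle.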




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

