ibzib commented on a change in pull request #12836:
URL: https://github.com/apache/beam/pull/12836#discussion_r489814915



##########
File path: 
runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
##########
@@ -1613,7 +1613,13 @@ public TimerUpdateBuilder setTimer(TimerData setTimer) {
             "Got a timer for after the end of time (%s), got %s",
             BoundedWindow.TIMESTAMP_MAX_VALUE,
             setTimer.getTimestamp());
-        deletedTimers.remove(setTimer);
+        deletedTimers.remove(

Review comment:
       Can you add a comment here as well explaining why we need to use the 
epoch for timestamp and outputTimestamp?

##########
File path: 
runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
##########
@@ -69,7 +69,9 @@ public void setTimer(TimerData timerData) {
 
   @Override
   public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain 
timeDomain) {
-    throw new UnsupportedOperationException("Canceling of timer by ID is not 
yet supported.");
+    // For deleting a timer, we don't cate about the timestamp and 
outputTimestamp.

Review comment:
       ```suggestion
       // For deleting a timer, we don't care about the timestamp and outputTimestamp.
       ```

##########
File path: 
runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
##########
@@ -1624,7 +1630,15 @@ public TimerUpdateBuilder setTimer(TimerData setTimer) {
        */
       public TimerUpdateBuilder deletedTimer(TimerData deletedTimer) {
         deletedTimers.add(deletedTimer);
-        setTimers.remove(deletedTimer);
+        TimerData timerToDelete = null;
+        for (TimerData timer : setTimers) {
+          if (timer.getDomain().equals(deletedTimer.getDomain())
+              && timer.getNamespace().equals(deletedTimer.getNamespace())
+              && timer.getTimerId().equals(deletedTimer.getTimerId())) {
+            timerToDelete = timer;
+          }
+        }
+        setTimers.remove(timerToDelete);

Review comment:
       Should we warn the user if they try to clear an unset timer?

##########
File path: 
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -4609,6 +4609,346 @@ public void onTimer(
       pipeline.run();
     }
 
+    @Test
+    @Category({NeedsRunner.class, UsesTimersInParDo.class, 
UsesTestStream.class})
+    public void testClearProcessingTimeTimerWithinSameBundle() {
+
+      final String timerId = "processing-timer";
+
+      DoFn<KV<String, Integer>, Integer> fn =
+          new DoFn<KV<String, Integer>, Integer>() {
+
+            @TimerId(timerId)
+            private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+            @ProcessElement
+            public void processElement(@TimerId(timerId) Timer timer, 
OutputReceiver<Integer> r) {
+              timer.offset(Duration.standardSeconds(1)).setRelative();
+              timer.clear();
+              r.output(3);
+            }
+
+            @OnTimer(timerId)
+            public void onTimer(TimeDomain timeDomain, OutputReceiver<Integer> 
r) {
+              r.output(42);
+            }
+          };
+
+      TestStream<KV<String, Integer>> stream =
+          TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+              .addElements(KV.of("hello", 37))
+              .advanceProcessingTime(
+                  Duration.millis(
+                          DateTimeUtils.currentTimeMillis() / 1000 * 1000) // 
round to seconds
+                      .plus(Duration.standardMinutes(2)))
+              .advanceWatermarkToInfinity();
+
+      PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+      PAssert.that(output).containsInAnyOrder(3);
+      pipeline.run();
+    }
+
+    @Test
+    @Category({NeedsRunner.class, UsesTimersInParDo.class, 
UsesTestStream.class})
+    public void testClearEventTimeTimerWithinSameBundle() {
+      final String timerId = "event-timer";
+
+      DoFn<KV<String, Integer>, Integer> fn =
+          new DoFn<KV<String, Integer>, Integer>() {
+
+            @TimerId(timerId)
+            private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+            @ProcessElement
+            public void processElement(@TimerId(timerId) Timer timer, 
OutputReceiver<Integer> r) {
+              timer.offset(Duration.standardSeconds(1)).setRelative();
+              timer.clear();
+              r.output(3);
+            }
+
+            @OnTimer(timerId)
+            public void onTimer(OutputReceiver<Integer> r) {
+              r.output(42);
+            }
+          };
+
+      TestStream<KV<String, Integer>> stream =
+          TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+              .advanceWatermarkTo(new Instant(0))
+              .addElements(KV.of("hello", 37))
+              .advanceWatermarkTo(new 
Instant(0).plus(Duration.standardSeconds(1)))
+              .advanceWatermarkToInfinity();
+
+      PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+      PAssert.that(output).containsInAnyOrder(3);
+      pipeline.run();
+    }
+
+    @Test
+    @Category({NeedsRunner.class, UsesTimersInParDo.class, 
UsesTestStream.class})
+    public void testClearUnsetProcessingTimeTimer() {
+      final String timerId = "processing-timer";
+
+      DoFn<KV<String, Integer>, Integer> fn =
+          new DoFn<KV<String, Integer>, Integer>() {
+
+            @TimerId(timerId)
+            private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+            @ProcessElement
+            public void processElement(@TimerId(timerId) Timer timer, 
OutputReceiver<Integer> r) {
+              timer.clear();
+              r.output(3);
+            }
+
+            @OnTimer(timerId)
+            public void onTimer(TimeDomain timeDomain, OutputReceiver<Integer> 
r) {
+              r.output(42);
+            }
+          };
+
+      TestStream<KV<String, Integer>> stream =
+          TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+              .addElements(KV.of("hello", 37))
+              .advanceProcessingTime(
+                  Duration.millis(
+                          DateTimeUtils.currentTimeMillis() / 1000 * 1000) // 
round to seconds
+                      .plus(Duration.standardMinutes(4)))
+              .advanceWatermarkToInfinity();
+
+      PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+      PAssert.that(output).containsInAnyOrder(3);
+      pipeline.run();
+    }
+
+    @Test
+    @Category({NeedsRunner.class, UsesTimersInParDo.class, 
UsesTestStream.class})
+    public void testClearUnsetEventTimeTimer() {
+      final String timerId = "event-timer";
+
+      DoFn<KV<String, Integer>, Integer> fn =
+          new DoFn<KV<String, Integer>, Integer>() {
+
+            @TimerId(timerId)
+            private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+            @ProcessElement
+            public void processElement(@TimerId(timerId) Timer timer, 
OutputReceiver<Integer> r) {
+              timer.clear();
+              r.output(3);
+            }
+
+            @OnTimer(timerId)
+            public void onTimer(OutputReceiver<Integer> r) {
+              r.output(42);
+            }
+          };
+
+      TestStream<KV<String, Integer>> stream =
+          TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+              .advanceWatermarkTo(new Instant(0))
+              .addElements(KV.of("hello", 37))
+              .advanceWatermarkTo(new 
Instant(0).plus(Duration.standardSeconds(1)))
+              .advanceWatermarkToInfinity();
+
+      PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+      PAssert.that(output).containsInAnyOrder(3);
+      pipeline.run();
+    }
+
+    @Test
+    @Category({NeedsRunner.class, UsesTimersInParDo.class, 
UsesTestStream.class})
+    public void testClearProcessingTimeTimerMaybeFire() {
+      final String timerId = "processing-timer";
+      final String clearTimerId = "clear-timer";
+
+      DoFn<KV<String, Integer>, Integer> fn =
+          new DoFn<KV<String, Integer>, Integer>() {
+
+            @TimerId(timerId)
+            private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+            @TimerId(clearTimerId)
+            private final TimerSpec clearTimerSpec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+            @ProcessElement
+            public void processElement(
+                @TimerId(timerId) Timer timer,
+                @TimerId(clearTimerId) Timer clearTimer,
+                OutputReceiver<Integer> r) {
+              timer.offset(Duration.standardSeconds(1)).setRelative();
+              clearTimer.offset(Duration.standardSeconds(1)).setRelative();
+              timer.offset(Duration.standardMinutes(2)).setRelative();
+              r.output(3);
+            }
+
+            @OnTimer(timerId)
+            public void onTimer(OutputReceiver<Integer> r) {
+              r.output(42);
+            }
+
+            @OnTimer(clearTimerId)
+            public void clearTimer(@TimerId(timerId) Timer timer) {
+              timer.clear();
+            }
+          };
+
+      TestStream<KV<String, Integer>> stream =
+          TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+              .addElements(KV.of("hello", 37))
+              .advanceProcessingTime(
+                  Duration.millis(
+                          DateTimeUtils.currentTimeMillis() / 1000 * 1000) // 
round to seconds
+                      .plus(Duration.standardMinutes(2)))
+              .advanceProcessingTime(
+                  Duration.millis(
+                          DateTimeUtils.currentTimeMillis() / 1000 * 1000) // 
round to seconds
+                      .plus(Duration.standardMinutes(4)))
+              .advanceWatermarkToInfinity();
+
+      PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+      PAssert.that(output)
+          .satisfies(
+              (Iterable<Integer> input) -> {
+                assertEquals(1, Iterables.frequency(input, 3));
+                assertTrue(
+                    Iterables.frequency(input, 42) == 1 || 
Iterables.frequency(input, 42) == 2);

Review comment:
       Can you add a comment explaining why this behavior is not deterministic?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to