This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new dce846b  Merge pull request #15247: [BEAM-12686] Enable self-managed relative timers
dce846b is described below

commit dce846b36a4fb9140c4c5d14e10b72f835f03d98
Author: reuvenlax <[email protected]>
AuthorDate: Sat Jul 31 13:50:49 2021 -0700

    Merge pull request #15247: [BEAM-12686] Enable self-managed relative timers
    
    * expose relative time
    
    * update comment and formatting
    
    * fix test and address comment
---
 .../apache/beam/runners/core/SimpleDoFnRunner.java | 20 +++++-------
 .../main/java/org/apache/beam/sdk/state/Timer.java |  7 ++++
 .../org/apache/beam/sdk/transforms/ParDoTest.java  | 37 ----------------------
 .../apache/beam/fn/harness/FnApiDoFnRunner.java    | 13 ++++----
 4 files changed, 22 insertions(+), 55 deletions(-)

diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 0676915..bfa4398 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -1124,14 +1124,13 @@ public class SimpleDoFnRunner<InputT, OutputT> 
implements DoFnRunner<InputT, Out
     @Override
     public void set(Instant target) {
       this.target = target;
-      verifyAbsoluteTimeDomain();
       setAndVerifyOutputTimestamp();
       setUnderlyingTimer();
     }
 
     @Override
     public void setRelative() {
-      Instant now = getCurrentTime();
+      Instant now = getCurrentRelativeTime();
       if (period.equals(Duration.ZERO)) {
         target = now.plus(offset);
       } else {
@@ -1181,14 +1180,6 @@ public class SimpleDoFnRunner<InputT, OutputT> 
implements DoFnRunner<InputT, Out
       return this;
     }
 
-    /** Verifies that the time domain of this timer is acceptable for absolute 
timers. */
-    private void verifyAbsoluteTimeDomain() {
-      if (!TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
-        throw new IllegalStateException(
-            "Cannot only set relative timers in processing time domain." + " 
Use #setRelative()");
-      }
-    }
-
     /**
      *
      *
@@ -1255,8 +1246,13 @@ public class SimpleDoFnRunner<InputT, OutputT> 
implements DoFnRunner<InputT, Out
           namespace, timerId, timerFamilyId, target, outputTimestamp, 
spec.getTimeDomain());
     }
 
-    private Instant getCurrentTime() {
-      switch (spec.getTimeDomain()) {
+    @Override
+    public Instant getCurrentRelativeTime() {
+      return getCurrentTime(spec.getTimeDomain());
+    }
+
+    private Instant getCurrentTime(TimeDomain timeDomain) {
+      switch (timeDomain) {
         case EVENT_TIME:
           return timerInternals.currentInputWatermarkTime();
         case PROCESSING_TIME:
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
index 5f1c047..78453ee7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
@@ -98,4 +98,11 @@ public interface Timer {
    * the timer fires.
    */
   Timer withOutputTimestamp(Instant outputTime);
+
+  /**
+   * Returns the current relative time used by {@link #setRelative()} and 
{@link #offset}. This can
+   * be used by a client that self-manages relative timers (e.g. one that 
stores the current timer
+   * time in a state variable).
+   */
+  Instant getCurrentRelativeTime();
 }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 50ab860..f720612 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -3820,43 +3820,6 @@ public class ParDoTest implements Serializable {
 
     @Test
     @Category({ValidatesRunner.class, UsesTimersInParDo.class})
-    public void testAbsoluteProcessingTimeTimerRejected() throws Exception {
-      final String timerId = "foo";
-
-      DoFn<KV<String, Integer>, Integer> fn =
-          new DoFn<KV<String, Integer>, Integer>() {
-
-            @TimerId(timerId)
-            private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
-
-            @ProcessElement
-            public void processElement(@TimerId(timerId) Timer timer) {
-              try {
-                timer.set(new Instant(0));
-                fail("Should have failed due to processing time with absolute 
timer.");
-              } catch (RuntimeException e) {
-                String message = e.getMessage();
-                List<String> expectedSubstrings =
-                    Arrays.asList("relative timers", "processing time");
-                expectedSubstrings.forEach(
-                    str ->
-                        Preconditions.checkState(
-                            message.contains(str),
-                            "Pipeline didn't fail with the expected strings: 
%s",
-                            expectedSubstrings));
-              }
-            }
-
-            @OnTimer(timerId)
-            public void onTimer() {}
-          };
-
-      pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn));
-      pipeline.run();
-    }
-
-    @Test
-    @Category({ValidatesRunner.class, UsesTimersInParDo.class})
     public void testOutOfBoundsEventTimeTimer() throws Exception {
       final String timerId = "foo";
 
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 52485db..e9cd5a8 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -1721,6 +1721,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
           fireTimestamp = elementTimestampOrTimerFireTimestamp;
           break;
         case PROCESSING_TIME:
+          // TODO: This should use an injected clock when using TestStream.
           fireTimestamp = new Instant(DateTimeUtils.currentTimeMillis());
           break;
         default:
@@ -1743,12 +1744,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
 
     @Override
     public void set(Instant absoluteTime) {
-      // Verifies that the time domain of this timer is acceptable for 
absolute timers.
-      if (!TimeDomain.EVENT_TIME.equals(timeDomain)) {
-        throw new IllegalArgumentException(
-            "Can only set relative timers in processing time domain. Use 
#setRelative()");
-      }
-
       // Ensures that the target time is reasonable. For event time timers 
this means that the time
       // should be prior to window GC time.
       if (TimeDomain.EVENT_TIME.equals(timeDomain)) {
@@ -1808,6 +1803,12 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
       this.outputTimestamp = outputTime;
       return this;
     }
+
+    @Override
+    public Instant getCurrentRelativeTime() {
+      return fireTimestamp;
+    }
+
     /**
      * For event time timers the target time should be prior to window GC 
time. So it returns
      * min(time to set, GC Time of window).

Reply via email to