This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new dce846b Merge pull request #15247: [BEAM-12686] Enable self-managed
relative timers
dce846b is described below
commit dce846b36a4fb9140c4c5d14e10b72f835f03d98
Author: reuvenlax <[email protected]>
AuthorDate: Sat Jul 31 13:50:49 2021 -0700
Merge pull request #15247: [BEAM-12686] Enable self-managed relative timers
* expose relative time
* update comment and formatting
* fix test and address comment
---
.../apache/beam/runners/core/SimpleDoFnRunner.java | 20 +++++-------
.../main/java/org/apache/beam/sdk/state/Timer.java | 7 ++++
.../org/apache/beam/sdk/transforms/ParDoTest.java | 37 ----------------------
.../apache/beam/fn/harness/FnApiDoFnRunner.java | 13 ++++----
4 files changed, 22 insertions(+), 55 deletions(-)
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 0676915..bfa4398 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -1124,14 +1124,13 @@ public class SimpleDoFnRunner<InputT, OutputT>
implements DoFnRunner<InputT, Out
@Override
public void set(Instant target) {
this.target = target;
- verifyAbsoluteTimeDomain();
setAndVerifyOutputTimestamp();
setUnderlyingTimer();
}
@Override
public void setRelative() {
- Instant now = getCurrentTime();
+ Instant now = getCurrentRelativeTime();
if (period.equals(Duration.ZERO)) {
target = now.plus(offset);
} else {
@@ -1181,14 +1180,6 @@ public class SimpleDoFnRunner<InputT, OutputT>
implements DoFnRunner<InputT, Out
return this;
}
- /** Verifies that the time domain of this timer is acceptable for absolute
timers. */
- private void verifyAbsoluteTimeDomain() {
- if (!TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
- throw new IllegalStateException(
- "Cannot only set relative timers in processing time domain." + "
Use #setRelative()");
- }
- }
-
/**
*
*
@@ -1255,8 +1246,13 @@ public class SimpleDoFnRunner<InputT, OutputT>
implements DoFnRunner<InputT, Out
namespace, timerId, timerFamilyId, target, outputTimestamp,
spec.getTimeDomain());
}
- private Instant getCurrentTime() {
- switch (spec.getTimeDomain()) {
+ @Override
+ public Instant getCurrentRelativeTime() {
+ return getCurrentTime(spec.getTimeDomain());
+ }
+
+ private Instant getCurrentTime(TimeDomain timeDomain) {
+ switch (timeDomain) {
case EVENT_TIME:
return timerInternals.currentInputWatermarkTime();
case PROCESSING_TIME:
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
index 5f1c047..78453ee7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
@@ -98,4 +98,11 @@ public interface Timer {
* the timer fires.
*/
Timer withOutputTimestamp(Instant outputTime);
+
+ /**
+ * Returns the current relative time used by {@link #setRelative()} and
{@link #offset}. This can
+ * be used by a client that self-manages relative timers (e.g. one that
stores the current timer
+ * time in a state variable).
+ */
+ Instant getCurrentRelativeTime();
}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 50ab860..f720612 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -3820,43 +3820,6 @@ public class ParDoTest implements Serializable {
@Test
@Category({ValidatesRunner.class, UsesTimersInParDo.class})
- public void testAbsoluteProcessingTimeTimerRejected() throws Exception {
- final String timerId = "foo";
-
- DoFn<KV<String, Integer>, Integer> fn =
- new DoFn<KV<String, Integer>, Integer>() {
-
- @TimerId(timerId)
- private final TimerSpec spec =
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
-
- @ProcessElement
- public void processElement(@TimerId(timerId) Timer timer) {
- try {
- timer.set(new Instant(0));
- fail("Should have failed due to processing time with absolute
timer.");
- } catch (RuntimeException e) {
- String message = e.getMessage();
- List<String> expectedSubstrings =
- Arrays.asList("relative timers", "processing time");
- expectedSubstrings.forEach(
- str ->
- Preconditions.checkState(
- message.contains(str),
- "Pipeline didn't fail with the expected strings:
%s",
- expectedSubstrings));
- }
- }
-
- @OnTimer(timerId)
- public void onTimer() {}
- };
-
- pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn));
- pipeline.run();
- }
-
- @Test
- @Category({ValidatesRunner.class, UsesTimersInParDo.class})
public void testOutOfBoundsEventTimeTimer() throws Exception {
final String timerId = "foo";
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 52485db..e9cd5a8 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -1721,6 +1721,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
fireTimestamp = elementTimestampOrTimerFireTimestamp;
break;
case PROCESSING_TIME:
+ // TODO: This should use an injected clock when using TestStream.
fireTimestamp = new Instant(DateTimeUtils.currentTimeMillis());
break;
default:
@@ -1743,12 +1744,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
@Override
public void set(Instant absoluteTime) {
- // Verifies that the time domain of this timer is acceptable for
absolute timers.
- if (!TimeDomain.EVENT_TIME.equals(timeDomain)) {
- throw new IllegalArgumentException(
- "Can only set relative timers in processing time domain. Use
#setRelative()");
- }
-
// Ensures that the target time is reasonable. For event time timers
this means that the time
// should be prior to window GC time.
if (TimeDomain.EVENT_TIME.equals(timeDomain)) {
@@ -1808,6 +1803,12 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
this.outputTimestamp = outputTime;
return this;
}
+
+ @Override
+ public Instant getCurrentRelativeTime() {
+ return fireTimestamp;
+ }
+
/**
* For event time timers the target time should be prior to window GC
time. So it returns
* min(time to set, GC Time of window).