This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new a005fd7 Merge pull request #10627:[BEAM-2535] Support outputTimestamp and watermark holds in processing timers. a005fd7 is described below commit a005fd765a762183ca88df90f261f6d4a20cf3e0 Author: Rehman <rehmanmurada...@gmail.com> AuthorDate: Sun Feb 9 12:55:27 2020 +0500 Merge pull request #10627:[BEAM-2535] Support outputTimestamp and watermark holds in processing timers. --- .../apache/beam/runners/core/SimpleDoFnRunner.java | 80 +++++++++++++----- .../beam/runners/direct/WatermarkManager.java | 47 +++++++++-- .../beam/runners/direct/WatermarkManagerTest.java | 5 +- .../org/apache/beam/sdk/transforms/ParDoTest.java | 97 +++++++++++++++++++++- .../apache/beam/fn/harness/FnApiDoFnRunner.java | 1 - 5 files changed, 199 insertions(+), 31 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index aaae986..98d8e04 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -762,7 +762,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out try { TimerSpec spec = (TimerSpec) signature.timerDeclarations().get(timerId).field().get(fn); return new TimerInternalsTimer( - window(), getNamespace(), timerId, spec, stepContext.timerInternals()); + window(), getNamespace(), timerId, spec, timestamp(), stepContext.timerInternals()); } catch (IllegalAccessException e) { throw new RuntimeException(e); } @@ -774,7 +774,12 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out TimerSpec spec = (TimerSpec) signature.timerFamilyDeclarations().get(timerFamilyId).field().get(fn); return new TimerInternalsTimerMap( - timerFamilyId, window(), getNamespace(), spec, stepContext.timerInternals()); + timerFamilyId, + window(), + getNamespace(), + spec, + timestamp(), + stepContext.timerInternals()); } catch (IllegalAccessException e) { throw new RuntimeException(e); } @@ -949,7 +954,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out try { TimerSpec spec = (TimerSpec) signature.timerDeclarations().get(timerId).field().get(fn); return new TimerInternalsTimer( - window, getNamespace(), timerId, spec, stepContext.timerInternals()); + window, getNamespace(), timerId, spec, timestamp(), stepContext.timerInternals()); } catch (IllegalAccessException e) { throw new RuntimeException(e); } @@ -961,7 +966,12 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out TimerSpec spec = (TimerSpec) signature.timerFamilyDeclarations().get(timerFamilyId).field().get(fn); return new TimerInternalsTimerMap( - timerFamilyId, window(), getNamespace(), spec, stepContext.timerInternals()); + timerFamilyId, + window(), + getNamespace(), + spec, + timestamp(), + stepContext.timerInternals()); } catch (IllegalAccessException e) { throw new RuntimeException(e); } @@ -1006,6 +1016,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out private final TimerSpec spec; private Instant target; private Instant outputTimestamp; + private final Instant elementInputTimestamp; private Duration period = Duration.ZERO; private Duration offset = Duration.ZERO; @@ -1014,12 +1025,14 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out StateNamespace namespace, String timerId, TimerSpec spec, + Instant elementInputTimestamp, TimerInternals timerInternals) { this.window = window; this.namespace = namespace; this.timerId = timerId; this.timerFamilyId = ""; this.spec = spec; + this.elementInputTimestamp = elementInputTimestamp; this.timerInternals = timerInternals; } @@ -1029,12 +1042,14 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out String timerId, String timerFamilyId, TimerSpec spec, + Instant elementInputTimestamp, TimerInternals timerInternals) { this.window = window; this.namespace = namespace; this.timerId = timerId; this.timerFamilyId = timerFamilyId; this.spec = spec; + this.elementInputTimestamp = elementInputTimestamp; this.timerInternals = timerInternals; } @@ -1111,24 +1126,35 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out * </ul> */ private void setAndVerifyOutputTimestamp() { - // Output timestamp is currently not supported in processing time timers. - if (outputTimestamp != null && !TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { - throw new IllegalStateException("Cannot set outputTimestamp in processing time domain."); + + if (outputTimestamp != null) { + checkArgument( + !outputTimestamp.isBefore(elementInputTimestamp), + "output timestamp %s should be after input message timestamp or output timestamp of firing timers %s", + outputTimestamp, + elementInputTimestamp); } + // Output timestamp is set to the delivery time if not initialized by an user. - if (outputTimestamp == null) { + if (outputTimestamp == null && TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { outputTimestamp = target; } - - if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { - Instant windowExpiry = window.maxTimestamp().plus(allowedLateness); - checkArgument( - !target.isAfter(windowExpiry), - "Attempted to set event time timer that outputs for %s but that is" - + " after the expiration of window %s", - target, - windowExpiry); + // For processing timers + if (outputTimestamp == null) { + // For processing timers output timestamp will be: + // 1) timestamp of input element + // OR + // 2) output timestamp of firing timer. + outputTimestamp = elementInputTimestamp; } + + Instant windowExpiry = window.maxTimestamp().plus(allowedLateness); + checkArgument( + !target.isAfter(windowExpiry), + "Attempted to set event time timer that outputs for %s but that is" + + " after the expiration of window %s", + target, + windowExpiry); } /** @@ -1163,6 +1189,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out private final BoundedWindow window; private final StateNamespace namespace; private final TimerSpec spec; + private final Instant elementInputTimestamp; private final String timerFamilyId; public TimerInternalsTimerMap( @@ -1170,10 +1197,12 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out BoundedWindow window, StateNamespace namespace, TimerSpec spec, + Instant elementInputTimestamp, TimerInternals timerInternals) { this.window = window; this.namespace = namespace; this.spec = spec; + this.elementInputTimestamp = elementInputTimestamp; this.timerInternals = timerInternals; this.timerFamilyId = timerFamilyId; } @@ -1181,7 +1210,14 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out @Override public void set(String timerId, Instant absoluteTime) { Timer timer = - new TimerInternalsTimer(window, namespace, timerId, timerFamilyId, spec, timerInternals); + new TimerInternalsTimer( + window, + namespace, + timerId, + timerFamilyId, + spec, + elementInputTimestamp, + timerInternals); timer.set(absoluteTime); timers.put(timerId, timer); } @@ -1191,7 +1227,13 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out if (timers.get(timerId) == null) { Timer timer = new TimerInternalsTimer( - window, namespace, timerId, timerFamilyId, spec, timerInternals); + window, + namespace, + timerId, + timerFamilyId, + spec, + elementInputTimestamp, + timerInternals); timers.put(timerId, timer); } return timers.get(timerId); diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java index 265ebb1..35cfd23 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java @@ -327,10 +327,19 @@ public class WatermarkManager<ExecutableT, CollectionT> { if (pendingTimers.isEmpty()) { return BoundedWindow.TIMESTAMP_MAX_VALUE; } else { - return pendingTimers.firstEntry().getElement().getOutputTimestamp(); + return getMinimumOutputTimestamp(pendingTimers); } } + private Instant getMinimumOutputTimestamp(SortedMultiset<TimerData> timers) { + Instant minimumOutputTimestamp = timers.firstEntry().getElement().getOutputTimestamp(); + for (TimerData timerData : timers) { + minimumOutputTimestamp = + INSTANT_ORDERING.min(timerData.getOutputTimestamp(), minimumOutputTimestamp); + } + return minimumOutputTimestamp; + } + @VisibleForTesting synchronized void updateTimers(TimerUpdate update) { NavigableSet<TimerData> keyTimers = @@ -597,20 +606,29 @@ public class WatermarkManager<ExecutableT, CollectionT> { Instant earliest = THE_END_OF_TIME.get(); for (NavigableSet<TimerData> timers : processingTimers.values()) { if (!timers.isEmpty()) { - earliest = INSTANT_ORDERING.min(timers.first().getTimestamp(), earliest); + earliest = INSTANT_ORDERING.min(getMinimumOutputTimestamp(timers), earliest); } } for (NavigableSet<TimerData> timers : synchronizedProcessingTimers.values()) { if (!timers.isEmpty()) { - earliest = INSTANT_ORDERING.min(timers.first().getTimestamp(), earliest); + earliest = INSTANT_ORDERING.min(getMinimumOutputTimestamp(timers), earliest); } } if (!pendingTimers.isEmpty()) { - earliest = INSTANT_ORDERING.min(pendingTimers.first().getTimestamp(), earliest); + earliest = INSTANT_ORDERING.min(getMinimumOutputTimestamp(pendingTimers), earliest); } return earliest; } + private Instant getMinimumOutputTimestamp(NavigableSet<TimerData> timers) { + Instant minimumOutputTimestamp = timers.first().getOutputTimestamp(); + for (TimerData timerData : timers) { + minimumOutputTimestamp = + INSTANT_ORDERING.min(timerData.getOutputTimestamp(), minimumOutputTimestamp); + } + return minimumOutputTimestamp; + } + private synchronized void updateTimers(TimerUpdate update) { Map<TimeDomain, NavigableSet<TimerData>> timerMap = timerMap(update.key); Table<StateNamespace, String, TimerData> existingTimersForKey = @@ -738,15 +756,25 @@ public class WatermarkManager<ExecutableT, CollectionT> { private final String name; private final SynchronizedProcessingTimeInputWatermark inputWm; + private final PerKeyHolds holds; private AtomicReference<Instant> latestRefresh; public SynchronizedProcessingTimeOutputWatermark( String name, SynchronizedProcessingTimeInputWatermark inputWm) { this.name = name; this.inputWm = inputWm; + holds = new PerKeyHolds(); this.latestRefresh = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE); } + public synchronized void updateHold(Object key, Instant newHold) { + if (newHold == null) { + holds.removeHold(key); + } else { + holds.updateHold(key, newHold); + } + } + @Override public String getName() { return name; @@ -780,7 +808,8 @@ public class WatermarkManager<ExecutableT, CollectionT> { // downstream timers to. Instant oldRefresh = latestRefresh.get(); Instant newTimestamp = - INSTANT_ORDERING.min(inputWm.get(), inputWm.getEarliestTimerTimestamp()); + INSTANT_ORDERING.min( + inputWm.get(), holds.getMinHold(), inputWm.getEarliestTimerTimestamp()); latestRefresh.set(newTimestamp); return updateAndTrace(getName(), oldRefresh, newTimestamp); } @@ -788,6 +817,7 @@ public class WatermarkManager<ExecutableT, CollectionT> { @Override public synchronized String toString() { return MoreObjects.toStringHelper(SynchronizedProcessingTimeOutputWatermark.class) + .add("holds", holds) .add("latestRefresh", latestRefresh) .toString(); } @@ -1133,6 +1163,9 @@ public class WatermarkManager<ExecutableT, CollectionT> { TransformWatermarks transformWms = transformToWatermarks.get(executable); transformWms.setEventTimeHold( inputBundle == null ? null : inputBundle.getKey(), pending.getEarliestHold()); + + transformWms.setSynchronizedProcessingTimeHold( + inputBundle == null ? null : inputBundle.getKey(), pending.getEarliestHold()); } /** @@ -1438,6 +1471,10 @@ public class WatermarkManager<ExecutableT, CollectionT> { outputWatermark.updateHold(key, newHold); } + private void setSynchronizedProcessingTimeHold(Object key, Instant newHold) { + synchronizedProcessingOutputWatermark.updateHold(key, newHold); + } + private void removePending(Bundle<?, ?> bundle) { inputWatermark.removePending(bundle); synchronizedProcessingInputWatermark.removePending(bundle); diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java index 5e9cfc2..eef43c7 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java @@ -1052,10 +1052,9 @@ public class WatermarkManagerTest implements Serializable { Collections.emptyList(), BoundedWindow.TIMESTAMP_MAX_VALUE); manager.refreshAll(); - assertThat(filteredDoubledWms.getSynchronizedProcessingOutputTime(), equalTo(clock.now())); clock.set(new Instant(Long.MAX_VALUE)); - assertThat(filteredWms.getSynchronizedProcessingOutputTime(), equalTo(new Instant(4096))); + assertThat( filteredDoubledWms.getSynchronizedProcessingOutputTime(), not(greaterThan(new Instant(4096)))); @@ -1161,7 +1160,7 @@ public class WatermarkManagerTest implements Serializable { BoundedWindow.TIMESTAMP_MAX_VALUE); manager.refreshAll(); - assertThat(downstreamWms.getSynchronizedProcessingInputTime(), not(lessThan(clock.now()))); + assertThat(downstreamWms.getSynchronizedProcessingInputTime(), not(greaterThan(clock.now()))); } @Test diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 65f7003..6bcb1fe 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -97,6 +97,7 @@ import org.apache.beam.sdk.testing.UsesSideInputsWithDifferentCoders; import org.apache.beam.sdk.testing.UsesStatefulParDo; import org.apache.beam.sdk.testing.UsesStrictTimerOrdering; import org.apache.beam.sdk.testing.UsesTestStream; +import org.apache.beam.sdk.testing.UsesTestStreamWithOutputTimestamp; import org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime; import org.apache.beam.sdk.testing.UsesTimerMap; import org.apache.beam.sdk.testing.UsesTimersInParDo; @@ -3936,8 +3937,12 @@ public class ParDoTest implements Serializable { @ProcessElement public void processElement( - @TimerId(timerId) Timer timer, OutputReceiver<KV<String, Long>> o) { - timer.withOutputTimestamp(new Instant(5)).set(new Instant(10)); + @TimerId(timerId) Timer timer, + @Timestamp Instant timestamp, + OutputReceiver<KV<String, Long>> o) { + timer + .withOutputTimestamp(timestamp.plus(Duration.millis(5))) + .set(timestamp.plus(Duration.millis(10))); // Output a message. This will cause the next DoFn to set a timer as well. o.output(KV.of("foo", 100L)); } @@ -3958,6 +3963,7 @@ public class ParDoTest implements Serializable { @ProcessElement public void processElement( @TimerId(timerId) Timer timer, + @Timestamp Instant timestamp, @StateId("timerFired") ValueState<Boolean> timerFiredState) { Boolean timerFired = timerFiredState.read(); assertTrue(timerFired == null || !timerFired); @@ -3966,7 +3972,7 @@ public class ParDoTest implements Serializable { // DoFn timer's watermark hold. This timer should not fire until the previous timer // fires and removes // the watermark hold. - timer.set(new Instant(8)); + timer.set(timestamp.plus(Duration.millis(8))); } @OnTimer(timerId) @@ -3996,6 +4002,91 @@ public class ParDoTest implements Serializable { pipeline.run(); } + @Test + @Category({ + ValidatesRunner.class, + UsesStatefulParDo.class, + UsesTimersInParDo.class, + UsesTestStreamWithProcessingTime.class, + UsesTestStreamWithOutputTimestamp.class + }) + public void testOutputTimestampWithProcessingTime() { + final String timerId = "foo"; + DoFn<KV<String, Integer>, KV<String, Integer>> fn1 = + new DoFn<KV<String, Integer>, KV<String, Integer>>() { + + @TimerId(timerId) + private final TimerSpec timer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + + @ProcessElement + public void processElement( + @TimerId(timerId) Timer timer, + @Timestamp Instant timestamp, + OutputReceiver<KV<String, Integer>> o) { + timer + .withOutputTimestamp(timestamp.plus(Duration.standardSeconds(5))) + .offset(Duration.standardSeconds(10)) + .setRelative(); + // Output a message. This will cause the next DoFn to set a timer as well. + o.output(KV.of("foo", 100)); + } + + @OnTimer(timerId) + public void onTimer(OnTimerContext c, BoundedWindow w) {} + }; + + DoFn<KV<String, Integer>, Integer> fn2 = + new DoFn<KV<String, Integer>, Integer>() { + + @TimerId(timerId) + private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @StateId("timerFired") + final StateSpec<ValueState<Boolean>> timerFiredState = StateSpecs.value(); + + @ProcessElement + public void processElement( + @TimerId(timerId) Timer timer, + @StateId("timerFired") ValueState<Boolean> timerFiredState) { + Boolean timerFired = timerFiredState.read(); + assertTrue(timerFired == null || !timerFired); + // Set a timer to 8. This is earlier than the previous DoFn's timer, but after the + // previous + // DoFn timer's watermark hold. This timer should not fire until the previous timer + // fires and removes + // the watermark hold. + timer.set(new Instant(8)); + } + + @OnTimer(timerId) + public void onTimer( + @StateId("timerFired") ValueState<Boolean> timerFiredState, + OutputReceiver<Integer> o) { + timerFiredState.write(true); + o.output(100); + } + }; + + TestStream<KV<String, Integer>> stream = + TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())) + .advanceProcessingTime(Duration.standardSeconds(1)) + // Cause fn2 to set a timer. + .addElements(KV.of("key", 1)) + // Normally this would case fn2's timer to expire, but it shouldn't here because of + // the output timestamp. + .advanceProcessingTime(Duration.standardSeconds(9)) + .advanceWatermarkTo(new Instant(11)) + // If the timer fired, then this would case fn2 to fail with an assertion error. + .addElements(KV.of("key", 1)) + .advanceProcessingTime(Duration.standardSeconds(100)) + .advanceWatermarkToInfinity(); + PCollection<Integer> output = + pipeline.apply(stream).apply("first", ParDo.of(fn1)).apply("second", ParDo.of(fn2)); + + PAssert.that(output).containsInAnyOrder(100); // result output + pipeline.run(); + } + private static class TwoTimerTest extends PTransform<PBegin, PDone> { private static PTransform<PBegin, PDone> of( diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index 70d0d0a..35e08a0 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -916,7 +916,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, OutputT> { this.currentOutputTimestamp = outputTime; return this; } - /** * For event time timers the target time should be prior to window GC time. So it returns * min(time to set, GC Time of window).