ReduceFnTester can advance clocks without firing timers
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1c1f2395 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1c1f2395 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1c1f2395 Branch: refs/heads/gearpump-runner Commit: 1c1f239501349f5120b0d619c4eea9c435500b78 Parents: d4e5db5 Author: Kenneth Knowles <[email protected]> Authored: Thu Jun 22 12:52:42 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Thu Jun 22 13:58:08 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/core/ReduceFnTester.java | 24 +++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/1c1f2395/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index 7f83eae..ab9fd6e 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -420,6 +420,10 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { return result; } + public void advanceInputWatermarkNoTimers(Instant newInputWatermark) throws Exception { + timerInternals.advanceInputWatermark(newInputWatermark); + } + /** * Advance the input watermark to the specified time, firing any timers that should * fire. Then advance the output watermark as far as possible. @@ -451,6 +455,10 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { runner.persist(); } + public void advanceProcessingTimeNoTimers(Instant newProcessingTime) throws Exception { + timerInternals.advanceProcessingTime(newProcessingTime); + } + /** * If {@link #autoAdvanceOutputWatermark} is {@literal false}, advance the output watermark * to the given value. Otherwise throw. @@ -535,13 +543,27 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { public void fireTimer(W window, Instant timestamp, TimeDomain domain) throws Exception { ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner(); - ArrayList timers = new ArrayList(1); + ArrayList<TimerData> timers = new ArrayList<>(1); timers.add( TimerData.of(StateNamespaces.window(windowFn.windowCoder(), window), timestamp, domain)); runner.onTimers(timers); runner.persist(); } + public void fireTimers(W window, TimestampedValue<TimeDomain>... timers) throws Exception { + ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner(); + ArrayList<TimerData> timerData = new ArrayList<>(timers.length); + for (TimestampedValue<TimeDomain> timer : timers) { + timerData.add( + TimerData.of( + StateNamespaces.window(windowFn.windowCoder(), window), + timer.getTimestamp(), + timer.getValue())); + } + runner.onTimers(timerData); + runner.persist(); + } + /** * Convey the simulated state and implement {@link #outputWindowedValue} to capture all output * elements.
