Drop late data in ReduceFnTester
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/412fd7ea
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/412fd7ea
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/412fd7ea
Branch: refs/heads/gearpump-runner
Commit: 412fd7eab9e58a4d412f4dff5ffec023610b4f22
Parents: 795760d
Author: Kenneth Knowles <[email protected]>
Authored: Thu Jun 22 12:56:14 2017 -0700
Committer: Kenneth Knowles <[email protected]>
Committed: Thu Jun 22 13:58:08 2017 -0700
----------------------------------------------------------------------
 .../org/apache/beam/runners/core/ReduceFnTester.java | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/412fd7ea/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index 1fe8f73..7ca96b9 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -529,8 +529,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
 for (TimestampedValue<InputT> value : values) {
 WindowTracing.trace("TriggerTester.injectElements: {}", value);
 }
- ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
- runner.processElements(
+
+ Iterable<WindowedValue<InputT>> inputs =
 Iterables.transform(
 Arrays.asList(values),
 new Function<TimestampedValue<InputT>, WindowedValue<InputT>>() {
@@ -548,7 +548,12 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
 throw new RuntimeException(e);
 }
 }
- }));
+ });
+
+ ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
+ runner.processElements(
+ new LateDataDroppingDoFnRunner.LateDataFilter(objectStrategy, timerInternals)
+ .filter(KEY, inputs));
 
 // Persist after each bundle.
 runner.persist();
