Drop late data in ReduceFnTester

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/412fd7ea
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/412fd7ea
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/412fd7ea

Branch: refs/heads/gearpump-runner
Commit: 412fd7eab9e58a4d412f4dff5ffec023610b4f22
Parents: 795760d
Author: Kenneth Knowles <[email protected]>
Authored: Thu Jun 22 12:56:14 2017 -0700
Committer: Kenneth Knowles <[email protected]>
Committed: Thu Jun 22 13:58:08 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/runners/core/ReduceFnTester.java     | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/412fd7ea/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index 1fe8f73..7ca96b9 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -529,8 +529,8 @@ public class ReduceFnTester<InputT, OutputT, W extends 
BoundedWindow> {
     for (TimestampedValue<InputT> value : values) {
       WindowTracing.trace("TriggerTester.injectElements: {}", value);
     }
-    ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
-    runner.processElements(
+
+    Iterable<WindowedValue<InputT>> inputs =
         Iterables.transform(
             Arrays.asList(values),
             new Function<TimestampedValue<InputT>, WindowedValue<InputT>>() {
@@ -548,7 +548,12 @@ public class ReduceFnTester<InputT, OutputT, W extends 
BoundedWindow> {
                   throw new RuntimeException(e);
                 }
               }
-            }));
+            });
+
+    ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
+    runner.processElements(
+        new LateDataDroppingDoFnRunner.LateDataFilter(objectStrategy, 
timerInternals)
+            .filter(KEY, inputs));
 
     // Persist after each bundle.
     runner.persist();

Reply via email to