Tidy LateDataDroppingDoFnRunner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d4e5db51 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d4e5db51 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d4e5db51 Branch: refs/heads/gearpump-runner Commit: d4e5db51a025a831ddf4e3bc0e003caebabf647b Parents: 497cfab Author: Kenneth Knowles <[email protected]> Authored: Thu Jun 22 11:56:53 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Thu Jun 22 13:58:08 2017 -0700 ---------------------------------------------------------------------- .../core/LateDataDroppingDoFnRunner.java | 33 ++++++++++---------- 1 file changed, 17 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d4e5db51/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java index 1cf1509..28938c1 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java @@ -134,26 +134,27 @@ public class LateDataDroppingDoFnRunner<K, InputT, OutputT, W extends BoundedWin // The element is too late for this window. droppedDueToLateness.inc(); WindowTracing.debug( - "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} " - + "since too far behind inputWatermark:{}; outputWatermark:{}", - input.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(), + "{}: Dropping element at {} for key:{}; window:{} " + + "since too far behind inputWatermark:{}; outputWatermark:{}", + LateDataFilter.class.getSimpleName(), + input.getTimestamp(), + key, + window, + timerInternals.currentInputWatermarkTime(), timerInternals.currentOutputWatermarkTime()); } } - Iterable<WindowedValue<InputT>> nonLateElements = Iterables.filter( - concatElements, - new Predicate<WindowedValue<InputT>>() { - @Override - public boolean apply(WindowedValue<InputT> input) { - BoundedWindow window = Iterables.getOnlyElement(input.getWindows()); - if (canDropDueToExpiredWindow(window)) { - return false; - } else { - return true; - } - } - }); + Iterable<WindowedValue<InputT>> nonLateElements = + Iterables.filter( + concatElements, + new Predicate<WindowedValue<InputT>>() { + @Override + public boolean apply(WindowedValue<InputT> input) { + BoundedWindow window = Iterables.getOnlyElement(input.getWindows()); + return !canDropDueToExpiredWindow(window); + } + }); return nonLateElements; }
