Properly deal with late processing-time timers
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/86522157 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/86522157 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/86522157 Branch: refs/heads/release-0.6.0 Commit: 86522157a79fd9a753436312ff8b746cb5740135 Parents: 8fa718d Author: Aljoscha Krettek <[email protected]> Authored: Fri Mar 10 15:25:26 2017 +0100 Committer: Ahmet Altay <[email protected]> Committed: Fri Mar 10 17:13:57 2017 -0800 ---------------------------------------------------------------------- .../beam/runners/core/StatefulDoFnRunner.java | 40 ++++++++++++-------- 1 file changed, 24 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/86522157/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java index c672902..d27193c 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java @@ -76,33 +76,31 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow> } @Override - public void processElement(WindowedValue<InputT> compressedElem) { + public void processElement(WindowedValue<InputT> input) { // StatefulDoFnRunner always observes windows, so we need to explode - for (WindowedValue<InputT> value : compressedElem.explodeWindows()) { + for (WindowedValue<InputT> value : input.explodeWindows()) { BoundedWindow window = value.getWindows().iterator().next(); - if (!dropLateData(window)) { + if (isLate(window)) { + // The element is too late for this window. 
+ droppedDueToLateness.addValue(1L); + WindowTracing.debug( + "StatefulDoFnRunner.processElement: Dropping element at {}; window:{} " + + "since too far behind inputWatermark:{}", + input.getTimestamp(), window, cleanupTimer.currentInputWatermarkTime()); + } else { cleanupTimer.setForWindow(window); doFnRunner.processElement(value); } } } - private boolean dropLateData(BoundedWindow window) { + private boolean isLate(BoundedWindow window) { Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()); Instant inputWM = cleanupTimer.currentInputWatermarkTime(); - if (gcTime.isBefore(inputWM)) { - // The element is too late for this window. - droppedDueToLateness.addValue(1L); - WindowTracing.debug( - "StatefulDoFnRunner.processElement/onTimer: Dropping element for window:{} " - + "since too far behind inputWatermark:{}", window, inputWM); - return true; - } else { - return false; - } + return gcTime.isBefore(inputWM); } @Override @@ -112,8 +110,18 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow> stateCleaner.clearForWindow(window); // There should invoke the onWindowExpiration of DoFn } else { - // a timer can never be late because we don't allow setting timers after GC time - doFnRunner.onTimer(timerId, window, timestamp, timeDomain); + // An event-time timer can never be late because we don't allow setting timers after GC time. + // It can happen that a processing-time timer fires for a late window; we need to ignore + // this. + if (!timeDomain.equals(TimeDomain.EVENT_TIME) && isLate(window)) { + // don't increment the dropped counter, only do that for elements + WindowTracing.debug( + "StatefulDoFnRunner.onTimer: Ignoring processing-time timer at {}; window:{} " + + "since window is too far behind inputWatermark:{}", + timestamp, window, cleanupTimer.currentInputWatermarkTime()); + } else { + doFnRunner.onTimer(timerId, window, timestamp, timeDomain); + } } }
