Repository: beam Updated Branches: refs/heads/release-0.6.0 dc64c2fc0 -> ebc2ba5bf
Move GC timer checking to StatefulDoFnRunner.CleanupTimer Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a18b5b16 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a18b5b16 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a18b5b16 Branch: refs/heads/release-0.6.0 Commit: a18b5b1648489f14fd7a621f345e4d21c09b437f Parents: dc64c2f Author: Aljoscha Krettek <[email protected]> Authored: Fri Mar 10 08:29:27 2017 +0100 Committer: Ahmet Altay <[email protected]> Committed: Fri Mar 10 17:13:40 2017 -0800 ---------------------------------------------------------------------- .../beam/runners/core/StatefulDoFnRunner.java | 29 ++++++++++++++++---- 1 file changed, 23 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/a18b5b16/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java index 154d8bc..926345e 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java @@ -115,15 +115,12 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow> @Override public void onTimer( String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) { - boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME); - Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()); - if (isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp)) { + if (cleanupTimer.isForWindow(timerId, window, timestamp, timeDomain)) { stateCleaner.clearForWindow(window); // There should invoke the onWindowExpiration of DoFn } else { - if (isEventTimer || !dropLateData(window)) { - doFnRunner.onTimer(timerId, window, timestamp, timeDomain); - } + // a timer can never be late because we don't allow setting timers after GC time + doFnRunner.onTimer(timerId, window, timestamp, timeDomain); } } @@ -151,6 +148,16 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow> * Set the garbage collect time of the window to timer. */ void setForWindow(BoundedWindow window); + + /** + * Checks whether the given timer is a cleanup timer for the window. + */ + boolean isForWindow( + String timerId, + BoundedWindow window, + Instant timestamp, + TimeDomain timeDomain); + } /** @@ -191,6 +198,16 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow> GC_TIMER_ID, gcTime, TimeDomain.EVENT_TIME); } + @Override + public boolean isForWindow( + String timerId, + BoundedWindow window, + Instant timestamp, + TimeDomain timeDomain) { + boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME); + Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()); + return isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp); + } } /**
