Repository: incubator-beam
Updated Branches:
  refs/heads/master b2b5f429f -> de91b8014
Only remove window from active window set if it is still active

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c415be87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c415be87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c415be87

Branch: refs/heads/master
Commit: c415be870d03fd9491982cc8e1100165e5c8323c
Parents: 0442a24
Author: Mark Shields <[email protected]>
Authored: Wed Mar 9 14:02:35 2016 -0800
Committer: Mark Shields <[email protected]>
Committed: Wed Mar 9 14:06:36 2016 -0800

----------------------------------------------------------------------
 .../dataflow/sdk/util/MergingActiveWindowSet.java | 13 +++++--------
 .../google/cloud/dataflow/sdk/util/ReduceFnRunner.java | 9 ++++++---
 2 files changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c415be87/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
index 95e378d..5af4ea5 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java
@@ -72,9 +72,7 @@ import javax.annotation.Nullable;
  */
 public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWindowSet<W> {
   private final WindowFn<Object, W> windowFn;
-
-  @Nullable
-  private Map<W, Set<W>> activeWindowToStateAddressWindows;
+  private final Map<W, Set<W>> activeWindowToStateAddressWindows;
 
   /**
    * As above, but only for EPHEMERAL windows. Does not need to be persisted.
@@ -94,16 +92,14 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
    * MERGED. Otherwise W1 is EPHEMERAL.
    * </ul>
    */
-  @Nullable
-  private Map<W, W> windowToActiveWindow;
+  private final Map<W, W> windowToActiveWindow;
 
   /**
    * Deep clone of {@link #activeWindowToStateAddressWindows} as of last commit.
    *
    * <p>Used to avoid writing to state if no changes have been made during the work unit.
    */
-  @Nullable
-  private Map<W, Set<W>> originalActiveWindowToStateAddressWindows;
+  private final Map<W, Set<W>> originalActiveWindowToStateAddressWindows;
 
   /**
    * Handle representing our state in the backend.
@@ -195,6 +191,7 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
 
   @Override
   public void remove(W window) {
+    Preconditions.checkState(isActive(window), "Window %s is not active", window);
     for (W stateAddressWindow : activeWindowToStateAddressWindows.get(window)) {
       windowToActiveWindow.remove(stateAddressWindow);
     }
@@ -522,7 +519,7 @@ public class MergingActiveWindowSet<W extends BoundedWindow> implements ActiveWi
   private static <W> Map<W, Set<W>> deepCopy(Map<W, Set<W>> multimap) {
     Map<W, Set<W>> newMultimap = new HashMap<>();
     for (Map.Entry<W, Set<W>> entry : multimap.entrySet()) {
-      newMultimap.put(entry.getKey(), new LinkedHashSet<W>(entry.getValue()));
+      newMultimap.put(entry.getKey(), new LinkedHashSet<>(entry.getValue()));
     }
     return newMultimap;
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c415be87/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
index 1a009bb..2b6e0d4 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
@@ -523,7 +523,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
       // - The trigger may implement isClosed as constant false.
       // - If the window function does not support windowing then all windows will be considered
       // active.
-      // So we must combine the above.
+      // So we must take conjunction of activeWindows and triggerRunner state.
       boolean windowIsActive =
           activeWindows.isActive(window) && !triggerRunner.isClosed(directContext.state());
 
@@ -602,7 +602,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
       boolean windowIsActive)
       throws Exception {
     if (windowIsActive) {
-      // Since window is still active the trigger has not closed.
+      // Since window was still active the trigger may not have closed.
       reduceFn.clearState(renamedContext);
       watermarkHold.clearHolds(renamedContext);
       nonEmptyPanes.clearPane(renamedContext.state());
@@ -622,7 +622,10 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
       }
     }
     paneInfoTracker.clear(directContext.state());
-    activeWindows.remove(directContext.window());
+    if (activeWindows.isActive(directContext.window())) {
+      // Don't need to track address state windows anymore
+      activeWindows.remove(directContext.window());
+    }
     // We'll never need to test for the trigger being closed again.
     triggerRunner.clearFinished(directContext.state());
   }
