Move ActiveWindowSet and implementations to runners-core

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bcf02986
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bcf02986
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bcf02986

Branch: refs/heads/master
Commit: bcf02986df5d7831bb6fbe4c304bef6857e395f3
Parents: cc662d6
Author: Kenneth Knowles <k...@google.com>
Authored: Mon Dec 12 20:18:02 2016 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Fri Dec 16 13:48:37 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/core/ActiveWindowSet.java      | 173 ++++++++
 .../runners/core/MergingActiveWindowSet.java    | 428 +++++++++++++++++++
 .../runners/core/NonMergingActiveWindowSet.java |  91 ++++
 .../runners/core/ReduceFnContextFactory.java    |   1 -
 .../beam/runners/core/ReduceFnRunner.java       |   3 -
 .../TriggerStateMachineContextFactory.java      |   2 +-
 .../core/MergingActiveWindowSetTest.java        | 262 ++++++++++++
 .../triggers/TriggerStateMachineTester.java     |   8 +-
 .../apache/beam/sdk/util/ActiveWindowSet.java   | 173 --------
 .../beam/sdk/util/MergingActiveWindowSet.java   | 428 -------------------
 .../sdk/util/NonMergingActiveWindowSet.java     |  91 ----
 .../sdk/util/MergingActiveWindowSetTest.java    | 262 ------------
 12 files changed, 959 insertions(+), 963 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcf02986/runners/core-java/src/main/java/org/apache/beam/runners/core/ActiveWindowSet.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ActiveWindowSet.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ActiveWindowSet.java
new file mode 100644
index 0000000..79d1f3f
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ActiveWindowSet.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.Set;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+
+/**
+ * Track which windows are <i>active</i>, and the <i>state address 
window(s)</i> under which their
+ * state is stored. Also help with the multi-step process of merging windows 
and their associated
+ * state.
+ *
+ * <p>When windows are merged we must also merge their state. For example, we 
may need to
+ * concatenate buffered elements, sum a count of elements, or find a new 
minimum timestamp.
+ * If we start with two windows {@code Wa} and {@code Wb} and later discover 
they should be
+ * merged into window {@code Wab} then, naively, we must copy and merge the 
states of {@code Wa}
+ * and {@code Wb} into {@code Wab}.
+ *
+ * <p>However, the common case for merging windows is for a new window to be 
merged into an
+ * existing window. Thus, if {@code Wa} is the existing window and {@code Wb} 
the new window it
+ * is more efficient to leave the state for {@code Wa} where it is, and simply 
redirect {@code
+ * Wab} to it. In this case we say {@code Wab} has a state address window of 
{@code Wa}.
+ *
+ * <p>Even if windows {@code Wa} and {@code Wb} already have state, it can 
still be more efficient
+ * to append the state of {@code Wb} onto {@code Wa} rather than copy the 
state from {@code Wa}
+ * and {@code Wb} into {@code Wab}.
+ *
+ * <p>We use the following terminology for windows:
+ * <ol>
+ * <li><b>ACTIVE</b>: A window that has state associated with it and has not 
itself been merged
+ * away. The window may have one (or more) state address windows under which 
its
+ * non-empty state is stored. A state value for an ACTIVE window must be 
derived by reading
+ * the state in (all of) its state address windows. Note that only pre 1.4 
pipelines
+ * use multiple state address windows per active window. From 1.4 onwards we 
eagerly merge
+ * window state into a single state address window.
+ * <li><b>NEW</b>: The initial state for a window of an incoming element which 
is not
+ * already ACTIVE. We have not yet called {@link WindowFn#mergeWindows}, and 
so don't yet know
+ * whether the window will be merged into another NEW or ACTIVE window, or 
will
+ * become an ACTIVE window in its own right.
+ * </ol>
+ *
+ * <p>If no windows will ever be merged we can use the trivial implementation 
{@link
+ * NonMergingActiveWindowSet}. Otherwise, the actual implementation of this 
data structure is in
+ * {@link MergingActiveWindowSet}.
+ *
+ * @param <W> the type of window being managed
+ */
+public interface ActiveWindowSet<W extends BoundedWindow> {
+  /**
+   * Callback for {@link #merge}.
+   */
+  public interface MergeCallback<W extends BoundedWindow> {
+    /**
+     * Called when windows are about to be merged, but before any {@link 
#onMerge} callback
+     * has been made.
+     *
+     * @param toBeMerged  the windows about to be merged.
+     * @param mergeResult the result window, either a member of {@code 
toBeMerged} or new.
+     */
+    void prefetchOnMerge(Collection<W> toBeMerged, W mergeResult) throws 
Exception;
+
+    /**
+     * Called when windows are about to be merged, after all {@link 
#prefetchOnMerge} calls
+     * have been made, but before the active window set has been updated to 
reflect the merge.
+     *
+     * @param toBeMerged  the windows about to be merged.
+     * @param mergeResult the result window, either a member of {@code 
toBeMerged} or new.
+     */
+    void onMerge(Collection<W> toBeMerged, W mergeResult) throws Exception;
+  }
+
+  /**
+   * Remove any remaining NEW windows since they were not promoted to being 
ACTIVE
+   * by {@link #ensureWindowIsActive} and we don't need to record anything 
about them.
+   */
+  void cleanupTemporaryWindows();
+
+  /**
+   * Save any state changes needed.
+   */
+  void persist();
+
+  /**
+   * Return (a view of) the set of currently ACTIVE and NEW windows.
+   */
+  Set<W> getActiveAndNewWindows();
+
+  /**
+   * Return {@code true} if {@code window} is ACTIVE.
+   */
+  boolean isActive(W window);
+
+  /**
+   * Return {@code true} if {@code window} is ACTIVE or NEW.
+   */
+  boolean isActiveOrNew(W window);
+
+  /**
+   * Called when an incoming element indicates it is a member of {@code 
window}, but before we
+   * have started processing that element. If {@code window} is not already 
known to be ACTIVE,
+   * then add it as NEW.
+   */
+  void ensureWindowExists(W window);
+
+  /**
+   * Called when a NEW or ACTIVE window is now known to be ACTIVE.
+   * Ensure that if it is NEW then it becomes ACTIVE (with itself as its only 
state address
+   * window).
+   */
+  void ensureWindowIsActive(W window);
+
+  /**
+   * If {@code window} is not already known to be ACTIVE then add it as ACTIVE.
+   * For testing only.
+   */
+  @VisibleForTesting
+  void addActiveForTesting(W window);
+
+  /**
+   * Remove {@code window} from the set.
+   */
+  void remove(W window);
+
+  /**
+   * Invoke {@link WindowFn#mergeWindows} on the {@code WindowFn} associated 
with this window set,
+   * merging as many of the NEW and ACTIVE windows as possible. {@code 
mergeCallback} will be
+   * invoked for each group of windows that are merged. After this all merge 
result windows will
+   * be ACTIVE, and all windows which have been merged away will be neither 
ACTIVE nor NEW.
+   */
+  void merge(MergeCallback<W> mergeCallback) throws Exception;
+
+  /**
+   * Signal that all state in {@link #readStateAddresses} for {@code window} 
has been merged into
+   * the {@link #writeStateAddress} for {@code window}.
+   */
+  void merged(W window);
+
+  /**
+   * Return the state address windows for ACTIVE {@code window} from which all 
state associated
+   * should be read and merged.
+   */
+  Set<W> readStateAddresses(W window);
+
+  /**
+   * Return the state address window of ACTIVE {@code window} into which all 
new state should be
+   * written. Always one of the results of {@link #readStateAddresses}.
+   */
+  W writeStateAddress(W window);
+
+  /**
+   * Return the state address window into which all new state should be 
written after
+   * ACTIVE windows {@code toBeMerged} have been merged into {@code 
mergeResult}.
+   */
+  W mergedWriteStateAddress(Collection<W> toBeMerged, W mergeResult);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcf02986/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java
new file mode 100644
index 0000000..720377a
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.SetCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.util.state.ValueState;
+
+/**
+ * An {@link ActiveWindowSet} for merging {@link WindowFn} implementations.
+ */
+public class MergingActiveWindowSet<W extends BoundedWindow> implements 
ActiveWindowSet<W> {
+  private final WindowFn<Object, W> windowFn;
+
+  /**
+   * Map ACTIVE and NEW windows to their state address windows. Persisted.
+   * <ul>
+   * <li>A NEW window has the empty set as its value.
+   * <li>An ACTIVE window has its (typically singleton) set of state address 
windows as
+   * its value.
+   * </ul>
+   */
+  private final Map<W, Set<W>> activeWindowToStateAddressWindows;
+
+  /**
+   * Deep clone of {@link #activeWindowToStateAddressWindows} as of last 
commit.
+   * Used to avoid writing to state if no changes have been made during the 
work unit.
+   */
+  private final Map<W, Set<W>> originalActiveWindowToStateAddressWindows;
+
+  /**
+   * Handle representing our state in the backend.
+   */
+  private final ValueState<Map<W, Set<W>>> valueState;
+
+  public MergingActiveWindowSet(WindowFn<Object, W> windowFn, 
StateInternals<?> state) {
+    this.windowFn = windowFn;
+
+    StateTag<Object, ValueState<Map<W, Set<W>>>> tag =
+        StateTags.makeSystemTagInternal(StateTags.value(
+            "tree", MapCoder.of(windowFn.windowCoder(), 
SetCoder.of(windowFn.windowCoder()))));
+    valueState = state.state(StateNamespaces.global(), tag);
+    // Little use trying to prefetch this state since the ReduceFnRunner
+    // is stymied until it is available.
+    activeWindowToStateAddressWindows = emptyIfNull(valueState.read());
+    originalActiveWindowToStateAddressWindows = 
deepCopy(activeWindowToStateAddressWindows);
+  }
+
+  @Override
+  public void cleanupTemporaryWindows() {
+    // All NEW windows can be forgotten since they must have ended up being 
merged into
+    // some other ACTIVE window.
+    Iterator<Map.Entry<W, Set<W>>> iter = 
activeWindowToStateAddressWindows.entrySet().iterator();
+    while (iter.hasNext()) {
+      Map.Entry<W, Set<W>> entry = iter.next();
+      if (entry.getValue().isEmpty()) {
+        iter.remove();
+      }
+    }
+  }
+
+  @Override
+  public void persist() {
+    checkInvariants();
+    if (activeWindowToStateAddressWindows.isEmpty()) {
+      // Force all persistent state to disappear.
+      valueState.clear();
+      return;
+    }
+    if 
(activeWindowToStateAddressWindows.equals(originalActiveWindowToStateAddressWindows))
 {
+      // No change.
+      return;
+    }
+    valueState.write(activeWindowToStateAddressWindows);
+    // No need to update originalActiveWindowToStateAddressWindows since this 
object is about to
+    // become garbage.
+  }
+
+  @Override
+  public Set<W> getActiveAndNewWindows() {
+    return activeWindowToStateAddressWindows.keySet();
+  }
+
+  @Override
+  public boolean isActive(W window) {
+    Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window);
+    return stateAddressWindows != null && !stateAddressWindows.isEmpty();
+  }
+
+  @Override
+  public boolean isActiveOrNew(W window) {
+    return activeWindowToStateAddressWindows.containsKey(window);
+  }
+
+  @Override
+  public void ensureWindowExists(W window) {
+    if (!activeWindowToStateAddressWindows.containsKey(window)) {
+      // Add window as NEW.
+      activeWindowToStateAddressWindows.put(window, new LinkedHashSet<W>());
+    }
+  }
+
+  @Override
+  public void ensureWindowIsActive(W window) {
+    Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window);
+    checkState(stateAddressWindows != null,
+                             "Cannot ensure window %s is active since it is 
neither ACTIVE nor NEW",
+                             window);
+    if (stateAddressWindows != null && stateAddressWindows.isEmpty()) {
+      // Window was NEW, make it ACTIVE with itself as its state address 
window.
+      stateAddressWindows.add(window);
+    }
+  }
+
+  @Override
+  @VisibleForTesting
+  public void addActiveForTesting(W window) {
+    if (!activeWindowToStateAddressWindows.containsKey(window)) {
+      // Make window ACTIVE with itself as its state address window.
+      Set<W> stateAddressWindows = new LinkedHashSet<>();
+      stateAddressWindows.add(window);
+      activeWindowToStateAddressWindows.put(window, stateAddressWindows);
+    }
+  }
+
+  @VisibleForTesting
+  public void addActiveForTesting(W window, Iterable<W> stateAddressWindows) {
+    if (!activeWindowToStateAddressWindows.containsKey(window)) {
+      activeWindowToStateAddressWindows.put(window, 
Sets.newLinkedHashSet(stateAddressWindows));
+    }
+  }
+
+  @Override
+  public void remove(W window) {
+    activeWindowToStateAddressWindows.remove(window);
+  }
+
+  private class MergeContextImpl extends WindowFn<Object, W>.MergeContext {
+    private MergeCallback<W> mergeCallback;
+    private final List<Collection<W>> allToBeMerged;
+    private final List<W> allMergeResults;
+    private final Set<W> seen;
+
+    public MergeContextImpl(MergeCallback<W> mergeCallback) {
+      windowFn.super();
+      this.mergeCallback = mergeCallback;
+      allToBeMerged = new ArrayList<>();
+      allMergeResults = new ArrayList<>();
+      seen = new HashSet<>();
+    }
+
+    @Override
+    public Collection<W> windows() {
+      return activeWindowToStateAddressWindows.keySet();
+    }
+
+    @Override
+    public void merge(Collection<W> toBeMerged, W mergeResult) throws 
Exception {
+      // The arguments have come from userland.
+      checkNotNull(toBeMerged);
+      checkNotNull(mergeResult);
+      List<W> copyOfToBeMerged = new ArrayList<>(toBeMerged.size());
+      boolean includesMergeResult = false;
+      for (W window : toBeMerged) {
+        checkNotNull(window);
+        checkState(isActiveOrNew(window), "Expecting merge window %s to be 
ACTIVE or NEW", window);
+        if (window.equals(mergeResult)) {
+          includesMergeResult = true;
+        }
+        boolean notDup = seen.add(window);
+        checkState(notDup, "Expecting merge window %s to appear in at most one 
merge set", window);
+        copyOfToBeMerged.add(window);
+      }
+      if (!includesMergeResult) {
+        checkState(!isActive(mergeResult), "Expecting result window %s to be 
NEW", mergeResult);
+      }
+      allToBeMerged.add(copyOfToBeMerged);
+      allMergeResults.add(mergeResult);
+    }
+
+    public void recordMerges() throws Exception {
+      for (int i = 0; i < allToBeMerged.size(); i++) {
+        mergeCallback.prefetchOnMerge(allToBeMerged.get(i), 
allMergeResults.get(i));
+      }
+      for (int i = 0; i < allToBeMerged.size(); i++) {
+        mergeCallback.onMerge(allToBeMerged.get(i), allMergeResults.get(i));
+        recordMerge(allToBeMerged.get(i), allMergeResults.get(i));
+      }
+      allToBeMerged.clear();
+      allMergeResults.clear();
+      seen.clear();
+    }
+  }
+
+  @Override
+  public void merge(MergeCallback<W> mergeCallback) throws Exception {
+    MergeContextImpl context = new MergeContextImpl(mergeCallback);
+
+    // See what the window function does with the NEW and already ACTIVE 
windows.
+    // Entering userland.
+    windowFn.mergeWindows(context);
+
+    // Actually do the merging and invoke the callbacks.
+    context.recordMerges();
+  }
+
+  /**
+   * A {@link WindowFn#mergeWindows} call has determined that {@code 
toBeMerged} (which must
+   * all be ACTIVE or NEW) should be considered equivalent to {@code mergeResult} 
(which is either a
+   * member of {@code toBeMerged} or is a new window). Make the corresponding 
change in
+   * the active window set.
+   */
+  private void recordMerge(Collection<W> toBeMerged, W mergeResult) throws 
Exception {
+    // Note that mergedWriteStateAddress must predict the result of 
writeStateAddress
+    // after the corresponding merge has been applied.
+    // Thus we must ensure the first state address window in the merged result 
here is
+    // deterministic.
+    // Thus we use a linked hash set.
+    Set<W> newStateAddressWindows = new LinkedHashSet<>();
+    Set<W> existingStateAddressWindows = 
activeWindowToStateAddressWindows.get(mergeResult);
+    if (existingStateAddressWindows != null) {
+      // Preserve all the existing state address windows for mergeResult.
+      newStateAddressWindows.addAll(existingStateAddressWindows);
+    }
+
+    for (W other : toBeMerged) {
+      Set<W> otherStateAddressWindows = 
activeWindowToStateAddressWindows.get(other);
+      checkState(otherStateAddressWindows != null,
+                               "Window %s is not ACTIVE or NEW", other);
+
+      if (otherStateAddressWindows != null) {
+        for (W otherStateAddressWindow : otherStateAddressWindows) {
+          // Since otherTarget equiv other AND other equiv mergeResult
+          // THEN otherTarget equiv mergeResult.
+          newStateAddressWindows.add(otherStateAddressWindow);
+        }
+      }
+      activeWindowToStateAddressWindows.remove(other);
+
+      // Now other equiv mergeResult.
+    }
+
+    if (newStateAddressWindows.isEmpty()) {
+      // If newStateAddressWindows is empty then toBeMerged must have only 
contained NEW windows.
+      // Promote mergeResult to be ACTIVE now.
+      newStateAddressWindows.add(mergeResult);
+    }
+
+    activeWindowToStateAddressWindows.put(mergeResult, newStateAddressWindows);
+
+    merged(mergeResult);
+  }
+
+  @Override
+  public void merged(W window) {
+    // Take just the first state address window.
+    Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window);
+    checkState(stateAddressWindows != null, "Window %s is not ACTIVE", window);
+    W first = Iterables.getFirst(stateAddressWindows, null);
+    stateAddressWindows.clear();
+    stateAddressWindows.add(first);
+  }
+
+  /**
+   * Return the state address windows for ACTIVE {@code window} from which all 
state associated
+   * should be read and merged.
+   */
+  @Override
+  public Set<W> readStateAddresses(W window) {
+    Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window);
+    checkState(stateAddressWindows != null, "Window %s is not ACTIVE", window);
+    return stateAddressWindows;
+  }
+
+  /**
+   * Return the state address window of ACTIVE {@code window} into which all 
new state should be
+   * written.
+   */
+  @Override
+  public W writeStateAddress(W window) {
+    Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window);
+    checkState(stateAddressWindows != null, "Window %s is not ACTIVE", window);
+    W result = Iterables.getFirst(stateAddressWindows, null);
+    checkState(result != null, "Window %s is still NEW", window);
+    return result;
+  }
+
+  @Override
+  public W mergedWriteStateAddress(Collection<W> toBeMerged, W mergeResult) {
+    Set<W> stateAddressWindows = 
activeWindowToStateAddressWindows.get(mergeResult);
+    if (stateAddressWindows != null && !stateAddressWindows.isEmpty()) {
+      return Iterables.getFirst(stateAddressWindows, null);
+    }
+    for (W mergedWindow : toBeMerged) {
+      stateAddressWindows = 
activeWindowToStateAddressWindows.get(mergedWindow);
+      if (stateAddressWindows != null && !stateAddressWindows.isEmpty()) {
+        return Iterables.getFirst(stateAddressWindows, null);
+      }
+    }
+    return mergeResult;
+  }
+
+  @VisibleForTesting
+  public void checkInvariants() {
+    Set<W> knownStateAddressWindows = new HashSet<>();
+    for (Map.Entry<W, Set<W>> entry : 
activeWindowToStateAddressWindows.entrySet()) {
+      W active = entry.getKey();
+      checkState(!entry.getValue().isEmpty(),
+                               "Unexpected empty state address window set for 
ACTIVE window %s",
+                               active);
+      for (W stateAddressWindow : entry.getValue()) {
+        checkState(knownStateAddressWindows.add(stateAddressWindow),
+                                 "%s is in more than one state address window 
set",
+                                 stateAddressWindow);
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("MergingActiveWindowSet {\n");
+    for (Map.Entry<W, Set<W>> entry : 
activeWindowToStateAddressWindows.entrySet()) {
+      W active = entry.getKey();
+      Set<W> stateAddressWindows = entry.getValue();
+      if (stateAddressWindows.isEmpty()) {
+        sb.append("  NEW ");
+        sb.append(active);
+        sb.append('\n');
+      } else {
+        sb.append("  ACTIVE ");
+        sb.append(active);
+        sb.append(":\n");
+        for (W stateAddressWindow : stateAddressWindows) {
+          sb.append("    ");
+          sb.append(stateAddressWindow);
+          sb.append("\n");
+        }
+      }
+    }
+    sb.append("}");
+    return sb.toString();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof MergingActiveWindowSet)) {
+      return false;
+    }
+
+    @SuppressWarnings("unchecked")
+    MergingActiveWindowSet<W> other = (MergingActiveWindowSet<W>) o;
+
+    return 
activeWindowToStateAddressWindows.equals(other.activeWindowToStateAddressWindows);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(activeWindowToStateAddressWindows);
+  }
+
+  /**
+   * Replace null {@code multimap} with empty map, and replace null entries in 
{@code multimap}
+   * with
+   * empty sets.
+   */
+  private static <W> Map<W, Set<W>> emptyIfNull(@Nullable Map<W, Set<W>> 
multimap) {
+    if (multimap == null) {
+      return new HashMap<>();
+    } else {
+      for (Map.Entry<W, Set<W>> entry : multimap.entrySet()) {
+        if (entry.getValue() == null) {
+          entry.setValue(new LinkedHashSet<W>());
+        }
+      }
+      return multimap;
+    }
+  }
+
+  /**
+   * Return a deep copy of {@code multimap}.
+   */
+  private static <W> Map<W, Set<W>> deepCopy(Map<W, Set<W>> multimap) {
+    Map<W, Set<W>> newMultimap = new HashMap<>();
+    for (Map.Entry<W, Set<W>> entry : multimap.entrySet()) {
+      newMultimap.put(entry.getKey(), new LinkedHashSet<>(entry.getValue()));
+    }
+    return newMultimap;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcf02986/runners/core-java/src/main/java/org/apache/beam/runners/core/NonMergingActiveWindowSet.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/NonMergingActiveWindowSet.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonMergingActiveWindowSet.java
new file mode 100644
index 0000000..fec6c45
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonMergingActiveWindowSet.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import java.util.Collection;
+import java.util.Set;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+
+/**
+ * Implementation of {@link ActiveWindowSet} used with {@link WindowFn 
WindowFns} that don't
+ * support
+ * merging.
+ *
+ * @param <W> the types of windows being managed
+ */
+public class NonMergingActiveWindowSet<W extends BoundedWindow> implements 
ActiveWindowSet<W> {
+  @Override
+  public void cleanupTemporaryWindows() {}
+
+  @Override
+  public void persist() {}
+
+  @Override
+  public Set<W> getActiveAndNewWindows() {
+    // Only supported when merging.
+    throw new java.lang.UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean isActive(W window) {
+    // Windows should never disappear, since we don't support merging.
+    return true;
+  }
+
+  @Override
+  public boolean isActiveOrNew(W window) {
+    return true;
+  }
+
+  @Override
+  public void ensureWindowExists(W window) {}
+
+  @Override
+  public void ensureWindowIsActive(W window) {}
+
+  @Override
+  @VisibleForTesting
+  public void addActiveForTesting(W window) {}
+
+  @Override
+  public void remove(W window) {}
+
+  @Override
+  public void merge(MergeCallback<W> mergeCallback) throws Exception {}
+
+  @Override
+  public void merged(W window) {}
+
+  @Override
+  public Set<W> readStateAddresses(W window) {
+    return ImmutableSet.of(window);
+  }
+
+  @Override
+  public W writeStateAddress(W window) {
+    return window;
+  }
+
+  @Override
+  public W mergedWriteStateAddress(Collection<W> toBeMerged, W mergeResult) {
+    return mergeResult;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcf02986/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
index c71897d..eae1a8b 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
@@ -28,7 +28,6 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.ActiveWindowSet;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerInternals;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcf02986/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index 6f7bbcf..96e76b7 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -50,9 +50,6 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.ActiveWindowSet;
-import org.apache.beam.sdk.util.MergingActiveWindowSet;
-import org.apache.beam.sdk.util.NonMergingActiveWindowSet;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerInternals;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcf02986/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java
index 1c06e8d..e3df4ee 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java
@@ -24,12 +24,12 @@ import com.google.common.collect.Iterables;
 import java.util.Collection;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.ActiveWindowSet;
 import 
org.apache.beam.runners.core.triggers.TriggerStateMachine.MergingTriggerInfo;
 import org.apache.beam.runners.core.triggers.TriggerStateMachine.TriggerInfo;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.ActiveWindowSet;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.Timers;
 import org.apache.beam.sdk.util.state.MergingStateAccessor;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcf02986/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java
new file mode 100644
index 0000000..a4928e3
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.util.state.InMemoryStateInternals;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link MergingActiveWindowSet}.
+ */
+@RunWith(JUnit4.class)
+public class MergingActiveWindowSetTest {
+  private Sessions windowFn;
+  private StateInternals<String> state;
+  private MergingActiveWindowSet<IntervalWindow> set;
+  private ActiveWindowSet.MergeCallback<IntervalWindow> callback;
+
+  @Before
+  public void setup() {
+    windowFn = Sessions.withGapDuration(Duration.millis(10));
+    state = InMemoryStateInternals.forKey("dummyKey");
+    set = new MergingActiveWindowSet<>(windowFn, state);
+    @SuppressWarnings("unchecked")
+    ActiveWindowSet.MergeCallback<IntervalWindow>
+        callback = mock(ActiveWindowSet.MergeCallback.class);
+    this.callback = callback;
+  }
+
+  @After
+  public void after() {
+    set = null;
+    state = null;
+    windowFn = null;
+  }
+
+  private void add(long... instants) {
+    for (final long instant : instants) {
+      System.out.println("ADD " + instant);
+      Sessions.AssignContext context = windowFn.new AssignContext() {
+        @Override
+        public Object element() {
+          return (Object) instant;
+        }
+
+        @Override
+        public Instant timestamp() {
+          return new Instant(instant);
+        }
+
+        @Override
+        public BoundedWindow window() {
+          return GlobalWindow.INSTANCE;
+        }
+      };
+
+      for (IntervalWindow window : windowFn.assignWindows(context)) {
+        set.ensureWindowExists(window);
+      }
+    }
+  }
+
+  private Map<IntervalWindow, IntervalWindow> merge(
+      List<IntervalWindow> toBeMerged,
+      IntervalWindow mergeResult) throws Exception {
+    IntervalWindow predictedPostMergeWriteStateAddress =
+        set.mergedWriteStateAddress(toBeMerged, mergeResult);
+
+    System.out.println("BEFORE MERGE");
+    System.out.println(set);
+    Map<IntervalWindow, IntervalWindow> map = new HashMap<>();
+    for (IntervalWindow window : toBeMerged) {
+      System.out.println("WILL MERGE " + window + " INTO " + mergeResult);
+      map.put(window, mergeResult);
+    }
+    System.out.println("AFTER MERGE");
+    set.merge(callback);
+    verify(callback).onMerge(toBeMerged, mergeResult);
+    System.out.println(set);
+
+    assertEquals(predictedPostMergeWriteStateAddress, 
set.writeStateAddress(mergeResult));
+
+    return map;
+  }
+
+  private void activate(Map<IntervalWindow, IntervalWindow> map, long... 
instants) {
+    for (long instant : instants) {
+      IntervalWindow window = window(instant, 10);
+      IntervalWindow active = map.get(window);
+      if (active == null) {
+        active = window;
+      }
+      System.out.println("ACTIVATE " + active);
+      set.ensureWindowIsActive(active);
+    }
+    set.checkInvariants();
+  }
+
+  private void cleanup() {
+    System.out.println("CLEANUP");
+    set.cleanupTemporaryWindows();
+    set.checkInvariants();
+    System.out.println(set);
+    set.persist();
+    MergingActiveWindowSet<IntervalWindow> reloaded =
+        new MergingActiveWindowSet<>(windowFn, state);
+    reloaded.checkInvariants();
+    assertEquals(set, reloaded);
+  }
+
+  private IntervalWindow window(long start, long size) {
+    return new IntervalWindow(new Instant(start), new Duration(size));
+  }
+
+  @Test
+  public void testLifecycle() throws Exception {
+    // Step 1: New elements show up, introducing NEW windows which are 
partially merged.
+    // NEW 1+10
+    // NEW 2+10
+    // NEW 15+10
+    // =>
+    // ACTIVE 1+11 (target 1+11)
+    // ACTIVE 15+10 (target 15+10)
+    add(1, 2, 15);
+    assertEquals(ImmutableSet.of(window(1, 10), window(2, 10), window(15, 10)),
+                 set.getActiveAndNewWindows());
+    Map<IntervalWindow, IntervalWindow> map =
+        merge(ImmutableList.of(window(1, 10), window(2, 10)),
+              window(1, 11));
+    activate(map, 1, 2, 15);
+    assertEquals(ImmutableSet.of(window(1, 11), window(15, 10)), 
set.getActiveAndNewWindows());
+    assertEquals(
+        ImmutableSet.of(window(1, 11)), set.readStateAddresses(window(1, 11)));
+    assertEquals(
+        ImmutableSet.of(window(15, 10)), set.readStateAddresses(window(15, 
10)));
+    cleanup();
+
+    // Step 2: Another element, merged into an existing ACTIVE window.
+    // NEW 3+10
+    // =>
+    // ACTIVE 1+12 (target 1+11)
+    // ACTIVE 15+10 (target 15+10)
+    add(3);
+    assertEquals(ImmutableSet.of(window(3, 10), window(1, 11), window(15, 10)),
+                 set.getActiveAndNewWindows());
+    map = merge(ImmutableList.of(window(1, 11), window(3, 10)),
+                window(1, 12));
+    activate(map, 3);
+    assertEquals(ImmutableSet.of(window(1, 12), window(15, 10)), 
set.getActiveAndNewWindows());
+    assertEquals(
+        ImmutableSet.of(window(1, 11)), set.readStateAddresses(window(1, 12)));
+    assertEquals(
+        ImmutableSet.of(window(15, 10)), set.readStateAddresses(window(15, 
10)));
+    cleanup();
+
+    // Step 3: Another element, causing two ACTIVE windows to be merged.
+    // NEW 8+10
+    // =>
+    // ACTIVE 1+24 (target 1+11)
+    add(8);
+    assertEquals(ImmutableSet.of(window(8, 10), window(1, 12), window(15, 10)),
+                 set.getActiveAndNewWindows());
+    map = merge(ImmutableList.of(window(1, 12), window(8, 10), window(15, 10)),
+                window(1, 24));
+    activate(map, 8);
+    assertEquals(ImmutableSet.of(window(1, 24)), set.getActiveAndNewWindows());
+    assertEquals(
+        ImmutableSet.of(window(1, 11)), set.readStateAddresses(window(1, 24)));
+    cleanup();
+
+    // Step 4: Another element, merged into an existing ACTIVE window.
+    // NEW 9+10
+    // =>
+    // ACTIVE 1+24 (target 1+11)
+    add(9);
+    assertEquals(ImmutableSet.of(window(9, 10), window(1, 24)), 
set.getActiveAndNewWindows());
+    map = merge(ImmutableList.of(window(1, 24), window(9, 10)),
+                window(1, 24));
+    activate(map, 9);
+    assertEquals(ImmutableSet.of(window(1, 24)), set.getActiveAndNewWindows());
+    assertEquals(
+        ImmutableSet.of(window(1, 11)), set.readStateAddresses(window(1, 24)));
+    cleanup();
+
+    // Step 5: Another element reusing earlier window, merged into an existing 
ACTIVE window.
+    // NEW 1+10
+    // =>
+    // ACTIVE 1+24 (target 1+11)
+    add(1);
+    assertEquals(ImmutableSet.of(window(1, 10), window(1, 24)), 
set.getActiveAndNewWindows());
+    map = merge(ImmutableList.of(window(1, 10), window(1, 24)),
+                window(1, 24));
+    activate(map, 1);
+    assertEquals(ImmutableSet.of(window(1, 24)), set.getActiveAndNewWindows());
+    assertEquals(
+        ImmutableSet.of(window(1, 11)), set.readStateAddresses(window(1, 24)));
+    cleanup();
+
+    // Step 6: Window is closed.
+    set.remove(window(1, 24));
+    cleanup();
+    assertTrue(set.getActiveAndNewWindows().isEmpty());
+  }
+
+  @Test
+  public void testLegacyState() {
+    // Pre 1.4 we merged window state lazily.
+    // Simulate loading an active window set with multiple state address 
windows.
+    set.addActiveForTesting(window(1, 12),
+                            ImmutableList.of(window(1, 10), window(2, 10), 
window(3, 10)));
+
+
+    // Make sure we can detect and repair the state.
+    assertTrue(set.isActive(window(1, 12)));
+    assertEquals(ImmutableSet.of(window(1, 10), window(2, 10), window(3, 10)),
+                 set.readStateAddresses(window(1, 12)));
+    assertEquals(window(1, 10),
+                 set.mergedWriteStateAddress(
+                     ImmutableList.of(window(1, 10), window(2, 10), window(3, 
10)),
+                     window(1, 12)));
+    set.merged(window(1, 12));
+    cleanup();
+
+    // From then on we are back to the eager case.
+    assertEquals(ImmutableSet.of(window(1, 10)), 
set.readStateAddresses(window(1, 12)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcf02986/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
index 8969405..2a626d4 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
@@ -32,15 +32,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.ActiveWindowSet;
+import org.apache.beam.runners.core.ActiveWindowSet.MergeCallback;
 import org.apache.beam.runners.core.InMemoryTimerInternals;
+import org.apache.beam.runners.core.MergingActiveWindowSet;
+import org.apache.beam.runners.core.NonMergingActiveWindowSet;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.ActiveWindowSet;
-import org.apache.beam.sdk.util.ActiveWindowSet.MergeCallback;
-import org.apache.beam.sdk.util.MergingActiveWindowSet;
-import org.apache.beam.sdk.util.NonMergingActiveWindowSet;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.Timers;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcf02986/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ActiveWindowSet.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ActiveWindowSet.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ActiveWindowSet.java
deleted file mode 100644
index 2e0af29..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ActiveWindowSet.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.Collection;
-import java.util.Set;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-
-/**
- * Track which windows are <i>active</i>, and the <i>state address 
window(s)</i> under which their
- * state is stored. Also help with the multi-step process of merging windows 
and their associated
- * state.
- *
- * <p>When windows are merged we must also merge their state. For example, we 
may need to
- * concatenate buffered elements, sum a count of elements, or find a new 
minimum timestamp.
- * If we start with two windows {@code Wa} and {@code Wb} and later discover 
they should be
- * merged into window {@code Wab} then, naively, we must copy and merge the 
states of {@code Wa}
- * and {@code Wb} into {@code Wab}.
- *
- * <p>However, the common case for merging windows is for a new window to be 
merged into an
- * existing window. Thus, if {@code Wa} is the existing window and {@code Wb} 
the new window it
- * is more efficient to leave the state for {@code Wa} where it is, and simply 
redirect {@code
- * Wab} to it. In this case we say {@code Wab} has a state address window of 
{@code Wa}.
- *
- * <p>Even if windows {@code Wa} and {@code Wb} already have state, it can 
still be more efficient
- * to append the state of {@code Wb} onto {@code Wa} rather than copy the 
state from {@code Wa}
- * and {@code Wb} into {@code Wab}.
- *
- * <p>We use the following terminology for windows:
- * <ol>
- * <li><b>ACTIVE</b>: A window that has state associated with it and has not 
itself been merged
- * away. The window may have one (or more) state address windows under which 
its
- * non-empty state is stored. A state value for an ACTIVE window must be 
derived by reading
- * the state in (all of) its state address windows. Note that only pre 1.4 
pipelines
- * use multiple state address windows per active window. From 1.4 onwards we 
eagerly merge
- * window state into a single state address window.
- * <li><b>NEW</b>: The initial state for a window of an incoming element which 
is not
- * already ACTIVE. We have not yet called {@link WindowFn#mergeWindows}, and 
so don't yet know
- * whether the window will be merged into another NEW or ACTIVE window, or 
will
- * become an ACTIVE window in its own right.
- * </ol>
- *
- * <p>If no windows will ever be merged we can use the trivial implementation 
{@link
- * NonMergingActiveWindowSet}. Otherwise, the actual implementation of this 
data structure is in
- * {@link MergingActiveWindowSet}.
- *
- * @param <W> the type of window being managed
- */
-public interface ActiveWindowSet<W extends BoundedWindow> {
-  /**
-   * Callback for {@link #merge}.
-   */
-  public interface MergeCallback<W extends BoundedWindow> {
-    /**
-     * Called when windows are about to be merged, but before any {@link 
#onMerge} callback
-     * has been made.
-     *
-     * @param toBeMerged  the windows about to be merged.
-     * @param mergeResult the result window, either a member of {@code 
toBeMerged} or new.
-     */
-    void prefetchOnMerge(Collection<W> toBeMerged, W mergeResult) throws 
Exception;
-
-    /**
-     * Called when windows are about to be merged, after all {@link 
#prefetchOnMerge} calls
-     * have been made, but before the active window set has been updated to 
reflect the merge.
-     *
-     * @param toBeMerged  the windows about to be merged.
-     * @param mergeResult the result window, either a member of {@code 
toBeMerged} or new.
-     */
-    void onMerge(Collection<W> toBeMerged, W mergeResult) throws Exception;
-  }
-
-  /**
-   * Remove any remaining NEW windows since they were not promoted to being 
ACTIVE
-   * by {@link #ensureWindowIsActive} and we don't need to record anything 
about them.
-   */
-  void cleanupTemporaryWindows();
-
-  /**
-   * Save any state changes needed.
-   */
-  void persist();
-
-  /**
-   * Return (a view of) the set of currently ACTIVE and NEW windows.
-   */
-  Set<W> getActiveAndNewWindows();
-
-  /**
-   * Return {@code true} if {@code window} is ACTIVE.
-   */
-  boolean isActive(W window);
-
-  /**
-   * Return {@code true} if {@code window} is ACTIVE or NEW.
-   */
-  boolean isActiveOrNew(W window);
-
-  /**
-   * Called when an incoming element indicates it is a member of {@code 
window}, but before we
-   * have started processing that element. If {@code window} is not already 
known to be ACTIVE,
-   * then add it as NEW.
-   */
-  void ensureWindowExists(W window);
-
-  /**
-   * Called when a NEW or ACTIVE window is now known to be ACTIVE.
-   * Ensure that if it is NEW then it becomes ACTIVE (with itself as its only 
state address
-   * window).
-   */
-  void ensureWindowIsActive(W window);
-
-  /**
-   * If {@code window} is not already known to be ACTIVE then add it as ACTIVE.
-   * For testing only.
-   */
-  @VisibleForTesting
-  void addActiveForTesting(W window);
-
-  /**
-   * Remove {@code window} from the set.
-   */
-  void remove(W window);
-
-  /**
-   * Invoke {@link WindowFn#mergeWindows} on the {@code WindowFn} associated 
with this window set,
-   * merging as many of the NEW and ACTIVE windows as possible. {@code 
mergeCallback} will be
-   * invoked for each group of windows that are merged. After this all merge 
result windows will
-   * be ACTIVE, and all windows which have been merged away will be neither 
ACTIVE nor NEW.
-   */
-  void merge(MergeCallback<W> mergeCallback) throws Exception;
-
-  /**
-   * Signal that all state in {@link #readStateAddresses} for {@code window} 
has been merged into
-   * the {@link #writeStateAddress} for {@code window}.
-   */
-  void merged(W window);
-
-  /**
-   * Return the state address windows for ACTIVE {@code window} from which all 
state associated
-   * should be read and merged.
-   */
-  Set<W> readStateAddresses(W window);
-
-  /**
-   * Return the state address window of ACTIVE {@code window} into which all 
new state should be
-   * written. Always one of the results of {@link #readStateAddresses}.
-   */
-  W writeStateAddress(W window);
-
-  /**
-   * Return the state address window into which all new state should be 
written after
-   * ACTIVE windows {@code toBeMerged} have been merged into {@code 
mergeResult}.
-   */
-  W mergedWriteStateAddress(Collection<W> toBeMerged, W mergeResult);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcf02986/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MergingActiveWindowSet.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MergingActiveWindowSet.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MergingActiveWindowSet.java
deleted file mode 100644
index 066579b..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MergingActiveWindowSet.java
+++ /dev/null
@@ -1,428 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.MapCoder;
-import org.apache.beam.sdk.coders.SetCoder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.util.state.StateNamespaces;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
-import org.apache.beam.sdk.util.state.ValueState;
-
-/**
- * An {@link ActiveWindowSet} for merging {@link WindowFn} implementations.
- */
-public class MergingActiveWindowSet<W extends BoundedWindow> implements 
ActiveWindowSet<W> {
-  private final WindowFn<Object, W> windowFn;
-
-  /**
-   * Map ACTIVE and NEW windows to their state address windows. Persisted.
-   * <ul>
-   * <li>A NEW window has the empty set as its value.
-   * <li>An ACTIVE window has its (typically singleton) set of state address 
windows as
-   * its value.
-   * </ul>
-   */
-  private final Map<W, Set<W>> activeWindowToStateAddressWindows;
-
-  /**
-   * Deep clone of {@link #activeWindowToStateAddressWindows} as of last 
commit.
-   * Used to avoid writing to state if no changes have been made during the 
work unit.
-   */
-  private final Map<W, Set<W>> originalActiveWindowToStateAddressWindows;
-
-  /**
-   * Handle representing our state in the backend.
-   */
-  private final ValueState<Map<W, Set<W>>> valueState;
-
-  public MergingActiveWindowSet(WindowFn<Object, W> windowFn, 
StateInternals<?> state) {
-    this.windowFn = windowFn;
-
-    StateTag<Object, ValueState<Map<W, Set<W>>>> tag =
-        StateTags.makeSystemTagInternal(StateTags.value(
-            "tree", MapCoder.of(windowFn.windowCoder(), 
SetCoder.of(windowFn.windowCoder()))));
-    valueState = state.state(StateNamespaces.global(), tag);
-    // Little use trying to prefetch this state since the ReduceFnRunner
-    // is stymied until it is available.
-    activeWindowToStateAddressWindows = emptyIfNull(valueState.read());
-    originalActiveWindowToStateAddressWindows = 
deepCopy(activeWindowToStateAddressWindows);
-  }
-
-  @Override
-  public void cleanupTemporaryWindows() {
-    // All NEW windows can be forgotten since they must have ended up being 
merged into
-    // some other ACTIVE window.
-    Iterator<Map.Entry<W, Set<W>>> iter = 
activeWindowToStateAddressWindows.entrySet().iterator();
-    while (iter.hasNext()) {
-      Map.Entry<W, Set<W>> entry = iter.next();
-      if (entry.getValue().isEmpty()) {
-        iter.remove();
-      }
-    }
-  }
-
-  @Override
-  public void persist() {
-    checkInvariants();
-    if (activeWindowToStateAddressWindows.isEmpty()) {
-      // Force all persistent state to disappear.
-      valueState.clear();
-      return;
-    }
-    if 
(activeWindowToStateAddressWindows.equals(originalActiveWindowToStateAddressWindows))
 {
-      // No change.
-      return;
-    }
-    valueState.write(activeWindowToStateAddressWindows);
-    // No need to update originalActiveWindowToStateAddressWindows since this 
object is about to
-    // become garbage.
-  }
-
-  @Override
-  public Set<W> getActiveAndNewWindows() {
-    return activeWindowToStateAddressWindows.keySet();
-  }
-
-  @Override
-  public boolean isActive(W window) {
-    Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window);
-    return stateAddressWindows != null && !stateAddressWindows.isEmpty();
-  }
-
-  @Override
-  public boolean isActiveOrNew(W window) {
-    return activeWindowToStateAddressWindows.containsKey(window);
-  }
-
-  @Override
-  public void ensureWindowExists(W window) {
-    if (!activeWindowToStateAddressWindows.containsKey(window)) {
-      // Add window as NEW.
-      activeWindowToStateAddressWindows.put(window, new LinkedHashSet<W>());
-    }
-  }
-
-  @Override
-  public void ensureWindowIsActive(W window) {
-    Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window);
-    checkState(stateAddressWindows != null,
-                             "Cannot ensure window %s is active since it is 
neither ACTIVE nor NEW",
-                             window);
-    if (stateAddressWindows != null && stateAddressWindows.isEmpty()) {
-      // Window was NEW, make it ACTIVE with itself as its state address 
window.
-      stateAddressWindows.add(window);
-    }
-  }
-
-  @Override
-  @VisibleForTesting
-  public void addActiveForTesting(W window) {
-    if (!activeWindowToStateAddressWindows.containsKey(window)) {
-      // Make window ACTIVE with itself as its state address window.
-      Set<W> stateAddressWindows = new LinkedHashSet<>();
-      stateAddressWindows.add(window);
-      activeWindowToStateAddressWindows.put(window, stateAddressWindows);
-    }
-  }
-
-  @VisibleForTesting
-  public void addActiveForTesting(W window, Iterable<W> stateAddressWindows) {
-    if (!activeWindowToStateAddressWindows.containsKey(window)) {
-      activeWindowToStateAddressWindows.put(window, 
Sets.newLinkedHashSet(stateAddressWindows));
-    }
-  }
-
-  @Override
-  public void remove(W window) {
-    activeWindowToStateAddressWindows.remove(window);
-  }
-
-  private class MergeContextImpl extends WindowFn<Object, W>.MergeContext {
-    private MergeCallback<W> mergeCallback;
-    private final List<Collection<W>> allToBeMerged;
-    private final List<W> allMergeResults;
-    private final Set<W> seen;
-
-    public MergeContextImpl(MergeCallback<W> mergeCallback) {
-      windowFn.super();
-      this.mergeCallback = mergeCallback;
-      allToBeMerged = new ArrayList<>();
-      allMergeResults = new ArrayList<>();
-      seen = new HashSet<>();
-    }
-
-    @Override
-    public Collection<W> windows() {
-      return activeWindowToStateAddressWindows.keySet();
-    }
-
-    @Override
-    public void merge(Collection<W> toBeMerged, W mergeResult) throws 
Exception {
-      // The arguments have come from userland.
-      checkNotNull(toBeMerged);
-      checkNotNull(mergeResult);
-      List<W> copyOfToBeMerged = new ArrayList<>(toBeMerged.size());
-      boolean includesMergeResult = false;
-      for (W window : toBeMerged) {
-        checkNotNull(window);
-        checkState(isActiveOrNew(window), "Expecting merge window %s to be 
ACTIVE or NEW", window);
-        if (window.equals(mergeResult)) {
-          includesMergeResult = true;
-        }
-        boolean notDup = seen.add(window);
-        checkState(notDup, "Expecting merge window %s to appear in at most one 
merge set", window);
-        copyOfToBeMerged.add(window);
-      }
-      if (!includesMergeResult) {
-        checkState(!isActive(mergeResult), "Expecting result window %s to be 
NEW", mergeResult);
-      }
-      allToBeMerged.add(copyOfToBeMerged);
-      allMergeResults.add(mergeResult);
-    }
-
-    public void recordMerges() throws Exception {
-      for (int i = 0; i < allToBeMerged.size(); i++) {
-        mergeCallback.prefetchOnMerge(allToBeMerged.get(i), 
allMergeResults.get(i));
-      }
-      for (int i = 0; i < allToBeMerged.size(); i++) {
-        mergeCallback.onMerge(allToBeMerged.get(i), allMergeResults.get(i));
-        recordMerge(allToBeMerged.get(i), allMergeResults.get(i));
-      }
-      allToBeMerged.clear();
-      allMergeResults.clear();
-      seen.clear();
-    }
-  }
-
-  @Override
-  public void merge(MergeCallback<W> mergeCallback) throws Exception {
-    MergeContextImpl context = new MergeContextImpl(mergeCallback);
-
-    // See what the window function does with the NEW and already ACTIVE 
windows.
-    // Entering userland.
-    windowFn.mergeWindows(context);
-
-    // Actually do the merging and invoke the callbacks.
-    context.recordMerges();
-  }
-
-  /**
-   * A {@link WindowFn#mergeWindows} call has determined that {@code 
toBeMerged} (which must
-   * all be ACTIVE) should be considered equivalent to {@code mergeResult} 
(which is either a
-   * member of {@code toBeMerged} or is a new window). Make the corresponding 
change in
-   * the active window set.
-   */
-  private void recordMerge(Collection<W> toBeMerged, W mergeResult) throws 
Exception {
-    // Note that mergedWriteStateAddress must predict the result of 
writeStateAddress
-    // after the corresponding merge has been applied.
-    // Thus we must ensure the first state address window in the merged result 
here is
-    // deterministic.
-    // Thus we use a linked hash set.
-    Set<W> newStateAddressWindows = new LinkedHashSet<>();
-    Set<W> existingStateAddressWindows = 
activeWindowToStateAddressWindows.get(mergeResult);
-    if (existingStateAddressWindows != null) {
-      // Preserve all the existing state address windows for mergeResult.
-      newStateAddressWindows.addAll(existingStateAddressWindows);
-    }
-
-    for (W other : toBeMerged) {
-      Set<W> otherStateAddressWindows = 
activeWindowToStateAddressWindows.get(other);
-      checkState(otherStateAddressWindows != null,
-                               "Window %s is not ACTIVE or NEW", other);
-
-      if (otherStateAddressWindows != null) {
-        for (W otherStateAddressWindow : otherStateAddressWindows) {
-          // Since otherTarget equiv other AND other equiv mergeResult
-          // THEN otherTarget equiv mergeResult.
-          newStateAddressWindows.add(otherStateAddressWindow);
-        }
-      }
-      activeWindowToStateAddressWindows.remove(other);
-
-      // Now other equiv mergeResult.
-    }
-
-    if (newStateAddressWindows.isEmpty()) {
-      // If stateAddressWindows is empty then toBeMerged must have only contained EPHEMERAL windows.
-      // Promote mergeResult to be ACTIVE now.
-      newStateAddressWindows.add(mergeResult);
-    }
-
-    activeWindowToStateAddressWindows.put(mergeResult, newStateAddressWindows);
-
-    merged(mergeResult);
-  }
-
-  @Override
-  public void merged(W window) {
-    // Take just the first state address window.
-    Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window);
-    checkState(stateAddressWindows != null, "Window %s is not ACTIVE", window);
-    W first = Iterables.getFirst(stateAddressWindows, null);
-    stateAddressWindows.clear();
-    stateAddressWindows.add(first);
-  }
-
-  /**
-   * Return the state address windows for ACTIVE {@code window} from which all state associated
-   * should be read and merged.
-   */
-  @Override
-  public Set<W> readStateAddresses(W window) {
-    Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window);
-    checkState(stateAddressWindows != null, "Window %s is not ACTIVE", window);
-    return stateAddressWindows;
-  }
-
-  /**
-   * Return the state address window of ACTIVE {@code window} into which all new state should be
-   * written.
-   */
-  @Override
-  public W writeStateAddress(W window) {
-    Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(window);
-    checkState(stateAddressWindows != null, "Window %s is not ACTIVE", window);
-    W result = Iterables.getFirst(stateAddressWindows, null);
-    checkState(result != null, "Window %s is still NEW", window);
-    return result;
-  }
-
-  @Override
-  public W mergedWriteStateAddress(Collection<W> toBeMerged, W mergeResult) {
-    Set<W> stateAddressWindows = activeWindowToStateAddressWindows.get(mergeResult);
-    if (stateAddressWindows != null && !stateAddressWindows.isEmpty()) {
-      return Iterables.getFirst(stateAddressWindows, null);
-    }
-    for (W mergedWindow : toBeMerged) {
-      stateAddressWindows = activeWindowToStateAddressWindows.get(mergedWindow);
-      if (stateAddressWindows != null && !stateAddressWindows.isEmpty()) {
-        return Iterables.getFirst(stateAddressWindows, null);
-      }
-    }
-    return mergeResult;
-  }
-
-  @VisibleForTesting
-  public void checkInvariants() {
-    Set<W> knownStateAddressWindows = new HashSet<>();
-    for (Map.Entry<W, Set<W>> entry : activeWindowToStateAddressWindows.entrySet()) {
-      W active = entry.getKey();
-      checkState(!entry.getValue().isEmpty(),
-                               "Unexpected empty state address window set for ACTIVE window %s",
-                               active);
-      for (W stateAddressWindow : entry.getValue()) {
-        checkState(knownStateAddressWindows.add(stateAddressWindow),
-                                 "%s is in more than one state address window set",
-                                 stateAddressWindow);
-      }
-    }
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append("MergingActiveWindowSet {\n");
-    for (Map.Entry<W, Set<W>> entry : activeWindowToStateAddressWindows.entrySet()) {
-      W active = entry.getKey();
-      Set<W> stateAddressWindows = entry.getValue();
-      if (stateAddressWindows.isEmpty()) {
-        sb.append("  NEW ");
-        sb.append(active);
-        sb.append('\n');
-      } else {
-        sb.append("  ACTIVE ");
-        sb.append(active);
-        sb.append(":\n");
-        for (W stateAddressWindow : stateAddressWindows) {
-          sb.append("    ");
-          sb.append(stateAddressWindow);
-          sb.append("\n");
-        }
-      }
-    }
-    sb.append("}");
-    return sb.toString();
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (!(o instanceof MergingActiveWindowSet)) {
-      return false;
-    }
-
-    @SuppressWarnings("unchecked")
-    MergingActiveWindowSet<W> other = (MergingActiveWindowSet<W>) o;
-
-    return activeWindowToStateAddressWindows.equals(other.activeWindowToStateAddressWindows);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(activeWindowToStateAddressWindows);
-  }
-
-  /**
-   * Replace null {@code multimap} with empty map, and replace null entries in {@code multimap}
-   * with
-   * empty sets.
-   */
-  private static <W> Map<W, Set<W>> emptyIfNull(@Nullable Map<W, Set<W>> multimap) {
-    if (multimap == null) {
-      return new HashMap<>();
-    } else {
-      for (Map.Entry<W, Set<W>> entry : multimap.entrySet()) {
-        if (entry.getValue() == null) {
-          entry.setValue(new LinkedHashSet<W>());
-        }
-      }
-      return multimap;
-    }
-  }
-
-  /**
-   * Return a deep copy of {@code multimap}.
-   */
-  private static <W> Map<W, Set<W>> deepCopy(Map<W, Set<W>> multimap) {
-    Map<W, Set<W>> newMultimap = new HashMap<>();
-    for (Map.Entry<W, Set<W>> entry : multimap.entrySet()) {
-      newMultimap.put(entry.getKey(), new LinkedHashSet<>(entry.getValue()));
-    }
-    return newMultimap;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcf02986/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NonMergingActiveWindowSet.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NonMergingActiveWindowSet.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NonMergingActiveWindowSet.java
deleted file mode 100644
index 99d591b..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NonMergingActiveWindowSet.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableSet;
-import java.util.Collection;
-import java.util.Set;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-
-/**
- * Implementation of {@link ActiveWindowSet} used with {@link WindowFn WindowFns} that don't
- * support
- * merging.
- *
- * @param <W> the types of windows being managed
- */
-public class NonMergingActiveWindowSet<W extends BoundedWindow> implements ActiveWindowSet<W> {
-  @Override
-  public void cleanupTemporaryWindows() {}
-
-  @Override
-  public void persist() {}
-
-  @Override
-  public Set<W> getActiveAndNewWindows() {
-    // Only supported when merging.
-    throw new java.lang.UnsupportedOperationException();
-  }
-
-  @Override
-  public boolean isActive(W window) {
-    // Windows should never disappear, since we don't support merging.
-    return true;
-  }
-
-  @Override
-  public boolean isActiveOrNew(W window) {
-    return true;
-  }
-
-  @Override
-  public void ensureWindowExists(W window) {}
-
-  @Override
-  public void ensureWindowIsActive(W window) {}
-
-  @Override
-  @VisibleForTesting
-  public void addActiveForTesting(W window) {}
-
-  @Override
-  public void remove(W window) {}
-
-  @Override
-  public void merge(MergeCallback<W> mergeCallback) throws Exception {}
-
-  @Override
-  public void merged(W window) {}
-
-  @Override
-  public Set<W> readStateAddresses(W window) {
-    return ImmutableSet.of(window);
-  }
-
-  @Override
-  public W writeStateAddress(W window) {
-    return window;
-  }
-
-  @Override
-  public W mergedWriteStateAddress(Collection<W> toBeMerged, W mergeResult) {
-    return mergeResult;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcf02986/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java
deleted file mode 100644
index 676a25a..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.Sessions;
-import org.apache.beam.sdk.util.state.InMemoryStateInternals;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Test NonMergingActiveWindowSet.
- */
-@RunWith(JUnit4.class)
-public class MergingActiveWindowSetTest {
-  private Sessions windowFn;
-  private StateInternals<String> state;
-  private MergingActiveWindowSet<IntervalWindow> set;
-  private ActiveWindowSet.MergeCallback<IntervalWindow> callback;
-
-  @Before
-  public void setup() {
-    windowFn = Sessions.withGapDuration(Duration.millis(10));
-    state = InMemoryStateInternals.forKey("dummyKey");
-    set = new MergingActiveWindowSet<>(windowFn, state);
-    @SuppressWarnings("unchecked")
-    ActiveWindowSet.MergeCallback<IntervalWindow>
-        callback = mock(ActiveWindowSet.MergeCallback.class);
-    this.callback = callback;
-  }
-
-  @After
-  public void after() {
-    set = null;
-    state = null;
-    windowFn = null;
-  }
-
-  private void add(long... instants) {
-    for (final long instant : instants) {
-      System.out.println("ADD " + instant);
-      Sessions.AssignContext context = windowFn.new AssignContext() {
-        @Override
-        public Object element() {
-          return (Object) instant;
-        }
-
-        @Override
-        public Instant timestamp() {
-          return new Instant(instant);
-        }
-
-        @Override
-        public BoundedWindow window() {
-          return GlobalWindow.INSTANCE;
-        }
-      };
-
-      for (IntervalWindow window : windowFn.assignWindows(context)) {
-        set.ensureWindowExists(window);
-      }
-    }
-  }
-
-  private Map<IntervalWindow, IntervalWindow> merge(
-      List<IntervalWindow> toBeMerged,
-      IntervalWindow mergeResult) throws Exception {
-    IntervalWindow predictedPostMergeWriteStateAddress =
-        set.mergedWriteStateAddress(toBeMerged, mergeResult);
-
-    System.out.println("BEFORE MERGE");
-    System.out.println(set);
-    Map<IntervalWindow, IntervalWindow> map = new HashMap<>();
-    for (IntervalWindow window : toBeMerged) {
-      System.out.println("WILL MERGE " + window + " INTO " + mergeResult);
-      map.put(window, mergeResult);
-    }
-    System.out.println("AFTER MERGE");
-    set.merge(callback);
-    verify(callback).onMerge(toBeMerged, mergeResult);
-    System.out.println(set);
-
-    assertEquals(predictedPostMergeWriteStateAddress, set.writeStateAddress(mergeResult));
-
-    return map;
-  }
-
-  private void activate(Map<IntervalWindow, IntervalWindow> map, long... instants) {
-    for (long instant : instants) {
-      IntervalWindow window = window(instant, 10);
-      IntervalWindow active = map.get(window);
-      if (active == null) {
-        active = window;
-      }
-      System.out.println("ACTIVATE " + active);
-      set.ensureWindowIsActive(active);
-    }
-    set.checkInvariants();
-  }
-
-  private void cleanup() {
-    System.out.println("CLEANUP");
-    set.cleanupTemporaryWindows();
-    set.checkInvariants();
-    System.out.println(set);
-    set.persist();
-    MergingActiveWindowSet<IntervalWindow> reloaded =
-        new MergingActiveWindowSet<>(windowFn, state);
-    reloaded.checkInvariants();
-    assertEquals(set, reloaded);
-  }
-
-  private IntervalWindow window(long start, long size) {
-    return new IntervalWindow(new Instant(start), new Duration(size));
-  }
-
-  @Test
-  public void testLifecycle() throws Exception {
-    // Step 1: New elements show up, introducing NEW windows which are partially merged.
-    // NEW 1+10
-    // NEW 2+10
-    // NEW 15+10
-    // =>
-    // ACTIVE 1+11 (target 1+11)
-    // ACTIVE 15+10 (target 15+10)
-    add(1, 2, 15);
-    assertEquals(ImmutableSet.of(window(1, 10), window(2, 10), window(15, 10)),
-                 set.getActiveAndNewWindows());
-    Map<IntervalWindow, IntervalWindow> map =
-        merge(ImmutableList.of(window(1, 10), window(2, 10)),
-              window(1, 11));
-    activate(map, 1, 2, 15);
-    assertEquals(ImmutableSet.of(window(1, 11), window(15, 10)), set.getActiveAndNewWindows());
-    assertEquals(
-        ImmutableSet.of(window(1, 11)), set.readStateAddresses(window(1, 11)));
-    assertEquals(
-        ImmutableSet.of(window(15, 10)), set.readStateAddresses(window(15, 10)));
-    cleanup();
-
-    // Step 2: Another element, merged into an existing ACTIVE window.
-    // NEW 3+10
-    // =>
-    // ACTIVE 1+12 (target 1+11)
-    // ACTIVE 15+10 (target 15+10)
-    add(3);
-    assertEquals(ImmutableSet.of(window(3, 10), window(1, 11), window(15, 10)),
-                 set.getActiveAndNewWindows());
-    map = merge(ImmutableList.of(window(1, 11), window(3, 10)),
-                window(1, 12));
-    activate(map, 3);
-    assertEquals(ImmutableSet.of(window(1, 12), window(15, 10)), set.getActiveAndNewWindows());
-    assertEquals(
-        ImmutableSet.of(window(1, 11)), set.readStateAddresses(window(1, 12)));
-    assertEquals(
-        ImmutableSet.of(window(15, 10)), set.readStateAddresses(window(15, 10)));
-    cleanup();
-
-    // Step 3: Another element, causing two ACTIVE windows to be merged.
-    // NEW 8+10
-    // =>
-    // ACTIVE 1+24 (target 1+11)
-    add(8);
-    assertEquals(ImmutableSet.of(window(8, 10), window(1, 12), window(15, 10)),
-                 set.getActiveAndNewWindows());
-    map = merge(ImmutableList.of(window(1, 12), window(8, 10), window(15, 10)),
-                window(1, 24));
-    activate(map, 8);
-    assertEquals(ImmutableSet.of(window(1, 24)), set.getActiveAndNewWindows());
-    assertEquals(
-        ImmutableSet.of(window(1, 11)), set.readStateAddresses(window(1, 24)));
-    cleanup();
-
-    // Step 4: Another element, merged into an existing ACTIVE window.
-    // NEW 9+10
-    // =>
-    // ACTIVE 1+24 (target 1+11)
-    add(9);
-    assertEquals(ImmutableSet.of(window(9, 10), window(1, 24)), set.getActiveAndNewWindows());
-    map = merge(ImmutableList.of(window(1, 24), window(9, 10)),
-                window(1, 24));
-    activate(map, 9);
-    assertEquals(ImmutableSet.of(window(1, 24)), set.getActiveAndNewWindows());
-    assertEquals(
-        ImmutableSet.of(window(1, 11)), set.readStateAddresses(window(1, 24)));
-    cleanup();
-
-    // Step 5: Another element reusing earlier window, merged into an existing ACTIVE window.
-    // NEW 1+10
-    // =>
-    // ACTIVE 1+24 (target 1+11)
-    add(1);
-    assertEquals(ImmutableSet.of(window(1, 10), window(1, 24)), set.getActiveAndNewWindows());
-    map = merge(ImmutableList.of(window(1, 10), window(1, 24)),
-                window(1, 24));
-    activate(map, 1);
-    assertEquals(ImmutableSet.of(window(1, 24)), set.getActiveAndNewWindows());
-    assertEquals(
-        ImmutableSet.of(window(1, 11)), set.readStateAddresses(window(1, 24)));
-    cleanup();
-
-    // Step 6: Window is closed.
-    set.remove(window(1, 24));
-    cleanup();
-    assertTrue(set.getActiveAndNewWindows().isEmpty());
-  }
-
-  @Test
-  public void testLegacyState() {
-    // Pre 1.4 we merged window state lazily.
-    // Simulate loading an active window set with multiple state address windows.
-    set.addActiveForTesting(window(1, 12),
-                            ImmutableList.of(window(1, 10), window(2, 10), window(3, 10)));
-
-
-    // Make sure we can detect and repair the state.
-    assertTrue(set.isActive(window(1, 12)));
-    assertEquals(ImmutableSet.of(window(1, 10), window(2, 10), window(3, 10)),
-                 set.readStateAddresses(window(1, 12)));
-    assertEquals(window(1, 10),
-                 set.mergedWriteStateAddress(
-                     ImmutableList.of(window(1, 10), window(2, 10), window(3, 10)),
-                     window(1, 12)));
-    set.merged(window(1, 12));
-    cleanup();
-
-    // For then on we are back to the eager case.
-    assertEquals(ImmutableSet.of(window(1, 10)), set.readStateAddresses(window(1, 12)));
-  }
-}

Reply via email to