Move Towards removing WindowedValue from SDK

- Introduces ValueInSingleWindow for purposes of PAssert
- Uses ValueInSingleWindow inside DoFnTester
- Moves WindowMatchers{,Test} to runners-core

After this commit, WindowedValue does not appear in any SDK APIs
used by Pipeline authors.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d9891234
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d9891234
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d9891234

Branch: refs/heads/gearpump-runner
Commit: d989123424a54699ecb47ba6c0a4e437316cabce
Parents: 0fb5610
Author: Eugene Kirpichov <kirpic...@google.com>
Authored: Mon Oct 31 15:46:25 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Dec 2 13:16:04 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/core/ReduceFnRunnerTest.java   |   5 +-
 .../beam/runners/core/SplittableParDoTest.java  |  38 ++--
 .../beam/runners/core/WindowMatchers.java       | 204 +++++++++++++++++++
 .../beam/runners/core/WindowMatchersTest.java   |  82 ++++++++
 .../direct/WindowEvaluatorFactoryTest.java      |   4 +-
 .../apache/beam/sdk/testing/GatherAllPanes.java |  88 ++++++++
 .../org/apache/beam/sdk/testing/PAssert.java    |  77 +++----
 .../apache/beam/sdk/testing/PaneExtractors.java |  55 +++--
 .../beam/sdk/testing/ValueInSingleWindow.java   | 134 ++++++++++++
 .../apache/beam/sdk/transforms/DoFnTester.java  |  58 +++---
 .../apache/beam/sdk/util/GatherAllPanes.java    |  86 --------
 .../apache/beam/sdk/util/IdentityWindowFn.java  |   2 +-
 .../org/apache/beam/sdk/WindowMatchers.java     | 204 -------------------
 .../org/apache/beam/sdk/WindowMatchersTest.java |  82 --------
 .../beam/sdk/testing/GatherAllPanesTest.java    | 140 +++++++++++++
 .../beam/sdk/testing/PaneExtractorsTest.java    | 133 ++++++------
 .../testing/ValueInSingleWindowCoderTest.java   |  51 +++++
 .../beam/sdk/util/GatherAllPanesTest.java       | 143 -------------
 18 files changed, 893 insertions(+), 693 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 20eb08b..ba57567 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -18,8 +18,8 @@
 package org.apache.beam.runners.core;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.beam.sdk.WindowMatchers.isSingleWindowedValue;
-import static org.apache.beam.sdk.WindowMatchers.isWindowedValue;
+import static 
org.apache.beam.runners.core.WindowMatchers.isSingleWindowedValue;
+import static org.apache.beam.runners.core.WindowMatchers.isWindowedValue;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.emptyIterable;
@@ -39,7 +39,6 @@ import com.google.common.collect.Iterables;
 import java.util.Iterator;
 import java.util.List;
 import org.apache.beam.runners.core.triggers.TriggerStateMachine;
-import org.apache.beam.sdk.WindowMatchers;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index 990d892..b13d839 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValueInSingleWindow;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
@@ -142,14 +143,15 @@ public class SplittableParDoTest {
         PCollection.IsBounded.BOUNDED,
         makeBoundedCollection(pipeline)
             .apply("bounded to bounded", new 
SplittableParDo<>(makeParDo(boundedFn)))
-            .get(MAIN_OUTPUT_TAG).isBounded());
+            .get(MAIN_OUTPUT_TAG)
+            .isBounded());
     assertEquals(
         "Applying a bounded SDF to an unbounded collection produces an 
unbounded collection",
         PCollection.IsBounded.UNBOUNDED,
         makeUnboundedCollection(pipeline)
-            .apply(
-                "bounded to unbounded", new 
SplittableParDo<>(makeParDo(boundedFn)))
-            .get(MAIN_OUTPUT_TAG).isBounded());
+            .apply("bounded to unbounded", new 
SplittableParDo<>(makeParDo(boundedFn)))
+            .get(MAIN_OUTPUT_TAG)
+            .isBounded());
   }
 
   @Test
@@ -160,18 +162,16 @@ public class SplittableParDoTest {
         "Applying an unbounded SDF to a bounded collection produces a bounded 
collection",
         PCollection.IsBounded.UNBOUNDED,
         makeBoundedCollection(pipeline)
-            .apply(
-                "unbounded to bounded",
-                new SplittableParDo<>(makeParDo(unboundedFn)))
-            .get(MAIN_OUTPUT_TAG).isBounded());
+            .apply("unbounded to bounded", new 
SplittableParDo<>(makeParDo(unboundedFn)))
+            .get(MAIN_OUTPUT_TAG)
+            .isBounded());
     assertEquals(
         "Applying an unbounded SDF to an unbounded collection produces an 
unbounded collection",
         PCollection.IsBounded.UNBOUNDED,
         makeUnboundedCollection(pipeline)
-            .apply(
-                "unbounded to unbounded",
-                new SplittableParDo<>(makeParDo(unboundedFn)))
-            .get(MAIN_OUTPUT_TAG).isBounded());
+            .apply("unbounded to unbounded", new 
SplittableParDo<>(makeParDo(unboundedFn)))
+            .get(MAIN_OUTPUT_TAG)
+            .isBounded());
   }
 
   // ------------------------------- Tests for ProcessFn 
---------------------------------
@@ -224,9 +224,11 @@ public class SplittableParDoTest {
                 Instant timestamp,
                 Collection<? extends BoundedWindow> windows,
                 PaneInfo pane) {
-              tester
-                  .getMutableOutput(tester.getMainOutputTag())
-                  .add(WindowedValue.of(output, timestamp, windows, pane));
+              for (BoundedWindow window : windows) {
+                tester
+                    .getMutableOutput(tester.getMainOutputTag())
+                    .add(ValueInSingleWindow.of(output, timestamp, window, 
pane));
+              }
             }
 
             @Override
@@ -236,7 +238,11 @@ public class SplittableParDoTest {
                 Instant timestamp,
                 Collection<? extends BoundedWindow> windows,
                 PaneInfo pane) {
-              tester.getMutableOutput(tag).add(WindowedValue.of(output, 
timestamp, windows, pane));
+              for (BoundedWindow window : windows) {
+                tester
+                    .getMutableOutput(tag)
+                    .add(ValueInSingleWindow.of(output, timestamp, window, 
pane));
+              }
             }
           });
       // Do not clone since ProcessFn references non-serializable DoFnTester 
itself

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchers.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchers.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchers.java
new file mode 100644
index 0000000..6c3a7e2
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchers.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.common.collect.Lists;
+import java.util.Collection;
+import java.util.Objects;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.Matchers;
+import org.hamcrest.TypeSafeMatcher;
+import org.joda.time.Instant;
+
+/**
+ * Matchers that are useful for working with Windowing, Timestamps, etc.
+ */
+public class WindowMatchers {
+
+  public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
+      T value,
+      Instant timestamp,
+      Collection<? extends BoundedWindow> windows,
+      PaneInfo paneInfo) {
+
+    Collection<Matcher<? super BoundedWindow>> windowMatchers =
+        Lists.newArrayListWithCapacity(windows.size());
+    for (BoundedWindow window : windows) {
+      windowMatchers.add(Matchers.equalTo(window));
+    }
+
+    return isWindowedValue(
+        Matchers.equalTo(value),
+        Matchers.equalTo(timestamp),
+        Matchers.containsInAnyOrder(windowMatchers),
+        Matchers.equalTo(paneInfo));
+  }
+
+  public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
+      Matcher<? super T> valueMatcher,
+      Matcher<? super Instant> timestampMatcher,
+      Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher,
+      Matcher<? super PaneInfo> paneInfoMatcher) {
+    return new WindowedValueMatcher<>(
+        valueMatcher, timestampMatcher, windowsMatcher, paneInfoMatcher);
+  }
+
+  public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
+      Matcher<? super T> valueMatcher,
+      Matcher<? super Instant> timestampMatcher,
+      Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher) {
+    return new WindowedValueMatcher<>(
+        valueMatcher, timestampMatcher, windowsMatcher, Matchers.anything());
+  }
+
+  public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
+      Matcher<? super T> valueMatcher, Matcher<? super Instant> 
timestampMatcher) {
+    return new WindowedValueMatcher<>(
+        valueMatcher, timestampMatcher, Matchers.anything(), 
Matchers.anything());
+  }
+
+  public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
+      Matcher<? super T> valueMatcher) {
+    return new WindowedValueMatcher<>(
+        valueMatcher, Matchers.anything(), Matchers.anything(), 
Matchers.anything());
+  }
+
+  public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
+      T value, long timestamp, long windowStart, long windowEnd) {
+    return WindowMatchers.<T>isSingleWindowedValue(
+        Matchers.equalTo(value), timestamp, windowStart, windowEnd);
+  }
+
+  public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
+      T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) {
+    return WindowMatchers.<T>isSingleWindowedValue(
+        Matchers.equalTo(value),
+        Matchers.equalTo(timestamp),
+        Matchers.equalTo(window),
+        Matchers.equalTo(paneInfo));
+  }
+
+  public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
+      T value, Instant timestamp, BoundedWindow window) {
+    return WindowMatchers.<T>isSingleWindowedValue(
+        Matchers.equalTo(value), Matchers.equalTo(timestamp), 
Matchers.equalTo(window));
+  }
+
+  public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
+      Matcher<T> valueMatcher, long timestamp, long windowStart, long 
windowEnd) {
+    IntervalWindow intervalWindow =
+        new IntervalWindow(new Instant(windowStart), new Instant(windowEnd));
+    return WindowMatchers.<T>isSingleWindowedValue(
+        valueMatcher,
+        Matchers.describedAs("%0", Matchers.equalTo(new Instant(timestamp)), 
timestamp),
+        Matchers.<BoundedWindow>equalTo(intervalWindow),
+        Matchers.anything());
+  }
+
+  public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
+      Matcher<? super T> valueMatcher,
+      Matcher<? super Instant> timestampMatcher,
+      Matcher<? super BoundedWindow> windowMatcher) {
+    return new WindowedValueMatcher<T>(
+        valueMatcher, timestampMatcher, Matchers.contains(windowMatcher), 
Matchers.anything());
+  }
+
+  public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
+      Matcher<? super T> valueMatcher,
+      Matcher<? super Instant> timestampMatcher,
+      Matcher<? super BoundedWindow> windowMatcher,
+      Matcher<? super PaneInfo> paneInfoMatcher) {
+    return new WindowedValueMatcher<T>(
+        valueMatcher, timestampMatcher, Matchers.contains(windowMatcher), 
paneInfoMatcher);
+  }
+
+  public static Matcher<IntervalWindow> intervalWindow(long start, long end) {
+    return Matchers.equalTo(new IntervalWindow(new Instant(start), new 
Instant(end)));
+  }
+
+  public static <T> Matcher<WindowedValue<? extends T>> 
valueWithPaneInfo(final PaneInfo paneInfo) {
+    return new TypeSafeMatcher<WindowedValue<? extends T>>() {
+      @Override
+      public void describeTo(Description description) {
+        description
+            .appendText("WindowedValue(paneInfo = 
").appendValue(paneInfo).appendText(")");
+      }
+
+      @Override
+      protected boolean matchesSafely(WindowedValue<? extends T> item) {
+        return Objects.equals(item.getPane(), paneInfo);
+      }
+
+      @Override
+      protected void describeMismatchSafely(
+          WindowedValue<? extends T> item, Description mismatchDescription) {
+        mismatchDescription.appendValue(item.getPane());
+      }
+    };
+  }
+
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  @SafeVarargs
+  public static final <W extends BoundedWindow> Matcher<Iterable<W>> ofWindows(
+      Matcher<W>... windows) {
+    return (Matcher) Matchers.<W>containsInAnyOrder(windows);
+  }
+
+  private WindowMatchers() {}
+
+  private static class WindowedValueMatcher<T> extends 
TypeSafeMatcher<WindowedValue<? extends T>> {
+
+    private Matcher<? super T> valueMatcher;
+    private Matcher<? super Instant> timestampMatcher;
+    private Matcher<? super Collection<? extends BoundedWindow>> 
windowsMatcher;
+    private Matcher<? super PaneInfo> paneInfoMatcher;
+
+    private WindowedValueMatcher(
+        Matcher<? super T> valueMatcher,
+        Matcher<? super Instant> timestampMatcher,
+        Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher,
+        Matcher<? super PaneInfo> paneInfoMatcher) {
+      this.valueMatcher = valueMatcher;
+      this.timestampMatcher = timestampMatcher;
+      this.windowsMatcher = windowsMatcher;
+      this.paneInfoMatcher = paneInfoMatcher;
+    }
+
+    @Override
+    public void describeTo(Description description) {
+      description
+          .appendText("a WindowedValue(").appendValue(valueMatcher)
+          .appendText(", ").appendValue(timestampMatcher)
+          .appendText(", ").appendValue(windowsMatcher)
+          .appendText(", ").appendValue(paneInfoMatcher)
+          .appendText(")");
+    }
+
+    @Override
+    protected boolean matchesSafely(WindowedValue<? extends T> windowedValue) {
+      return valueMatcher.matches(windowedValue.getValue())
+          && timestampMatcher.matches(windowedValue.getTimestamp())
+          && windowsMatcher.matches(windowedValue.getWindows());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchersTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchersTest.java
new file mode 100644
index 0000000..6f4741a
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchersTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link WindowMatchers}.
+ */
+@RunWith(JUnit4.class)
+public class WindowMatchersTest {
+
+  @Test
+  public void testIsWindowedValueExact() {
+    long timestamp = 100;
+    long windowStart = 0;
+    long windowEnd = 200;
+
+    assertThat(
+        WindowedValue.of(
+            "hello",
+            new Instant(timestamp),
+            new IntervalWindow(new Instant(windowStart), new 
Instant(windowEnd)),
+            PaneInfo.NO_FIRING),
+        WindowMatchers.isWindowedValue(
+            "hello",
+            new Instant(timestamp),
+            ImmutableList.of(new IntervalWindow(new Instant(windowStart), new 
Instant(windowEnd))),
+            PaneInfo.NO_FIRING));
+  }
+
+  @Test
+  public void testIsWindowedValueReorderedWindows() {
+    long timestamp = 100;
+    long windowStart = 0;
+    long windowEnd = 200;
+    long windowStart2 = 50;
+    long windowEnd2 = 150;
+
+    assertThat(
+        WindowedValue.of(
+            "hello",
+            new Instant(timestamp),
+            ImmutableList.of(
+                new IntervalWindow(new Instant(windowStart), new 
Instant(windowEnd)),
+                new IntervalWindow(new Instant(windowStart2), new 
Instant(windowEnd2))),
+            PaneInfo.NO_FIRING),
+        WindowMatchers.isWindowedValue(
+            "hello",
+            new Instant(timestamp),
+            ImmutableList.of(
+                new IntervalWindow(new Instant(windowStart), new 
Instant(windowEnd)),
+                new IntervalWindow(new Instant(windowStart2), new 
Instant(windowEnd2))),
+            PaneInfo.NO_FIRING));
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
index e2f987c..66c28ce 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
@@ -17,8 +17,8 @@
  */
 package org.apache.beam.runners.direct;
 
-import static org.apache.beam.sdk.WindowMatchers.isSingleWindowedValue;
-import static org.apache.beam.sdk.WindowMatchers.isWindowedValue;
+import static 
org.apache.beam.runners.core.WindowMatchers.isSingleWindowedValue;
+import static org.apache.beam.runners.core.WindowMatchers.isWindowedValue;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.when;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java
new file mode 100644
index 0000000..2b311b7
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.Never;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.IdentityWindowFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * Gathers all panes of each window into exactly one output.
+ *
+ * <p>Note that this will delay the output of a window until the garbage collection time (when the
+ * watermark passes the end of the window plus allowed lateness) even if the upstream triggers
+ * closed the window earlier.
+ */
+class GatherAllPanes<T>
+    extends PTransform<PCollection<T>, 
PCollection<Iterable<ValueInSingleWindow<T>>>> {
+  /**
+   * Gathers all panes of each window into a single output element.
+   *
+   * <p>This will gather all output panes into a single element, which causes them to be colocated
+   * on a single worker. As a result, this is only suitable for {@link PCollection PCollections}
+   * where all of the output elements for each pane fit in memory, such as in tests.
+   */
+  public static <T> GatherAllPanes<T> globally() {
+    return new GatherAllPanes<>();
+  }
+
+  private GatherAllPanes() {}
+
+  @Override
+  public PCollection<Iterable<ValueInSingleWindow<T>>> apply(PCollection<T> 
input) {
+    WindowFn<?, ?> originalWindowFn = 
input.getWindowingStrategy().getWindowFn();
+
+    return input
+        .apply(ParDo.of(new ReifyTimestampsAndWindowsFn<T>()))
+        .setCoder(
+            ValueInSingleWindow.Coder.of(
+                input.getCoder(), 
input.getWindowingStrategy().getWindowFn().windowCoder()))
+        .apply(
+            WithKeys.<Integer, ValueInSingleWindow<T>>of(0)
+                .withKeyType(new TypeDescriptor<Integer>() {}))
+        .apply(
+            Window.into(
+                    new IdentityWindowFn<KV<Integer, ValueInSingleWindow<T>>>(
+                        originalWindowFn.windowCoder()))
+                .triggering(Never.ever())
+                
.withAllowedLateness(input.getWindowingStrategy().getAllowedLateness())
+                .discardingFiredPanes())
+        // all values have the same key so they all appear as a single output 
element
+        .apply(GroupByKey.<Integer, ValueInSingleWindow<T>>create())
+        .apply(Values.<Iterable<ValueInSingleWindow<T>>>create())
+        .setWindowingStrategyInternal(input.getWindowingStrategy());
+  }
+
+  private static class ReifyTimestampsAndWindowsFn<T> extends DoFn<T, 
ValueInSingleWindow<T>> {
+    @DoFn.ProcessElement
+    public void processElement(ProcessContext c, BoundedWindow window) {
+      c.output(ValueInSingleWindow.of(c.element(), c.timestamp(), window, 
c.pane()));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index b3a14aa..7dc78d8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -63,8 +63,6 @@ import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
 import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.GatherAllPanes;
-import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
@@ -349,7 +347,7 @@ public class PAssert {
   private static class PCollectionContentsAssert<T> implements 
IterableAssert<T> {
     private final PCollection<T> actual;
     private final AssertionWindows rewindowingStrategy;
-    private final SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> 
paneExtractor;
+    private final SimpleFunction<Iterable<ValueInSingleWindow<T>>, 
Iterable<T>> paneExtractor;
 
     public PCollectionContentsAssert(PCollection<T> actual) {
       this(actual, IntoGlobalWindow.<T>of(), PaneExtractors.<T>allPanes());
@@ -358,7 +356,7 @@ public class PAssert {
     public PCollectionContentsAssert(
         PCollection<T> actual,
         AssertionWindows rewindowingStrategy,
-        SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> paneExtractor) 
{
+        SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> 
paneExtractor) {
       this.actual = actual;
       this.rewindowingStrategy = rewindowingStrategy;
       this.paneExtractor = paneExtractor;
@@ -391,7 +389,7 @@ public class PAssert {
 
     private PCollectionContentsAssert<T> withPane(
         BoundedWindow window,
-        SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> paneExtractor) 
{
+        SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> 
paneExtractor) {
       @SuppressWarnings({"unchecked", "rawtypes"})
       Coder<BoundedWindow> windowCoder =
           (Coder) actual.getWindowingStrategy().getWindowFn().windowCoder();
@@ -523,7 +521,7 @@ public class PAssert {
     private final PCollection<Iterable<T>> actual;
     private final Coder<T> elementCoder;
     private final AssertionWindows rewindowingStrategy;
-    private final SimpleFunction<Iterable<WindowedValue<Iterable<T>>>, 
Iterable<Iterable<T>>>
+    private final SimpleFunction<Iterable<ValueInSingleWindow<Iterable<T>>>, 
Iterable<Iterable<T>>>
         paneExtractor;
 
     public PCollectionSingletonIterableAssert(PCollection<Iterable<T>> actual) 
{
@@ -533,7 +531,8 @@ public class PAssert {
     public PCollectionSingletonIterableAssert(
         PCollection<Iterable<T>> actual,
         AssertionWindows rewindowingStrategy,
-        SimpleFunction<Iterable<WindowedValue<Iterable<T>>>, 
Iterable<Iterable<T>>> paneExtractor) {
+        SimpleFunction<Iterable<ValueInSingleWindow<Iterable<T>>>, 
Iterable<Iterable<T>>>
+            paneExtractor) {
       this.actual = actual;
 
       @SuppressWarnings("unchecked")
@@ -571,7 +570,8 @@ public class PAssert {
 
     private PCollectionSingletonIterableAssert<T> withPanes(
         BoundedWindow window,
-        SimpleFunction<Iterable<WindowedValue<Iterable<T>>>, 
Iterable<Iterable<T>>> paneExtractor) {
+        SimpleFunction<Iterable<ValueInSingleWindow<Iterable<T>>>, 
Iterable<Iterable<T>>>
+            paneExtractor) {
       @SuppressWarnings({"unchecked", "rawtypes"})
       Coder<BoundedWindow> windowCoder =
           (Coder) actual.getWindowingStrategy().getWindowFn().windowCoder();
@@ -620,7 +620,8 @@ public class PAssert {
     private final PCollection<ElemT> actual;
     private final PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view;
     private final AssertionWindows rewindowActuals;
-    private final SimpleFunction<Iterable<WindowedValue<ElemT>>, 
Iterable<ElemT>> paneExtractor;
+    private final SimpleFunction<Iterable<ValueInSingleWindow<ElemT>>, 
Iterable<ElemT>>
+        paneExtractor;
     private final Coder<ViewT> coder;
 
     protected PCollectionViewAssert(
@@ -634,7 +635,7 @@ public class PAssert {
         PCollection<ElemT> actual,
         PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view,
         AssertionWindows rewindowActuals,
-        SimpleFunction<Iterable<WindowedValue<ElemT>>, Iterable<ElemT>> 
paneExtractor,
+        SimpleFunction<Iterable<ValueInSingleWindow<ElemT>>, Iterable<ElemT>> 
paneExtractor,
         Coder<ViewT> coder) {
       this.actual = actual;
       this.view = view;
@@ -660,7 +661,7 @@ public class PAssert {
 
     private PCollectionViewAssert<ElemT, ViewT> inPane(
         BoundedWindow window,
-        SimpleFunction<Iterable<WindowedValue<ElemT>>, Iterable<ElemT>> 
paneExtractor) {
+        SimpleFunction<Iterable<ValueInSingleWindow<ElemT>>, Iterable<ElemT>> 
paneExtractor) {
       return new PCollectionViewAssert<>(
           actual,
           view,
@@ -738,13 +739,14 @@ public class PAssert {
 
     private final transient PCollection<T> actual;
     private final transient AssertionWindows rewindowActuals;
-    private final transient SimpleFunction<Iterable<WindowedValue<T>>, 
Iterable<T>> extractPane;
+    private final transient SimpleFunction<Iterable<ValueInSingleWindow<T>>, 
Iterable<T>>
+        extractPane;
     private final transient PTransform<PCollection<T>, 
PCollectionView<ActualT>> actualView;
 
     public static <T, ActualT> CreateActual<T, ActualT> from(
         PCollection<T> actual,
         AssertionWindows rewindowActuals,
-        SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> extractPane,
+        SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> 
extractPane,
         PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) {
       return new CreateActual<>(actual, rewindowActuals, extractPane, 
actualView);
     }
@@ -752,7 +754,7 @@ public class PAssert {
     private CreateActual(
         PCollection<T> actual,
         AssertionWindows rewindowActuals,
-        SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> extractPane,
+        SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> 
extractPane,
         PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) {
       this.actual = actual;
       this.rewindowActuals = rewindowActuals;
@@ -822,7 +824,7 @@ public class PAssert {
    * a single empty iterable, even though in practice most runners will not 
produce any element.
    */
   private static class GroupGlobally<T>
-      extends PTransform<PCollection<T>, 
PCollection<Iterable<WindowedValue<T>>>>
+      extends PTransform<PCollection<T>, 
PCollection<Iterable<ValueInSingleWindow<T>>>>
       implements Serializable {
     private final AssertionWindows rewindowingStrategy;
 
@@ -831,20 +833,20 @@ public class PAssert {
     }
 
     @Override
-    public PCollection<Iterable<WindowedValue<T>>> apply(PCollection<T> input) 
{
+    public PCollection<Iterable<ValueInSingleWindow<T>>> apply(PCollection<T> 
input) {
       final int combinedKey = 42;
 
       // Remove the triggering on both
       PTransform<
-              PCollection<KV<Integer, Iterable<WindowedValue<T>>>>,
-              PCollection<KV<Integer, Iterable<WindowedValue<T>>>>>
+              PCollection<KV<Integer, Iterable<ValueInSingleWindow<T>>>>,
+              PCollection<KV<Integer, Iterable<ValueInSingleWindow<T>>>>>
           removeTriggering =
-              Window.<KV<Integer, 
Iterable<WindowedValue<T>>>>triggering(Never.ever())
+              Window.<KV<Integer, 
Iterable<ValueInSingleWindow<T>>>>triggering(Never.ever())
                   .discardingFiredPanes()
                   
.withAllowedLateness(input.getWindowingStrategy().getAllowedLateness());
       // Group the contents by key. If it is empty, this PCollection will be 
empty, too.
       // Then key it again with a dummy key.
-      PCollection<KV<Integer, Iterable<WindowedValue<T>>>> groupedContents =
+      PCollection<KV<Integer, Iterable<ValueInSingleWindow<T>>>> 
groupedContents =
           // TODO: Split the filtering from the rewindowing, and apply 
filtering before the Gather
           // if the grouping of extra records
           input
@@ -852,45 +854,47 @@ public class PAssert {
               .apply("GatherAllOutputs", GatherAllPanes.<T>globally())
               .apply(
                   "RewindowActuals",
-                  
rewindowingStrategy.<Iterable<WindowedValue<T>>>windowActuals())
-              .apply("KeyForDummy", WithKeys.<Integer, 
Iterable<WindowedValue<T>>>of(combinedKey))
+                  
rewindowingStrategy.<Iterable<ValueInSingleWindow<T>>>windowActuals())
+              .apply(
+                  "KeyForDummy",
+                  WithKeys.<Integer, 
Iterable<ValueInSingleWindow<T>>>of(combinedKey))
               .apply("RemoveActualsTriggering", removeTriggering);
 
       // Create another non-empty PCollection that is keyed with a distinct 
dummy key
-      PCollection<KV<Integer, Iterable<WindowedValue<T>>>> keyedDummy =
+      PCollection<KV<Integer, Iterable<ValueInSingleWindow<T>>>> keyedDummy =
           input
               .getPipeline()
               .apply(
                   Create.of(
                           KV.of(
                               combinedKey,
-                              (Iterable<WindowedValue<T>>)
-                                  Collections.<WindowedValue<T>>emptyList()))
+                              (Iterable<ValueInSingleWindow<T>>)
+                                  
Collections.<ValueInSingleWindow<T>>emptyList()))
                       .withCoder(groupedContents.getCoder()))
               .apply(
                   "WindowIntoDummy",
-                  rewindowingStrategy.<KV<Integer, 
Iterable<WindowedValue<T>>>>windowDummy())
+                  rewindowingStrategy.<KV<Integer, 
Iterable<ValueInSingleWindow<T>>>>windowDummy())
               .apply("RemoveDummyTriggering", removeTriggering);
 
       // Flatten them together and group by the combined key to get a single 
element
-      PCollection<KV<Integer, Iterable<Iterable<WindowedValue<T>>>>> 
dummyAndContents =
+      PCollection<KV<Integer, Iterable<Iterable<ValueInSingleWindow<T>>>>> 
dummyAndContents =
           PCollectionList.of(groupedContents)
               .and(keyedDummy)
               .apply(
                   "FlattenDummyAndContents",
-                  Flatten.<KV<Integer, 
Iterable<WindowedValue<T>>>>pCollections())
+                  Flatten.<KV<Integer, 
Iterable<ValueInSingleWindow<T>>>>pCollections())
               .apply(
                   "NeverTrigger",
-                  Window.<KV<Integer, 
Iterable<WindowedValue<T>>>>triggering(Never.ever())
+                  Window.<KV<Integer, 
Iterable<ValueInSingleWindow<T>>>>triggering(Never.ever())
                       
.withAllowedLateness(input.getWindowingStrategy().getAllowedLateness())
                       .discardingFiredPanes())
               .apply(
                   "GroupDummyAndContents",
-                  GroupByKey.<Integer, Iterable<WindowedValue<T>>>create());
+                  GroupByKey.<Integer, 
Iterable<ValueInSingleWindow<T>>>create());
 
       return dummyAndContents
-          .apply(Values.<Iterable<Iterable<WindowedValue<T>>>>create())
-          .apply(ParDo.of(new ConcatFn<WindowedValue<T>>()));
+          .apply(Values.<Iterable<Iterable<ValueInSingleWindow<T>>>>create())
+          .apply(ParDo.of(new ConcatFn<ValueInSingleWindow<T>>()));
     }
   }
 
@@ -909,12 +913,12 @@ public class PAssert {
       implements Serializable {
     private final SerializableFunction<Iterable<T>, Void> checkerFn;
     private final AssertionWindows rewindowingStrategy;
-    private final SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> 
paneExtractor;
+    private final SimpleFunction<Iterable<ValueInSingleWindow<T>>, 
Iterable<T>> paneExtractor;
 
     private GroupThenAssert(
         SerializableFunction<Iterable<T>, Void> checkerFn,
         AssertionWindows rewindowingStrategy,
-        SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> paneExtractor) 
{
+        SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> 
paneExtractor) {
       this.checkerFn = checkerFn;
       this.rewindowingStrategy = rewindowingStrategy;
       this.paneExtractor = paneExtractor;
@@ -940,13 +944,14 @@ public class PAssert {
       extends PTransform<PCollection<Iterable<T>>, PDone> implements 
Serializable {
     private final SerializableFunction<Iterable<T>, Void> checkerFn;
     private final AssertionWindows rewindowingStrategy;
-    private final SimpleFunction<Iterable<WindowedValue<Iterable<T>>>, 
Iterable<Iterable<T>>>
+    private final SimpleFunction<Iterable<ValueInSingleWindow<Iterable<T>>>, 
Iterable<Iterable<T>>>
         paneExtractor;
 
     private GroupThenAssertForSingleton(
         SerializableFunction<Iterable<T>, Void> checkerFn,
         AssertionWindows rewindowingStrategy,
-        SimpleFunction<Iterable<WindowedValue<Iterable<T>>>, 
Iterable<Iterable<T>>> paneExtractor) {
+        SimpleFunction<Iterable<ValueInSingleWindow<Iterable<T>>>, 
Iterable<Iterable<T>>>
+            paneExtractor) {
       this.checkerFn = checkerFn;
       this.rewindowingStrategy = rewindowingStrategy;
       this.paneExtractor = paneExtractor;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java
index db72a0c..dd1fac9 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java
@@ -25,14 +25,13 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
-import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
- * {@link PTransform PTransforms} which take an {@link Iterable} of {@link 
WindowedValue
- * WindowedValues} and outputs an {@link Iterable} of all values in the 
specified pane, dropping the
- * {@link WindowedValue} metadata.
+ * {@link PTransform PTransforms} which take an {@link Iterable} of {@link 
ValueInSingleWindow
+ * ValueInSingleWindows} and outputs an {@link Iterable} of all values in the 
specified pane,
+ * dropping the {@link ValueInSingleWindow} metadata.
  *
  * <p>Although all of the method signatures return SimpleFunction, users 
should ensure to set the
  * coder of any output {@link PCollection}, as appropriate {@link 
TypeDescriptor TypeDescriptors}
@@ -42,36 +41,36 @@ final class PaneExtractors {
   private PaneExtractors() {
   }
 
-  static <T> SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> 
onlyPane() {
+  static <T> SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> 
onlyPane() {
     return new ExtractOnlyPane<>();
   }
 
-  static <T> SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> 
onTimePane() {
+  static <T> SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> 
onTimePane() {
     return new ExtractOnTimePane<>();
   }
 
-  static <T> SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> 
finalPane() {
+  static <T> SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> 
finalPane() {
     return new ExtractFinalPane<>();
   }
 
-  static <T> SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> 
nonLatePanes() {
+  static <T> SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> 
nonLatePanes() {
     return new ExtractNonLatePanes<>();
   }
 
-  static <T> SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> 
earlyPanes() {
+  static <T> SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> 
earlyPanes() {
     return new ExtractEarlyPanes<>();
   }
 
-  static <T> SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> 
allPanes() {
+  static <T> SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> 
allPanes() {
     return new ExtractAllPanes<>();
   }
 
   private static class ExtractOnlyPane<T>
-      extends SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> {
+      extends SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> {
     @Override
-    public Iterable<T> apply(Iterable<WindowedValue<T>> input) {
+    public Iterable<T> apply(Iterable<ValueInSingleWindow<T>> input) {
       List<T> outputs = new ArrayList<>();
-      for (WindowedValue<T> value : input) {
+      for (ValueInSingleWindow<T> value : input) {
         checkState(value.getPane().isFirst() && value.getPane().isLast(),
             "Expected elements to be produced by a trigger that fires at most 
once, but got"
                 + "a value in a pane that is %s. Actual Pane Info: %s",
@@ -85,11 +84,11 @@ final class PaneExtractors {
 
 
   private static class ExtractOnTimePane<T>
-      extends SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> {
+      extends SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> {
     @Override
-    public Iterable<T> apply(Iterable<WindowedValue<T>> input) {
+    public Iterable<T> apply(Iterable<ValueInSingleWindow<T>> input) {
       List<T> outputs = new ArrayList<>();
-      for (WindowedValue<T> value : input) {
+      for (ValueInSingleWindow<T> value : input) {
         if (value.getPane().getTiming().equals(Timing.ON_TIME)) {
           outputs.add(value.getValue());
         }
@@ -100,11 +99,11 @@ final class PaneExtractors {
 
 
   private static class ExtractFinalPane<T>
-      extends SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> {
+      extends SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> {
     @Override
-    public Iterable<T> apply(Iterable<WindowedValue<T>> input) {
+    public Iterable<T> apply(Iterable<ValueInSingleWindow<T>> input) {
       List<T> outputs = new ArrayList<>();
-      for (WindowedValue<T> value : input) {
+      for (ValueInSingleWindow<T> value : input) {
         if (value.getPane().isLast()) {
           outputs.add(value.getValue());
         }
@@ -115,11 +114,11 @@ final class PaneExtractors {
 
 
   private static class ExtractAllPanes<T>
-      extends SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> {
+      extends SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> {
     @Override
-    public Iterable<T> apply(Iterable<WindowedValue<T>> input) {
+    public Iterable<T> apply(Iterable<ValueInSingleWindow<T>> input) {
       List<T> outputs = new ArrayList<>();
-      for (WindowedValue<T> value : input) {
+      for (ValueInSingleWindow<T> value : input) {
         outputs.add(value.getValue());
       }
       return outputs;
@@ -128,11 +127,11 @@ final class PaneExtractors {
 
 
   private static class ExtractNonLatePanes<T>
-      extends SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> {
+      extends SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> {
     @Override
-    public Iterable<T> apply(Iterable<WindowedValue<T>> input) {
+    public Iterable<T> apply(Iterable<ValueInSingleWindow<T>> input) {
       List<T> outputs = new ArrayList<>();
-      for (WindowedValue<T> value : input) {
+      for (ValueInSingleWindow<T> value : input) {
         if (value.getPane().getTiming() != PaneInfo.Timing.LATE) {
           outputs.add(value.getValue());
         }
@@ -142,11 +141,11 @@ final class PaneExtractors {
   }
 
   private static class ExtractEarlyPanes<T>
-      extends SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> {
+      extends SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> {
     @Override
-    public Iterable<T> apply(Iterable<WindowedValue<T>> input) {
+    public Iterable<T> apply(Iterable<ValueInSingleWindow<T>> input) {
       List<T> outputs = new ArrayList<>();
-      for (WindowedValue<T> value : input) {
+      for (ValueInSingleWindow<T> value : input) {
         if (value.getPane().getTiming() == PaneInfo.Timing.EARLY) {
           outputs.add(value.getValue());
         }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValueInSingleWindow.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValueInSingleWindow.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValueInSingleWindow.java
new file mode 100644
index 0000000..9ec030f
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValueInSingleWindow.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.joda.time.Instant;
+
+/**
+ * An immutable tuple of value, timestamp, window, and pane.
+ *
+ * @param <T> the type of the value
+ */
+@AutoValue
+public abstract class ValueInSingleWindow<T> {
+  /** Returns the value of this {@code ValueInSingleWindow}. */
+  @Nullable
+  public abstract T getValue();
+
+  /** Returns the timestamp of this {@code ValueInSingleWindow}. */
+  public abstract Instant getTimestamp();
+
+  /** Returns the window of this {@code ValueInSingleWindow}. */
+  public abstract BoundedWindow getWindow();
+
+  /** Returns the pane of this {@code ValueInSingleWindow} in its window. */
+  public abstract PaneInfo getPane();
+
+  public static <T> ValueInSingleWindow<T> of(
+      T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) {
+    return new AutoValue_ValueInSingleWindow<>(value, timestamp, window, 
paneInfo);
+  }
+
+  /** A coder for {@link ValueInSingleWindow}. */
+  public static class Coder<T> extends StandardCoder<ValueInSingleWindow<T>> {
+    private final org.apache.beam.sdk.coders.Coder<T> valueCoder;
+    private final org.apache.beam.sdk.coders.Coder<BoundedWindow> windowCoder;
+
+    public static <T> Coder<T> of(
+        org.apache.beam.sdk.coders.Coder<T> valueCoder,
+        org.apache.beam.sdk.coders.Coder<? extends BoundedWindow> windowCoder) 
{
+      return new Coder<>(valueCoder, windowCoder);
+    }
+
+    @JsonCreator
+    public static <T> Coder<T> of(
+        @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
+            List<org.apache.beam.sdk.coders.Coder<?>> components) {
+      checkArgument(components.size() == 2, "Expecting 2 components, got %s", 
components.size());
+      @SuppressWarnings("unchecked")
+      org.apache.beam.sdk.coders.Coder<T> valueCoder =
+          (org.apache.beam.sdk.coders.Coder<T>) components.get(0);
+      @SuppressWarnings("unchecked")
+      org.apache.beam.sdk.coders.Coder<BoundedWindow> windowCoder =
+          (org.apache.beam.sdk.coders.Coder<BoundedWindow>) components.get(1);
+      return new Coder<>(valueCoder, windowCoder);
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    Coder(
+        org.apache.beam.sdk.coders.Coder<T> valueCoder,
+        org.apache.beam.sdk.coders.Coder<? extends BoundedWindow> windowCoder) 
{
+      this.valueCoder = valueCoder;
+      this.windowCoder = (org.apache.beam.sdk.coders.Coder) windowCoder;
+    }
+
+    @Override
+    public void encode(ValueInSingleWindow<T> windowedElem, OutputStream 
outStream, Context context)
+        throws IOException {
+      Context nestedContext = context.nested();
+      valueCoder.encode(windowedElem.getValue(), outStream, nestedContext);
+      InstantCoder.of().encode(windowedElem.getTimestamp(), outStream, 
nestedContext);
+      windowCoder.encode(windowedElem.getWindow(), outStream, nestedContext);
+      PaneInfo.PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), 
outStream, context);
+    }
+
+    @Override
+    public ValueInSingleWindow<T> decode(InputStream inStream, Context 
context) throws IOException {
+      Context nestedContext = context.nested();
+      T value = valueCoder.decode(inStream, nestedContext);
+      Instant timestamp = InstantCoder.of().decode(inStream, nestedContext);
+      BoundedWindow window = windowCoder.decode(inStream, nestedContext);
+      PaneInfo pane = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream, 
nestedContext);
+      return new AutoValue_ValueInSingleWindow<>(value, timestamp, window, 
pane);
+    }
+
+    @Override
+    public List<? extends org.apache.beam.sdk.coders.Coder<?>> 
getCoderArguments() {
+      // Coder arguments are coders for the type parameters of the coder - 
i.e. only T.
+      return ImmutableList.of(valueCoder);
+    }
+
+    @Override
+    public List<? extends org.apache.beam.sdk.coders.Coder<?>> getComponents() 
{
+      // Coder components are all inner coders that it uses - i.e. both T and 
BoundedWindow.
+      return ImmutableList.of(valueCoder, windowCoder);
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      valueCoder.verifyDeterministic();
+      windowCoder.verifyDeterministic();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 0c6043f..17fa612 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -23,7 +23,6 @@ import static com.google.common.base.Preconditions.checkState;
 import com.google.common.base.Function;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -35,8 +34,10 @@ import java.util.Map;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.ValueInSingleWindow;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.TimerInternals;
@@ -353,10 +354,10 @@ public class DoFnTester<InputT, OutputT> implements 
AutoCloseable {
   public List<TimestampedValue<OutputT>> peekOutputElementsWithTimestamp() {
     // TODO: Should we return an unmodifiable list?
     return Lists.transform(getImmutableOutput(mainOutputTag),
-        new Function<WindowedValue<OutputT>, TimestampedValue<OutputT>>() {
+        new Function<ValueInSingleWindow<OutputT>, 
TimestampedValue<OutputT>>() {
           @Override
           @SuppressWarnings("unchecked")
-          public TimestampedValue<OutputT> apply(WindowedValue<OutputT> input) 
{
+          public TimestampedValue<OutputT> apply(ValueInSingleWindow<OutputT> 
input) {
             return TimestampedValue.of(input.getValue(), input.getTimestamp());
           }
         });
@@ -378,8 +379,8 @@ public class DoFnTester<InputT, OutputT> implements 
AutoCloseable {
       TupleTag<OutputT> tag,
       BoundedWindow window) {
     ImmutableList.Builder<TimestampedValue<OutputT>> valuesBuilder = 
ImmutableList.builder();
-    for (WindowedValue<OutputT> value : getImmutableOutput(tag)) {
-      if (value.getWindows().contains(window)) {
+    for (ValueInSingleWindow<OutputT> value : getImmutableOutput(tag)) {
+      if (value.getWindow().equals(window)) {
         valuesBuilder.add(TimestampedValue.of(value.getValue(), 
value.getTimestamp()));
       }
     }
@@ -434,10 +435,10 @@ public class DoFnTester<InputT, OutputT> implements 
AutoCloseable {
   public <T> List<T> peekSideOutputElements(TupleTag<T> tag) {
     // TODO: Should we return an unmodifiable list?
     return Lists.transform(getImmutableOutput(tag),
-        new Function<WindowedValue<T>, T>() {
+        new Function<ValueInSingleWindow<T>, T>() {
           @SuppressWarnings("unchecked")
           @Override
-          public T apply(WindowedValue<T> input) {
+          public T apply(ValueInSingleWindow<T> input) {
             return input.getValue();
           }});
   }
@@ -510,16 +511,16 @@ public class DoFnTester<InputT, OutputT> implements 
AutoCloseable {
     return combiner.extractOutput(accumulator);
   }
 
-  private <T> List<WindowedValue<T>> getImmutableOutput(TupleTag<T> tag) {
+  private <T> List<ValueInSingleWindow<T>> getImmutableOutput(TupleTag<T> tag) 
{
     @SuppressWarnings({"unchecked", "rawtypes"})
-    List<WindowedValue<T>> elems = (List) outputs.get(tag);
+    List<ValueInSingleWindow<T>> elems = (List) outputs.get(tag);
     return ImmutableList.copyOf(
-        MoreObjects.firstNonNull(elems, 
Collections.<WindowedValue<T>>emptyList()));
+        MoreObjects.firstNonNull(elems, 
Collections.<ValueInSingleWindow<T>>emptyList()));
   }
 
   @SuppressWarnings({"unchecked", "rawtypes"})
-  public <T> List<WindowedValue<T>> getMutableOutput(TupleTag<T> tag) {
-    List<WindowedValue<T>> outputList = (List) outputs.get(tag);
+  public <T> List<ValueInSingleWindow<T>> getMutableOutput(TupleTag<T> tag) {
+    List<ValueInSingleWindow<T>> outputList = (List) outputs.get(tag);
     if (outputList == null) {
       outputList = new ArrayList<>();
       outputs.put(tag, (List) outputList);
@@ -612,23 +613,22 @@ public class DoFnTester<InputT, OutputT> implements 
AutoCloseable {
       sideOutputWithTimestamp(tag, output, BoundedWindow.TIMESTAMP_MIN_VALUE);
     }
 
-    public <T> void noteOutput(TupleTag<T> tag, WindowedValue<T> output) {
+    public <T> void noteOutput(TupleTag<T> tag, ValueInSingleWindow<T> output) 
{
       getMutableOutput(tag).add(output);
     }
   }
 
   private TestProcessContext createProcessContext(TimestampedValue<InputT> 
elem) {
-    WindowedValue<InputT> windowedValue = 
WindowedValue.timestampedValueInGlobalWindow(
-        elem.getValue(), elem.getTimestamp());
-
-    return new TestProcessContext(windowedValue);
+    return new TestProcessContext(
+        ValueInSingleWindow.of(
+            elem.getValue(), elem.getTimestamp(), GlobalWindow.INSTANCE, 
PaneInfo.NO_FIRING));
   }
 
   private class TestProcessContext extends OldDoFn<InputT, 
OutputT>.ProcessContext {
     private final TestContext context;
-    private final WindowedValue<InputT> element;
+    private final ValueInSingleWindow<InputT> element;
 
-    private TestProcessContext(WindowedValue<InputT> element) {
+    private TestProcessContext(ValueInSingleWindow<InputT> element) {
       fn.super();
       this.context = createContext(fn);
       this.element = element;
@@ -661,7 +661,7 @@ public class DoFnTester<InputT, OutputT> implements 
AutoCloseable {
 
     @Override
     public BoundedWindow window() {
-      return Iterables.getOnlyElement(element.getWindows());
+      return element.getWindow();
     }
 
     @Override
@@ -683,7 +683,10 @@ public class DoFnTester<InputT, OutputT> implements 
AutoCloseable {
             Instant timestamp,
             Collection<? extends BoundedWindow> windows,
             PaneInfo pane) {
-          context.noteOutput(mainOutputTag, WindowedValue.of(output, 
timestamp, windows, pane));
+          for (BoundedWindow window : windows) {
+            context.noteOutput(
+                mainOutputTag, ValueInSingleWindow.of(output, timestamp, 
window, pane));
+          }
         }
 
         @Override
@@ -693,7 +696,10 @@ public class DoFnTester<InputT, OutputT> implements 
AutoCloseable {
             Instant timestamp,
             Collection<? extends BoundedWindow> windows,
             PaneInfo pane) {
-          context.noteOutput(tag, WindowedValue.of(output, timestamp, windows, 
pane));
+          for (BoundedWindow window : windows) {
+            context.noteOutput(
+                tag, ValueInSingleWindow.of(output, timestamp, window, pane));
+          }
         }
 
         @Override
@@ -703,7 +709,7 @@ public class DoFnTester<InputT, OutputT> implements 
AutoCloseable {
 
         @Override
         public Collection<? extends BoundedWindow> windows() {
-          return element.getWindows();
+          return Collections.singleton(element.getWindow());
         }
 
         @Override
@@ -742,8 +748,8 @@ public class DoFnTester<InputT, OutputT> implements 
AutoCloseable {
 
     @Override
     public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant 
timestamp) {
-      context.noteOutput(tag,
-          WindowedValue.of(output, timestamp, element.getWindows(), 
element.getPane()));
+      context.noteOutput(
+          tag, ValueInSingleWindow.of(output, timestamp, element.getWindow(), 
element.getPane()));
     }
 
     @Override
@@ -803,7 +809,7 @@ public class DoFnTester<InputT, OutputT> implements 
AutoCloseable {
   OldDoFn<InputT, OutputT> fn;
 
   /** The outputs from the {@link DoFn} under test. */
-  private Map<TupleTag<?>, List<WindowedValue<?>>> outputs;
+  private Map<TupleTag<?>, List<ValueInSingleWindow<?>>> outputs;
 
   private InMemoryStateInternals<?> stateInternals;
   private InMemoryTimerInternals timerInternals;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
deleted file mode 100644
index 52a2ba8..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Values;
-import org.apache.beam.sdk.transforms.WithKeys;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Never;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-/**
- * Gathers all panes of each window into exactly one output.
- *
- * <p>Note that this will delay the output of a window until the garbage 
collection time (when the
- * watermark passes the end of the window plus allowed lateness) even if the 
upstream triggers
- * closed the window earlier.
- */
-public class GatherAllPanes<T>
-    extends PTransform<PCollection<T>, 
PCollection<Iterable<WindowedValue<T>>>> {
-  /**
-   * Gathers all panes of each window into a single output element.
-   *
-   * <p>This will gather all output panes into a single element, which causes 
them to be colocated
-   * on a single worker. As a result, this is only suitable for {@link 
PCollection PCollections}
-   * where all of the output elements for each pane fit in memory, such as in 
tests.
-   */
-  public static <T> GatherAllPanes<T> globally() {
-    return new GatherAllPanes<>();
-  }
-
-  private GatherAllPanes() {}
-
-  @Override
-  public PCollection<Iterable<WindowedValue<T>>> apply(PCollection<T> input) {
-    WindowFn<?, ?> originalWindowFn = 
input.getWindowingStrategy().getWindowFn();
-
-    return input
-        .apply(ParDo.of(new ReifyTimestampsAndWindowsFn<T>()))
-        .setCoder(
-            WindowedValue.FullWindowedValueCoder.of(
-                input.getCoder(), 
input.getWindowingStrategy().getWindowFn().windowCoder()))
-        .apply(
-            WithKeys.<Integer, WindowedValue<T>>of(0).withKeyType(new 
TypeDescriptor<Integer>() {}))
-        .apply(
-            Window.into(
-                    new IdentityWindowFn<KV<Integer, WindowedValue<T>>>(
-                        originalWindowFn.windowCoder()))
-                .triggering(Never.ever())
-                
.withAllowedLateness(input.getWindowingStrategy().getAllowedLateness())
-                .discardingFiredPanes())
-        // all values have the same key so they all appear as a single output 
element
-        .apply(GroupByKey.<Integer, WindowedValue<T>>create())
-        .apply(Values.<Iterable<WindowedValue<T>>>create())
-        .setWindowingStrategyInternal(input.getWindowingStrategy());
-  }
-
-  private static class ReifyTimestampsAndWindowsFn<T> extends DoFn<T, 
WindowedValue<T>> {
-    @DoFn.ProcessElement
-    public void processElement(ProcessContext c, BoundedWindow window) {
-      c.output(WindowedValue.of(c.element(), c.timestamp(), window, c.pane()));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
index 8ca1bfd..c02e1f4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
@@ -45,7 +45,7 @@ import org.joda.time.Instant;
  * <p>This {@link WindowFn} is an internal implementation detail of 
sdk-provided utilities, and
  * should not be used by {@link Pipeline} writers.
  */
-class IdentityWindowFn<T> extends NonMergingWindowFn<T, BoundedWindow> {
+public class IdentityWindowFn<T> extends NonMergingWindowFn<T, BoundedWindow> {
 
   /**
    * The coder of the type of windows of the input {@link PCollection}. This 
is not an arbitrary

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java
deleted file mode 100644
index 3531a86..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk;
-
-import com.google.common.collect.Lists;
-import java.util.Collection;
-import java.util.Objects;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-import org.hamcrest.Matchers;
-import org.hamcrest.TypeSafeMatcher;
-import org.joda.time.Instant;
-
-/**
- * Matchers that are useful for working with Windowing, Timestamps, etc.
- */
-public class WindowMatchers {
-
-  public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
-      T value,
-      Instant timestamp,
-      Collection<? extends BoundedWindow> windows,
-      PaneInfo paneInfo) {
-
-    Collection<Matcher<? super BoundedWindow>> windowMatchers =
-        Lists.newArrayListWithCapacity(windows.size());
-    for (BoundedWindow window : windows) {
-      windowMatchers.add(Matchers.equalTo(window));
-    }
-
-    return isWindowedValue(
-        Matchers.equalTo(value),
-        Matchers.equalTo(timestamp),
-        Matchers.containsInAnyOrder(windowMatchers),
-        Matchers.equalTo(paneInfo));
-  }
-
-  public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
-      Matcher<? super T> valueMatcher,
-      Matcher<? super Instant> timestampMatcher,
-      Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher,
-      Matcher<? super PaneInfo> paneInfoMatcher) {
-    return new WindowedValueMatcher<>(
-        valueMatcher, timestampMatcher, windowsMatcher, paneInfoMatcher);
-  }
-
-  public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
-      Matcher<? super T> valueMatcher,
-      Matcher<? super Instant> timestampMatcher,
-      Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher) {
-    return new WindowedValueMatcher<>(
-        valueMatcher, timestampMatcher, windowsMatcher, Matchers.anything());
-  }
-
-  public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
-      Matcher<? super T> valueMatcher, Matcher<? super Instant> 
timestampMatcher) {
-    return new WindowedValueMatcher<>(
-        valueMatcher, timestampMatcher, Matchers.anything(), 
Matchers.anything());
-  }
-
-  public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
-      Matcher<? super T> valueMatcher) {
-    return new WindowedValueMatcher<>(
-        valueMatcher, Matchers.anything(), Matchers.anything(), 
Matchers.anything());
-  }
-
-  public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
-      T value, long timestamp, long windowStart, long windowEnd) {
-    return WindowMatchers.<T>isSingleWindowedValue(
-        Matchers.equalTo(value), timestamp, windowStart, windowEnd);
-  }
-
-  public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
-      T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) {
-    return WindowMatchers.<T>isSingleWindowedValue(
-        Matchers.equalTo(value),
-        Matchers.equalTo(timestamp),
-        Matchers.equalTo(window),
-        Matchers.equalTo(paneInfo));
-  }
-
-  public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
-      T value, Instant timestamp, BoundedWindow window) {
-    return WindowMatchers.<T>isSingleWindowedValue(
-        Matchers.equalTo(value), Matchers.equalTo(timestamp), 
Matchers.equalTo(window));
-  }
-
-  public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
-      Matcher<T> valueMatcher, long timestamp, long windowStart, long 
windowEnd) {
-    IntervalWindow intervalWindow =
-        new IntervalWindow(new Instant(windowStart), new Instant(windowEnd));
-    return WindowMatchers.<T>isSingleWindowedValue(
-        valueMatcher,
-        Matchers.describedAs("%0", Matchers.equalTo(new Instant(timestamp)), 
timestamp),
-        Matchers.<BoundedWindow>equalTo(intervalWindow),
-        Matchers.anything());
-  }
-
-  public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
-      Matcher<? super T> valueMatcher,
-      Matcher<? super Instant> timestampMatcher,
-      Matcher<? super BoundedWindow> windowMatcher) {
-    return new WindowedValueMatcher<T>(
-        valueMatcher, timestampMatcher, Matchers.contains(windowMatcher), 
Matchers.anything());
-  }
-
-  public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
-      Matcher<? super T> valueMatcher,
-      Matcher<? super Instant> timestampMatcher,
-      Matcher<? super BoundedWindow> windowMatcher,
-      Matcher<? super PaneInfo> paneInfoMatcher) {
-    return new WindowedValueMatcher<T>(
-        valueMatcher, timestampMatcher, Matchers.contains(windowMatcher), 
paneInfoMatcher);
-  }
-
-  public static Matcher<IntervalWindow> intervalWindow(long start, long end) {
-    return Matchers.equalTo(new IntervalWindow(new Instant(start), new 
Instant(end)));
-  }
-
-  public static <T> Matcher<WindowedValue<? extends T>> 
valueWithPaneInfo(final PaneInfo paneInfo) {
-    return new TypeSafeMatcher<WindowedValue<? extends T>>() {
-      @Override
-      public void describeTo(Description description) {
-        description
-            .appendText("WindowedValue(paneInfo = 
").appendValue(paneInfo).appendText(")");
-      }
-
-      @Override
-      protected boolean matchesSafely(WindowedValue<? extends T> item) {
-        return Objects.equals(item.getPane(), paneInfo);
-      }
-
-      @Override
-      protected void describeMismatchSafely(
-          WindowedValue<? extends T> item, Description mismatchDescription) {
-        mismatchDescription.appendValue(item.getPane());
-      }
-    };
-  }
-
-  @SuppressWarnings({"unchecked", "rawtypes"})
-  @SafeVarargs
-  public static final <W extends BoundedWindow> Matcher<Iterable<W>> ofWindows(
-      Matcher<W>... windows) {
-    return (Matcher) Matchers.<W>containsInAnyOrder(windows);
-  }
-
-  private WindowMatchers() {}
-
-  private static class WindowedValueMatcher<T> extends 
TypeSafeMatcher<WindowedValue<? extends T>> {
-
-    private Matcher<? super T> valueMatcher;
-    private Matcher<? super Instant> timestampMatcher;
-    private Matcher<? super Collection<? extends BoundedWindow>> 
windowsMatcher;
-    private Matcher<? super PaneInfo> paneInfoMatcher;
-
-    private WindowedValueMatcher(
-        Matcher<? super T> valueMatcher,
-        Matcher<? super Instant> timestampMatcher,
-        Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher,
-        Matcher<? super PaneInfo> paneInfoMatcher) {
-      this.valueMatcher = valueMatcher;
-      this.timestampMatcher = timestampMatcher;
-      this.windowsMatcher = windowsMatcher;
-      this.paneInfoMatcher = paneInfoMatcher;
-    }
-
-    @Override
-    public void describeTo(Description description) {
-      description
-          .appendText("a WindowedValue(").appendValue(valueMatcher)
-          .appendText(", ").appendValue(timestampMatcher)
-          .appendText(", ").appendValue(windowsMatcher)
-          .appendText(", ").appendValue(paneInfoMatcher)
-          .appendText(")");
-    }
-
-    @Override
-    protected boolean matchesSafely(WindowedValue<? extends T> windowedValue) {
-      return valueMatcher.matches(windowedValue.getValue())
-          && timestampMatcher.matches(windowedValue.getTimestamp())
-          && windowsMatcher.matches(windowedValue.getWindows());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchersTest.java
deleted file mode 100644
index 89637e2..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchersTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk;
-
-import static org.junit.Assert.assertThat;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link WindowMatchers}.
- */
-@RunWith(JUnit4.class)
-public class WindowMatchersTest {
-
-  @Test
-  public void testIsWindowedValueExact() {
-    long timestamp = 100;
-    long windowStart = 0;
-    long windowEnd = 200;
-
-    assertThat(
-        WindowedValue.of(
-            "hello",
-            new Instant(timestamp),
-            new IntervalWindow(new Instant(windowStart), new 
Instant(windowEnd)),
-            PaneInfo.NO_FIRING),
-        WindowMatchers.isWindowedValue(
-            "hello",
-            new Instant(timestamp),
-            ImmutableList.of(new IntervalWindow(new Instant(windowStart), new 
Instant(windowEnd))),
-            PaneInfo.NO_FIRING));
-  }
-
-  @Test
-  public void testIsWindowedValueReorderedWindows() {
-    long timestamp = 100;
-    long windowStart = 0;
-    long windowEnd = 200;
-    long windowStart2 = 50;
-    long windowEnd2 = 150;
-
-    assertThat(
-        WindowedValue.of(
-            "hello",
-            new Instant(timestamp),
-            ImmutableList.of(
-                new IntervalWindow(new Instant(windowStart), new 
Instant(windowEnd)),
-                new IntervalWindow(new Instant(windowStart2), new 
Instant(windowEnd2))),
-            PaneInfo.NO_FIRING),
-        WindowMatchers.isWindowedValue(
-            "hello",
-            new Instant(timestamp),
-            ImmutableList.of(
-                new IntervalWindow(new Instant(windowStart), new 
Instant(windowEnd)),
-                new IntervalWindow(new Instant(windowStart2), new 
Instant(windowEnd2))),
-            PaneInfo.NO_FIRING));
-  }
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/GatherAllPanesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/GatherAllPanesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/GatherAllPanesTest.java
new file mode 100644
index 0000000..417147f
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/GatherAllPanesTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Iterables;
+import java.io.Serializable;
+import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.WithTimestamps;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link GatherAllPanes}. */
+@RunWith(JUnit4.class)
+public class GatherAllPanesTest implements Serializable {
+  @Test
+  @Category(NeedsRunner.class)
+  public void singlePaneSingleReifiedPane() {
+    TestPipeline p = TestPipeline.create();
+    PCollection<Iterable<ValueInSingleWindow<Iterable<Long>>>> 
accumulatedPanes =
+        p.apply(CountingInput.upTo(20000))
+            .apply(
+                WithTimestamps.of(
+                    new SerializableFunction<Long, Instant>() {
+                      @Override
+                      public Instant apply(Long input) {
+                        return new Instant(input * 10);
+                      }
+                    }))
+            .apply(
+                Window.<Long>into(FixedWindows.of(Duration.standardMinutes(1)))
+                    .triggering(AfterWatermark.pastEndOfWindow())
+                    .withAllowedLateness(Duration.ZERO)
+                    .discardingFiredPanes())
+            .apply(WithKeys.<Void, Long>of((Void) null).withKeyType(new 
TypeDescriptor<Void>() {}))
+            .apply(GroupByKey.<Void, Long>create())
+            .apply(Values.<Iterable<Long>>create())
+            .apply(GatherAllPanes.<Iterable<Long>>globally());
+
+    PAssert.that(accumulatedPanes)
+        .satisfies(
+            new SerializableFunction<
+                Iterable<Iterable<ValueInSingleWindow<Iterable<Long>>>>, 
Void>() {
+              @Override
+              public Void 
apply(Iterable<Iterable<ValueInSingleWindow<Iterable<Long>>>> input) {
+                for (Iterable<ValueInSingleWindow<Iterable<Long>>> 
windowedInput : input) {
+                  if (Iterables.size(windowedInput) > 1) {
+                    fail("Expected all windows to have exactly one pane, got " 
+ windowedInput);
+                    return null;
+                  }
+                }
+                return null;
+              }
+            });
+
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void multiplePanesMultipleReifiedPane() {
+    TestPipeline p = TestPipeline.create();
+
+    PCollection<Long> someElems = p.apply("someLongs", 
CountingInput.upTo(20000));
+    PCollection<Long> otherElems = p.apply("otherLongs", 
CountingInput.upTo(20000));
+    PCollection<Iterable<ValueInSingleWindow<Iterable<Long>>>> 
accumulatedPanes =
+        PCollectionList.of(someElems)
+            .and(otherElems)
+            .apply(Flatten.<Long>pCollections())
+            .apply(
+                WithTimestamps.of(
+                    new SerializableFunction<Long, Instant>() {
+                      @Override
+                      public Instant apply(Long input) {
+                        return new Instant(input * 10);
+                      }
+                    }))
+            .apply(
+                Window.<Long>into(FixedWindows.of(Duration.standardMinutes(1)))
+                    .triggering(
+                        AfterWatermark.pastEndOfWindow()
+                            
.withEarlyFirings(AfterPane.elementCountAtLeast(1)))
+                    .withAllowedLateness(Duration.ZERO)
+                    .discardingFiredPanes())
+            .apply(WithKeys.<Void, Long>of((Void) null).withKeyType(new 
TypeDescriptor<Void>() {}))
+            .apply(GroupByKey.<Void, Long>create())
+            .apply(Values.<Iterable<Long>>create())
+            .apply(GatherAllPanes.<Iterable<Long>>globally());
+
+    PAssert.that(accumulatedPanes)
+        .satisfies(
+            new SerializableFunction<
+                Iterable<Iterable<ValueInSingleWindow<Iterable<Long>>>>, 
Void>() {
+              @Override
+              public Void 
apply(Iterable<Iterable<ValueInSingleWindow<Iterable<Long>>>> input) {
+                for (Iterable<ValueInSingleWindow<Iterable<Long>>> 
windowedInput : input) {
+                  if (Iterables.size(windowedInput) > 1) {
+                    return null;
+                  }
+                }
+                fail("Expected at least one window to have multiple panes");
+                return null;
+              }
+            });
+
+    p.run();
+  }
+}

Reply via email to