Remove references to multi-window representation from model

Some areas of the Beam model in the SDK allude to the use of a
compressed representation of an element along with the set
of windows it is assigned to. However, the model itself views
elements in different windows as fully independent, so the SDK
should not place any obligation on the part of the runner or
user to use a particular representation.

This change removes those places in the SDK where an element
is treated in multiple windows at once.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/08104410
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/08104410
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/08104410

Branch: refs/heads/master
Commit: 08104410177063b1095bd91b24b40f9961c92cf2
Parents: a3aa4c7
Author: Kenneth Knowles <[email protected]>
Authored: Mon May 9 12:17:09 2016 -0700
Committer: Kenneth Knowles <[email protected]>
Committed: Thu Jun 23 09:35:44 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/util/AssignWindowsDoFn.java | 15 ++++---
 .../apache/beam/sdk/util/DoFnRunnerBase.java    |  2 +-
 .../beam/sdk/util/ReduceFnRunnerTest.java       |  3 +-
 .../apache/beam/sdk/util/ReduceFnTester.java    | 46 +++++++++++---------
 .../runners/direct/WindowEvaluatorFactory.java  |  6 ++-
 .../direct/WindowEvaluatorFactoryTest.java      |  4 +-
 .../FlinkStreamingTransformTranslators.java     |  5 ++-
 .../functions/FlinkAssignContext.java           | 15 ++++++-
 .../functions/FlinkNoElementAssignContext.java  |  4 +-
 .../streaming/FlinkAbstractParDoWrapper.java    |  4 +-
 .../flink/streaming/GroupAlsoByWindowTest.java  |  2 +-
 .../beam/sdk/testing/WindowFnTestUtils.java     |  5 ++-
 .../sdk/transforms/windowing/GlobalWindows.java |  5 ---
 .../windowing/PartitioningWindowFn.java         |  5 ---
 .../beam/sdk/transforms/windowing/WindowFn.java | 11 +----
 .../apache/beam/sdk/util/GatherAllPanes.java    |  3 +-
 .../apache/beam/sdk/util/IdentityWindowFn.java  | 20 +++------
 .../org/apache/beam/sdk/util/Reshuffle.java     |  3 +-
 .../sdk/util/IdentitySideInputWindowFn.java     |  3 +-
 .../sdk/util/MergingActiveWindowSetTest.java    |  6 +--
 .../org/apache/beam/sdk/util/TriggerTester.java | 14 +++---
 21 files changed, 89 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java
 
b/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java
index caec40e..d40b007 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java
@@ -20,22 +20,27 @@ package org.apache.beam.sdk.util;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 
+import com.google.common.collect.Iterables;
+
 import org.joda.time.Instant;
 
 import java.util.Collection;
 
 /**
- * {@link DoFn} that tags elements of a PCollection with windows, according
- * to the provided {@link WindowFn}.
+ * {@link DoFn} that tags elements of a {@link PCollection} with windows, 
according to the provided
+ * {@link WindowFn}.
+ *
  * @param <T> Type of elements being windowed
  * @param <W> Window type
  */
 @SystemDoFnInternal
-public class AssignWindowsDoFn<T, W extends BoundedWindow> extends DoFn<T, T> {
+public class AssignWindowsDoFn<T, W extends BoundedWindow> extends DoFn<T, T>
+    implements RequiresWindowAccess {
   private WindowFn<? super T, W> fn;
 
   public AssignWindowsDoFn(WindowFn<? super T, W> fn) {
@@ -64,8 +69,8 @@ public class AssignWindowsDoFn<T, W extends BoundedWindow> 
extends DoFn<T, T> {
                 }
 
                 @Override
-                public Collection<? extends BoundedWindow> windows() {
-                  return c.windowingInternals().windows();
+                public BoundedWindow window() {
+                  return 
Iterables.getOnlyElement(c.windowingInternals().windows());
                 }
               });
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java 
b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
index 1ebe72b..a849eb2 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
@@ -254,7 +254,7 @@ public abstract class DoFnRunnerBase<InputT, OutputT> 
implements DoFnRunner<Inpu
             }
 
             @Override
-            public Collection<? extends BoundedWindow> windows() {
+            public W window() {
               throw new UnsupportedOperationException(
                   "WindowFn attempted to access input windows when none were 
available");
             }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
index b7ec540..64fcae3 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java
@@ -82,7 +82,6 @@ import org.mockito.MockitoAnnotations;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
@@ -258,7 +257,7 @@ public class ReduceFnRunnerTest {
               }
 
               @Override
-              public Collection<? extends BoundedWindow> windows() {
+              public BoundedWindow window() {
                 throw new UnsupportedOperationException();
               }
             }));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java 
b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
index 9916c5c..e897f54 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java
@@ -401,21 +401,25 @@ public class ReduceFnTester<InputT, OutputT, W extends 
BoundedWindow> {
       WindowTracing.trace("TriggerTester.injectElements: {}", value);
     }
     ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
-    runner.processElements(Iterables.transform(
-        Arrays.asList(values), new Function<TimestampedValue<InputT>, 
WindowedValue<InputT>>() {
-          @Override
-          public WindowedValue<InputT> apply(TimestampedValue<InputT> input) {
-            try {
-              InputT value = input.getValue();
-              Instant timestamp = input.getTimestamp();
-              Collection<W> windows = windowFn.assignWindows(new 
TestAssignContext<W>(
-                  windowFn, value, timestamp, 
Arrays.asList(GlobalWindow.INSTANCE)));
-              return WindowedValue.of(value, timestamp, windows, 
PaneInfo.NO_FIRING);
-            } catch (Exception e) {
-              throw new RuntimeException(e);
-            }
-          }
-        }));
+    runner.processElements(
+        Iterables.transform(
+            Arrays.asList(values),
+            new Function<TimestampedValue<InputT>, WindowedValue<InputT>>() {
+              @Override
+              public WindowedValue<InputT> apply(TimestampedValue<InputT> 
input) {
+                try {
+                  InputT value = input.getValue();
+                  Instant timestamp = input.getTimestamp();
+                  Collection<W> windows =
+                      windowFn.assignWindows(
+                          new TestAssignContext<W>(
+                              windowFn, value, timestamp, 
GlobalWindow.INSTANCE));
+                  return WindowedValue.of(value, timestamp, windows, 
PaneInfo.NO_FIRING);
+                } catch (Exception e) {
+                  throw new RuntimeException(e);
+                }
+              }
+            }));
 
     // Persist after each bundle.
     runner.persist();
@@ -538,14 +542,14 @@ public class ReduceFnTester<InputT, OutputT, W extends 
BoundedWindow> {
       extends WindowFn<Object, W>.AssignContext {
     private Object element;
     private Instant timestamp;
-    private Collection<? extends BoundedWindow> windows;
+    private BoundedWindow window;
 
-    public TestAssignContext(WindowFn<Object, W> windowFn, Object element, 
Instant timestamp,
-        Collection<? extends BoundedWindow> windows) {
+    public TestAssignContext(
+        WindowFn<Object, W> windowFn, Object element, Instant timestamp, 
BoundedWindow window) {
       windowFn.super();
       this.element = element;
       this.timestamp = timestamp;
-      this.windows = windows;
+      this.window = window;
     }
 
     @Override
@@ -559,8 +563,8 @@ public class ReduceFnTester<InputT, OutputT, W extends 
BoundedWindow> {
     }
 
     @Override
-    public Collection<? extends BoundedWindow> windows() {
-      return windows;
+    public BoundedWindow window() {
+      return window;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
index 6045912..67c2f17 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
@@ -29,6 +29,8 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 
+import com.google.common.collect.Iterables;
+
 import org.joda.time.Instant;
 
 import java.util.Collection;
@@ -125,8 +127,8 @@ class WindowEvaluatorFactory implements 
TransformEvaluatorFactory {
     }
 
     @Override
-    public Collection<? extends BoundedWindow> windows() {
-      return value.getWindows();
+    public BoundedWindow window() {
+      return Iterables.getOnlyElement(value.getWindows());
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
index c5faa5a..65dcfeb 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
@@ -326,11 +326,11 @@ public class WindowEvaluatorFactoryTest {
   private static class EvaluatorTestWindowFn extends NonMergingWindowFn<Long, 
BoundedWindow> {
     @Override
     public Collection<BoundedWindow> assignWindows(AssignContext c) throws 
Exception {
-      if (c.windows().contains(GlobalWindow.INSTANCE)) {
+      if (c.window().equals(GlobalWindow.INSTANCE)) {
         return Collections.<BoundedWindow>singleton(new 
IntervalWindow(c.timestamp(),
             c.timestamp().plus(1L)));
       }
-      return (Collection<BoundedWindow>) c.windows();
+      return Collections.singleton(c.window());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index b3fed99..5d04068 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -59,6 +59,7 @@ import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 
 import com.google.api.client.util.Maps;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
 import org.apache.flink.api.common.functions.FilterFunction;
@@ -359,8 +360,8 @@ public class FlinkStreamingTransformTranslators {
                 }
 
                 @Override
-                public Collection<? extends BoundedWindow> windows() {
-                  return c.windowingInternals().windows();
+                public BoundedWindow window() {
+                  return 
Iterables.getOnlyElement(c.windowingInternals().windows());
                 }
               });
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java
index 7ea8c20..6abb8ff 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java
@@ -17,10 +17,14 @@
  */
 package org.apache.beam.runners.flink.translation.functions;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 
+import com.google.common.collect.Iterables;
+
 import org.joda.time.Instant;
 
 import java.util.Collection;
@@ -35,6 +39,13 @@ class FlinkAssignContext<InputT, W extends BoundedWindow>
 
   FlinkAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) {
     fn.super();
+    checkArgument(
+        Iterables.size(value.getWindows()) == 1,
+        String.format(
+            "%s passed to window assignment must be in a single window, but it 
was in %s: %s",
+            WindowedValue.class.getSimpleName(),
+            Iterables.size(value.getWindows()),
+            value.getWindows()));
     this.value = value;
   }
 
@@ -49,8 +60,8 @@ class FlinkAssignContext<InputT, W extends BoundedWindow>
   }
 
   @Override
-  public Collection<? extends BoundedWindow> windows() {
-    return value.getWindows();
+  public BoundedWindow window() {
+    return Iterables.getOnlyElement(value.getWindows());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
index 892f7a1..d49821b 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
@@ -65,7 +65,7 @@ class FlinkNoElementAssignContext<InputT, W extends 
BoundedWindow>
   }
 
   @Override
-  public Collection<? extends BoundedWindow> windows() {
-    throw new UnsupportedOperationException("No windows available.");
+  public BoundedWindow window() {
+    throw new UnsupportedOperationException("No window available.");
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
index 3c37aa9..f68a519 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
@@ -245,9 +245,9 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, 
OUTFL> extends RichFl
           }
 
           @Override
-          public Collection<? extends BoundedWindow> windows() {
+          public BoundedWindow window() {
             throw new UnsupportedOperationException(
-                "WindowFn attempted to access input windows when none were 
available");
+                "WindowFn attempted to access input window when none was 
available");
           }
         });
       } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java
 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java
index 3e5a17d..207fb5a 100644
--- 
a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java
+++ 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java
@@ -508,7 +508,7 @@ public class GroupAlsoByWindowTest extends 
StreamingMultipleProgramsTestBase {
           }
 
           @Override
-          public Collection<? extends BoundedWindow> windows() {
+          public BoundedWindow window() {
             throw new UnsupportedOperationException(
                 "WindowFn attempted to access input windows when none were 
available");
           }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
index a4130df..127721a 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
@@ -115,8 +116,8 @@ public class WindowFnTestUtils {
     }
 
     @Override
-    public Collection<? extends BoundedWindow> windows() {
-      return null;
+    public BoundedWindow window() {
+      return GlobalWindow.INSTANCE;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
index 499ffeb..002bf2e 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
@@ -53,11 +53,6 @@ public class GlobalWindows extends 
NonMergingWindowFn<Object, GlobalWindow> {
   }
 
   @Override
-  public boolean assignsToSingleWindow() {
-    return true;
-  }
-
-  @Override
   public Instant getOutputTime(Instant inputTimestamp, GlobalWindow window) {
     return inputTimestamp;
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java
index b0dd8b9..da2f38c 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java
@@ -51,11 +51,6 @@ public abstract class PartitioningWindowFn<T, W extends 
BoundedWindow>
   }
 
   @Override
-  public boolean assignsToSingleWindow() {
-    return true;
-  }
-
-  @Override
   public Instant getOutputTime(Instant inputTimestamp, W window) {
     return inputTimestamp;
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
index 41833f8..d84866b 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
@@ -65,10 +65,10 @@ public abstract class WindowFn<T, W extends BoundedWindow>
     public abstract Instant timestamp();
 
     /**
-     * Returns the windows the current element was in, prior to this
+     * Returns the window of the current element prior to this
      * {@code WindowFn} being called.
      */
-    public abstract Collection<? extends BoundedWindow> windows();
+    public abstract BoundedWindow window();
   }
 
   /**
@@ -161,13 +161,6 @@ public abstract class WindowFn<T, W extends BoundedWindow>
   }
 
   /**
-   * Returns true if this {@code WindowFn} assigns each element to a single 
window.
-   */
-  public boolean assignsToSingleWindow() {
-    return false;
-  }
-
-  /**
    * {@inheritDoc}
    *
    * <p>By default, does not register any display data. Implementors may 
override this method

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
index 5a01c28..ab40678 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
@@ -62,8 +62,7 @@ public class GatherAllPanes<T>
         .apply(
             Window.into(
                     new IdentityWindowFn<KV<Void, WindowedValue<T>>>(
-                        originalWindowFn.windowCoder(),
-                        
input.getWindowingStrategy().getWindowFn().assignsToSingleWindow()))
+                        originalWindowFn.windowCoder()))
                 .triggering(Never.ever()))
         // all values have the same key so they all appear as a single output 
element
         .apply(GroupByKey.<Void, WindowedValue<T>>create())

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
index 91e5609..a3477e9 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Instant;
 
 import java.util.Collection;
+import java.util.Collections;
 
 /**
  * A {@link WindowFn} that leaves all associations between elements and 
windows unchanged.
@@ -55,25 +56,21 @@ class IdentityWindowFn<T> extends NonMergingWindowFn<T, 
BoundedWindow> {
    * these windows.
    */
   private final Coder<BoundedWindow> coder;
-  private final boolean assignsToSingleWindow;
 
-  public IdentityWindowFn(Coder<? extends BoundedWindow> coder, boolean 
assignsToSingleWindow) {
+  public IdentityWindowFn(Coder<? extends BoundedWindow> coder) {
     // Safe because it is only used privately here.
     // At every point where a window is returned or accepted, it has been 
provided
-    // by priorWindowFn, so it is of the expected type.
+    // by the prior WindowFn, so it is of the expected type.
     @SuppressWarnings("unchecked")
     Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) coder;
     this.coder = windowCoder;
-    this.assignsToSingleWindow = assignsToSingleWindow;
   }
 
   @Override
   public Collection<BoundedWindow> assignWindows(WindowFn<T, 
BoundedWindow>.AssignContext c)
       throws Exception {
-    // The windows are provided by priorWindowFn, which also provides the 
coder for them
-    @SuppressWarnings("unchecked")
-    Collection<BoundedWindow> priorWindows = (Collection<BoundedWindow>) 
c.windows();
-    return priorWindows;
+    // The window is provided by the prior WindowFn, which also provides the 
coder for them
+    return Collections.singleton(c.window());
   }
 
   @Override
@@ -88,17 +85,12 @@ class IdentityWindowFn<T> extends NonMergingWindowFn<T, 
BoundedWindow> {
 
   @Override
   public Coder<BoundedWindow> windowCoder() {
-    // Safe because the previous WindowFn provides both the windows and the 
coder.
+    // Safe because the prior WindowFn provides both the windows and the coder.
     // The Coder is _not_ actually a coder for an arbitrary BoundedWindow.
     return coder;
   }
 
   @Override
-  public boolean assignsToSingleWindow() {
-    return assignsToSingleWindow;
-  }
-
-  @Override
   public BoundedWindow getSideInputWindow(BoundedWindow window) {
     throw new UnsupportedOperationException(
         String.format(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
index 5c91326..c0d159b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
@@ -58,8 +58,7 @@ public class Reshuffle<K, V> extends 
PTransform<PCollection<KV<K, V>>, PCollecti
     Window.Bound<KV<K, V>> rewindow =
         Window.<KV<K, V>>into(
                 new IdentityWindowFn<>(
-                    originalStrategy.getWindowFn().windowCoder(),
-                    originalStrategy.getWindowFn().assignsToSingleWindow()))
+                    originalStrategy.getWindowFn().windowCoder()))
             .triggering(new ReshuffleTrigger<>())
             .discardingFiredPanes()
             
.withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
index db6f425..705003e 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
@@ -24,6 +24,7 @@ import 
org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 
 import java.util.Collection;
+import java.util.Collections;
 
 /**
  * A {@link WindowFn} for use during tests that returns the input window for 
calls to
@@ -33,7 +34,7 @@ public class IdentitySideInputWindowFn extends 
NonMergingWindowFn<Integer, Bound
   @Override
   public Collection<BoundedWindow> assignWindows(WindowFn<Integer, 
BoundedWindow>.AssignContext c)
       throws Exception {
-    return (Collection<BoundedWindow>) c.windows();
+    return Collections.singleton(c.window());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java
index 84699d6..4750af1 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.util.state.InMemoryStateInternals;
@@ -39,7 +40,6 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -87,8 +87,8 @@ public class MergingActiveWindowSetTest {
         }
 
         @Override
-        public Collection<? extends BoundedWindow> windows() {
-          return ImmutableList.of();
+        public BoundedWindow window() {
+          return GlobalWindow.INSTANCE;
         }
       };
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
index a1e376e..c495712 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
@@ -245,7 +245,7 @@ public class TriggerTester<InputT, W extends BoundedWindow> 
{
         InputT value = input.getValue();
         Instant timestamp = input.getTimestamp();
         Collection<W> assignedWindows = windowFn.assignWindows(new 
TestAssignContext<W>(
-            windowFn, value, timestamp, Arrays.asList(GlobalWindow.INSTANCE)));
+            windowFn, value, timestamp, GlobalWindow.INSTANCE));
 
         for (W window : assignedWindows) {
           activeWindows.addActiveForTesting(window);
@@ -401,14 +401,14 @@ public class TriggerTester<InputT, W extends 
BoundedWindow> {
       extends WindowFn<Object, W>.AssignContext {
     private Object element;
     private Instant timestamp;
-    private Collection<? extends BoundedWindow> windows;
+    private BoundedWindow window;
 
-    public TestAssignContext(WindowFn<Object, W> windowFn, Object element, 
Instant timestamp,
-        Collection<? extends BoundedWindow> windows) {
+    public TestAssignContext(
+        WindowFn<Object, W> windowFn, Object element, Instant timestamp, 
BoundedWindow window) {
       windowFn.super();
       this.element = element;
       this.timestamp = timestamp;
-      this.windows = windows;
+      this.window = window;
     }
 
     @Override
@@ -422,8 +422,8 @@ public class TriggerTester<InputT, W extends BoundedWindow> 
{
     }
 
     @Override
-    public Collection<? extends BoundedWindow> windows() {
-      return windows;
+    public BoundedWindow window() {
+      return window;
     }
   }
 

Reply via email to