Remove inaccurate pluralization from GABWViaOutputBufferDoFn

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a27bb240
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a27bb240
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a27bb240

Branch: refs/heads/master
Commit: a27bb240dfd9d414506b19116cffd1f900b8f0e7
Parents: c2925fe
Author: Kenneth Knowles <[email protected]>
Authored: Mon Feb 6 08:35:51 2017 -0800
Committer: Kenneth Knowles <[email protected]>
Committed: Mon Feb 6 08:35:51 2017 -0800

----------------------------------------------------------------------
 .../GroupAlsoByWindowViaOutputBufferDoFn.java   | 115 +++++++++++++++++++
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  | 115 -------------------
 ...roupAlsoByWindowViaOutputBufferDoFnTest.java | 110 ++++++++++++++++++
 ...oupAlsoByWindowsViaOutputBufferDoFnTest.java | 110 ------------------
 .../translation/SparkGroupAlsoByWindowFn.java   |   3 +-
 5 files changed, 227 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a27bb240/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
new file mode 100644
index 0000000..85df98c
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
+import org.apache.beam.runners.core.triggers.TriggerStateMachines;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
+import org.joda.time.Instant;
+
+/**
+ * The default batch {@link GroupAlsoByWindowsDoFn} implementation, if no 
specialized "fast path"
+ * implementation is applicable.
+ */
+@SystemDoFnInternal
+public class GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, OutputT, W 
extends BoundedWindow>
+    extends GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> {
+
+  private final WindowingStrategy<?, W> strategy;
+  private final StateInternalsFactory<K> stateInternalsFactory;
+  private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;
+
+  public GroupAlsoByWindowViaOutputBufferDoFn(
+      WindowingStrategy<?, W> windowingStrategy,
+      StateInternalsFactory<K> stateInternalsFactory,
+      SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) {
+    this.strategy = windowingStrategy;
+    this.reduceFn = reduceFn;
+    this.stateInternalsFactory = stateInternalsFactory;
+  }
+
+  @Override
+  public void processElement(ProcessContext c) throws Exception {
+    K key = c.element().getKey();
+    // Used with Batch, we know that all the data is available for this key. 
We can't use the
+    // timer manager from the context because it doesn't exist. So we create 
one and emulate the
+    // watermark, knowing that we have all data and it is in timestamp order.
+    InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
+    timerInternals.advanceProcessingTime(Instant.now());
+    timerInternals.advanceSynchronizedProcessingTime(Instant.now());
+    StateInternals<K> stateInternals = 
stateInternalsFactory.stateInternalsForKey(key);
+
+    ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =
+        new ReduceFnRunner<>(
+            key,
+            strategy,
+            ExecutableTriggerStateMachine.create(
+                
TriggerStateMachines.stateMachineForTrigger(strategy.getTrigger())),
+            stateInternals,
+            timerInternals,
+            
WindowingInternalsAdapters.outputWindowedValue(c.windowingInternals()),
+            WindowingInternalsAdapters.sideInputReader(c.windowingInternals()),
+            droppedDueToClosedWindow,
+            reduceFn,
+            c.getPipelineOptions());
+
+    // Process the elements.
+    reduceFnRunner.processElements(c.element().getValue());
+
+    // Finish any pending windows by advancing the input watermark to infinity.
+    timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    // Finally, advance the processing time to infinity to fire any timers.
+    timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+    
timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    fireEligibleTimers(timerInternals, reduceFnRunner);
+
+    reduceFnRunner.persist();
+  }
+
+  private void fireEligibleTimers(
+      InMemoryTimerInternals timerInternals, ReduceFnRunner<K, InputT, 
OutputT, W> reduceFnRunner)
+      throws Exception {
+    List<TimerInternals.TimerData> timers = new ArrayList<>();
+    while (true) {
+      TimerInternals.TimerData timer;
+      while ((timer = timerInternals.removeNextEventTimer()) != null) {
+        timers.add(timer);
+      }
+      while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
+        timers.add(timer);
+      }
+      while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) 
!= null) {
+        timers.add(timer);
+      }
+      if (timers.isEmpty()) {
+        break;
+      }
+      reduceFnRunner.onTimers(timers);
+      timers.clear();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a27bb240/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
deleted file mode 100644
index bc02679..0000000
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
-import org.apache.beam.runners.core.triggers.TriggerStateMachines;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.SystemDoFnInternal;
-import org.apache.beam.sdk.util.TimerInternals;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.util.state.StateInternalsFactory;
-import org.joda.time.Instant;
-
-/**
- * The default batch {@link GroupAlsoByWindowsDoFn} implementation, if no 
specialized "fast path"
- * implementation is applicable.
- */
-@SystemDoFnInternal
-public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W 
extends BoundedWindow>
-    extends GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> {
-
-  private final WindowingStrategy<?, W> strategy;
-  private final StateInternalsFactory<K> stateInternalsFactory;
-  private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;
-
-  public GroupAlsoByWindowsViaOutputBufferDoFn(
-      WindowingStrategy<?, W> windowingStrategy,
-      StateInternalsFactory<K> stateInternalsFactory,
-      SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) {
-    this.strategy = windowingStrategy;
-    this.reduceFn = reduceFn;
-    this.stateInternalsFactory = stateInternalsFactory;
-  }
-
-  @Override
-  public void processElement(ProcessContext c) throws Exception {
-    K key = c.element().getKey();
-    // Used with Batch, we know that all the data is available for this key. 
We can't use the
-    // timer manager from the context because it doesn't exist. So we create 
one and emulate the
-    // watermark, knowing that we have all data and it is in timestamp order.
-    InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
-    timerInternals.advanceProcessingTime(Instant.now());
-    timerInternals.advanceSynchronizedProcessingTime(Instant.now());
-    StateInternals<K> stateInternals = 
stateInternalsFactory.stateInternalsForKey(key);
-
-    ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =
-        new ReduceFnRunner<>(
-            key,
-            strategy,
-            ExecutableTriggerStateMachine.create(
-                
TriggerStateMachines.stateMachineForTrigger(strategy.getTrigger())),
-            stateInternals,
-            timerInternals,
-            
WindowingInternalsAdapters.outputWindowedValue(c.windowingInternals()),
-            WindowingInternalsAdapters.sideInputReader(c.windowingInternals()),
-            droppedDueToClosedWindow,
-            reduceFn,
-            c.getPipelineOptions());
-
-    // Process the elements.
-    reduceFnRunner.processElements(c.element().getValue());
-
-    // Finish any pending windows by advancing the input watermark to infinity.
-    timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
-
-    // Finally, advance the processing time to infinity to fire any timers.
-    timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
-    
timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
-
-    fireEligibleTimers(timerInternals, reduceFnRunner);
-
-    reduceFnRunner.persist();
-  }
-
-  private void fireEligibleTimers(
-      InMemoryTimerInternals timerInternals, ReduceFnRunner<K, InputT, 
OutputT, W> reduceFnRunner)
-      throws Exception {
-    List<TimerInternals.TimerData> timers = new ArrayList<>();
-    while (true) {
-      TimerInternals.TimerData timer;
-      while ((timer = timerInternals.removeNextEventTimer()) != null) {
-        timers.add(timer);
-      }
-      while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
-        timers.add(timer);
-      }
-      while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) 
!= null) {
-        timers.add(timer);
-      }
-      if (timers.isEmpty()) {
-        break;
-      }
-      reduceFnRunner.onTimers(timers);
-      timers.clear();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a27bb240/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java
new file mode 100644
index 0000000..681c316
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import 
org.apache.beam.runners.core.GroupAlsoByWindowsProperties.GroupAlsoByWindowsDoFnFactory;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Unit tests for {@link GroupAlsoByWindowViaOutputBufferDoFn}.
+ */
+@RunWith(JUnit4.class)
+public class GroupAlsoByWindowViaOutputBufferDoFnTest {
+
+  private class BufferingGABWViaOutputBufferDoFnFactory<K, InputT>
+  implements GroupAlsoByWindowsDoFnFactory<K, InputT, Iterable<InputT>> {
+
+    private final Coder<InputT> inputCoder;
+
+    public BufferingGABWViaOutputBufferDoFnFactory(Coder<InputT> inputCoder) {
+      this.inputCoder = inputCoder;
+    }
+
+    @Override
+    public <W extends BoundedWindow>
+    GroupAlsoByWindowsDoFn<K, InputT, Iterable<InputT>, W> forStrategy(
+            WindowingStrategy<?, W> windowingStrategy,
+            StateInternalsFactory<K> stateInternalsFactory) {
+      return new GroupAlsoByWindowViaOutputBufferDoFn<K, InputT, 
Iterable<InputT>, W>(
+          windowingStrategy,
+          stateInternalsFactory,
+          SystemReduceFn.<K, InputT, W>buffering(inputCoder));
+    }
+  }
+
+  @Test
+  public void testEmptyInputEmptyOutput() throws Exception {
+    GroupAlsoByWindowsProperties.emptyInputEmptyOutput(
+        new BufferingGABWViaOutputBufferDoFnFactory<>(StringUtf8Coder.of()));
+  }
+
+  @Test
+  public void testGroupsElementsIntoFixedWindows() throws Exception {
+    GroupAlsoByWindowsProperties.groupsElementsIntoFixedWindows(
+        new BufferingGABWViaOutputBufferDoFnFactory<String, 
String>(StringUtf8Coder.of()));
+  }
+
+  @Test
+  public void testGroupsElementsIntoSlidingWindows() throws Exception {
+    
GroupAlsoByWindowsProperties.groupsElementsIntoSlidingWindowsWithMinTimestamp(
+        new BufferingGABWViaOutputBufferDoFnFactory<String, 
String>(StringUtf8Coder.of()));
+  }
+
+  @Test
+  public void testGroupsIntoOverlappingNonmergingWindows() throws Exception {
+    GroupAlsoByWindowsProperties.groupsIntoOverlappingNonmergingWindows(
+        new BufferingGABWViaOutputBufferDoFnFactory<String, 
String>(StringUtf8Coder.of()));
+  }
+
+  @Test
+  public void testGroupsIntoSessions() throws Exception {
+    GroupAlsoByWindowsProperties.groupsElementsInMergedSessions(
+        new BufferingGABWViaOutputBufferDoFnFactory<String, 
String>(StringUtf8Coder.of()));
+  }
+
+  @Test
+  public void testGroupsElementsIntoFixedWindowsWithEndOfWindowTimestamp() 
throws Exception {
+    
GroupAlsoByWindowsProperties.groupsElementsIntoFixedWindowsWithEndOfWindowTimestamp(
+        new BufferingGABWViaOutputBufferDoFnFactory<String, 
String>(StringUtf8Coder.of()));
+  }
+
+  @Test
+  public void testGroupsElementsIntoFixedWindowsWithLatestTimestamp() throws 
Exception {
+    
GroupAlsoByWindowsProperties.groupsElementsIntoFixedWindowsWithLatestTimestamp(
+        new BufferingGABWViaOutputBufferDoFnFactory<String, 
String>(StringUtf8Coder.of()));
+  }
+
+  @Test
+  public void testGroupsElementsIntoSessionsWithEndOfWindowTimestamp() throws 
Exception {
+    
GroupAlsoByWindowsProperties.groupsElementsInMergedSessionsWithEndOfWindowTimestamp(
+        new BufferingGABWViaOutputBufferDoFnFactory<String, 
String>(StringUtf8Coder.of()));
+  }
+
+  @Test
+  public void testGroupsElementsIntoSessionsWithLatestTimestamp() throws 
Exception {
+    
GroupAlsoByWindowsProperties.groupsElementsInMergedSessionsWithLatestTimestamp(
+        new BufferingGABWViaOutputBufferDoFnFactory<String, 
String>(StringUtf8Coder.of()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a27bb240/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFnTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFnTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFnTest.java
deleted file mode 100644
index 1fad1fb..0000000
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFnTest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import 
org.apache.beam.runners.core.GroupAlsoByWindowsProperties.GroupAlsoByWindowsDoFnFactory;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.StateInternalsFactory;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Unit tests for {@link GroupAlsoByWindowsViaOutputBufferDoFn}.
- */
-@RunWith(JUnit4.class)
-public class GroupAlsoByWindowsViaOutputBufferDoFnTest {
-
-  private class BufferingGABWViaOutputBufferDoFnFactory<K, InputT>
-  implements GroupAlsoByWindowsDoFnFactory<K, InputT, Iterable<InputT>> {
-
-    private final Coder<InputT> inputCoder;
-
-    public BufferingGABWViaOutputBufferDoFnFactory(Coder<InputT> inputCoder) {
-      this.inputCoder = inputCoder;
-    }
-
-    @Override
-    public <W extends BoundedWindow>
-    GroupAlsoByWindowsDoFn<K, InputT, Iterable<InputT>, W> forStrategy(
-            WindowingStrategy<?, W> windowingStrategy,
-            StateInternalsFactory<K> stateInternalsFactory) {
-      return new GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, 
Iterable<InputT>, W>(
-          windowingStrategy,
-          stateInternalsFactory,
-          SystemReduceFn.<K, InputT, W>buffering(inputCoder));
-    }
-  }
-
-  @Test
-  public void testEmptyInputEmptyOutput() throws Exception {
-    GroupAlsoByWindowsProperties.emptyInputEmptyOutput(
-        new BufferingGABWViaOutputBufferDoFnFactory<>(StringUtf8Coder.of()));
-  }
-
-  @Test
-  public void testGroupsElementsIntoFixedWindows() throws Exception {
-    GroupAlsoByWindowsProperties.groupsElementsIntoFixedWindows(
-        new BufferingGABWViaOutputBufferDoFnFactory<String, 
String>(StringUtf8Coder.of()));
-  }
-
-  @Test
-  public void testGroupsElementsIntoSlidingWindows() throws Exception {
-    
GroupAlsoByWindowsProperties.groupsElementsIntoSlidingWindowsWithMinTimestamp(
-        new BufferingGABWViaOutputBufferDoFnFactory<String, 
String>(StringUtf8Coder.of()));
-  }
-
-  @Test
-  public void testGroupsIntoOverlappingNonmergingWindows() throws Exception {
-    GroupAlsoByWindowsProperties.groupsIntoOverlappingNonmergingWindows(
-        new BufferingGABWViaOutputBufferDoFnFactory<String, 
String>(StringUtf8Coder.of()));
-  }
-
-  @Test
-  public void testGroupsIntoSessions() throws Exception {
-    GroupAlsoByWindowsProperties.groupsElementsInMergedSessions(
-        new BufferingGABWViaOutputBufferDoFnFactory<String, 
String>(StringUtf8Coder.of()));
-  }
-
-  @Test
-  public void testGroupsElementsIntoFixedWindowsWithEndOfWindowTimestamp() 
throws Exception {
-    
GroupAlsoByWindowsProperties.groupsElementsIntoFixedWindowsWithEndOfWindowTimestamp(
-        new BufferingGABWViaOutputBufferDoFnFactory<String, 
String>(StringUtf8Coder.of()));
-  }
-
-  @Test
-  public void testGroupsElementsIntoFixedWindowsWithLatestTimestamp() throws 
Exception {
-    
GroupAlsoByWindowsProperties.groupsElementsIntoFixedWindowsWithLatestTimestamp(
-        new BufferingGABWViaOutputBufferDoFnFactory<String, 
String>(StringUtf8Coder.of()));
-  }
-
-  @Test
-  public void testGroupsElementsIntoSessionsWithEndOfWindowTimestamp() throws 
Exception {
-    
GroupAlsoByWindowsProperties.groupsElementsInMergedSessionsWithEndOfWindowTimestamp(
-        new BufferingGABWViaOutputBufferDoFnFactory<String, 
String>(StringUtf8Coder.of()));
-  }
-
-  @Test
-  public void testGroupsElementsIntoSessionsWithLatestTimestamp() throws 
Exception {
-    
GroupAlsoByWindowsProperties.groupsElementsInMergedSessionsWithLatestTimestamp(
-        new BufferingGABWViaOutputBufferDoFnFactory<String, 
String>(StringUtf8Coder.of()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a27bb240/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
index b615132..34eea65 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Iterables;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import org.apache.beam.runners.core.GroupAlsoByWindowViaOutputBufferDoFn;
 import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn;
 import org.apache.beam.runners.core.InMemoryTimerInternals;
 import org.apache.beam.runners.core.OutputWindowedValue;
@@ -50,7 +51,7 @@ import org.joda.time.Instant;
 
 
 /**
- * An implementation of {@link 
org.apache.beam.runners.core.GroupAlsoByWindowsViaOutputBufferDoFn}
+ * An implementation of {@link GroupAlsoByWindowViaOutputBufferDoFn}
  * for the Spark runner.
  */
 public class SparkGroupAlsoByWindowFn<K, InputT, W extends BoundedWindow>

Reply via email to