Add GroupByKey InProcess override

This takes a GroupByKey primitive and implements it as a composite transform
made up of a sequence of lower-level transforms, including a simpler
GroupByKeyOnly primitive.

----Release Notes----
[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=115404181


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/96b02f47
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/96b02f47
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/96b02f47

Branch: refs/heads/master
Commit: 96b02f4737457d158db60ae467e7098839c383ab
Parents: 01a0da0
Author: tgroh <[email protected]>
Authored: Tue Feb 23 17:52:25 2016 -0800
Committer: Davor Bonaci <[email protected]>
Committed: Thu Feb 25 23:58:27 2016 -0800

----------------------------------------------------------------------
 .../inprocess/GroupByKeyEvaluatorFactory.java   | 128 ++++++++++++++++---
 .../inprocess/InProcessPipelineRunner.java      |   9 ++
 .../dataflow/sdk/transforms/GroupByKey.java     |   2 +-
 .../GroupByKeyEvaluatorFactoryTest.java         | 114 ++++++++++-------
 4 files changed, 190 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96b02f47/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
index 44d6909..0347281 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
@@ -19,18 +19,29 @@ import static 
com.google.cloud.dataflow.sdk.util.CoderUtils.encodeToByteArray;
 
 import com.google.cloud.dataflow.sdk.coders.Coder;
 import com.google.cloud.dataflow.sdk.coders.CoderException;
+import com.google.cloud.dataflow.sdk.coders.IterableCoder;
 import com.google.cloud.dataflow.sdk.coders.KvCoder;
 import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
 import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
 import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
 import 
com.google.cloud.dataflow.sdk.runners.inprocess.StepTransformResult.Builder;
 import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
 import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey.GroupByKeyOnly;
+import 
com.google.cloud.dataflow.sdk.transforms.GroupByKey.ReifyTimestampsAndWindows;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowViaWindowSetDoFn;
+import com.google.cloud.dataflow.sdk.util.KeyedWorkItem;
+import com.google.cloud.dataflow.sdk.util.KeyedWorkItemCoder;
+import com.google.cloud.dataflow.sdk.util.KeyedWorkItems;
+import com.google.cloud.dataflow.sdk.util.SystemReduceFn;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
 import com.google.cloud.dataflow.sdk.values.KV;
 import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.annotations.VisibleForTesting;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -53,52 +64,56 @@ class GroupByKeyEvaluatorFactory implements 
TransformEvaluatorFactory {
         (AppliedPTransform) application, (CommittedBundle) inputBundle, 
evaluationContext);
   }
 
-  private <K, V> TransformEvaluator<KV<K, V>> createEvaluator(
+  private <K, V> TransformEvaluator<KV<K, WindowedValue<V>>> createEvaluator(
       final AppliedPTransform<
-              PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, 
GroupByKeyOnly<K, V>>
+              PCollection<KV<K, WindowedValue<V>>>, 
PCollection<KeyedWorkItem<K, V>>,
+              InProcessGroupByKeyOnly<K, V>>
           application,
       final CommittedBundle<KV<K, V>> inputBundle,
       final InProcessEvaluationContext evaluationContext) {
     return new GroupByKeyEvaluator<K, V>(evaluationContext, inputBundle, 
application);
   }
 
-  private static class GroupByKeyEvaluator<K, V> implements 
TransformEvaluator<KV<K, V>> {
+  private static class GroupByKeyEvaluator<K, V>
+      implements TransformEvaluator<KV<K, WindowedValue<V>>> {
     private final InProcessEvaluationContext evaluationContext;
 
     private final CommittedBundle<KV<K, V>> inputBundle;
     private final AppliedPTransform<
-            PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, 
GroupByKeyOnly<K, V>>
+            PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, 
V>>,
+            InProcessGroupByKeyOnly<K, V>>
         application;
     private final Coder<K> keyCoder;
-    private Map<GroupingKey<K>, List<V>> groupingMap;
+    private Map<GroupingKey<K>, List<WindowedValue<V>>> groupingMap;
 
     public GroupByKeyEvaluator(
         InProcessEvaluationContext evaluationContext,
         CommittedBundle<KV<K, V>> inputBundle,
         AppliedPTransform<
-                PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, 
GroupByKeyOnly<K, V>>
+                PCollection<KV<K, WindowedValue<V>>>, 
PCollection<KeyedWorkItem<K, V>>,
+                InProcessGroupByKeyOnly<K, V>>
             application) {
       this.evaluationContext = evaluationContext;
       this.inputBundle = inputBundle;
       this.application = application;
 
-      PCollection<KV<K, V>> input = application.getInput();
+      PCollection<KV<K, WindowedValue<V>>> input = application.getInput();
       keyCoder = getKeyCoder(input.getCoder());
       groupingMap = new HashMap<>();
     }
 
-    private Coder<K> getKeyCoder(Coder<KV<K, V>> coder) {
+    private Coder<K> getKeyCoder(Coder<KV<K, WindowedValue<V>>> coder) {
       if (!(coder instanceof KvCoder)) {
         throw new IllegalStateException();
       }
       @SuppressWarnings("unchecked")
-      Coder<K> keyCoder = ((KvCoder<K, V>) coder).getKeyCoder();
+      Coder<K> keyCoder = ((KvCoder<K, WindowedValue<V>>) coder).getKeyCoder();
       return keyCoder;
     }
 
     @Override
-    public void processElement(WindowedValue<KV<K, V>> element) {
-      KV<K, V> kv = element.getValue();
+    public void processElement(WindowedValue<KV<K, WindowedValue<V>>> element) 
{
+      KV<K, WindowedValue<V>> kv = element.getValue();
       K key = kv.getKey();
       byte[] encodedKey;
       try {
@@ -111,20 +126,23 @@ class GroupByKeyEvaluatorFactory implements 
TransformEvaluatorFactory {
             exn);
       }
       GroupingKey<K> groupingKey = new GroupingKey<>(key, encodedKey);
-      if (!groupingMap.containsKey(groupingKey)) {
-        groupingMap.put(groupingKey, new ArrayList<V>());
+      List<WindowedValue<V>> values = groupingMap.get(groupingKey);
+      if (values == null) {
+        values = new ArrayList<WindowedValue<V>>();
+        groupingMap.put(groupingKey, values);
       }
-      List<V> values = groupingMap.get(groupingKey);
       values.add(kv.getValue());
     }
 
     @Override
     public InProcessTransformResult finishBundle() {
       Builder resultBuilder = StepTransformResult.withoutHold(application);
-      for (Map.Entry<GroupingKey<K>, List<V>> groupedEntry : 
groupingMap.entrySet()) {
+      for (Map.Entry<GroupingKey<K>, List<WindowedValue<V>>> groupedEntry :
+          groupingMap.entrySet()) {
         K key = groupedEntry.getKey().key;
-        KV<K, Iterable<V>> groupedKv = KV.<K, Iterable<V>>of(key, 
groupedEntry.getValue());
-        UncommittedBundle<KV<K, Iterable<V>>> bundle =
+        KeyedWorkItem<K, V> groupedKv =
+            KeyedWorkItems.elementsWorkItem(key, groupedEntry.getValue());
+        UncommittedBundle<KeyedWorkItem<K, V>> bundle =
             evaluationContext.createKeyedBundle(inputBundle, key, 
application.getOutput());
         bundle.add(WindowedValue.valueInEmptyWindows(groupedKv));
         resultBuilder.addOutput(bundle);
@@ -157,4 +175,78 @@ class GroupByKeyEvaluatorFactory implements 
TransformEvaluatorFactory {
       }
     }
   }
+
+  /**
+   * An in-memory implementation of the {@link GroupByKey} primitive as a 
composite
+   * {@link PTransform}.
+   */
+  public static final class InProcessGroupByKey<K, V>
+      extends ForwardingPTransform<PCollection<KV<K, V>>, PCollection<KV<K, 
Iterable<V>>>> {
+    private final GroupByKey<K, V> original;
+
+    public InProcessGroupByKey(GroupByKey<K, V> from) {
+      this.original = from;
+    }
+
+    @Override
+    public PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> 
delegate() {
+      return original;
+    }
+
+    @Override
+    public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+      KvCoder<K, V> inputCoder = (KvCoder<K, V>) input.getCoder();
+
+      // This operation groups by the combination of key and window,
+      // merging windows as needed, using the windows assigned to the
+      // key/value input elements and the window merge operation of the
+      // window function associated with the input PCollection.
+      WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
+
+      // Use the default GroupAlsoByWindow implementation
+      DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> groupAlsoByWindow =
+          groupAlsoByWindow(windowingStrategy, inputCoder.getValueCoder());
+
+      // By default, implement GroupByKey via a series of lower-level 
operations.
+      return input
+          // Make each input element's timestamp and assigned windows
+          // explicit, in the value part.
+          .apply(new ReifyTimestampsAndWindows<K, V>())
+
+          .apply(new InProcessGroupByKeyOnly<K, V>())
+          .setCoder(KeyedWorkItemCoder.of(inputCoder.getKeyCoder(),
+              inputCoder.getValueCoder(), 
input.getWindowingStrategy().getWindowFn().windowCoder()))
+
+          // Group each key's values by window, merging windows as needed.
+          .apply("GroupAlsoByWindow", ParDo.of(groupAlsoByWindow))
+
+          // And update the windowing strategy as appropriate.
+          
.setWindowingStrategyInternal(original.updateWindowingStrategy(windowingStrategy))
+          .setCoder(
+              KvCoder.of(inputCoder.getKeyCoder(), 
IterableCoder.of(inputCoder.getValueCoder())));
+    }
+
+    private <W extends BoundedWindow>
+        DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> groupAlsoByWindow(
+            final WindowingStrategy<?, W> windowingStrategy, final Coder<V> 
inputCoder) {
+      return GroupAlsoByWindowViaWindowSetDoFn.create(
+          windowingStrategy, SystemReduceFn.<K, V, W>buffering(inputCoder));
+    }
+  }
+
+  /**
+   * An implementation primitive to use in the evaluation of a {@link 
GroupByKey}
+   * {@link PTransform}.
+   */
+  public static final class InProcessGroupByKeyOnly<K, V>
+      extends PTransform<PCollection<KV<K, WindowedValue<V>>>, 
PCollection<KeyedWorkItem<K, V>>> {
+    @Override
+    public PCollection<KeyedWorkItem<K, V>> apply(PCollection<KV<K, 
WindowedValue<V>>> input) {
+      return PCollection.<KeyedWorkItem<K, V>>createPrimitiveOutputInternal(
+          input.getPipeline(), input.getWindowingStrategy(), 
input.isBounded());
+    }
+
+    @VisibleForTesting
+    InProcessGroupByKeyOnly() {}
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96b02f47/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
index 7747839..72642da 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
@@ -19,6 +19,7 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 
 import com.google.cloud.dataflow.sdk.Pipeline;
 import com.google.cloud.dataflow.sdk.annotations.Experimental;
+import 
com.google.cloud.dataflow.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKey;
 import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
 import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
 import com.google.cloud.dataflow.sdk.transforms.GroupByKey.GroupByKeyOnly;
@@ -38,6 +39,7 @@ import 
com.google.cloud.dataflow.sdk.util.state.StateInternals;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.cloud.dataflow.sdk.values.PCollectionView;
 import com.google.cloud.dataflow.sdk.values.PValue;
+import com.google.common.collect.ImmutableMap;
 
 import org.joda.time.Instant;
 
@@ -53,6 +55,13 @@ import javax.annotation.Nullable;
  */
 @Experimental
 public class InProcessPipelineRunner {
+  @SuppressWarnings({"rawtypes", "unused"})
+  private static Map<Class<? extends PTransform>, Class<? extends PTransform>>
+      defaultTransformOverrides =
+          ImmutableMap.<Class<? extends PTransform>, Class<? extends 
PTransform>>builder()
+              .put(GroupByKey.class, InProcessGroupByKey.class)
+              .build();
+
   private static Map<Class<?>, TransformEvaluatorFactory> 
defaultEvaluatorFactories =
       new ConcurrentHashMap<>();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96b02f47/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java
index dee7d95..8fde3e0 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java
@@ -296,7 +296,7 @@ public class GroupByKey<K, V>
   /**
    * Returns the {@code Coder} of the values of the input to this transform.
    */
-  static <K, V> Coder<V> getInputValueCoder(Coder<KV<K, V>> inputCoder) {
+  public static <K, V> Coder<V> getInputValueCoder(Coder<KV<K, V>> inputCoder) 
{
     return getInputKvCoder(inputCoder).getValueCoder();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96b02f47/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
index d8cb237..bb8a15d 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
@@ -28,8 +28,9 @@ import 
com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.U
 import com.google.cloud.dataflow.sdk.runners.inprocess.util.InProcessBundle;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
 import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey.GroupByKeyOnly;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
+import com.google.cloud.dataflow.sdk.util.KeyedWorkItem;
+import com.google.cloud.dataflow.sdk.util.KeyedWorkItems;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.values.KV;
 import com.google.cloud.dataflow.sdk.values.PCollection;
@@ -58,20 +59,22 @@ public class GroupByKeyEvaluatorFactoryTest {
     KV<String, Integer> firstBar = KV.of("bar", 22);
     KV<String, Integer> secondBar = KV.of("bar", 12);
     KV<String, Integer> firstBaz = KV.of("baz", Integer.MAX_VALUE);
-    PCollection<KV<String, Integer>> kvs =
+    PCollection<KV<String, Integer>> values =
         p.apply(Create.of(firstFoo, firstBar, secondFoo, firstBaz, secondBar, 
thirdFoo));
-    PCollection<KV<String, Iterable<Integer>>> groupedKvs =
-        kvs.apply(new GroupByKeyOnly<String, Integer>());
+    PCollection<KV<String, WindowedValue<Integer>>> kvs =
+        values.apply(new GroupByKey.ReifyTimestampsAndWindows<String, 
Integer>());
+    PCollection<KeyedWorkItem<String, Integer>> groupedKvs =
+        kvs.apply(new 
GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly<String, Integer>());
 
-    CommittedBundle<KV<String, Integer>> inputBundle =
-        InProcessBundle.unkeyed(kvs).commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
+    CommittedBundle<KV<String, WindowedValue<Integer>>> inputBundle =
+        InProcessBundle.unkeyed(kvs).commit(Instant.now());
     InProcessEvaluationContext evaluationContext = 
mock(InProcessEvaluationContext.class);
 
-    UncommittedBundle<KV<String, Iterable<Integer>>> fooBundle =
+    UncommittedBundle<KeyedWorkItem<String, Integer>> fooBundle =
         InProcessBundle.keyed(groupedKvs, "foo");
-    UncommittedBundle<KV<String, Iterable<Integer>>> barBundle =
+    UncommittedBundle<KeyedWorkItem<String, Integer>> barBundle =
         InProcessBundle.keyed(groupedKvs, "bar");
-    UncommittedBundle<KV<String, Iterable<Integer>>> bazBundle =
+    UncommittedBundle<KeyedWorkItem<String, Integer>> bazBundle =
         InProcessBundle.keyed(groupedKvs, "baz");
 
     when(evaluationContext.createKeyedBundle(inputBundle, "foo", 
groupedKvs)).thenReturn(fooBundle);
@@ -80,41 +83,64 @@ public class GroupByKeyEvaluatorFactoryTest {
 
     // The input to a GroupByKey is assumed to be a KvCoder
     @SuppressWarnings("unchecked")
-    Coder<String> keyCoder = ((KvCoder<String, Integer>) 
kvs.getCoder()).getKeyCoder();
-    TransformEvaluator<KV<String, Integer>> evaluator =
-        new GroupByKeyEvaluatorFactory().forApplication(
-            groupedKvs.getProducingTransformInternal(), inputBundle, 
evaluationContext);
-
-    evaluator.processElement(WindowedValue.valueInGlobalWindow(firstFoo));
-    evaluator.processElement(WindowedValue.valueInGlobalWindow(secondFoo));
-    evaluator.processElement(WindowedValue.valueInGlobalWindow(thirdFoo));
-    evaluator.processElement(WindowedValue.valueInGlobalWindow(firstBar));
-    evaluator.processElement(WindowedValue.valueInGlobalWindow(secondBar));
-    evaluator.processElement(WindowedValue.valueInGlobalWindow(firstBaz));
+    Coder<String> keyCoder =
+        ((KvCoder<String, WindowedValue<Integer>>) 
kvs.getCoder()).getKeyCoder();
+    TransformEvaluator<KV<String, WindowedValue<Integer>>> evaluator =
+        new GroupByKeyEvaluatorFactory()
+            .forApplication(
+                groupedKvs.getProducingTransformInternal(), inputBundle, 
evaluationContext);
+
+    
evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstFoo)));
+    
evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(secondFoo)));
+    
evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(thirdFoo)));
+    
evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstBar)));
+    
evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(secondBar)));
+    
evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstBaz)));
 
     evaluator.finishBundle();
 
     assertThat(
         fooBundle.commit(Instant.now()).getElements(),
-        contains(new KIterVMatcher<String, Integer>(
-            KV.<String, Iterable<Integer>>of("foo", ImmutableSet.of(-1, 1, 
3)), keyCoder)));
+        contains(
+            new KeyedWorkItemMatcher<String, Integer>(
+                KeyedWorkItems.elementsWorkItem(
+                    "foo",
+                    ImmutableSet.of(
+                        WindowedValue.valueInGlobalWindow(-1),
+                        WindowedValue.valueInGlobalWindow(1),
+                        WindowedValue.valueInGlobalWindow(3))),
+                keyCoder)));
     assertThat(
         barBundle.commit(Instant.now()).getElements(),
-        contains(new KIterVMatcher<String, Integer>(
-            KV.<String, Iterable<Integer>>of("bar", ImmutableSet.of(12, 22)), 
keyCoder)));
+        contains(
+            new KeyedWorkItemMatcher<String, Integer>(
+                KeyedWorkItems.elementsWorkItem(
+                    "bar",
+                    ImmutableSet.of(
+                        WindowedValue.valueInGlobalWindow(12),
+                        WindowedValue.valueInGlobalWindow(22))),
+                keyCoder)));
     assertThat(
         bazBundle.commit(Instant.now()).getElements(),
-        contains(new KIterVMatcher<String, Integer>(
-            KV.<String, Iterable<Integer>>of("baz", 
ImmutableSet.of(Integer.MAX_VALUE)),
-            keyCoder)));
+        contains(
+            new KeyedWorkItemMatcher<String, Integer>(
+                KeyedWorkItems.elementsWorkItem(
+                    "baz",
+                    
ImmutableSet.of(WindowedValue.valueInGlobalWindow(Integer.MAX_VALUE))),
+                keyCoder)));
   }
 
-  private static class KIterVMatcher<K, V> extends 
BaseMatcher<WindowedValue<KV<K, Iterable<V>>>> {
-    private final KV<K, Iterable<V>> myKv;
+  private <K, V> KV<K, WindowedValue<V>> gwValue(KV<K, V> kv) {
+    return KV.of(kv.getKey(), 
WindowedValue.valueInGlobalWindow(kv.getValue()));
+  }
+
+  private static class KeyedWorkItemMatcher<K, V>
+      extends BaseMatcher<WindowedValue<KeyedWorkItem<K, V>>> {
+    private final KeyedWorkItem<K, V> myWorkItem;
     private final Coder<K> keyCoder;
 
-    public KIterVMatcher(KV<K, Iterable<V>> myKv, Coder<K> keyCoder) {
-      this.myKv = myKv;
+    public KeyedWorkItemMatcher(KeyedWorkItem<K, V> myWorkItem, Coder<K> 
keyCoder) {
+      this.myWorkItem = myWorkItem;
       this.keyCoder = keyCoder;
     }
 
@@ -123,20 +149,20 @@ public class GroupByKeyEvaluatorFactoryTest {
       if (item == null || !(item instanceof WindowedValue)) {
         return false;
       }
-      @SuppressWarnings("unchecked")
-      WindowedValue<KV<K, Iterable<V>>> that = (WindowedValue<KV<K, 
Iterable<V>>>) item;
-      Multiset<V> myValues = HashMultiset.create();
-      Multiset<V> thatValues = HashMultiset.create();
-      for (V value : myKv.getValue()) {
+      WindowedValue<KeyedWorkItem<K, V>> that = 
(WindowedValue<KeyedWorkItem<K, V>>) item;
+      Multiset<WindowedValue<V>> myValues = HashMultiset.create();
+      Multiset<WindowedValue<V>> thatValues = HashMultiset.create();
+      for (WindowedValue<V> value : myWorkItem.elementsIterable()) {
         myValues.add(value);
       }
-      for (V value : that.getValue().getValue()) {
+      for (WindowedValue<V> value : that.getValue().elementsIterable()) {
         thatValues.add(value);
       }
       try {
         return myValues.equals(thatValues)
-            && keyCoder.structuralValue(myKv.getKey())
-                   .equals(keyCoder.structuralValue(that.getValue().getKey()));
+            && keyCoder
+                .structuralValue(myWorkItem.key())
+                .equals(keyCoder.structuralValue(that.getValue().key()));
       } catch (Exception e) {
         return false;
       }
@@ -144,11 +170,11 @@ public class GroupByKeyEvaluatorFactoryTest {
 
     @Override
     public void describeTo(Description description) {
-      description.appendText("KV<K, Iterable<V>> containing key ")
-          .appendValue(myKv.getKey())
+      description
+          .appendText("KeyedWorkItem<K, V> containing key ")
+          .appendValue(myWorkItem.key())
           .appendText(" and values ")
-          .appendValueList("[", ", ", "]", myKv.getValue());
+          .appendValueList("[", ", ", "]", myWorkItem.elementsIterable());
     }
   }
 }
-

Reply via email to