Add GroupByKey InProcess override. This takes a GroupByKey primitive and implements it as a sequence of composite transforms, including a simpler GroupByKeyOnly primitive.
----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=115404181 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/96b02f47 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/96b02f47 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/96b02f47 Branch: refs/heads/master Commit: 96b02f4737457d158db60ae467e7098839c383ab Parents: 01a0da0 Author: tgroh <[email protected]> Authored: Tue Feb 23 17:52:25 2016 -0800 Committer: Davor Bonaci <[email protected]> Committed: Thu Feb 25 23:58:27 2016 -0800 ---------------------------------------------------------------------- .../inprocess/GroupByKeyEvaluatorFactory.java | 128 ++++++++++++++++--- .../inprocess/InProcessPipelineRunner.java | 9 ++ .../dataflow/sdk/transforms/GroupByKey.java | 2 +- .../GroupByKeyEvaluatorFactoryTest.java | 114 ++++++++++------- 4 files changed, 190 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96b02f47/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java index 44d6909..0347281 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java @@ -19,18 +19,29 @@ import static com.google.cloud.dataflow.sdk.util.CoderUtils.encodeToByteArray; import com.google.cloud.dataflow.sdk.coders.Coder; import com.google.cloud.dataflow.sdk.coders.CoderException; +import 
com.google.cloud.dataflow.sdk.coders.IterableCoder; import com.google.cloud.dataflow.sdk.coders.KvCoder; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.runners.inprocess.StepTransformResult.Builder; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.cloud.dataflow.sdk.transforms.DoFn; import com.google.cloud.dataflow.sdk.transforms.GroupByKey; -import com.google.cloud.dataflow.sdk.transforms.GroupByKey.GroupByKeyOnly; +import com.google.cloud.dataflow.sdk.transforms.GroupByKey.ReifyTimestampsAndWindows; import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowViaWindowSetDoFn; +import com.google.cloud.dataflow.sdk.util.KeyedWorkItem; +import com.google.cloud.dataflow.sdk.util.KeyedWorkItemCoder; +import com.google.cloud.dataflow.sdk.util.KeyedWorkItems; +import com.google.cloud.dataflow.sdk.util.SystemReduceFn; import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.cloud.dataflow.sdk.util.WindowingStrategy; import com.google.cloud.dataflow.sdk.values.KV; import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; import java.util.Arrays; @@ -53,52 +64,56 @@ class GroupByKeyEvaluatorFactory implements TransformEvaluatorFactory { (AppliedPTransform) application, (CommittedBundle) inputBundle, evaluationContext); } - private <K, V> TransformEvaluator<KV<K, V>> createEvaluator( + private <K, V> TransformEvaluator<KV<K, WindowedValue<V>>> createEvaluator( 
final AppliedPTransform< - PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupByKeyOnly<K, V>> + PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>, + InProcessGroupByKeyOnly<K, V>> application, final CommittedBundle<KV<K, V>> inputBundle, final InProcessEvaluationContext evaluationContext) { return new GroupByKeyEvaluator<K, V>(evaluationContext, inputBundle, application); } - private static class GroupByKeyEvaluator<K, V> implements TransformEvaluator<KV<K, V>> { + private static class GroupByKeyEvaluator<K, V> + implements TransformEvaluator<KV<K, WindowedValue<V>>> { private final InProcessEvaluationContext evaluationContext; private final CommittedBundle<KV<K, V>> inputBundle; private final AppliedPTransform< - PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupByKeyOnly<K, V>> + PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>, + InProcessGroupByKeyOnly<K, V>> application; private final Coder<K> keyCoder; - private Map<GroupingKey<K>, List<V>> groupingMap; + private Map<GroupingKey<K>, List<WindowedValue<V>>> groupingMap; public GroupByKeyEvaluator( InProcessEvaluationContext evaluationContext, CommittedBundle<KV<K, V>> inputBundle, AppliedPTransform< - PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupByKeyOnly<K, V>> + PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>, + InProcessGroupByKeyOnly<K, V>> application) { this.evaluationContext = evaluationContext; this.inputBundle = inputBundle; this.application = application; - PCollection<KV<K, V>> input = application.getInput(); + PCollection<KV<K, WindowedValue<V>>> input = application.getInput(); keyCoder = getKeyCoder(input.getCoder()); groupingMap = new HashMap<>(); } - private Coder<K> getKeyCoder(Coder<KV<K, V>> coder) { + private Coder<K> getKeyCoder(Coder<KV<K, WindowedValue<V>>> coder) { if (!(coder instanceof KvCoder)) { throw new IllegalStateException(); } @SuppressWarnings("unchecked") - Coder<K> keyCoder 
= ((KvCoder<K, V>) coder).getKeyCoder(); + Coder<K> keyCoder = ((KvCoder<K, WindowedValue<V>>) coder).getKeyCoder(); return keyCoder; } @Override - public void processElement(WindowedValue<KV<K, V>> element) { - KV<K, V> kv = element.getValue(); + public void processElement(WindowedValue<KV<K, WindowedValue<V>>> element) { + KV<K, WindowedValue<V>> kv = element.getValue(); K key = kv.getKey(); byte[] encodedKey; try { @@ -111,20 +126,23 @@ class GroupByKeyEvaluatorFactory implements TransformEvaluatorFactory { exn); } GroupingKey<K> groupingKey = new GroupingKey<>(key, encodedKey); - if (!groupingMap.containsKey(groupingKey)) { - groupingMap.put(groupingKey, new ArrayList<V>()); + List<WindowedValue<V>> values = groupingMap.get(groupingKey); + if (values == null) { + values = new ArrayList<WindowedValue<V>>(); + groupingMap.put(groupingKey, values); } - List<V> values = groupingMap.get(groupingKey); values.add(kv.getValue()); } @Override public InProcessTransformResult finishBundle() { Builder resultBuilder = StepTransformResult.withoutHold(application); - for (Map.Entry<GroupingKey<K>, List<V>> groupedEntry : groupingMap.entrySet()) { + for (Map.Entry<GroupingKey<K>, List<WindowedValue<V>>> groupedEntry : + groupingMap.entrySet()) { K key = groupedEntry.getKey().key; - KV<K, Iterable<V>> groupedKv = KV.<K, Iterable<V>>of(key, groupedEntry.getValue()); - UncommittedBundle<KV<K, Iterable<V>>> bundle = + KeyedWorkItem<K, V> groupedKv = + KeyedWorkItems.elementsWorkItem(key, groupedEntry.getValue()); + UncommittedBundle<KeyedWorkItem<K, V>> bundle = evaluationContext.createKeyedBundle(inputBundle, key, application.getOutput()); bundle.add(WindowedValue.valueInEmptyWindows(groupedKv)); resultBuilder.addOutput(bundle); @@ -157,4 +175,78 @@ class GroupByKeyEvaluatorFactory implements TransformEvaluatorFactory { } } } + + /** + * An in-memory implementation of the {@link GroupByKey} primitive as a composite + * {@link PTransform}. 
+ */ + public static final class InProcessGroupByKey<K, V> + extends ForwardingPTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> { + private final GroupByKey<K, V> original; + + public InProcessGroupByKey(GroupByKey<K, V> from) { + this.original = from; + } + + @Override + public PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> delegate() { + return original; + } + + @Override + public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) { + KvCoder<K, V> inputCoder = (KvCoder<K, V>) input.getCoder(); + + // This operation groups by the combination of key and window, + // merging windows as needed, using the windows assigned to the + // key/value input elements and the window merge operation of the + // window function associated with the input PCollection. + WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy(); + + // Use the default GroupAlsoByWindow implementation + DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> groupAlsoByWindow = + groupAlsoByWindow(windowingStrategy, inputCoder.getValueCoder()); + + // By default, implement GroupByKey via a series of lower-level operations. + return input + // Make each input element's timestamp and assigned windows + // explicit, in the value part. + .apply(new ReifyTimestampsAndWindows<K, V>()) + + .apply(new InProcessGroupByKeyOnly<K, V>()) + .setCoder(KeyedWorkItemCoder.of(inputCoder.getKeyCoder(), + inputCoder.getValueCoder(), input.getWindowingStrategy().getWindowFn().windowCoder())) + + // Group each key's values by window, merging windows as needed. + .apply("GroupAlsoByWindow", ParDo.of(groupAlsoByWindow)) + + // And update the windowing strategy as appropriate. 
+ .setWindowingStrategyInternal(original.updateWindowingStrategy(windowingStrategy)) + .setCoder( + KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getValueCoder()))); + } + + private <W extends BoundedWindow> + DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> groupAlsoByWindow( + final WindowingStrategy<?, W> windowingStrategy, final Coder<V> inputCoder) { + return GroupAlsoByWindowViaWindowSetDoFn.create( + windowingStrategy, SystemReduceFn.<K, V, W>buffering(inputCoder)); + } + } + + /** + * An implementation primitive to use in the evaluation of a {@link GroupByKey} + * {@link PTransform}. + */ + public static final class InProcessGroupByKeyOnly<K, V> + extends PTransform<PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>> { + @Override + public PCollection<KeyedWorkItem<K, V>> apply(PCollection<KV<K, WindowedValue<V>>> input) { + return PCollection.<KeyedWorkItem<K, V>>createPrimitiveOutputInternal( + input.getPipeline(), input.getWindowingStrategy(), input.isBounded()); + } + + @VisibleForTesting + InProcessGroupByKeyOnly() {} + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96b02f47/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java index 7747839..72642da 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java @@ -19,6 +19,7 @@ import static com.google.common.base.Preconditions.checkArgument; import com.google.cloud.dataflow.sdk.Pipeline; import com.google.cloud.dataflow.sdk.annotations.Experimental; +import 
com.google.cloud.dataflow.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKey; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import com.google.cloud.dataflow.sdk.transforms.GroupByKey; import com.google.cloud.dataflow.sdk.transforms.GroupByKey.GroupByKeyOnly; @@ -38,6 +39,7 @@ import com.google.cloud.dataflow.sdk.util.state.StateInternals; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.cloud.dataflow.sdk.values.PCollectionView; import com.google.cloud.dataflow.sdk.values.PValue; +import com.google.common.collect.ImmutableMap; import org.joda.time.Instant; @@ -53,6 +55,13 @@ import javax.annotation.Nullable; */ @Experimental public class InProcessPipelineRunner { + @SuppressWarnings({"rawtypes", "unused"}) + private static Map<Class<? extends PTransform>, Class<? extends PTransform>> + defaultTransformOverrides = + ImmutableMap.<Class<? extends PTransform>, Class<? extends PTransform>>builder() + .put(GroupByKey.class, InProcessGroupByKey.class) + .build(); + private static Map<Class<?>, TransformEvaluatorFactory> defaultEvaluatorFactories = new ConcurrentHashMap<>(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96b02f47/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java index dee7d95..8fde3e0 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java @@ -296,7 +296,7 @@ public class GroupByKey<K, V> /** * Returns the {@code Coder} of the values of the input to this transform. 
*/ - static <K, V> Coder<V> getInputValueCoder(Coder<KV<K, V>> inputCoder) { + public static <K, V> Coder<V> getInputValueCoder(Coder<KV<K, V>> inputCoder) { return getInputKvCoder(inputCoder).getValueCoder(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96b02f47/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java index d8cb237..bb8a15d 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java @@ -28,8 +28,9 @@ import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.U import com.google.cloud.dataflow.sdk.runners.inprocess.util.InProcessBundle; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.GroupByKey.GroupByKeyOnly; -import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.transforms.GroupByKey; +import com.google.cloud.dataflow.sdk.util.KeyedWorkItem; +import com.google.cloud.dataflow.sdk.util.KeyedWorkItems; import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.values.KV; import com.google.cloud.dataflow.sdk.values.PCollection; @@ -58,20 +59,22 @@ public class GroupByKeyEvaluatorFactoryTest { KV<String, Integer> firstBar = KV.of("bar", 22); KV<String, Integer> secondBar = KV.of("bar", 12); KV<String, Integer> firstBaz = KV.of("baz", Integer.MAX_VALUE); - PCollection<KV<String, Integer>> kvs = + PCollection<KV<String, 
Integer>> values = p.apply(Create.of(firstFoo, firstBar, secondFoo, firstBaz, secondBar, thirdFoo)); - PCollection<KV<String, Iterable<Integer>>> groupedKvs = - kvs.apply(new GroupByKeyOnly<String, Integer>()); + PCollection<KV<String, WindowedValue<Integer>>> kvs = + values.apply(new GroupByKey.ReifyTimestampsAndWindows<String, Integer>()); + PCollection<KeyedWorkItem<String, Integer>> groupedKvs = + kvs.apply(new GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly<String, Integer>()); - CommittedBundle<KV<String, Integer>> inputBundle = - InProcessBundle.unkeyed(kvs).commit(BoundedWindow.TIMESTAMP_MAX_VALUE); + CommittedBundle<KV<String, WindowedValue<Integer>>> inputBundle = + InProcessBundle.unkeyed(kvs).commit(Instant.now()); InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); - UncommittedBundle<KV<String, Iterable<Integer>>> fooBundle = + UncommittedBundle<KeyedWorkItem<String, Integer>> fooBundle = InProcessBundle.keyed(groupedKvs, "foo"); - UncommittedBundle<KV<String, Iterable<Integer>>> barBundle = + UncommittedBundle<KeyedWorkItem<String, Integer>> barBundle = InProcessBundle.keyed(groupedKvs, "bar"); - UncommittedBundle<KV<String, Iterable<Integer>>> bazBundle = + UncommittedBundle<KeyedWorkItem<String, Integer>> bazBundle = InProcessBundle.keyed(groupedKvs, "baz"); when(evaluationContext.createKeyedBundle(inputBundle, "foo", groupedKvs)).thenReturn(fooBundle); @@ -80,41 +83,64 @@ public class GroupByKeyEvaluatorFactoryTest { // The input to a GroupByKey is assumed to be a KvCoder @SuppressWarnings("unchecked") - Coder<String> keyCoder = ((KvCoder<String, Integer>) kvs.getCoder()).getKeyCoder(); - TransformEvaluator<KV<String, Integer>> evaluator = - new GroupByKeyEvaluatorFactory().forApplication( - groupedKvs.getProducingTransformInternal(), inputBundle, evaluationContext); - - evaluator.processElement(WindowedValue.valueInGlobalWindow(firstFoo)); - 
evaluator.processElement(WindowedValue.valueInGlobalWindow(secondFoo)); - evaluator.processElement(WindowedValue.valueInGlobalWindow(thirdFoo)); - evaluator.processElement(WindowedValue.valueInGlobalWindow(firstBar)); - evaluator.processElement(WindowedValue.valueInGlobalWindow(secondBar)); - evaluator.processElement(WindowedValue.valueInGlobalWindow(firstBaz)); + Coder<String> keyCoder = + ((KvCoder<String, WindowedValue<Integer>>) kvs.getCoder()).getKeyCoder(); + TransformEvaluator<KV<String, WindowedValue<Integer>>> evaluator = + new GroupByKeyEvaluatorFactory() + .forApplication( + groupedKvs.getProducingTransformInternal(), inputBundle, evaluationContext); + + evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstFoo))); + evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(secondFoo))); + evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(thirdFoo))); + evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstBar))); + evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(secondBar))); + evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstBaz))); evaluator.finishBundle(); assertThat( fooBundle.commit(Instant.now()).getElements(), - contains(new KIterVMatcher<String, Integer>( - KV.<String, Iterable<Integer>>of("foo", ImmutableSet.of(-1, 1, 3)), keyCoder))); + contains( + new KeyedWorkItemMatcher<String, Integer>( + KeyedWorkItems.elementsWorkItem( + "foo", + ImmutableSet.of( + WindowedValue.valueInGlobalWindow(-1), + WindowedValue.valueInGlobalWindow(1), + WindowedValue.valueInGlobalWindow(3))), + keyCoder))); assertThat( barBundle.commit(Instant.now()).getElements(), - contains(new KIterVMatcher<String, Integer>( - KV.<String, Iterable<Integer>>of("bar", ImmutableSet.of(12, 22)), keyCoder))); + contains( + new KeyedWorkItemMatcher<String, Integer>( + KeyedWorkItems.elementsWorkItem( + "bar", + ImmutableSet.of( + WindowedValue.valueInGlobalWindow(12), + 
WindowedValue.valueInGlobalWindow(22))), + keyCoder))); assertThat( bazBundle.commit(Instant.now()).getElements(), - contains(new KIterVMatcher<String, Integer>( - KV.<String, Iterable<Integer>>of("baz", ImmutableSet.of(Integer.MAX_VALUE)), - keyCoder))); + contains( + new KeyedWorkItemMatcher<String, Integer>( + KeyedWorkItems.elementsWorkItem( + "baz", + ImmutableSet.of(WindowedValue.valueInGlobalWindow(Integer.MAX_VALUE))), + keyCoder))); } - private static class KIterVMatcher<K, V> extends BaseMatcher<WindowedValue<KV<K, Iterable<V>>>> { - private final KV<K, Iterable<V>> myKv; + private <K, V> KV<K, WindowedValue<V>> gwValue(KV<K, V> kv) { + return KV.of(kv.getKey(), WindowedValue.valueInGlobalWindow(kv.getValue())); + } + + private static class KeyedWorkItemMatcher<K, V> + extends BaseMatcher<WindowedValue<KeyedWorkItem<K, V>>> { + private final KeyedWorkItem<K, V> myWorkItem; private final Coder<K> keyCoder; - public KIterVMatcher(KV<K, Iterable<V>> myKv, Coder<K> keyCoder) { - this.myKv = myKv; + public KeyedWorkItemMatcher(KeyedWorkItem<K, V> myWorkItem, Coder<K> keyCoder) { + this.myWorkItem = myWorkItem; this.keyCoder = keyCoder; } @@ -123,20 +149,20 @@ public class GroupByKeyEvaluatorFactoryTest { if (item == null || !(item instanceof WindowedValue)) { return false; } - @SuppressWarnings("unchecked") - WindowedValue<KV<K, Iterable<V>>> that = (WindowedValue<KV<K, Iterable<V>>>) item; - Multiset<V> myValues = HashMultiset.create(); - Multiset<V> thatValues = HashMultiset.create(); - for (V value : myKv.getValue()) { + WindowedValue<KeyedWorkItem<K, V>> that = (WindowedValue<KeyedWorkItem<K, V>>) item; + Multiset<WindowedValue<V>> myValues = HashMultiset.create(); + Multiset<WindowedValue<V>> thatValues = HashMultiset.create(); + for (WindowedValue<V> value : myWorkItem.elementsIterable()) { myValues.add(value); } - for (V value : that.getValue().getValue()) { + for (WindowedValue<V> value : that.getValue().elementsIterable()) { thatValues.add(value); } 
try { return myValues.equals(thatValues) - && keyCoder.structuralValue(myKv.getKey()) - .equals(keyCoder.structuralValue(that.getValue().getKey())); + && keyCoder + .structuralValue(myWorkItem.key()) + .equals(keyCoder.structuralValue(that.getValue().key())); } catch (Exception e) { return false; } @@ -144,11 +170,11 @@ public class GroupByKeyEvaluatorFactoryTest { @Override public void describeTo(Description description) { - description.appendText("KV<K, Iterable<V>> containing key ") - .appendValue(myKv.getKey()) + description + .appendText("KeyedWorkItem<K, V> containing key ") + .appendValue(myWorkItem.key()) .appendText(" and values ") - .appendValueList("[", ", ", "]", myKv.getValue()); + .appendValueList("[", ", ", "]", myWorkItem.elementsIterable()); } } } -
