Repository: incubator-beam
Updated Branches:
  refs/heads/master 2b5544531 -> 6c34f3a34
Move GroupByKey expansion into DirectPipelineRunner Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/589ef8a0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/589ef8a0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/589ef8a0 Branch: refs/heads/master Commit: 589ef8a09336f3363280e47039592b7d3afbc8f8 Parents: c1de175 Author: Kenneth Knowles <[email protected]> Authored: Thu Mar 24 15:26:37 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Thu Mar 24 19:23:01 2016 -0700 ---------------------------------------------------------------------- .../FlinkBatchTransformTranslators.java | 13 +- .../spark/translation/TransformTranslator.java | 21 +- .../sdk/runners/DirectPipelineRunner.java | 248 +++++++++++++++ .../inprocess/GroupByKeyEvaluatorFactory.java | 2 +- .../dataflow/sdk/transforms/GroupByKey.java | 310 +------------------ .../cloud/dataflow/sdk/util/GroupByKeyOnly.java | 43 +++ .../cloud/dataflow/sdk/util/ReduceFnRunner.java | 1 - .../sdk/util/ReifyTimestampsAndWindows.java | 48 +++ .../GroupByKeyEvaluatorFactoryTest.java | 4 +- 9 files changed, 365 insertions(+), 325 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/589ef8a0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java index 48c783d..b09d033 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java @@ -53,6 +53,7 @@ import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResultSchema; import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey; import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple; import com.google.cloud.dataflow.sdk.transforms.join.RawUnionValue; +import com.google.cloud.dataflow.sdk.util.GroupByKeyOnly; import com.google.cloud.dataflow.sdk.values.KV; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.cloud.dataflow.sdk.values.PCollectionView; @@ -96,7 +97,7 @@ public class FlinkBatchTransformTranslators { // -------------------------------------------------------------------------------------------- // Transform Translator Registry // -------------------------------------------------------------------------------------------- - + @SuppressWarnings("rawtypes") private static final Map<Class<? 
extends PTransform>, FlinkBatchPipelineTranslator.BatchTransformTranslator> TRANSLATORS = new HashMap<>(); @@ -112,7 +113,7 @@ public class FlinkBatchTransformTranslators { TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslatorBatch()); - TRANSLATORS.put(GroupByKey.GroupByKeyOnly.class, new GroupByKeyOnlyTranslatorBatch()); + TRANSLATORS.put(GroupByKeyOnly.class, new GroupByKeyOnlyTranslatorBatch()); // TODO we're currently ignoring windows here but that has to change in the future TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch()); @@ -302,10 +303,10 @@ public class FlinkBatchTransformTranslators { } } - private static class GroupByKeyOnlyTranslatorBatch<K, V> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey.GroupByKeyOnly<K, V>> { + private static class GroupByKeyOnlyTranslatorBatch<K, V> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKeyOnly<K, V>> { @Override - public void translateNode(GroupByKey.GroupByKeyOnly<K, V> transform, FlinkBatchTranslationContext context) { + public void translateNode(GroupByKeyOnly<K, V> transform, FlinkBatchTranslationContext context) { DataSet<KV<K, V>> inputDataSet = context.getInputDataSet(context.getInput(transform)); GroupReduceFunction<KV<K, V>, KV<K, Iterable<V>>> groupReduceFunction = new FlinkKeyedListAggregationFunction<>(); @@ -406,7 +407,7 @@ public class FlinkBatchTransformTranslators { // context.setOutputDataSet(transform.getOutput(), outputDataSet); // } // } - + private static class ParDoBoundTranslatorBatch<IN, OUT> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<ParDo.Bound<IN, OUT>> { private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslatorBatch.class); @@ -589,6 +590,6 @@ public class FlinkBatchTransformTranslators { // -------------------------------------------------------------------------------------------- // Miscellaneous // -------------------------------------------------------------------------------------------- - + private FlinkBatchTransformTranslators() {} } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/589ef8a0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index 0bd047c..adb6e68 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -18,11 +18,6 @@ package org.apache.beam.runners.spark.translation; -import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputDirectory; -import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFilePrefix; -import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFileTemplate; -import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardCount; - import java.io.IOException; import java.lang.reflect.Field; import java.util.Arrays; @@ -30,6 +25,11 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputDirectory; +import static 
org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFilePrefix; +import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFileTemplate; +import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardCount; + import com.google.api.client.util.Maps; import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException; import com.google.cloud.dataflow.sdk.coders.Coder; @@ -40,7 +40,6 @@ import com.google.cloud.dataflow.sdk.transforms.Combine; import com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.transforms.DoFn; import com.google.cloud.dataflow.sdk.transforms.Flatten; -import com.google.cloud.dataflow.sdk.transforms.GroupByKey; import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.transforms.ParDo; import com.google.cloud.dataflow.sdk.transforms.View; @@ -49,6 +48,7 @@ import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows; import com.google.cloud.dataflow.sdk.transforms.windowing.Window; import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn; import com.google.cloud.dataflow.sdk.util.AssignWindowsDoFn; +import com.google.cloud.dataflow.sdk.util.GroupByKeyOnly; import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.values.KV; import com.google.cloud.dataflow.sdk.values.PCollection; @@ -80,6 +80,7 @@ import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; + import scala.Tuple2; /** @@ -128,10 +129,10 @@ public final class TransformTranslator { }; } - private static <K, V> TransformEvaluator<GroupByKey.GroupByKeyOnly<K, V>> gbk() { - return new TransformEvaluator<GroupByKey.GroupByKeyOnly<K, V>>() { + private static <K, V> TransformEvaluator<GroupByKeyOnly<K, V>> gbk() { + return new TransformEvaluator<GroupByKeyOnly<K, V>>() { @Override - public void evaluate(GroupByKey.GroupByKeyOnly<K, V> transform, EvaluationContext context) { + public void evaluate(GroupByKeyOnly<K, V> transform, EvaluationContext context) { @SuppressWarnings("unchecked") JavaRDDLike<WindowedValue<KV<K, V>>, ?> inRDD = (JavaRDDLike<WindowedValue<KV<K, V>>, ?>) context.getInputRDD(transform); @@ -768,7 +769,7 @@ public final class TransformTranslator { EVALUATORS.put(HadoopIO.Write.Bound.class, writeHadoop()); EVALUATORS.put(ParDo.Bound.class, parDo()); EVALUATORS.put(ParDo.BoundMulti.class, multiDo()); - EVALUATORS.put(GroupByKey.GroupByKeyOnly.class, gbk()); + EVALUATORS.put(GroupByKeyOnly.class, gbk()); EVALUATORS.put(Combine.GroupedValues.class, grouped()); EVALUATORS.put(Combine.Globally.class, combineGlobally()); EVALUATORS.put(Combine.PerKey.class, combinePerKey()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/589ef8a0/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java index 872cfef..629be83 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java @@ -16,6 +16,7 @@ package 
com.google.cloud.dataflow.sdk.runners; +import static com.google.cloud.dataflow.sdk.util.CoderUtils.encodeToByteArray; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; @@ -24,6 +25,9 @@ import com.google.cloud.dataflow.sdk.Pipeline.PipelineVisitor; import com.google.cloud.dataflow.sdk.PipelineResult; import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException; import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.coders.CoderException; +import com.google.cloud.dataflow.sdk.coders.IterableCoder; +import com.google.cloud.dataflow.sdk.coders.KvCoder; import com.google.cloud.dataflow.sdk.coders.ListCoder; import com.google.cloud.dataflow.sdk.io.AvroIO; import com.google.cloud.dataflow.sdk.io.FileBasedSink; @@ -38,19 +42,26 @@ import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import com.google.cloud.dataflow.sdk.transforms.Combine; import com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn; import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.GroupByKey; import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.transforms.ParDo; import com.google.cloud.dataflow.sdk.transforms.Partition; import com.google.cloud.dataflow.sdk.transforms.Partition.PartitionFn; import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; import com.google.cloud.dataflow.sdk.util.AppliedCombineFn; +import com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsViaOutputBufferDoFn; +import com.google.cloud.dataflow.sdk.util.GroupByKeyOnly; import com.google.cloud.dataflow.sdk.util.IOChannelUtils; import com.google.cloud.dataflow.sdk.util.MapAggregatorValues; import com.google.cloud.dataflow.sdk.util.PerKeyCombineFnRunner; import com.google.cloud.dataflow.sdk.util.PerKeyCombineFnRunners; +import com.google.cloud.dataflow.sdk.util.ReifyTimestampsAndWindows; import com.google.cloud.dataflow.sdk.util.SerializableUtils; +import com.google.cloud.dataflow.sdk.util.SystemReduceFn; import com.google.cloud.dataflow.sdk.util.TestCredential; import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.cloud.dataflow.sdk.util.WindowedValue.WindowedValueCoder; +import com.google.cloud.dataflow.sdk.util.WindowingStrategy; import com.google.cloud.dataflow.sdk.util.common.Counter; import com.google.cloud.dataflow.sdk.util.common.CounterSet; import com.google.cloud.dataflow.sdk.values.KV; @@ -71,8 +82,10 @@ import org.slf4j.LoggerFactory; import java.io.Serializable; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -138,6 +151,8 @@ public class DirectPipelineRunner } } + ///////////////////////////////////////////////////////////////////////////// + /** * Records that instances of the specified PTransform class * should be evaluated by the corresponding TransformEvaluator. 
@@ -243,6 +258,8 @@ public class DirectPipelineRunner return (OutputT) applyTextIOWrite((TextIO.Write.Bound) transform, (PCollection<?>) input); } else if (transform instanceof AvroIO.Write.Bound) { return (OutputT) applyAvroIOWrite((AvroIO.Write.Bound) transform, (PCollection<?>) input); + } else if (transform instanceof GroupByKey) { + return (OutputT) ((PCollection) input).apply(new DirectGroupByKey((GroupByKey) transform)); } else { return super.apply(transform, input); } @@ -388,6 +405,43 @@ public class DirectPipelineRunner } } + private static class DirectGroupByKey<K, V> + extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> { + + private GroupByKey<K, V> originalTransform; + + public DirectGroupByKey(GroupByKey<K, V> originalTransform) { + this.originalTransform = originalTransform; + } + + @Override + public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) { + WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy(); + + return input + // Make each input element's timestamp and assigned windows + // explicit, in the value part. + .apply(new ReifyTimestampsAndWindows<K, V>()) + + // Group by just the key. + // Combiner lifting will not happen regardless of the disallowCombinerLifting value. + // There will be no combiners right after the GroupByKeyOnly because of the two ParDos + // introduced in here. + .apply(new GroupByKeyOnly<K, WindowedValue<V>>()) + + // Sort each key's values by timestamp. GroupAlsoByWindow requires + // its input to be sorted by timestamp. + .apply(new DirectPipelineRunner.SortValuesByTimestamp<K, V>()) + + // Group each key's values by window, merging windows as needed. + .apply(new DirectPipelineRunner.GroupAlsoByWindow<K, V>(windowingStrategy)) + + // And update the windowing strategy as appropriate. + .setWindowingStrategyInternal( + originalTransform.updateWindowingStrategy(windowingStrategy)); + } + } + /** * Apply the override for AvroIO.Write.Bound if the user requested sharding controls * greater than one. @@ -1117,6 +1171,128 @@ public class DirectPipelineRunner ///////////////////////////////////////////////////////////////////////////// + /** + * Helper transform that sorts the values associated with each key + * by timestamp. 
+ */ + private static class SortValuesByTimestamp<K, V> + extends PTransform<PCollection<KV<K, Iterable<WindowedValue<V>>>>, + PCollection<KV<K, Iterable<WindowedValue<V>>>>> { + @Override + public PCollection<KV<K, Iterable<WindowedValue<V>>>> apply( + PCollection<KV<K, Iterable<WindowedValue<V>>>> input) { + return input.apply(ParDo.of( + new DoFn<KV<K, Iterable<WindowedValue<V>>>, + KV<K, Iterable<WindowedValue<V>>>>() { + @Override + public void processElement(ProcessContext c) { + KV<K, Iterable<WindowedValue<V>>> kvs = c.element(); + K key = kvs.getKey(); + Iterable<WindowedValue<V>> unsortedValues = kvs.getValue(); + List<WindowedValue<V>> sortedValues = new ArrayList<>(); + for (WindowedValue<V> value : unsortedValues) { + sortedValues.add(value); + } + Collections.sort(sortedValues, + new Comparator<WindowedValue<V>>() { + @Override + public int compare(WindowedValue<V> e1, WindowedValue<V> e2) { + return e1.getTimestamp().compareTo(e2.getTimestamp()); + } + }); + c.output(KV.<K, Iterable<WindowedValue<V>>>of(key, sortedValues)); + }})) + .setCoder(input.getCoder()); + } + } + + /** + * Helper transform that takes a collection of timestamp-ordered + * values associated with each key, groups the values by window, + * combines windows as needed, and for each window in each key, + * outputs a collection of key/value-list pairs implicitly assigned + * to the window and with the timestamp derived from that window. + */ + private static class GroupAlsoByWindow<K, V> + extends PTransform<PCollection<KV<K, Iterable<WindowedValue<V>>>>, + PCollection<KV<K, Iterable<V>>>> { + private final WindowingStrategy<?, ?> windowingStrategy; + + public GroupAlsoByWindow(WindowingStrategy<?, ?> windowingStrategy) { + this.windowingStrategy = windowingStrategy; + } + + @Override + @SuppressWarnings("unchecked") + public PCollection<KV<K, Iterable<V>>> apply( + PCollection<KV<K, Iterable<WindowedValue<V>>>> input) { + @SuppressWarnings("unchecked") + KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder = + (KvCoder<K, Iterable<WindowedValue<V>>>) input.getCoder(); + + Coder<K> keyCoder = inputKvCoder.getKeyCoder(); + Coder<Iterable<WindowedValue<V>>> inputValueCoder = + inputKvCoder.getValueCoder(); + + IterableCoder<WindowedValue<V>> inputIterableValueCoder = + (IterableCoder<WindowedValue<V>>) inputValueCoder; + Coder<WindowedValue<V>> inputIterableElementCoder = + inputIterableValueCoder.getElemCoder(); + WindowedValueCoder<V> inputIterableWindowedValueCoder = + (WindowedValueCoder<V>) inputIterableElementCoder; + + Coder<V> inputIterableElementValueCoder = + inputIterableWindowedValueCoder.getValueCoder(); + Coder<Iterable<V>> outputValueCoder = + IterableCoder.of(inputIterableElementValueCoder); + Coder<KV<K, Iterable<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder); + + return input + .apply(ParDo.of(groupAlsoByWindowsFn(windowingStrategy, inputIterableElementValueCoder))) + .setCoder(outputKvCoder); + } + + private <W extends BoundedWindow> GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W> + groupAlsoByWindowsFn( + WindowingStrategy<?, W> strategy, Coder<V> inputIterableElementValueCoder) { + return new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>( + strategy, SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder)); + } + } + + /** + * The key by which GBK groups inputs - elements are grouped by the encoded form of the key, + * but the original key may be accessed as well. 
+ */ + private static class GroupingKey<K> { + private K key; + private byte[] encodedKey; + + public GroupingKey(K key, byte[] encodedKey) { + this.key = key; + this.encodedKey = encodedKey; + } + + public K getKey() { + return key; + } + + @Override + public boolean equals(Object o) { + if (o instanceof GroupingKey) { + GroupingKey<?> that = (GroupingKey<?>) o; + return Arrays.equals(this.encodedKey, that.encodedKey); + } else { + return false; + } + } + + @Override + public int hashCode() { + return Arrays.hashCode(encodedKey); + } + } + private final DirectPipelineOptions options; private boolean testSerializability; private boolean testEncodability; @@ -1153,4 +1329,76 @@ public class DirectPipelineRunner public String toString() { return "DirectPipelineRunner#" + hashCode(); } + + public static <K, V> void evaluateGroupByKeyOnly( + GroupByKeyOnly<K, V> transform, + EvaluationContext context) { + PCollection<KV<K, V>> input = context.getInput(transform); + + List<ValueWithMetadata<KV<K, V>>> inputElems = + context.getPCollectionValuesWithMetadata(input); + + Coder<K> keyCoder = GroupByKey.getKeyCoder(input.getCoder()); + + Map<DirectPipelineRunner.GroupingKey<K>, List<V>> groupingMap = new HashMap<>(); + + for (ValueWithMetadata<KV<K, V>> elem : inputElems) { + K key = elem.getValue().getKey(); + V value = elem.getValue().getValue(); + byte[] encodedKey; + try { + encodedKey = encodeToByteArray(keyCoder, key); + } catch (CoderException exn) { + // TODO: Put in better element printing: + // truncate if too long. + throw new IllegalArgumentException( + "unable to encode key " + key + " of input to " + transform + + " using " + keyCoder, + exn); + } + DirectPipelineRunner.GroupingKey<K> groupingKey = + new GroupingKey<>(key, encodedKey); + List<V> values = groupingMap.get(groupingKey); + if (values == null) { + values = new ArrayList<V>(); + groupingMap.put(groupingKey, values); + } + values.add(value); + } + + List<ValueWithMetadata<KV<K, Iterable<V>>>> outputElems = + new ArrayList<>(); + for (Map.Entry<DirectPipelineRunner.GroupingKey<K>, List<V>> entry : groupingMap.entrySet()) { + DirectPipelineRunner.GroupingKey<K> groupingKey = entry.getKey(); + K key = groupingKey.getKey(); + List<V> values = entry.getValue(); + values = context.randomizeIfUnordered(values, true /* inPlaceAllowed */); + outputElems.add(ValueWithMetadata + .of(WindowedValue.valueInEmptyWindows(KV.<K, Iterable<V>>of(key, values))) + .withKey(key)); + } + + context.setPCollectionValuesWithMetadata(context.getOutput(transform), + outputElems); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + public + static <K, V> void registerGroupByKeyOnly() { + registerDefaultTransformEvaluator( + GroupByKeyOnly.class, + new TransformEvaluator<GroupByKeyOnly>() { + @Override + public void evaluate( + GroupByKeyOnly transform, + EvaluationContext context) { + evaluateGroupByKeyOnly(transform, context); + } + }); + } + + static { + DirectPipelineRunner.registerGroupByKeyOnly(); + } + } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/589ef8a0/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java index 3ec4af1..b59ec56 100644 --- 
a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java @@ -27,7 +27,6 @@ import com.google.cloud.dataflow.sdk.runners.inprocess.StepTransformResult.Build import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import com.google.cloud.dataflow.sdk.transforms.DoFn; import com.google.cloud.dataflow.sdk.transforms.GroupByKey; -import com.google.cloud.dataflow.sdk.transforms.GroupByKey.ReifyTimestampsAndWindows; import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.transforms.ParDo; import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; @@ -35,6 +34,7 @@ import com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowViaWindowSetDoFn; import com.google.cloud.dataflow.sdk.util.KeyedWorkItem; import com.google.cloud.dataflow.sdk.util.KeyedWorkItemCoder; import com.google.cloud.dataflow.sdk.util.KeyedWorkItems; +import com.google.cloud.dataflow.sdk.util.ReifyTimestampsAndWindows; import com.google.cloud.dataflow.sdk.util.SystemReduceFn; import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.util.WindowingStrategy; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/589ef8a0/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java index 8fde3e0..490269b 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java @@ -16,40 +16,20 @@ package com.google.cloud.dataflow.sdk.transforms; -import static com.google.cloud.dataflow.sdk.util.CoderUtils.encodeToByteArray; - import com.google.cloud.dataflow.sdk.coders.Coder; import com.google.cloud.dataflow.sdk.coders.Coder.NonDeterministicException; -import com.google.cloud.dataflow.sdk.coders.CoderException; import com.google.cloud.dataflow.sdk.coders.IterableCoder; import com.google.cloud.dataflow.sdk.coders.KvCoder; -import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner; -import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.ValueWithMetadata; -import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; import com.google.cloud.dataflow.sdk.transforms.windowing.DefaultTrigger; import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows; import com.google.cloud.dataflow.sdk.transforms.windowing.InvalidWindows; import com.google.cloud.dataflow.sdk.transforms.windowing.Window; import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn; -import com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsViaOutputBufferDoFn; -import com.google.cloud.dataflow.sdk.util.ReifyTimestampAndWindowsDoFn; -import com.google.cloud.dataflow.sdk.util.SystemReduceFn; -import com.google.cloud.dataflow.sdk.util.WindowedValue; -import com.google.cloud.dataflow.sdk.util.WindowedValue.FullWindowedValueCoder; -import com.google.cloud.dataflow.sdk.util.WindowedValue.WindowedValueCoder; import com.google.cloud.dataflow.sdk.util.WindowingStrategy; import com.google.cloud.dataflow.sdk.values.KV; import 
com.google.cloud.dataflow.sdk.values.PCollection; import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - /** * {@code GroupByKey<K, V>} takes a {@code PCollection<KV<K, V>>}, * groups the values by key and windows, and returns a @@ -234,34 +214,12 @@ public class GroupByKey<K, V> @Override public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) { - // This operation groups by the combination of key and window, + // This primitive operation groups by the combination of key and window, // merging windows as needed, using the windows assigned to the // key/value input elements and the window merge operation of the // window function associated with the input PCollection. - WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy(); - - // By default, implement GroupByKey[AndWindow] via a series of lower-level - // operations. - return input - // Make each input element's timestamp and assigned windows - // explicit, in the value part. - .apply(new ReifyTimestampsAndWindows<K, V>()) - - // Group by just the key. - // Combiner lifting will not happen regardless of the disallowCombinerLifting value. - // There will be no combiners right after the GroupByKeyOnly because of the two ParDos - // introduced in here. - .apply(new GroupByKeyOnly<K, WindowedValue<V>>()) - - // Sort each key's values by timestamp. GroupAlsoByWindow requires - // its input to be sorted by timestamp. - .apply(new SortValuesByTimestamp<K, V>()) - - // Group each key's values by window, merging windows as needed. - .apply(new GroupAlsoByWindow<K, V>(windowingStrategy)) - - // And update the windowing strategy as appropriate. - .setWindowingStrategyInternal(updateWindowingStrategy(windowingStrategy)); + return PCollection.createPrimitiveOutputInternal(input.getPipeline(), + updateWindowingStrategy(input.getWindowingStrategy()), input.isBounded()); } @Override @@ -289,7 +247,7 @@ public class GroupByKey<K, V> * transform, which is also used as the {@code Coder} of the keys of * the output of this transform. */ - static <K, V> Coder<K> getKeyCoder(Coder<KV<K, V>> inputCoder) { + public static <K, V> Coder<K> getKeyCoder(Coder<KV<K, V>> inputCoder) { return getInputKvCoder(inputCoder).getKeyCoder(); } @@ -311,265 +269,7 @@ public class GroupByKey<K, V> /** * Returns the {@code Coder} of the output of this transform. */ - static <K, V> KvCoder<K, Iterable<V>> getOutputKvCoder(Coder<KV<K, V>> inputCoder) { + public static <K, V> KvCoder<K, Iterable<V>> getOutputKvCoder(Coder<KV<K, V>> inputCoder) { return KvCoder.of(getKeyCoder(inputCoder), getOutputValueCoder(inputCoder)); } - - ///////////////////////////////////////////////////////////////////////////// - - /** - * Helper transform that makes timestamps and window assignments - * explicit in the value part of each key/value pair. 
- */ - public static class ReifyTimestampsAndWindows<K, V> - extends PTransform<PCollection<KV<K, V>>, - PCollection<KV<K, WindowedValue<V>>>> { - @Override - public PCollection<KV<K, WindowedValue<V>>> apply( - PCollection<KV<K, V>> input) { - @SuppressWarnings("unchecked") - KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) input.getCoder(); - Coder<K> keyCoder = inputKvCoder.getKeyCoder(); - Coder<V> inputValueCoder = inputKvCoder.getValueCoder(); - Coder<WindowedValue<V>> outputValueCoder = FullWindowedValueCoder.of( - inputValueCoder, input.getWindowingStrategy().getWindowFn().windowCoder()); - Coder<KV<K, WindowedValue<V>>> outputKvCoder = - KvCoder.of(keyCoder, outputValueCoder); - return input.apply(ParDo.of(new ReifyTimestampAndWindowsDoFn<K, V>())) - .setCoder(outputKvCoder); - } - } - - - ///////////////////////////////////////////////////////////////////////////// - - /** - * Helper transform that sorts the values associated with each key - * by timestamp. - */ - public static class SortValuesByTimestamp<K, V> - extends PTransform<PCollection<KV<K, Iterable<WindowedValue<V>>>>, - PCollection<KV<K, Iterable<WindowedValue<V>>>>> { - @Override - public PCollection<KV<K, Iterable<WindowedValue<V>>>> apply( - PCollection<KV<K, Iterable<WindowedValue<V>>>> input) { - return input.apply(ParDo.of( - new DoFn<KV<K, Iterable<WindowedValue<V>>>, - KV<K, Iterable<WindowedValue<V>>>>() { - @Override - public void processElement(ProcessContext c) { - KV<K, Iterable<WindowedValue<V>>> kvs = c.element(); - K key = kvs.getKey(); - Iterable<WindowedValue<V>> unsortedValues = kvs.getValue(); - List<WindowedValue<V>> sortedValues = new ArrayList<>(); - for (WindowedValue<V> value : unsortedValues) { - sortedValues.add(value); - } - Collections.sort(sortedValues, - new Comparator<WindowedValue<V>>() { - @Override - public int compare(WindowedValue<V> e1, WindowedValue<V> e2) { - return e1.getTimestamp().compareTo(e2.getTimestamp()); - } - }); - c.output(KV.<K, Iterable<WindowedValue<V>>>of(key, sortedValues)); - }})) - .setCoder(input.getCoder()); - } - } - - - ///////////////////////////////////////////////////////////////////////////// - - /** - * Helper transform that takes a collection of timestamp-ordered - * values associated with each key, groups the values by window, - * combines windows as needed, and for each window in each key, - * outputs a collection of key/value-list pairs implicitly assigned - * to the window and with the timestamp derived from that window. 
- */ - public static class GroupAlsoByWindow<K, V> - extends PTransform<PCollection<KV<K, Iterable<WindowedValue<V>>>>, - PCollection<KV<K, Iterable<V>>>> { - private final WindowingStrategy<?, ?> windowingStrategy; - - public GroupAlsoByWindow(WindowingStrategy<?, ?> windowingStrategy) { - this.windowingStrategy = windowingStrategy; - } - - @Override - @SuppressWarnings("unchecked") - public PCollection<KV<K, Iterable<V>>> apply( - PCollection<KV<K, Iterable<WindowedValue<V>>>> input) { - @SuppressWarnings("unchecked") - KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder = - (KvCoder<K, Iterable<WindowedValue<V>>>) input.getCoder(); - - Coder<K> keyCoder = inputKvCoder.getKeyCoder(); - Coder<Iterable<WindowedValue<V>>> inputValueCoder = - inputKvCoder.getValueCoder(); - - IterableCoder<WindowedValue<V>> inputIterableValueCoder = - (IterableCoder<WindowedValue<V>>) inputValueCoder; - Coder<WindowedValue<V>> inputIterableElementCoder = - inputIterableValueCoder.getElemCoder(); - WindowedValueCoder<V> inputIterableWindowedValueCoder = - (WindowedValueCoder<V>) inputIterableElementCoder; - - Coder<V> inputIterableElementValueCoder = - inputIterableWindowedValueCoder.getValueCoder(); - Coder<Iterable<V>> outputValueCoder = - IterableCoder.of(inputIterableElementValueCoder); - Coder<KV<K, Iterable<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder); - - return input - .apply(ParDo.of(groupAlsoByWindowsFn(windowingStrategy, inputIterableElementValueCoder))) - .setCoder(outputKvCoder); - } - - private <W extends BoundedWindow> GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W> - groupAlsoByWindowsFn( - WindowingStrategy<?, W> strategy, Coder<V> inputIterableElementValueCoder) { - return new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>( - strategy, SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder)); - } - } - - - ///////////////////////////////////////////////////////////////////////////// - - /** - * Primitive helper transform that groups by key only, ignoring any - * window assignments. - */ - public static class GroupByKeyOnly<K, V> - extends PTransform<PCollection<KV<K, V>>, - PCollection<KV<K, Iterable<V>>>> { - - @SuppressWarnings({"rawtypes", "unchecked"}) - @Override - public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) { - return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal( - input.getPipeline(), input.getWindowingStrategy(), input.isBounded()); - } - - /** - * Returns the {@code Coder} of the input to this transform, which - * should be a {@code KvCoder}. 
- */ - @SuppressWarnings("unchecked") - KvCoder<K, V> getInputKvCoder(Coder<KV<K, V>> inputCoder) { - if (!(inputCoder instanceof KvCoder)) { - throw new IllegalStateException( - "GroupByKey requires its input to use KvCoder"); - } - return (KvCoder<K, V>) inputCoder; - } - - @Override - protected Coder<KV<K, Iterable<V>>> getDefaultOutputCoder(PCollection<KV<K, V>> input) { - return GroupByKey.getOutputKvCoder(input.getCoder()); - } - } - - - ///////////////////////////////////////////////////////////////////////////// - - static { - registerWithDirectPipelineRunner(); - } - - @SuppressWarnings({"rawtypes", "unchecked"}) - private static <K, V> void registerWithDirectPipelineRunner() { - DirectPipelineRunner.registerDefaultTransformEvaluator( - GroupByKeyOnly.class, - new DirectPipelineRunner.TransformEvaluator<GroupByKeyOnly>() { - @Override - public void evaluate( - GroupByKeyOnly transform, - DirectPipelineRunner.EvaluationContext context) { - evaluateHelper(transform, context); - } - }); - } - - private static <K, V> void evaluateHelper( - GroupByKeyOnly<K, V> transform, - DirectPipelineRunner.EvaluationContext context) { - PCollection<KV<K, V>> input = context.getInput(transform); - - List<ValueWithMetadata<KV<K, V>>> inputElems = - context.getPCollectionValuesWithMetadata(input); - - Coder<K> keyCoder = GroupByKey.getKeyCoder(input.getCoder()); - - Map<GroupingKey<K>, List<V>> groupingMap = new HashMap<>(); - - for (ValueWithMetadata<KV<K, V>> elem : inputElems) { - K key = elem.getValue().getKey(); - V value = elem.getValue().getValue(); - byte[] encodedKey; - try { - encodedKey = encodeToByteArray(keyCoder, key); - } catch (CoderException exn) { - // TODO: Put in better element printing: - // truncate if too long. - throw new IllegalArgumentException( - "unable to encode key " + key + " of input to " + transform + - " using " + keyCoder, - exn); - } - GroupingKey<K> groupingKey = new GroupingKey<>(key, encodedKey); - List<V> values = groupingMap.get(groupingKey); - if (values == null) { - values = new ArrayList<V>(); - groupingMap.put(groupingKey, values); - } - values.add(value); - } - - List<ValueWithMetadata<KV<K, Iterable<V>>>> outputElems = - new ArrayList<>(); - for (Map.Entry<GroupingKey<K>, List<V>> entry : groupingMap.entrySet()) { - GroupingKey<K> groupingKey = entry.getKey(); - K key = groupingKey.getKey(); - List<V> values = entry.getValue(); - values = context.randomizeIfUnordered(values, true /* inPlaceAllowed */); - outputElems.add(ValueWithMetadata - .of(WindowedValue.valueInEmptyWindows(KV.<K, Iterable<V>>of(key, values))) - .withKey(key)); - } - - context.setPCollectionValuesWithMetadata(context.getOutput(transform), - outputElems); - } - - private static class GroupingKey<K> { - private K key; - private byte[] encodedKey; - - public GroupingKey(K key, byte[] encodedKey) { - this.key = key; - this.encodedKey = encodedKey; - } - - public K getKey() { - return key; - } - - @Override - public boolean equals(Object o) { - if (o instanceof GroupingKey) { - GroupingKey<?> that = (GroupingKey<?>) o; - return Arrays.equals(this.encodedKey, that.encodedKey); - } else { - return false; - } - } - - @Override - public int hashCode() { - return Arrays.hashCode(encodedKey); - } - } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/589ef8a0/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GroupByKeyOnly.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GroupByKeyOnly.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GroupByKeyOnly.java new file mode 100644 index 0000000..8db87d2 --- /dev/null +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GroupByKeyOnly.java @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.cloud.dataflow.sdk.util; + +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.transforms.GroupByKey; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; + +/** + * Runner-specific primitive that groups by key only, ignoring any window assignments. + */ +public class GroupByKeyOnly<K, V> + extends PTransform<PCollection<KV<K, V>>, + PCollection<KV<K, Iterable<V>>>> { + + @SuppressWarnings({"rawtypes", "unchecked"}) + @Override + public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) { + return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal( + input.getPipeline(), input.getWindowingStrategy(), input.isBounded()); + } + + @Override + public Coder<KV<K, Iterable<V>>> getDefaultOutputCoder(PCollection<KV<K, V>> input) { + return GroupByKey.getOutputKvCoder(input.getCoder()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/589ef8a0/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java index 2e2d1f6..8a4d7ac 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java @@ -18,7 +18,6 @@ package com.google.cloud.dataflow.sdk.util; import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.transforms.Aggregator; import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.GroupByKey.GroupByKeyOnly; import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark; import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/589ef8a0/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReifyTimestampsAndWindows.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReifyTimestampsAndWindows.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReifyTimestampsAndWindows.java new file mode 100644 
index 0000000..1a6cf9a --- /dev/null +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReifyTimestampsAndWindows.java @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.cloud.dataflow.sdk.util; + +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.coders.KvCoder; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.util.WindowedValue.FullWindowedValueCoder; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; + +/** + * Helper transform that makes timestamps and window assignments + * explicit in the value part of each key/value pair. + */ +public class ReifyTimestampsAndWindows<K, V> + extends PTransform<PCollection<KV<K, V>>, + PCollection<KV<K, WindowedValue<V>>>> { + @Override + public PCollection<KV<K, WindowedValue<V>>> apply( + PCollection<KV<K, V>> input) { + @SuppressWarnings("unchecked") + KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) input.getCoder(); + Coder<K> keyCoder = inputKvCoder.getKeyCoder(); + Coder<V> inputValueCoder = inputKvCoder.getValueCoder(); + Coder<WindowedValue<V>> outputValueCoder = FullWindowedValueCoder.of( + inputValueCoder, input.getWindowingStrategy().getWindowFn().windowCoder()); + Coder<KV<K, WindowedValue<V>>> outputKvCoder = + KvCoder.of(keyCoder, outputValueCoder); + return input.apply(ParDo.of(new ReifyTimestampAndWindowsDoFn<K, V>())) + .setCoder(outputKvCoder); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/589ef8a0/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java index 4ced82f..a683b31 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java @@ -26,9 +26,9 @@ import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.C import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.GroupByKey; import com.google.cloud.dataflow.sdk.util.KeyedWorkItem; import com.google.cloud.dataflow.sdk.util.KeyedWorkItems; +import com.google.cloud.dataflow.sdk.util.ReifyTimestampsAndWindows; import com.google.cloud.dataflow.sdk.util.WindowedValue; import 
com.google.cloud.dataflow.sdk.values.KV; import com.google.cloud.dataflow.sdk.values.PCollection; @@ -60,7 +60,7 @@ public class GroupByKeyEvaluatorFactoryTest { PCollection<KV<String, Integer>> values = p.apply(Create.of(firstFoo, firstBar, secondFoo, firstBaz, secondBar, thirdFoo)); PCollection<KV<String, WindowedValue<Integer>>> kvs = - values.apply(new GroupByKey.ReifyTimestampsAndWindows<String, Integer>()); + values.apply(new ReifyTimestampsAndWindows<String, Integer>()); PCollection<KeyedWorkItem<String, Integer>> groupedKvs = kvs.apply(new GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly<String, Integer>());
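For orientation, a minimal sketch (not part of this commit; the class name DirectGbkExample is hypothetical, all other calls are standard Dataflow SDK APIs) of a pipeline whose GroupByKey is now expanded by the DirectPipelineRunner in its apply() override, rather than by GroupByKey.apply() itself:

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;

public class DirectGbkExample {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    options.setRunner(DirectPipelineRunner.class);
    Pipeline p = Pipeline.create(options);

    PCollection<KV<String, Iterable<Integer>>> grouped = p
        .apply(Create.of(KV.of("a", 1), KV.of("b", 2), KV.of("a", 3))
            .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())))
        // After this commit, DirectPipelineRunner.apply() intercepts GroupByKey and
        // substitutes the DirectGroupByKey composite shown in the diff above:
        // ReifyTimestampsAndWindows -> GroupByKeyOnly -> SortValuesByTimestamp -> GroupAlsoByWindow.
        .apply(GroupByKey.<String, Integer>create());

    p.run();
  }
}

Runners other than the DirectPipelineRunner now see GroupByKey (and the relocated GroupByKeyOnly) as primitives and translate them directly, as the Flink and Spark registry changes above show.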

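The GroupingKey helper moved into DirectPipelineRunner groups elements by the encoded form of the key, which is why GroupByKey requires a deterministic key coder. A small standalone illustration of that equality-by-encoding (hypothetical class name, assuming StringUtf8Coder; CoderUtils.encodeToByteArray is the same helper the relocated evaluator uses):

import static com.google.cloud.dataflow.sdk.util.CoderUtils.encodeToByteArray;

import com.google.cloud.dataflow.sdk.coders.CoderException;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;

import java.util.Arrays;

public class EncodedKeyEqualityDemo {
  public static void main(String[] args) throws CoderException {
    // Structurally equal keys encode to identical bytes under a deterministic
    // coder, so GroupingKey (which compares encodedKey with Arrays.equals)
    // places their values in the same group.
    byte[] first = encodeToByteArray(StringUtf8Coder.of(), "user-42");
    byte[] second = encodeToByteArray(StringUtf8Coder.of(), "user-42");
    System.out.println(Arrays.equals(first, second)); // prints: true
  }
}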