Port easy parts of DataflowRunner to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fcdd15b8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fcdd15b8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fcdd15b8 Branch: refs/heads/gearpump-runner Commit: fcdd15b81b93f87de0aa02bfb3b09740bc259c4c Parents: a1d601a Author: Kenneth Knowles <[email protected]> Authored: Mon Aug 8 20:35:59 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Tue Aug 9 12:41:52 2016 -0700 ---------------------------------------------------------------------- .../beam/runners/dataflow/DataflowRunner.java | 92 ++++++++++---------- 1 file changed, 45 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fcdd15b8/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index fadd9c7..4d34ec4 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -761,31 +761,30 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { PTransform<PCollection<T>, PCollection<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>>> { /** - * A {@link OldDoFn} that for each element outputs a {@code KV} structure suitable for + * A {@link DoFn} that for each element outputs a {@code KV} structure suitable for * grouping by the hash of the window's byte representation and sorting the grouped values * using the window's byte representation. */ @SystemDoFnInternal private static class UseWindowHashAsKeyAndWindowAsSortKeyDoFn<T, W extends BoundedWindow> - extends OldDoFn<T, KV<Integer, KV<W, WindowedValue<T>>>> implements - OldDoFn.RequiresWindowAccess { + extends DoFn<T, KV<Integer, KV<W, WindowedValue<T>>>> { private final IsmRecordCoder<?> ismCoderForHash; private UseWindowHashAsKeyAndWindowAsSortKeyDoFn(IsmRecordCoder<?> ismCoderForHash) { this.ismCoderForHash = ismCoderForHash; } - @Override - public void processElement(ProcessContext c) throws Exception { + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow untypedWindow) throws Exception { @SuppressWarnings("unchecked") - W window = (W) c.window(); + W window = (W) untypedWindow; c.output( KV.of(ismCoderForHash.hash(ImmutableList.of(window)), KV.of(window, WindowedValue.of( c.element(), c.timestamp(), - c.window(), + window, c.pane())))); } } @@ -828,14 +827,14 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { extends PTransform<PCollection<T>, PCollectionView<T>> { /** - * A {@link OldDoFn} that outputs {@link IsmRecord}s. These records are structured as follows: + * A {@link DoFn} that outputs {@link IsmRecord}s. These records are structured as follows: * <ul> * <li>Key 1: Window * <li>Value: Windowed value * </ul> */ static class IsmRecordForSingularValuePerWindowDoFn<T, W extends BoundedWindow> - extends OldDoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>, + extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>, IsmRecord<WindowedValue<T>>> { private final Coder<W> windowCoder; @@ -843,7 +842,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { this.windowCoder = windowCoder; } - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { Optional<Object> previousWindowStructuralValue = Optional.absent(); T previousValue = null; @@ -902,7 +901,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { applyForSingleton( DataflowRunner runner, PCollection<T> input, - OldDoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>, + DoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>, IsmRecord<WindowedValue<FinalT>>> doFn, boolean hasDefault, FinalT defaultValue, @@ -998,7 +997,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { static class BatchViewAsList<T> extends PTransform<PCollection<T>, PCollectionView<List<T>>> { /** - * A {@link OldDoFn} which creates {@link IsmRecord}s assuming that each element is within the + * A {@link DoFn} which creates {@link IsmRecord}s assuming that each element is within the * global window. Each {@link IsmRecord} has * <ul> * <li>Key 1: Global window</li> @@ -1008,15 +1007,15 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { */ @SystemDoFnInternal static class ToIsmRecordForGlobalWindowDoFn<T> - extends OldDoFn<T, IsmRecord<WindowedValue<T>>> { + extends DoFn<T, IsmRecord<WindowedValue<T>>> { long indexInBundle; - @Override + @StartBundle public void startBundle(Context c) throws Exception { indexInBundle = 0; } - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { c.output(IsmRecord.of( ImmutableList.of(GlobalWindow.INSTANCE, indexInBundle), @@ -1030,7 +1029,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } /** - * A {@link OldDoFn} which creates {@link IsmRecord}s comparing successive elements windows + * A {@link DoFn} which creates {@link IsmRecord}s comparing successive elements windows * to locate the window boundaries. The {@link IsmRecord} has: * <ul> * <li>Key 1: Window</li> @@ -1040,7 +1039,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { */ @SystemDoFnInternal static class ToIsmRecordForNonGlobalWindowDoFn<T, W extends BoundedWindow> - extends OldDoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>, + extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>, IsmRecord<WindowedValue<T>>> { private final Coder<W> windowCoder; @@ -1048,7 +1047,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { this.windowCoder = windowCoder; } - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { long elementsInWindow = 0; Optional<Object> previousWindowStructuralValue = Optional.absent(); @@ -1174,7 +1173,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> { /** - * A {@link OldDoFn} which groups elements by window boundaries. For each group, + * A {@link DoFn} which groups elements by window boundaries. For each group, * the group of elements is transformed into a {@link TransformedMap}. * The transformed {@code Map<K, V>} is backed by a {@code Map<K, WindowedValue<V>>} * and contains a function {@code WindowedValue<V> -> V}. @@ -1188,7 +1187,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { * </ul> */ static class ToMapDoFn<K, V, W extends BoundedWindow> - extends OldDoFn<KV<Integer, Iterable<KV<W, WindowedValue<KV<K, V>>>>>, + extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<KV<K, V>>>>>, IsmRecord<WindowedValue<TransformedMap<K, WindowedValue<V>, V>>>> { @@ -1198,7 +1197,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { this.windowCoder = windowCoder; } - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { Optional<Object> previousWindowStructuralValue = Optional.absent(); @@ -1358,18 +1357,17 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { @SystemDoFnInternal private static class GroupByKeyHashAndSortByKeyAndWindowDoFn<K, V, W> - extends OldDoFn<KV<K, V>, KV<Integer, KV<KV<K, W>, WindowedValue<V>>>> - implements OldDoFn.RequiresWindowAccess { + extends DoFn<KV<K, V>, KV<Integer, KV<KV<K, W>, WindowedValue<V>>>> { private final IsmRecordCoder<?> coder; private GroupByKeyHashAndSortByKeyAndWindowDoFn(IsmRecordCoder<?> coder) { this.coder = coder; } - @Override - public void processElement(ProcessContext c) throws Exception { + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow untypedWindow) throws Exception { @SuppressWarnings("unchecked") - W window = (W) c.window(); + W window = (W) untypedWindow; c.output( KV.of(coder.hash(ImmutableList.of(c.element().getKey())), @@ -1377,7 +1375,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { WindowedValue.of( c.element().getValue(), c.timestamp(), - (BoundedWindow) window, + untypedWindow, c.pane())))); } } @@ -1412,7 +1410,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } /** - * A {@link OldDoFn} which creates {@link IsmRecord}s comparing successive elements windows + * A {@link DoFn} which creates {@link IsmRecord}s comparing successive elements windows * and keys to locate window and key boundaries. The main output {@link IsmRecord}s have: * <ul> * <li>Key 1: Window</li> @@ -1424,11 +1422,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { * <p>Additionally, we output all the unique keys per window seen to {@code outputForEntrySet} * and the unique key count per window to {@code outputForSize}. * - * <p>Finally, if this OldDoFn has been requested to perform unique key checking, it will + * <p>Finally, if this {@link DoFn} has been requested to perform unique key checking, it will * throw an {@link IllegalStateException} if more than one key per window is found. */ static class ToIsmRecordForMapLikeDoFn<K, V, W extends BoundedWindow> - extends OldDoFn<KV<Integer, Iterable<KV<KV<K, W>, WindowedValue<V>>>>, + extends DoFn<KV<Integer, Iterable<KV<KV<K, W>, WindowedValue<V>>>>, IsmRecord<WindowedValue<V>>> { private final TupleTag<KV<Integer, KV<W, Long>>> outputForSize; @@ -1452,7 +1450,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { this.uniqueKeysExpected = uniqueKeysExpected; } - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { long currentKeyIndex = 0; // We use one based indexing while counting @@ -1557,7 +1555,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } /** - * A {@link OldDoFn} which outputs a metadata {@link IsmRecord} per window of: + * A {@link DoFn} which outputs a metadata {@link IsmRecord} per window of: * <ul> * <li>Key 1: META key</li> * <li>Key 2: window</li> @@ -1565,17 +1563,17 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { * <li>Value: sum of values for window</li> * </ul> * - * <p>This {@link OldDoFn} is meant to be used to compute the number of unique keys + * <p>This {@link DoFn} is meant to be used to compute the number of unique keys * per window for map and multimap side inputs. */ static class ToIsmMetadataRecordForSizeDoFn<K, V, W extends BoundedWindow> - extends OldDoFn<KV<Integer, Iterable<KV<W, Long>>>, IsmRecord<WindowedValue<V>>> { + extends DoFn<KV<Integer, Iterable<KV<W, Long>>>, IsmRecord<WindowedValue<V>>> { private final Coder<W> windowCoder; ToIsmMetadataRecordForSizeDoFn(Coder<W> windowCoder) { this.windowCoder = windowCoder; } - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { Iterator<KV<W, Long>> iterator = c.element().getValue().iterator(); KV<W, Long> currentValue = iterator.next(); @@ -1606,7 +1604,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } /** - * A {@link OldDoFn} which outputs a metadata {@link IsmRecord} per window and key pair of: + * A {@link DoFn} which outputs a metadata {@link IsmRecord} per window and key pair of: * <ul> * <li>Key 1: META key</li> * <li>Key 2: window</li> @@ -1614,11 +1612,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { * <li>Value: key</li> * </ul> * - * <p>This {@link OldDoFn} is meant to be used to output index to key records + * <p>This {@link DoFn} is meant to be used to output index to key records * per window for map and multimap side inputs. */ static class ToIsmMetadataRecordForKeyDoFn<K, V, W extends BoundedWindow> - extends OldDoFn<KV<Integer, Iterable<KV<W, K>>>, IsmRecord<WindowedValue<V>>> { + extends DoFn<KV<Integer, Iterable<KV<W, K>>>, IsmRecord<WindowedValue<V>>> { private final Coder<K> keyCoder; private final Coder<W> windowCoder; @@ -1627,7 +1625,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { this.windowCoder = windowCoder; } - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { Iterator<KV<W, K>> iterator = c.element().getValue().iterator(); KV<W, K> currentValue = iterator.next(); @@ -1658,7 +1656,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } /** - * A {@link OldDoFn} which partitions sets of elements by window boundaries. Within each + * A {@link DoFn} which partitions sets of elements by window boundaries. Within each * partition, the set of elements is transformed into a {@link TransformedMap}. * The transformed {@code Map<K, Iterable<V>>} is backed by a * {@code Map<K, Iterable<WindowedValue<V>>>} and contains a function @@ -1673,7 +1671,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { * </ul> */ static class ToMultimapDoFn<K, V, W extends BoundedWindow> - extends OldDoFn<KV<Integer, Iterable<KV<W, WindowedValue<KV<K, V>>>>>, + extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<KV<K, V>>>>>, IsmRecord<WindowedValue<TransformedMap<K, Iterable<WindowedValue<V>>, Iterable<V>>>>> { @@ -1683,7 +1681,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { this.windowCoder = windowCoder; } - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { Optional<Object> previousWindowStructuralValue = Optional.absent(); @@ -2335,8 +2333,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { // WindmillSink. .apply(Reshuffle.<Integer, ValueWithRecordId<T>>of()) .apply("StripIds", ParDo.of( - new OldDoFn<KV<Integer, ValueWithRecordId<T>>, T>() { - @Override + new DoFn<KV<Integer, ValueWithRecordId<T>>, T>() { + @ProcessElement public void processElement(ProcessContext c) { c.output(c.element().getValue().getValue()); } @@ -2372,7 +2370,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } /** - * A specialized {@link OldDoFn} for writing the contents of a {@link PCollection} + * A specialized {@link DoFn} for writing the contents of a {@link PCollection} * to a streaming {@link PCollectionView} backend implementation. */ private static class StreamingPCollectionViewWriterFn<T> @@ -2553,8 +2551,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } } - private static class WrapAsList<T> extends OldDoFn<T, List<T>> { - @Override + private static class WrapAsList<T> extends DoFn<T, List<T>> { + @ProcessElement public void processElement(ProcessContext c) { c.output(Arrays.asList(c.element())); }
