Repository: incubator-beam Updated Branches: refs/heads/master fcf6b1d34 -> 8daf518bc
Port easy transforms to new DoFn Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/47341e11 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/47341e11 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/47341e11 Branch: refs/heads/master Commit: 47341e113334827101ddbf775c69ae34d178cd8f Parents: 269fbf3 Author: Kenneth Knowles <[email protected]> Authored: Wed Aug 3 20:27:28 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Thu Aug 4 14:56:42 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/transforms/Count.java | 4 ++-- .../java/org/apache/beam/sdk/transforms/Create.java | 4 ++-- .../apache/beam/sdk/transforms/FlatMapElements.java | 4 ++-- .../org/apache/beam/sdk/transforms/Flatten.java | 4 ++-- .../java/org/apache/beam/sdk/transforms/Keys.java | 4 ++-- .../java/org/apache/beam/sdk/transforms/KvSwap.java | 4 ++-- .../org/apache/beam/sdk/transforms/MapElements.java | 4 ++-- .../org/apache/beam/sdk/transforms/Partition.java | 4 ++-- .../beam/sdk/transforms/RemoveDuplicates.java | 4 ++-- .../java/org/apache/beam/sdk/transforms/Sample.java | 6 +++--- .../java/org/apache/beam/sdk/transforms/Values.java | 4 ++-- .../java/org/apache/beam/sdk/transforms/View.java | 8 ++++---- .../org/apache/beam/sdk/transforms/WithKeys.java | 4 ++-- .../apache/beam/sdk/transforms/WithTimestamps.java | 6 +++--- .../beam/sdk/transforms/join/CoGroupByKey.java | 16 ++++++++-------- 15 files changed, 40 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java index 7601ffc..ac59c76 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java @@ -107,8 +107,8 @@ public class Count { public PCollection<KV<T, Long>> apply(PCollection<T> input) { return input - .apply("Init", ParDo.of(new OldDoFn<T, KV<T, Void>>() { - @Override + .apply("Init", ParDo.of(new DoFn<T, KV<T, Void>>() { + @ProcessElement public void processElement(ProcessContext c) { c.output(KV.of(c.element(), (Void) null)); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java index fb7f784..08d0a7a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java @@ -486,8 +486,8 @@ public class Create<T> { this.elementCoder = elementCoder; } - private static class ConvertTimestamps<T> extends OldDoFn<TimestampedValue<T>, T> { - @Override + private static class ConvertTimestamps<T> extends DoFn<TimestampedValue<T>, T> { + @ProcessElement public void processElement(ProcessContext c) { c.outputWithTimestamp(c.element().getValue(), c.element().getTimestamp()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java index b48da38..694592e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java @@ -133,9 +133,9 @@ extends PTransform<PCollection<InputT>, PCollection<OutputT>> { @Override public PCollection<OutputT> apply(PCollection<InputT> input) { - return input.apply("Map", ParDo.of(new OldDoFn<InputT, OutputT>() { + return input.apply("Map", ParDo.of(new DoFn<InputT, OutputT>() { private static final long serialVersionUID = 0L; - @Override + @ProcessElement public void processElement(ProcessContext c) { for (OutputT element : fn.apply(c.element())) { c.output(element); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java index 53e898e..7e09d7e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java @@ -174,8 +174,8 @@ public class Flatten { Coder<T> elemCoder = ((IterableLikeCoder<T, ?>) inCoder).getElemCoder(); return in.apply("FlattenIterables", ParDo.of( - new OldDoFn<Iterable<T>, T>() { - @Override + new DoFn<Iterable<T>, T>() { + @ProcessElement public void processElement(ProcessContext c) { for (T i : c.element()) { c.output(i); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java index c8cbce8..5ac1866 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java @@ -58,8 +58,8 @@ public class Keys<K> extends PTransform<PCollection<? extends KV<K, ?>>, @Override public PCollection<K> apply(PCollection<? extends KV<K, ?>> in) { return - in.apply("Keys", ParDo.of(new OldDoFn<KV<K, ?>, K>() { - @Override + in.apply("Keys", ParDo.of(new DoFn<KV<K, ?>, K>() { + @ProcessElement public void processElement(ProcessContext c) { c.output(c.element().getKey()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java index 430d37b..d4386d2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java @@ -62,8 +62,8 @@ public class KvSwap<K, V> extends PTransform<PCollection<KV<K, V>>, @Override public PCollection<KV<V, K>> apply(PCollection<KV<K, V>> in) { return - in.apply("KvSwap", ParDo.of(new OldDoFn<KV<K, V>, KV<V, K>>() { - @Override + in.apply("KvSwap", ParDo.of(new DoFn<KV<K, V>, KV<V, K>>() { + @ProcessElement public void processElement(ProcessContext c) { KV<K, V> e = c.element(); c.output(KV.of(e.getValue(), e.getKey())); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java index c83c39f..b7b9a5f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java @@ -104,8 +104,8 @@ extends PTransform<PCollection<InputT>, PCollection<OutputT>> { @Override public PCollection<OutputT> apply(PCollection<InputT> input) { - return input.apply("Map", ParDo.of(new OldDoFn<InputT, OutputT>() { - @Override + return input.apply("Map", ParDo.of(new DoFn<InputT, OutputT>() { + @ProcessElement public void processElement(ProcessContext c) { c.output(fn.apply(c.element())); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java index 2ddcc29..05c9470 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java @@ -134,7 +134,7 @@ public class Partition<T> extends PTransform<PCollection<T>, PCollectionList<T>> this.partitionDoFn = partitionDoFn; } - private static class PartitionDoFn<X> extends OldDoFn<X, Void> { + private static class PartitionDoFn<X> extends DoFn<X, Void> { private final int numPartitions; private final PartitionFn<? super X> partitionFn; private final TupleTagList outputTags; @@ -163,7 +163,7 @@ public class Partition<T> extends PTransform<PCollection<T>, PCollectionList<T>> return outputTags; } - @Override + @ProcessElement public void processElement(ProcessContext c) { X input = c.element(); int partition = partitionFn.partitionFor(input, numPartitions); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java index d82c457..bba4b51 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java @@ -85,8 +85,8 @@ public class RemoveDuplicates<T> extends PTransform<PCollection<T>, @Override public PCollection<T> apply(PCollection<T> in) { return in - .apply("CreateIndex", ParDo.of(new OldDoFn<T, KV<T, Void>>() { - @Override + .apply("CreateIndex", ParDo.of(new DoFn<T, KV<T, Void>>() { + @ProcessElement public void processElement(ProcessContext c) { c.output(KV.of(c.element(), (Void) null)); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java index 724b252..12ff2b9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java @@ -164,9 +164,9 @@ public class Sample { } /** - * A {@link OldDoFn} that returns up to limit elements from the side input PCollection. + * A {@link DoFn} that returns up to limit elements from the side input PCollection. */ - private static class SampleAnyDoFn<T> extends OldDoFn<Void, T> { + private static class SampleAnyDoFn<T> extends DoFn<Void, T> { long limit; final PCollectionView<Iterable<T>> iterableView; @@ -175,7 +175,7 @@ public class Sample { this.iterableView = iterableView; } - @Override + @ProcessElement public void processElement(ProcessContext c) { for (T i : c.sideInput(iterableView)) { if (limit-- <= 0) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java index 856e32a..34342db 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java @@ -58,8 +58,8 @@ public class Values<V> extends PTransform<PCollection<? extends KV<?, V>>, @Override public PCollection<V> apply(PCollection<? extends KV<?, V>> in) { return - in.apply("Values", ParDo.of(new OldDoFn<KV<?, V>, V>() { - @Override + in.apply("Values", ParDo.of(new DoFn<KV<?, V>, V>() { + @ProcessElement public void processElement(ProcessContext c) { c.output(c.element().getValue()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java index 8a61637..7a97c13 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java @@ -38,7 +38,7 @@ import java.util.Map; * * <p>When a {@link ParDo} tranform is processing a main input * element in a window {@code w} and a {@link PCollectionView} is read via - * {@link OldDoFn.ProcessContext#sideInput}, the value of the view for {@code w} is + * {@link DoFn.ProcessContext#sideInput}, the value of the view for {@code w} is * returned. * * <p>The SDK supports viewing a {@link PCollection}, per window, as a single value, @@ -118,7 +118,7 @@ import java.util.Map; * * PCollection PageVisits = urlVisits * .apply(ParDo.withSideInputs(urlToPage) - * .of(new OldDoFn<UrlVisit, PageVisit>() { + * .of(new DoFn<UrlVisit, PageVisit>() { * {@literal @}Override * void processElement(ProcessContext context) { * UrlVisit urlVisit = context.element(); @@ -154,11 +154,11 @@ public class View { * * <p>If the input {@link PCollection} is empty, * throws {@link java.util.NoSuchElementException} in the consuming - * {@link OldDoFn}. + * {@link DoFn}. * * <p>If the input {@link PCollection} contains more than one * element, throws {@link IllegalArgumentException} in the - * consuming {@link OldDoFn}. + * consuming {@link DoFn}. */ public static <T> AsSingleton<T> asSingleton() { return new AsSingleton<>(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java index 37d45aa..2a44963 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java @@ -113,8 +113,8 @@ public class WithKeys<K, V> extends PTransform<PCollection<V>, @Override public PCollection<KV<K, V>> apply(PCollection<V> in) { PCollection<KV<K, V>> result = - in.apply("AddKeys", ParDo.of(new OldDoFn<V, KV<K, V>>() { - @Override + in.apply("AddKeys", ParDo.of(new DoFn<V, KV<K, V>>() { + @ProcessElement public void processElement(ProcessContext c) { c.output(KV.of(fn.apply(c.element()), c.element())); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java index 41b549b..7b395f5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java @@ -92,7 +92,7 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T> * Returns the allowed timestamp skew duration, which is the maximum * duration that timestamps can be shifted backwards from the timestamp of the input element. * - * @see OldDoFn#getAllowedTimestampSkew() + * @see DoFn#getAllowedTimestampSkew() */ public Duration getAllowedTimestampSkew() { return allowedTimestampSkew; @@ -105,7 +105,7 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T> .setTypeDescriptorInternal(input.getTypeDescriptor()); } - private static class AddTimestampsDoFn<T> extends OldDoFn<T, T> { + private static class AddTimestampsDoFn<T> extends DoFn<T, T> { private final SerializableFunction<T, Instant> fn; private final Duration allowedTimestampSkew; @@ -114,7 +114,7 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T> this.allowedTimestampSkew = allowedTimestampSkew; } - @Override + @ProcessElement public void processElement(ProcessContext c) { Instant timestamp = fn.apply(c.element()); checkNotNull( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47341e11/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java index 1bd9f4a..cb06f95 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java @@ -19,9 +19,9 @@ package org.apache.beam.sdk.transforms.join; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.join.CoGbkResult.CoGbkResultCoder; @@ -57,7 +57,7 @@ import java.util.List; * * PCollection<T> finalResultCollection = * coGbkResultCollection.apply(ParDo.of( - * new OldDoFn<KV<K, CoGbkResult>, T>() { + * new DoFn<KV<K, CoGbkResult>, T>() { * @Override * public void processElement(ProcessContext c) { * KV<K, CoGbkResult> e = c.element(); @@ -167,12 +167,12 @@ public class CoGroupByKey<K> extends } /** - * A OldDoFn to construct a UnionTable (i.e., a + * A DoFn to construct a UnionTable (i.e., a * {@code PCollection<KV<K, RawUnionValue>>} from a * {@code PCollection<KV<K, V>>}. */ private static class ConstructUnionTableFn<K, V> extends - OldDoFn<KV<K, V>, KV<K, RawUnionValue>> { + DoFn<KV<K, V>, KV<K, RawUnionValue>> { private final int index; @@ -180,7 +180,7 @@ public class CoGroupByKey<K> extends this.index = index; } - @Override + @ProcessElement public void processElement(ProcessContext c) { KV<K, ?> e = c.element(); c.output(KV.of(e.getKey(), new RawUnionValue(index, e.getValue()))); @@ -188,11 +188,11 @@ public class CoGroupByKey<K> extends } /** - * A OldDoFn to construct a CoGbkResult from an input grouped union + * A DoFn to construct a CoGbkResult from an input grouped union * table. */ private static class ConstructCoGbkResultFn<K> - extends OldDoFn<KV<K, Iterable<RawUnionValue>>, + extends DoFn<KV<K, Iterable<RawUnionValue>>, KV<K, CoGbkResult>> { private final CoGbkResultSchema schema; @@ -201,7 +201,7 @@ public class CoGroupByKey<K> extends this.schema = schema; } - @Override + @ProcessElement public void processElement(ProcessContext c) { KV<K, Iterable<RawUnionValue>> e = c.element(); c.output(KV.of(e.getKey(), new CoGbkResult(schema, e.getValue())));
