Port Write to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/86291de3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/86291de3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/86291de3 Branch: refs/heads/master Commit: 86291de39772765f4d6d404ac8a8430d8ad8a15f Parents: 2c6aaf7 Author: Kenneth Knowles <[email protected]> Authored: Fri Aug 5 11:49:37 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Mon Aug 8 11:35:17 2016 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/beam/sdk/io/Write.java | 26 ++++++++++---------- .../java/org/apache/beam/sdk/io/WriteTest.java | 22 ++++++++++------- 2 files changed, 26 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/86291de3/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java index 3e997b0..a846b7c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java @@ -27,8 +27,8 @@ import org.apache.beam.sdk.io.Sink.WriteOperation; import org.apache.beam.sdk.io.Sink.Writer; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -156,7 +156,7 @@ public class Write { * Writes all the elements in a bundle using a {@link Writer} produced by the * {@link WriteOperation} associated with the {@link Sink}. */ - private class WriteBundles<WriteT> extends OldDoFn<T, WriteT> { + private class WriteBundles<WriteT> extends DoFn<T, WriteT> { // Writer that will write the records in this bundle. Lazily // initialized in processElement. private Writer<T, WriteT> writer = null; @@ -166,7 +166,7 @@ public class Write { this.writeOperationView = writeOperationView; } - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { // Lazily initialize the Writer if (writer == null) { @@ -182,7 +182,7 @@ public class Write { // Discard write result and close the write. try { writer.close(); - // The writer does not need to be reset, as this OldDoFn cannot be reused. + // The writer does not need to be reset, as this DoFn cannot be reused. } catch (Exception closeException) { if (closeException instanceof InterruptedException) { // Do not silently ignore interrupted state. @@ -195,7 +195,7 @@ public class Write { } } - @Override + @FinishBundle public void finishBundle(Context c) throws Exception { if (writer != null) { WriteT result = writer.close(); @@ -217,14 +217,14 @@ public class Write { * * @see WriteBundles */ - private class WriteShardedBundles<WriteT> extends OldDoFn<KV<Integer, Iterable<T>>, WriteT> { + private class WriteShardedBundles<WriteT> extends DoFn<KV<Integer, Iterable<T>>, WriteT> { private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView; WriteShardedBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView) { this.writeOperationView = writeOperationView; } - @Override + @ProcessElement public void processElement(ProcessContext c) throws Exception { // In a sharded write, single input element represents one shard. We can open and close // the writer in each call to processElement. @@ -296,8 +296,8 @@ public class Write { * <p>This singleton collection containing the WriteOperation is then used as a side input to a * ParDo over the PCollection of elements to write. In this bundle-writing phase, * {@link WriteOperation#createWriter} is called to obtain a {@link Writer}. - * {@link Writer#open} and {@link Writer#close} are called in {@link OldDoFn#startBundle} and - * {@link OldDoFn#finishBundle}, respectively, and {@link Writer#write} method is called for + * {@link Writer#open} and {@link Writer#close} are called in {@link DoFn#startBundle} and + * {@link DoFn#finishBundle}, respectively, and {@link Writer#write} method is called for * every element in the bundle. The output of this ParDo is a PCollection of * <i>writer result</i> objects (see {@link Sink} for a description of writer results)-one for * each bundle. @@ -334,8 +334,8 @@ public class Write { // Initialize the resource in a do-once ParDo on the WriteOperation. operationCollection = operationCollection .apply("Initialize", ParDo.of( - new OldDoFn<WriteOperation<T, WriteT>, WriteOperation<T, WriteT>>() { - @Override + new DoFn<WriteOperation<T, WriteT>, WriteOperation<T, WriteT>>() { + @ProcessElement public void processElement(ProcessContext c) throws Exception { WriteOperation<T, WriteT> writeOperation = c.element(); LOG.info("Initializing write operation {}", writeOperation); @@ -388,8 +388,8 @@ public class Write { // ParDo. There is a dependency between this ParDo and the parallel write (the writer results // collection as a side input), so it will happen after the parallel write. operationCollection - .apply("Finalize", ParDo.of(new OldDoFn<WriteOperation<T, WriteT>, Integer>() { - @Override + .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, Integer>() { + @ProcessElement public void processElement(ProcessContext c) throws Exception { WriteOperation<T, WriteT> writeOperation = c.element(); LOG.info("Finalizing write operation {}.", writeOperation); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/86291de3/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java index 4b6e749..705b77c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.io; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom; +import static org.apache.beam.sdk.values.KV.of; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -29,6 +30,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import static java.util.concurrent.ThreadLocalRandom.current; + import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.SerializableCoder; @@ -41,9 +44,9 @@ import org.apache.beam.sdk.options.PipelineOptionsFactoryTest.TestPipelineOption import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SimpleFunction; @@ -73,7 +76,6 @@ import java.util.List; import java.util.Objects; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; /** @@ -102,16 +104,18 @@ public class WriteTest { this.window = window; } - private static class AddArbitraryKey<T> extends OldDoFn<T, KV<Integer, T>> { - @Override - public void processElement(ProcessContext c) throws Exception { - c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element())); + private static class AddArbitraryKey<T> extends DoFn<T, KV<Integer, T>> { + + @ProcessElement + public void processElement(ProcessContext c) { + c.output(of(current().nextInt(), c.element())); } } - private static class RemoveArbitraryKey<T> extends OldDoFn<KV<Integer, Iterable<T>>, T> { - @Override - public void processElement(ProcessContext c) throws Exception { + private static class RemoveArbitraryKey<T> extends DoFn<KV<Integer, Iterable<T>>, T> { + + @ProcessElement + public void processElement(ProcessContext c) { for (T s : c.element().getValue()) { c.output(s); }
