Update StarterPipeline
Convert StarterPipeline ParDo to MapElements.
Use the new DoFn for non-outputting transforms. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c80554b8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c80554b8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c80554b8 Branch: refs/heads/gearpump-runner Commit: c80554b83426a585c762143e0ad533a73c2c3f0f Parents: e53d6d4 Author: Scott Wegner <[email protected]> Authored: Mon Nov 21 16:33:07 2016 -0800 Committer: Thomas Groh <[email protected]> Committed: Tue Nov 22 10:09:12 2016 -0800 ---------------------------------------------------------------------- .../src/main/java/StarterPipeline.java | 18 ++++++++++-------- .../src/main/java/it/pkg/StarterPipeline.java | 18 ++++++++++-------- 2 files changed, 20 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c80554b8/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java b/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java index 0b21aa6..d6afdec 100644 --- a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java +++ b/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java @@ -20,13 +20,15 @@ package ${package}; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SimpleFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * A starter example for writing Google Cloud Dataflow programs. + * A starter example for writing Beam programs. * * <p>The example takes two strings, converts them to their upper-case * representation and logs them. @@ -39,7 +41,7 @@ import org.slf4j.LoggerFactory; * Platform, you should specify the following command-line options: * --project=<YOUR_PROJECT_ID> * --stagingLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE> - * --runner=BlockingDataflowRunner + * --runner=DataflowRunner */ public class StarterPipeline { private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class); @@ -49,14 +51,14 @@ public class StarterPipeline { PipelineOptionsFactory.fromArgs(args).withValidation().create()); p.apply(Create.of("Hello", "World")) - .apply(ParDo.of(new OldDoFn<String, String>() { + .apply(MapElements.via(new SimpleFunction<String, String>() { @Override - public void processElement(ProcessContext c) { - c.output(c.element().toUpperCase()); + public String apply(String input) { + return input.toUpperCase(); } })) - .apply(ParDo.of(new OldDoFn<String, Void>() { - @Override + .apply(ParDo.of(new DoFn<String, Void>() { + @ProcessElement public void processElement(ProcessContext c) { LOG.info(c.element()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c80554b8/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java index b332442..4ae92e8 100644 --- a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java
+++ b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java @@ -20,13 +20,15 @@ package it.pkg; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SimpleFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * A starter example for writing Google Cloud Dataflow programs. + * A starter example for writing Beam programs. * * <p>The example takes two strings, converts them to their upper-case * representation and logs them. @@ -39,7 +41,7 @@ import org.slf4j.LoggerFactory; * Platform, you should specify the following command-line options: * --project=<YOUR_PROJECT_ID> * --stagingLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE> - * --runner=BlockingDataflowRunner + * --runner=DataflowRunner */ public class StarterPipeline { private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class); @@ -49,14 +51,14 @@ public class StarterPipeline { PipelineOptionsFactory.fromArgs(args).withValidation().create()); p.apply(Create.of("Hello", "World")) - .apply(ParDo.of(new OldDoFn<String, String>() { + .apply(MapElements.via(new SimpleFunction<String, String>() { @Override - public void processElement(ProcessContext c) { - c.output(c.element().toUpperCase()); + public String apply(String input) { + return input.toUpperCase(); } })) - .apply(ParDo.of(new OldDoFn<String, Void>() { - @Override + .apply(ParDo.of(new DoFn<String, Void>() { + @ProcessElement public void processElement(ProcessContext c) { LOG.info(c.element()); }
