Repository: incubator-beam Updated Branches: refs/heads/master 267136fb6 -> d02d2de09
[flink] improve example section in README - updates the README - repairs broken exec configuration Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2fe38770 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2fe38770 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2fe38770 Branch: refs/heads/master Commit: 2fe387707d1e115b578f5ee643bb99c0e4667ee0 Parents: cf14644 Author: Maximilian Michels <[email protected]> Authored: Wed Jul 20 16:06:06 2016 +0200 Committer: Maximilian Michels <[email protected]> Committed: Mon Jul 25 17:30:19 2016 +0200 ---------------------------------------------------------------------- runners/flink/README.md | 25 ++++++++++++-------- runners/flink/examples/pom.xml | 11 ++++----- .../beam/runners/flink/examples/WordCount.java | 4 ++-- 3 files changed, 21 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2fe38770/runners/flink/README.md ---------------------------------------------------------------------- diff --git a/runners/flink/README.md b/runners/flink/README.md index 3348119..aeb1692 100644 --- a/runners/flink/README.md +++ b/runners/flink/README.md @@ -109,35 +109,40 @@ Next, let's run the classic WordCount example. It's semantically identically to the example provided with Apache Beam. Only this time, we chose the `FlinkRunner` to execute the WordCount on top of Flink. -Here's an excerpt from the WordCount class file: +Here's an excerpt from the [WordCount class file](examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java): ```java -Options options = PipelineOptionsFactory.fromArgs(args).as(Options.class); +Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + // yes, we want to run WordCount with Flink options.setRunner(FlinkRunner.class); Pipeline p = Pipeline.create(options); -p.apply(TextIO.Read.named("ReadLines").from(options.getInput())) - .apply(new CountWords()) - .apply(TextIO.Write.named("WriteCounts") - .to(options.getOutput()) - .withNumShards(options.getNumShards())); +p.apply("ReadLines", TextIO.Read.from(options.getInput())) + .apply(new CountWords()) + .apply(MapElements.via(new FormatAsTextFn())) + .apply("WriteCounts", TextIO.Write.to(options.getOutput())); p.run(); ``` To execute the example, let's first get some sample data: - curl http://www.gutenberg.org/cache/epub/1128/pg1128.txt > examples/kinglear.txt + cd runners/flink/examples + curl http://www.gutenberg.org/cache/epub/1128/pg1128.txt > kinglear.txt Then let's run the included WordCount locally on your machine: - cd examples - mvn exec:exec -Dinput=kinglear.txt -Doutput=wordcounts.txt + cd runners/flink/examples + mvn exec:java -Dexec.mainClass=org.apache.beam.runners.flink.examples.WordCount \ + -Dinput=kinglear.txt -Doutput=wordcounts.txt Congratulations, you have run your first Apache Beam program on top of Apache Flink! +Note, that you will find a number of `wordcounts*` output files because Flink parallelizes the +WordCount computation. You may pass an additional `-Dparallelism=1` to disable parallelization and +get a single `wordcounts.txt` file. # Running Beam programs on a Flink cluster http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2fe38770/runners/flink/examples/pom.xml ---------------------------------------------------------------------- diff --git a/runners/flink/examples/pom.xml b/runners/flink/examples/pom.xml index b0ee2ed..355a6be 100644 --- a/runners/flink/examples/pom.xml +++ b/runners/flink/examples/pom.xml @@ -33,11 +33,10 @@ <packaging>jar</packaging> <properties> - <!-- Default parameters for mvn exec:exec --> - <clazz>org.apache.beam.runners.flink.examples.WordCount</clazz> + <!-- Default parameters for mvn exec:java --> <input>kinglear.txt</input> <output>wordcounts.txt</output> - <parallelism>1</parallelism> + <parallelism>-1</parallelism> </properties> <profiles> @@ -131,12 +130,10 @@ <configuration> <executable>java</executable> <arguments> - <argument>-classpath</argument> - <classpath /> - <argument>${clazz}</argument> + <argument>--runner=org.apache.beam.runners.flink.FlinkRunner</argument> + <argument>--parallelism=${parallelism}</argument> <argument>--input=${input}</argument> <argument>--output=${output}</argument> - <argument>--parallelism=${parallelism}</argument> </arguments> </configuration> </plugin> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2fe38770/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java index 2d95c97..c54229d 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java @@ -21,10 +21,10 @@ import org.apache.beam.runners.flink.FlinkPipelineOptions; import org.apache.beam.runners.flink.FlinkRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.Validation; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.DoFn; @@ -92,11 +92,11 @@ public class WordCount { */ public interface Options extends PipelineOptions, FlinkPipelineOptions { @Description("Path of the file to read from") - @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") String getInput(); void setInput(String value); @Description("Path of the file to write to") + @Validation.Required String getOutput(); void setOutput(String value); }
