Repository: incubator-beam Updated Branches: refs/heads/master e35f571b0 -> 084a5e8ae
Hardcode MinimalWordCount to the DirectRunner This makes it easy to immediately run, and removes various non-portable instructions and others that aren't the easiest for a "Getting Started" scenario. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c64cf367 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c64cf367 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c64cf367 Branch: refs/heads/master Commit: c64cf367299b6fdbe25c62eec9840b02fbc9d518 Parents: e35f571 Author: Kenneth Knowles <[email protected]> Authored: Thu Nov 3 14:18:43 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Tue Nov 8 13:51:23 2016 -0800 ---------------------------------------------------------------------- .../apache/beam/examples/MinimalWordCount.java | 50 +++++++++----------- 1 file changed, 22 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c64cf367/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java index 6fc873e..6085539 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java @@ -37,46 +37,33 @@ import org.apache.beam.sdk.values.KV; * argument processing, and focus on construction of the pipeline, which chains together the * application of core transforms. 
* - * <p>Next, see the {@link WordCount} pipeline, then the {@link DebuggingWordCount}, and finally - * the {@link WindowedWordCount} pipeline, for more detailed examples that introduce additional + * <p>Next, see the {@link WordCount} pipeline, then the {@link DebuggingWordCount}, and finally the + * {@link WindowedWordCount} pipeline, for more detailed examples that introduce additional * concepts. * * <p>Concepts: + * * <pre> * 1. Reading data from text files * 2. Specifying 'inline' transforms - * 3. Counting a PCollection - * 4. Writing data to Cloud Storage as text files + * 3. Counting items in a PCollection + * 4. Writing data to text files * </pre> * - * <p>To execute this pipeline, first edit the code to set your project ID, the temp - * location, and the output location. The specified GCS bucket(s) must already exist. - * - * <p>Then, run the pipeline as described in the README. It will be deployed and run with the - * selected runner. No args are required to run the pipeline. You can see the results in your - * output bucket in the GCS browser. + * <p>No arguments are required to run this pipeline. It will be executed with the DirectRunner. You + * can see the results in the output files in your current working directory, with names like + * "wordcounts-00001-of-00005". When running on a distributed service, you would use an appropriate + * file service. */ public class MinimalWordCount { public static void main(String[] args) { // Create a PipelineOptions object. This object lets us set various execution - // options for our pipeline, such as the associated Cloud Platform project and the location - // in Google Cloud Storage to stage files. + // options for our pipeline, such as the runner you wish to use. This example + // will run with the DirectRunner by default, based on the class path configured + // in its dependencies. 
PipelineOptions options = PipelineOptionsFactory.create(); - // In order to run your pipeline, you need to make following runner specific changes: - // - // CHANGE 1/3: Select a Beam runner, such as DataflowRunner or FlinkRunner. - // CHANGE 2/3: Specify runner-required options. - // For DataflowRunner, set project and temp location as follows: - // DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); - // dataflowOptions.setRunner(DataflowRunner.class); - // dataflowOptions.setProject("SET_YOUR_PROJECT_ID_HERE"); - // dataflowOptions.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY"); - // For FlinkRunner, set the runner as follows. See {@code FlinkPipelineOptions} - // for more details. - // options.setRunner(FlinkRunner.class); - // Create the Pipeline object with the options we defined above. Pipeline p = Pipeline.create(options); @@ -85,7 +72,10 @@ public class MinimalWordCount { // Concept #1: Apply a root transform to the pipeline; in this case, TextIO.Read to read a set // of input text files. TextIO.Read returns a PCollection where each element is one line from // the input text (a set of Shakespeare's texts). + + // This example reads a public data set consisting of the complete works of Shakespeare. p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*")) + // Concept #2: Apply a ParDo transform to our PCollection of text lines. This ParDo invokes a // DoFn (defined in-line) on each element that tokenizes the text line into individual words. // The ParDo returns a PCollection<String>, where each element is an individual word in @@ -100,10 +90,12 @@ public class MinimalWordCount { } } })) + // Concept #3: Apply the Count transform to our PCollection of individual words. The Count // transform returns a new PCollection of key/value pairs, where each key represents a unique // word in the text. The associated value is the occurrence count for that word. 
.apply(Count.<String>perElement()) + // Apply a MapElements transform that formats our PCollection of word counts into a printable // string, suitable for writing to an output file. .apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Long>, String>() { @@ -112,11 +104,13 @@ public class MinimalWordCount { return input.getKey() + ": " + input.getValue(); } })) + // Concept #4: Apply a write transform, TextIO.Write, at the end of the pipeline. // TextIO.Write writes the contents of a PCollection (in this case, our PCollection of - // formatted strings) to a series of text files in Google Cloud Storage. - // CHANGE 3/3: The Google Cloud Storage path is required for outputting the results to. - .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX")); + // formatted strings) to a series of text files. + // + // By default, it will write to a set of files with names like wordcounts-00001-of-00005 + .apply(TextIO.Write.to("wordcounts")); // Run the pipeline. p.run().waitUntilFinish();
