This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a commit to branch mergebot
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit 30d5e8ff774cbf26e9b27b05bd160e4ed95e1c4a
Author: melissa <[email protected]>
AuthorDate: Thu Sep 21 13:17:07 2017 -0700

    Update with review feedback
---
 src/get-started/wordcount-example.md | 88 ++++++++++++++++++++----------------
 1 file changed, 48 insertions(+), 40 deletions(-)

diff --git a/src/get-started/wordcount-example.md b/src/get-started/wordcount-example.md
index 859fd3c..10fc220 100644
--- a/src/get-started/wordcount-example.md
+++ b/src/get-started/wordcount-example.md
@@ -1,11 +1,11 @@
 ---
 layout: default
-title: "Beam WordCount Example"
+title: "Beam WordCount Examples"
 permalink: get-started/wordcount-example/
 redirect_from: /use/wordcount-example/
 ---
 
-# Apache Beam WordCount Example
+# Apache Beam WordCount Examples
 
 * TOC
 {:toc}
@@ -37,7 +37,7 @@ continue on to learn more concepts in the other examples.
 * **Windowed WordCount** demonstrates how you can use Beam's programming model
   to handle both bounded and unbounded datasets.
 
-## MinimalWordCount
+## MinimalWordCount example
 
 Minimal WordCount demonstrates a simple pipeline that can read from a text file,
 apply transforms to tokenize and count the words, and write the data to an
@@ -147,7 +147,7 @@ To view the full code in Python, see
 The following sections explain these concepts in detail, using the relevant code
 excerpts from the Minimal WordCount pipeline.
 
-### Creating the Pipeline
+### Creating the pipeline
 
 In this example, the code first creates a `PipelineOptions` object. This object
 lets us set various options for our pipeline, such as the pipeline runner that
@@ -193,17 +193,17 @@ Pipeline p = Pipeline.create(options);
 {% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py tag:examples_wordcount_minimal_create
 %}```
 
-### Applying Pipeline Transforms
+### Applying pipeline transforms
 
 The Minimal WordCount pipeline contains several transforms to read data into the
 pipeline, manipulate or otherwise transform the data, and write out the results.
-Each transform represents an operation in the pipeline.
+Transforms can consist of an individual operation, or can contain multiple
+nested transforms (which is a [composite transform]({{ site.baseurl }}/documentation/programming-guide#transforms-composite)).
 
-Each transform takes some kind of input (data or otherwise), and produces some
-output data. The input and output data is represented by the SDK class
-`PCollection`. `PCollection` is a special class, provided by the Beam SDK, that
-you can use to represent a data set of virtually any size, including unbounded
-data sets.
+Each transform takes some kind of input data and produces some output data. The
+input and output data is often represented by the SDK class `PCollection`.
+`PCollection` is a special class, provided by the Beam SDK, that you can use to
+represent a data set of virtually any size, including unbounded data sets.
 
 <img src="{{ "/images/wordcount-pipeline.png" | prepend: site.baseurl }}" alt="Word Count pipeline diagram">
 Figure 1: The pipeline data flow.
@@ -306,11 +306,10 @@ The Minimal WordCount pipeline contains five transforms:
 Note that the `Write` transform produces a trivial result value of type
 `PDone`, which in this case is ignored.
 
-### Running the Pipeline
+### Running the pipeline
 
 Run the pipeline by calling the `run` method, which sends your pipeline to be
-executed by the pipeline runner that you specified when you created your
-pipeline.
+executed by the pipeline runner that you specified in your `PipelineOptions`.
 
 ```java
 p.run().waitUntilFinish();
@@ -320,11 +319,12 @@ p.run().waitUntilFinish();
 {% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py tag:examples_wordcount_minimal_run
 %}```
 
-Note that the `run` method is asynchronous. For a blocking execution, run your
-pipeline appending the <span class="language-java">`waitUntilFinish`</span>
-<span class="language-py">`wait_until_finish`</span> method.
+Note that the `run` method is asynchronous. For a blocking execution, call the
+<span class="language-java">`waitUntilFinish`</span>
+<span class="language-py">`wait_until_finish`</span> method on the result object
+returned by the call to `run`.
 
-## WordCount Example
+## WordCount example
 
 This WordCount example introduces a few recommended programming practices that
 can make your pipeline easier to read, write, and maintain. While not explicitly
@@ -333,7 +333,7 @@ your pipeline, and help make your pipeline's code reusable.
 
 This section assumes that you have a good understanding of the basic concepts
 in building a pipeline. If you feel that you aren't at that point yet, read the
-above section, [Minimal WordCount](#minimalwordcount).
+above section, [Minimal WordCount](#minimalwordcount-example).
 
 **To run this example in Java:**
 
@@ -431,7 +431,7 @@ To view the full code in Python, see
 The following sections explain these key concepts in detail, and break down the
 pipeline code into smaller sections.
 
-### Specifying Explicit DoFns
+### Specifying explicit DoFns
 
 When using `ParDo` transforms, you need to specify the processing operation
 that gets applied to each element in the input `PCollection`. This processing
@@ -460,11 +460,11 @@ static class ExtractWordsFn extends DoFn<String, String> {
 {% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py tag:examples_wordcount_wordcount_dofn
 %}```
 
-### Creating Composite Transforms
+### Creating composite transforms
 
 If you have a processing operation that consists of multiple transforms or
 `ParDo` steps, you can create it as a subclass of `PTransform`. Creating a
-`PTransform` subclass allows you to create complex reusable transforms, can make
+`PTransform` subclass allows you to encapsulate complex transforms, can make
 your pipeline's structure more clear and modular, and makes unit testing easier.
 
 In this example, two transforms are encapsulated as the `PTransform` subclass
@@ -506,7 +506,7 @@ public static void main(String[] args) throws IOException {
 {% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py tag:examples_wordcount_wordcount_composite
 %}```
 
-### Using Parameterizable PipelineOptions
+### Using parameterizable PipelineOptions
 
 You can hard-code various execution options when you run your pipeline. However,
 the more common way is to define your own configuration options via command-line
@@ -537,7 +537,7 @@ public static void main(String[] args) {
 {% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py tag:examples_wordcount_wordcount_options
 %}```
 
-## Debugging WordCount Example
+## Debugging WordCount example
 
 The Debugging WordCount example demonstrates some best practices for
 instrumenting your pipeline code.
@@ -715,7 +715,7 @@ or DEBUG significantly increases the amount of logs output.
 > **Note:** This section is yet to be added. There is an open issue for this
 > ([BEAM-2285](https://issues.apache.org/jira/browse/BEAM-2285)).
 
-### Testing your Pipeline via PAssert
+### Testing your pipeline via PAssert
 
 `PAssert` is a set of convenient PTransforms in the style of Hamcrest's
 collection matchers that can be used when writing Pipeline level tests to
@@ -743,7 +743,7 @@ public static void main(String[] args) {
 # This feature is not yet available in the Beam SDK for Python.
 ```
 
-## WindowedWordCount
+## WindowedWordCount example
 
 This example, `WindowedWordCount`, counts words in text just as the previous
 examples did, but introduces several advanced concepts.
@@ -809,10 +809,14 @@ To view the full code in Java, see
 ### Unbounded and bounded pipeline input modes
 
 Beam allows you to create a single pipeline that can handle both bounded and
-unbounded types of input. If the input is unbounded, then all PCollections of
-the pipeline will be unbounded as well. The same goes for bounded input. If your
-input has a fixed number of elements, it's considered a 'bounded' data set. If
-your input is continuously updating, then it's considered 'unbounded'.
+unbounded types of input. If your input has a fixed number of elements, it's
+considered a 'bounded' data set. If your input is continuously updating, then
+it's considered 'unbounded' and you must use a runner that supports streaming.
+
+If your pipeline's input is bounded, then all downstream PCollections will also be
+bounded. Similarly, if the input is unbounded, then all downstream PCollections
+of the pipeline will be unbounded, though separate branches may be independently
+bounded.
 
 Recall that the input for this example is a set of Shakespeare's texts, which is
 a finite set of data. Therefore, this example reads bounded data from a text
@@ -832,14 +836,18 @@ public static void main(String[] args) throws IOException {
 # This feature is not yet available in the Beam SDK for Python.
 ```
 
-### Adding Timestamps to Data
+### Adding timestamps to data
+
+Each element in a `PCollection` has an associated [timestamp]({{ site.baseurl }}/documentation/programming-guide#pctimestamps).
+The timestamp for each element is initially assigned by the source that creates
+the `PCollection`. Some sources that create unbounded PCollections can assign
+each new element a timestamp that corresponds to when the element was read or
+added. You can manually assign or adjust timestamps with a `DoFn`; however, you
+can only move timestamps forward in time.
 
-Each element in a `PCollection` has an associated **timestamp**. The timestamp
-for each element is initially assigned by the source that creates the
-`PCollection` and can be adjusted by a `DoFn`. In this example the input is
-bounded. For the purpose of the example, the `DoFn` method named
-`AddTimestampsFn` (invoked by `ParDo`) will set a timestamp for each element in
-the `PCollection`.
+In this example the input is bounded. For the purpose of the example, the `DoFn`
+method named `AddTimestampsFn` (invoked by `ParDo`) will set a timestamp for
+each element in the `PCollection`.
 
 ```java
 .apply(ParDo.of(new AddTimestampFn(minTimestamp, maxTimestamp)));
@@ -888,8 +896,8 @@ static class AddTimestampFn extends DoFn<String, String> {
 
 ### Windowing
 
-Beam uses a concept called **Windowing** to subdivide a `PCollection` into a
-bounded set of elements. PTransforms that aggregate multiple elements process
+Beam uses a concept called **Windowing** to subdivide a `PCollection` into
+bounded sets of elements. PTransforms that aggregate multiple elements process
 each `PCollection` as a succession of multiple, finite windows, even though the
 entire collection itself may be of infinite size (unbounded).
 
@@ -920,7 +928,7 @@ PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new WordCount.Cou
 # This feature is not yet available in the Beam SDK for Python.
 ```
 
-### Write Results to an Unbounded Sink
+### Writing results to an unbounded sink
 
 When our input is unbounded, the same is true of our output `PCollection`. We
 need to make sure that we choose an appropriate, unbounded sink. Some output
--
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.
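For readers following the patch, a minimal sketch (not from the commit itself) of the blocking-execution pattern the updated "Running the pipeline" section describes: `run` submits the pipeline asynchronously, and `waitUntilFinish` on the returned `PipelineResult` blocks until completion. It assumes the Beam Java SDK's `TextIO` and `PipelineOptionsFactory` APIs; the class name and file paths are illustrative only.

```java
// Illustrative sketch only; class name and paths are hypothetical.
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RunBlockingSketch {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline p = Pipeline.create(options);

    // A trivial pipeline: read lines of text and write them back out unchanged.
    p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/kinglear.txt"))
     .apply(TextIO.write().to("output/lines"));

    // run() only submits the pipeline to the configured runner and returns immediately.
    PipelineResult result = p.run();

    // waitUntilFinish() blocks until the runner reports that the pipeline has completed.
    result.waitUntilFinish();
  }
}
```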
