Repository: beam-site Updated Branches: refs/heads/asf-site e6d68ba89 -> ef362b546
[BEAM-1949] Associated doc changes for renaming sideOutput to output Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/aa82f7d6 Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/aa82f7d6 Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/aa82f7d6 Branch: refs/heads/asf-site Commit: aa82f7d6619ca8e880dee6e36fbd36034f7f291b Parents: e6d68ba Author: melissa <[email protected]> Authored: Thu Apr 13 15:46:31 2017 -0700 Committer: Dan Halperin <[email protected]> Committed: Mon Apr 17 10:29:10 2017 -0700 ---------------------------------------------------------------------- src/_data/capability-matrix.yml | 2 +- src/contribute/ptransform-style-guide.md | 2 +- .../pipelines/design-your-pipeline.md | 53 ++++++++------- .../pipelines/test-your-pipeline.md | 23 +++++-- src/documentation/programming-guide.md | 67 +++++++++---------- .../design-your-pipeline-additional-outputs.png | Bin 0 -> 32797 bytes .../design-your-pipeline-side-outputs.png | Bin 36451 -> 0 bytes 7 files changed, 79 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam-site/blob/aa82f7d6/src/_data/capability-matrix.yml ---------------------------------------------------------------------- diff --git a/src/_data/capability-matrix.yml b/src/_data/capability-matrix.yml index 1dc2fad..d79bebd 100644 --- a/src/_data/capability-matrix.yml +++ b/src/_data/capability-matrix.yml @@ -178,7 +178,7 @@ categories: - class: model l1: 'Partially' l2: user-provided metrics - l3: Allow transforms to aggregate simple metrics across bundles in a <tt>DoFn</tt>. Semantically equivalent to using a side output, but support partial results as the transform executes. Will likely want to augment <tt>Aggregators</tt> to be more useful for processing unbounded data by making them windowed. 
+ l3: Allow transforms to aggregate simple metrics across bundles in a <tt>DoFn</tt>. Semantically equivalent to using an additional output, but support partial results as the transform executes. Will likely want to augment <tt>Aggregators</tt> to be more useful for processing unbounded data by making them windowed. - class: dataflow l1: 'Partially' l2: may miscount in streaming mode http://git-wip-us.apache.org/repos/asf/beam-site/blob/aa82f7d6/src/contribute/ptransform-style-guide.md ---------------------------------------------------------------------- diff --git a/src/contribute/ptransform-style-guide.md b/src/contribute/ptransform-style-guide.md index 8bcb2d9..92e9775 100644 --- a/src/contribute/ptransform-style-guide.md +++ b/src/contribute/ptransform-style-guide.md @@ -103,7 +103,7 @@ Do: * If the transform has side effects, strive to make them idempotent (i.e. safe to apply multiple times). Due to retries, the side effects may be executed multiple times, possibly in parallel. * If the transform can have unprocessable (permanently failing) records and you want the pipeline to proceed despite that: * If bad records are safe to ignore, count the bad records in a metric. Make sure the transform's documentation mentions this aggregator. Beware that there is no programmatic access to reading the aggregator value from inside the pipeline during execution. - * If bad records may need manual inspection by the user, emit them into a side output. + * If bad records may need manual inspection by the user, emit them into an output that contains only those records. * Alternatively take a (default zero) threshold above which element failures become bundle failures (structure the transform to count the total number of elements and of failed elements, compare them and fail if failures are above the threshold). * If the user requests a higher data consistency guarantee than you're able to provide, fail. 
E.g.: if a user requests QoS 2 (exactly-once delivery) from an MQTT connector, the connector should fail since Beam runners may retry writing to the connector and hence exactly-once delivery can't be done. http://git-wip-us.apache.org/repos/asf/beam-site/blob/aa82f7d6/src/documentation/pipelines/design-your-pipeline.md ---------------------------------------------------------------------- diff --git a/src/documentation/pipelines/design-your-pipeline.md b/src/documentation/pipelines/design-your-pipeline.md index 11b38ca..c40803c 100644 --- a/src/documentation/pipelines/design-your-pipeline.md +++ b/src/documentation/pipelines/design-your-pipeline.md @@ -70,19 +70,19 @@ PCollection<String> bCollection = dbRowCollection.apply("bTrans", ParDo.of(new D })); ``` -### A single transform that uses side outputs +### A single transform that produces multiple outputs -Another way to branch a pipeline is to have a **single** transform output to multiple `PCollection`s by using [side outputs]({{ site.baseurl }}/documentation/programming-guide/#transforms-sideio). Transforms that use side outputs, process each element of the input once, and allow you to output to zero or more `PCollection`s. +Another way to branch a pipeline is to have a **single** transform output to multiple `PCollection`s by using [tagged outputs]({{ site.baseurl }}/documentation/programming-guide/#transforms-outputs). Transforms that produce more than one output process each element of the input once, and output to zero or more `PCollection`s. -Figure 3 below illustrates the same example described above, but with one transform that uses a side output; Names that start with 'A' are added to the output `PCollection`, and names that start with 'B' are added to the side output `PCollection`. +Figure 3 below illustrates the same example described above, but with one transform that produces multiple outputs. 
Names that start with 'A' are added to the main output `PCollection`, and names that start with 'B' are added to an additional output `PCollection`. <figure id="fig3"> - <img src="{{ site.baseurl }}/images/design-your-pipeline-side-outputs.png" + <img src="{{ site.baseurl }}/images/design-your-pipeline-additional-outputs.png" alt="A pipeline with a transform that outputs multiple PCollections."> </figure> Figure 3: A pipeline with a transform that outputs multiple PCollections. -The pipeline in Figure 2 contains two transforms that process the elements in the same input `PCollection`. One transform uses the following logic pattern: +The pipeline in Figure 2 contains two transforms that process the elements in the same input `PCollection`. One transform uses the following logic: <pre>if (starts with 'A') { outputToPCollectionA }</pre> @@ -92,43 +92,46 @@ while the other transform uses: Because each transform reads the entire input `PCollection`, each element in the input `PCollection` is processed twice. -The pipeline in Figure 3 performs the same operation in a different way - with only one transform that uses the logic +The pipeline in Figure 3 performs the same operation in a different way - with only one transform that uses the following logic: <pre>if (starts with 'A') { outputToPCollectionA } else if (starts with 'B') { outputToPCollectionB }</pre> where each element in the input `PCollection` is processed once. See the example code below: ```java -//define main stream and side output -final TupleTag<String> mainStreamTag = new TupleTag<String>(){}; -final TupleTag<String> sideoutTag = new TupleTag<String>(){}; +// Define two TupleTags, one for each output. +final TupleTag<String> startsWithATag = new TupleTag<String>(){}; +final TupleTag<String> startsWithBTag = new TupleTag<String>(){}; PCollectionTuple mixedCollection = dbRowCollection.apply( ParDo - // Specify the tag for the main output, wordsBelowCutoffTag. 
- .withOutputTags(mainStreamTag, - // Specify the tags for the two side outputs as a TupleTagList. - TupleTagList.of(sideoutTag)) + // Specify main output. In this example, it is the output + // with tag startsWithATag. + .withOutputTags(startsWithATag, + // Specify the output with tag startsWithBTag, as a TupleTagList. + TupleTagList.of(startsWithBTag)) .of(new DoFn<String, String>() { @ProcessElement - public void processElement(ProcessContext c) { - if(c.element().startsWith("A")){//output to main stream - c.output(c.element()); - }else if(c.element().startsWith("B")){//emit as Side outputs - c.sideOutput(sideoutTag, c.element()); + public void processElement(ProcessContext c) { + if (c.element().startsWith("A")) { + // Emit to main output, which is the output with tag startsWithATag. + c.output(c.element()); + } else if(c.element().startsWith("B")) { + // Emit to output with tag startsWithBTag. + c.output(startsWithBTag, c.element()); + } } } - } )); -// get subset of main stream -mixedCollection.get(mainStreamTag).apply(...); +// Get subset of the output with tag startsWithATag. +mixedCollection.get(startsWithATag).apply(...); -// get subset of Side output -mixedCollection.get(sideoutTag).apply(...); +// Get subset of the output with tag startsWithBTag. +mixedCollection.get(startsWithBTag).apply(...); ``` -You can use either mechanism to produce multiple output `PCollection`s. However, using side outputs makes more sense if the transform's computation per element is time-consuming. +You can use either mechanism to produce multiple output `PCollection`s. However, using additional outputs makes more sense if the transform's computation per element is time-consuming. ## Merging PCollections @@ -183,4 +186,4 @@ coGbkResultCollection.apply(...); ## What's next * [Create your own pipeline]({{ site.baseurl }}/documentation/pipelines/create-your-pipeline). -* [Test your pipeline]({{ site.baseurl }}/documentation/pipelines/test-your-pipeline). 
\ No newline at end of file +* [Test your pipeline]({{ site.baseurl }}/documentation/pipelines/test-your-pipeline). http://git-wip-us.apache.org/repos/asf/beam-site/blob/aa82f7d6/src/documentation/pipelines/test-your-pipeline.md ---------------------------------------------------------------------- diff --git a/src/documentation/pipelines/test-your-pipeline.md b/src/documentation/pipelines/test-your-pipeline.md index 6ce8a45..f091aa2 100644 --- a/src/documentation/pipelines/test-your-pipeline.md +++ b/src/documentation/pipelines/test-your-pipeline.md @@ -38,7 +38,7 @@ The Beam SDK for Java provides a convenient way to test an individual `DoFn` cal `DoFnTester` uses the [JUnit](http://junit.org) framework. To use `DoFnTester`, you'll need to do the following: 1. Create a `DoFnTester`. You'll need to pass an instance of the `DoFn` you want to test to the static factory method for `DoFnTester`. -2. Create one or more main test inputs of the appropriate type for your `DoFn`. If your `DoFn` takes side inputs and/or produces side outputs, you should also create the side inputs and the side output tags. +2. Create one or more main test inputs of the appropriate type for your `DoFn`. If your `DoFn` takes side inputs and/or produces [multiple outputs]({{ site.baseurl }}/documentation/programming-guide#transforms-outputs), you should also create the side inputs and the output tags. 3. Call `DoFnTester.processBundle` to process the main inputs. 4. Use JUnit's `Assert.assertThat` method to ensure the test outputs returned from `processBundle` match your expected values. @@ -65,7 +65,7 @@ DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn); String testInput = "test1"; ``` -#### Side Inputs and Outputs +#### Side Inputs If your `DoFn` accepts side inputs, you can create those side inputs by using the method `DoFnTester.setSideInputs`. 
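`DoFnTester` is a Java/JUnit utility, but the testing pattern it enables — fix a bundle of main inputs and a side-input value, run the per-element logic, then assert on the collected output — can be sketched in a few dependency-free lines. The function name and cutoff value below are illustrative, not the `DoFnTester` API:

```python
def filter_with_cutoff(bundle, cutoff):
    """Per-element logic under test: keep words no longer than the
    side-input cutoff. A rough analogue of processing a bundle of
    main inputs with one side input set."""
    return [word for word in bundle if len(word) <= cutoff]

# In the DoFnTester spirit: fixed main inputs, fixed side input,
# assert on the returned bundle.
output = filter_with_cutoff(["a", "abcdef", "abc"], cutoff=3)
```

Like `DoFnTester.processBundle`, the whole bundle is processed in one call and the result is inspected afterward, rather than asserting inside the per-element logic.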
@@ -79,11 +79,20 @@ Iterable<Integer> value = ...; fnTester.setSideInputInGlobalWindow(sideInput, value); ``` -If your `DoFn` produces side outputs, you'll need to set the appropriate `TupleTag` objects that you'll use to access each output. A `DoFn` with side outputs produces a `PCollectionTuple` for each side output; you'll need to provide a `TupleTagList` that corresponds to each side output in that tuple. +See the `ParDo` documentation on [side inputs]({{ site.baseurl }}/documentation/programming-guide/#transforms-sideio) for more information. -Suppose your `DoFn` produces side outputs of type `String` and `Integer`. You create `TupleTag` objects for each, and bundle them into a `TupleTagList`, then set it for the `DoFnTester` as follows: +#### Additional Outputs -```java +If your `DoFn` produces multiple output `PCollection`s, you'll need to set the +appropriate `TupleTag` objects that you'll use to access each output. A `DoFn` +with multiple outputs produces a `PCollectionTuple` for each output; you'll need +to provide a `TupleTagList` that corresponds to each output in that tuple. + +Suppose your `DoFn` produces outputs of type `String` and `Integer`. You create +`TupleTag` objects for each, and bundle them into a `TupleTagList`, then set it +for the `DoFnTester` as follows: + +```java static class MyDoFn extends DoFn<String, Integer> { ... } MyDoFn myDoFn = ...; DoFnTester<String, Integer> fnTester = DoFnTester.of(myDoFn); @@ -92,10 +101,10 @@ TupleTag<String> tag1 = ...; TupleTag<Integer> tag2 = ...; TupleTagList tags = TupleTagList.of(tag1).and(tag2); -fnTester.setSideOutputTags(tags); +fnTester.setOutputTags(tags); ``` -See the `ParDo` documentation on [side inputs]({{ site.baseurl }}/documentation/programming-guide/#transforms-sideio) for more information. +See the `ParDo` documentation on [additional outputs]({{ site.baseurl }}/documentation/programming-guide/#transforms-outputs) for more information. 
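The tag-and-tuple machinery above (`TupleTag`, `TupleTagList`, `PCollectionTuple`) is specific to the Beam Java SDK, but the underlying idea — one pass over the input, with each element emitted under a named tag and collected into per-tag buckets — can be sketched without any Beam dependency. The tag names and helper below are illustrative, not part of the Beam API:

```python
# Dependency-free sketch of tagged outputs: one pass over the input,
# each element routed to one or more named output buckets.
from collections import defaultdict

def process_element(word, cutoff=10):
    """Yield (tag, value) pairs, mirroring c.output(tag, value)."""
    if len(word) <= cutoff:
        yield ("below_cutoff", word)  # analogue of the main output
    else:
        yield ("lengths_above_cutoff", len(word))
    if word.startswith("MARKER"):
        yield ("marked", word)

def run(words):
    outputs = defaultdict(list)  # analogue of PCollectionTuple
    for word in words:
        for tag, value in process_element(word):
            outputs[tag].append(value)
    return outputs

results = run(["short", "MARKERextremely_long_word", "tiny"])
```

As with `PCollectionTuple.get(tag)`, downstream code looks up each bucket by its tag to continue processing that subset.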
### Processing Test Inputs and Checking Results http://git-wip-us.apache.org/repos/asf/beam-site/blob/aa82f7d6/src/documentation/programming-guide.md ---------------------------------------------------------------------- diff --git a/src/documentation/programming-guide.md b/src/documentation/programming-guide.md index e91a856..d5453ba 100644 --- a/src/documentation/programming-guide.md +++ b/src/documentation/programming-guide.md @@ -38,7 +38,8 @@ The **Beam Programming Guide** is intended for Beam users who want to use the Be * [Using Combine](#transforms-combine) * [Using Flatten and Partition](#transforms-flatten-partition) * [General Requirements for Writing User Code for Beam Transforms](#transforms-usercodereqs) - * [Side Inputs and Side Outputs](#transforms-sideio) + * [Side Inputs](#transforms-sideio) + * [Additional Outputs](#transforms-outputs) * [Composite Transforms](#transforms-composite) * [Pipeline I/O](#io) * [Running the Pipeline](#running) @@ -740,9 +741,7 @@ Your function object should be thread-compatible. Each instance of your function It's recommended that you make your function object idempotent--that is, that it can be repeated or retried as often as necessary without causing unintended side effects. The Beam model provides no guarantees as to the number of times your user code might be invoked or retried; as such, keeping your function object idempotent keeps your pipeline's output deterministic, and your transforms' behavior more predictable and easier to debug. -#### <a name="transforms-sideio"></a>Side Inputs and Side Outputs - -##### **Side inputs** +#### <a name="transforms-sideio"></a>Side Inputs In addition to the main input `PCollection`, you can provide additional inputs to a `ParDo` transform in the form of side inputs. A side input is an additional input that your `DoFn` can access each time it processes an element in the input `PCollection`. 
When you specify a side input, you create a view of some other data that can be read from within the `ParDo` transform's `DoFn` while processing each element. @@ -809,51 +808,51 @@ If the main input element exists in more than one window, then `processElement` If the side input has multiple trigger firings, Beam uses the value from the latest trigger firing. This is particularly useful if you use a side input with a single global window and specify a trigger. -##### **Side outputs** +#### <a name="transforms-outputs"></a>Additional Outputs -While `ParDo` always produces a main output `PCollection` (as the return value from apply), you can also have your `ParDo` produce any number of additional output `PCollection`s. If you choose to have multiple outputs, your `ParDo` returns all of the output `PCollection`s (including the main output) bundled together. +While `ParDo` always produces a main output `PCollection` (as the return value from `apply`), you can also have your `ParDo` produce any number of additional output `PCollection`s. If you choose to have multiple outputs, your `ParDo` returns all of the output `PCollection`s (including the main output) bundled together. -##### Tags for side outputs: +##### Tags for multiple outputs: ```java -// To emit elements to a side output PCollection, create a TupleTag object to identify each collection that your ParDo produces. -// For example, if your ParDo produces three output PCollections (the main output and two side outputs), you must create three TupleTags. -// The following example code shows how to create TupleTags for a ParDo with a main output and two side outputs: +// To emit elements to multiple output PCollections, create a TupleTag object to identify each collection that your ParDo produces. +// For example, if your ParDo produces three output PCollections (the main output and two additional outputs), you must create three TupleTags. 
+// The following example code shows how to create TupleTags for a ParDo with three output PCollections. // Input PCollection to our ParDo. PCollection<String> words = ...; // The ParDo will filter words whose length is below a cutoff and add them to // the main output PCollection<String>. - // If a word is above the cutoff, the ParDo will add the word length to a side output - // PCollection<Integer>. - // If a word starts with the string "MARKER", the ParDo will add that word to a different - // side output PCollection<String>. + // If a word is above the cutoff, the ParDo will add the word length to an + // output PCollection<Integer>. + // If a word starts with the string "MARKER", the ParDo will add that word to an + // output PCollection<String>. final int wordLengthCutOff = 10; - // Create the TupleTags for the main and side outputs. - // Main output. + // Create three TupleTags, one for each output PCollection. + // Output that contains words below the length cutoff. final TupleTag<String> wordsBelowCutOffTag = new TupleTag<String>(){}; - // Word lengths side output. + // Output that contains word lengths. final TupleTag<Integer> wordLengthsAboveCutOffTag = new TupleTag<Integer>(){}; - // "MARKER" words side output. + // Output that contains "MARKER" words. final TupleTag<String> markedWordsTag = new TupleTag<String>(){}; // Passing Output Tags to ParDo: // After you specify the TupleTags for each of your ParDo outputs, pass the tags to your ParDo by invoking .withOutputTags. -// You pass the tag for the main output first, and then the tags for any side outputs in a TupleTagList. -// Building on our previous example, we pass the three TupleTags (one for the main output and two for the side outputs) to our ParDo. -// Note that all of the outputs (including the main output PCollection) are bundled into the returned PCollectionTuple. +// You pass the tag for the main output first, and then the tags for any additional outputs in a TupleTagList. 
+// Building on our previous example, we pass the three TupleTags for our three output PCollections +// to our ParDo. Note that all of the outputs (including the main output PCollection) are bundled into the returned PCollectionTuple. PCollectionTuple results = words.apply( ParDo - // Specify the tag for the main output, wordsBelowCutoffTag. + // Specify the tag for the main output. .withOutputTags(wordsBelowCutOffTag, - // Specify the tags for the two side outputs as a TupleTagList. + // Specify the tags for the two additional outputs as a TupleTagList. TupleTagList.of(wordLengthsAboveCutOffTag) .and(markedWordsTag)) .of(new DoFn<String, String>() { @@ -875,30 +874,30 @@ While `ParDo` always produces a main output `PCollection` (as the return value f {% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_pardo_with_side_outputs_iter %}``` -##### Emitting to side outputs in your DoFn: +##### Emitting to multiple outputs in your DoFn: ```java -// Inside your ParDo's DoFn, you can emit an element to a side output by using the method ProcessContext.sideOutput. -// Pass the appropriate TupleTag for the target side output collection when you call ProcessContext.sideOutput. -// After your ParDo, extract the resulting main and side output PCollections from the returned PCollectionTuple. -// Based on the previous example, this shows the DoFn emitting to the main and side outputs. +// Inside your ParDo's DoFn, you can emit an element to a specific output PCollection by passing in the +// appropriate TupleTag when you call ProcessContext.output. +// After your ParDo, extract the resulting output PCollections from the returned PCollectionTuple. +// Based on the previous example, this shows the DoFn emitting to the main output and two additional outputs. 
.of(new DoFn<String, String>() { public void processElement(ProcessContext c) { String word = c.element(); if (word.length() <= wordLengthCutOff) { - // Emit this short word to the main output. + // Emit short word to the main output. + // In this example, it is the output with tag wordsBelowCutOffTag. c.output(word); } else { - // Emit this long word's length to a side output. - c.sideOutput(wordLengthsAboveCutOffTag, word.length()); + // Emit long word length to the output with tag wordLengthsAboveCutOffTag. + c.output(wordLengthsAboveCutOffTag, word.length()); } if (word.startsWith("MARKER")) { - // Emit this word to a different side output. - c.sideOutput(markedWordsTag, word); + // Emit word to the output with tag markedWordsTag. + c.output(markedWordsTag, word); } }})); - ``` ```py http://git-wip-us.apache.org/repos/asf/beam-site/blob/aa82f7d6/src/images/design-your-pipeline-additional-outputs.png ---------------------------------------------------------------------- diff --git a/src/images/design-your-pipeline-additional-outputs.png b/src/images/design-your-pipeline-additional-outputs.png new file mode 100644 index 0000000..a4fae32 Binary files /dev/null and b/src/images/design-your-pipeline-additional-outputs.png differ http://git-wip-us.apache.org/repos/asf/beam-site/blob/aa82f7d6/src/images/design-your-pipeline-side-outputs.png ---------------------------------------------------------------------- diff --git a/src/images/design-your-pipeline-side-outputs.png b/src/images/design-your-pipeline-side-outputs.png deleted file mode 100644 index f13989d..0000000 Binary files a/src/images/design-your-pipeline-side-outputs.png and /dev/null differ
