Repository: beam-site
Updated Branches:
  refs/heads/asf-site 0b21f131a -> e3423939c
add code snippet to page design-your-pipeline

Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/c40342d7
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/c40342d7
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/c40342d7

Branch: refs/heads/asf-site
Commit: c40342d7387dd1244a24dfdf1c594dcdab900510
Parents: 0b21f13
Author: mingmxu <[email protected]>
Authored: Fri Mar 17 21:08:18 2017 -0700
Committer: Davor Bonaci <[email protected]>
Committed: Wed Mar 22 10:15:48 2017 -0700

----------------------------------------------------------------------
 .../pipelines/design-your-pipeline.md | 88 ++++++++++++++++++--
 1 file changed, 82 insertions(+), 6 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam-site/blob/c40342d7/src/documentation/pipelines/design-your-pipeline.md
----------------------------------------------------------------------
diff --git a/src/documentation/pipelines/design-your-pipeline.md b/src/documentation/pipelines/design-your-pipeline.md
index 937b35c..11b38ca 100644
--- a/src/documentation/pipelines/design-your-pipeline.md
+++ b/src/documentation/pipelines/design-your-pipeline.md
@@ -31,7 +31,7 @@ The simplest pipelines represent a linear flow of operations, as shown in Figure
 </figure>
 Figure 1: A linear pipeline.
-However, your pipeline can be significantly more complex. A pipeline represents a [Directed Acyclic Graph](https://en.wikipedia.org/wiki/Directed_acyclic_graph) of steps. It can have multiple input sources, multiple output sinks, and its operations (transforms) can output multiple `PCollection`s. The following examples show some of the different shapes your pipeline can take.
+However, your pipeline can be significantly more complex. A pipeline represents a [Directed Acyclic Graph](https://en.wikipedia.org/wiki/Directed_acyclic_graph) of steps. It can have multiple input sources, multiple output sinks, and its operations (`PTransform`s) can both read and output multiple `PCollection`s. The following examples show some of the different shapes your pipeline can take.
 ## Branching PCollections
@@ -47,7 +47,28 @@ The pipeline illustrated in Figure 2 below reads its input, first names (Strings
 <img src="{{ site.baseurl }}/images/design-your-pipeline-multiple-pcollections.png" alt="A pipeline with multiple transforms. Note that the PCollection of table rows is processed by two transforms.">
 </figure>
-Figure 2: A pipeline with multiple transforms. Note that the PCollection of the database table rows is processed by two transforms.
+Figure 2: A pipeline with multiple transforms. Note that the PCollection of the database table rows is processed by two transforms. See the example code below:
+```java
+PCollection<String> dbRowCollection = ...;
+
+PCollection<String> aCollection = dbRowCollection.apply("aTrans", ParDo.of(new DoFn<String, String>() {
+  @ProcessElement
+  public void processElement(ProcessContext c) {
+    if (c.element().startsWith("A")) {
+      c.output(c.element());
+    }
+  }
+}));
+
+PCollection<String> bCollection = dbRowCollection.apply("bTrans", ParDo.of(new DoFn<String, String>() {
+  @ProcessElement
+  public void processElement(ProcessContext c) {
+    if (c.element().startsWith("B")) {
+      c.output(c.element());
+    }
+  }
+}));
+```
 ### A single transform that uses side outputs
@@ -75,7 +96,37 @@ The pipeline in Figure 3 performs the same operation in a different way - with o
 <pre>if (starts with 'A') { outputToPCollectionA } else if (starts with 'B') { outputToPCollectionB }</pre>
-where each element in the input `PCollection` is processed once.
+where each element in the input `PCollection` is processed once. See the example code below:
+```java
+// Define the tags for the main output and the side output.
+final TupleTag<String> mainStreamTag = new TupleTag<String>(){};
+final TupleTag<String> sideoutTag = new TupleTag<String>(){};
+
+PCollectionTuple mixedCollection =
+    dbRowCollection.apply(
+        ParDo
+            // Specify the tag for the main output, mainStreamTag.
+            .withOutputTags(mainStreamTag,
+                // Specify the tag for the side output as a TupleTagList.
+                TupleTagList.of(sideoutTag))
+            .of(new DoFn<String, String>() {
+              @ProcessElement
+              public void processElement(ProcessContext c) {
+                if (c.element().startsWith("A")) { // Output to the main output.
+                  c.output(c.element());
+                } else if (c.element().startsWith("B")) { // Emit to the side output.
+                  c.sideOutput(sideoutTag, c.element());
+                }
+              }
+            }
+        ));
+
+// Get the subset of elements in the main output.
+mixedCollection.get(mainStreamTag).apply(...);
+
+// Get the subset of elements in the side output.
+mixedCollection.get(sideoutTag).apply(...);
+```
 You can use either mechanism to produce multiple output `PCollection`s. However, using side outputs makes more sense if the transform's computation per element is time-consuming.
@@ -86,13 +137,22 @@ Often, after you've branched your `PCollection` into multiple `PCollection`s via
 * **Flatten** - You can use the `Flatten` transform in the Beam SDKs to merge multiple `PCollection`s of the **same type**.
 * **Join** - You can use the `CoGroupByKey` transform in the Beam SDK to perform a relational join between two `PCollection`s. The `PCollection`s must be keyed (i.e. they must be collections of key/value pairs) and they must use the same key type.
-The example depicted in Figure 4 below is a continuation of the example illustrated in Figure 2 in the section above. After branching into two `PCollection`s, one with names that begin with 'A' and one with names that begin with 'B', the pipeline merges the two together into a single `PCollection` that now contains all names that begin with either 'A' or 'B'. Here, it makes sense to use `Flatten` because the `PCollection`s being merged both contain the same type.
+The example depicted in Figure 4 below is a continuation of the example illustrated in Figure 2 in [the section above](#multiple-transforms-process-the-same-pcollection). After branching into two `PCollection`s, one with names that begin with 'A' and one with names that begin with 'B', the pipeline merges the two together into a single `PCollection` that now contains all names that begin with either 'A' or 'B'. Here, it makes sense to use `Flatten` because the `PCollection`s being merged both contain the same type.
 <figure id="fig4">
 <img src="{{ site.baseurl }}/images/design-your-pipeline-flatten.png" alt="Part of a pipeline that merges multiple PCollections.">
 </figure>
-Figure 4: Part of a pipeline that merges multiple PCollections.
+Figure 4: Part of a pipeline that merges multiple PCollections. See the example code below:
+```java
+// Merge the two PCollections with Flatten.
+PCollectionList<String> collectionList = PCollectionList.of(aCollection).and(bCollection);
+PCollection<String> mergedCollectionWithFlatten = collectionList
+    .apply(Flatten.<String>pCollections());
+
+// Continue with the new merged PCollection.
+mergedCollectionWithFlatten.apply(...);
+```
 ## Multiple sources
@@ -102,7 +162,23 @@ Your pipeline can read its input from one or more sources. If your pipeline read
 <img src="{{ site.baseurl }}/images/design-your-pipeline-join.png" alt="A pipeline with multiple input sources.">
 </figure>
-Figure 5: A pipeline with multiple input sources.
+Figure 5: A pipeline with multiple input sources. See the example code below:
+```java
+PCollection<KV<String, String>> userAddress = pipeline.apply(JdbcIO.<KV<String, String>>read()...);
+
+PCollection<KV<String, String>> userOrder = pipeline.apply(TextIO.<KV<String, String>>read()...);
+
+final TupleTag<String> addressTag = new TupleTag<String>();
+final TupleTag<String> orderTag = new TupleTag<String>();
+
+// Merge collection values into a CoGbkResult collection.
+PCollection<KV<String, CoGbkResult>> joinedCollection =
+  KeyedPCollectionTuple.of(addressTag, userAddress)
+                       .and(orderTag, userOrder)
+                       .apply(CoGroupByKey.<String>create());
+
+joinedCollection.apply(...);
+```
 ## What's next
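The Java snippets in this diff need the Beam SDK and elide their inputs with `...`, so they cannot run on their own. As a rough, dependency-free sketch, the class below models the branching shapes with in-memory `List<String>`s standing in for `PCollection`s: two independent filters (Figure 2), a single pass that routes each element to a main or side output (Figure 3), and a `Flatten`-style merge (Figure 4). `BranchingModel`, its methods, and the sample names are hypothetical illustrations, not Beam APIs, and the model ignores parallelism, windowing, and element order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical plain-Java model of the branching shapes in the diff above.
// In-memory lists stand in for PCollections; this is NOT Beam API code.
class BranchingModel {

    // Figure 2 shape: each independent transform reads the whole input,
    // so every element is examined once per branch.
    static List<String> filterByPrefix(List<String> rows, String prefix) {
        return rows.stream()
                .filter(r -> r.startsWith(prefix))
                .collect(Collectors.toList());
    }

    // Figure 3 shape: a single pass examines each element once and routes it
    // to the "main" list (like mainStreamTag) or the "side" list (like sideoutTag).
    static List<List<String>> routeOnce(List<String> rows) {
        List<String> mainOut = new ArrayList<>();
        List<String> sideOut = new ArrayList<>();
        for (String r : rows) {
            if (r.startsWith("A")) {
                mainOut.add(r);
            } else if (r.startsWith("B")) {
                sideOut.add(r);
            }
            // Elements matching neither prefix are dropped, as in the diff's DoFn.
        }
        return List.of(mainOut, sideOut);
    }

    // Figure 4 shape: merge two collections of the same element type, like Flatten.
    // PCollections have no element order; this model simply concatenates.
    static List<String> flatten(List<String> a, List<String> b) {
        List<String> merged = new ArrayList<>(a);
        merged.addAll(b);
        return merged;
    }

    public static void main(String[] args) {
        List<String> rows = List.of("Alice", "Bob", "Anna", "Carl", "Beth");
        List<String> aNames = filterByPrefix(rows, "A");
        List<String> bNames = filterByPrefix(rows, "B");
        System.out.println(aNames + " " + bNames);   // [Alice, Anna] [Bob, Beth]
        System.out.println(routeOnce(rows));         // [[Alice, Anna], [Bob, Beth]]
        System.out.println(flatten(aNames, bNames)); // [Alice, Anna, Bob, Beth]
    }
}
```

Both approaches produce the same two subsets here; they differ in how many times each input element is examined (once per branch versus once overall), which is the reason the page gives for preferring side outputs when per-element computation is time-consuming.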

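The `CoGroupByKey` snippet for Figure 5 likewise depends on Beam and on I/O settings elided with `...`. The sketch below is an in-memory approximation of its grouping semantics, assuming both inputs are keyed by a user ID string: for every key that appears in either input, it collects all address values and all order values, which is roughly what one `KV<String, CoGbkResult>` element carries. `CoGroupModel`, `JoinedValues`, and the sample keys and values are hypothetical; in a real pipeline you would read the grouped values with `CoGbkResult.getAll(addressTag)` and `getAll(orderTag)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of the CoGroupByKey join in Figure 5.
// Map.Entry pairs stand in for KV<String, String>; JoinedValues loosely plays
// the role of CoGbkResult. This is NOT Beam API code.
class CoGroupModel {

    // All address values and all order values seen for one key.
    static final class JoinedValues {
        final List<String> addresses = new ArrayList<>();
        final List<String> orders = new ArrayList<>();

        @Override
        public String toString() {
            return "addresses=" + addresses + ", orders=" + orders;
        }
    }

    // Groups both keyed inputs by key. A key present in only one input still
    // gets an entry, with an empty list for the other input.
    static Map<String, JoinedValues> coGroup(List<Map.Entry<String, String>> userAddress,
                                             List<Map.Entry<String, String>> userOrder) {
        Map<String, JoinedValues> joined = new TreeMap<>(); // sorted only for stable printing
        for (Map.Entry<String, String> kv : userAddress) {
            joined.computeIfAbsent(kv.getKey(), k -> new JoinedValues()).addresses.add(kv.getValue());
        }
        for (Map.Entry<String, String> kv : userOrder) {
            joined.computeIfAbsent(kv.getKey(), k -> new JoinedValues()).orders.add(kv.getValue());
        }
        return joined;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> userAddress = List.of(
                Map.entry("u1", "1 Main St"),
                Map.entry("u2", "9 Elm Ave"));
        List<Map.Entry<String, String>> userOrder = List.of(
                Map.entry("u1", "order-100"),
                Map.entry("u1", "order-101"),
                Map.entry("u3", "order-200"));
        coGroup(userAddress, userOrder)
                .forEach((user, values) -> System.out.println(user + " -> " + values));
        // u1 -> addresses=[1 Main St], orders=[order-100, order-101]
        // u2 -> addresses=[9 Elm Ave], orders=[]
        // u3 -> addresses=[], orders=[order-200]
    }
}
```

Because keys found in only one input still yield a grouped result with an empty list on the other side, an inner join would additionally skip keys whose address list or order list is empty.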