Repository: beam-site
Updated Branches:
  refs/heads/asf-site 0b21f131a -> e3423939c


add code snippet to page design-your-pipeline


Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/c40342d7
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/c40342d7
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/c40342d7

Branch: refs/heads/asf-site
Commit: c40342d7387dd1244a24dfdf1c594dcdab900510
Parents: 0b21f13
Author: mingmxu <[email protected]>
Authored: Fri Mar 17 21:08:18 2017 -0700
Committer: Davor Bonaci <[email protected]>
Committed: Wed Mar 22 10:15:48 2017 -0700

----------------------------------------------------------------------
 .../pipelines/design-your-pipeline.md           | 88 ++++++++++++++++++--
 1 file changed, 82 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam-site/blob/c40342d7/src/documentation/pipelines/design-your-pipeline.md
----------------------------------------------------------------------
diff --git a/src/documentation/pipelines/design-your-pipeline.md b/src/documentation/pipelines/design-your-pipeline.md
index 937b35c..11b38ca 100644
--- a/src/documentation/pipelines/design-your-pipeline.md
+++ b/src/documentation/pipelines/design-your-pipeline.md
@@ -31,7 +31,7 @@ The simplest pipelines represent a linear flow of operations, 
as shown in Figure
 </figure>
 Figure 1: A linear pipeline.
 
-However, your pipeline can be significantly more complex. A pipeline 
represents a [Directed Acyclic 
Graph](https://en.wikipedia.org/wiki/Directed_acyclic_graph) of steps. It can 
have multiple input sources, multiple output sinks, and its operations 
(transforms) can output multiple `PCollection`s. The following examples show 
some of the different shapes your pipeline can take.
+However, your pipeline can be significantly more complex. A pipeline 
represents a [Directed Acyclic 
Graph](https://en.wikipedia.org/wiki/Directed_acyclic_graph) of steps. It can 
have multiple input sources, multiple output sinks, and its operations 
(`PTransform`s) can both read and output multiple `PCollection`s. The following 
examples show some of the different shapes your pipeline can take.
 
 ## Branching PCollections
 
@@ -47,7 +47,28 @@ The pipeline illustrated in Figure 2 below reads its input, 
first names (Strings
     <img src="{{ site.baseurl 
}}/images/design-your-pipeline-multiple-pcollections.png"
          alt="A pipeline with multiple transforms. Note that the PCollection 
of table rows is processed by two transforms.">
 </figure>
-Figure 2: A pipeline with multiple transforms. Note that the PCollection of 
the database table rows is processed by two transforms.
+Figure 2: A pipeline with multiple transforms. Note that the PCollection of 
the database table rows is processed by two transforms. See the example code 
below:
+```java
+PCollection<String> dbRowCollection = ...;
+
+PCollection<String> aCollection = dbRowCollection.apply("aTrans", ParDo.of(new DoFn<String, String>() {
+  @ProcessElement
+  public void processElement(ProcessContext c) {
+    if (c.element().startsWith("A")) {
+      c.output(c.element());
+    }
+  }
+}));
+
+PCollection<String> bCollection = dbRowCollection.apply("bTrans", ParDo.of(new DoFn<String, String>() {
+  @ProcessElement
+  public void processElement(ProcessContext c) {
+    if (c.element().startsWith("B")) {
+      c.output(c.element());
+    }
+  }
+}));
+```
 
 ### A single transform that uses side outputs
 
@@ -75,7 +96,37 @@ The pipeline in Figure 3 performs the same operation in a 
different way - with o
 
 <pre>if (starts with 'A') { outputToPCollectionA } else if (starts with 'B') { 
outputToPCollectionB }</pre>
 
-where each element in the input `PCollection` is processed once.
+where each element in the input `PCollection` is processed once. See the 
example code below:
+```java
+// Define the tags for the main output and the side output.
+final TupleTag<String> mainStreamTag = new TupleTag<String>(){};
+final TupleTag<String> sideoutTag = new TupleTag<String>(){};
+
+PCollectionTuple mixedCollection =
+    dbRowCollection.apply(
+        ParDo
+        // Specify the tag for the main output, mainStreamTag.
+        .withOutputTags(mainStreamTag,
+        // Specify the tag for the single side output as a TupleTagList.
+                        TupleTagList.of(sideoutTag))
+        .of(new DoFn<String, String>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) {
+            if (c.element().startsWith("A")) { // output to the main stream
+              c.output(c.element());
+            } else if (c.element().startsWith("B")) { // emit as a side output
+              c.sideOutput(sideoutTag, c.element());
+            }
+          }
+        }
+        ));
+
+// Continue processing the main output (elements starting with "A").
+mixedCollection.get(mainStreamTag).apply(...);
+
+// Continue processing the side output (elements starting with "B").
+mixedCollection.get(sideoutTag).apply(...);
+```
 
 You can use either mechanism to produce multiple output `PCollection`s. 
However, using side outputs makes more sense if the transform's computation per 
element is time-consuming.
 
@@ -86,13 +137,22 @@ Often, after you've branched your `PCollection` into 
multiple `PCollection`s via
 *   **Flatten** - You can use the `Flatten` transform in the Beam SDKs to 
merge multiple `PCollection`s of the **same type**.
 *   **Join** - You can use the `CoGroupByKey` transform in the Beam SDK to 
perform a relational join between two `PCollection`s. The `PCollection`s must 
be keyed (i.e. they must be collections of key/value pairs) and they must use 
the same key type.
 
-The example depicted in Figure 4 below is a continuation of the example 
illustrated in Figure 2 in the section above. After branching into two 
`PCollection`s, one with names that begin with 'A' and one with names that 
begin with 'B', the pipeline merges the two together into a single 
`PCollection` that now contains all names that begin with either 'A' or 'B'. 
Here, it makes sense to use `Flatten` because the `PCollection`s being merged 
both contain the same type.
+The example depicted in Figure 4 below is a continuation of the example 
illustrated in Figure 2 in [the section 
above](#multiple-transforms-process-the-same-pcollection). After branching into 
two `PCollection`s, one with names that begin with 'A' and one with names that 
begin with 'B', the pipeline merges the two together into a single 
`PCollection` that now contains all names that begin with either 'A' or 'B'. 
Here, it makes sense to use `Flatten` because the `PCollection`s being merged 
both contain the same type.
 
 <figure id="fig4">
     <img src="{{ site.baseurl }}/images/design-your-pipeline-flatten.png"
          alt="Part of a pipeline that merges multiple PCollections.">
 </figure>
-Figure 4: Part of a pipeline that merges multiple PCollections.
+Figure 4: Part of a pipeline that merges multiple PCollections. See the 
example code below:
+```java
+// Merge the two PCollections with Flatten.
+PCollectionList<String> collectionList = PCollectionList.of(aCollection).and(bCollection);
+PCollection<String> mergedCollectionWithFlatten = collectionList
+    .apply(Flatten.<String>pCollections());
+
+// Continue with the new merged PCollection.
+mergedCollectionWithFlatten.apply(...);
+```
 
 ## Multiple sources
 
@@ -102,7 +162,23 @@ Your pipeline can read its input from one or more sources. 
If your pipeline read
     <img src="{{ site.baseurl }}/images/design-your-pipeline-join.png"
          alt="A pipeline with multiple input sources.">
 </figure>
-Figure 5: A pipeline with multiple input sources.
+Figure 5: A pipeline with multiple input sources. See the example code below:
+```java
+PCollection<KV<String, String>> userAddress = pipeline.apply(JdbcIO.<KV<String, String>>read()...);
+
+PCollection<KV<String, String>> userOrder = pipeline.apply(TextIO.<KV<String, String>>read()...);
+
+final TupleTag<String> addressTag = new TupleTag<String>();
+final TupleTag<String> orderTag = new TupleTag<String>();
+
+// Merge collection values into a CoGbkResult collection.
+PCollection<KV<String, CoGbkResult>> joinedCollection =
+  KeyedPCollectionTuple.of(addressTag, userAddress)
+                       .and(orderTag, userOrder)
+                       .apply(CoGroupByKey.<String>create());
+
+joinedCollection.apply(...);
+```
 
 ## What's next
 
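
For readers who want to try the branching logic from the snippets in this diff without a Beam runner, the following is a minimal plain-Java sketch, not Beam code. It contrasts the two approaches described above: two independent filters over the same input (as with `aTrans` and `bTrans`) versus a single pass that routes each element to a main or a side list (as with the side-output `ParDo`). The class `BranchingSketch`, its methods, and the sample names are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, library-free illustration of the two branching styles.
public class BranchingSketch {

    /** Holds the two outputs produced by the single-pass variant. */
    static final class Branches {
        final List<String> main = new ArrayList<>();
        final List<String> side = new ArrayList<>();
    }

    /** Two-filter style: each call scans the whole input for one prefix. */
    static List<String> filterByPrefix(List<String> rows, String prefix) {
        List<String> out = new ArrayList<>();
        for (String row : rows) {
            if (row.startsWith(prefix)) {
                out.add(row);
            }
        }
        return out;
    }

    /** Single-pass style: each row is visited once and routed to one output. */
    static Branches splitOnce(List<String> rows) {
        Branches branches = new Branches();
        for (String row : rows) {
            if (row.startsWith("A")) {
                branches.main.add(row);   // analogous to c.output(...)
            } else if (row.startsWith("B")) {
                branches.side.add(row);   // analogous to the side output
            }
            // Rows matching neither prefix are dropped, as in the snippets.
        }
        return branches;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("Alice", "Bob", "Carol", "Anna");

        // Two scans of the input, one per branch.
        System.out.println(filterByPrefix(rows, "A")); // [Alice, Anna]
        System.out.println(filterByPrefix(rows, "B")); // [Bob]

        // One scan of the input, two outputs.
        Branches branches = splitOnce(rows);
        System.out.println(branches.main); // [Alice, Anna]
        System.out.println(branches.side); // [Bob]
    }
}
```

Both styles yield the same two collections. The single-pass variant evaluates each element once, which is the reason the page recommends side outputs when the per-element computation is expensive.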
