http://git-wip-us.apache.org/repos/asf/incubator-beam-site/blob/27b5e76b/content/use/wordcount-example/index.html
----------------------------------------------------------------------
diff --git a/content/use/wordcount-example/index.html 
b/content/use/wordcount-example/index.html
new file mode 100644
index 0000000..9ac76ef
--- /dev/null
+++ b/content/use/wordcount-example/index.html
@@ -0,0 +1,648 @@
+<!DOCTYPE html>
+<html lang="en">
+
+  <head>
+  <meta charset="utf-8">
+  <meta http-equiv="X-UA-Compatible" content="IE=edge">
+  <meta name="viewport" content="width=device-width, initial-scale=1">
+
+  <title>Beam WordCount Example</title>
+  <meta name="description" content="Apache Beam is an open source, unified 
model and set of language-specific SDKs for defining and executing data 
processing workflows, and also data ingestion and integration flows, supporting 
Enterprise Integration Patterns (EIPs) and Domain Specific Languages (DSLs). 
Dataflow pipelines simplify the mechanics of large-scale batch and streaming 
data processing and can run on a number of runtimes like Apache Flink, Apache 
Spark, and Google Cloud Dataflow (a cloud service). Beam also brings DSL in 
different languages, allowing users to easily implement their data integration 
processes.
+">
+
+  <link rel="stylesheet" href="/styles/site.css">
+  <link rel="stylesheet" href="/css/theme.css">
+  <script 
src="https://ajax.googleapis.com/ajax/libs/jquery/2.2.0/jquery.min.js"></script>
+  <script src="/js/bootstrap.min.js"></script>
+  <link rel="canonical" 
href="http://beam.incubator.apache.org/use/wordcount-example/" 
data-proofer-ignore>
+  <link rel="alternate" type="application/rss+xml" title="Apache Beam 
(incubating)" href="http://beam.incubator.apache.org/feed.xml">
+  <script>
+    
(function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
+    (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new 
Date();a=s.createElement(o),
+    
m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
+    
})(window,document,'script','//www.google-analytics.com/analytics.js','ga');
+
+    ga('create', 'UA-73650088-1', 'auto');
+    ga('send', 'pageview');
+
+  </script>
+  <link rel="shortcut icon" type="image/x-icon" href="/images/favicon.ico">
+</head>
+
+
+  <body role="document">
+
+    <nav class="navbar navbar-default navbar-fixed-top">
+  <div class="container">
+    <div class="navbar-header">
+      <a href="/" class="navbar-brand" >
+        <img alt="Brand" style="height: 25px" 
src="/images/beam_logo_navbar.png">
+      </a>
+      <button type="button" class="navbar-toggle collapsed" 
data-toggle="collapse" data-target="#navbar" aria-expanded="false" 
aria-controls="navbar">
+        <span class="sr-only">Toggle navigation</span>
+        <span class="icon-bar"></span>
+        <span class="icon-bar"></span>
+        <span class="icon-bar"></span>
+      </button>
+    </div>
+    <div id="navbar" class="navbar-collapse collapse">
+      <ul class="nav navbar-nav">
+        <li class="dropdown">
+                 <a href="#" class="dropdown-toggle" data-toggle="dropdown" 
role="button" aria-haspopup="true" aria-expanded="false">Use <span 
class="caret"></span></a>
+                 <ul class="dropdown-menu">
+                         <li><a href="/use">User Hub</a></li>
+                         <li role="separator" class="divider"></li>
+                         <li class="dropdown-header">General</li>
+                         <li><a href="/use/beam-overview/">Beam 
Overview</a></li>
+                         <li><a href="/use/quickstart/">Quickstart</a></li>
+                         <li><a href="/use/releases">Release 
Information</a></li>
+                         <li role="separator" class="divider"></li>
+                         <li class="dropdown-header">Example Walkthroughs</li>
+                         <li><a 
href="/use/wordcount-example/">WordCount</a></li>
+                         <li><a href="/use/walkthroughs/">Mobile 
Gaming</a></li>
+                         <li role="separator" class="divider"></li>
+                         <li class="dropdown-header">Support</li>
+                         <li><a href="/use/mailing-lists/">Mailing 
Lists</a></li>
+              <li><a href="/use/issue-tracking/">Issue Tracking</a></li>
+                         <li><a 
href="http://stackoverflow.com/questions/tagged/apache-beam">Beam on 
StackOverflow</a></li>
+              <li><a href="http://apachebeam.slack.com">Beam Slack 
Channel</a></li>
+                 </ul>
+           </li>
+        <li class="dropdown">
+                 <a href="#" class="dropdown-toggle" data-toggle="dropdown" 
role="button" aria-haspopup="true" aria-expanded="false">Learn <span 
class="caret"></span></a>
+                 <ul class="dropdown-menu">
+                         <li><a href="/learn">Learner Hub</a></li>
+                         <li role="separator" class="divider"></li>
+                         <li class="dropdown-header">Beam Concepts</li>
+                         <li><a href="/learn/programming-guide/">Programming 
Guide</a></li>
+                         <li><a 
href="/learn/presentation-materials/">Presentation Materials</a></li>
+                         <li><a href="/learn/resources/">Additional 
Resources</a></li>
+                         <li role="separator" class="divider"></li>
+                         <li class="dropdown-header">SDKs</li>
+                         <li><a href="/learn/sdks/java/">Java SDK</a></li>
+                         <li><a href="/learn/sdks/javadoc/">Java SDK API 
Reference</a></li>
+                         <li role="separator" class="divider"></li>
+                         <li class="dropdown-header">Runners</li>
+                         <li><a 
href="/learn/runners/capability-matrix/">Capability Matrix</a></li>
+                         <li><a href="/learn/runners/direct/">Direct 
Runner</a></li>
+                         <li><a href="/learn/runners/flink/">Apache Flink 
Runner</a></li>
+                         <li><a href="/learn/runners/spark/">Apache Spark 
Runner</a></li>
+                         <li><a href="/learn/runners/dataflow/">Cloud Dataflow 
Runner</a></li>
+                 </ul>
+           </li>
+        <li class="dropdown">
+                 <a href="#" class="dropdown-toggle" data-toggle="dropdown" 
role="button" aria-haspopup="true" aria-expanded="false">Contribute <span 
class="caret"></span></a>
+                 <ul class="dropdown-menu">
+                         <li><a href="/contribute">Contributor Hub</a></li>
+                         <li role="separator" class="divider"></li>
+                         <li class="dropdown-header">Basics</li>
+                         <li><a 
href="/contribute/contribution-guide/">Contribution Guide</a></li>
+                         <li><a href="/contribute/work-in-progress/">Work In 
Progress</a></li>
+                         <li><a href="/use/mailing-lists/">Mailing 
Lists</a></li>
+              <li><a href="/contribute/source-repository/">Source 
Repository</a></li>
+              <li><a href="/use/issue-tracking/">Issue Tracking</a></li>
+              <li role="separator" class="divider"></li>
+                         <li class="dropdown-header">Technical References</li>
+                         <li><a href="/contribute/testing/">Testing</a></li>
+              <li><a href="/contribute/design-principles/">Design 
Principles</a></li>
+                         <li><a href="https://goo.gl/nk5OM0">Technical 
Vision</a></li>
+        <li role="separator" class="divider"></li>
+                         <li class="dropdown-header">Committer Resources</li>
+        <li><a href="/contribute/release-guide/">Release Guide</a></li>
+                 </ul>
+           </li>
+        <li><a href="/blog">Blog</a></li>
+        <li class="dropdown">
+          <a href="#" class="dropdown-toggle" data-toggle="dropdown" 
role="button" aria-haspopup="true" aria-expanded="false">Project<span 
class="caret"></span></a>
+          <ul class="dropdown-menu">
+            <li><a href="/project/logos/">Logos and design</a></li>
+            <li><a href="/project/public-meetings/">Public Meetings</a></li>
+                       <li><a href="/project/team/">Team</a></li>
+          </ul>
+        </li>
+      </ul>
+      <ul class="nav navbar-nav navbar-right">
+        <li class="dropdown">
+          <a href="#" class="dropdown-toggle" data-toggle="dropdown" 
role="button" aria-haspopup="true" aria-expanded="false"><img 
src="https://www.apache.org/foundation/press/kit/feather_small.png" alt="Apache 
Logo" style="height:24px;">Apache Software Foundation<span 
class="caret"></span></a>
+          <ul class="dropdown-menu dropdown-menu-right">
+            <li><a href="http://www.apache.org/">ASF Homepage</a></li>
+            <li><a href="http://www.apache.org/licenses/">License</a></li>
+            <li><a href="http://www.apache.org/security/">Security</a></li>
+            <li><a 
href="http://www.apache.org/foundation/thanks.html">Thanks</a></li>
+            <li><a 
href="http://www.apache.org/foundation/sponsorship.html">Sponsorship</a></li>
+            <li><a 
href="https://www.apache.org/foundation/policies/conduct">Code of 
Conduct</a></li>
+          </ul>
+        </li>
+      </ul>
+    </div><!--/.nav-collapse -->
+  </div>
+</nav>
+
+
+<link rel="stylesheet" href="">
+
+
+    <div class="container" role="main">
+
+      <div class="row">
+        <h1 id="apache-beam-wordcount-example">Apache Beam WordCount 
Example</h1>
+
+<ul id="markdown-toc">
+  <li><a href="#minimalwordcount" 
id="markdown-toc-minimalwordcount">MinimalWordCount</a>    <ul>
+      <li><a href="#creating-the-pipeline" 
id="markdown-toc-creating-the-pipeline">Creating the Pipeline</a></li>
+      <li><a href="#applying-pipeline-transforms" 
id="markdown-toc-applying-pipeline-transforms">Applying Pipeline 
Transforms</a></li>
+      <li><a href="#running-the-pipeline" 
id="markdown-toc-running-the-pipeline">Running the Pipeline</a></li>
+    </ul>
+  </li>
+  <li><a href="#wordcount-example" 
id="markdown-toc-wordcount-example">WordCount Example</a>    <ul>
+      <li><a href="#specifying-explicit-dofns" 
id="markdown-toc-specifying-explicit-dofns">Specifying Explicit DoFns</a></li>
+      <li><a href="#creating-composite-transforms" 
id="markdown-toc-creating-composite-transforms">Creating Composite 
Transforms</a></li>
+      <li><a href="#using-parameterizable-pipelineoptions" 
id="markdown-toc-using-parameterizable-pipelineoptions">Using Parameterizable 
PipelineOptions</a></li>
+    </ul>
+  </li>
+  <li><a href="#debugging-wordcount-example" 
id="markdown-toc-debugging-wordcount-example">Debugging WordCount Example</a>   
 <ul>
+      <li><a href="#logging" id="markdown-toc-logging">Logging</a>        <ul>
+          <li><a href="#direct-runner" id="markdown-toc-direct-runner">Direct 
Runner</a></li>
+          <li><a href="#dataflow-runner" 
id="markdown-toc-dataflow-runner">Dataflow Runner</a></li>
+          <li><a href="#apache-spark-runner" 
id="markdown-toc-apache-spark-runner">Apache Spark Runner</a></li>
+          <li><a href="#apache-flink-runner" 
id="markdown-toc-apache-flink-runner">Apache Flink Runner</a></li>
+        </ul>
+      </li>
+      <li><a href="#testing-your-pipeline-via-passert" 
id="markdown-toc-testing-your-pipeline-via-passert">Testing your Pipeline via 
PAssert</a></li>
+    </ul>
+  </li>
+  <li><a href="#windowedwordcount" 
id="markdown-toc-windowedwordcount">WindowedWordCount</a>    <ul>
+      <li><a href="#unbounded-and-bounded-pipeline-input-modes" 
id="markdown-toc-unbounded-and-bounded-pipeline-input-modes">Unbounded and 
bounded pipeline input modes</a></li>
+      <li><a href="#adding-timestamps-to-data" 
id="markdown-toc-adding-timestamps-to-data">Adding Timestamps to Data</a></li>
+      <li><a href="#windowing" id="markdown-toc-windowing">Windowing</a></li>
+      <li><a href="#reusing-ptransforms-over-windowed-pcollections" 
id="markdown-toc-reusing-ptransforms-over-windowed-pcollections">Reusing 
PTransforms over windowed PCollections</a></li>
+    </ul>
+  </li>
+  <li><a href="#write-results-to-an-unbounded-sink" 
id="markdown-toc-write-results-to-an-unbounded-sink">Write Results to an 
Unbounded Sink</a></li>
+</ul>
+
+<blockquote>
+  <p><strong>Note:</strong> This walkthrough is still in progress. Detailed 
instructions for running the example pipelines across multiple runners are yet 
to be added. There is an open issue to finish the walkthrough (<a 
href="https://issues.apache.org/jira/browse/BEAM-664">BEAM-664</a>).</p>
+</blockquote>
+
+<p>The WordCount examples demonstrate how to set up a processing pipeline that 
can read text, tokenize the text lines into individual words, and perform a 
frequency count on each of those words. The Beam SDKs contain a series of 
four successively more detailed WordCount examples that build on each other. 
The input text for all the examples is a set of Shakespeare’s texts.</p>
+
+<p>Each WordCount example introduces different concepts in the Beam 
programming model. Begin by understanding Minimal WordCount, the simplest of 
the examples. Once you feel comfortable with the basic principles in building a 
pipeline, continue on to learn more concepts in the other examples.</p>
+
+<ul>
+  <li><strong>Minimal WordCount</strong> demonstrates the basic principles 
involved in building a pipeline.</li>
+  <li><strong>WordCount</strong> introduces some of the more common best 
practices in creating re-usable and maintainable pipelines.</li>
+  <li><strong>Debugging WordCount</strong> introduces logging and debugging 
practices.</li>
+  <li><strong>Windowed WordCount</strong> demonstrates how you can use 
Beam’s programming model to handle both bounded and unbounded datasets.</li>
+</ul>
+
+<h2 id="minimalwordcount">MinimalWordCount</h2>
+
+<p>Minimal WordCount demonstrates a simple pipeline that can read from a text 
file, apply transforms to tokenize and count the words, and write the data to 
an output text file. This example hard-codes the locations for its input and 
output files and doesn’t perform any error checking; it is intended to only 
show you the “bare bones” of creating a Beam pipeline. This lack of 
parameterization makes this particular pipeline less portable across different 
runners than standard Beam pipelines. In later examples, we will parameterize 
the pipeline’s input and output sources and show other best practices.</p>
+
+<p>To run this example, follow the instructions in the <a 
href="https://github.com/apache/incubator-beam/blob/master/examples/java/README.md#building-and-running">Beam
 Examples README</a>. To view the full code, see <strong><a 
href="https://github.com/apache/incubator-beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java">MinimalWordCount</a>.</strong></p>
+
+<p><strong>Key Concepts:</strong></p>
+
+<ul>
+  <li>Creating the Pipeline</li>
+  <li>Applying transforms to the Pipeline</li>
+  <li>Reading input (in this example: reading text files)</li>
+  <li>Applying ParDo transforms</li>
+  <li>Applying SDK-provided transforms (in this example: Count)</li>
+  <li>Writing output (in this example: writing to a text file)</li>
+  <li>Running the Pipeline</li>
+</ul>
+
+<p>The following sections explain these concepts in detail along with excerpts 
of the relevant code from the Minimal WordCount pipeline.</p>
+
+<h3 id="creating-the-pipeline">Creating the Pipeline</h3>
+
+<p>The first step in creating a Beam pipeline is to create a <code 
class="highlighter-rouge">PipelineOptions</code> object. This object lets us 
set various options for our pipeline, such as the pipeline runner that will 
execute our pipeline and any runner-specific configuration required by the 
chosen runner. In this example we set these options programmatically, but more 
often command-line arguments are used to set <code 
class="highlighter-rouge">PipelineOptions</code>.</p>
+
+<p>You can specify a runner for executing your pipeline, such as the <code 
class="highlighter-rouge">DataflowRunner</code> or <code 
class="highlighter-rouge">SparkRunner</code>. If you don’t specify a runner, 
as in this example, your pipeline will be executed locally using the <code 
class="highlighter-rouge">DirectRunner</code>. In the next sections, we will 
specify the pipeline’s runner.</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code> 
<span class="n">PipelineOptions</span> <span class="n">options</span> <span 
class="o">=</span> <span class="n">PipelineOptionsFactory</span><span 
class="o">.</span><span class="na">create</span><span class="o">();</span>
+
+    <span class="c1">// In order to run your pipeline, you need to make 
following runner specific changes:</span>
+    <span class="c1">//</span>
+    <span class="c1">// CHANGE 1/3: Select a Beam runner, such as 
DataflowRunner or FlinkRunner.</span>
+    <span class="c1">// CHANGE 2/3: Specify runner-required options.</span>
+    <span class="c1">// For DataflowRunner, set project and temp location as 
follows:</span>
+    <span class="c1">//   DataflowPipelineOptions dataflowOptions = 
options.as(DataflowPipelineOptions.class);</span>
+    <span class="c1">//   
dataflowOptions.setRunner(DataflowRunner.class);</span>
+    <span class="c1">//   
dataflowOptions.setProject("SET_YOUR_PROJECT_ID_HERE");</span>
+    <span class="c1">//   
dataflowOptions.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY");</span>
+    <span class="c1">// For FlinkRunner, set the runner as follows. See {@code 
FlinkPipelineOptions}</span>
+    <span class="c1">// for more details.</span>
+    <span class="c1">//   options.setRunner(FlinkRunner.class);</span>
+</code></pre>
+</div>
+
+<p>The next step is to create a Pipeline object with the options we’ve just 
constructed. The Pipeline object builds up the graph of transformations to be 
executed, associated with that particular pipeline.</p>
+
+<div class="language-java highlighter-rouge"><pre 
class="highlight"><code><span class="n">Pipeline</span> <span 
class="n">p</span> <span class="o">=</span> <span 
class="n">Pipeline</span><span class="o">.</span><span 
class="na">create</span><span class="o">(</span><span 
class="n">options</span><span class="o">);</span>
+</code></pre>
+</div>
+
+<h3 id="applying-pipeline-transforms">Applying Pipeline Transforms</h3>
+
+<p>The Minimal WordCount pipeline contains several transforms to read data 
into the pipeline, manipulate or otherwise transform the data, and write out 
the results. Each transform represents an operation in the pipeline.</p>
+
+<p>Each transform takes some kind of input (data or otherwise), and produces 
some output data. The input and output data is represented by the SDK class 
<code class="highlighter-rouge">PCollection</code>. <code 
class="highlighter-rouge">PCollection</code> is a special class, provided by 
the Beam SDK, that you can use to represent a data set of virtually any size, 
including infinite data sets.</p>
+
+<p><img src="/images/wordcount-pipeline.png" alt="Word Count pipeline diagram">
+Figure 1: The pipeline data flow.</p>
+
+<p>The Minimal WordCount pipeline contains five transforms:</p>
+
+<ol>
+  <li>
+    <p>A text file <code class="highlighter-rouge">Read</code> transform is 
applied to the Pipeline object itself, and produces a <code 
class="highlighter-rouge">PCollection</code> as output. Each element in the 
output PCollection represents one line of text from the input file. This 
example happens to use input data stored in a publicly accessible Google Cloud 
Storage bucket (“gs://”).</p>
+
+    <div class="language-java highlighter-rouge"><pre class="highlight"><code> 
       <span class="n">p</span><span class="o">.</span><span 
class="na">apply</span><span class="o">(</span><span 
class="n">TextIO</span><span class="o">.</span><span 
class="na">Read</span><span class="o">.</span><span class="na">from</span><span 
class="o">(</span><span 
class="s">"gs://apache-beam-samples/shakespeare/*"</span><span 
class="o">))</span>
+</code></pre>
+    </div>
+  </li>
+  <li>
+    <p>A <a href="/learn/programming-guide/#transforms-pardo">ParDo</a> 
transform that invokes a <code class="highlighter-rouge">DoFn</code> (defined 
in-line as an anonymous class) on each element that tokenizes the text lines 
into individual words. The input for this transform is the <code 
class="highlighter-rouge">PCollection</code> of text lines generated by the 
previous <code class="highlighter-rouge">TextIO.Read</code> transform. The 
<code class="highlighter-rouge">ParDo</code> transform outputs a new <code 
class="highlighter-rouge">PCollection</code>, where each element represents an 
individual word in the text.</p>
+
+    <div class="language-java highlighter-rouge"><pre class="highlight"><code> 
       <span class="o">.</span><span class="na">apply</span><span 
class="o">(</span><span class="s">"ExtractWords"</span><span class="o">,</span> 
<span class="n">ParDo</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span class="k">new</span> <span 
class="n">DoFn</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;()</span> <span class="o">{</span>
+            <span class="nd">@ProcessElement</span>
+            <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">processElement</span><span class="o">(</span><span 
class="n">ProcessContext</span> <span class="n">c</span><span 
class="o">)</span> <span class="o">{</span>
+                <span class="k">for</span> <span class="o">(</span><span 
class="n">String</span> <span class="n">word</span> <span class="o">:</span> 
<span class="n">c</span><span class="o">.</span><span 
class="na">element</span><span class="o">().</span><span 
class="na">split</span><span class="o">(</span><span 
class="s">"[^a-zA-Z']+"</span><span class="o">))</span> <span class="o">{</span>
+                    <span class="k">if</span> <span class="o">(!</span><span 
class="n">word</span><span class="o">.</span><span 
class="na">isEmpty</span><span class="o">())</span> <span class="o">{</span>
+                        <span class="n">c</span><span class="o">.</span><span 
class="na">output</span><span class="o">(</span><span 
class="n">word</span><span class="o">);</span>
+                    <span class="o">}</span>
+                <span class="o">}</span>
+            <span class="o">}</span>
+        <span class="o">}))</span>
+</code></pre>
+    </div>
+  </li>
+  <li>
+    <p>The SDK-provided <code class="highlighter-rouge">Count</code> transform 
is a generic transform that takes a <code 
class="highlighter-rouge">PCollection</code> of any type, and returns a <code 
class="highlighter-rouge">PCollection</code> of key/value pairs. Each key 
represents a unique element from the input collection, and each value 
represents the number of times that key appeared in the input collection.</p>
+
+    <p>In this pipeline, the input for <code 
class="highlighter-rouge">Count</code> is the <code 
class="highlighter-rouge">PCollection</code> of individual words generated by 
the previous <code class="highlighter-rouge">ParDo</code>, and the output is a 
<code class="highlighter-rouge">PCollection</code> of key/value pairs where 
each key represents a unique word in the text and the associated value is the 
occurrence count for each.</p>
+
+    <div class="language-java highlighter-rouge"><pre class="highlight"><code> 
       <span class="o">.</span><span class="na">apply</span><span 
class="o">(</span><span class="n">Count</span><span class="o">.&lt;</span><span 
class="n">String</span><span class="o">&gt;</span><span 
class="n">perElement</span><span class="o">())</span>
+</code></pre>
+    </div>
+  </li>
+  <li>
+    <p>The next transform formats each of the key/value pairs of unique words 
and occurrence counts into a printable string suitable for writing to an output 
file.</p>
+
+    <p><code class="highlighter-rouge">MapElements</code> is a higher-level 
composite transform that encapsulates a simple <code 
class="highlighter-rouge">ParDo</code>; for each element in the input <code 
class="highlighter-rouge">PCollection</code>, <code 
class="highlighter-rouge">MapElements</code> applies a function that produces 
exactly one output element. In this example, <code 
class="highlighter-rouge">MapElements</code> invokes a <code 
class="highlighter-rouge">SimpleFunction</code> (defined in-line as an 
anonymous class) that does the formatting. As input, <code 
class="highlighter-rouge">MapElements</code> takes a <code 
class="highlighter-rouge">PCollection</code> of key/value pairs generated by 
<code class="highlighter-rouge">Count</code>, and produces a new <code 
class="highlighter-rouge">PCollection</code> of printable strings.</p>
+
+    <div class="language-java highlighter-rouge"><pre class="highlight"><code> 
       <span class="o">.</span><span class="na">apply</span><span 
class="o">(</span><span class="s">"FormatResults"</span><span 
class="o">,</span> <span class="n">MapElements</span><span 
class="o">.</span><span class="na">via</span><span class="o">(</span><span 
class="k">new</span> <span class="n">SimpleFunction</span><span 
class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Long</span><span class="o">&gt;,</span> <span 
class="n">String</span><span class="o">&gt;()</span> <span class="o">{</span>
+            <span class="nd">@Override</span>
+            <span class="kd">public</span> <span class="n">String</span> <span 
class="nf">apply</span><span class="o">(</span><span class="n">KV</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">Long</span><span class="o">&gt;</span> <span 
class="n">input</span><span class="o">)</span> <span class="o">{</span>
+                <span class="k">return</span> <span 
class="n">input</span><span class="o">.</span><span 
class="na">getKey</span><span class="o">()</span> <span class="o">+</span> 
<span class="s">": "</span> <span class="o">+</span> <span 
class="n">input</span><span class="o">.</span><span 
class="na">getValue</span><span class="o">();</span>
+            <span class="o">}</span>
+        <span class="o">}))</span>
+</code></pre>
+    </div>
+  </li>
+  <li>
+    <p>A text file <code class="highlighter-rouge">Write</code>. This 
transform takes the final <code class="highlighter-rouge">PCollection</code> of 
formatted Strings as input and writes each element to an output text file. Each 
element in the input <code class="highlighter-rouge">PCollection</code> 
represents one line of text in the resulting output file.</p>
+
+    <div class="language-java highlighter-rouge"><pre class="highlight"><code> 
       <span class="o">.</span><span class="na">apply</span><span 
class="o">(</span><span class="n">TextIO</span><span class="o">.</span><span 
class="na">Write</span><span class="o">.</span><span class="na">to</span><span 
class="o">(</span><span 
class="s">"gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"</span><span 
class="o">));</span>
+</code></pre>
+    </div>
+  </li>
+</ol>
+
+<p>Note that the <code class="highlighter-rouge">Write</code> transform 
produces a trivial result value of type <code 
class="highlighter-rouge">PDone</code>, which in this case is ignored.</p>
+
+<h3 id="running-the-pipeline">Running the Pipeline</h3>
+
+<p>Run the pipeline by calling the <code class="highlighter-rouge">run</code> 
method, which sends your pipeline to be executed by the pipeline runner that 
you specified when you created your pipeline.</p>
+
+<div class="language-java highlighter-rouge"><pre 
class="highlight"><code><span class="n">p</span><span class="o">.</span><span 
class="na">run</span><span class="o">();</span>
+</code></pre>
+</div>
+
+<h2 id="wordcount-example">WordCount Example</h2>
+
+<p>This WordCount example introduces a few recommended programming practices 
that can make your pipeline easier to read, write, and maintain. While not 
explicitly required, they can make your pipeline’s execution more flexible, 
aid in testing your pipeline, and help make your pipeline’s code reusable.</p>
+
+<p>This section assumes that you have a good understanding of the basic 
concepts in building a pipeline. If you feel that you aren’t at that point 
yet, read the above section, <a href="#minimalwordcount">Minimal 
WordCount</a>.</p>
+
+<p>To run this example, follow the instructions in the <a 
href="https://github.com/apache/incubator-beam/blob/master/examples/java/README.md#building-and-running">Beam
 Examples README</a>. To view the full code, see <strong><a 
href="https://github.com/apache/incubator-beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WordCount.java">WordCount</a>.</strong></p>
+
+<p><strong>New Concepts:</strong></p>
+
+<ul>
+  <li>Applying <code class="highlighter-rouge">ParDo</code> with an explicit 
<code class="highlighter-rouge">DoFn</code></li>
+  <li>Creating Composite Transforms</li>
+  <li>Using Parameterizable <code 
class="highlighter-rouge">PipelineOptions</code></li>
+</ul>
+
+<p>The following sections explain these key concepts in detail, and break down 
the pipeline code into smaller sections.</p>
+
+<h3 id="specifying-explicit-dofns">Specifying Explicit DoFns</h3>
+
+<p>When using <code class="highlighter-rouge">ParDo</code> transforms, you 
need to specify the processing operation that gets applied to each element in 
the input <code class="highlighter-rouge">PCollection</code>. This processing 
operation is a subclass of the SDK class <code 
class="highlighter-rouge">DoFn</code>. You can create the <code 
class="highlighter-rouge">DoFn</code> subclasses for each <code 
class="highlighter-rouge">ParDo</code> inline, as an anonymous inner class 
instance, as is done in the previous example (Minimal WordCount). However, 
it’s often a good idea to define the <code 
class="highlighter-rouge">DoFn</code> at the global level, which makes it 
easier to unit test and can make the <code 
class="highlighter-rouge">ParDo</code> code more readable.</p>
+
+<p>In this example, <code class="highlighter-rouge">ExtractWordsFn</code> is a 
<code class="highlighter-rouge">DoFn</code> that is defined as a static 
class:</p>
+
+<div class="language-java highlighter-rouge"><pre 
class="highlight"><code><span class="kd">static</span> <span 
class="kd">class</span> <span class="nc">ExtractWordsFn</span> <span 
class="kd">extends</span> <span class="n">DoFn</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">String</span><span class="o">&gt;</span> <span 
class="o">{</span>
+    <span class="o">...</span>
+
+    <span class="nd">@ProcessElement</span>
+    <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">processElement</span><span class="o">(</span><span 
class="n">ProcessContext</span> <span class="n">c</span><span 
class="o">)</span> <span class="o">{</span>
+        <span class="o">...</span>
+    <span class="o">}</span>
+<span class="o">}</span>
+</code></pre>
+</div>
+
+<p>This <code class="highlighter-rouge">DoFn</code> (<code 
class="highlighter-rouge">ExtractWordsFn</code>) is the processing operation 
that <code class="highlighter-rouge">ParDo</code> applies to the <code 
class="highlighter-rouge">PCollection</code> of text lines to produce the <code 
class="highlighter-rouge">PCollection</code> of words:</p>
+
+<div class="language-java highlighter-rouge"><pre 
class="highlight"><code><span class="n">PCollection</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> 
<span class="n">words</span> <span class="o">=</span> <span 
class="n">lines</span><span class="o">.</span><span 
class="na">apply</span><span class="o">(</span><span 
class="n">ParDo</span><span class="o">.</span><span class="na">of</span><span 
class="o">(</span><span class="k">new</span> <span 
class="n">ExtractWordsFn</span><span class="o">()));</span>
+</code></pre>
+</div>
+
+<h3 id="creating-composite-transforms">Creating Composite Transforms</h3>
+
+<p>If you have a processing operation that consists of multiple transforms or 
<code class="highlighter-rouge">ParDo</code> steps, you can create it as a 
subclass of <code class="highlighter-rouge">PTransform</code>. Creating a <code 
class="highlighter-rouge">PTransform</code> subclass allows you to create 
complex reusable transforms, can make your pipeline’s structure more clear 
and modular, and makes unit testing easier.</p>
+
+<p>In this example, two transforms are encapsulated as the <code 
class="highlighter-rouge">PTransform</code> subclass <code 
class="highlighter-rouge">CountWords</code>. <code 
class="highlighter-rouge">CountWords</code> contains the <code 
class="highlighter-rouge">ParDo</code> that runs <code 
class="highlighter-rouge">ExtractWordsFn</code> and the SDK-provided <code 
class="highlighter-rouge">Count</code> transform.</p>
+
+<p>When <code class="highlighter-rouge">CountWords</code> is defined, we 
specify its ultimate input and output; the input is the <code 
class="highlighter-rouge">PCollection&lt;String&gt;</code> for the extraction 
operation, and the output is the <code 
class="highlighter-rouge">PCollection&lt;KV&lt;String, Long&gt;&gt;</code> 
produced by the count operation.</p>
+
+<div class="language-java highlighter-rouge"><pre 
class="highlight"><code><span class="kd">public</span> <span 
class="kd">static</span> <span class="kd">class</span> <span 
class="nc">CountWords</span> <span class="kd">extends</span> <span 
class="n">PTransform</span><span class="o">&lt;</span><span 
class="n">PCollection</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;,</span>
+    <span class="n">PCollection</span><span class="o">&lt;</span><span 
class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Long</span><span class="o">&gt;&gt;&gt;</span> <span 
class="o">{</span>
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="n">PCollection</span><span 
class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Long</span><span class="o">&gt;&gt;</span> <span 
class="nf">apply</span><span class="o">(</span><span 
class="n">PCollection</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">lines</span><span class="o">)</span> <span class="o">{</span>
+
+    <span class="c1">// Convert lines of text into individual words.</span>
+    <span class="n">PCollection</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span class="n">words</span> 
<span class="o">=</span> <span class="n">lines</span><span 
class="o">.</span><span class="na">apply</span><span class="o">(</span>
+        <span class="n">ParDo</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span class="k">new</span> <span 
class="n">ExtractWordsFn</span><span class="o">()));</span>
+
+    <span class="c1">// Count the number of times each word occurs.</span>
+    <span class="n">PCollection</span><span class="o">&lt;</span><span 
class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Long</span><span class="o">&gt;&gt;</span> <span 
class="n">wordCounts</span> <span class="o">=</span>
+        <span class="n">words</span><span class="o">.</span><span 
class="na">apply</span><span class="o">(</span><span 
class="n">Count</span><span class="o">.&lt;</span><span 
class="n">String</span><span class="o">&gt;</span><span 
class="n">perElement</span><span class="o">());</span>
+
+    <span class="k">return</span> <span class="n">wordCounts</span><span 
class="o">;</span>
+  <span class="o">}</span>
+<span class="o">}</span>
+
+<span class="kd">public</span> <span class="kd">static</span> <span 
class="kt">void</span> <span class="nf">main</span><span 
class="o">(</span><span class="n">String</span><span class="o">[]</span> <span 
class="n">args</span><span class="o">)</span> <span class="kd">throws</span> 
<span class="n">IOException</span> <span class="o">{</span>
+  <span class="n">Pipeline</span> <span class="n">p</span> <span 
class="o">=</span> <span class="o">...</span>
+
+  <span class="n">p</span><span class="o">.</span><span 
class="na">apply</span><span class="o">(...)</span>
+   <span class="o">.</span><span class="na">apply</span><span 
class="o">(</span><span class="k">new</span> <span 
class="n">CountWords</span><span class="o">())</span>
+   <span class="o">...</span>
+<span class="o">}</span>
+</code></pre>
+</div>
+
+<h3 id="using-parameterizable-pipelineoptions">Using Parameterizable 
PipelineOptions</h3>
+
+<p>You can hard-code various execution options when you run your pipeline. 
However, the more common way is to define your own configuration options via 
command-line argument parsing. Defining your configuration options via the 
command-line makes the code more easily portable across different runners.</p>
+
+<p>Add arguments to be processed by the command-line parser, and specify 
default values for them. You can then access the options values in your 
pipeline code.</p>
+
+<div class="language-java highlighter-rouge"><pre 
class="highlight"><code><span class="kd">public</span> <span 
class="kd">static</span> <span class="kd">interface</span> <span 
class="nc">WordCountOptions</span> <span class="kd">extends</span> <span 
class="n">PipelineOptions</span> <span class="o">{</span>
+  <span class="nd">@Description</span><span class="o">(</span><span 
class="s">"Path of the file to read from"</span><span class="o">)</span>
+  <span class="nd">@Default</span><span class="o">.</span><span 
class="na">String</span><span class="o">(</span><span 
class="s">"gs://dataflow-samples/shakespeare/kinglear.txt"</span><span 
class="o">)</span>
+  <span class="n">String</span> <span class="nf">getInputFile</span><span 
class="o">();</span>
+  <span class="kt">void</span> <span class="nf">setInputFile</span><span 
class="o">(</span><span class="n">String</span> <span 
class="n">value</span><span class="o">);</span>
+  <span class="o">...</span>
+<span class="o">}</span>
+
+<span class="kd">public</span> <span class="kd">static</span> <span 
class="kt">void</span> <span class="nf">main</span><span 
class="o">(</span><span class="n">String</span><span class="o">[]</span> <span 
class="n">args</span><span class="o">)</span> <span class="o">{</span>
+  <span class="n">WordCountOptions</span> <span class="n">options</span> <span 
class="o">=</span> <span class="n">PipelineOptionsFactory</span><span 
class="o">.</span><span class="na">fromArgs</span><span class="o">(</span><span 
class="n">args</span><span class="o">).</span><span 
class="na">withValidation</span><span class="o">()</span>
+      <span class="o">.</span><span class="na">as</span><span 
class="o">(</span><span class="n">WordCountOptions</span><span 
class="o">.</span><span class="na">class</span><span class="o">);</span>
+  <span class="n">Pipeline</span> <span class="n">p</span> <span 
class="o">=</span> <span class="n">Pipeline</span><span class="o">.</span><span 
class="na">create</span><span class="o">(</span><span 
class="n">options</span><span class="o">);</span>
+  <span class="o">...</span>
+<span class="o">}</span>
+</code></pre>
+</div>
+
+<h2 id="debugging-wordcount-example">Debugging WordCount Example</h2>
+
+<p>The Debugging WordCount example demonstrates some best practices for 
instrumenting your pipeline code.</p>
+
+<p>To run this example, follow the instructions in the <a 
href="https://github.com/apache/incubator-beam/blob/master/examples/java/README.md#building-and-running">Beam
 Examples README</a>. To view the full code, see <strong><a 
href="https://github.com/apache/incubator-beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java">DebuggingWordCount</a>.</strong></p>
+
+<p><strong>New Concepts:</strong></p>
+
+<ul>
+  <li>Logging</li>
+  <li>Testing your Pipeline via <code 
class="highlighter-rouge">PAssert</code></li>
+</ul>
+
+<p>The following sections explain these key concepts in detail, and break down 
the pipeline code into smaller sections.</p>
+
+<h3 id="logging">Logging</h3>
+
+<p>Each runner may choose to handle logs in its own way.</p>
+
+<h4 id="direct-runner">Direct Runner</h4>
+
+<p>If you execute your pipeline using <code 
class="highlighter-rouge">DirectRunner</code>, it will print the log messages 
directly to your local console.</p>
+
+<h4 id="dataflow-runner">Dataflow Runner</h4>
+
+<p>If you execute your pipeline using <code 
class="highlighter-rouge">DataflowRunner</code>, you can use Google Cloud 
Logging. Google Cloud Logging (currently in beta) aggregates the logs from all 
of your Dataflow job’s workers to a single location in the Google Cloud 
Platform Console. You can use Cloud Logging to search and access the logs from 
all of the Compute Engine instances that Dataflow has spun up to complete your 
Dataflow job. You can add logging statements into your pipeline’s <code 
class="highlighter-rouge">DoFn</code> instances that will appear in Cloud 
Logging as your pipeline runs.</p>
+
+<p>In this example, we use <code class="highlighter-rouge">.trace</code> and 
<code class="highlighter-rouge">.debug</code>:</p>
+
+<div class="language-java highlighter-rouge"><pre 
class="highlight"><code><span class="kd">public</span> <span 
class="kd">class</span> <span class="nc">DebuggingWordCount</span> <span 
class="o">{</span>
+
+  <span class="kd">public</span> <span class="kd">static</span> <span 
class="kd">class</span> <span class="nc">FilterTextFn</span> <span 
class="kd">extends</span> <span class="n">DoFn</span><span 
class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Long</span><span class="o">&gt;,</span> <span 
class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Long</span><span class="o">&gt;&gt;</span> <span class="o">{</span>
+    <span class="o">...</span>
+
+    <span class="nd">@ProcessElement</span>
+    <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">processElement</span><span class="o">(</span><span 
class="n">ProcessContext</span> <span class="n">c</span><span 
class="o">)</span> <span class="o">{</span>
+      <span class="k">if</span> <span class="o">(...)</span> <span 
class="o">{</span>
+        <span class="o">...</span>
+        <span class="n">LOG</span><span class="o">.</span><span 
class="na">debug</span><span class="o">(</span><span class="s">"Matched: 
"</span> <span class="o">+</span> <span class="n">c</span><span 
class="o">.</span><span class="na">element</span><span 
class="o">().</span><span class="na">getKey</span><span class="o">());</span>
+      <span class="o">}</span> <span class="k">else</span> <span 
class="o">{</span>        
+        <span class="o">...</span>
+        <span class="n">LOG</span><span class="o">.</span><span 
class="na">trace</span><span class="o">(</span><span class="s">"Did not match: 
"</span> <span class="o">+</span> <span class="n">c</span><span 
class="o">.</span><span class="na">element</span><span 
class="o">().</span><span class="na">getKey</span><span class="o">());</span>
+      <span class="o">}</span>
+    <span class="o">}</span>
+  <span class="o">}</span>
+<span class="o">}</span>
+
+</code></pre>
+</div>
+
+<p>If you execute your pipeline using <code 
class="highlighter-rouge">DataflowRunner</code>, you can control the worker log 
levels. Dataflow workers that execute user code are configured to log to Cloud 
Logging by default at “INFO” log level and higher. You can override log 
levels for specific logging namespaces by specifying: <code 
class="highlighter-rouge">--workerLogLevelOverrides={"Name1":"Level1","Name2":"Level2",...}</code>.
 For example, by specifying <code 
class="highlighter-rouge">--workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"}</code>
 when executing this pipeline using the Dataflow service, Cloud Logging would 
contain only “DEBUG” or higher level logs for the package in addition to 
the default “INFO” or higher level logs.</p>
+
+<p>The default Dataflow worker logging configuration can be overridden by 
specifying <code class="highlighter-rouge">--defaultWorkerLogLevel=&lt;one of 
TRACE, DEBUG, INFO, WARN, ERROR&gt;</code>. For example, by specifying <code 
class="highlighter-rouge">--defaultWorkerLogLevel=DEBUG</code> when executing 
this pipeline with the Dataflow service, Cloud Logging would contain all 
“DEBUG” or higher level logs. Note that changing the default worker log 
level to TRACE or DEBUG will significantly increase the amount of logs 
output.</p>
+
+<h4 id="apache-spark-runner">Apache Spark Runner</h4>
+
+<blockquote>
+  <p><strong>Note:</strong> This section is yet to be added. There is an open 
issue for this (<a 
href="https://issues.apache.org/jira/browse/BEAM-792">BEAM-792</a>).</p>
+</blockquote>
+
+<h4 id="apache-flink-runner">Apache Flink Runner</h4>
+
+<blockquote>
+  <p><strong>Note:</strong> This section is yet to be added. There is an open 
issue for this (<a 
href="https://issues.apache.org/jira/browse/BEAM-791">BEAM-791</a>).</p>
+</blockquote>
+
+<h3 id="testing-your-pipeline-via-passert">Testing your Pipeline via 
PAssert</h3>
+
+<p><code class="highlighter-rouge">PAssert</code> is a set of convenient <code 
class="highlighter-rouge">PTransform</code>s in the style of Hamcrest’s 
collection matchers that can be used when writing Pipeline level tests to 
validate the contents of PCollections. <code 
class="highlighter-rouge">PAssert</code> is best used in unit tests with small 
data sets, but is demonstrated here as a teaching tool.</p>
+
+<p>Below, we verify that the set of filtered words matches our expected 
counts. Note that <code class="highlighter-rouge">PAssert</code> does not 
provide any output, and that successful completion of the pipeline implies that 
the expectations were met. See <a 
href="https://github.com/apache/incubator-beam/blob/master/examples/java/src/test/java/org/apache/beam/examples/DebuggingWordCountTest.java">DebuggingWordCountTest</a>
 for an example unit test.</p>
+
+<div class="language-java highlighter-rouge"><pre 
class="highlight"><code><span class="kd">public</span> <span 
class="kd">static</span> <span class="kt">void</span> <span 
class="nf">main</span><span class="o">(</span><span 
class="n">String</span><span class="o">[]</span> <span 
class="n">args</span><span class="o">)</span> <span class="o">{</span>
+  <span class="o">...</span>
+  <span class="n">List</span><span class="o">&lt;</span><span 
class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Long</span><span class="o">&gt;&gt;</span> <span 
class="n">expectedResults</span> <span class="o">=</span> <span 
class="n">Arrays</span><span class="o">.</span><span 
class="na">asList</span><span class="o">(</span>
+        <span class="n">KV</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span 
class="s">"Flourish"</span><span class="o">,</span> <span 
class="mi">3L</span><span class="o">),</span>
+        <span class="n">KV</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span 
class="s">"stomach"</span><span class="o">,</span> <span 
class="mi">1L</span><span class="o">));</span>
+  <span class="n">PAssert</span><span class="o">.</span><span 
class="na">that</span><span class="o">(</span><span 
class="n">filteredWords</span><span class="o">).</span><span 
class="na">containsInAnyOrder</span><span class="o">(</span><span 
class="n">expectedResults</span><span class="o">);</span>
+  <span class="o">...</span>
+<span class="o">}</span>
+</code></pre>
+</div>
+
+<h2 id="windowedwordcount">WindowedWordCount</h2>
+
+<p>This example, <code class="highlighter-rouge">WindowedWordCount</code>, 
counts words in text just as the previous examples did, but introduces several 
advanced concepts.</p>
+
+<p><strong>New Concepts:</strong></p>
+
+<ul>
+  <li>Unbounded and bounded pipeline input modes</li>
+  <li>Adding timestamps to data</li>
+  <li>Windowing</li>
+  <li>Reusing PTransforms over windowed PCollections</li>
+</ul>
+
+<p>The following sections explain these key concepts in detail, and break down 
the pipeline code into smaller sections.</p>
+
+<h3 id="unbounded-and-bounded-pipeline-input-modes">Unbounded and bounded 
pipeline input modes</h3>
+
+<p>Beam allows you to create a single pipeline that can handle both bounded 
and unbounded types of input. If the input is unbounded, then all <code 
class="highlighter-rouge">PCollections</code> of the pipeline will be unbounded 
as well. The same goes for bounded input. If your input has a fixed number of 
elements, it’s considered a ‘bounded’ data set. If your input is 
continuously updating, then it’s considered ‘unbounded’.</p>
+
+<p>Recall that the input for this example is a set of Shakespeare’s texts, 
finite data. Therefore, this example reads bounded data from a text file:</p>
+
+<div class="language-java highlighter-rouge"><pre 
class="highlight"><code><span class="kd">public</span> <span 
class="kd">static</span> <span class="kt">void</span> <span 
class="nf">main</span><span class="o">(</span><span 
class="n">String</span><span class="o">[]</span> <span 
class="n">args</span><span class="o">)</span> <span class="kd">throws</span> 
<span class="n">IOException</span> <span class="o">{</span>
+    <span class="n">Options</span> <span class="n">options</span> <span 
class="o">=</span> <span class="o">...</span>
+    <span class="n">Pipeline</span> <span class="n">pipeline</span> <span 
class="o">=</span> <span class="n">Pipeline</span><span class="o">.</span><span 
class="na">create</span><span class="o">(</span><span 
class="n">options</span><span class="o">);</span>
+
+    <span class="cm">/**
+     * Concept #1: The Beam SDK allows running the same pipeline with a 
bounded or unbounded input source.
+     */</span>
+    <span class="n">PCollection</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span class="n">input</span> 
<span class="o">=</span> <span class="n">pipeline</span>
+      <span class="o">.</span><span class="na">apply</span><span 
class="o">(</span><span class="n">TextIO</span><span class="o">.</span><span 
class="na">Read</span><span class="o">.</span><span class="na">from</span><span 
class="o">(</span><span class="n">options</span><span class="o">.</span><span 
class="na">getInputFile</span><span class="o">()))</span>
+
+</code></pre>
+</div>
+
+<h3 id="adding-timestamps-to-data">Adding Timestamps to Data</h3>
+
+<p>Each element in a <code class="highlighter-rouge">PCollection</code> has an 
associated <strong>timestamp</strong>. The timestamp for each element is 
initially assigned by the source that creates the <code 
class="highlighter-rouge">PCollection</code> and can be adjusted by a <code 
class="highlighter-rouge">DoFn</code>. In this example the input is bounded. 
For the purpose of the example, the <code class="highlighter-rouge">DoFn</code> 
named <code class="highlighter-rouge">AddTimestampFn</code> (invoked by 
<code class="highlighter-rouge">ParDo</code>) will set a timestamp for each 
element in the <code class="highlighter-rouge">PCollection</code>.</p>
+
+<div class="language-java highlighter-rouge"><pre 
class="highlight"><code><span class="c1">// Concept #2: Add an element 
timestamp, using an artificial time just to show windowing.</span>
+<span class="c1">// See AddTimestampFn for more details on this.</span>
+<span class="o">.</span><span class="na">apply</span><span 
class="o">(</span><span class="n">ParDo</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span class="k">new</span> <span 
class="n">AddTimestampFn</span><span class="o">()));</span>
+</code></pre>
+</div>
+
+<p>Below is the code for <code 
class="highlighter-rouge">AddTimestampFn</code>, a <code 
class="highlighter-rouge">DoFn</code> invoked by <code 
class="highlighter-rouge">ParDo</code>, that sets the timestamp of each data 
element given the element itself. For example, if the elements were log 
lines, this <code class="highlighter-rouge">ParDo</code> could parse the time 
out of the log string and set it as the element’s timestamp. There are no 
timestamps inherent in the works of Shakespeare, so in this case we’ve made 
up random timestamps just to illustrate the concept. Each line of the input 
text will get a random associated timestamp sometime in a 2-hour period.</p>
+
+<div class="language-java highlighter-rouge"><pre 
class="highlight"><code><span class="cm">/**
+   * Concept #2: A DoFn that sets the data element timestamp. This is a silly 
method, just for
+   * this example, for the bounded data case. Imagine that many ghosts of 
Shakespeare are all 
+   * typing madly at the same time to recreate his masterworks. Each line of 
the corpus will 
+   * get a random associated timestamp somewhere in a 2-hour period.
+   */</span>
+  <span class="kd">static</span> <span class="kd">class</span> <span 
class="nc">AddTimestampFn</span> <span class="kd">extends</span> <span 
class="n">DoFn</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">String</span><span class="o">&gt;</span> <span class="o">{</span>
+    <span class="kd">private</span> <span class="kd">static</span> <span 
class="kd">final</span> <span class="n">Duration</span> <span 
class="n">RAND_RANGE</span> <span class="o">=</span> <span 
class="n">Duration</span><span class="o">.</span><span 
class="na">standardHours</span><span class="o">(</span><span 
class="mi">2</span><span class="o">);</span>
+    <span class="kd">private</span> <span class="kd">final</span> <span 
class="n">Instant</span> <span class="n">minTimestamp</span><span 
class="o">;</span>
+
+    <span class="n">AddTimestampFn</span><span class="o">()</span> <span 
class="o">{</span>
+      <span class="k">this</span><span class="o">.</span><span 
class="na">minTimestamp</span> <span class="o">=</span> <span 
class="k">new</span> <span class="n">Instant</span><span 
class="o">(</span><span class="n">System</span><span class="o">.</span><span 
class="na">currentTimeMillis</span><span class="o">());</span>
+    <span class="o">}</span>
+
+    <span class="nd">@ProcessElement</span>
+    <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">processElement</span><span class="o">(</span><span 
class="n">ProcessContext</span> <span class="n">c</span><span 
class="o">)</span> <span class="o">{</span>
+      <span class="c1">// Generate a timestamp that falls somewhere in the 
past two hours.</span>
+      <span class="kt">long</span> <span class="n">randMillis</span> <span 
class="o">=</span> <span class="o">(</span><span class="kt">long</span><span 
class="o">)</span> <span class="o">(</span><span class="n">Math</span><span 
class="o">.</span><span class="na">random</span><span class="o">()</span> <span 
class="o">*</span> <span class="n">RAND_RANGE</span><span 
class="o">.</span><span class="na">getMillis</span><span class="o">());</span>
+      <span class="n">Instant</span> <span class="n">randomTimestamp</span> 
<span class="o">=</span> <span class="n">minTimestamp</span><span 
class="o">.</span><span class="na">plus</span><span class="o">(</span><span 
class="n">randMillis</span><span class="o">);</span>
+      <span class="cm">/**
+       * Set the data element with that timestamp.
+       */</span>
+      <span class="n">c</span><span class="o">.</span><span 
class="na">outputWithTimestamp</span><span class="o">(</span><span 
class="n">c</span><span class="o">.</span><span class="na">element</span><span 
class="o">(),</span> <span class="k">new</span> <span 
class="n">Instant</span><span class="o">(</span><span 
class="n">randomTimestamp</span><span class="o">));</span>
+    <span class="o">}</span>
+  <span class="o">}</span>
+</code></pre>
+</div>
+
+<h3 id="windowing">Windowing</h3>
+
+<p>Beam uses a concept called <strong>Windowing</strong> to subdivide a <code 
class="highlighter-rouge">PCollection</code> according to the timestamps of its 
individual elements. <code class="highlighter-rouge">PTransforms</code> that 
aggregate multiple elements process each <code 
class="highlighter-rouge">PCollection</code> as a succession of multiple, 
finite windows, even though the entire collection itself may be of infinite 
size (unbounded).</p>
+
+<p>The <code class="highlighter-rouge">WindowedWordCount</code> example 
applies fixed-time windowing, wherein each window represents a fixed time 
interval. The fixed window size for this example defaults to 1 minute (you can 
change this with a command-line option).</p>
+
+<div class="language-java highlighter-rouge"><pre 
class="highlight"><code><span class="cm">/**
+ * Concept #3: Window into fixed windows. The fixed window size for this 
example defaults to 1
+ * minute (you can change this with a command-line option).
+ */</span>
+<span class="n">PCollection</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">&gt;</span> <span 
class="n">windowedWords</span> <span class="o">=</span> <span 
class="n">input</span>
+  <span class="o">.</span><span class="na">apply</span><span 
class="o">(</span><span class="n">Window</span><span 
class="o">.&lt;</span><span class="n">String</span><span 
class="o">&gt;</span><span class="n">into</span><span class="o">(</span>
+    <span class="n">FixedWindows</span><span class="o">.</span><span 
class="na">of</span><span class="o">(</span><span 
class="n">Duration</span><span class="o">.</span><span 
class="na">standardMinutes</span><span class="o">(</span><span 
class="n">options</span><span class="o">.</span><span 
class="na">getWindowSize</span><span class="o">()))));</span>
+</code></pre>
+</div>
+
+<h3 id="reusing-ptransforms-over-windowed-pcollections">Reusing PTransforms 
over windowed PCollections</h3>
+
+<p>You can reuse existing <code class="highlighter-rouge">PTransform</code>s 
that were created for manipulating simple <code 
class="highlighter-rouge">PCollection</code>s over windowed <code 
class="highlighter-rouge">PCollection</code>s as well.</p>
+
+<div class="highlighter-rouge"><pre class="highlight"><code>/**
+ * Concept #4: Re-use our existing CountWords transform that does not have 
knowledge of
+ * windows over a PCollection containing windowed values.
+ */
+PCollection&lt;KV&lt;String, Long&gt;&gt; wordCounts = windowedWords.apply(new 
WordCount.CountWords());
+</code></pre>
+</div>
+
+<h2 id="write-results-to-an-unbounded-sink">Write Results to an Unbounded 
Sink</h2>
+
+<p>When our input is unbounded, the same is true of our output <code 
class="highlighter-rouge">PCollection</code>. In that case, we need to make 
sure that we choose an appropriate, unbounded sink. Some output sinks support 
only bounded output, such as a text file. Google Cloud BigQuery is an output 
sink that supports both bounded and unbounded input.</p>
+
+<p>In this example, we stream the results to a BigQuery table. The results are 
first formatted for a BigQuery table, and then written to BigQuery using 
BigQueryIO.Write.</p>
+
+<div class="language-java highlighter-rouge"><pre 
class="highlight"><code><span class="cm">/**
+ * Concept #5: Format the results for a BigQuery table, then write to BigQuery.
+ * The BigQuery output source supports both bounded and unbounded data.
+ */</span>
+<span class="n">wordCounts</span><span class="o">.</span><span 
class="na">apply</span><span class="o">(</span><span 
class="n">ParDo</span><span class="o">.</span><span class="na">of</span><span 
class="o">(</span><span class="k">new</span> <span 
class="n">FormatAsTableRowFn</span><span class="o">()))</span>
+    <span class="o">.</span><span class="na">apply</span><span 
class="o">(</span><span class="n">BigQueryIO</span><span 
class="o">.</span><span class="na">Write</span>
+      <span class="o">.</span><span class="na">to</span><span 
class="o">(</span><span class="n">getTableReference</span><span 
class="o">(</span><span class="n">options</span><span class="o">))</span>
+      <span class="o">.</span><span class="na">withSchema</span><span 
class="o">(</span><span class="n">getSchema</span><span class="o">())</span>
+      <span class="o">.</span><span 
class="na">withCreateDisposition</span><span class="o">(</span><span 
class="n">BigQueryIO</span><span class="o">.</span><span 
class="na">Write</span><span class="o">.</span><span 
class="na">CreateDisposition</span><span class="o">.</span><span 
class="na">CREATE_IF_NEEDED</span><span class="o">)</span>
+      <span class="o">.</span><span 
class="na">withWriteDisposition</span><span class="o">(</span><span 
class="n">BigQueryIO</span><span class="o">.</span><span 
class="na">Write</span><span class="o">.</span><span 
class="na">WriteDisposition</span><span class="o">.</span><span 
class="na">WRITE_APPEND</span><span class="o">));</span>
+</code></pre>
+</div>
+
+
+      </div>
+
+
+    <hr>
+  <div class="row">
+      <div class="col-xs-12">
+          <footer>
+              <p class="text-center">&copy; Copyright 2016
+                <a href="http://www.apache.org">The Apache Software 
Foundation.</a> All Rights Reserved.</p>
+                <p class="text-center"><a href="/privacy_policy">Privacy 
Policy</a> |
+                <a href="/feed.xml">RSS Feed</a></p>
+          </footer>
+      </div>
+  </div>
+  <!-- container div end -->
+</div>
+
+
+  </body>
+
+</html>

Reply via email to