http://git-wip-us.apache.org/repos/asf/incubator-beam-site/blob/27b5e76b/content/use/wordcount-example/index.html
----------------------------------------------------------------------
diff --git a/content/use/wordcount-example/index.html
b/content/use/wordcount-example/index.html
new file mode 100644
index 0000000..9ac76ef
--- /dev/null
+++ b/content/use/wordcount-example/index.html
@@ -0,0 +1,648 @@
+<!DOCTYPE html>
+<html lang="en">
+
+ <head>
+ <meta charset="utf-8">
+ <meta http-equiv="X-UA-Compatible" content="IE=edge">
+ <meta name="viewport" content="width=device-width, initial-scale=1">
+
+ <title>Beam WordCount Example</title>
+ <meta name="description" content="Apache Beam is an open source, unified
model and set of language-specific SDKs for defining and executing data
processing workflows, and also data ingestion and integration flows, supporting
Enterprise Integration Patterns (EIPs) and Domain Specific Languages (DSLs).
Dataflow pipelines simplify the mechanics of large-scale batch and streaming
data processing and can run on a number of runtimes like Apache Flink, Apache
Spark, and Google Cloud Dataflow (a cloud service). Beam also brings DSL in
different languages, allowing users to easily implement their data integration
processes.
+">
+
+ <link rel="stylesheet" href="/styles/site.css">
+ <link rel="stylesheet" href="/css/theme.css">
+ <script
src="https://ajax.googleapis.com/ajax/libs/jquery/2.2.0/jquery.min.js"></script>
+ <script src="/js/bootstrap.min.js"></script>
+ <link rel="canonical"
href="http://beam.incubator.apache.org/use/wordcount-example/"
data-proofer-ignore>
+ <link rel="alternate" type="application/rss+xml" title="Apache Beam
(incubating)" href="http://beam.incubator.apache.org/feed.xml">
+ <script>
+
(function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
+ (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new
Date();a=s.createElement(o),
+
m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
+
})(window,document,'script','//www.google-analytics.com/analytics.js','ga');
+
+ ga('create', 'UA-73650088-1', 'auto');
+ ga('send', 'pageview');
+
+ </script>
+ <link rel="shortcut icon" type="image/x-icon" href="/images/favicon.ico">
+</head>
+
+
+ <body role="document">
+
+ <nav class="navbar navbar-default navbar-fixed-top">
+ <div class="container">
+ <div class="navbar-header">
+ <a href="/" class="navbar-brand" >
+ <img alt="Brand" style="height: 25px"
src="/images/beam_logo_navbar.png">
+ </a>
+ <button type="button" class="navbar-toggle collapsed"
data-toggle="collapse" data-target="#navbar" aria-expanded="false"
aria-controls="navbar">
+ <span class="sr-only">Toggle navigation</span>
+ <span class="icon-bar"></span>
+ <span class="icon-bar"></span>
+ <span class="icon-bar"></span>
+ </button>
+ </div>
+ <div id="navbar" class="navbar-collapse collapse">
+ <ul class="nav navbar-nav">
+ <li class="dropdown">
+ <a href="#" class="dropdown-toggle" data-toggle="dropdown"
role="button" aria-haspopup="true" aria-expanded="false">Use <span
class="caret"></span></a>
+ <ul class="dropdown-menu">
+ <li><a href="/use">User Hub</a></li>
+ <li role="separator" class="divider"></li>
+ <li class="dropdown-header">General</li>
+ <li><a href="/use/beam-overview/">Beam
Overview</a></li>
+ <li><a href="/use/quickstart/">Quickstart</a></li>
+ <li><a href="/use/releases">Release
Information</a></li>
+ <li role="separator" class="divider"></li>
+ <li class="dropdown-header">Example Walkthroughs</li>
+ <li><a
href="/use/wordcount-example/">WordCount</a></li>
+ <li><a href="/use/walkthroughs/">Mobile
Gaming</a></li>
+ <li role="separator" class="divider"></li>
+ <li class="dropdown-header">Support</li>
+ <li><a href="/use/mailing-lists/">Mailing
Lists</a></li>
+ <li><a href="/use/issue-tracking/">Issue Tracking</a></li>
+ <li><a
href="http://stackoverflow.com/questions/tagged/apache-beam">Beam on
StackOverflow</a></li>
+ <li><a href="http://apachebeam.slack.com">Beam Slack
Channel</a></li>
+ </ul>
+ </li>
+ <li class="dropdown">
+ <a href="#" class="dropdown-toggle" data-toggle="dropdown"
role="button" aria-haspopup="true" aria-expanded="false">Learn <span
class="caret"></span></a>
+ <ul class="dropdown-menu">
+ <li><a href="/learn">Learner Hub</a></li>
+ <li role="separator" class="divider"></li>
+ <li class="dropdown-header">Beam Concepts</li>
+ <li><a href="/learn/programming-guide/">Programming
Guide</a></li>
+ <li><a
href="/learn/presentation-materials/">Presentation Materials</a></li>
+ <li><a href="/learn/resources/">Additional
Resources</a></li>
+ <li role="separator" class="divider"></li>
+ <li class="dropdown-header">SDKs</li>
+ <li><a href="/learn/sdks/java/">Java SDK</a></li>
+ <li><a href="/learn/sdks/javadoc/">Java SDK API
Reference</a></li>
+ <li role="separator" class="divider"></li>
+ <li class="dropdown-header">Runners</li>
+ <li><a
href="/learn/runners/capability-matrix/">Capability Matrix</a></li>
+ <li><a href="/learn/runners/direct/">Direct
Runner</a></li>
+ <li><a href="/learn/runners/flink/">Apache Flink
Runner</a></li>
+ <li><a href="/learn/runners/spark/">Apache Spark
Runner</a></li>
+ <li><a href="/learn/runners/dataflow/">Cloud Dataflow
Runner</a></li>
+ </ul>
+ </li>
+ <li class="dropdown">
+ <a href="#" class="dropdown-toggle" data-toggle="dropdown"
role="button" aria-haspopup="true" aria-expanded="false">Contribute <span
class="caret"></span></a>
+ <ul class="dropdown-menu">
+ <li><a href="/contribute">Contributor Hub</a></li>
+ <li role="separator" class="divider"></li>
+ <li class="dropdown-header">Basics</li>
+ <li><a
href="/contribute/contribution-guide/">Contribution Guide</a></li>
+ <li><a href="/contribute/work-in-progress/">Work In
Progress</a></li>
+ <li><a href="/use/mailing-lists/">Mailing
Lists</a></li>
+ <li><a href="/contribute/source-repository/">Source
Repository</a></li>
+ <li><a href="/use/issue-tracking/">Issue Tracking</a></li>
+ <li role="separator" class="divider"></li>
+ <li class="dropdown-header">Technical References</li>
+ <li><a href="/contribute/testing/">Testing</a></li>
+ <li><a href="/contribute/design-principles/">Design
Principles</a></li>
+ <li><a href="https://goo.gl/nk5OM0">Technical
Vision</a></li>
+ <li role="separator" class="divider"></li>
+ <li class="dropdown-header">Committer Resources</li>
+ <li><a href="/contribute/release-guide/">Release Guide</a></li>
+ </ul>
+ </li>
+ <li><a href="/blog">Blog</a></li>
+ <li class="dropdown">
+ <a href="#" class="dropdown-toggle" data-toggle="dropdown"
role="button" aria-haspopup="true" aria-expanded="false">Project<span
class="caret"></span></a>
+ <ul class="dropdown-menu">
+ <li><a href="/project/logos/">Logos and design</a></li>
+ <li><a href="/project/public-meetings/">Public Meetings</a></li>
+ <li><a href="/project/team/">Team</a></li>
+ </ul>
+ </li>
+ </ul>
+ <ul class="nav navbar-nav navbar-right">
+ <li class="dropdown">
+ <a href="#" class="dropdown-toggle" data-toggle="dropdown"
role="button" aria-haspopup="true" aria-expanded="false"><img
src="https://www.apache.org/foundation/press/kit/feather_small.png" alt="Apache
Logo" style="height:24px;">Apache Software Foundation<span
class="caret"></span></a>
+ <ul class="dropdown-menu dropdown-menu-right">
+ <li><a href="http://www.apache.org/">ASF Homepage</a></li>
+ <li><a href="http://www.apache.org/licenses/">License</a></li>
+ <li><a href="http://www.apache.org/security/">Security</a></li>
+ <li><a
href="http://www.apache.org/foundation/thanks.html">Thanks</a></li>
+ <li><a
href="http://www.apache.org/foundation/sponsorship.html">Sponsorship</a></li>
+ <li><a
href="https://www.apache.org/foundation/policies/conduct">Code of
Conduct</a></li>
+ </ul>
+ </li>
+ </ul>
+ </div><!--/.nav-collapse -->
+ </div>
+</nav>
+
+
+<link rel="stylesheet" href="">
+
+
+ <div class="container" role="main">
+
+ <div class="row">
+ <h1 id="apache-beam-wordcount-example">Apache Beam WordCount
Example</h1>
+
+<ul id="markdown-toc">
+ <li><a href="#minimalwordcount"
id="markdown-toc-minimalwordcount">MinimalWordCount</a> <ul>
+ <li><a href="#creating-the-pipeline"
id="markdown-toc-creating-the-pipeline">Creating the Pipeline</a></li>
+ <li><a href="#applying-pipeline-transforms"
id="markdown-toc-applying-pipeline-transforms">Applying Pipeline
Transforms</a></li>
+ <li><a href="#running-the-pipeline"
id="markdown-toc-running-the-pipeline">Running the Pipeline</a></li>
+ </ul>
+ </li>
+ <li><a href="#wordcount-example"
id="markdown-toc-wordcount-example">WordCount Example</a> <ul>
+ <li><a href="#specifying-explicit-dofns"
id="markdown-toc-specifying-explicit-dofns">Specifying Explicit DoFns</a></li>
+ <li><a href="#creating-composite-transforms"
id="markdown-toc-creating-composite-transforms">Creating Composite
Transforms</a></li>
+ <li><a href="#using-parameterizable-pipelineoptions"
id="markdown-toc-using-parameterizable-pipelineoptions">Using Parameterizable
PipelineOptions</a></li>
+ </ul>
+ </li>
+ <li><a href="#debugging-wordcount-example"
id="markdown-toc-debugging-wordcount-example">Debugging WordCount Example</a>
<ul>
+ <li><a href="#logging" id="markdown-toc-logging">Logging</a> <ul>
+ <li><a href="#direct-runner" id="markdown-toc-direct-runner">Direct
Runner</a></li>
+ <li><a href="#dataflow-runner"
id="markdown-toc-dataflow-runner">Dataflow Runner</a></li>
+ <li><a href="#apache-spark-runner"
id="markdown-toc-apache-spark-runner">Apache Spark Runner</a></li>
+ <li><a href="#apache-flink-runner"
id="markdown-toc-apache-flink-runner">Apache Flink Runner</a></li>
+ </ul>
+ </li>
+ <li><a href="#testing-your-pipeline-via-passert"
id="markdown-toc-testing-your-pipeline-via-passert">Testing your Pipeline via
PAssert</a></li>
+ </ul>
+ </li>
+ <li><a href="#windowedwordcount"
id="markdown-toc-windowedwordcount">WindowedWordCount</a> <ul>
+ <li><a href="#unbounded-and-bounded-pipeline-input-modes"
id="markdown-toc-unbounded-and-bounded-pipeline-input-modes">Unbounded and
bounded pipeline input modes</a></li>
+ <li><a href="#adding-timestamps-to-data"
id="markdown-toc-adding-timestamps-to-data">Adding Timestamps to Data</a></li>
+ <li><a href="#windowing" id="markdown-toc-windowing">Windowing</a></li>
+ <li><a href="#reusing-ptransforms-over-windowed-pcollections"
id="markdown-toc-reusing-ptransforms-over-windowed-pcollections">Reusing
PTransforms over windowed PCollections</a></li>
+ </ul>
+ </li>
+ <li><a href="#write-results-to-an-unbounded-sink"
id="markdown-toc-write-results-to-an-unbounded-sink">Write Results to an
Unbounded Sink</a></li>
+</ul>
+
+<blockquote>
+ <p><strong>Note:</strong> This walkthrough is still in progress. Detailed
instructions for running the example pipelines across multiple runners are yet
to be added. There is an open issue to finish the walkthrough (<a
href="https://issues.apache.org/jira/browse/BEAM-664">BEAM-664</a>).</p>
+</blockquote>
+
+<p>The WordCount examples demonstrate how to set up a processing pipeline that
can read text, tokenize the text lines into individual words, and perform a
frequency count on each of those words. The Beam SDKs contain a series of
four successively more detailed WordCount examples that build on each other.
The input text for all the examples is a set of Shakespeare's texts.</p>
+
+<p>Each WordCount example introduces different concepts in the Beam
programming model. Begin by understanding Minimal WordCount, the simplest of
the examples. Once you feel comfortable with the basic principles in building a
pipeline, continue on to learn more concepts in the other examples.</p>
+
+<ul>
+ <li><strong>Minimal WordCount</strong> demonstrates the basic principles
involved in building a pipeline.</li>
+ <li><strong>WordCount</strong> introduces some of the more common best
practices in creating re-usable and maintainable pipelines.</li>
+ <li><strong>Debugging WordCount</strong> introduces logging and debugging
practices.</li>
+ <li><strong>Windowed WordCount</strong> demonstrates how you can use
Beam's programming model to handle both bounded and unbounded datasets.</li>
+</ul>
+
+<h2 id="minimalwordcount">MinimalWordCount</h2>
+
+<p>Minimal WordCount demonstrates a simple pipeline that can read from a text
file, apply transforms to tokenize and count the words, and write the data to
an output text file. This example hard-codes the locations for its input and
output files and doesn't perform any error checking; it is intended to only
show you the "bare bones" of creating a Beam pipeline. This lack of
parameterization makes this particular pipeline less portable across different
runners than standard Beam pipelines. In later examples, we will parameterize
the pipeline's input and output sources and show other best practices.</p>
+
+<p>To run this example, follow the instructions in the <a
href="https://github.com/apache/incubator-beam/blob/master/examples/java/README.md#building-and-running">Beam
Examples README</a>. To view the full code, see <strong><a
href="https://github.com/apache/incubator-beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java">MinimalWordCount</a>.</strong></p>
+
+<p><strong>Key Concepts:</strong></p>
+
+<ul>
+ <li>Creating the Pipeline</li>
+ <li>Applying transforms to the Pipeline</li>
+ <li>Reading input (in this example: reading text files)</li>
+ <li>Applying ParDo transforms</li>
+ <li>Applying SDK-provided transforms (in this example: Count)</li>
+ <li>Writing output (in this example: writing to a text file)</li>
+ <li>Running the Pipeline</li>
+</ul>
+
+<p>The following sections explain these concepts in detail along with excerpts
of the relevant code from the Minimal WordCount pipeline.</p>
+
+<h3 id="creating-the-pipeline">Creating the Pipeline</h3>
+
+<p>The first step in creating a Beam pipeline is to create a <code
class="highlighter-rouge">PipelineOptions</code> object. This object lets us
set various options for our pipeline, such as the pipeline runner that will
execute our pipeline and any runner-specific configuration required by the
chosen runner. In this example we set these options programmatically, but more
often command-line arguments are used to set <code
class="highlighter-rouge">PipelineOptions</code>.</p>
+
+<p>You can specify a runner for executing your pipeline, such as the <code
class="highlighter-rouge">DataflowRunner</code> or <code
class="highlighter-rouge">SparkRunner</code>. If you omit specifying a runner,
as in this example, your pipeline will be executed locally using the <code
class="highlighter-rouge">DirectRunner</code>. In the next sections, we will
specify the pipeline's runner.</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>
<span class="n">PipelineOptions</span> <span class="n">options</span> <span
class="o">=</span> <span class="n">PipelineOptionsFactory</span><span
class="o">.</span><span class="na">create</span><span class="o">();</span>
+
+ <span class="c1">// In order to run your pipeline, you need to make
following runner specific changes:</span>
+ <span class="c1">//</span>
+ <span class="c1">// CHANGE 1/3: Select a Beam runner, such as
DataflowRunner or FlinkRunner.</span>
+ <span class="c1">// CHANGE 2/3: Specify runner-required options.</span>
+ <span class="c1">// For DataflowRunner, set project and temp location as
follows:</span>
+ <span class="c1">// DataflowPipelineOptions dataflowOptions =
options.as(DataflowPipelineOptions.class);</span>
+ <span class="c1">//
dataflowOptions.setRunner(DataflowRunner.class);</span>
+ <span class="c1">//
dataflowOptions.setProject("SET_YOUR_PROJECT_ID_HERE");</span>
+ <span class="c1">//
dataflowOptions.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY");</span>
+ <span class="c1">// For FlinkRunner, set the runner as follows. See {@code
FlinkPipelineOptions}</span>
+ <span class="c1">// for more details.</span>
+ <span class="c1">// options.setRunner(FlinkRunner.class);</span>
+</code></pre>
+</div>
+
+<p>The next step is to create a Pipeline object with the options we've just
constructed. The Pipeline object builds up the graph of transformations to be
executed, associated with that particular pipeline.</p>
+
+<div class="language-java highlighter-rouge"><pre
class="highlight"><code><span class="n">Pipeline</span> <span
class="n">p</span> <span class="o">=</span> <span
class="n">Pipeline</span><span class="o">.</span><span
class="na">create</span><span class="o">(</span><span
class="n">options</span><span class="o">);</span>
+</code></pre>
+</div>
+
+<h3 id="applying-pipeline-transforms">Applying Pipeline Transforms</h3>
+
+<p>The Minimal WordCount pipeline contains several transforms to read data
into the pipeline, manipulate or otherwise transform the data, and write out
the results. Each transform represents an operation in the pipeline.</p>
+
+<p>Each transform takes some kind of input (data or otherwise), and produces
some output data. The input and output data is represented by the SDK class
<code class="highlighter-rouge">PCollection</code>. <code
class="highlighter-rouge">PCollection</code> is a special class, provided by
the Beam SDK, that you can use to represent a data set of virtually any size,
including infinite data sets.</p>
+
+<p><img src="/images/wordcount-pipeline.png" alt="Word Count pipeline diagram">
+Figure 1: The pipeline data flow.</p>
+
+<p>The Minimal WordCount pipeline contains five transforms:</p>
+
+<ol>
+ <li>
+ <p>A text file <code class="highlighter-rouge">Read</code> transform is
applied to the Pipeline object itself, and produces a <code
class="highlighter-rouge">PCollection</code> as output. Each element in the
output PCollection represents one line of text from the input file. This
example happens to use input data stored in a publicly accessible Google Cloud
Storage bucket ("gs://").</p>
+
+ <div class="language-java highlighter-rouge"><pre class="highlight"><code>
<span class="n">p</span><span class="o">.</span><span
class="na">apply</span><span class="o">(</span><span
class="n">TextIO</span><span class="o">.</span><span
class="na">Read</span><span class="o">.</span><span class="na">from</span><span
class="o">(</span><span
class="s">"gs://apache-beam-samples/shakespeare/*"</span><span
class="o">))</span>
+</code></pre>
+ </div>
+ </li>
+ <li>
+ <p>A <a href="/learn/programming-guide/#transforms-pardo">ParDo</a>
transform that invokes a <code class="highlighter-rouge">DoFn</code> (defined
in-line as an anonymous class) on each element that tokenizes the text lines
into individual words. The input for this transform is the <code
class="highlighter-rouge">PCollection</code> of text lines generated by the
previous <code class="highlighter-rouge">TextIO.Read</code> transform. The
<code class="highlighter-rouge">ParDo</code> transform outputs a new <code
class="highlighter-rouge">PCollection</code>, where each element represents an
individual word in the text.</p>
+
+ <div class="language-java highlighter-rouge"><pre class="highlight"><code>
<span class="o">.</span><span class="na">apply</span><span
class="o">(</span><span class="s">"ExtractWords"</span><span class="o">,</span>
<span class="n">ParDo</span><span class="o">.</span><span
class="na">of</span><span class="o">(</span><span class="k">new</span> <span
class="n">DoFn</span><span class="o"><</span><span
class="n">String</span><span class="o">,</span> <span
class="n">String</span><span class="o">>()</span> <span class="o">{</span>
+ <span class="nd">@ProcessElement</span>
+ <span class="kd">public</span> <span class="kt">void</span> <span
class="nf">processElement</span><span class="o">(</span><span
class="n">ProcessContext</span> <span class="n">c</span><span
class="o">)</span> <span class="o">{</span>
+ <span class="k">for</span> <span class="o">(</span><span
class="n">String</span> <span class="n">word</span> <span class="o">:</span>
<span class="n">c</span><span class="o">.</span><span
class="na">element</span><span class="o">().</span><span
class="na">split</span><span class="o">(</span><span
class="s">"[^a-zA-Z']+"</span><span class="o">))</span> <span class="o">{</span>
+ <span class="k">if</span> <span class="o">(!</span><span
class="n">word</span><span class="o">.</span><span
class="na">isEmpty</span><span class="o">())</span> <span class="o">{</span>
+ <span class="n">c</span><span class="o">.</span><span
class="na">output</span><span class="o">(</span><span
class="n">word</span><span class="o">);</span>
+ <span class="o">}</span>
+ <span class="o">}</span>
+ <span class="o">}</span>
+ <span class="o">}))</span>
+</code></pre>
+ </div>
+ </li>
+ <li>
+ <p>The SDK-provided <code class="highlighter-rouge">Count</code> transform
is a generic transform that takes a <code
class="highlighter-rouge">PCollection</code> of any type, and returns a <code
class="highlighter-rouge">PCollection</code> of key/value pairs. Each key
represents a unique element from the input collection, and each value
represents the number of times that key appeared in the input collection.</p>
+
+ <p>In this pipeline, the input for <code
class="highlighter-rouge">Count</code> is the <code
class="highlighter-rouge">PCollection</code> of individual words generated by
the previous <code class="highlighter-rouge">ParDo</code>, and the output is a
<code class="highlighter-rouge">PCollection</code> of key/value pairs where
each key represents a unique word in the text and the associated value is the
occurrence count for each.</p>
+
+ <div class="language-java highlighter-rouge"><pre class="highlight"><code>
<span class="o">.</span><span class="na">apply</span><span
class="o">(</span><span class="n">Count</span><span class="o">.<</span><span
class="n">String</span><span class="o">></span><span
class="n">perElement</span><span class="o">())</span>
+</code></pre>
+ </div>
+ </li>
+ <li>
+ <p>The next transform formats each of the key/value pairs of unique words
and occurrence counts into a printable string suitable for writing to an output
file.</p>
+
+ <p><code class="highlighter-rouge">MapElements</code> is a higher-level
composite transform that encapsulates a simple <code
class="highlighter-rouge">ParDo</code>; for each element in the input <code
class="highlighter-rouge">PCollection</code>, <code
class="highlighter-rouge">MapElements</code> applies a function that produces
exactly one output element. In this example, <code
class="highlighter-rouge">MapElements</code> invokes a <code
class="highlighter-rouge">SimpleFunction</code> (defined in-line as an
anonymous class) that does the formatting. As input, <code
class="highlighter-rouge">MapElements</code> takes a <code
class="highlighter-rouge">PCollection</code> of key/value pairs generated by
<code class="highlighter-rouge">Count</code>, and produces a new <code
class="highlighter-rouge">PCollection</code> of printable strings.</p>
+
+ <div class="language-java highlighter-rouge"><pre class="highlight"><code>
<span class="o">.</span><span class="na">apply</span><span
class="o">(</span><span class="s">"FormatResults"</span><span
class="o">,</span> <span class="n">MapElements</span><span
class="o">.</span><span class="na">via</span><span class="o">(</span><span
class="k">new</span> <span class="n">SimpleFunction</span><span
class="o"><</span><span class="n">KV</span><span class="o"><</span><span
class="n">String</span><span class="o">,</span> <span
class="n">Long</span><span class="o">>,</span> <span
class="n">String</span><span class="o">>()</span> <span class="o">{</span>
+ <span class="nd">@Override</span>
+ <span class="kd">public</span> <span class="n">String</span> <span
class="nf">apply</span><span class="o">(</span><span class="n">KV</span><span
class="o"><</span><span class="n">String</span><span class="o">,</span>
<span class="n">Long</span><span class="o">></span> <span
class="n">input</span><span class="o">)</span> <span class="o">{</span>
+ <span class="k">return</span> <span
class="n">input</span><span class="o">.</span><span
class="na">getKey</span><span class="o">()</span> <span class="o">+</span>
<span class="s">": "</span> <span class="o">+</span> <span
class="n">input</span><span class="o">.</span><span
class="na">getValue</span><span class="o">();</span>
+ <span class="o">}</span>
+ <span class="o">}))</span>
+</code></pre>
+ </div>
+ </li>
+ <li>
+ <p>A text file <code class="highlighter-rouge">Write</code>. This
transform takes the final <code class="highlighter-rouge">PCollection</code> of
formatted Strings as input and writes each element to an output text file. Each
element in the input <code class="highlighter-rouge">PCollection</code>
represents one line of text in the resulting output file.</p>
+
+ <div class="language-java highlighter-rouge"><pre class="highlight"><code>
<span class="o">.</span><span class="na">apply</span><span
class="o">(</span><span class="n">TextIO</span><span class="o">.</span><span
class="na">Write</span><span class="o">.</span><span class="na">to</span><span
class="o">(</span><span
class="s">"gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"</span><span
class="o">));</span>
+</code></pre>
+ </div>
+ </li>
+</ol>
+
+<p>Note that the <code class="highlighter-rouge">Write</code> transform
produces a trivial result value of type <code
class="highlighter-rouge">PDone</code>, which in this case is ignored.</p>
+
+<h3 id="running-the-pipeline">Running the Pipeline</h3>
+
+<p>Run the pipeline by calling the <code class="highlighter-rouge">run</code>
method, which sends your pipeline to be executed by the pipeline runner that
you specified when you created your pipeline.</p>
+
+<div class="language-java highlighter-rouge"><pre
class="highlight"><code><span class="n">p</span><span class="o">.</span><span
class="na">run</span><span class="o">();</span>
+</code></pre>
+</div>
+
+<h2 id="wordcount-example">WordCount Example</h2>
+
+<p>This WordCount example introduces a few recommended programming practices
that can make your pipeline easier to read, write, and maintain. While not
explicitly required, they can make your pipeline's execution more flexible,
aid in testing your pipeline, and help make your pipeline's code reusable.</p>
+
+<p>This section assumes that you have a good understanding of the basic
concepts in building a pipeline. If you feel that you aren't at that point
yet, read the above section, <a href="#minimalwordcount">Minimal
WordCount</a>.</p>
+
+<p>To run this example, follow the instructions in the <a
href="https://github.com/apache/incubator-beam/blob/master/examples/java/README.md#building-and-running">Beam
Examples README</a>. To view the full code, see <strong><a
href="https://github.com/apache/incubator-beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WordCount.java">WordCount</a>.</strong></p>
+
+<p><strong>New Concepts:</strong></p>
+
+<ul>
+ <li>Applying <code class="highlighter-rouge">ParDo</code> with an explicit
<code class="highlighter-rouge">DoFn</code></li>
+ <li>Creating Composite Transforms</li>
+ <li>Using Parameterizable <code
class="highlighter-rouge">PipelineOptions</code></li>
+</ul>
+
+<p>The following sections explain these key concepts in detail, and break down
the pipeline code into smaller sections.</p>
+
+<h3 id="specifying-explicit-dofns">Specifying Explicit DoFns</h3>
+
+<p>When using <code class="highlighter-rouge">ParDo</code> transforms, you
need to specify the processing operation that gets applied to each element in
the input <code class="highlighter-rouge">PCollection</code>. This processing
operation is a subclass of the SDK class <code
class="highlighter-rouge">DoFn</code>. You can create the <code
class="highlighter-rouge">DoFn</code> subclasses for each <code
class="highlighter-rouge">ParDo</code> inline, as an anonymous inner class
instance, as is done in the previous example (Minimal WordCount). However,
it's often a good idea to define the <code
class="highlighter-rouge">DoFn</code> at the global level, which makes it
easier to unit test and can make the <code
class="highlighter-rouge">ParDo</code> code more readable.</p>
+
+<p>In this example, <code class="highlighter-rouge">ExtractWordsFn</code> is a
<code class="highlighter-rouge">DoFn</code> that is defined as a static
class:</p>
+
+<div class="language-java highlighter-rouge"><pre
class="highlight"><code><span class="kd">static</span> <span
class="kd">class</span> <span class="nc">ExtractWordsFn</span> <span
class="kd">extends</span> <span class="n">DoFn</span><span
class="o"><</span><span class="n">String</span><span class="o">,</span>
<span class="n">String</span><span class="o">></span> <span
class="o">{</span>
+ <span class="o">...</span>
+
+ <span class="nd">@ProcessElement</span>
+ <span class="kd">public</span> <span class="kt">void</span> <span
class="nf">processElement</span><span class="o">(</span><span
class="n">ProcessContext</span> <span class="n">c</span><span
class="o">)</span> <span class="o">{</span>
+ <span class="o">...</span>
+ <span class="o">}</span>
+<span class="o">}</span>
+</code></pre>
+</div>
+
+<p>This <code class="highlighter-rouge">DoFn</code> (<code
class="highlighter-rouge">ExtractWordsFn</code>) is the processing operation
that <code class="highlighter-rouge">ParDo</code> applies to the <code
class="highlighter-rouge">PCollection</code> of text lines to produce the <code
class="highlighter-rouge">PCollection</code> of words:</p>
+
+<div class="language-java highlighter-rouge"><pre
class="highlight"><code><span class="n">PCollection</span><span
class="o"><</span><span class="n">String</span><span class="o">></span>
<span class="n">words</span> <span class="o">=</span> <span
class="n">lines</span><span class="o">.</span><span
class="na">apply</span><span class="o">(</span><span
class="n">ParDo</span><span class="o">.</span><span class="na">of</span><span
class="o">(</span><span class="k">new</span> <span
class="n">ExtractWordsFn</span><span class="o">()));</span>
+</code></pre>
+</div>
+
+<h3 id="creating-composite-transforms">Creating Composite Transforms</h3>
+
+<p>If you have a processing operation that consists of multiple transforms or
<code class="highlighter-rouge">ParDo</code> steps, you can create it as a
subclass of <code class="highlighter-rouge">PTransform</code>. Creating a <code
class="highlighter-rouge">PTransform</code> subclass allows you to create
complex reusable transforms, can make your pipeline's structure more clear
and modular, and makes unit testing easier.</p>
+
+<p>In this example, two transforms are encapsulated as the <code
class="highlighter-rouge">PTransform</code> subclass <code
class="highlighter-rouge">CountWords</code>. <code
class="highlighter-rouge">CountWords</code> contains the <code
class="highlighter-rouge">ParDo</code> that runs <code
class="highlighter-rouge">ExtractWordsFn</code> and the SDK-provided <code
class="highlighter-rouge">Count</code> transform.</p>
+
+<p>When <code class="highlighter-rouge">CountWords</code> is defined, we
specify its ultimate input and output; the input is the <code
class="highlighter-rouge">PCollection&lt;String&gt;</code> for the extraction
operation, and the output is the <code
class="highlighter-rouge">PCollection&lt;KV&lt;String, Long&gt;&gt;</code>
produced by the count operation.</p>
+
+<div class="language-java highlighter-rouge"><pre
class="highlight"><code><span class="kd">public</span> <span
class="kd">static</span> <span class="kd">class</span> <span
class="nc">CountWords</span> <span class="kd">extends</span> <span
class="n">PTransform</span><span class="o"><</span><span
class="n">PCollection</span><span class="o"><</span><span
class="n">String</span><span class="o">>,</span>
+ <span class="n">PCollection</span><span class="o"><</span><span
class="n">KV</span><span class="o"><</span><span
class="n">String</span><span class="o">,</span> <span
class="n">Long</span><span class="o">>>></span> <span
class="o">{</span>
+ <span class="nd">@Override</span>
+ <span class="kd">public</span> <span class="n">PCollection</span><span
class="o"><</span><span class="n">KV</span><span class="o"><</span><span
class="n">String</span><span class="o">,</span> <span
class="n">Long</span><span class="o">>></span> <span
class="nf">apply</span><span class="o">(</span><span
class="n">PCollection</span><span class="o"><</span><span
class="n">String</span><span class="o">></span> <span
class="n">lines</span><span class="o">)</span> <span class="o">{</span>
+
+ <span class="c1">// Convert lines of text into individual words.</span>
+ <span class="n">PCollection</span><span class="o"><</span><span
class="n">String</span><span class="o">></span> <span class="n">words</span>
<span class="o">=</span> <span class="n">lines</span><span
class="o">.</span><span class="na">apply</span><span class="o">(</span>
+ <span class="n">ParDo</span><span class="o">.</span><span
class="na">of</span><span class="o">(</span><span class="k">new</span> <span
class="n">ExtractWordsFn</span><span class="o">()));</span>
+
+ <span class="c1">// Count the number of times each word occurs.</span>
+ <span class="n">PCollection</span><span class="o"><</span><span
class="n">KV</span><span class="o"><</span><span
class="n">String</span><span class="o">,</span> <span
class="n">Long</span><span class="o">>></span> <span
class="n">wordCounts</span> <span class="o">=</span>
+ <span class="n">words</span><span class="o">.</span><span
class="na">apply</span><span class="o">(</span><span
class="n">Count</span><span class="o">.<</span><span
class="n">String</span><span class="o">></span><span
class="n">perElement</span><span class="o">());</span>
+
+ <span class="k">return</span> <span class="n">wordCounts</span><span
class="o">;</span>
+ <span class="o">}</span>
+<span class="o">}</span>
+
+<span class="kd">public</span> <span class="kd">static</span> <span
class="kt">void</span> <span class="nf">main</span><span
class="o">(</span><span class="n">String</span><span class="o">[]</span> <span
class="n">args</span><span class="o">)</span> <span class="kd">throws</span>
<span class="n">IOException</span> <span class="o">{</span>
+ <span class="n">Pipeline</span> <span class="n">p</span> <span
class="o">=</span> <span class="o">...</span>
+
+ <span class="n">p</span><span class="o">.</span><span
class="na">apply</span><span class="o">(...)</span>
+ <span class="o">.</span><span class="na">apply</span><span
class="o">(</span><span class="k">new</span> <span
class="n">CountWords</span><span class="o">())</span>
+ <span class="o">...</span>
+<span class="o">}</span>
+</code></pre>
+</div>
+
+<h3 id="using-parameterizable-pipelineoptions">Using Parameterizable
PipelineOptions</h3>
+
+<p>You can hard-code various execution options when you run your pipeline.
However, the more common way is to define your own configuration options via
command-line argument parsing. Defining your configuration options via the
command-line makes the code more easily portable across different runners.</p>
+
+<p>Add arguments to be processed by the command-line parser, and specify
default values for them. You can then access the options values in your
pipeline code.</p>
+
+<div class="language-java highlighter-rouge"><pre
class="highlight"><code><span class="kd">public</span> <span
class="kd">static</span> <span class="kd">interface</span> <span
class="nc">WordCountOptions</span> <span class="kd">extends</span> <span
class="n">PipelineOptions</span> <span class="o">{</span>
+ <span class="nd">@Description</span><span class="o">(</span><span
class="s">"Path of the file to read from"</span><span class="o">)</span>
+ <span class="nd">@Default</span><span class="o">.</span><span
class="na">String</span><span class="o">(</span><span
class="s">"gs://dataflow-samples/shakespeare/kinglear.txt"</span><span
class="o">)</span>
+ <span class="n">String</span> <span class="nf">getInputFile</span><span
class="o">();</span>
+ <span class="kt">void</span> <span class="nf">setInputFile</span><span
class="o">(</span><span class="n">String</span> <span
class="n">value</span><span class="o">);</span>
+ <span class="o">...</span>
+<span class="o">}</span>
+
+<span class="kd">public</span> <span class="kd">static</span> <span
class="kt">void</span> <span class="nf">main</span><span
class="o">(</span><span class="n">String</span><span class="o">[]</span> <span
class="n">args</span><span class="o">)</span> <span class="o">{</span>
+ <span class="n">WordCountOptions</span> <span class="n">options</span> <span
class="o">=</span> <span class="n">PipelineOptionsFactory</span><span
class="o">.</span><span class="na">fromArgs</span><span class="o">(</span><span
class="n">args</span><span class="o">).</span><span
class="na">withValidation</span><span class="o">()</span>
+ <span class="o">.</span><span class="na">as</span><span
class="o">(</span><span class="n">WordCountOptions</span><span
class="o">.</span><span class="na">class</span><span class="o">);</span>
+ <span class="n">Pipeline</span> <span class="n">p</span> <span
class="o">=</span> <span class="n">Pipeline</span><span class="o">.</span><span
class="na">create</span><span class="o">(</span><span
class="n">options</span><span class="o">);</span>
+ <span class="o">...</span>
+<span class="o">}</span>
+</code></pre>
+</div>
+
+<h2 id="debugging-wordcount-example">Debugging WordCount Example</h2>
+
+<p>The Debugging WordCount example demonstrates some best practices for
instrumenting your pipeline code.</p>
+
+<p>To run this example, follow the instructions in the <a
href="https://github.com/apache/incubator-beam/blob/master/examples/java/README.md#building-and-running">Beam
Examples README</a>. To view the full code, see <strong><a
href="https://github.com/apache/incubator-beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java">DebuggingWordCount</a>.</strong></p>
+
+<p><strong>New Concepts:</strong></p>
+
+<ul>
+ <li>Logging</li>
+ <li>Testing your Pipeline via <code
class="highlighter-rouge">PAssert</code></li>
+</ul>
+
+<p>The following sections explain these key concepts in detail, and break down
the pipeline code into smaller sections.</p>
+
+<h3 id="logging">Logging</h3>
+
+<p>Each runner may choose to handle logs in its own way.</p>
+
+<h4 id="direct-runner">Direct Runner</h4>
+
+<p>If you execute your pipeline using <code
class="highlighter-rouge">DirectRunner</code>, it will print the log messages
directly to your local console.</p>
+
+<h4 id="dataflow-runner">Dataflow Runner</h4>
+
+<p>If you execute your pipeline using <code
class="highlighter-rouge">DataflowRunner</code>, you can use Google Cloud
Logging. Google Cloud Logging (currently in beta) aggregates the logs from all
of your Dataflow job’s workers to a single location in the Google Cloud
Platform Console. You can use Cloud Logging to search and access the logs from
all of the Compute Engine instances that Dataflow has spun up to complete your
Dataflow job. You can add logging statements into your pipeline’s <code
class="highlighter-rouge">DoFn</code> instances that will appear in Cloud
Logging as your pipeline runs.</p>
+
+<p>In this example, we use <code class="highlighter-rouge">.trace</code> and
<code class="highlighter-rouge">.debug</code>:</p>
+
+<div class="language-java highlighter-rouge"><pre
class="highlight"><code><span class="kd">public</span> <span
class="kd">class</span> <span class="nc">DebuggingWordCount</span> <span
class="o">{</span>
+
+ <span class="kd">public</span> <span class="kd">static</span> <span
class="kd">class</span> <span class="nc">FilterTextFn</span> <span
class="kd">extends</span> <span class="n">DoFn</span><span
class="o"><</span><span class="n">KV</span><span class="o"><</span><span
class="n">String</span><span class="o">,</span> <span
class="n">Long</span><span class="o">>,</span> <span
class="n">KV</span><span class="o"><</span><span
class="n">String</span><span class="o">,</span> <span
class="n">Long</span><span class="o">>></span> <span class="o">{</span>
+ <span class="o">...</span>
+
+ <span class="kd">public</span> <span class="kt">void</span> <span
class="nf">processElement</span><span class="o">(</span><span
class="n">ProcessContext</span> <span class="n">c</span><span
class="o">)</span> <span class="o">{</span>
+ <span class="k">if</span> <span class="o">(...)</span> <span
class="o">{</span>
+ <span class="o">...</span>
+ <span class="n">LOG</span><span class="o">.</span><span
class="na">debug</span><span class="o">(</span><span class="s">"Matched:
"</span> <span class="o">+</span> <span class="n">c</span><span
class="o">.</span><span class="na">element</span><span
class="o">().</span><span class="na">getKey</span><span class="o">());</span>
+ <span class="o">}</span> <span class="k">else</span> <span
class="o">{</span>
+ <span class="o">...</span>
+ <span class="n">LOG</span><span class="o">.</span><span
class="na">trace</span><span class="o">(</span><span class="s">"Did not match:
"</span> <span class="o">+</span> <span class="n">c</span><span
class="o">.</span><span class="na">element</span><span
class="o">().</span><span class="na">getKey</span><span class="o">());</span>
+ <span class="o">}</span>
+ <span class="o">}</span>
+ <span class="o">}</span>
+<span class="o">}</span>
+
+</code></pre>
+</div>
+
+<p>If you execute your pipeline using <code
class="highlighter-rouge">DataflowRunner</code>, you can control the worker log
levels. Dataflow workers that execute user code are configured to log to Cloud
Logging by default at “INFO” log level and higher. You can override log
levels for specific logging namespaces by specifying: <code
class="highlighter-rouge">--workerLogLevelOverrides={"Name1":"Level1","Name2":"Level2",...}</code>.
For example, by specifying <code
class="highlighter-rouge">--workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"}</code>
when executing this pipeline using the Dataflow service, Cloud Logging would
contain only “DEBUG” or higher level logs for the package in addition to
the default “INFO” or higher level logs.</p>
+
+<p>The default Dataflow worker logging configuration can be overridden by
specifying <code class="highlighter-rouge">--defaultWorkerLogLevel=&lt;one of
TRACE, DEBUG, INFO, WARN, ERROR&gt;</code>. For example, by specifying <code
class="highlighter-rouge">--defaultWorkerLogLevel=DEBUG</code> when executing
this pipeline with the Dataflow service, Cloud Logging would contain all
“DEBUG” or higher level logs. Note that changing the default worker log
level to TRACE or DEBUG will significantly increase the amount of logs
output.</p>
+
+<h4 id="apache-spark-runner">Apache Spark Runner</h4>
+
+<blockquote>
+ <p><strong>Note:</strong> This section is yet to be added. There is an open
issue for this (<a
href="https://issues.apache.org/jira/browse/BEAM-792">BEAM-792</a>).</p>
+</blockquote>
+
+<h4 id="apache-flink-runner">Apache Flink Runner</h4>
+
+<blockquote>
+ <p><strong>Note:</strong> This section is yet to be added. There is an open
issue for this (<a
href="https://issues.apache.org/jira/browse/BEAM-791">BEAM-791</a>).</p>
+</blockquote>
+
+<h3 id="testing-your-pipeline-via-passert">Testing your Pipeline via
PAssert</h3>
+
+<p><code class="highlighter-rouge">PAssert</code> is a set of convenient <code
class="highlighter-rouge">PTransform</code>s in the style of Hamcrest’s
collection matchers that can be used when writing Pipeline level tests to
validate the contents of PCollections. <code
class="highlighter-rouge">PAssert</code> is best used in unit tests with small
data sets, but is demonstrated here as a teaching tool.</p>
+
+<p>Below, we verify that the set of filtered words matches our expected
counts. Note that <code class="highlighter-rouge">PAssert</code> does not
provide any output, and that successful completion of the pipeline implies that
the expectations were met. See <a
href="https://github.com/apache/incubator-beam/blob/master/examples/java/src/test/java/org/apache/beam/examples/DebuggingWordCountTest.java">DebuggingWordCountTest</a>
for an example unit test.</p>
+
+<div class="language-java highlighter-rouge"><pre
class="highlight"><code><span class="kd">public</span> <span
class="kd">static</span> <span class="kt">void</span> <span
class="nf">main</span><span class="o">(</span><span
class="n">String</span><span class="o">[]</span> <span
class="n">args</span><span class="o">)</span> <span class="o">{</span>
+ <span class="o">...</span>
+ <span class="n">List</span><span class="o"><</span><span
class="n">KV</span><span class="o"><</span><span
class="n">String</span><span class="o">,</span> <span
class="n">Long</span><span class="o">>></span> <span
class="n">expectedResults</span> <span class="o">=</span> <span
class="n">Arrays</span><span class="o">.</span><span
class="na">asList</span><span class="o">(</span>
+ <span class="n">KV</span><span class="o">.</span><span
class="na">of</span><span class="o">(</span><span
class="s">"Flourish"</span><span class="o">,</span> <span
class="mi">3L</span><span class="o">),</span>
+ <span class="n">KV</span><span class="o">.</span><span
class="na">of</span><span class="o">(</span><span
class="s">"stomach"</span><span class="o">,</span> <span
class="mi">1L</span><span class="o">));</span>
+ <span class="n">PAssert</span><span class="o">.</span><span
class="na">that</span><span class="o">(</span><span
class="n">filteredWords</span><span class="o">).</span><span
class="na">containsInAnyOrder</span><span class="o">(</span><span
class="n">expectedResults</span><span class="o">);</span>
+ <span class="o">...</span>
+<span class="o">}</span>
+</code></pre>
+</div>
+
+<h2 id="windowedwordcount">WindowedWordCount</h2>
+
+<p>This example, <code class="highlighter-rouge">WindowedWordCount</code>,
counts words in text just as the previous examples did, but introduces several
advanced concepts.</p>
+
+<p><strong>New Concepts:</strong></p>
+
+<ul>
+ <li>Unbounded and bounded pipeline input modes</li>
+ <li>Adding timestamps to data</li>
+ <li>Windowing</li>
+ <li>Reusing PTransforms over windowed PCollections</li>
+</ul>
+
+<p>The following sections explain these key concepts in detail, and break down
the pipeline code into smaller sections.</p>
+
+<h3 id="unbounded-and-bounded-pipeline-input-modes">Unbounded and bounded
pipeline input modes</h3>
+
+<p>Beam allows you to create a single pipeline that can handle both bounded
and unbounded types of input. If the input is unbounded, then all <code
class="highlighter-rouge">PCollections</code> of the pipeline will be unbounded
as well. The same goes for bounded input. If your input has a fixed number of
elements, it’s considered a “bounded” data set. If your input is
continuously updating, then it’s considered “unbounded”.</p>
+
+<p>Recall that the input for this example is a set of Shakespeare’s texts,
which is finite data. Therefore, this example reads bounded data from a text file:</p>
+
+<div class="language-java highlighter-rouge"><pre
class="highlight"><code><span class="kd">public</span> <span
class="kd">static</span> <span class="kt">void</span> <span
class="nf">main</span><span class="o">(</span><span
class="n">String</span><span class="o">[]</span> <span
class="n">args</span><span class="o">)</span> <span class="kd">throws</span>
<span class="n">IOException</span> <span class="o">{</span>
+ <span class="n">Options</span> <span class="n">options</span> <span
class="o">=</span> <span class="o">...</span>
+ <span class="n">Pipeline</span> <span class="n">pipeline</span> <span
class="o">=</span> <span class="n">Pipeline</span><span class="o">.</span><span
class="na">create</span><span class="o">(</span><span
class="n">options</span><span class="o">);</span>
+
+ <span class="cm">/**
+ * Concept #1: The Beam SDK allows running the same pipeline with a
bounded or unbounded input source.
+ */</span>
+ <span class="n">PCollection</span><span class="o"><</span><span
class="n">String</span><span class="o">></span> <span class="n">input</span>
<span class="o">=</span> <span class="n">pipeline</span>
+ <span class="o">.</span><span class="na">apply</span><span
class="o">(</span><span class="n">TextIO</span><span class="o">.</span><span
class="na">Read</span><span class="o">.</span><span class="na">from</span><span
class="o">(</span><span class="n">options</span><span class="o">.</span><span
class="na">getInputFile</span><span class="o">()))</span>
+
+</code></pre>
+</div>
+
+<h3 id="adding-timestamps-to-data">Adding Timestamps to Data</h3>
+
+<p>Each element in a <code class="highlighter-rouge">PCollection</code> has an
associated <strong>timestamp</strong>. The timestamp for each element is
initially assigned by the source that creates the <code
class="highlighter-rouge">PCollection</code> and can be adjusted by a <code
class="highlighter-rouge">DoFn</code>. In this example the input is bounded.
For the purpose of the example, the <code class="highlighter-rouge">DoFn</code>
named <code class="highlighter-rouge">AddTimestampFn</code> (invoked by
<code class="highlighter-rouge">ParDo</code>) will set a timestamp for each
element in the <code class="highlighter-rouge">PCollection</code>.</p>
+
+<div class="language-java highlighter-rouge"><pre
class="highlight"><code><span class="c1">// Concept #2: Add an element
timestamp, using an artificial time just to show windowing.</span>
+<span class="c1">// See AddTimestampFn for more details on this.</span>
+<span class="o">.</span><span class="na">apply</span><span
class="o">(</span><span class="n">ParDo</span><span class="o">.</span><span
class="na">of</span><span class="o">(</span><span class="k">new</span> <span
class="n">AddTimestampFn</span><span class="o">()));</span>
+</code></pre>
+</div>
+
+<p>Below is the code for <code
class="highlighter-rouge">AddTimestampFn</code>, a <code
class="highlighter-rouge">DoFn</code> invoked by <code
class="highlighter-rouge">ParDo</code>, that sets the timestamp of each data
element given the element itself. For example, if the elements were log
lines, this <code class="highlighter-rouge">ParDo</code> could parse the time
out of the log string and set it as the element’s timestamp. There are no
timestamps inherent in the works of Shakespeare, so in this case we’ve made
up random timestamps just to illustrate the concept. Each line of the input
text will get a random associated timestamp sometime in a 2-hour period.</p>
+
+<div class="language-java highlighter-rouge"><pre
class="highlight"><code><span class="cm">/**
+ * Concept #2: A DoFn that sets the data element timestamp. This is a silly
method, just for
+ * this example, for the bounded data case. Imagine that many ghosts of
Shakespeare are all
+ * typing madly at the same time to recreate his masterworks. Each line of
the corpus will
+ * get a random associated timestamp somewhere in a 2-hour period.
+ */</span>
+ <span class="kd">static</span> <span class="kd">class</span> <span
class="nc">AddTimestampFn</span> <span class="kd">extends</span> <span
class="n">DoFn</span><span class="o"><</span><span
class="n">String</span><span class="o">,</span> <span
class="n">String</span><span class="o">></span> <span class="o">{</span>
+ <span class="kd">private</span> <span class="kd">static</span> <span
class="kd">final</span> <span class="n">Duration</span> <span
class="n">RAND_RANGE</span> <span class="o">=</span> <span
class="n">Duration</span><span class="o">.</span><span
class="na">standardHours</span><span class="o">(</span><span
class="mi">2</span><span class="o">);</span>
+ <span class="kd">private</span> <span class="kd">final</span> <span
class="n">Instant</span> <span class="n">minTimestamp</span><span
class="o">;</span>
+
+ <span class="n">AddTimestampFn</span><span class="o">()</span> <span
class="o">{</span>
+ <span class="k">this</span><span class="o">.</span><span
class="na">minTimestamp</span> <span class="o">=</span> <span
class="k">new</span> <span class="n">Instant</span><span
class="o">(</span><span class="n">System</span><span class="o">.</span><span
class="na">currentTimeMillis</span><span class="o">());</span>
+ <span class="o">}</span>
+
+ <span class="nd">@ProcessElement</span>
+ <span class="kd">public</span> <span class="kt">void</span> <span
class="nf">processElement</span><span class="o">(</span><span
class="n">ProcessContext</span> <span class="n">c</span><span
class="o">)</span> <span class="o">{</span>
+ <span class="c1">// Generate a timestamp that falls somewhere in the
past two hours.</span>
+ <span class="kt">long</span> <span class="n">randMillis</span> <span
class="o">=</span> <span class="o">(</span><span class="kt">long</span><span
class="o">)</span> <span class="o">(</span><span class="n">Math</span><span
class="o">.</span><span class="na">random</span><span class="o">()</span> <span
class="o">*</span> <span class="n">RAND_RANGE</span><span
class="o">.</span><span class="na">getMillis</span><span class="o">());</span>
+ <span class="n">Instant</span> <span class="n">randomTimestamp</span>
<span class="o">=</span> <span class="n">minTimestamp</span><span
class="o">.</span><span class="na">plus</span><span class="o">(</span><span
class="n">randMillis</span><span class="o">);</span>
+ <span class="cm">/**
+ * Set the data element with that timestamp.
+ */</span>
+ <span class="n">c</span><span class="o">.</span><span
class="na">outputWithTimestamp</span><span class="o">(</span><span
class="n">c</span><span class="o">.</span><span class="na">element</span><span
class="o">(),</span> <span class="k">new</span> <span
class="n">Instant</span><span class="o">(</span><span
class="n">randomTimestamp</span><span class="o">));</span>
+ <span class="o">}</span>
+ <span class="o">}</span>
+</code></pre>
+</div>
+
+<h3 id="windowing">Windowing</h3>
+
+<p>Beam uses a concept called <strong>Windowing</strong> to subdivide a <code
class="highlighter-rouge">PCollection</code> according to the timestamps of its
individual elements. <code class="highlighter-rouge">PTransforms</code> that
aggregate multiple elements process each <code
class="highlighter-rouge">PCollection</code> as a succession of multiple,
finite windows, even though the entire collection itself may be of infinite
size (unbounded).</p>
+
+<p>The <code class="highlighter-rouge">WindowedWordCount</code> example
applies fixed-time windowing, wherein each window represents a fixed time
interval. The fixed window size for this example defaults to 1 minute (you can
change this with a command-line option).</p>
+
+<div class="language-java highlighter-rouge"><pre
class="highlight"><code><span class="cm">/**
+ * Concept #3: Window into fixed windows. The fixed window size for this
example defaults to 1
+ * minute (you can change this with a command-line option).
+ */</span>
+<span class="n">PCollection</span><span class="o"><</span><span
class="n">String</span><span class="o">></span> <span
class="n">windowedWords</span> <span class="o">=</span> <span
class="n">input</span>
+ <span class="o">.</span><span class="na">apply</span><span
class="o">(</span><span class="n">Window</span><span
class="o">.<</span><span class="n">String</span><span
class="o">></span><span class="n">into</span><span class="o">(</span>
+ <span class="n">FixedWindows</span><span class="o">.</span><span
class="na">of</span><span class="o">(</span><span
class="n">Duration</span><span class="o">.</span><span
class="na">standardMinutes</span><span class="o">(</span><span
class="n">options</span><span class="o">.</span><span
class="na">getWindowSize</span><span class="o">()))));</span>
+</code></pre>
+</div>
+
+<h3 id="reusing-ptransforms-over-windowed-pcollections">Reusing PTransforms
over windowed PCollections</h3>
+
+<p>You can reuse existing <code class="highlighter-rouge">PTransform</code>s,
that were created for manipulating simple <code
class="highlighter-rouge">PCollection</code>s, over windowed <code
class="highlighter-rouge">PCollection</code>s as well.</p>
+
+<div class="highlighter-rouge"><pre class="highlight"><code>/**
+ * Concept #4: Re-use our existing CountWords transform that does not have
knowledge of
+ * windows over a PCollection containing windowed values.
+ */
+PCollection&lt;KV&lt;String, Long&gt;&gt; wordCounts = windowedWords.apply(new
WordCount.CountWords());
+</code></pre>
+</div>
+
+<h2 id="write-results-to-an-unbounded-sink">Write Results to an Unbounded
Sink</h2>
+
+<p>Since our input is unbounded, the same is true of our output <code
class="highlighter-rouge">PCollection</code>. We need to make sure that we
choose an appropriate, unbounded sink. Some output sinks support only bounded
output, such as a text file. Google Cloud BigQuery is an output source that
supports both bounded and unbounded input.</p>
+
+<p>In this example, we stream the results to a BigQuery table. The results are
then formatted for a BigQuery table, and then written to BigQuery using
BigQueryIO.Write.</p>
+
+<div class="language-java highlighter-rouge"><pre
class="highlight"><code><span class="cm">/**
+ * Concept #5: Format the results for a BigQuery table, then write to BigQuery.
+ * The BigQuery output source supports both bounded and unbounded data.
+ */</span>
+<span class="n">wordCounts</span><span class="o">.</span><span
class="na">apply</span><span class="o">(</span><span
class="n">ParDo</span><span class="o">.</span><span class="na">of</span><span
class="o">(</span><span class="k">new</span> <span
class="n">FormatAsTableRowFn</span><span class="o">()))</span>
+ <span class="o">.</span><span class="na">apply</span><span
class="o">(</span><span class="n">BigQueryIO</span><span
class="o">.</span><span class="na">Write</span>
+ <span class="o">.</span><span class="na">to</span><span
class="o">(</span><span class="n">getTableReference</span><span
class="o">(</span><span class="n">options</span><span class="o">))</span>
+ <span class="o">.</span><span class="na">withSchema</span><span
class="o">(</span><span class="n">getSchema</span><span class="o">())</span>
+ <span class="o">.</span><span
class="na">withCreateDisposition</span><span class="o">(</span><span
class="n">BigQueryIO</span><span class="o">.</span><span
class="na">Write</span><span class="o">.</span><span
class="na">CreateDisposition</span><span class="o">.</span><span
class="na">CREATE_IF_NEEDED</span><span class="o">)</span>
+ <span class="o">.</span><span
class="na">withWriteDisposition</span><span class="o">(</span><span
class="n">BigQueryIO</span><span class="o">.</span><span
class="na">Write</span><span class="o">.</span><span
class="na">WriteDisposition</span><span class="o">.</span><span
class="na">WRITE_APPEND</span><span class="o">));</span>
+</code></pre>
+</div>
+
+
+ </div>
+
+
+ <hr>
+ <div class="row">
+ <div class="col-xs-12">
+ <footer>
+ <p class="text-center">© Copyright 2016
+ <a href="http://www.apache.org">The Apache Software
Foundation.</a> All Rights Reserved.</p>
+ <p class="text-center"><a href="/privacy_policy">Privacy
Policy</a> |
+ <a href="/feed.xml">RSS Feed</a></p>
+ </footer>
+ </div>
+ </div>
+ <!-- container div end -->
+</div>
+
+
+ </body>
+
+</html>