Regenerate website

Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/12121557
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/12121557
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/12121557

Branch: refs/heads/asf-site
Commit: 12121557b155ec7d0aea865afa5c6f2801217d56
Parents: 9fb214f
Author: Davor Bonaci <da...@google.com>
Authored: Fri May 12 16:17:49 2017 -0700
Committer: Davor Bonaci <da...@google.com>
Committed: Fri May 12 16:17:49 2017 -0700

----------------------------------------------------------------------
 .../documentation/programming-guide/index.html  |  2 +-
 .../sdks/python-custom-io/index.html            |  2 +-
 .../get-started/wordcount-example/index.html    | 80 +++++++++++---------
 3 files changed, 47 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam-site/blob/12121557/content/documentation/programming-guide/index.html
----------------------------------------------------------------------
diff --git a/content/documentation/programming-guide/index.html 
b/content/documentation/programming-guide/index.html
index 1e6e9a7..3b3cb14 100644
--- a/content/documentation/programming-guide/index.html
+++ b/content/documentation/programming-guide/index.html
@@ -1939,7 +1939,7 @@ Subsequent transforms, however, are applied to the result 
of the <code class="hi
     <span class="n">unix_timestamp</span> <span class="o">=</span> <span 
class="n">extract_timestamp_from_log_entry</span><span class="p">(</span><span 
class="n">element</span><span class="p">)</span>
     <span class="c"># Wrap and emit the current entry and new timestamp in 
a</span>
     <span class="c"># TimestampedValue.</span>
-    <span class="k">yield</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">TimestampedValue</span><span 
class="p">(</span><span class="n">element</span><span class="p">,</span> <span 
class="n">unix_timestamp</span><span class="p">)</span>
+    <span class="k">yield</span> <span class="n">beam</span><span 
class="o">.</span><span class="n">window</span><span class="o">.</span><span 
class="n">TimestampedValue</span><span class="p">(</span><span 
class="n">element</span><span class="p">,</span> <span 
class="n">unix_timestamp</span><span class="p">)</span>
 
 <span class="n">timestamped_items</span> <span class="o">=</span> <span 
class="n">items</span> <span class="o">|</span> <span 
class="s">'timestamp'</span> <span class="o">&gt;&gt;</span> <span 
class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span 
class="p">(</span><span class="n">AddTimestampDoFn</span><span 
class="p">())</span>
 

http://git-wip-us.apache.org/repos/asf/beam-site/blob/12121557/content/documentation/sdks/python-custom-io/index.html
----------------------------------------------------------------------
diff --git a/content/documentation/sdks/python-custom-io/index.html 
b/content/documentation/sdks/python-custom-io/index.html
index 629ef0f..fb2646f 100644
--- a/content/documentation/sdks/python-custom-io/index.html
+++ b/content/documentation/sdks/python-custom-io/index.html
@@ -464,7 +464,7 @@ numbers = p | 'ProduceNumbers' &gt;&gt; 
beam.io.Read(CountingSource(count))
 
 <h4 id="filesink">FileSink</h4>
 
-<p>If your data source uses files, you can derive your <code 
class="highlighter-rouge">Sink</code> and <code 
class="highlighter-rouge">Writer</code> classes from the <code 
class="highlighter-rouge">FileSink</code> and <code 
class="highlighter-rouge">FileSinkWriter</code> classes, which can be found in 
the <a 
href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/fileio.py";>fileio.py</a>
 module. These classes implement code common sinks that interact with files, 
including:</p>
+<p>If your data source uses files, you can derive your <code 
class="highlighter-rouge">Sink</code> and <code 
class="highlighter-rouge">Writer</code> classes from the <code 
class="highlighter-rouge">FileBasedSink</code> and <code 
class="highlighter-rouge">FileBasedSinkWriter</code> classes, which can be 
found in the <a 
href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filebasedsink.py";>filebasedsink.py</a>
 module. These classes implement code common sinks that interact with files, 
including:</p>
 
 <ul>
   <li>Setting file headers and footers</li>

http://git-wip-us.apache.org/repos/asf/beam-site/blob/12121557/content/get-started/wordcount-example/index.html
----------------------------------------------------------------------
diff --git a/content/get-started/wordcount-example/index.html 
b/content/get-started/wordcount-example/index.html
index 5cc32f3..333cfb9 100644
--- a/content/get-started/wordcount-example/index.html
+++ b/content/get-started/wordcount-example/index.html
@@ -172,6 +172,7 @@
           <li><a href="#dataflow-runner" 
id="markdown-toc-dataflow-runner">Dataflow Runner</a></li>
           <li><a href="#apache-spark-runner" 
id="markdown-toc-apache-spark-runner">Apache Spark Runner</a></li>
           <li><a href="#apache-flink-runner" 
id="markdown-toc-apache-flink-runner">Apache Flink Runner</a></li>
+          <li><a href="#apache-apex-runner" 
id="markdown-toc-apache-apex-runner">Apache Apex Runner</a></li>
         </ul>
       </li>
       <li><a href="#testing-your-pipeline-via-passert" 
id="markdown-toc-testing-your-pipeline-via-passert">Testing your Pipeline via 
PAssert</a></li>
@@ -228,7 +229,7 @@
 
 <h3 id="creating-the-pipeline">Creating the Pipeline</h3>
 
-<p>The first step in creating a Beam pipeline is to create a <code 
class="highlighter-rouge">PipelineOptions object</code>. This object lets us 
set various options for our pipeline, such as the pipeline runner that will 
execute our pipeline and any runner-specific configuration required by the 
chosen runner. In this example we set these options programmatically, but more 
often command-line arguments are used to set <code 
class="highlighter-rouge">PipelineOptions</code>.</p>
+<p>The first step in creating a Beam pipeline is to create a <code 
class="highlighter-rouge">PipelineOptions</code> object. This object lets us 
set various options for our pipeline, such as the pipeline runner that will 
execute our pipeline and any runner-specific configuration required by the 
chosen runner. In this example we set these options programmatically, but more 
often command-line arguments are used to set <code 
class="highlighter-rouge">PipelineOptions</code>.</p>
 
 <p>You can specify a runner for executing your pipeline, such as the <code 
class="highlighter-rouge">DataflowRunner</code> or <code 
class="highlighter-rouge">SparkRunner</code>. If you omit specifying a runner, 
as in this example, your pipeline will be executed locally using the <code 
class="highlighter-rouge">DirectRunner</code>. In the next sections, we will 
specify the pipeline’s runner.</p>
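
For reference, a minimal sketch of what these two paragraphs describe, 
assuming the Beam Java SDK (treat the runner wiring as illustrative, not as 
the exact code on this page):

    import org.apache.beam.runners.direct.DirectRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    // Parse PipelineOptions from command-line arguments, e.g. --runner=...
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    // Set the runner programmatically; omitting this line falls back to the
    // local DirectRunner, as the paragraph above notes.
    options.setRunner(DirectRunner.class);
    Pipeline p = Pipeline.create(options);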
 
@@ -273,7 +274,7 @@
 
 <p>The Minimal WordCount pipeline contains several transforms to read data 
into the pipeline, manipulate or otherwise transform the data, and write out 
the results. Each transform represents an operation in the pipeline.</p>
 
-<p>Each transform takes some kind of input (data or otherwise), and produces 
some output data. The input and output data is represented by the SDK class 
<code class="highlighter-rouge">PCollection</code>. <code 
class="highlighter-rouge">PCollection</code> is a special class, provided by 
the Beam SDK, that you can use to represent a data set of virtually any size, 
including infinite data sets.</p>
+<p>Each transform takes some kind of input (data or otherwise), and produces 
some output data. The input and output data is represented by the SDK class 
<code class="highlighter-rouge">PCollection</code>. <code 
class="highlighter-rouge">PCollection</code> is a special class, provided by 
the Beam SDK, that you can use to represent a data set of virtually any size, 
including unbounded data sets.</p>
 
 <p><img src="/images/wordcount-pipeline.png" alt="Word Count pipeline diagram" 
/>
 Figure 1: The pipeline data flow.</p>
@@ -282,7 +283,7 @@ Figure 1: The pipeline data flow.</p>
 
 <ol>
   <li>
-    <p>A text file <code class="highlighter-rouge">Read</code> transform is 
applied to the Pipeline object itself, and produces a <code 
class="highlighter-rouge">PCollection</code> as output. Each element in the 
output PCollection represents one line of text from the input file. This 
example happens to use input data stored in a publicly accessible Google Cloud 
Storage bucket (“gs://”).</p>
+    <p>A text file <code class="highlighter-rouge">Read</code> transform is 
applied to the Pipeline object itself, and produces a <code 
class="highlighter-rouge">PCollection</code> as output. Each element in the 
output PCollection represents one line of text from the input file. This 
example uses input data stored in a publicly accessible Google Cloud Storage 
bucket (“gs://”).</p>
 
     <div class="language-java highlighter-rouge"><pre 
class="highlight"><code><span class="n">p</span><span class="o">.</span><span 
class="na">apply</span><span class="o">(</span><span 
class="n">TextIO</span><span class="o">.</span><span 
class="na">Read</span><span class="o">.</span><span class="na">from</span><span 
class="o">(</span><span 
class="s">"gs://apache-beam-samples/shakespeare/*"</span><span 
class="o">))</span>
 </code></pre>
@@ -299,7 +300,9 @@ Figure 1: The pipeline data flow.</p>
     <div class="language-java highlighter-rouge"><pre 
class="highlight"><code><span class="o">.</span><span 
class="na">apply</span><span class="o">(</span><span 
class="s">"ExtractWords"</span><span class="o">,</span> <span 
class="n">ParDo</span><span class="o">.</span><span class="na">of</span><span 
class="o">(</span><span class="k">new</span> <span class="n">DoFn</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">String</span><span class="o">&gt;()</span> <span 
class="o">{</span>
     <span class="nd">@ProcessElement</span>
     <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">processElement</span><span class="o">(</span><span 
class="n">ProcessContext</span> <span class="n">c</span><span 
class="o">)</span> <span class="o">{</span>
-        <span class="k">for</span> <span class="o">(</span><span 
class="n">String</span> <span class="n">word</span> <span class="o">:</span> 
<span class="n">c</span><span class="o">.</span><span 
class="na">element</span><span class="o">().</span><span 
class="na">split</span><span class="o">(</span><span 
class="s">"[^a-zA-Z']+"</span><span class="o">))</span> <span class="o">{</span>
+        <span class="c1">// \p{L} denotes the category of Unicode 
letters,</span>
+        <span class="c1">// so this pattern will match on everything that is 
not a letter.</span>
+        <span class="k">for</span> <span class="o">(</span><span 
class="n">String</span> <span class="n">word</span> <span class="o">:</span> 
<span class="n">c</span><span class="o">.</span><span 
class="na">element</span><span class="o">().</span><span 
class="na">split</span><span class="o">(</span><span 
class="s">"[^\\p{L}]+"</span><span class="o">))</span> <span class="o">{</span>
             <span class="k">if</span> <span class="o">(!</span><span 
class="n">word</span><span class="o">.</span><span 
class="na">isEmpty</span><span class="o">())</span> <span class="o">{</span>
                 <span class="n">c</span><span class="o">.</span><span 
class="na">output</span><span class="o">(</span><span 
class="n">word</span><span class="o">);</span>
             <span class="o">}</span>
@@ -330,7 +333,7 @@ Figure 1: The pipeline data flow.</p>
   <li>
     <p>The next transform formats each of the key/value pairs of unique words 
and occurrence counts into a printable string suitable for writing to an output 
file.</p>
 
-    <p>The map transform is a higher-level composite transform that 
encapsulates a simple <code class="highlighter-rouge">ParDo</code>; for each 
element in the input <code class="highlighter-rouge">PCollection</code>, the 
map transform applies a function that produces exactly one output element.</p>
+    <p>The map transform is a higher-level composite transform that 
encapsulates a simple <code class="highlighter-rouge">ParDo</code>. For each 
element in the input <code class="highlighter-rouge">PCollection</code>, the 
map transform applies a function that produces exactly one output element.</p>
 
     <div class="language-java highlighter-rouge"><pre 
class="highlight"><code><span class="o">.</span><span 
class="na">apply</span><span class="o">(</span><span 
class="s">"FormatResults"</span><span class="o">,</span> <span 
class="n">MapElements</span><span class="o">.</span><span 
class="na">via</span><span class="o">(</span><span class="k">new</span> <span 
class="n">SimpleFunction</span><span class="o">&lt;</span><span 
class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Long</span><span class="o">&gt;,</span> <span 
class="n">String</span><span class="o">&gt;()</span> <span class="o">{</span>
     <span class="nd">@Override</span>
@@ -526,14 +529,6 @@ Figure 1: The pipeline data flow.</p>
 
 <p>Each runner may choose to handle logs in its own way.</p>
 
-<h4 id="direct-runner">Direct Runner</h4>
-
-<p>If you execute your pipeline using <code 
class="highlighter-rouge">DirectRunner</code>, it will print the log messages 
directly to your local console.</p>
-
-<h4 id="dataflow-runner">Dataflow Runner</h4>
-
-<p>If you execute your pipeline using <code 
class="highlighter-rouge">DataflowRunner</code>, you can use Google Cloud 
Logging. Google Cloud Logging (currently in beta) aggregates the logs from all 
of your Dataflow job’s workers to a single location in the Google Cloud 
Platform Console. You can use Cloud Logging to search and access the logs from 
all of the Compute Engine instances that Dataflow has spun up to complete your 
Dataflow job. You can add logging statements into your pipeline’s <code 
class="highlighter-rouge">DoFn</code> instances that will appear in Cloud 
Logging as your pipeline runs.</p>
-
 <div class="language-java highlighter-rouge"><pre 
class="highlight"><code><span class="c1">// This example uses .trace and 
.debug:</span>
 
 <span class="kd">public</span> <span class="kd">class</span> <span 
class="nc">DebuggingWordCount</span> <span class="o">{</span>
@@ -592,7 +587,15 @@ Figure 1: The pipeline data flow.</p>
 </code></pre>
 </div>
 
-<p>If you execute your pipeline using <code 
class="highlighter-rouge">DataflowRunner</code>, you can control the worker log 
levels. Dataflow workers that execute user code are configured to log to Cloud 
Logging by default at “INFO” log level and higher. You can override log 
levels for specific logging namespaces by specifying: <code 
class="highlighter-rouge">--workerLogLevelOverrides={"Name1":"Level1","Name2":"Level2",...}</code>.
 For example, by specifying <code 
class="highlighter-rouge">--workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"}</code>
 when executing this pipeline using the Dataflow service, Cloud Logging would 
contain only “DEBUG” or higher level logs for the package in addition to 
the default “INFO” or higher level logs.</p>
+<h4 id="direct-runner">Direct Runner</h4>
+
+<p>If you execute your pipeline using <code 
class="highlighter-rouge">DirectRunner</code>, it will print the log messages 
directly to your local console.</p>
+
+<h4 id="dataflow-runner">Dataflow Runner</h4>
+
+<p>If you execute your pipeline using <code 
class="highlighter-rouge">DataflowRunner</code>, you can use Stackdriver 
Logging. Stackdriver Logging aggregates the logs from all of your Dataflow 
job’s workers to a single location in the Google Cloud Platform Console. You 
can use Stackdriver Logging to search and access the logs from all of the 
workers that Dataflow has spun up to complete your Dataflow job. Logging 
statements in your pipeline’s <code class="highlighter-rouge">DoFn</code> 
instances will appear in Stackdriver Logging as your pipeline runs.</p>
+
+<p>If you execute your pipeline using <code 
class="highlighter-rouge">DataflowRunner</code>, you can control the worker log 
levels. Dataflow workers that execute user code are configured to log to 
Stackdriver Logging by default at “INFO” log level and higher. You can 
override log levels for specific logging namespaces by specifying: <code 
class="highlighter-rouge">--workerLogLevelOverrides={"Name1":"Level1","Name2":"Level2",...}</code>.
 For example, by specifying <code 
class="highlighter-rouge">--workerLogLevelOverrides={"org.apache.beam.examples":"DEBUG"}</code>
 when executing this pipeline using the Dataflow service, Stackdriver Logging 
would contain only “DEBUG” or higher level logs for the package in addition 
to the default “INFO” or higher level logs.</p>
 
 <p>The default Dataflow worker logging configuration can be overridden by 
specifying <code class="highlighter-rouge">--defaultWorkerLogLevel=&lt;one of 
TRACE, DEBUG, INFO, WARN, ERROR&gt;</code>. For example, by specifying <code 
class="highlighter-rouge">--defaultWorkerLogLevel=DEBUG</code> when executing 
this pipeline with the Dataflow service, Stackdriver Logging would contain all 
“DEBUG” or higher level logs. Note that changing the default worker log 
level to TRACE or DEBUG will significantly increase the amount of logs 
output.</p>
 
@@ -608,11 +611,17 @@ Figure 1: The pipeline data flow.</p>
   <p><strong>Note:</strong> This section is yet to be added. There is an open 
issue for this (<a 
href="https://issues.apache.org/jira/browse/BEAM-791";>BEAM-791</a>).</p>
 </blockquote>
 
+<h4 id="apache-apex-runner">Apache Apex Runner</h4>
+
+<blockquote>
+  <p><strong>Note:</strong> This section is yet to be added. There is an open 
issue for this (<a 
href="https://issues.apache.org/jira/browse/BEAM-2285";>BEAM-2285</a>).</p>
+</blockquote>
+
 <h3 id="testing-your-pipeline-via-passert">Testing your Pipeline via 
PAssert</h3>
 
-<p><code class="highlighter-rouge">PAssert</code> is a set of convenient <code 
class="highlighter-rouge">PTransform</code>s in the style of Hamcrest’s 
collection matchers that can be used when writing Pipeline level tests to 
validate the contents of PCollections. <code 
class="highlighter-rouge">PAssert</code> is best used in unit tests with small 
data sets, but is demonstrated here as a teaching tool.</p>
+<p><code class="highlighter-rouge">PAssert</code> is a set of convenient 
PTransforms in the style of Hamcrest’s collection matchers that can be used 
when writing pipeline-level tests to validate the contents of PCollections. 
<code class="highlighter-rouge">PAssert</code> is best used in unit tests with 
small data sets, but is demonstrated here as a teaching tool.</p>
 
-<p>Below, we verify that the set of filtered words matches our expected 
counts. Note that <code class="highlighter-rouge">PAssert</code> does not 
provide any output, and that successful completion of the pipeline implies that 
the expectations were met. See <a 
href="https://github.com/apache/beam/blob/master/examples/java/src/test/java/org/apache/beam/examples/DebuggingWordCountTest.java";>DebuggingWordCountTest</a>
 for an example unit test.</p>
+<p>Below, we verify that the set of filtered words matches our expected 
counts. Note that <code class="highlighter-rouge">PAssert</code> does not 
produce any output, and the pipeline will only succeed if all of the 
expectations are met. See <a 
href="https://github.com/apache/beam/blob/master/examples/java/src/test/java/org/apache/beam/examples/DebuggingWordCountTest.java">DebuggingWordCountTest</a>
 for an example unit test.</p>
 
 <div class="language-java highlighter-rouge"><pre 
class="highlight"><code><span class="kd">public</span> <span 
class="kd">static</span> <span class="kt">void</span> <span 
class="nf">main</span><span class="o">(</span><span 
class="n">String</span><span class="o">[]</span> <span 
class="n">args</span><span class="o">)</span> <span class="o">{</span>
   <span class="o">...</span>
@@ -646,7 +655,7 @@ Figure 1: The pipeline data flow.</p>
 
 <h3 id="unbounded-and-bounded-pipeline-input-modes">Unbounded and bounded 
pipeline input modes</h3>
 
-<p>Beam allows you to create a single pipeline that can handle both bounded 
and unbounded types of input. If the input is unbounded, then all <code 
class="highlighter-rouge">PCollections</code> of the pipeline will be unbounded 
as well. The same goes for bounded input. If your input has a fixed number of 
elements, it’s considered a ‘bounded’ data set. If your input is 
continuously updating, then it’s considered ‘unbounded’.</p>
+<p>Beam allows you to create a single pipeline that can handle both bounded 
and unbounded types of input. If the input is unbounded, then all PCollections 
of the pipeline will be unbounded as well. The same goes for bounded input. If 
your input has a fixed number of elements, it’s considered a ‘bounded’ 
data set. If your input is continuously updating, then it’s considered 
‘unbounded’.</p>
 
 <p>Recall that the input for this example is a set of Shakespeare’s texts, 
a finite data set. Therefore, this example reads bounded data from a text 
file:</p>
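
For reference, the bounded read this sentence refers to is the same one shown 
earlier on this page (sketch):

    p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))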
 
@@ -679,20 +688,24 @@ Figure 1: The pipeline data flow.</p>
 <p>Below is the code for <code 
class="highlighter-rouge">AddTimestampFn</code>, a <code 
class="highlighter-rouge">DoFn</code> invoked by <code 
class="highlighter-rouge">ParDo</code>, that sets the data element of the 
timestamp given the element itself. For example, if the elements were log 
lines, this <code class="highlighter-rouge">ParDo</code> could parse the time 
out of the log string and set it as the element’s timestamp. There are no 
timestamps inherent in the works of Shakespeare, so in this case we’ve made 
up random timestamps just to illustrate the concept. Each line of the input 
text will get a random associated timestamp sometime in a 2-hour period.</p>
 
 <div class="language-java highlighter-rouge"><pre 
class="highlight"><code><span class="kd">static</span> <span 
class="kd">class</span> <span class="nc">AddTimestampFn</span> <span 
class="kd">extends</span> <span class="n">DoFn</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">String</span><span class="o">&gt;</span> <span 
class="o">{</span>
-  <span class="kd">private</span> <span class="kd">static</span> <span 
class="kd">final</span> <span class="n">Duration</span> <span 
class="n">RAND_RANGE</span> <span class="o">=</span> <span 
class="n">Duration</span><span class="o">.</span><span 
class="na">standardHours</span><span class="o">(</span><span 
class="mi">2</span><span class="o">);</span>
   <span class="kd">private</span> <span class="kd">final</span> <span 
class="n">Instant</span> <span class="n">minTimestamp</span><span 
class="o">;</span>
+  <span class="kd">private</span> <span class="kd">final</span> <span 
class="n">Instant</span> <span class="n">maxTimestamp</span><span 
class="o">;</span>
 
-  <span class="n">AddTimestampFn</span><span class="o">()</span> <span 
class="o">{</span>
-    <span class="k">this</span><span class="o">.</span><span 
class="na">minTimestamp</span> <span class="o">=</span> <span 
class="k">new</span> <span class="n">Instant</span><span 
class="o">(</span><span class="n">System</span><span class="o">.</span><span 
class="na">currentTimeMillis</span><span class="o">());</span>
+  <span class="n">AddTimestampFn</span><span class="o">(</span><span 
class="n">Instant</span> <span class="n">minTimestamp</span><span 
class="o">,</span> <span class="n">Instant</span> <span 
class="n">maxTimestamp</span><span class="o">)</span> <span class="o">{</span>
+    <span class="k">this</span><span class="o">.</span><span 
class="na">minTimestamp</span> <span class="o">=</span> <span 
class="n">minTimestamp</span><span class="o">;</span>
+    <span class="k">this</span><span class="o">.</span><span 
class="na">maxTimestamp</span> <span class="o">=</span> <span 
class="n">maxTimestamp</span><span class="o">;</span>
   <span class="o">}</span>
 
   <span class="nd">@ProcessElement</span>
   <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">processElement</span><span class="o">(</span><span 
class="n">ProcessContext</span> <span class="n">c</span><span 
class="o">)</span> <span class="o">{</span>
-    <span class="c1">// Generate a timestamp that falls somewhere in the past 
two hours.</span>
-    <span class="kt">long</span> <span class="n">randMillis</span> <span 
class="o">=</span> <span class="o">(</span><span class="kt">long</span><span 
class="o">)</span> <span class="o">(</span><span class="n">Math</span><span 
class="o">.</span><span class="na">random</span><span class="o">()</span> <span 
class="o">*</span> <span class="n">RAND_RANGE</span><span 
class="o">.</span><span class="na">getMillis</span><span class="o">());</span>
-    <span class="n">Instant</span> <span class="n">randomTimestamp</span> 
<span class="o">=</span> <span class="n">minTimestamp</span><span 
class="o">.</span><span class="na">plus</span><span class="o">(</span><span 
class="n">randMillis</span><span class="o">);</span>
-
-    <span class="c1">// Set the data element with that timestamp.</span>
+    <span class="n">Instant</span> <span class="n">randomTimestamp</span> 
<span class="o">=</span>
+      <span class="k">new</span> <span class="nf">Instant</span><span 
class="o">(</span>
+          <span class="n">ThreadLocalRandom</span><span 
class="o">.</span><span class="na">current</span><span class="o">()</span>
+          <span class="o">.</span><span class="na">nextLong</span><span 
class="o">(</span><span class="n">minTimestamp</span><span 
class="o">.</span><span class="na">getMillis</span><span class="o">(),</span> 
<span class="n">maxTimestamp</span><span class="o">.</span><span 
class="na">getMillis</span><span class="o">()));</span>
+
+    <span class="cm">/**
+     * Concept #2: Set the data element with that timestamp.
+     */</span>
     <span class="n">c</span><span class="o">.</span><span 
class="na">outputWithTimestamp</span><span class="o">(</span><span 
class="n">c</span><span class="o">.</span><span class="na">element</span><span 
class="o">(),</span> <span class="k">new</span> <span 
class="n">Instant</span><span class="o">(</span><span 
class="n">randomTimestamp</span><span class="o">));</span>
   <span class="o">}</span>
 <span class="o">}</span>
@@ -705,9 +718,9 @@ Figure 1: The pipeline data flow.</p>
 
 <h3 id="windowing">Windowing</h3>
 
-<p>Beam uses a concept called <strong>Windowing</strong> to subdivide a <code 
class="highlighter-rouge">PCollection</code> according to the timestamps of its 
individual elements. <code class="highlighter-rouge">PTransforms</code> that 
aggregate multiple elements, process each <code 
class="highlighter-rouge">PCollection</code> as a succession of multiple, 
finite windows, even though the entire collection itself may be of infinite 
size (unbounded).</p>
+<p>Beam uses a concept called <strong>Windowing</strong> to subdivide a <code 
class="highlighter-rouge">PCollection</code> according to the timestamps of its 
individual elements. PTransforms that aggregate multiple elements process each 
<code class="highlighter-rouge">PCollection</code> as a succession of multiple, 
finite windows, even though the entire collection itself may be of infinite 
size (unbounded).</p>
 
-<p>The <code class="highlighter-rouge">WindowingWordCount</code> example 
applies fixed-time windowing, wherein each window represents a fixed time 
interval. The fixed window size for this example defaults to 1 minute (you can 
change this with a command-line option).</p>
+<p>The <code class="highlighter-rouge">WindowedWordCount</code> example 
applies fixed-time windowing, wherein each window represents a fixed time 
interval. The fixed window size for this example defaults to 1 minute (you can 
change this with a command-line option).</p>
 
 <div class="language-java highlighter-rouge"><pre 
class="highlight"><code><span class="n">PCollection</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> 
<span class="n">windowedWords</span> <span class="o">=</span> <span 
class="n">input</span>
   <span class="o">.</span><span class="na">apply</span><span 
class="o">(</span><span class="n">Window</span><span 
class="o">.&lt;</span><span class="n">String</span><span 
class="o">&gt;</span><span class="n">into</span><span class="o">(</span>
@@ -721,7 +734,7 @@ Figure 1: The pipeline data flow.</p>
 
 <h3 id="reusing-ptransforms-over-windowed-pcollections">Reusing PTransforms 
over windowed PCollections</h3>
 
-<p>You can reuse existing <code class="highlighter-rouge">PTransform</code>s, 
that were created for manipulating simple <code 
class="highlighter-rouge">PCollection</code>s, over windowed <code 
class="highlighter-rouge">PCollection</code>s as well.</p>
+<p>You can reuse existing PTransforms that were created for manipulating 
simple PCollections over windowed PCollections as well.</p>
 
 <div class="language-java highlighter-rouge"><pre 
class="highlight"><code><span class="n">PCollection</span><span 
class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Long</span><span class="o">&gt;&gt;</span> <span 
class="n">wordCounts</span> <span class="o">=</span> <span 
class="n">windowedWords</span><span class="o">.</span><span 
class="na">apply</span><span class="o">(</span><span class="k">new</span> <span 
class="n">WordCount</span><span class="o">.</span><span 
class="na">CountWords</span><span class="o">());</span>
 </code></pre>
@@ -733,16 +746,13 @@ Figure 1: The pipeline data flow.</p>
 
 <h3 id="write-results-to-an-unbounded-sink">Write Results to an Unbounded 
Sink</h3>
 
-<p>Since our input is unbounded, the same is true of our output <code 
class="highlighter-rouge">PCollection</code>. We need to make sure that we 
choose an appropriate, unbounded sink. Some output sinks support only bounded 
output, such as a text file. Google Cloud BigQuery is an output source that 
supports both bounded and unbounded input.</p>
+<p>When our input is unbounded, the same is true of our output <code 
class="highlighter-rouge">PCollection</code>. We need to make sure that we 
choose an appropriate, unbounded sink. Some output sinks support only bounded 
output, while others support both bounded and unbounded outputs. By using a 
<code class="highlighter-rouge">FilenamePolicy</code>, we can use <code 
class="highlighter-rouge">TextIO</code> to files that are partitioned by 
windows. We use a composite <code class="highlighter-rouge">PTransform</code> 
that uses such a policy internally to write a single sharded file per 
window.</p>
 
 <p>In this example, we stream the results to output files. Each count is 
formatted as text, and the formatted results are then written as one sharded 
file per window.</p>
 
-<div class="language-java highlighter-rouge"><pre 
class="highlight"><code><span class="n">wordCounts</span><span 
class="o">.</span><span class="na">apply</span><span class="o">(</span><span 
class="n">ParDo</span><span class="o">.</span><span class="na">of</span><span 
class="o">(</span><span class="k">new</span> <span 
class="n">FormatAsTableRowFn</span><span class="o">()))</span>
-    <span class="o">.</span><span class="na">apply</span><span 
class="o">(</span><span class="n">BigQueryIO</span><span 
class="o">.</span><span class="na">Write</span>
-      <span class="o">.</span><span class="na">to</span><span 
class="o">(</span><span class="n">getTableReference</span><span 
class="o">(</span><span class="n">options</span><span class="o">))</span>
-      <span class="o">.</span><span class="na">withSchema</span><span 
class="o">(</span><span class="n">getSchema</span><span class="o">())</span>
-      <span class="o">.</span><span 
class="na">withCreateDisposition</span><span class="o">(</span><span 
class="n">BigQueryIO</span><span class="o">.</span><span 
class="na">Write</span><span class="o">.</span><span 
class="na">CreateDisposition</span><span class="o">.</span><span 
class="na">CREATE_IF_NEEDED</span><span class="o">)</span>
-      <span class="o">.</span><span 
class="na">withWriteDisposition</span><span class="o">(</span><span 
class="n">BigQueryIO</span><span class="o">.</span><span 
class="na">Write</span><span class="o">.</span><span 
class="na">WriteDisposition</span><span class="o">.</span><span 
class="na">WRITE_APPEND</span><span class="o">));</span>
+<div class="language-java highlighter-rouge"><pre class="highlight"><code>  
<span class="n">wordCounts</span>
+      <span class="o">.</span><span class="na">apply</span><span 
class="o">(</span><span class="n">MapElements</span><span 
class="o">.</span><span class="na">via</span><span class="o">(</span><span 
class="k">new</span> <span class="n">WordCount</span><span 
class="o">.</span><span class="na">FormatAsTextFn</span><span 
class="o">()))</span>
+      <span class="o">.</span><span class="na">apply</span><span 
class="o">(</span><span class="k">new</span> <span 
class="n">WriteOneFilePerWindow</span><span class="o">(</span><span 
class="n">output</span><span class="o">,</span> <span 
class="n">options</span><span class="o">.</span><span 
class="na">getNumShards</span><span class="o">()));</span>
 </code></pre>
 </div>
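
For comparison, a hedged sketch of the same windowed write using TextIO 
directly rather than the WriteOneFilePerWindow helper (withWindowedWrites() 
and a fixed shard count are assumptions to keep the sketch short):

    wordCounts
        .apply(MapElements.via(new WordCount.FormatAsTextFn()))
        .apply(TextIO.write().to(output)
            .withWindowedWrites()
            .withNumShards(3));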
 
