Regenerate website
Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/6ebcb08c Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/6ebcb08c Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/6ebcb08c Branch: refs/heads/asf-site Commit: 6ebcb08cb503a3e58101fc73be11649116111c65 Parents: f277339 Author: Frances Perry <f...@google.com> Authored: Wed Feb 8 10:53:36 2017 -0800 Committer: Frances Perry <f...@google.com> Committed: Wed Feb 8 10:53:36 2017 -0800 ---------------------------------------------------------------------- .../documentation/programming-guide/index.html | 327 ++++++++++++++++--- 1 file changed, 274 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam-site/blob/6ebcb08c/content/documentation/programming-guide/index.html ---------------------------------------------------------------------- diff --git a/content/documentation/programming-guide/index.html b/content/documentation/programming-guide/index.html index dee4869..9830735 100644 --- a/content/documentation/programming-guide/index.html +++ b/content/documentation/programming-guide/index.html @@ -208,7 +208,7 @@ <p><code class="highlighter-rouge">PCollection</code>: A <code class="highlighter-rouge">PCollection</code> represents a distributed data set that your Beam pipeline operates on. The data set can be <em>bounded</em>, meaning it comes from a fixed source like a file, or <em>unbounded</em>, meaning it comes from a continuously updating source via a subscription or other mechanism. Your pipeline typically creates an initial <code class="highlighter-rouge">PCollection</code> by reading data from an external data source, but you can also create a <code class="highlighter-rouge">PCollection</code> from in-memory data within your driver program. 
From there, <code class="highlighter-rouge">PCollection</code>s are the inputs and outputs for each step in your pipeline.</p> </li> <li> - <p><code class="highlighter-rouge">Transform</code>: A <code class="highlighter-rouge">Transform</code> represents a data processing operation, or a step, in your pipeline. Every <code class="highlighter-rouge">Transform</code> takes one or more <code class="highlighter-rouge">PCollection</code> objects as input, performs a processing function that you provide on the elements of that <code class="highlighter-rouge">PCollection</code>, and produces one or more output <code class="highlighter-rouge">PCollection</code> objects.</p> + <p><code class="highlighter-rouge">Transform</code>: A <code class="highlighter-rouge">Transform</code> represents a data processing operation, or a step, in your pipeline. Every <code class="highlighter-rouge">Transform</code> takes one or more <code class="highlighter-rouge">PCollection</code> objects as input, performs a processing function that you provide on the elements of that <code class="highlighter-rouge">PCollection</code>, and produces one or more output <code class="highlighter-rouge">PCollection</code> objects.</p> </li> <li> <p>I/O <code class="highlighter-rouge">Source</code> and <code class="highlighter-rouge">Sink</code>: Beam provides <code class="highlighter-rouge">Source</code> and <code class="highlighter-rouge">Sink</code> APIs to represent reading and writing data, respectively. <code class="highlighter-rouge">Source</code> encapsulates the code necessary to read data into your Beam pipeline from some external source, such as cloud file storage or a subscription to a streaming data source. 
<code class="highlighter-rouge">Sink</code> likewise encapsulates the code necessary to write the elements of a <code class="highlighter-rouge">PCollection</code> to an external data sink.</p> @@ -248,11 +248,13 @@ </code></pre> </div> -<div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="kn">from</span> <span class="nn">apache_beam.utils.pipeline_options</span> <span class="kn">import</span> <span class="n">PipelineOptions</span> - -<span class="c"># Will parse the arguments passed into the application and construct a PipelineOptions</span> +<div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="c"># Will parse the arguments passed into the application and construct a PipelineOptions object.</span> <span class="c"># Note that --help will print registered options.</span> + +<span class="kn">from</span> <span class="nn">apache_beam.utils.pipeline_options</span> <span class="kn">import</span> <span class="n">PipelineOptions</span> + <span class="n">p</span> <span class="o">=</span> <span class="n">beam</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">(</span><span class="n">options</span><span class="o">=</span><span class="n">PipelineOptions</span><span class="p">())</span> + </code></pre> </div> @@ -286,13 +288,8 @@ </code></pre> </div> -<div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="kn">import</span> <span class="nn">apache_beam</span> <span class="kn">as</span> <span class="nn">beam</span> - -<span class="c"># Create the pipeline.</span> -<span class="n">p</span> <span class="o">=</span> <span class="n">beam</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">()</span> +<div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="n">lines</span> <span class="o">=</span> <span class="n">p</span> <span class="o">|</span> <span class="s">'ReadMyFile'</span> <span class="o">>></span> <span 
class="n">beam</span><span class="o">.</span><span class="n">io</span><span class="o">.</span><span class="n">ReadFromText</span><span class="p">(</span><span class="s">'gs://some/inputData.txt'</span><span class="p">)</span> -<span class="c"># Read the text file into a PCollection.</span> -<span class="n">lines</span> <span class="o">=</span> <span class="n">p</span> <span class="o">|</span> <span class="s">'ReadMyFile'</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">io</span><span class="o">.</span><span class="n">Read</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">io</span><span class="o">.</span><span class="n">TextFileSource</span><span class="p">(</span><span class="s">"protocol://path/to/some/inputData.txt"</span><span class="p">))</span> </code></pre> </div> @@ -327,20 +324,18 @@ </code></pre> </div> -<div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="kn">import</span> <span class="nn">apache_beam</span> <span class="kn">as</span> <span class="nn">beam</span> +<div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="n">p</span> <span class="o">=</span> <span class="n">beam</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">(</span><span class="n">options</span><span class="o">=</span><span class="n">pipeline_options</span><span class="p">)</span> -<span class="c"># python list</span> -<span class="n">lines</span> <span class="o">=</span> <span class="p">[</span> - <span class="s">"To be, or not to be: that is the question: "</span><span class="p">,</span> - <span class="s">"Whether 'tis nobler in the mind to suffer "</span><span class="p">,</span> - <span class="s">"The slings and arrows of outrageous fortune, "</span><span class="p">,</span> - <span class="s">"Or to take arms against a sea of troubles, "</span> -<span class="p">]</span> +<span class="p">(</span><span 
class="n">p</span> + <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Create</span><span class="p">([</span> + <span class="s">'To be, or not to be: that is the question: '</span><span class="p">,</span> + <span class="s">'Whether </span><span class="se">\'</span><span class="s">tis nobler in the mind to suffer '</span><span class="p">,</span> + <span class="s">'The slings and arrows of outrageous fortune, '</span><span class="p">,</span> + <span class="s">'Or to take arms against a sea of troubles, '</span><span class="p">])</span> + <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">io</span><span class="o">.</span><span class="n">WriteToText</span><span class="p">(</span><span class="n">my_options</span><span class="o">.</span><span class="n">output</span><span class="p">))</span> -<span class="c"># Create the pipeline.</span> -<span class="n">p</span> <span class="o">=</span> <span class="n">beam</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">()</span> +<span class="n">result</span> <span class="o">=</span> <span class="n">p</span><span class="o">.</span><span class="n">run</span><span class="p">()</span> -<span class="n">collection</span> <span class="o">=</span> <span class="n">p</span> <span class="o">|</span> <span class="s">'ReadMyLines'</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">Create</span><span class="p">(</span><span class="n">lines</span><span class="p">)</span> </code></pre> </div> @@ -401,8 +396,8 @@ <p>How you apply your pipeline’s transforms determines the structure of your pipeline. The best way to think of your pipeline is as a directed acyclic graph, where the nodes are <code class="highlighter-rouge">PCollection</code>s and the edges are transforms. 
For example, you can chain transforms to create a sequential pipeline, like this one:</p> <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="o">[</span><span class="n">Final</span> <span class="n">Output</span> <span class="n">PCollection</span><span class="o">]</span> <span class="o">=</span> <span class="o">[</span><span class="n">Initial</span> <span class="n">Input</span> <span class="n">PCollection</span><span class="o">].</span><span class="na">apply</span><span class="o">([</span><span class="n">First</span> <span class="n">Transform</span><span class="o">])</span> - <span class="o">.</span><span class="na">apply</span><span class="o">([</span><span class="n">Second</span> <span class="n">Transform</span><span class="o">])</span> - <span class="o">.</span><span class="na">apply</span><span class="o">([</span><span class="n">Third</span> <span class="n">Transform</span><span class="o">])</span> +<span class="o">.</span><span class="na">apply</span><span class="o">([</span><span class="n">Second</span> <span class="n">Transform</span><span class="o">])</span> +<span class="o">.</span><span class="na">apply</span><span class="o">([</span><span class="n">Third</span> <span class="n">Transform</span><span class="o">])</span> </code></pre> </div> @@ -418,8 +413,13 @@ <p>However, note that a transform <em>does not consume or otherwise alter</em> the input collection; remember that a <code class="highlighter-rouge">PCollection</code> is immutable by definition. 
This means that you can apply multiple transforms to the same input <code class="highlighter-rouge">PCollection</code> to create a branching pipeline, like so:</p> -<div class="highlighter-rouge"><pre class="highlight"><code>[Output PCollection 1] = [Input PCollection].apply([Transform 1]) -[Output PCollection 2] = [Input PCollection].apply([Transform 2]) +<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="o">[</span><span class="n">Output</span> <span class="n">PCollection</span> <span class="mi">1</span><span class="o">]</span> <span class="o">=</span> <span class="o">[</span><span class="n">Input</span> <span class="n">PCollection</span><span class="o">].</span><span class="na">apply</span><span class="o">([</span><span class="n">Transform</span> <span class="mi">1</span><span class="o">])</span> +<span class="o">[</span><span class="n">Output</span> <span class="n">PCollection</span> <span class="mi">2</span><span class="o">]</span> <span class="o">=</span> <span class="o">[</span><span class="n">Input</span> <span class="n">PCollection</span><span class="o">].</span><span class="na">apply</span><span class="o">([</span><span class="n">Transform</span> <span class="mi">2</span><span class="o">])</span> +</code></pre> +</div> + +<div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="p">[</span><span class="n">Output</span> <span class="n">PCollection</span> <span class="mi">1</span><span class="p">]</span> <span class="o">=</span> <span class="p">[</span><span class="n">Input</span> <span class="n">PCollection</span><span class="p">]</span> <span class="o">|</span> <span class="p">[</span><span class="n">Transform</span> <span class="mi">1</span><span class="p">]</span> +<span class="p">[</span><span class="n">Output</span> <span class="n">PCollection</span> <span class="mi">2</span><span class="p">]</span> <span class="o">=</span> <span class="p">[</span><span class="n">Input</span> <span 
class="n">PCollection</span><span class="p">]</span> <span class="o">|</span> <span class="p">[</span><span class="n">Transform</span> <span class="mi">2</span><span class="p">]</span> </code></pre> </div> @@ -427,7 +427,7 @@ <p>[Branching Graph Graphic]</p> -<p>You can also build your own <a href="#transforms-composite">composite transforms</a> that nest multiple sub-steps inside a single, larger transform. Composite transforms are particularly useful for building a reusable sequence of simple steps that get used in a lot of different places.</p> +<p>You can also build your own composite transforms that nest multiple sub-steps inside a single, larger transform. Composite transforms are particularly useful for building a reusable sequence of simple steps that get used in a lot of different places.</p> <h3 id="transforms-in-the-beam-sdk">Transforms in the Beam SDK</h3> @@ -488,12 +488,9 @@ <span class="c"># The DoFn to perform on each element in the input PCollection.</span> <span class="k">class</span> <span class="nc">ComputeWordLengthFn</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span> - <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span> - <span class="c"># Get the input element from ProcessContext.</span> - <span class="n">word</span> <span class="o">=</span> <span class="n">context</span><span class="o">.</span><span class="n">element</span> - <span class="c"># Use return to emit the output element.</span> - <span class="k">return</span> <span class="p">[</span><span class="nb">len</span><span class="p">(</span><span class="n">word</span><span class="p">)]</span> - + <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span 
class="p">):</span> + <span class="k">return</span> <span class="p">[</span><span class="nb">len</span><span class="p">(</span><span class="n">element</span><span class="p">)]</span> + <span class="c"># Apply a ParDo to the PCollection "words" to compute lengths for each word.</span> <span class="n">word_lengths</span> <span class="o">=</span> <span class="n">words</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span class="n">ComputeWordLengthFn</span><span class="p">())</span> </code></pre> @@ -532,11 +529,9 @@ </div> <div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="k">class</span> <span class="nc">ComputeWordLengthFn</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span class="p">):</span> - <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span> - <span class="c"># Get the input element from ProcessContext.</span> - <span class="n">word</span> <span class="o">=</span> <span class="n">context</span><span class="o">.</span><span class="n">element</span> - <span class="c"># Use return to emit the output element.</span> - <span class="k">return</span> <span class="p">[</span><span class="nb">len</span><span class="p">(</span><span class="n">word</span><span class="p">)]</span> + <span class="k">def</span> <span class="nf">process</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">element</span><span class="p">):</span> + <span class="k">return</span> <span class="p">[</span><span class="nb">len</span><span class="p">(</span><span class="n">element</span><span class="p">)]</span> + </code></pre> </div> @@ -780,7 +775,20 @@ tree, [2] <div class="language-py highlighter-rouge"><pre 
class="highlight"><code><span class="c"># sum combines the elements in the input PCollection.</span> <span class="c"># The resulting PCollection, called result, contains one value: the sum of all the elements in the input PCollection.</span> <span class="n">pc</span> <span class="o">=</span> <span class="o">...</span> -<span class="n">result</span> <span class="o">=</span> <span class="n">pc</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">CombineGlobally</span><span class="p">(</span><span class="nb">sum</span><span class="p">)</span> +<span class="k">class</span> <span class="nc">AverageFn</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">CombineFn</span><span class="p">):</span> + <span class="k">def</span> <span class="nf">create_accumulator</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> + <span class="k">return</span> <span class="p">(</span><span class="mf">0.0</span><span class="p">,</span> <span class="mi">0</span><span class="p">)</span> + + <span class="k">def</span> <span class="nf">add_input</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="p">(</span><span class="nb">sum</span><span class="p">,</span> <span class="n">count</span><span class="p">),</span> <span class="nb">input</span><span class="p">):</span> + <span class="k">return</span> <span class="nb">sum</span> <span class="o">+</span> <span class="nb">input</span><span class="p">,</span> <span class="n">count</span> <span class="o">+</span> <span class="mi">1</span> + + <span class="k">def</span> <span class="nf">merge_accumulators</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">accumulators</span><span class="p">):</span> + <span class="n">sums</span><span class="p">,</span> <span class="n">counts</span> <span class="o">=</span> <span class="nb">zip</span><span 
class="p">(</span><span class="o">*</span><span class="n">accumulators</span><span class="p">)</span> + <span class="k">return</span> <span class="nb">sum</span><span class="p">(</span><span class="n">sums</span><span class="p">),</span> <span class="nb">sum</span><span class="p">(</span><span class="n">counts</span><span class="p">)</span> + + <span class="k">def</span> <span class="nf">extract_output</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="p">(</span><span class="nb">sum</span><span class="p">,</span> <span class="n">count</span><span class="p">)):</span> + <span class="k">return</span> <span class="nb">sum</span> <span class="o">/</span> <span class="n">count</span> <span class="k">if</span> <span class="n">count</span> <span class="k">else</span> <span class="nb">float</span><span class="p">(</span><span class="s">'NaN'</span><span class="p">)</span> +<span class="n">average</span> <span class="o">=</span> <span class="n">pc</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">CombineGlobally</span><span class="p">(</span><span class="n">AverageFn</span><span class="p">())</span> </code></pre> </div> @@ -798,7 +806,6 @@ tree, [2] <div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="n">pc</span> <span class="o">=</span> <span class="o">...</span> <span class="nb">sum</span> <span class="o">=</span> <span class="n">pc</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">CombineGlobally</span><span class="p">(</span><span class="nb">sum</span><span class="p">)</span><span class="o">.</span><span class="n">without_defaults</span><span class="p">()</span> - </code></pre> </div> @@ -847,6 +854,7 @@ tree, [2] <span class="n">avg_accuracy_per_player</span> <span class="o">=</span> <span class="p">(</span><span class="n">player_accuracies</span> <span class="o">|</span> <span 
class="n">beam</span><span class="o">.</span><span class="n">CombinePerKey</span><span class="p">(</span> <span class="n">beam</span><span class="o">.</span><span class="n">combiners</span><span class="o">.</span><span class="n">MeanCombineFn</span><span class="p">()))</span> + </code></pre> </div> @@ -870,11 +878,14 @@ tree, [2] </div> <div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="c"># Flatten takes a tuple of PCollection objects.</span> -<span class="c"># Returns a single PCollection that contains all of the elements in the PCollection objects in that tuple.</span> +<span class="c"># Returns a single PCollection that contains all of the elements in the </span> <span class="n">merged</span> <span class="o">=</span> <span class="p">(</span> + <span class="c"># [START model_multiple_pcollections_tuple]</span> <span class="p">(</span><span class="n">pcoll1</span><span class="p">,</span> <span class="n">pcoll2</span><span class="p">,</span> <span class="n">pcoll3</span><span class="p">)</span> + <span class="c"># [END model_multiple_pcollections_tuple]</span> <span class="c"># A list of tuples can be "piped" directly into a Flatten transform.</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Flatten</span><span class="p">())</span> + </code></pre> </div> @@ -918,8 +929,10 @@ tree, [2] <span class="n">by_decile</span> <span class="o">=</span> <span class="n">students</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Partition</span><span class="p">(</span><span class="n">partition_fn</span><span class="p">,</span> <span class="mi">10</span><span class="p">)</span> + <span class="c"># You can extract each partition from the tuple of PCollection objects as follows:</span> <span class="n">fortieth_percentile</span> <span class="o">=</span> <span class="n">by_decile</span><span class="p">[</span><span class="mi">4</span><span 
class="p">]</span> + </code></pre> </div> @@ -1213,7 +1226,7 @@ tree, [2] </code></pre> </div> -<div class="language-python highlighter-rouge"><pre class="highlight"><code><span class="n">lines</span> <span class="o">=</span> <span class="n">pipeline</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">io</span><span class="o">.</span><span class="n">ReadFromText</span><span class="p">(</span><span class="s">'gs://some/inputData.txt'</span><span class="p">)</span> +<div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="n">lines</span> <span class="o">=</span> <span class="n">pipeline</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">io</span><span class="o">.</span><span class="n">ReadFromText</span><span class="p">(</span><span class="s">'gs://some/inputData.txt'</span><span class="p">)</span> </code></pre> </div> @@ -1227,7 +1240,7 @@ tree, [2] </code></pre> </div> -<div class="language-python highlighter-rouge"><pre class="highlight"><code><span class="n">output</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">io</span><span class="o">.</span><span class="n">WriteToText</span><span class="p">(</span><span class="s">'gs://some/outputData'</span><span class="p">)</span> +<div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="n">output</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">io</span><span class="o">.</span><span class="n">WriteToText</span><span class="p">(</span><span class="s">'gs://some/outputData'</span><span class="p">)</span> </code></pre> </div> @@ -1242,9 +1255,8 @@ tree, [2] </code></pre> </div> -<div class="language-python highlighter-rouge"><pre class="highlight"><code><span class="n">lines</span> <span class="o">=</span> <span class="n">p</span> <span class="o">|</span> <span 
class="n">beam</span><span class="o">.</span><span class="n">io</span><span class="o">.</span><span class="n">Read</span><span class="p">(</span> - <span class="s">'ReadFromText'</span><span class="p">,</span> - <span class="n">beam</span><span class="o">.</span><span class="n">io</span><span class="o">.</span><span class="n">TextFileSource</span><span class="p">(</span><span class="s">'protocol://my_bucket/path/to/input-*.csv'</span><span class="p">))</span> +<div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="n">lines</span> <span class="o">=</span> <span class="n">p</span> <span class="o">|</span> <span class="s">'ReadFromText'</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">io</span><span class="o">.</span><span class="n">ReadFromText</span><span class="p">(</span><span class="s">'path/to/input-*.csv'</span><span class="p">)</span> + </code></pre> </div> @@ -1262,8 +1274,9 @@ tree, [2] </code></pre> </div> -<div class="language-python highlighter-rouge"><pre class="highlight"><code><span class="n">filtered_words</span> <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">io</span><span class="o">.</span><span class="n">WriteToText</span><span class="p">(</span> -<span class="s">'protocol://my_bucket/path/to/numbers'</span><span class="p">,</span> <span class="n">file_name_suffix</span><span class="o">=</span><span class="s">'.csv'</span><span class="p">)</span> +<div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="n">filtered_words</span> <span class="o">|</span> <span class="s">'WriteToText'</span> <span class="o">>></span> <span class="n">beam</span><span class="o">.</span><span class="n">io</span><span class="o">.</span><span class="n">WriteToText</span><span class="p">(</span> + <span class="s">'/path/to/numbers'</span><span class="p">,</span> <span class="n">file_name_suffix</span><span 
class="o">=</span><span class="s">'.csv'</span><span class="p">)</span> + </code></pre> </div> @@ -1324,23 +1337,231 @@ tree, [2] </code></pre> </div> -<div class="language-python highlighter-rouge"><pre class="highlight"><code><span class="n">pipeline</span><span class="o">.</span><span class="n">run</span><span class="p">()</span> +<div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="n">pipeline</span><span class="o">.</span><span class="n">run</span><span class="p">()</span> </code></pre> </div> -<p>For blocking execution, append the <code class="highlighter-rouge">waitUntilFinish</code> method:</p> +<p>For blocking execution, append the <span class="language-java"><code class="highlighter-rouge">waitUntilFinish</code></span> <span class="language-py"><code class="highlighter-rouge">wait_until_finish</code></span> method:</p> <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="n">pipeline</span><span class="o">.</span><span class="na">run</span><span class="o">().</span><span class="na">waitUntilFinish</span><span class="o">();</span> </code></pre> </div> -<div class="language-python highlighter-rouge"><pre class="highlight"><code><span class="n">pipeline</span><span class="o">.</span><span class="n">run</span><span class="p">()</span><span class="o">.</span><span class="n">wait_until_finish</span><span class="p">()</span> +<div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="n">pipeline</span><span class="o">.</span><span class="n">run</span><span class="p">()</span><span class="o">.</span><span class="n">wait_until_finish</span><span class="p">()</span> +</code></pre> +</div> + +<h2 id="a-namecodersadata-encoding-and-type-safety"><a name="coders"></a>Data encoding and type safety</h2> + +<p>When you create or output pipeline data, you’ll need to specify how the elements in your <code class="highlighter-rouge">PCollection</code>s are encoded and decoded to and 
from byte strings. Byte strings are used for intermediate storage as well as reading from sources and writing to sinks. The Beam SDKs use objects called coders to describe how the elements of a given <code class="highlighter-rouge">PCollection</code> should be encoded and decoded.</p> + +<h3 id="using-coders">Using coders</h3> + +<p>You typically need to specify a coder when reading data into your pipeline from an external source (or creating pipeline data from local data), and also when you output pipeline data to an external sink.</p> + +<p class="language-java">In the Beam SDK for Java, the type <code class="highlighter-rouge">Coder</code> provides the methods required for encoding and decoding data. The SDK for Java provides a number of Coder subclasses that work with a variety of standard Java types, such as Integer, Long, Double, StringUtf8 and more. You can find all of the available Coder subclasses in the <a href="https://github.com/apache/beam/tree/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders">Coder package</a>.</p> + +<p class="language-py">In the Beam SDK for Python, the type <code class="highlighter-rouge">Coder</code> provides the methods required for encoding and decoding data. The SDK for Python provides a number of Coder subclasses that work with a variety of standard Python types, such as primitive types, Tuple, Iterable, StringUtf8 and more. You can find all of the available Coder subclasses in the <a href="https://github.com/apache/beam/tree/master/sdks/python/apache_beam/coders">apache_beam.coders</a> package.</p> + +<p>When you read data into a pipeline, the coder indicates how to interpret the input data into a language-specific type, such as integer or string. 
Likewise, the coder indicates how the language-specific types in your pipeline should be written into byte strings for an output data sink, or to materialize intermediate data in your pipeline.</p> + +<p>The Beam SDKs set a coder for every <code class="highlighter-rouge">PCollection</code> in a pipeline, including those generated as output from a transform. Most of the time, the Beam SDKs can automatically infer the correct coder for an output <code class="highlighter-rouge">PCollection</code>.</p> + +<blockquote> + <p>Note that coders do not necessarily have a 1:1 relationship with types. For example, the Integer type can have multiple valid coders, and input and output data can use different Integer coders. A transform might have Integer-typed input data that uses BigEndianIntegerCoder, and Integer-typed output data that uses VarIntCoder.</p> +</blockquote> + +<p>You can explicitly set a <code class="highlighter-rouge">Coder</code> when inputting or outputting a <code class="highlighter-rouge">PCollection</code>. You set the <code class="highlighter-rouge">Coder</code> by <span class="language-java">calling the method <code class="highlighter-rouge">.withCoder</code></span> <span class="language-py">setting the <code class="highlighter-rouge">coder</code> argument</span> when you apply your pipeline’s read or write transform.</p> + +<p>Typically, you set the <code class="highlighter-rouge">Coder</code> when the coder for a <code class="highlighter-rouge">PCollection</code> cannot be automatically inferred, or when you want to use a different coder than your pipeline’s default. 
The following example code reads a set of numbers from a text file, and sets a <code class="highlighter-rouge">Coder</code> of type <span class="language-java"><code class="highlighter-rouge">TextualIntegerCoder</code></span> <span class="language-py"><code class="highlighter-rouge">VarIntCoder</code></span> for the resulting <code class="highlighter-rouge">PCollection</code>:</p> + +<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="n">PCollection</span><span class="o"><</span><span class="n">Integer</span><span class="o">></span> <span class="n">numbers</span> <span class="o">=</span> + <span class="n">p</span><span class="o">.</span><span class="na">begin</span><span class="o">()</span> + <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">TextIO</span><span class="o">.</span><span class="na">Read</span><span class="o">.</span><span class="na">named</span><span class="o">(</span><span class="s">"ReadNumbers"</span><span class="o">)</span> + <span class="o">.</span><span class="na">from</span><span class="o">(</span><span class="s">"gs://my_bucket/path/to/numbers-*.txt"</span><span class="o">)</span> + <span class="o">.</span><span class="na">withCoder</span><span class="o">(</span><span class="n">TextualIntegerCoder</span><span class="o">.</span><span class="na">of</span><span class="o">()));</span> +</code></pre> +</div> + +<div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="n">p</span> <span class="o">=</span> <span class="n">beam</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">()</span> +<span class="n">numbers</span> <span class="o">=</span> <span class="n">p</span> <span class="o">|</span> <span class="n">ReadFromText</span><span class="p">(</span><span class="s">"gs://my_bucket/path/to/numbers-*.txt"</span><span class="p">,</span> <span class="n">coder</span><span class="o">=</span><span class="n">VarIntCoder</span><span class="p">())</span> 
+</code></pre> +</div> + +<p class="language-java">You can set the coder for an existing <code class="highlighter-rouge">PCollection</code> by using the method <code class="highlighter-rouge">PCollection.setCoder</code>. Note that you cannot call <code class="highlighter-rouge">setCoder</code> on a <code class="highlighter-rouge">PCollection</code> that has been finalized (e.g. by calling <code class="highlighter-rouge">.apply</code> on it).</p> + +<p class="language-java">You can get the coder for an existing <code class="highlighter-rouge">PCollection</code> by using the method <code class="highlighter-rouge">getCoder</code>. This method will fail with an <code class="highlighter-rouge">IllegalStateException</code> if a coder has not been set and cannot be inferred for the given <code class="highlighter-rouge">PCollection</code>.</p> + +<h3 id="coder-inference-and-default-coders">Coder inference and default coders</h3> + +<p>The Beam SDKs require a coder for every <code class="highlighter-rouge">PCollection</code> in your pipeline. Most of the time, however, you do not need to explicitly specify a coder, such as for an intermediate <code class="highlighter-rouge">PCollection</code> produced by a transform in the middle of your pipeline. In such cases, the Beam SDKs can infer an appropriate coder from the inputs and outputs of the transform used to produce the PCollection.</p> + +<p class="language-java">Each pipeline object has a <code class="highlighter-rouge">CoderRegistry</code>. 
The <code class="highlighter-rouge">CoderRegistry</code> represents a mapping of Java types to the default coders that the pipeline should use for <code class="highlighter-rouge">PCollection</code>s of each type.</p> + +<p class="language-py">The Beam SDK for Python has a <code class="highlighter-rouge">CoderRegistry</code> that represents a mapping of Python types to the default coder that should be used for <code class="highlighter-rouge">PCollection</code>s of each type.</p> + +<p class="language-java">By default, the Beam SDK for Java automatically infers the <code class="highlighter-rouge">Coder</code> for the elements of an output <code class="highlighter-rouge">PCollection</code> using the type parameter from the transform’s function object, such as <code class="highlighter-rouge">DoFn</code>. In the case of <code class="highlighter-rouge">ParDo</code>, for example, a <code class="highlighter-rouge">DoFn<Integer, String></code> function object accepts an input element of type <code class="highlighter-rouge">Integer</code> and produces an output element of type <code class="highlighter-rouge">String</code>. In such a case, the SDK for Java will automatically infer the default <code class="highlighter-rouge">Coder</code> for the output <code class="highlighter-rouge">PCollection<String></code> (in the default pipeline <code class="highlighter-rouge">CoderRegistry</code>, this is <code class="highlighter-rouge">StringUtf8Coder</code>).</p> + +<p class="language-py">By default, the Beam SDK for Python automatically infers the <code class="highlighter-rouge">Coder</code> for the elements of an output <code class="highlighter-rouge">PCollection</code> using the typehints from the transform’s function object, such as <code class="highlighter-rouge">DoFn</code>. 
In the case of <code class="highlighter-rouge">ParDo</code>, for example, a <code class="highlighter-rouge">DoFn</code> with the typehints <code class="highlighter-rouge">@beam.typehints.with_input_types(int)</code> and <code class="highlighter-rouge">@beam.typehints.with_output_types(str)</code> accepts an input element of type int and produces an output element of type str. In such a case, the Beam SDK for Python will automatically infer the default <code class="highlighter-rouge">Coder</code> for the output <code class="highlighter-rouge">PCollection</code> (in the default pipeline <code class="highlighter-rouge">CoderRegistry</code>, this is <code class="highlighter-rouge">BytesCoder</code>).</p> + +<blockquote> + <p>NOTE: If you create your <code class="highlighter-rouge">PCollection</code> from in-memory data by using the <code class="highlighter-rouge">Create</code> transform, you cannot rely on coder inference and default coders. <code class="highlighter-rouge">Create</code> does not have access to any typing information for its arguments, and may not be able to infer a coder if the argument list contains a value whose exact run-time class doesn’t have a default coder registered.</p> +</blockquote> + +<p class="language-java">When using <code class="highlighter-rouge">Create</code>, the simplest way to ensure that you have the correct coder is by invoking <code class="highlighter-rouge">withCoder</code> when you apply the <code class="highlighter-rouge">Create</code> transform.</p> + +<h4 id="default-coders-and-the-coderregistry">Default coders and the CoderRegistry</h4> + +<p>Each Pipeline object has a <code class="highlighter-rouge">CoderRegistry</code> object, which maps language types to the default coder the pipeline should use for those types. 
You can use the <code class="highlighter-rouge">CoderRegistry</code> yourself to look up the default coder for a given type, or to register a new default coder for a given type.</p> + +<p><code class="highlighter-rouge">CoderRegistry</code> contains a default mapping of coders to standard <span class="language-java">Java</span> <span class="language-py">Python</span> types for any pipeline you create using the Beam SDK for <span class="language-java">Java</span> <span class="language-py">Python</span>. The following table shows the standard mapping:</p> + +<table class="language-java"> + <thead> + <tr class="header"> + <th>Java Type</th> + <th>Default Coder</th> + </tr> + </thead> + <tbody> + <tr class="odd"> + <td>Double</td> + <td>DoubleCoder</td> + </tr> + <tr class="even"> + <td>Instant</td> + <td>InstantCoder</td> + </tr> + <tr class="odd"> + <td>Integer</td> + <td>VarIntCoder</td> + </tr> + <tr class="even"> + <td>Iterable</td> + <td>IterableCoder</td> + </tr> + <tr class="odd"> + <td>KV</td> + <td>KvCoder</td> + </tr> + <tr class="even"> + <td>List</td> + <td>ListCoder</td> + </tr> + <tr class="odd"> + <td>Map</td> + <td>MapCoder</td> + </tr> + <tr class="even"> + <td>Long</td> + <td>VarLongCoder</td> + </tr> + <tr class="odd"> + <td>String</td> + <td>StringUtf8Coder</td> + </tr> + <tr class="even"> + <td>TableRow</td> + <td>TableRowJsonCoder</td> + </tr> + <tr class="odd"> + <td>Void</td> + <td>VoidCoder</td> + </tr> + <tr class="even"> + <td>byte[ ]</td> + <td>ByteArrayCoder</td> + </tr> + <tr class="odd"> + <td>TimestampedValue</td> + <td>TimestampedValueCoder</td> + </tr> + </tbody> +</table> + +<table class="language-py"> + <thead> + <tr class="header"> + <th>Python Type</th> + <th>Default Coder</th> + </tr> + </thead> + <tbody> + <tr class="odd"> + <td>int</td> + <td>VarIntCoder</td> + </tr> + <tr class="even"> + <td>float</td> + <td>FloatCoder</td> + </tr> + <tr class="odd"> + <td>str</td> + <td>BytesCoder</td> + </tr> + <tr class="even"> + 
<td>bytes</td> + <td>StrUtf8Coder</td> + </tr> + <tr class="odd"> + <td>Tuple</td> + <td>TupleCoder</td> + </tr> + </tbody> +</table> + +<h5 id="looking-up-a-default-coder">Looking up a default coder</h5> + +<p class="language-java">You can use the method <code class="highlighter-rouge">CoderRegistry.getDefaultCoder</code> to determine the default Coder for a Java type. You can access the <code class="highlighter-rouge">CoderRegistry</code> for a given pipeline by using the method <code class="highlighter-rouge">Pipeline.getCoderRegistry</code>. This allows you to determine (or set) the default Coder for a Java type on a per-pipeline basis: i.e. “for this pipeline, verify that Integer values are encoded using <code class="highlighter-rouge">BigEndianIntegerCoder</code>.”</p> + +<p class="language-py">You can use the method <code class="highlighter-rouge">CoderRegistry.get_coder</code> to determine the default Coder for a Python type. You can use <code class="highlighter-rouge">coders.registry</code> to access the <code class="highlighter-rouge">CoderRegistry</code>. This allows you to determine (or set) the default Coder for a Python type.</p> + +<h5 id="setting-the-default-coder-for-a-type">Setting the default coder for a type</h5> + +<p>To set the default Coder for a <span class="language-java">Java</span> <span class="language-py">Python</span> type for a particular pipeline, you obtain and modify the pipeline’s <code class="highlighter-rouge">CoderRegistry</code>. 
You use the method <span class="language-java"><code class="highlighter-rouge">Pipeline.getCoderRegistry</code></span> <span class="language-py"><code class="highlighter-rouge">coders.registry</code></span> to get the <code class="highlighter-rouge">CoderRegistry</code> object, and then use the method <span class="language-java"><code class="highlighter-rouge">CoderRegistry.registerCoder</code></span> <span class="language-py"><code class="highlighter-rouge">CoderRegistry.register_coder</code></span> to register a new <code class="highlighter-rouge">Coder</code> for the target type.</p> + +<p>The following example code demonstrates how to set a default Coder, in this case <code class="highlighter-rouge">BigEndianIntegerCoder</code>, for <span class="language-java">Integer</span> <span class="language-py">int</span> values for a pipeline.</p> + +<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="n">PipelineOptions</span> <span class="n">options</span> <span class="o">=</span> <span class="n">PipelineOptionsFactory</span><span class="o">.</span><span class="na">create</span><span class="o">();</span> +<span class="n">Pipeline</span> <span class="n">p</span> <span class="o">=</span> <span class="n">Pipeline</span><span class="o">.</span><span class="na">create</span><span class="o">(</span><span class="n">options</span><span class="o">);</span> + +<span class="n">CoderRegistry</span> <span class="n">cr</span> <span class="o">=</span> <span class="n">p</span><span class="o">.</span><span class="na">getCoderRegistry</span><span class="o">();</span> +<span class="n">cr</span><span class="o">.</span><span class="na">registerCoder</span><span class="o">(</span><span class="n">Integer</span><span class="o">.</span><span class="na">class</span><span class="o">,</span> <span class="n">BigEndianIntegerCoder</span><span class="o">.</span><span class="na">class</span><span class="o">);</span> +</code></pre> +</div> + +<div class="language-py 
<pre">
highlighter-rouge"><pre class="highlight"><code><span class="n">apache_beam</span><span class="o">.</span><span class="n">coders</span><span class="o">.</span><span class="n">registry</span><span class="o">.</span><span class="n">register_coder</span><span class="p">(</span><span class="nb">int</span><span class="p">,</span> <span class="n">BigEndianIntegerCoder</span><span class="p">)</span> </code></pre> </div> -<p><a name="transforms-composite"></a> -<a name="coders"></a> -<a name="windowing"></a> +<h5 id="annotating-a-custom-data-type-with-a-default-coder">Annotating a custom data type with a default coder</h5> + +<p class="language-java">If your pipeline program defines a custom data type, you can use the <code class="highlighter-rouge">@DefaultCoder</code> annotation to specify the coder to use with that type. For example, let’s say you have a custom data type for which you want to use <code class="highlighter-rouge">AvroCoder</code>. You can use the <code class="highlighter-rouge">@DefaultCoder</code> annotation as follows:</p> + +<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="nd">@DefaultCoder</span><span class="o">(</span><span class="n">AvroCoder</span><span class="o">.</span><span class="na">class</span><span class="o">)</span> +<span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyCustomDataType</span> <span class="o">{</span> + <span class="o">...</span> +<span class="o">}</span> +</code></pre> +</div> + +<p class="language-java">If you’ve created a custom coder to match your data type, and you want to use the <code class="highlighter-rouge">@DefaultCoder</code> annotation, your coder class must implement a static <code class="highlighter-rouge">Coder.of(Class<T>)</code> factory method.</p> + +<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyCustomCoder</span> <span 
class="kd">implements</span> <span class="n">Coder</span> <span class="o">{</span> + <span class="kd">public</span> <span class="kd">static</span> <span class="n">Coder</span><span class="o"><</span><span class="n">T</span><span class="o">></span> <span class="nf">of</span><span class="o">(</span><span class="n">Class</span><span class="o"><</span><span class="n">T</span><span class="o">></span> <span class="n">clazz</span><span class="o">)</span> <span class="o">{...}</span> + <span class="o">...</span> +<span class="o">}</span> + +<span class="nd">@DefaultCoder</span><span class="o">(</span><span class="n">MyCustomCoder</span><span class="o">.</span><span class="na">class</span><span class="o">)</span> +<span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyCustomDataType</span> <span class="o">{</span> + <span class="o">...</span> +<span class="o">}</span> +</code></pre> +</div> + +<p class="language-py">The Beam SDK for Python does not support annotating data types with a default coder. If you would like to set a default coder, use the method described in the previous section, <em>Setting the default coder for a type</em>.</p> + +<p><a name="windowing"></a> <a name="triggers"></a></p> <blockquote>