Regenerate website

Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/83a6e401
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/83a6e401
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/83a6e401

Branch: refs/heads/asf-site
Commit: 83a6e4011f7eae8003fe85a7c51cb824440cffa4
Parents: a7e8b60
Author: Ahmet Altay <al...@google.com>
Authored: Fri Feb 17 13:17:53 2017 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Feb 17 13:17:53 2017 -0800

----------------------------------------------------------------------
 .../sdks/python-custom-io/index.html            | 613 +++++++++++++++++++
 content/documentation/sdks/python/index.html    |   5 +
 2 files changed, 618 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam-site/blob/83a6e401/content/documentation/sdks/python-custom-io/index.html
----------------------------------------------------------------------
diff --git a/content/documentation/sdks/python-custom-io/index.html 
b/content/documentation/sdks/python-custom-io/index.html
new file mode 100644
index 0000000..c43f606
--- /dev/null
+++ b/content/documentation/sdks/python-custom-io/index.html
@@ -0,0 +1,613 @@
+<!DOCTYPE html>
+<html lang="en">
+
+  <head>
+  <meta charset="utf-8">
+  <meta http-equiv="X-UA-Compatible" content="IE=edge">
+  <meta name="viewport" content="width=device-width, initial-scale=1">
+
+  <title>Beam Custom Sources and Sinks for Python</title>
+  <meta name="description" content="Apache Beam is an open source, unified 
model and set of language-specific SDKs for defining and executing data 
processing workflows, and also data ingestion and integration flows, supporting 
Enterprise Integration Patterns (EIPs) and Domain Specific Languages (DSLs). 
Dataflow pipelines simplify the mechanics of large-scale batch and streaming 
data processing and can run on a number of runtimes like Apache Flink, Apache 
Spark, and Google Cloud Dataflow (a cloud service). Beam also brings DSL in 
different languages, allowing users to easily implement their data integration 
processes.
+">
+
+  <link rel="stylesheet" href="/styles/site.css">
+  <link rel="stylesheet" href="/css/theme.css">
+  <script src="https://ajax.googleapis.com/ajax/libs/jquery/2.2.0/jquery.min.js"></script>
+  <script src="/js/bootstrap.min.js"></script>
+  <script src="/js/language-switch.js"></script>
+  <link rel="canonical" 
href="https://beam.apache.org/documentation/sdks/python-custom-io/"; 
data-proofer-ignore>
+  <link rel="alternate" type="application/rss+xml" title="Apache Beam" 
href="https://beam.apache.org/feed.xml";>
+  <script>
+    
(function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
+    (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new 
Date();a=s.createElement(o),
+    
m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
+    
})(window,document,'script','//www.google-analytics.com/analytics.js','ga');
+
+    ga('create', 'UA-73650088-1', 'auto');
+    ga('send', 'pageview');
+
+  </script>
+  <link rel="shortcut icon" type="image/x-icon" href="/images/favicon.ico">
+</head>
+
+
+  <body role="document">
+
+    <nav class="navbar navbar-default navbar-fixed-top">
+  <div class="container">
+    <div class="navbar-header">
+      <a href="/" class="navbar-brand" >
+        <img alt="Brand" style="height: 25px" 
src="/images/beam_logo_navbar.png">
+      </a>
+      <button type="button" class="navbar-toggle collapsed" 
data-toggle="collapse" data-target="#navbar" aria-expanded="false" 
aria-controls="navbar">
+        <span class="sr-only">Toggle navigation</span>
+        <span class="icon-bar"></span>
+        <span class="icon-bar"></span>
+        <span class="icon-bar"></span>
+      </button>
+    </div>
+    <div id="navbar" class="navbar-collapse collapse">
+      <ul class="nav navbar-nav">
+        <li class="dropdown">
+                 <a href="#" class="dropdown-toggle" data-toggle="dropdown" 
role="button" aria-haspopup="true" aria-expanded="false">Get Started <span 
class="caret"></span></a>
+                 <ul class="dropdown-menu">
+                         <li><a href="/get-started/beam-overview/">Beam 
Overview</a></li>
+        <li><a href="/get-started/quickstart-java/">Quickstart - Java</a></li>
+        <li><a href="/get-started/quickstart-py/">Quickstart - Python</a></li>
+                         <li role="separator" class="divider"></li>
+                         <li class="dropdown-header">Example Walkthroughs</li>
+                         <li><a 
href="/get-started/wordcount-example/">WordCount</a></li>
+                         <li><a 
href="/get-started/mobile-gaming-example/">Mobile Gaming</a></li>
+              <li role="separator" class="divider"></li>
+              <li class="dropdown-header">Resources</li>
+              <li><a href="/get-started/downloads">Downloads</a></li>
+              <li><a href="/get-started/support">Support</a></li>
+                 </ul>
+           </li>
+        <li class="dropdown">
+                 <a href="#" class="dropdown-toggle" data-toggle="dropdown" 
role="button" aria-haspopup="true" aria-expanded="false">Documentation <span 
class="caret"></span></a>
+                 <ul class="dropdown-menu">
+                         <li><a href="/documentation">Using the 
Documentation</a></li>
+                         <li role="separator" class="divider"></li>
+                         <li class="dropdown-header">Beam Concepts</li>
+                         <li><a 
href="/documentation/programming-guide/">Programming Guide</a></li>
+                         <li><a href="/documentation/resources/">Additional 
Resources</a></li>
+                         <li role="separator" class="divider"></li>
+              <li class="dropdown-header">Pipeline Fundamentals</li>
+              <li><a 
href="/documentation/pipelines/design-your-pipeline/">Design Your 
Pipeline</a></li>
+              <li><a 
href="/documentation/pipelines/create-your-pipeline/">Create Your 
Pipeline</a></li>
+              <li><a href="/documentation/pipelines/test-your-pipeline/">Test 
Your Pipeline</a></li>
+              <li role="separator" class="divider"></li>
+                         <li class="dropdown-header">SDKs</li>
+                         <li><a href="/documentation/sdks/java/">Java 
SDK</a></li>
+                         <li><a href="/documentation/sdks/javadoc/0.5.0/" 
target="_blank">Java SDK API Reference <img src="/images/external-link-icon.png"
+                 width="14" height="14"
+                 alt="External link."></a>
+        </li>
+        <li><a href="/documentation/sdks/python/">Python SDK</a></li>
+                         <li role="separator" class="divider"></li>
+                         <li class="dropdown-header">Runners</li>
+                         <li><a 
href="/documentation/runners/capability-matrix/">Capability Matrix</a></li>
+                         <li><a href="/documentation/runners/direct/">Direct 
Runner</a></li>
+                         <li><a href="/documentation/runners/apex/">Apache 
Apex Runner</a></li>
+                         <li><a href="/documentation/runners/flink/">Apache 
Flink Runner</a></li>
+                         <li><a href="/documentation/runners/spark/">Apache 
Spark Runner</a></li>
+                         <li><a href="/documentation/runners/dataflow/">Cloud 
Dataflow Runner</a></li>
+                 </ul>
+           </li>
+        <li class="dropdown">
+                 <a href="#" class="dropdown-toggle" data-toggle="dropdown" 
role="button" aria-haspopup="true" aria-expanded="false">Contribute <span 
class="caret"></span></a>
+                 <ul class="dropdown-menu">
+                         <li><a href="/contribute">Get Started 
Contributing</a></li>
+        <li role="separator" class="divider"></li>
+        <li class="dropdown-header">Guides</li>
+                         <li><a 
href="/contribute/contribution-guide/">Contribution Guide</a></li>
+        <li><a href="/contribute/testing/">Testing Guide</a></li>
+        <li><a href="/contribute/release-guide/">Release Guide</a></li>
+        <li><a href="/contribute/ptransform-style-guide/">PTransform Style 
Guide</a></li>
+        <li role="separator" class="divider"></li>
+        <li class="dropdown-header">Technical References</li>
+        <li><a href="/contribute/design-principles/">Design Principles</a></li>
+                         <li><a href="/contribute/work-in-progress/">Ongoing 
Projects</a></li>
+        <li><a href="/contribute/source-repository/">Source 
Repository</a></li>      
+        <li role="separator" class="divider"></li>
+                         <li class="dropdown-header">Promotion</li>
+        <li><a href="/contribute/presentation-materials/">Presentation 
Materials</a></li>
+        <li><a href="/contribute/logos/">Logos and Design</a></li>
+        <li role="separator" class="divider"></li>
+        <li><a href="/contribute/maturity-model/">Maturity Model</a></li>
+        <li><a href="/contribute/team/">Team</a></li>
+                 </ul>
+           </li>
+
+        <li><a href="/blog">Blog</a></li>
+      </ul>
+      <ul class="nav navbar-nav navbar-right">
+        <li class="dropdown">
+          <a href="#" class="dropdown-toggle" data-toggle="dropdown" 
role="button" aria-haspopup="true" aria-expanded="false"><img 
src="https://www.apache.org/foundation/press/kit/feather_small.png"; alt="Apache 
Logo" style="height:24px;">Apache Software Foundation<span 
class="caret"></span></a>
+          <ul class="dropdown-menu dropdown-menu-right">
+            <li><a href="http://www.apache.org/";>ASF Homepage</a></li>
+            <li><a href="http://www.apache.org/licenses/";>License</a></li>
+            <li><a href="http://www.apache.org/security/";>Security</a></li>
+            <li><a 
href="http://www.apache.org/foundation/thanks.html";>Thanks</a></li>
+            <li><a 
href="http://www.apache.org/foundation/sponsorship.html";>Sponsorship</a></li>
+            <li><a 
href="https://www.apache.org/foundation/policies/conduct";>Code of 
Conduct</a></li>
+          </ul>
+        </li>
+      </ul>
+    </div><!--/.nav-collapse -->
+  </div>
+</nav>
+
+
+<link rel="stylesheet" href="">
+
+
+    <div class="container" role="main">
+
+      <div class="row">
+        <h1 id="beam-custom-sources-and-sinks-for-python">Beam Custom Sources 
and Sinks for Python</h1>
+
+<p>The Beam SDK for Python provides an extensible API that you can use to create custom data sources and sinks. This tutorial shows how to create custom sources and sinks using <a href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/iobase.py">Beam’s Source and Sink API</a>.</p>
+
+<ul>
+  <li>Create a custom source by extending the <code 
class="highlighter-rouge">BoundedSource</code> and <code 
class="highlighter-rouge">RangeTracker</code> interfaces.</li>
+  <li>Create a custom sink by implementing the <code 
class="highlighter-rouge">Sink</code> and <code 
class="highlighter-rouge">Writer</code> classes.</li>
+</ul>
+
+<h2 id="why-create-a-custom-source-or-sink">Why Create a Custom Source or 
Sink</h2>
+
+<p>You’ll need to create a custom source or sink if you want your pipeline 
to read data from (or write data to) a storage system for which the Beam SDK 
for Python does not provide <a 
href="/documentation/programming-guide/#io">native support</a>.</p>
+
+<p>In simple cases, you may not need to create a custom source or sink. For 
example, if you need to read data from an SQL database using an arbitrary 
query, none of the advanced Source API features would benefit you. Likewise, if 
you’d like to write data to a third-party API via a protocol that lacks 
deduplication support, the Sink API wouldn’t benefit you. In such cases it 
makes more sense to use a <code class="highlighter-rouge">ParDo</code>.</p>
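+
+<p>As a rough illustration, the following sketch writes each element to a third-party service from a <code class="highlighter-rouge">ParDo</code>. The <code class="highlighter-rouge">thirdparty</code> client and its methods are hypothetical stand-ins for whatever API you are targeting.</p>
+
+<div class="highlighter-rouge"><pre class="highlight"><code>class _WriteToThirdPartyFn(beam.DoFn):
+  """Writes each element to a third-party API; no Sink API features needed."""
+
+  def start_bundle(self):
+    # Hypothetical client; open one connection per bundle of elements.
+    self._client = thirdparty.connect()
+
+  def process(self, element):
+    self._client.send(element)
+
+# Usage: records | 'WriteViaParDo' &gt;&gt; beam.ParDo(_WriteToThirdPartyFn())
+</code></pre>
+</div>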
+
+<p>However, if you’d like to use advanced features such as dynamic splitting 
and size estimation, you should use Beam’s APIs and create a custom source or 
sink.</p>
+
+<h2 
id="a-namebasic-code-reqsabasic-code-requirements-for-custom-sources-and-sinks"><a
 name="basic-code-reqs"></a>Basic Code Requirements for Custom Sources and 
Sinks</h2>
+
+<p>Services use the classes you provide to read and/or write data using 
multiple worker instances in parallel. As such, the code you provide for <code 
class="highlighter-rouge">Source</code> and <code 
class="highlighter-rouge">Sink</code> subclasses must meet some basic 
requirements:</p>
+
+<h3 id="serializability">Serializability</h3>
+
+<p>Your <code class="highlighter-rouge">Source</code> or <code 
class="highlighter-rouge">Sink</code> subclass must be serializable. The 
service may create multiple instances of your <code 
class="highlighter-rouge">Source</code> or <code 
class="highlighter-rouge">Sink</code> subclass to be sent to multiple remote 
workers to facilitate reading or writing in parallel. Note that the 
<em>way</em> the source and sink objects are serialized is runner specific.</p>
+
+<h3 id="immutability">Immutability</h3>
+
+<p>Your <code class="highlighter-rouge">Source</code> or <code 
class="highlighter-rouge">Sink</code> subclass must be effectively immutable. 
You should only use mutable state in your <code 
class="highlighter-rouge">Source</code> or <code 
class="highlighter-rouge">Sink</code> subclass if you are using lazy evaluation 
of expensive computations that you need to implement the source.</p>
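+
+<p>For example, a source may lazily cache the result of an expensive size computation. The sketch below illustrates this pattern; the <code class="highlighter-rouge">_compute_size_from_storage</code> helper is hypothetical.</p>
+
+<div class="highlighter-rouge"><pre class="highlight"><code>class MySource(iobase.BoundedSource):
+
+  def __init__(self, path):
+    self._path = path
+    self._cached_size = None  # Lazily computed; the only mutable state.
+
+  def estimate_size(self):
+    if self._cached_size is None:
+      # Expensive storage call, performed at most once per instance.
+      self._cached_size = _compute_size_from_storage(self._path)
+    return self._cached_size
+</code></pre>
+</div>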
+
+<h3 id="thread-safety">Thread-Safety</h3>
+
+<p>Your code must be thread-safe. The Beam SDK for Python provides the <code 
class="highlighter-rouge">RangeTracker</code> class to make this easier.</p>
+
+<h3 id="testability">Testability</h3>
+
+<p>It is critical to exhaustively unit-test all of your <code 
class="highlighter-rouge">Source</code> and <code 
class="highlighter-rouge">Sink</code> subclasses. A minor implementation error 
can lead to data corruption or data loss (such as skipping or duplicating 
records) that can be hard to detect.</p>
+
+<p>You can use test harnesses and utility methods available in the <a href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/source_test_utils.py">source_test_utils module</a> to develop tests for your source.</p>
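+
+<p>For instance, a minimal unit test for the <code class="highlighter-rouge">CountingSource</code> shown in <a href="#reading-sources">Reading from a Custom Source</a> below can read the source end-to-end through its own <code class="highlighter-rouge">RangeTracker</code>; the harness functions in source_test_utils exercise splitting behavior far more exhaustively than this sketch does.</p>
+
+<div class="highlighter-rouge"><pre class="highlight"><code>import unittest
+
+class CountingSourceTest(unittest.TestCase):
+
+  def test_read_returns_all_records(self):
+    source = CountingSource(10)
+    # Read over the full range and check that nothing is skipped or
+    # duplicated.
+    range_tracker = source.get_range_tracker(None, None)
+    self.assertEqual(list(source.read(range_tracker)), list(range(10)))
+</code></pre>
+</div>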
+
+<h2 id="a-namecreating-sourcesacreating-a-custom-source"><a 
name="creating-sources"></a>Creating a Custom Source</h2>
+
+<p>You should create a custom source if you’d like to use the advanced 
features that the Source API provides:</p>
+
+<ul>
+  <li>Dynamic splitting</li>
+  <li>Progress estimation</li>
+  <li>Size estimation</li>
+  <li>Splitting into parts of a particular size recommended by the service</li>
+</ul>
+
+<p>For example, you might want to read from a new file format that contains many records per file, or from a key-value store that supports read operations in sorted key order.</p>
+
+<p>To create a custom data source for your pipeline, you’ll need to provide 
the format-specific logic that tells the service how to read data from your 
input source, and how to split your data source into multiple parts so that 
multiple worker instances can read your data in parallel.</p>
+
+<p>You supply the logic for your custom source by creating the following 
classes:</p>
+
+<ul>
+  <li>A subclass of <code class="highlighter-rouge">BoundedSource</code>, 
which you can find in the <a 
href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/iobase.py";>iobase.py</a>
 module. <code class="highlighter-rouge">BoundedSource</code> is a source that 
reads a finite amount of input records. The class describes the data you want 
to read, including the data’s location and parameters (such as how much data 
to read).</li>
+  <li>A subclass of <code class="highlighter-rouge">RangeTracker</code>, which 
you can find in the <a 
href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/iobase.py";>iobase.py</a>
 module. <code class="highlighter-rouge">RangeTracker</code> is a thread-safe 
object used to manage a range for a given position type.</li>
+</ul>
+
+<h3 id="implementing-the-boundedsource-subclass">Implementing the 
BoundedSource Subclass</h3>
+
+<p><code class="highlighter-rouge">BoundedSource</code> represents a finite 
data set from which the service reads, possibly in parallel. <code 
class="highlighter-rouge">BoundedSource</code> contains a set of methods that 
the service uses to split the data set for reading by multiple remote 
workers.</p>
+
+<p>To implement a <code class="highlighter-rouge">BoundedSource</code>, your 
subclass must override the following methods:</p>
+
+<ul>
+  <li>
+    <p><code class="highlighter-rouge">estimate_size</code>: Services use this 
method to estimate the <em>total size</em> of your data, in bytes. This 
estimate is in terms of external storage size, before performing decompression 
or other processing.</p>
+  </li>
+  <li>
+    <p><code class="highlighter-rouge">split</code>: Services use this method to split your finite data into bundles of a given size.</p>
+  </li>
+  <li>
+    <p><code class="highlighter-rouge">get_range_tracker</code>: Services use 
this method to get the <code class="highlighter-rouge">RangeTracker</code> for 
a given position range, and use the information to report progress and perform 
dynamic splitting of sources.</p>
+  </li>
+  <li>
+    <p><code class="highlighter-rouge">read</code>: This method returns an 
iterator that reads data from the source, with respect to the boundaries 
defined by the given <code class="highlighter-rouge">RangeTracker</code> 
object.</p>
+  </li>
+</ul>
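+
+<p>In outline, a <code class="highlighter-rouge">BoundedSource</code> subclass therefore takes the following shape. This is only a skeleton; a complete working example appears in <a href="#reading-sources">Reading from a Custom Source</a> below.</p>
+
+<div class="highlighter-rouge"><pre class="highlight"><code>class MySource(iobase.BoundedSource):
+
+  def estimate_size(self):
+    ...  # Return the total size of the data, in bytes.
+
+  def split(self, desired_bundle_size, start_position=None,
+            stop_position=None):
+    ...  # Yield iobase.SourceBundle objects of roughly the desired size.
+
+  def get_range_tracker(self, start_position, stop_position):
+    ...  # Return a RangeTracker for [start_position, stop_position).
+
+  def read(self, range_tracker):
+    ...  # Yield the records within the range_tracker's boundaries.
+</code></pre>
+</div>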
+
+<h3 id="implementing-the-rangetracker-subclass">Implementing the RangeTracker 
Subclass</h3>
+
+<p>A <code class="highlighter-rouge">RangeTracker</code> is a thread-safe 
object used to manage the current range and current position of the reader of a 
<code class="highlighter-rouge">BoundedSource</code> and protect concurrent 
access to them.</p>
+
+<p>To implement a <code class="highlighter-rouge">RangeTracker</code>, you 
should first familiarize yourself with the following definitions:</p>
+
+<ul>
+  <li>
+    <p><strong>Position-based sources</strong> - A position-based source can 
be described by a range of positions of an ordered type, and the records read 
by the source can be described by positions of that type. For example, for a 
record within a file, the position can be the starting byte offset of the 
record. The position type for the record in this case is <code 
class="highlighter-rouge">long</code>.</p>
+
+    <p>The main requirement for position-based sources is 
<strong>associativity</strong>: Reading records in position range ‘[A, B)’ 
and records in position range ‘[B, C)’ should give the same records as 
reading records in position range ‘[A, C)’, where ‘A’ &lt;= ‘B’ 
&lt;= ‘C’.  This property ensures that no matter how many arbitrary 
sub-ranges a range of positions is split into, the total set of records they 
describe stays the same.</p>
+
+    <p>The other important property is how the source’s range relates to 
positions of records in the source. In many sources each record can be 
identified by a unique starting position. In this case:</p>
+
+    <ul>
+      <li>All records returned by a source ‘[A, B)’ must have starting 
positions in this range.</li>
+      <li>All but the last record should end within this range. The last 
record may or may not extend past the end of the range.</li>
+      <li>Records must not overlap.</li>
+    </ul>
+
+    <p>Such sources should define “read ‘[A, B)’” as “read from the 
first record starting at or after ‘A’, up to but not including the first 
record starting at or after ‘B’”.</p>
+
+    <p>Some examples of such sources include reading lines or CSV from a text 
file, reading keys and values from a database, etc.</p>
+
+    <p>The concept of <em>split points</em> lets us extend these definitions to sources in which some records cannot be identified by a unique starting position.</p>
+  </li>
+  <li>
+    <p><strong>Split points</strong> - A split point describes a record that 
is the first one returned when reading the range from and including position 
<strong>A</strong> up to infinity (i.e. [A, infinity)).</p>
+
+    <p>Some sources may have records that are not directly addressable. For 
example, imagine a file format consisting of a sequence of compressed blocks. 
Each block can be assigned an offset, but records within the block cannot be 
directly addressed without decompressing the block. Let us refer to this 
hypothetical format as <em>CBF (Compressed Blocks Format)</em>.</p>
+
+    <p>Many such formats can still satisfy the associativity property. For 
example, in CBF, reading [A, B) can mean “read all the records in all blocks 
whose starting offset is in [A, B)”.</p>
+
+    <p>To support such complex formats, Beam introduces the notion of 
<em>split points</em>. A record is a split point if there exists a position 
<strong>A</strong> such that the record is the first one to be returned when 
reading the range [A, infinity). In CBF, the only split points would be the 
first records in each block.</p>
+
+    <p>Split points allow us to define the meaning of a record’s position 
and a source’s range in the following cases:</p>
+
+    <ul>
+      <li>For a record that is at a split point, its position is defined to be 
the largest <strong>A</strong> such that reading a source with the range [A, 
infinity) returns this record.</li>
+      <li>Positions of other records are only required to be 
non-decreasing.</li>
+      <li>Reading the source [A, B) must return records starting from the 
first split point at or after <strong>A</strong>, up to but not including the 
first split point at or after <strong>B</strong>. In particular, this means 
that the first record returned by a source MUST always be a split point.</li>
+      <li>Positions of split points must be unique.</li>
+    </ul>
+
+    <p>As a result, for any decomposition of the full range of the source into 
position ranges, the total set of records will be the full set of records in 
the source, and each record will be read exactly once.</p>
+  </li>
+  <li>
+    <p><strong>Consumed positions</strong> - Consumed positions refer to 
records that have been read.</p>
+
+    <p>As the source is being read, and records read from it are being passed 
to the downstream transforms in the pipeline, we say that positions in the 
source are being <em>consumed</em>. When a reader has read a record (or 
promised to a caller that a record will be returned), positions up to and 
including the record’s start position are considered <em>consumed</em>.</p>
+
+    <p>Dynamic splitting can happen only at <em>unconsumed</em> positions. If 
the reader just returned a record at offset 42 in a file, dynamic splitting can 
happen only at offset 43 or beyond. Otherwise, that record could be read twice 
(by the current reader and the reader of the new task).</p>
+  </li>
+</ul>
+
+<h4 id="rangetracker-methods">RangeTracker Methods</h4>
+
+<p>To implement a <code class="highlighter-rouge">RangeTracker</code>, your 
subclass must override the following methods:</p>
+
+<ul>
+  <li>
+    <p><code class="highlighter-rouge">start_position</code>: Returns the 
starting position of the current range, inclusive.</p>
+  </li>
+  <li>
+    <p><code class="highlighter-rouge">stop_position</code>: Returns the 
ending position of the current range, exclusive.</p>
+  </li>
+  <li>
+    <p><code class="highlighter-rouge">try_claim</code>: This method is used 
to determine if a record at a split point is within the range. This method 
should modify the internal state of the <code 
class="highlighter-rouge">RangeTracker</code> by updating the last-consumed 
position to the given starting <code class="highlighter-rouge">position</code> 
of the record being read by the source. The method returns true if the given 
position falls within the current range.</p>
+  </li>
+  <li>
+    <p><code class="highlighter-rouge">set_current_position</code>: This 
method updates the last-consumed position to the given starting position of a 
record being read by a source. You can invoke this method for records that do 
not start at split points, and this should modify the internal state of the 
<code class="highlighter-rouge">RangeTracker</code>. If the record starts at a 
split point, you must invoke <code class="highlighter-rouge">try_claim</code> 
instead of this method.</p>
+  </li>
+  <li>
+    <p><code class="highlighter-rouge">position_at_fraction</code>: Given a 
fraction within the range [0.0, 1.0), this method will return the position at 
the given fraction compared to the position range [<code 
class="highlighter-rouge">self.start_position</code>, <code 
class="highlighter-rouge">self.stop_position</code>).</p>
+  </li>
+  <li>
+    <p><code class="highlighter-rouge">try_split</code>: This method attempts to split the current range into two parts around a suggested position. It is allowed to split at a different position, but in most cases it will split at the suggested position.</p>
+
+    <p>This method splits the current range [<code class="highlighter-rouge">self.start_position</code>, <code class="highlighter-rouge">self.stop_position</code>) into a “primary” part [<code class="highlighter-rouge">self.start_position</code>, <code class="highlighter-rouge">split_position</code>), and a “residual” part [<code class="highlighter-rouge">split_position</code>, <code class="highlighter-rouge">self.stop_position</code>), assuming that <code class="highlighter-rouge">split_position</code> has not been consumed yet.</p>
+
+    <p>If <code class="highlighter-rouge">split_position</code> has already been consumed, the method returns <code class="highlighter-rouge">None</code>. Otherwise, it updates the current range to be the primary and returns a tuple (<code class="highlighter-rouge">split_position</code>, <code class="highlighter-rouge">split_fraction</code>). <code class="highlighter-rouge">split_fraction</code> should be the fraction of the size of the range [<code class="highlighter-rouge">self.start_position</code>, <code class="highlighter-rouge">split_position</code>) compared to the original (before split) range [<code class="highlighter-rouge">self.start_position</code>, <code class="highlighter-rouge">self.stop_position</code>).</p>
+  </li>
+  <li>
+    <p><code class="highlighter-rouge">fraction_consumed</code>: Returns the approximate fraction of consumed positions in the source.</p>
+  </li>
+</ul>
+
+<p><strong>Note:</strong> Methods of class <code 
class="highlighter-rouge">iobase.RangeTracker</code> may be invoked by multiple 
threads, hence this class must be made thread-safe, for example, by using a 
single lock object.</p>
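+
+<p>The SDK-provided <code class="highlighter-rouge">OffsetRangeTracker</code>, used by the example source below, implements these methods for integer offsets. The following sketch shows the behavior you should expect over a small range; the return values shown are illustrative.</p>
+
+<div class="highlighter-rouge"><pre class="highlight"><code>tracker = OffsetRangeTracker(0, 10)
+
+tracker.try_claim(0)         # True: offset 0 lies within [0, 10).
+tracker.fraction_consumed()  # ~0.1: one of ten offsets has been consumed.
+
+# Splitting may only happen at an unconsumed position; offset 5 qualifies.
+tracker.try_split(5)         # Returns (5, 0.5); the range is now [0, 5).
+
+tracker.try_claim(7)         # False: 7 falls outside the updated range.
+</code></pre>
+</div>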
+
+<h3 id="convenience-source-base-classes">Convenience Source Base Classes</h3>
+
+<p>The Beam SDK for Python contains some convenient abstract base classes to 
help you easily create new sources.</p>
+
+<h4 id="filebasedsource">FileBasedSource</h4>
+
+<p><code class="highlighter-rouge">FileBasedSource</code> is a framework for 
developing sources for new file types. You can derive your <code 
class="highlighter-rouge">BoundedSource</code> class from the <a 
href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filebasedsource.py";>FileBasedSource</a>
 class.</p>
+
+<p>To create a source for a new file type, you need to create a sub-class of 
<code class="highlighter-rouge">FileBasedSource</code>.  Sub-classes of <code 
class="highlighter-rouge">FileBasedSource</code> must implement the method 
<code class="highlighter-rouge">FileBasedSource.read_records()</code>.</p>
+
+<p>See <a href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/avroio.py">AvroSource</a> for an example implementation of <code class="highlighter-rouge">FileBasedSource</code>.</p>
+
+<h2 id="a-namereading-sourcesareading-from-a-custom-source"><a 
name="reading-sources"></a>Reading from a Custom Source</h2>
+
+<p>The following example, <code 
class="highlighter-rouge">CountingSource</code>, demonstrates an implementation 
of <code class="highlighter-rouge">BoundedSource</code> and uses the 
SDK-provided <code class="highlighter-rouge">RangeTracker</code> called <code 
class="highlighter-rouge">OffsetRangeTracker</code>.</p>
+
+<div class="highlighter-rouge"><pre class="highlight"><code>class 
CountingSource(iobase.BoundedSource):
+
+  def __init__(self, count):
+    self._count = count
+
+  def estimate_size(self):
+    return self._count
+
+  def get_range_tracker(self, start_position, stop_position):
+    if start_position is None:
+      start_position = 0
+    if stop_position is None:
+      stop_position = self._count
+
+    return OffsetRangeTracker(start_position, stop_position)
+
+  def read(self, range_tracker):
+    for i in range(self._count):
+      if not range_tracker.try_claim(i):
+        return
+      yield i
+
+  def split(self, desired_bundle_size, start_position=None,
+            stop_position=None):
+    if start_position is None:
+      start_position = 0
+    if stop_position is None:
+      stop_position = self._count
+
+    bundle_start = start_position
+    while bundle_start &lt; self._count:
+      bundle_stop = min(self._count, bundle_start + desired_bundle_size)
+      yield iobase.SourceBundle(weight=(bundle_stop - bundle_start),
+                                source=self,
+                                start_position=bundle_start,
+                                stop_position=bundle_stop)
+      bundle_start = bundle_stop
+</code></pre>
+</div>
+
+<p>To read data from a custom source in your pipeline, use the <code 
class="highlighter-rouge">Read</code> transform:</p>
+
+<div class="highlighter-rouge"><pre class="highlight"><code>p = 
beam.Pipeline(options=PipelineOptions())
+numbers = p | 'ProduceNumbers' &gt;&gt; beam.io.Read(CountingSource(count))
+</code></pre>
+</div>
+
+<p><strong>Note:</strong> When you create a source that end-users are going to 
use, it’s recommended that you do not expose the code for the source itself 
as demonstrated in the example above, but rather use a wrapping <code 
class="highlighter-rouge">PTransform</code> instead. See <a 
href="#ptransform-wrappers">PTransform wrappers</a> to see how and why to avoid 
exposing your sources.</p>
+
+<h2 id="a-namecreating-sinksacreating-a-custom-sink"><a 
name="creating-sinks"></a>Creating a Custom Sink</h2>
+
+<p>You should create a custom sink if you’d like to use the advanced 
features that the Sink API provides, such as global initialization and 
finalization that allow the write operation to appear “atomic” (i.e. either 
all data is written or none is).</p>
+
+<p>A sink represents a resource that can be written to using the <code 
class="highlighter-rouge">Write</code> transform. A parallel write to a sink 
consists of three phases:</p>
+
+<ol>
+  <li>A sequential initialization phase. For example, creating a temporary 
output directory.</li>
+  <li>A parallel write phase where workers write bundles of records.</li>
+  <li>A sequential finalization phase. For example, merging output files.</li>
+</ol>
+
+<p>For example, if you’d like to write to a new table in a database, you 
should use the Sink API. In this case, the initializer will create a temporary 
table, the writer will write rows to it, and the finalizer will rename the 
table to a final location.</p>
+
+<p>To create a custom data sink for your pipeline, you’ll need to provide 
the format-specific logic that tells the sink how to write bounded data from 
your pipeline’s <code class="highlighter-rouge">PCollection</code>s to an 
output sink. The sink writes bundles of data in parallel using multiple 
workers.</p>
+
+<p>You supply the writing logic by creating the following classes:</p>
+
+<ul>
+  <li>
+    <p>A subclass of <code class="highlighter-rouge">Sink</code>, which you 
can find in the <a 
href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/iobase.py";>iobase.py</a>
 module.  <code class="highlighter-rouge">Sink</code> describes the location or 
resource to write to. Depending on the type of sink, your <code 
class="highlighter-rouge">Sink</code> subclass may contain fields such as the 
path to an output directory on a filesystem or a database table name. <code 
class="highlighter-rouge">Sink</code> provides three methods for performing a 
write operation to the sink it describes. Your subclass of <code 
class="highlighter-rouge">Sink</code> must implement these three methods: <code 
class="highlighter-rouge">initialize_write()</code>, <code 
class="highlighter-rouge">open_writer()</code>, and <code 
class="highlighter-rouge">finalize_write()</code>.</p>
+  </li>
+  <li>
+    <p>A subclass of <code class="highlighter-rouge">Writer</code>, which you 
can find in the <a 
href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/iobase.py";>iobase.py</a>
 module. <code class="highlighter-rouge">Writer</code> writes a bundle of 
elements from an input <code class="highlighter-rouge">PCollection</code> to 
your designated data sink. <code class="highlighter-rouge">Writer</code> 
defines two methods: <code class="highlighter-rouge">write()</code>, which 
writes a single record from the bundle, and <code 
class="highlighter-rouge">close()</code>, which is called once at the end of 
writing a bundle.</p>
+  </li>
+</ul>
+
+<h3 id="implementing-the-sink-subclass">Implementing the Sink Subclass</h3>
+
+<p>Your <code class="highlighter-rouge">Sink</code> subclass describes the 
location or resource to which your pipeline writes its output. This might 
include a file system location, the name of a database table or dataset, 
etc.</p>
+
+<p>To implement a <code class="highlighter-rouge">Sink</code>, your subclass 
must override the following methods:</p>
+
+<ul>
+  <li>
+    <p><code class="highlighter-rouge">initialize_write</code>: This method 
performs any necessary initialization before writing to the output location. 
Services call this method before writing begins. For example, you can use <code 
class="highlighter-rouge">initialize_write</code> to create a temporary output 
directory.</p>
+  </li>
+  <li>
+    <p><code class="highlighter-rouge">open_writer</code>: This method creates a writer for writing a bundle of elements to the sink.</p>
+  </li>
+  <li>
+    <p><code class="highlighter-rouge">finalize_write</code>:This method 
finalizes the sink after all data is written to it. Given the result of 
initialization and an iterable of results from bundle writes, <code 
class="highlighter-rouge">finalize_write</code> performs finalization after 
writing and closes the sink. This method is called after all bundle write 
operations are complete.</p>
+  </li>
+</ul>
+
+<p><strong>Caution:</strong> <code 
class="highlighter-rouge">initialize_write</code> and <code 
class="highlighter-rouge">finalize_write</code> are conceptually called once: 
at the beginning and end of a <code class="highlighter-rouge">Write</code> 
transform.  However, when you implement these methods, you must ensure that 
they are <strong>idempotent</strong>, as they may be called multiple times on 
different machines in the case of failure, retry, or for redundancy.</p>
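+
+<p>For example, a <code class="highlighter-rouge">finalize_write</code> that renames tables (like the one in <a href="#writing-sinks">Writing to a Custom Sink</a> below) could be made idempotent by tolerating a rename that a previous attempt already performed. This is only a sketch; the <code class="highlighter-rouge">simplekv.TableNotFoundError</code> exception is an assumed part of the hypothetical storage API.</p>
+
+<div class="highlighter-rouge"><pre class="highlight"><code>def finalize_write(self, access_token, table_names):
+  for i, table_name in enumerate(table_names):
+    try:
+      simplekv.rename_table(
+          access_token, table_name, self._final_table_name + str(i))
+    except simplekv.TableNotFoundError:
+      # A previous attempt already renamed this table; nothing to do.
+      pass
+</code></pre>
+</div>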
+
+<h3 id="implementing-the-writer-subclass">Implementing the Writer Subclass</h3>
+
+<p>Your <code class="highlighter-rouge">Writer</code> subclass implements the 
logic for writing a bundle of elements from a <code 
class="highlighter-rouge">PCollection</code> to output location defined in your 
<code class="highlighter-rouge">Sink</code>. Services may instantiate multiple 
instances of your <code class="highlighter-rouge">Writer</code> in different 
threads on the same worker, so access to any static members or methods must be 
thread-safe.</p>
+
+<p>To implement a <code class="highlighter-rouge">Writer</code>, your subclass 
must override the following abstract methods:</p>
+
+<ul>
+  <li>
+    <p><code class="highlighter-rouge">write</code>: This method writes a 
value to your <code class="highlighter-rouge">Sink</code> using the current 
writer.</p>
+  </li>
+  <li>
+    <p><code class="highlighter-rouge">close</code>: This method closes the 
current writer.</p>
+  </li>
+</ul>
+
+<h4 id="handling-bundle-ids">Handling Bundle IDs</h4>
+
+<p>When the service calls <code 
class="highlighter-rouge">Sink.open_writer</code>, it will pass a unique bundle 
ID for the records to be written. Your <code 
class="highlighter-rouge">Writer</code> must use this bundle ID to ensure that 
its output does not interfere with that of other <code 
class="highlighter-rouge">Writer</code> instances that might have been created 
in parallel. This is particularly important as the service may retry write 
operations multiple times in case of failure.</p>
+
+<p>For example, if your <code class="highlighter-rouge">Sink</code>’s output 
is file-based, your <code class="highlighter-rouge">Writer</code> class might 
use the bundle ID as a filename suffix to ensure that your <code 
class="highlighter-rouge">Writer</code> writes its records to a unique output 
file not used by other <code class="highlighter-rouge">Writer</code>s. You can 
then have your <code class="highlighter-rouge">Writer</code>’s <code 
class="highlighter-rouge">close</code> method return that file location as part 
of the write result.</p>
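+
+<p>A file-based <code class="highlighter-rouge">open_writer</code> might therefore look like the following sketch, where <code class="highlighter-rouge">_MyFileWriter</code> is a hypothetical <code class="highlighter-rouge">Writer</code> subclass:</p>
+
+<div class="highlighter-rouge"><pre class="highlight"><code>def open_writer(self, init_result, uid):
+  # The bundle ID (uid) keeps concurrent and retried bundle writes from
+  # clobbering each other's output files.
+  return _MyFileWriter('%s/part-%s' % (self._output_dir, uid))
+</code></pre>
+</div>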
+
+<h3 id="convenience-sink-and-writer-base-classes">Convenience Sink and Writer 
Base Classes</h3>
+
+<p>The Beam SDK for Python contains some convenient abstract base classes to help you create <code class="highlighter-rouge">Sink</code> and <code class="highlighter-rouge">Writer</code> classes that work with common data storage formats, like files.</p>
+
+<h4 id="filesink">FileSink</h4>
+
+<p>If your sink writes data to files, you can derive your <code class="highlighter-rouge">Sink</code> and <code class="highlighter-rouge">Writer</code> classes from the <code class="highlighter-rouge">FileSink</code> and <code class="highlighter-rouge">FileSinkWriter</code> classes, which can be found in the <a href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/fileio.py">fileio.py</a> module. These classes implement code common to sinks that interact with files, including:</p>
+
+<ul>
+  <li>Setting file headers and footers</li>
+  <li>Sequential record writing</li>
+  <li>Setting the output MIME type</li>
+</ul>
+
+<h2 id="a-namewriting-sinksawriting-to-a-custom-sink"><a 
name="writing-sinks"></a>Writing to a Custom Sink</h2>
+
+<p>Consider a simple key-value storage that writes a given set of key-value 
pairs to a set of tables. The following is the key-value storage’s API:</p>
+
+<ul>
+  <li><code class="highlighter-rouge">connect(url)</code> Connects to the 
storage and returns an access token which can be used to perform further 
operations.</li>
+  <li><code class="highlighter-rouge">open_table(access_token, 
table_name)</code> Creates a table named ‘table_name’. Returns a table 
object.</li>
+  <li><code class="highlighter-rouge">write_to_table(access_token, table, key, 
value)</code> Writes a key, value pair to the given table.</li>
+  <li><code class="highlighter-rouge">rename_table(access_token, old_name, 
new_name)</code> Renames the table named ‘old_name’ to ‘new_name’.</li>
+</ul>
+
+<p>The following code demonstrates how to implement the <code 
class="highlighter-rouge">Sink</code> class for this key-value storage.</p>
+
+<div class="highlighter-rouge"><pre class="highlight"><code>class 
SimpleKVSink(iobase.Sink):
+
+  def __init__(self, url, final_table_name):
+    self._url = url
+    self._final_table_name = final_table_name
+
+  def initialize_write(self):
+    access_token = simplekv.connect(self._url)
+    return access_token
+
+  def open_writer(self, access_token, uid):
+    table_name = 'table' + uid
+    return SimpleKVWriter(access_token, table_name)
+
+  def finalize_write(self, access_token, table_names):
+    for i, table_name in enumerate(table_names):
+      simplekv.rename_table(
+          access_token, table_name, self._final_table_name + str(i))
+</code></pre>
+</div>
+
+<p>The following code demonstrates how to implement the <code 
class="highlighter-rouge">Writer</code> class for this key-value storage.</p>
+
+<div class="highlighter-rouge"><pre class="highlight"><code>class 
SimpleKVWriter(iobase.Writer):
+
+  def __init__(self, access_token, table_name):
+    self._access_token = access_token
+    self._table_name = table_name
+    self._table = simplekv.open_table(access_token, table_name)
+
+  def write(self, record):
+    key, value = record
+
+    simplekv.write_to_table(self._access_token, self._table, key, value)
+
+  def close(self):
+    return self._table_name
+</code></pre>
+</div>
+
+<p>The following code demonstrates how to write to the sink using the <code 
class="highlighter-rouge">Write</code> transform.</p>
+
+<div class="highlighter-rouge"><pre class="highlight"><code>p = 
beam.Pipeline(options=PipelineOptions())
+kvs = p | 'CreateKVs' &gt;&gt; beam.Create(KVs)
+
+kvs | 'WriteToSimpleKV' &gt;&gt; beam.io.Write(
+    SimpleKVSink('http://url_to_simple_kv/', final_table_name))
+</code></pre>
+</div>
+
+<p><strong>Note:</strong> When you create a sink that end-users are going to 
use, it’s recommended that you do not expose the code for the sink itself as 
demonstrated in the example above, but rather use a wrapping <code 
class="highlighter-rouge">PTransform</code> instead. See <a 
href="#ptransform-wrappers">PTransform wrappers</a> to see how and why to avoid 
exposing your sinks.</p>
+
+<h2 id="a-nameptransform-wrappersaptransform-wrappers"><a 
name="ptransform-wrappers"></a>PTransform Wrappers</h2>
+
+<p>If you create a custom source or sink for your own use, such as for 
learning purposes, you should create them as explained in the sections above 
and use them as demonstrated in the examples.</p>
+
+<p>However, when you create a source or sink that end-users are going to use, 
instead of exposing the source or sink itself, you should create a wrapper 
<code class="highlighter-rouge">PTransform</code>. Ideally, a custom source or 
sink should be exposed to users simply as “something that can be applied in a 
pipeline”, which is actually a <code 
class="highlighter-rouge">PTransform</code>. That way, its implementation can 
be hidden and arbitrarily complex or simple.</p>
+
+<p>The greatest benefit of not exposing the implementation details is that 
later on you will be able to add additional functionality without breaking the 
existing implementation for users.  For example, if your users’ pipelines 
read from your source using <code 
class="highlighter-rouge">beam.io.Read(...)</code> and you want to insert a 
reshard into the pipeline, all of your users would need to add the reshard 
themselves (using the <code class="highlighter-rouge">GroupByKey</code> 
transform). To solve this, it’s recommended that you expose your source as a 
composite <code class="highlighter-rouge">PTransform</code> that performs both 
the read operation and the reshard.</p>
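+
+<p>A sketch of such a composite transform is shown below. The reshard here uses a common pattern (pair each element with a random key, <code class="highlighter-rouge">GroupByKey</code>, then discard the keys), and <code class="highlighter-rouge">_MySource</code> is a placeholder for your hidden source class.</p>
+
+<div class="highlighter-rouge"><pre class="highlight"><code>import random
+
+class ReadFromMySource(PTransform):
+
+  def expand(self, pcoll):
+    return (pcoll
+            | beam.io.Read(_MySource())
+            # Reshard so that downstream stages parallelize well.
+            | 'PairWithKey' &gt;&gt; beam.Map(lambda x: (random.randint(0, 15), x))
+            | 'GroupByKey' &gt;&gt; beam.GroupByKey()
+            | 'DropKeys' &gt;&gt; beam.FlatMap(lambda kv: kv[1]))
+</code></pre>
+</div>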
+
+<p>To avoid exposing your custom sources and sinks to end-users, it’s 
recommended that you use the <code class="highlighter-rouge">_</code> prefix 
when creating your custom source and sink classes. Then, create a wrapper <code 
class="highlighter-rouge">PTransform</code>.</p>
+
+<p>The following examples change the custom source and sink from the above 
sections so that they are not exposed to end-users. For the source, rename 
<code class="highlighter-rouge">CountingSource</code> to <code 
class="highlighter-rouge">_CountingSource</code>. Then, create the wrapper 
<code class="highlighter-rouge">PTransform</code>, called <code 
class="highlighter-rouge">ReadFromCountingSource</code>:</p>
+
+<div class="highlighter-rouge"><pre class="highlight"><code>class 
ReadFromCountingSource(PTransform):
+
+  def __init__(self, count, **kwargs):
+    super(ReadFromCountingSource, self).__init__(**kwargs)
+    self._count = count
+
+  def expand(self, pcoll):
+    return pcoll | iobase.Read(_CountingSource(self._count))
+</code></pre>
+</div>
+
+<p>Finally, read from the source:</p>
+
+<div class="highlighter-rouge"><pre class="highlight"><code>p = 
beam.Pipeline(options=PipelineOptions())
+numbers = p | 'ProduceNumbers' &gt;&gt; ReadFromCountingSource(count)
+</code></pre>
+</div>
+
+<p>For the sink, rename <code class="highlighter-rouge">SimpleKVSink</code> to 
<code class="highlighter-rouge">_SimpleKVSink</code>. Then, create the wrapper 
<code class="highlighter-rouge">PTransform</code>, called <code 
class="highlighter-rouge">WriteToKVSink</code>:</p>
+
+<div class="highlighter-rouge"><pre class="highlight"><code>class 
WriteToKVSink(PTransform):
+
+  def __init__(self, url, final_table_name, **kwargs):
+    super(WriteToKVSink, self).__init__(**kwargs)
+    self._url = url
+    self._final_table_name = final_table_name
+
+  def expand(self, pcoll):
+    return pcoll | iobase.Write(_SimpleKVSink(self._url,
+                                              self._final_table_name))
+</code></pre>
+</div>
+
+<p>Finally, write to the sink:</p>
+
+<div class="highlighter-rouge"><pre class="highlight"><code>p = 
beam.Pipeline(options=PipelineOptions())
+kvs = p | 'CreateKVs' &gt;&gt; beam.core.Create(KVs)
+kvs | 'WriteToSimpleKV' &gt;&gt; WriteToKVSink(
+    'http://url_to_simple_kv/', final_table_name)
+</code></pre>
+</div>
+
+
+      </div>
+
+
+    <hr>
+  <div class="row">
+      <div class="col-xs-12">
+          <footer>
+              <p class="text-center">
+                &copy; Copyright
+                <a href="http://www.apache.org";>The Apache Software 
Foundation</a>,
+                2017. All Rights Reserved.
+              </p>
+              <p class="text-center">
+                <a href="/privacy_policy">Privacy Policy</a> |
+                <a href="/feed.xml">RSS Feed</a>
+              </p>
+          </footer>
+      </div>
+  </div>
+  <!-- container div end -->
+</div>
+
+
+  </body>
+
+</html>

http://git-wip-us.apache.org/repos/asf/beam-site/blob/83a6e401/content/documentation/sdks/python/index.html
----------------------------------------------------------------------
diff --git a/content/documentation/sdks/python/index.html 
b/content/documentation/sdks/python/index.html
index 6ae91b0..24573cf 100644
--- a/content/documentation/sdks/python/index.html
+++ b/content/documentation/sdks/python/index.html
@@ -164,6 +164,11 @@
 
 <p>When you run your pipeline locally, the packages that your pipeline depends 
on are available because they are installed on your local machine. However, 
when you want to run your pipeline remotely, you must make sure these 
dependencies are available on the remote machines. <a 
href="/documentation/sdks/python-pipeline-dependencies">Managing Python 
Pipeline Dependencies</a> shows you how to make your dependencies available to 
the remote workers.</p>
 
+<h2 id="custom-sources-and-sinks">Custom Sources and Sinks</h2>
+
+<p>The Beam SDK for Python provides an extensible API that you can use to create custom data sources and sinks. The <a href="/documentation/sdks/python-custom-io">Custom Sources and Sinks for Python tutorial</a> shows how to create custom sources and sinks using <a href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/iobase.py">Beam’s Source and Sink API</a>.</p>
+
+
       </div>
 
 
