Regenerate website
Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/83a6e401 Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/83a6e401 Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/83a6e401 Branch: refs/heads/asf-site Commit: 83a6e4011f7eae8003fe85a7c51cb824440cffa4 Parents: a7e8b60 Author: Ahmet Altay <al...@google.com> Authored: Fri Feb 17 13:17:53 2017 -0800 Committer: Ahmet Altay <al...@google.com> Committed: Fri Feb 17 13:17:53 2017 -0800 ---------------------------------------------------------------------- .../sdks/python-custom-io/index.html | 613 +++++++++++++++++++ content/documentation/sdks/python/index.html | 5 + 2 files changed, 618 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam-site/blob/83a6e401/content/documentation/sdks/python-custom-io/index.html ---------------------------------------------------------------------- diff --git a/content/documentation/sdks/python-custom-io/index.html b/content/documentation/sdks/python-custom-io/index.html new file mode 100644 index 0000000..c43f606 --- /dev/null +++ b/content/documentation/sdks/python-custom-io/index.html @@ -0,0 +1,613 @@ +<!DOCTYPE html> +<html lang="en"> + + <head> + <meta charset="utf-8"> + <meta http-equiv="X-UA-Compatible" content="IE=edge"> + <meta name="viewport" content="width=device-width, initial-scale=1"> + + <title>Beam Custom Sources and Sinks for Python</title> + <meta name="description" content="Apache Beam is an open source, unified model and set of language-specific SDKs for defining and executing data processing workflows, and also data ingestion and integration flows, supporting Enterprise Integration Patterns (EIPs) and Domain Specific Languages (DSLs). 
Dataflow pipelines simplify the mechanics of large-scale batch and streaming data processing and can run on a number of runtimes like Apache Flink, Apache Spark, and Google Cloud Dataflow (a cloud service). Beam also brings DSL in different languages, allowing users to easily implement their data integration processes. +"> + + <link rel="stylesheet" href="/styles/site.css"> + <link rel="stylesheet" href="/css/theme.css"> + <script src="https://ajax.googleapis.com/ajax/libs/jquery/2.2.0/jquery.min.js"></script> + <script src="/js/bootstrap.min.js"></script> + <script src="/js/language-switch.js"></script> + <link rel="canonical" href="https://beam.apache.org/documentation/sdks/python-custom-io/" data-proofer-ignore> + <link rel="alternate" type="application/rss+xml" title="Apache Beam" href="https://beam.apache.org/feed.xml"> + <script> + (function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){ + (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o), + m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m) + })(window,document,'script','//www.google-analytics.com/analytics.js','ga'); + + ga('create', 'UA-73650088-1', 'auto'); + ga('send', 'pageview'); + + </script> + <link rel="shortcut icon" type="image/x-icon" href="/images/favicon.ico"> +</head> + + + <body role="document"> + + <nav class="navbar navbar-default navbar-fixed-top"> + <div class="container"> + <div class="navbar-header"> + <a href="/" class="navbar-brand" > + <img alt="Brand" style="height: 25px" src="/images/beam_logo_navbar.png"> + </a> + <button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#navbar" aria-expanded="false" aria-controls="navbar"> + <span class="sr-only">Toggle navigation</span> + <span class="icon-bar"></span> + <span class="icon-bar"></span> + <span class="icon-bar"></span> + </button> + </div> + <div id="navbar" class="navbar-collapse collapse"> + <ul class="nav navbar-nav"> + 
<li class="dropdown"> + <a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">Get Started <span class="caret"></span></a> + <ul class="dropdown-menu"> + <li><a href="/get-started/beam-overview/">Beam Overview</a></li> + <li><a href="/get-started/quickstart-java/">Quickstart - Java</a></li> + <li><a href="/get-started/quickstart-py/">Quickstart - Python</a></li> + <li role="separator" class="divider"></li> + <li class="dropdown-header">Example Walkthroughs</li> + <li><a href="/get-started/wordcount-example/">WordCount</a></li> + <li><a href="/get-started/mobile-gaming-example/">Mobile Gaming</a></li> + <li role="separator" class="divider"></li> + <li class="dropdown-header">Resources</li> + <li><a href="/get-started/downloads">Downloads</a></li> + <li><a href="/get-started/support">Support</a></li> + </ul> + </li> + <li class="dropdown"> + <a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">Documentation <span class="caret"></span></a> + <ul class="dropdown-menu"> + <li><a href="/documentation">Using the Documentation</a></li> + <li role="separator" class="divider"></li> + <li class="dropdown-header">Beam Concepts</li> + <li><a href="/documentation/programming-guide/">Programming Guide</a></li> + <li><a href="/documentation/resources/">Additional Resources</a></li> + <li role="separator" class="divider"></li> + <li class="dropdown-header">Pipeline Fundamentals</li> + <li><a href="/documentation/pipelines/design-your-pipeline/">Design Your Pipeline</a></li> + <li><a href="/documentation/pipelines/create-your-pipeline/">Create Your Pipeline</a></li> + <li><a href="/documentation/pipelines/test-your-pipeline/">Test Your Pipeline</a></li> + <li role="separator" class="divider"></li> + <li class="dropdown-header">SDKs</li> + <li><a href="/documentation/sdks/java/">Java SDK</a></li> + <li><a href="/documentation/sdks/javadoc/0.5.0/" 
target="_blank">Java SDK API Reference <img src="/images/external-link-icon.png" + width="14" height="14" + alt="External link."></a> + </li> + <li><a href="/documentation/sdks/python/">Python SDK</a></li> + <li role="separator" class="divider"></li> + <li class="dropdown-header">Runners</li> + <li><a href="/documentation/runners/capability-matrix/">Capability Matrix</a></li> + <li><a href="/documentation/runners/direct/">Direct Runner</a></li> + <li><a href="/documentation/runners/apex/">Apache Apex Runner</a></li> + <li><a href="/documentation/runners/flink/">Apache Flink Runner</a></li> + <li><a href="/documentation/runners/spark/">Apache Spark Runner</a></li> + <li><a href="/documentation/runners/dataflow/">Cloud Dataflow Runner</a></li> + </ul> + </li> + <li class="dropdown"> + <a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">Contribute <span class="caret"></span></a> + <ul class="dropdown-menu"> + <li><a href="/contribute">Get Started Contributing</a></li> + <li role="separator" class="divider"></li> + <li class="dropdown-header">Guides</li> + <li><a href="/contribute/contribution-guide/">Contribution Guide</a></li> + <li><a href="/contribute/testing/">Testing Guide</a></li> + <li><a href="/contribute/release-guide/">Release Guide</a></li> + <li><a href="/contribute/ptransform-style-guide/">PTransform Style Guide</a></li> + <li role="separator" class="divider"></li> + <li class="dropdown-header">Technical References</li> + <li><a href="/contribute/design-principles/">Design Principles</a></li> + <li><a href="/contribute/work-in-progress/">Ongoing Projects</a></li> + <li><a href="/contribute/source-repository/">Source Repository</a></li> + <li role="separator" class="divider"></li> + <li class="dropdown-header">Promotion</li> + <li><a href="/contribute/presentation-materials/">Presentation Materials</a></li> + <li><a href="/contribute/logos/">Logos and Design</a></li> + <li role="separator" 
class="divider"></li> + <li><a href="/contribute/maturity-model/">Maturity Model</a></li> + <li><a href="/contribute/team/">Team</a></li> + </ul> + </li> + + <li><a href="/blog">Blog</a></li> + </ul> + <ul class="nav navbar-nav navbar-right"> + <li class="dropdown"> + <a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false"><img src="https://www.apache.org/foundation/press/kit/feather_small.png" alt="Apache Logo" style="height:24px;">Apache Software Foundation<span class="caret"></span></a> + <ul class="dropdown-menu dropdown-menu-right"> + <li><a href="http://www.apache.org/">ASF Homepage</a></li> + <li><a href="http://www.apache.org/licenses/">License</a></li> + <li><a href="http://www.apache.org/security/">Security</a></li> + <li><a href="http://www.apache.org/foundation/thanks.html">Thanks</a></li> + <li><a href="http://www.apache.org/foundation/sponsorship.html">Sponsorship</a></li> + <li><a href="https://www.apache.org/foundation/policies/conduct">Code of Conduct</a></li> + </ul> + </li> + </ul> + </div><!--/.nav-collapse --> + </div> +</nav> + + +<link rel="stylesheet" href=""> + + + <div class="container" role="main"> + + <div class="row"> + <h1 id="beam-custom-sources-and-sinks-for-python">Beam Custom Sources and Sinks for Python</h1> + +<p>The Beam SDK for Python provides an extensible API that you can use to create custom data sources and sinks. 
This tutorial shows how to create custom sources and sinks using <a href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/iobase.py">Beam’s Source and Sink API</a>.</p> + +<ul> + <li>Create a custom source by extending the <code class="highlighter-rouge">BoundedSource</code> and <code class="highlighter-rouge">RangeTracker</code> interfaces.</li> + <li>Create a custom sink by implementing the <code class="highlighter-rouge">Sink</code> and <code class="highlighter-rouge">Writer</code> classes.</li> +</ul> + +<h2 id="why-create-a-custom-source-or-sink">Why Create a Custom Source or Sink</h2> + +<p>You’ll need to create a custom source or sink if you want your pipeline to read data from (or write data to) a storage system for which the Beam SDK for Python does not provide <a href="/documentation/programming-guide/#io">native support</a>.</p> + +<p>In simple cases, you may not need to create a custom source or sink. For example, if you need to read data from an SQL database using an arbitrary query, none of the advanced Source API features would benefit you. Likewise, if you’d like to write data to a third-party API via a protocol that lacks deduplication support, the Sink API wouldn’t benefit you. In such cases it makes more sense to use a <code class="highlighter-rouge">ParDo</code>.</p> + +<p>However, if you’d like to use advanced features such as dynamic splitting and size estimation, you should use Beam’s APIs and create a custom source or sink.</p> + +<h2 id="a-namebasic-code-reqsabasic-code-requirements-for-custom-sources-and-sinks"><a name="basic-code-reqs"></a>Basic Code Requirements for Custom Sources and Sinks</h2> + +<p>Services use the classes you provide to read and/or write data using multiple worker instances in parallel. 
As such, the code you provide for <code class="highlighter-rouge">Source</code> and <code class="highlighter-rouge">Sink</code> subclasses must meet some basic requirements:</p> + +<h3 id="serializability">Serializability</h3> + +<p>Your <code class="highlighter-rouge">Source</code> or <code class="highlighter-rouge">Sink</code> subclass must be serializable. The service may create multiple instances of your <code class="highlighter-rouge">Source</code> or <code class="highlighter-rouge">Sink</code> subclass to be sent to multiple remote workers to facilitate reading or writing in parallel. Note that the <em>way</em> the source and sink objects are serialized is runner specific.</p> + +<h3 id="immutability">Immutability</h3> + +<p>Your <code class="highlighter-rouge">Source</code> or <code class="highlighter-rouge">Sink</code> subclass must be effectively immutable. You should only use mutable state in your <code class="highlighter-rouge">Source</code> or <code class="highlighter-rouge">Sink</code> subclass if you are using lazy evaluation of expensive computations that you need to implement the source.</p> + +<h3 id="thread-safety">Thread-Safety</h3> + +<p>Your code must be thread-safe. The Beam SDK for Python provides the <code class="highlighter-rouge">RangeTracker</code> class to make this easier.</p> + +<h3 id="testability">Testability</h3> + +<p>It is critical to exhaustively unit-test all of your <code class="highlighter-rouge">Source</code> and <code class="highlighter-rouge">Sink</code> subclasses. 
A minor implementation error can lead to data corruption or data loss (such as skipping or duplicating records) that can be hard to detect.</p> + +<p>You can use test harnesses and utility methods available in the <a href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/source_test_utils.py">source_test_utils module</a> to develop tests for your source.</p> + +<h2 id="a-namecreating-sourcesacreating-a-custom-source"><a name="creating-sources"></a>Creating a Custom Source</h2> + +<p>You should create a custom source if you’d like to use the advanced features that the Source API provides:</p> + +<ul> + <li>Dynamic splitting</li> + <li>Progress estimation</li> + <li>Size estimation</li> + <li>Splitting into parts of a particular size recommended by the service</li> +</ul> + +<p>For example, you should create a custom source if you’d like to read from a new file format that contains many records per file, or from a key-value store that supports read operations in sorted key order.</p> + +<p>To create a custom data source for your pipeline, you’ll need to provide the format-specific logic that tells the service how to read data from your input source, and how to split your data source into multiple parts so that multiple worker instances can read your data in parallel.</p> + +<p>You supply the logic for your custom source by creating the following classes:</p> + +<ul> + <li>A subclass of <code class="highlighter-rouge">BoundedSource</code>, which you can find in the <a href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/iobase.py">iobase.py</a> module. <code class="highlighter-rouge">BoundedSource</code> is a source that reads a finite number of input records. 
The class describes the data you want to read, including the data’s location and parameters (such as how much data to read).</li> + <li>A subclass of <code class="highlighter-rouge">RangeTracker</code>, which you can find in the <a href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/iobase.py">iobase.py</a> module. <code class="highlighter-rouge">RangeTracker</code> is a thread-safe object used to manage a range for a given position type.</li> +</ul> + +<h3 id="implementing-the-boundedsource-subclass">Implementing the BoundedSource Subclass</h3> + +<p><code class="highlighter-rouge">BoundedSource</code> represents a finite data set from which the service reads, possibly in parallel. <code class="highlighter-rouge">BoundedSource</code> contains a set of methods that the service uses to split the data set for reading by multiple remote workers.</p> + +<p>To implement a <code class="highlighter-rouge">BoundedSource</code>, your subclass must override the following methods:</p> + +<ul> + <li> + <p><code class="highlighter-rouge">estimate_size</code>: Services use this method to estimate the <em>total size</em> of your data, in bytes. 
This estimate is in terms of external storage size, before performing decompression or other processing.</p> + </li> + <li> + <p><code class="highlighter-rouge">split</code>: Services use this method to split your finite data into bundles of a given size.</p> + </li> + <li> + <p><code class="highlighter-rouge">get_range_tracker</code>: Services use this method to get the <code class="highlighter-rouge">RangeTracker</code> for a given position range, and use the information to report progress and perform dynamic splitting of sources.</p> + </li> + <li> + <p><code class="highlighter-rouge">read</code>: This method returns an iterator that reads data from the source, with respect to the boundaries defined by the given <code class="highlighter-rouge">RangeTracker</code> object.</p> + </li> +</ul> + +<h3 id="implementing-the-rangetracker-subclass">Implementing the RangeTracker Subclass</h3> + +<p>A <code class="highlighter-rouge">RangeTracker</code> is a thread-safe object used to manage the current range and current position of the reader of a <code class="highlighter-rouge">BoundedSource</code> and protect concurrent access to them.</p> + +<p>To implement a <code class="highlighter-rouge">RangeTracker</code>, you should first familiarize yourself with the following definitions:</p> + +<ul> + <li> + <p><strong>Position-based sources</strong> - A position-based source can be described by a range of positions of an ordered type, and the records read by the source can be described by positions of that type. For example, for a record within a file, the position can be the starting byte offset of the record. The position type for the record in this case is <code class="highlighter-rouge">long</code>.</p> + + <p>The main requirement for position-based sources is <strong>associativity</strong>: Reading records in position range “[A, B)” and records in position range “[B, C)” should give the same records as reading records in position range “[A, C)”, where “A” <= “B” <= “C”. 
This property ensures that no matter how many arbitrary sub-ranges a range of positions is split into, the total set of records they describe stays the same.</p> + + <p>The other important property is how the source’s range relates to positions of records in the source. In many sources each record can be identified by a unique starting position. In this case:</p> + + <ul> + <li>All records returned by a source “[A, B)” must have starting positions in this range.</li> + <li>All but the last record should end within this range. The last record may or may not extend past the end of the range.</li> + <li>Records must not overlap.</li> + </ul> + + <p>Such sources should define “read “[A, B)”” as “read from the first record starting at or after “A”, up to but not including the first record starting at or after “B””.</p> + + <p>Examples of such sources include those that read lines or CSV records from a text file, or keys and values from a database.</p> + + <p>The concept of <em>split points</em> extends these definitions to sources where some records cannot be identified by a unique starting position.</p> + </li> + <li> + <p><strong>Split points</strong> - A split point describes a record that is the first one returned when reading the range from and including position <strong>A</strong> up to infinity (i.e. [A, infinity)).</p> + + <p>Some sources may have records that are not directly addressable. For example, imagine a file format consisting of a sequence of compressed blocks. Each block can be assigned an offset, but records within the block cannot be directly addressed without decompressing the block. Let us refer to this hypothetical format as <em>CBF (Compressed Blocks Format)</em>.</p> + + <p>Many such formats can still satisfy the associativity property. 
For example, in CBF, reading [A, B) can mean “read all the records in all blocks whose starting offset is in [A, B)”.</p> + + <p>To support such complex formats, Beam introduces the notion of <em>split points</em>. A record is a split point if there exists a position <strong>A</strong> such that the record is the first one to be returned when reading the range [A, infinity). In CBF, the only split points would be the first records in each block.</p> + + <p>Split points allow us to define the meaning of a record’s position and a source’s range in the following cases:</p> + + <ul> + <li>For a record that is at a split point, its position is defined to be the largest <strong>A</strong> such that reading a source with the range [A, infinity) returns this record.</li> + <li>Positions of other records are only required to be non-decreasing.</li> + <li>Reading the source [A, B) must return records starting from the first split point at or after <strong>A</strong>, up to but not including the first split point at or after <strong>B</strong>. In particular, this means that the first record returned by a source MUST always be a split point.</li> + <li>Positions of split points must be unique.</li> + </ul> + + <p>As a result, for any decomposition of the full range of the source into position ranges, the total set of records will be the full set of records in the source, and each record will be read exactly once.</p> + </li> + <li> + <p><strong>Consumed positions</strong> - Consumed positions refer to records that have been read.</p> + + <p>As the source is being read, and records read from it are being passed to the downstream transforms in the pipeline, we say that positions in the source are being <em>consumed</em>. When a reader has read a record (or promised to a caller that a record will be returned), positions up to and including the record’s start position are considered <em>consumed</em>.</p> + + <p>Dynamic splitting can happen only at <em>unconsumed</em> positions. 
If the reader just returned a record at offset 42 in a file, dynamic splitting can happen only at offset 43 or beyond. Otherwise, that record could be read twice (by the current reader and the reader of the new task).</p> + </li> +</ul> + +<h4 id="rangetracker-methods">RangeTracker Methods</h4> + +<p>To implement a <code class="highlighter-rouge">RangeTracker</code>, your subclass must override the following methods:</p> + +<ul> + <li> + <p><code class="highlighter-rouge">start_position</code>: Returns the starting position of the current range, inclusive.</p> + </li> + <li> + <p><code class="highlighter-rouge">stop_position</code>: Returns the ending position of the current range, exclusive.</p> + </li> + <li> + <p><code class="highlighter-rouge">try_claim</code>: This method is used to determine if a record at a split point is within the range. This method should modify the internal state of the <code class="highlighter-rouge">RangeTracker</code> by updating the last-consumed position to the given starting <code class="highlighter-rouge">position</code> of the record being read by the source. The method returns true if the given position falls within the current range.</p> + </li> + <li> + <p><code class="highlighter-rouge">set_current_position</code>: This method updates the last-consumed position to the given starting position of a record being read by a source. You can invoke this method for records that do not start at split points, and this should modify the internal state of the <code class="highlighter-rouge">RangeTracker</code>. 
If the record starts at a split point, you must invoke <code class="highlighter-rouge">try_claim</code> instead of this method.</p> + </li> + <li> + <p><code class="highlighter-rouge">position_at_fraction</code>: Given a fraction within the range [0.0, 1.0), this method returns the position at that fraction of the position range [<code class="highlighter-rouge">self.start_position</code>, <code class="highlighter-rouge">self.stop_position</code>).</p> + </li> + <li> + <p><code class="highlighter-rouge">try_split</code>: This method attempts to split the current range into two parts around a suggested position. It is allowed to split at a different position, but in most cases it will split at the suggested position.</p> + </li> +</ul> + +<p><code class="highlighter-rouge">try_split</code> splits the current range [<code class="highlighter-rouge">self.start_position</code>, <code class="highlighter-rouge">self.stop_position</code>) into a “primary” part [<code class="highlighter-rouge">self.start_position</code>, <code class="highlighter-rouge">split_position</code>), and a “residual” part [<code class="highlighter-rouge">split_position</code>, <code class="highlighter-rouge">self.stop_position</code>), assuming that <code class="highlighter-rouge">split_position</code> has not been consumed yet.</p> + +<p>If <code class="highlighter-rouge">split_position</code> has already been consumed, the method returns <code class="highlighter-rouge">None</code>. Otherwise, it updates the current range to be the primary and returns a tuple (<code class="highlighter-rouge">split_position</code>, <code class="highlighter-rouge">split_fraction</code>). 
<code class="highlighter-rouge">split_fraction</code> should be the size of the range [<code class="highlighter-rouge">self.start_position</code>, <code class="highlighter-rouge">split_position</code>) as a fraction of the size of the original (before split) range [<code class="highlighter-rouge">self.start_position</code>, <code class="highlighter-rouge">self.stop_position</code>).</p> + +<ul> + <li><code class="highlighter-rouge">fraction_consumed</code>: Returns the approximate fraction of consumed positions in the source.</li> +</ul> + +<p><strong>Note:</strong> Methods of class <code class="highlighter-rouge">iobase.RangeTracker</code> may be invoked by multiple threads, so your subclass must be thread-safe, for example, by guarding its state with a single lock object.</p> + +<h3 id="convenience-source-base-classes">Convenience Source Base Classes</h3> + +<p>The Beam SDK for Python contains some convenient abstract base classes to help you easily create new sources.</p> + +<h4 id="filebasedsource">FileBasedSource</h4> + +<p><code class="highlighter-rouge">FileBasedSource</code> is a framework for developing sources for new file types. You can derive your <code class="highlighter-rouge">BoundedSource</code> class from the <a href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filebasedsource.py">FileBasedSource</a> class.</p> + +<p>To create a source for a new file type, you need to create a sub-class of <code class="highlighter-rouge">FileBasedSource</code>. 
Sub-classes of <code class="highlighter-rouge">FileBasedSource</code> must implement the method <code class="highlighter-rouge">FileBasedSource.read_records()</code>.</p> + +<p>See <a href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/avroio.py">AvroSource</a> for an example implementation of <code class="highlighter-rouge">FileBasedSource</code>.</p> + +<h2 id="a-namereading-sourcesareading-from-a-custom-source"><a name="reading-sources"></a>Reading from a Custom Source</h2> + +<p>The following example, <code class="highlighter-rouge">CountingSource</code>, demonstrates an implementation of <code class="highlighter-rouge">BoundedSource</code> and uses the SDK-provided <code class="highlighter-rouge">RangeTracker</code> called <code class="highlighter-rouge">OffsetRangeTracker</code>.</p> + +<div class="highlighter-rouge"><pre class="highlight"><code>from apache_beam.io import iobase +from apache_beam.io.range_trackers import OffsetRangeTracker + + +class CountingSource(iobase.BoundedSource): + + def __init__(self, count): + self._count = count + + def estimate_size(self): + return self._count + + def get_range_tracker(self, start_position, stop_position): + if start_position is None: + start_position = 0 + if stop_position is None: + stop_position = self._count + + return OffsetRangeTracker(start_position, stop_position) + + def read(self, range_tracker): + # Start at the tracker's own range so that sub-ranges produced by split() work. + for i in range(range_tracker.start_position(), + range_tracker.stop_position()): + # Claim each position before emitting it; stop once the claim is refused. + if not range_tracker.try_claim(i): + return + yield i + + def split(self, desired_bundle_size, start_position=None, + stop_position=None): + if start_position is None: + start_position = 0 + if stop_position is None: + stop_position = self._count + + bundle_start = start_position + while bundle_start < stop_position: + # Cap each bundle at the end of the requested range. + bundle_stop = min(stop_position, bundle_start + desired_bundle_size) + yield iobase.SourceBundle(weight=(bundle_stop - bundle_start), + source=self, + start_position=bundle_start, + stop_position=bundle_stop) + bundle_start = bundle_stop +</code></pre> +</div> + +<p>To read data from a custom source in your pipeline, use the <code
class="highlighter-rouge">Read</code> transform:</p> + +<div class="highlighter-rouge"><pre class="highlight"><code>p = beam.Pipeline(options=PipelineOptions()) +numbers = p | 'ProduceNumbers' >> beam.io.Read(CountingSource(count)) +</code></pre> +</div> + +<p><strong>Note:</strong> When you create a source that end-users are going to use, it’s recommended that you do not expose the code for the source itself as demonstrated in the example above, but rather use a wrapping <code class="highlighter-rouge">PTransform</code> instead. See <a href="#ptransform-wrappers">PTransform wrappers</a> to see how and why to avoid exposing your sources.</p> + +<h2 id="a-namecreating-sinksacreating-a-custom-sink"><a name="creating-sinks"></a>Creating a Custom Sink</h2> + +<p>You should create a custom sink if you’d like to use the advanced features that the Sink API provides, such as global initialization and finalization that allow the write operation to appear “atomic” (i.e. either all data is written or none is).</p> + +<p>A sink represents a resource that can be written to using the <code class="highlighter-rouge">Write</code> transform. A parallel write to a sink consists of three phases:</p> + +<ol> + <li>A sequential initialization phase. For example, creating a temporary output directory.</li> + <li>A parallel write phase where workers write bundles of records.</li> + <li>A sequential finalization phase. For example, merging output files.</li> +</ol> + +<p>For example, if you’d like to write to a new table in a database, you should use the Sink API. In this case, the initializer will create a temporary table, the writer will write rows to it, and the finalizer will rename the table to a final location.</p> + +<p>To create a custom data sink for your pipeline, you’ll need to provide the format-specific logic that tells the sink how to write bounded data from your pipeline’s <code class="highlighter-rouge">PCollection</code>s to an output sink. 
The sink writes bundles of data in parallel using multiple workers.</p> + +<p>You supply the writing logic by creating the following classes:</p> + +<ul> + <li> + <p>A subclass of <code class="highlighter-rouge">Sink</code>, which you can find in the <a href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/iobase.py">iobase.py</a> module. <code class="highlighter-rouge">Sink</code> describes the location or resource to write to. Depending on the type of sink, your <code class="highlighter-rouge">Sink</code> subclass may contain fields such as the path to an output directory on a filesystem or a database table name. <code class="highlighter-rouge">Sink</code> provides three methods for performing a write operation to the sink it describes. Your subclass of <code class="highlighter-rouge">Sink</code> must implement these three methods: <code class="highlighter-rouge">initialize_write()</code>, <code class="highlighter-rouge">open_writer()</code>, and <code class="highlighter-rouge">finalize_write()</code>.</p> + </li> + <li> + <p>A subclass of <code class="highlighter-rouge">Writer</code>, which you can find in the <a href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/iobase.py">iobase.py</a> module. <code class="highlighter-rouge">Writer</code> writes a bundle of elements from an input <code class="highlighter-rouge">PCollection</code> to your designated data sink. <code class="highlighter-rouge">Writer</code> defines two methods: <code class="highlighter-rouge">write()</code>, which writes a single record from the bundle, and <code class="highlighter-rouge">close()</code>, which is called once at the end of writing a bundle.</p> + </li> +</ul> + +<h3 id="implementing-the-sink-subclass">Implementing the Sink Subclass</h3> + +<p>Your <code class="highlighter-rouge">Sink</code> subclass describes the location or resource to which your pipeline writes its output. 
This might include a file system location, the name of a database table or dataset, etc.</p> + +<p>To implement a <code class="highlighter-rouge">Sink</code>, your subclass must override the following methods:</p> + +<ul> + <li> + <p><code class="highlighter-rouge">initialize_write</code>: This method performs any necessary initialization before writing to the output location. Services call this method before writing begins. For example, you can use <code class="highlighter-rouge">initialize_write</code> to create a temporary output directory.</p> + </li> + <li> + <p><code class="highlighter-rouge">open_writer</code>: This method enables writing a bundle of elements to the sink.</p> + </li> + <li> + <p><code class="highlighter-rouge">finalize_write</code>: This method finalizes the sink after all data is written to it. Given the result of initialization and an iterable of results from bundle writes, <code class="highlighter-rouge">finalize_write</code> performs finalization after writing and closes the sink. This method is called after all bundle write operations are complete.</p> + </li> +</ul> + +<p><strong>Caution:</strong> <code class="highlighter-rouge">initialize_write</code> and <code class="highlighter-rouge">finalize_write</code> are conceptually called once: at the beginning and end of a <code class="highlighter-rouge">Write</code> transform. However, when you implement these methods, you must ensure that they are <strong>idempotent</strong>, as they may be called multiple times on different machines in the case of failure, retry, or for redundancy.</p> + +<h3 id="implementing-the-writer-subclass">Implementing the Writer Subclass</h3> + +<p>Your <code class="highlighter-rouge">Writer</code> subclass implements the logic for writing a bundle of elements from a <code class="highlighter-rouge">PCollection</code> to the output location defined in your <code class="highlighter-rouge">Sink</code>. 
Services may instantiate multiple instances of your <code class="highlighter-rouge">Writer</code> in different threads on the same worker, so access to any static members or methods must be thread-safe.</p> + +<p>To implement a <code class="highlighter-rouge">Writer</code>, your subclass must override the following abstract methods:</p> + +<ul> + <li> + <p><code class="highlighter-rouge">write</code>: This method writes a value to your <code class="highlighter-rouge">Sink</code> using the current writer.</p> + </li> + <li> + <p><code class="highlighter-rouge">close</code>: This method closes the current writer.</p> + </li> +</ul> + +<h4 id="handling-bundle-ids">Handling Bundle IDs</h4> + +<p>When the service calls <code class="highlighter-rouge">Sink.open_writer</code>, it will pass a unique bundle ID for the records to be written. Your <code class="highlighter-rouge">Writer</code> must use this bundle ID to ensure that its output does not interfere with that of other <code class="highlighter-rouge">Writer</code> instances that might have been created in parallel. This is particularly important as the service may retry write operations multiple times in case of failure.</p> + +<p>For example, if your <code class="highlighter-rouge">Sink</code>’s output is file-based, your <code class="highlighter-rouge">Writer</code> class might use the bundle ID as a filename suffix to ensure that your <code class="highlighter-rouge">Writer</code> writes its records to a unique output file not used by other <code class="highlighter-rouge">Writer</code>s. 
You can then have your <code class="highlighter-rouge">Writer</code>’s <code class="highlighter-rouge">close</code> method return that file location as part of the write result.</p> + +<h3 id="convenience-sink-and-writer-base-classes">Convenience Sink and Writer Base Classes</h3> + +<p>The Beam SDK for Python contains some convenient abstract base classes to help you create <code class="highlighter-rouge">Sink</code> and <code class="highlighter-rouge">Writer</code> classes that work with common data storage formats, like files.</p> + +<h4 id="filesink">FileSink</h4> + +<p>If your data sink uses files, you can derive your <code class="highlighter-rouge">Sink</code> and <code class="highlighter-rouge">Writer</code> classes from the <code class="highlighter-rouge">FileSink</code> and <code class="highlighter-rouge">FileSinkWriter</code> classes, which can be found in the <a href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/fileio.py">fileio.py</a> module. These classes implement code common to sinks that interact with files, including:</p> + +<ul> + <li>Setting file headers and footers</li> + <li>Sequential record writing</li> + <li>Setting the output MIME type</li> +</ul> + +<h2 id="a-namewriting-sinksawriting-to-a-custom-sink"><a name="writing-sinks"></a>Writing to a Custom Sink</h2> + +<p>Consider a simple key-value storage that writes a given set of key-value pairs to a set of tables. The following is the key-value storage’s API:</p> + +<ul> + <li><code class="highlighter-rouge">connect(url)</code> Connects to the storage and returns an access token which can be used to perform further operations.</li> + <li><code class="highlighter-rouge">open_table(access_token, table_name)</code> Creates a table named “table_name”. 
Returns a table object.</li> + <li><code class="highlighter-rouge">write_to_table(access_token, table, key, value)</code> Writes a key, value pair to the given table.</li> + <li><code class="highlighter-rouge">rename_table(access_token, old_name, new_name)</code> Renames the table named “old_name” to “new_name”.</li> +</ul> + +<p>The following code demonstrates how to implement the <code class="highlighter-rouge">Sink</code> class for this key-value storage.</p> + +<div class="highlighter-rouge"><pre class="highlight"><code>class SimpleKVSink(iobase.Sink): + + def __init__(self, url, final_table_name): + self._url = url + self._final_table_name = final_table_name + + def initialize_write(self): + access_token = simplekv.connect(self._url) + return access_token + + def open_writer(self, access_token, uid): + table_name = 'table' + uid + return SimpleKVWriter(access_token, table_name) + + def finalize_write(self, access_token, table_names): + for i, table_name in enumerate(table_names): + simplekv.rename_table( + access_token, table_name, self._final_table_name + str(i)) +</code></pre> +</div> + +<p>The following code demonstrates how to implement the <code class="highlighter-rouge">Writer</code> class for this key-value storage.</p> + +<div class="highlighter-rouge"><pre class="highlight"><code>class SimpleKVWriter(iobase.Writer): + + def __init__(self, access_token, table_name): + self._access_token = access_token + self._table_name = table_name + self._table = simplekv.open_table(access_token, table_name) + + def write(self, record): + key, value = record + + simplekv.write_to_table(self._access_token, self._table, key, value) + + def close(self): + return self._table_name +</code></pre> +</div> + +<p>The following code demonstrates how to write to the sink using the <code class="highlighter-rouge">Write</code> transform.</p> + +<div class="highlighter-rouge"><pre class="highlight"><code>p = beam.Pipeline(options=PipelineOptions()) +kvs = p | 'CreateKVs' >> 
beam.Create(KVs) + +kvs | 'WriteToSimpleKV' >> beam.io.Write( + SimpleKVSink('http://url_to_simple_kv/', final_table_name)) +</code></pre> +</div> + +<p><strong>Note:</strong> When you create a sink that end-users are going to use, it’s recommended that you do not expose the code for the sink itself as demonstrated in the example above, but rather use a wrapping <code class="highlighter-rouge">PTransform</code> instead. See <a href="#ptransform-wrappers">PTransform wrappers</a> for how and why to avoid exposing your sinks.</p> + +<h2 id="a-nameptransform-wrappersaptransform-wrappers"><a name="ptransform-wrappers"></a>PTransform Wrappers</h2> + +<p>If you create a custom source or sink for your own use, such as for learning purposes, you should create them as explained in the sections above and use them as demonstrated in the examples.</p> + +<p>However, when you create a source or sink that end-users are going to use, instead of exposing the source or sink itself, you should create a wrapper <code class="highlighter-rouge">PTransform</code>. Ideally, a custom source or sink should be exposed to users simply as “something that can be applied in a pipeline”, which is actually a <code class="highlighter-rouge">PTransform</code>. That way, its implementation can be hidden and arbitrarily complex or simple.</p> + +<p>The greatest benefit of not exposing the implementation details is that later on you will be able to add additional functionality without breaking the existing implementation for users. For example, if your users’ pipelines read from your source using <code class="highlighter-rouge">beam.io.Read(...)</code> and you want to insert a reshard into the pipeline, all of your users would need to add the reshard themselves (using the <code class="highlighter-rouge">GroupByKey</code> transform). 
To solve this, it’s recommended that you expose your source as a composite <code class="highlighter-rouge">PTransform</code> that performs both the read operation and the reshard.</p> + +<p>To avoid exposing your custom sources and sinks to end-users, it’s recommended that you use the <code class="highlighter-rouge">_</code> prefix when creating your custom source and sink classes. Then, create a wrapper <code class="highlighter-rouge">PTransform</code>.</p> + +<p>The following examples change the custom source and sink from the above sections so that they are not exposed to end-users. For the source, rename <code class="highlighter-rouge">CountingSource</code> to <code class="highlighter-rouge">_CountingSource</code>. Then, create the wrapper <code class="highlighter-rouge">PTransform</code>, called <code class="highlighter-rouge">ReadFromCountingSource</code>:</p> + +<div class="highlighter-rouge"><pre class="highlight"><code>class ReadFromCountingSource(PTransform): + + def __init__(self, count, **kwargs): + super(ReadFromCountingSource, self).__init__(**kwargs) + self._count = count + + def expand(self, pcoll): + return pcoll | iobase.Read(_CountingSource(self._count)) +</code></pre> +</div> + +<p>Finally, read from the source:</p> + +<div class="highlighter-rouge"><pre class="highlight"><code>p = beam.Pipeline(options=PipelineOptions()) +numbers = p | 'ProduceNumbers' >> ReadFromCountingSource(count) +</code></pre> +</div> + +<p>For the sink, rename <code class="highlighter-rouge">SimpleKVSink</code> to <code class="highlighter-rouge">_SimpleKVSink</code>. 
Then, create the wrapper <code class="highlighter-rouge">PTransform</code>, called <code class="highlighter-rouge">WriteToKVSink</code>:</p> + +<div class="highlighter-rouge"><pre class="highlight"><code>class WriteToKVSink(PTransform): + + def __init__(self, url, final_table_name, **kwargs): + super(WriteToKVSink, self).__init__(**kwargs) + self._url = url + self._final_table_name = final_table_name + + def expand(self, pcoll): + return pcoll | iobase.Write(_SimpleKVSink(self._url, + self._final_table_name)) +</code></pre> +</div> + +<p>Finally, write to the sink:</p> + +<div class="highlighter-rouge"><pre class="highlight"><code>p = beam.Pipeline(options=PipelineOptions()) +kvs = p | 'CreateKVs' >> beam.Create(KVs) +kvs | 'WriteToSimpleKV' >> WriteToKVSink( + 'http://url_to_simple_kv/', final_table_name) +</code></pre> +</div> + + + </div> + + + <hr> + <div class="row"> + <div class="col-xs-12"> + <footer> + <p class="text-center"> + © Copyright + <a href="http://www.apache.org">The Apache Software Foundation</a>, + 2017. All Rights Reserved. + </p> + <p class="text-center"> + <a href="/privacy_policy">Privacy Policy</a> | + <a href="/feed.xml">RSS Feed</a> + </p> + </footer> + </div> + </div> + <!-- container div end --> +</div> + + + </body> + +</html> http://git-wip-us.apache.org/repos/asf/beam-site/blob/83a6e401/content/documentation/sdks/python/index.html ---------------------------------------------------------------------- diff --git a/content/documentation/sdks/python/index.html b/content/documentation/sdks/python/index.html index 6ae91b0..24573cf 100644 --- a/content/documentation/sdks/python/index.html +++ b/content/documentation/sdks/python/index.html @@ -164,6 +164,11 @@ <p>When you run your pipeline locally, the packages that your pipeline depends on are available because they are installed on your local machine. However, when you want to run your pipeline remotely, you must make sure these dependencies are available on the remote machines. 
<a href="/documentation/sdks/python-pipeline-dependencies">Managing Python Pipeline Dependencies</a> shows you how to make your dependencies available to the remote workers.</p> +<h2 id="custom-sources-and-sinks">Custom Sources and Sinks</h2> + +<p>The Beam SDK for Python provides an extensible API that you can use to create custom data sources and sinks. The <a href="/documentation/sdks/python-custom-io">Custom Sources and Sinks for Python tutorial</a> shows how to create custom sources and sinks using <a href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/iobase.py">Beam’s Source and Sink API</a>.</p> + + </div>