http://git-wip-us.apache.org/repos/asf/beam-site/blob/5c993c61/content/contribute/runner-guide/index.html
----------------------------------------------------------------------
diff --git a/content/contribute/runner-guide/index.html 
b/content/contribute/runner-guide/index.html
new file mode 100644
index 0000000..2dc6917
--- /dev/null
+++ b/content/contribute/runner-guide/index.html
@@ -0,0 +1,1375 @@
+<!DOCTYPE html>
+<html lang="en">
+  <head>
+  <meta charset="utf-8">
+  <meta http-equiv="X-UA-Compatible" content="IE=edge">
+  <meta name="viewport" content="width=device-width, initial-scale=1">
+  <title>Runner Authoring Guide</title>
+  <meta name="description" content="Apache Beam is an open source, unified 
model and set of language-specific SDKs for defining and executing data 
processing workflows, and also data ingestion and integration flows, supporting 
Enterprise Integration Patterns (EIPs) and Domain Specific Languages (DSLs). 
Dataflow pipelines simplify the mechanics of large-scale batch and streaming 
data processing and can run on a number of runtimes like Apache Flink, Apache 
Spark, and Google Cloud Dataflow (a cloud service). Beam also brings DSL in 
different languages, allowing users to easily implement their data integration 
processes.
+">
+  <link href="https://fonts.googleapis.com/css?family=Roboto:100,300,400"; 
rel="stylesheet">
+  <link rel="stylesheet" href="/css/site.css">
+  <script 
src="https://ajax.googleapis.com/ajax/libs/jquery/2.2.0/jquery.min.js";></script>
+  <script src="/js/bootstrap.min.js"></script>
+  <script src="/js/language-switch.js"></script>
+  <link rel="canonical" 
href="https://beam.apache.org/contribute/runner-guide/"; data-proofer-ignore>
+  <link rel="shortcut icon" type="image/x-icon" href="/images/favicon.ico">
+  <link rel="alternate" type="application/rss+xml" title="Apache Beam" 
href="https://beam.apache.org/feed.xml";>
+  <script>
+    
(function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
+    (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new 
Date();a=s.createElement(o),
+    
m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
+    
})(window,document,'script','//www.google-analytics.com/analytics.js','ga');
+    ga('create', 'UA-73650088-1', 'auto');
+    ga('send', 'pageview');
+  </script>
+</head>
+
+  <body class="body ">
+    <nav class="header navbar navbar-fixed-top">
+    <div class="navbar-header">
+      <a href="/" class="navbar-brand" >
+        <img alt="Brand" style="height: 25px" 
src="/images/beam_logo_navbar.png">
+      </a>
+        <button type="button" class="navbar-toggle collapsed" 
data-toggle="collapse" data-target="#navbar" aria-expanded="false" 
aria-controls="navbar">
+          <span class="sr-only">Toggle navigation</span>
+          <span class="icon-bar"></span>
+          <span class="icon-bar"></span>
+          <span class="icon-bar"></span>
+        </button>
+    </div>
+    <div id="navbar" class="navbar-collapse collapse">
+      <ul class="nav navbar-nav">
+        <li class="dropdown">
+          <a href="#" class="dropdown-toggle" data-toggle="dropdown" 
role="button" aria-haspopup="true" aria-expanded="false">Get Started <span 
class="caret"></span></a>
+          <ul class="dropdown-menu">
+            <li><a href="/get-started/beam-overview/">Beam Overview</a></li>
+            <li><a href="/get-started/quickstart-java/">Quickstart - 
Java</a></li>
+            <li><a href="/get-started/quickstart-py/">Quickstart - 
Python</a></li>
+            <li role="separator" class="divider"></li>
+            <li class="dropdown-header">Example Walkthroughs</li>
+            <li><a href="/get-started/wordcount-example/">WordCount</a></li>
+            <li><a href="/get-started/mobile-gaming-example/">Mobile 
Gaming</a></li>
+            <li role="separator" class="divider"></li>
+            <li class="dropdown-header">Resources</li>
+            <li><a href="/get-started/downloads">Downloads</a></li>
+            <li><a href="/get-started/support">Support</a></li>
+          </ul>
+        </li>
+        <li class="dropdown">
+          <a href="#" class="dropdown-toggle" data-toggle="dropdown" 
role="button" aria-haspopup="true" aria-expanded="false">Documentation <span 
class="caret"></span></a>
+          <ul class="dropdown-menu">
+            <li><a href="/documentation">Using the Documentation</a></li>
+            <li role="separator" class="divider"></li>
+            <li class="dropdown-header">Beam Concepts</li>
+            <li><a href="/documentation/programming-guide/">Programming 
Guide</a></li>
+            <li><a href="/documentation/resources/">Additional 
Resources</a></li>
+            <li role="separator" class="divider"></li>
+            <li class="dropdown-header">Pipeline Fundamentals</li>
+            <li><a 
href="/documentation/pipelines/design-your-pipeline/">Design Your 
Pipeline</a></li>
+            <li><a 
href="/documentation/pipelines/create-your-pipeline/">Create Your 
Pipeline</a></li>
+            <li><a href="/documentation/pipelines/test-your-pipeline/">Test 
Your Pipeline</a></li>
+            <li><a href="/documentation/io/io-toc/">Pipeline I/O</a></li>
+            <li role="separator" class="divider"></li>
+            <li class="dropdown-header">SDKs</li>
+            <li><a href="/documentation/sdks/java/">Java SDK</a></li>
+            <li><a href="/documentation/sdks/javadoc/2.0.0/" 
target="_blank">Java SDK API Reference <img src="/images/external-link-icon.png"
+                                                                               
                                                                width="14" 
height="14"
+                                                                               
                                                                           
alt="External link."></a>
+            </li>
+            <li><a href="/documentation/sdks/python/">Python SDK</a></li>
+            <li><a href="/documentation/sdks/pydoc/2.0.0/" 
target="_blank">Python SDK API Reference <img 
src="/images/external-link-icon.png"
+                                                                               
                                                                width="14" 
height="14"
+                                                                               
                                                                           
alt="External link."></a>
+            </li>
+            <li role="separator" class="divider"></li>
+            <li class="dropdown-header">Runners</li>
+            <li><a href="/documentation/runners/capability-matrix/">Capability 
Matrix</a></li>
+            <li><a href="/documentation/runners/direct/">Direct Runner</a></li>
+            <li><a href="/documentation/runners/apex/">Apache Apex 
Runner</a></li>
+            <li><a href="/documentation/runners/flink/">Apache Flink 
Runner</a></li>
+            <li><a href="/documentation/runners/spark/">Apache Spark 
Runner</a></li>
+            <li><a href="/documentation/runners/dataflow/">Cloud Dataflow 
Runner</a></li>
+          </ul>
+        </li>
+        <li class="dropdown">
+          <a href="#" class="dropdown-toggle" data-toggle="dropdown" 
role="button" aria-haspopup="true" aria-expanded="false">Contribute <span 
class="caret"></span></a>
+          <ul class="dropdown-menu">
+            <li><a href="/contribute">Get Started Contributing</a></li>
+            <li role="separator" class="divider"></li>
+            <li class="dropdown-header">Guides</li>
+            <li><a href="/contribute/contribution-guide/">Contribution 
Guide</a></li>
+            <li><a href="/contribute/testing/">Testing Guide</a></li>
+            <li><a href="/contribute/release-guide/">Release Guide</a></li>
+            <li><a href="/contribute/ptransform-style-guide/">PTransform Style 
Guide</a></li>
+            <li><a href="/contribute/runner-guide/">Runner Authoring 
Guide</a></li>
+            <li role="separator" class="divider"></li>
+            <li class="dropdown-header">Technical References</li>
+            <li><a href="/contribute/design-principles/">Design 
Principles</a></li>
+            <li><a href="/contribute/work-in-progress/">Ongoing 
Projects</a></li>
+            <li><a href="/contribute/source-repository/">Source 
Repository</a></li>
+            <li role="separator" class="divider"></li>
+            <li class="dropdown-header">Promotion</li>
+            <li><a href="/contribute/presentation-materials/">Presentation 
Materials</a></li>
+            <li><a href="/contribute/logos/">Logos and Design</a></li>
+            <li role="separator" class="divider"></li>
+            <li><a href="/contribute/maturity-model/">Maturity Model</a></li>
+            <li><a href="/contribute/team/">Team</a></li>
+          </ul>
+        </li>
+
+        <li><a href="/blog">Blog</a></li>
+      </ul>
+      <ul class="nav navbar-nav navbar-right">
+        <li class="dropdown">
+          <a href="#" class="dropdown-toggle" data-toggle="dropdown" 
role="button" aria-haspopup="true" aria-expanded="false"><img 
src="https://www.apache.org/foundation/press/kit/feather_small.png"; alt="Apache 
Logo" style="height:20px;"><span class="caret"></span></a>
+          <ul class="dropdown-menu dropdown-menu-right">
+            <li><a href="http://www.apache.org/";>ASF Homepage</a></li>
+            <li><a href="http://www.apache.org/licenses/";>License</a></li>
+            <li><a href="http://www.apache.org/security/";>Security</a></li>
+            <li><a 
href="http://www.apache.org/foundation/thanks.html";>Thanks</a></li>
+            <li><a 
href="http://www.apache.org/foundation/sponsorship.html";>Sponsorship</a></li>
+            <li><a 
href="https://www.apache.org/foundation/policies/conduct";>Code of 
Conduct</a></li>
+          </ul>
+        </li>
+      </ul>
+    </div><!--/.nav-collapse -->
+</nav>
+
+    <div class="body__contained">
+      <h1 id="runner-authoring-guide">Runner Authoring Guide</h1>
+
+<p>This guide walks through how to implement a new runner. It is aimed at 
someone
+who has a data processing system and wants to use it to execute a Beam
+pipeline. The guide starts from the basics, to help you evaluate the work
+ahead. Then the sections become more and more detailed, to be a resource
+throughout the development of your runner.</p>
+
+<p>Topics covered:</p>
+
+<ul id="markdown-toc">
+  <li><a href="#basics-of-the-beam-model" 
id="markdown-toc-basics-of-the-beam-model">Basics of the Beam model</a>    <ul>
+      <li><a href="#pipeline" id="markdown-toc-pipeline">Pipeline</a></li>
+      <li><a href="#ptransforms" 
id="markdown-toc-ptransforms">PTransforms</a></li>
+      <li><a href="#pcollections" 
id="markdown-toc-pcollections">PCollections</a>        <ul>
+          <li><a href="#bounded-vs-unbounded" 
id="markdown-toc-bounded-vs-unbounded">Bounded vs Unbounded</a></li>
+          <li><a href="#timestamps" 
id="markdown-toc-timestamps">Timestamps</a></li>
+          <li><a href="#watermarks" 
id="markdown-toc-watermarks">Watermarks</a></li>
+          <li><a href="#windowed-elements" 
id="markdown-toc-windowed-elements">Windowed elements</a></li>
+          <li><a href="#coder" id="markdown-toc-coder">Coder</a></li>
+          <li><a href="#windowing-strategy" 
id="markdown-toc-windowing-strategy">Windowing Strategy</a></li>
+        </ul>
+      </li>
+      <li><a href="#user-defined-functions-udfs" 
id="markdown-toc-user-defined-functions-udfs">User-Defined Functions 
(UDFs)</a></li>
+      <li><a href="#runner" id="markdown-toc-runner">Runner</a></li>
+    </ul>
+  </li>
+  <li><a href="#implementing-the-beam-primitives" 
id="markdown-toc-implementing-the-beam-primitives">Implementing the Beam 
Primitives</a>    <ul>
+      <li><a href="#what-if-you-havent-implemented-some-of-these-features" 
id="markdown-toc-what-if-you-havent-implemented-some-of-these-features">What if 
you haven’t implemented some of these features?</a></li>
+      <li><a href="#implementing-the-pardo-primitive" 
id="markdown-toc-implementing-the-pardo-primitive">Implementing the ParDo 
primitive</a>        <ul>
+          <li><a href="#bundles" id="markdown-toc-bundles">Bundles</a></li>
+          <li><a href="#the-dofn-lifecycle" 
id="markdown-toc-the-dofn-lifecycle">The DoFn Lifecycle</a></li>
+          <li><a href="#dofnrunners" 
id="markdown-toc-dofnrunners">DoFnRunner(s)</a></li>
+          <li><a href="#side-inputs" id="markdown-toc-side-inputs">Side 
Inputs</a></li>
+          <li><a href="#state-and-timers" 
id="markdown-toc-state-and-timers">State and Timers</a></li>
+          <li><a href="#splittable-dofn" 
id="markdown-toc-splittable-dofn">Splittable DoFn</a></li>
+        </ul>
+      </li>
+      <li><a href="#implementing-the-groupbykey-and-window-primitive" 
id="markdown-toc-implementing-the-groupbykey-and-window-primitive">Implementing 
the GroupByKey (and window) primitive</a>        <ul>
+          <li><a href="#group-by-encoded-bytes" 
id="markdown-toc-group-by-encoded-bytes">Group By Encoded Bytes</a></li>
+          <li><a href="#window-merging" 
id="markdown-toc-window-merging">Window Merging</a></li>
+          <li><a href="#implementing-via-groupbykeyonly--groupalsobywindow" 
id="markdown-toc-implementing-via-groupbykeyonly--groupalsobywindow">Implementing
 via GroupByKeyOnly + GroupAlsoByWindow</a></li>
+          <li><a href="#dropping-late-data" 
id="markdown-toc-dropping-late-data">Dropping late data</a></li>
+          <li><a href="#triggering" 
id="markdown-toc-triggering">Triggering</a></li>
+          <li><a href="#timestampcombiner" 
id="markdown-toc-timestampcombiner">TimestampCombiner</a></li>
+        </ul>
+      </li>
+      <li><a href="#implementing-the-window-primitive" 
id="markdown-toc-implementing-the-window-primitive">Implementing the Window 
primitive</a></li>
+      <li><a href="#implementing-the-read-primitive" 
id="markdown-toc-implementing-the-read-primitive">Implementing the Read 
primitive</a>        <ul>
+          <li><a href="#reading-from-an-unboundedsource" 
id="markdown-toc-reading-from-an-unboundedsource">Reading from an 
UnboundedSource</a></li>
+          <li><a href="#reading-from-a-boundedsource" 
id="markdown-toc-reading-from-a-boundedsource">Reading from a 
BoundedSource</a></li>
+        </ul>
+      </li>
+      <li><a href="#implementing-the-flatten-primitive" 
id="markdown-toc-implementing-the-flatten-primitive">Implementing the Flatten 
primitive</a></li>
+      <li><a href="#special-mention-the-combine-composite" 
id="markdown-toc-special-mention-the-combine-composite">Special mention: the 
Combine composite</a></li>
+    </ul>
+  </li>
+  <li><a href="#working-with-pipelines" 
id="markdown-toc-working-with-pipelines">Working with pipelines</a>    <ul>
+      <li><a href="#traversing-a-pipeline" 
id="markdown-toc-traversing-a-pipeline">Traversing a pipeline</a></li>
+      <li><a href="#altering-a-pipeline" 
id="markdown-toc-altering-a-pipeline">Altering a pipeline</a></li>
+    </ul>
+  </li>
+  <li><a href="#testing-your-runner" 
id="markdown-toc-testing-your-runner">Testing your runner</a></li>
+  <li><a href="#integrating-your-runner-nicely-with-sdks" 
id="markdown-toc-integrating-your-runner-nicely-with-sdks">Integrating your 
runner nicely with SDKs</a>    <ul>
+      <li><a href="#integrating-with-the-java-sdk" 
id="markdown-toc-integrating-with-the-java-sdk">Integrating with the Java 
SDK</a>        <ul>
+          <li><a href="#allowing-users-to-pass-options-to-your-runner" 
id="markdown-toc-allowing-users-to-pass-options-to-your-runner">Allowing users 
to pass options to your runner</a></li>
+          <li><a 
href="#registering-your-runner-with-sdks-for-command-line-use" 
id="markdown-toc-registering-your-runner-with-sdks-for-command-line-use">Registering
 your runner with SDKs for command line use</a></li>
+        </ul>
+      </li>
+      <li><a href="#integrating-with-the-python-sdk" 
id="markdown-toc-integrating-with-the-python-sdk">Integrating with the Python 
SDK</a></li>
+    </ul>
+  </li>
+  <li><a href="#writing-an-sdk-independent-runner" 
id="markdown-toc-writing-an-sdk-independent-runner">Writing an SDK-independent 
runner</a>    <ul>
+      <li><a href="#the-fn-api" id="markdown-toc-the-fn-api">The Fn 
API</a></li>
+      <li><a href="#the-runner-api" id="markdown-toc-the-runner-api">The 
Runner API</a></li>
+    </ul>
+  </li>
+  <li><a href="#the-runner-api-protos" 
id="markdown-toc-the-runner-api-protos">The Runner API protos</a>    <ul>
+      <li><a href="#functionspec-proto" 
id="markdown-toc-functionspec-proto"><code 
class="highlighter-rouge">FunctionSpec</code> proto</a></li>
+      <li><a href="#sdkfunctionspec-proto" 
id="markdown-toc-sdkfunctionspec-proto"><code 
class="highlighter-rouge">SdkFunctionSpec</code> proto</a></li>
+      <li><a href="#primitive-transform-payload-protos" 
id="markdown-toc-primitive-transform-payload-protos">Primitive transform 
payload protos</a>        <ul>
+          <li><a href="#pardopayload-proto" 
id="markdown-toc-pardopayload-proto"><code 
class="highlighter-rouge">ParDoPayload</code> proto</a></li>
+          <li><a href="#readpayload-proto" 
id="markdown-toc-readpayload-proto"><code 
class="highlighter-rouge">ReadPayload</code> proto</a></li>
+          <li><a href="#windowintopayload-proto" 
id="markdown-toc-windowintopayload-proto"><code 
class="highlighter-rouge">WindowIntoPayload</code> proto</a></li>
+          <li><a href="#combinepayload-proto" 
id="markdown-toc-combinepayload-proto"><code 
class="highlighter-rouge">CombinePayload</code> proto</a></li>
+        </ul>
+      </li>
+      <li><a href="#ptransform-proto" id="markdown-toc-ptransform-proto"><code 
class="highlighter-rouge">PTransform</code> proto</a></li>
+      <li><a href="#pcollection-proto" 
id="markdown-toc-pcollection-proto"><code 
class="highlighter-rouge">PCollection</code> proto</a></li>
+      <li><a href="#coder-proto" id="markdown-toc-coder-proto"><code 
class="highlighter-rouge">Coder</code> proto</a></li>
+    </ul>
+  </li>
+  <li><a href="#the-runner-api-rpcs" id="markdown-toc-the-runner-api-rpcs">The 
Runner API RPCs</a>    <ul>
+      <li><a href="#pipelinerunnerrunpipeline-rpc" 
id="markdown-toc-pipelinerunnerrunpipeline-rpc"><code 
class="highlighter-rouge">PipelineRunner.run(Pipeline)</code> RPC</a></li>
+      <li><a href="#pipelineresult-aka-job-api" 
id="markdown-toc-pipelineresult-aka-job-api"><code 
class="highlighter-rouge">PipelineResult</code> aka “Job API”</a></li>
+    </ul>
+  </li>
+</ul>
+
+<h2 id="basics-of-the-beam-model">Basics of the Beam model</h2>
+
+<p>Suppose you have a data processing engine that can pretty easily process 
graphs
+of operations. You want to integrate it with the Beam ecosystem to get access
+to other languages, great event time processing, and a library of connectors.
+You need to know the core vocabulary:</p>
+
+<ul>
+  <li><a href="#pipeline"><em>Pipeline</em></a> - A pipeline is a graph of 
transformations that a user constructs
+that defines the data processing they want to do.</li>
+  <li><a href="#pcollections"><em>PCollection</em></a> - Data being processed 
in a pipeline is part of a PCollection.</li>
+  <li><a href="#ptransforms"><em>PTransforms</em></a> - The operations 
executed within a pipeline. These are best
+thought of as operations on PCollections.</li>
+  <li><em>SDK</em> - A language-specific library for pipeline authors (we 
often call them
+“users” even though we have many kinds of users) to build transforms,
+construct their pipelines and submit them to a runner</li>
+  <li><em>Runner</em> - You are going to write a piece of software called a 
runner that
+takes a Beam pipeline and executes it using the capabilities of your data
+processing engine.</li>
+</ul>
+
+<p>These concepts may be very similar to your processing engine’s concepts. 
Since
+Beam’s design is for cross-language operation and reusable libraries of
+transforms, there are some special features worth highlighting.</p>
+
+<h3 id="pipeline">Pipeline</h3>
+
+<p>A pipeline in Beam is a graph of PTransforms operating on PCollections. A
+pipeline is constructed by a user in their SDK of choice, and makes its way to
+your runner either via the SDK directly or via the Runner API’s (forthcoming)
+RPC interfaces.</p>
+
+<h3 id="ptransforms">PTransforms</h3>
+
+<p>In Beam, a PTransform can be one of the five primitives or it can be a
+composite transform encapsulating a subgraph. The primitives are:</p>
+
+<ul>
+  <li><a href="#implementing-the-read-primitive"><em>Read</em></a> - parallel 
connectors to external
+systems</li>
+  <li><a href="#implementing-the-pardo-primitive"><em>ParDo</em></a> - per 
element processing</li>
+  <li><a 
href="#implementing-the-groupbykey-and-window-primitive"><em>GroupByKey</em></a>
 - 
+aggregating elements per key and window</li>
+  <li><a href="#implementing-the-flatten-primitive"><em>Flatten</em></a> - 
union of PCollections</li>
+  <li><a href="#implementing-the-window-primitive"><em>Window</em></a> - set 
the windowing strategy
+for a PCollection</li>
+</ul>
+
+<p>When implementing a runner, these are the operations you need to implement.
+Composite transforms may or may not be important to your runner. If you expose
+a UI, maintaining some of the composite structure will make the pipeline easier
+for a user to understand. But the result of processing is not changed.</p>
+
+<h3 id="pcollections">PCollections</h3>
+
+<p>A PCollection is an unordered bag of elements. Your runner will be 
responsible
+for storing these elements.  There are some major aspects of a PCollection to
+note:</p>
+
+<h4 id="bounded-vs-unbounded">Bounded vs Unbounded</h4>
+
+<p>A PCollection may be bounded or unbounded.</p>
+
+<ul>
+  <li><em>Bounded</em> - it is finite and you know it, as in batch use 
cases</li>
+  <li><em>Unbounded</em> - it may be never end, you don’t know, as in 
streaming use cases</li>
+</ul>
+
+<p>These derive from the intuitions of batch and stream processing, but the two
+are unified in Beam and bounded and unbounded PCollections can coexist in the
+same pipeline. If your runner can only support bounded PCollections, you’ll
+need to reject pipelines that contain unbounded PCollections. If your
+runner is only really targeting streams, there are adapters in our support code
+to convert everything to APIs targeting unbounded data.</p>
+
+<h4 id="timestamps">Timestamps</h4>
+
+<p>Every element in a PCollection has a timestamp associated with it.</p>
+
+<p>When you execute a primitive connector to some storage system, that 
connector
+is responsible for providing initial timestamps.  Your runner will need to
+propagate and aggregate timestamps. If the timestamp is not important, as with
+certain batch processing jobs where elements do not denote events, they will be
+the minimum representable timestamp, often referred to colloquially as
+“negative infinity”.</p>
+
+<h4 id="watermarks">Watermarks</h4>
+
+<p>Every PCollection has to have a watermark that estimates how complete the
+PCollection is.</p>
+
+<p>The watermark is a guess that “we’ll never see an element with an 
earlier
+timestamp”. Sources of data are responsible for producing a watermark. Your
+runner needs to implement watermark propagation as PCollections are processed,
+merged, and partitioned.</p>
+
+<p>The contents of a PCollection are complete when a watermark advances to
+“infinity”. In this manner, you may discover that an unbounded PCollection 
is
+finite.</p>
+
+<h4 id="windowed-elements">Windowed elements</h4>
+
+<p>Every element in a PCollection resides in a window. No element resides in
+multiple windows (two elements can be equal except for their window, but they
+are not the same).</p>
+
+<p>When elements are read from the outside world they arrive in the global 
window.
+When they are written to the outside world, they are effectively placed back
+into the global window (any writing transform that doesn’t take this
+perspective probably risks data loss).</p>
+
+<p>A window has a maximum timestamp, and when the watermark exceeds this plus
+user-specified allowed lateness the window is expired. All data related
+to an expired window may be discarded at any time.</p>
+
+<h4 id="coder">Coder</h4>
+
+<p>Every PCollection has a coder, a specification of the binary format of the 
elements.</p>
+
+<p>In Beam, the user’s pipeline may be written in a language other than the
+language of the runner. There is no expectation that the runner can actually
+deserialize user data. So the Beam model operates principally on encoded data -
+“just bytes”. Each PCollection has a declared encoding for its elements, 
called
+a coder. A coder has a URN that identifies the encoding, and may have
+additional sub-coders (for example, a coder for lists may contain a coder for
+the elements of the list). Language-specific serialization techniques can, and
+frequently are used, but there are a few key formats - such as key-value pairs
+and timestamps - that are common so your runner can understand them.</p>
+
+<h4 id="windowing-strategy">Windowing Strategy</h4>
+
+<p>Every PCollection has a windowing strategy, a specification of essential
+information for grouping and triggering operations.</p>
+
+<p>The details will be discussed below when we discuss the
+<a href="#implementing-the-window-primitive">Window</a> primitive, which sets 
up the
+windowing strategy, and
+<a href="#implementing-the-groupbykey-and-window-primitive">GroupByKey</a> 
primitive,
+which has behavior governed by the windowing strategy.</p>
+
+<h3 id="user-defined-functions-udfs">User-Defined Functions (UDFs)</h3>
+
+<p>Beam has seven varieties of user-defined function (UDF). A Beam pipeline
+may contain UDFs written in a language other than your runner, or even multiple
+languages in the same pipeline (see the <a href="#the-runner-api">Runner 
API</a>) so the
+definitions are language-independent (see the <a href="#the-fn-api">Fn 
API</a>).</p>
+
+<p>The UDFs of Beam are:</p>
+
+<ul>
+  <li><em>DoFn</em> - per-element processing function (used in ParDo)</li>
+  <li><em>WindowFn</em> - places elements in windows and merges windows (used 
in Window
+and GroupByKey)</li>
+  <li><em>Source</em> - emits data read from external sources, including 
initial and
+dynamic splitting for parallelism (used in Read)</li>
+  <li><em>ViewFn</em> - adapts a materialized PCollection to a particular 
interface (used
+in side inputs)</li>
+  <li><em>WindowMappingFn</em> - maps one element’s window to another, and 
specifies
+bounds on how far in the past the result window will be (used in side
+inputs)</li>
+  <li><em>CombineFn</em> - associative and commutative aggregation (used in 
Combine and
+state)</li>
+  <li><em>Coder</em> - encodes user data; some coders have standard formats 
and are not really UDFs</li>
+</ul>
+
+<p>The various types of user-defined functions will be described further 
alongside
+the primitives that use them.</p>
+
+<h3 id="runner">Runner</h3>
+
+<p>The term “runner” is used for a couple of things. It generally refers 
to the
+software that takes a Beam pipeline and executes it somehow. Often, this is the
+translation code that you write. It usually also includes some customized
+operators for your data processing engine, and is sometimes used to refer to
+the full stack.</p>
+
+<p>A runner has just a single method <code 
class="highlighter-rouge">run(Pipeline)</code>. From here on, I will often
+use code font for proper nouns in our APIs, whether or not the identifiers
+match across all SDKs.</p>
+
+<p>The <code class="highlighter-rouge">run(Pipeline)</code> method should be 
asynchronous and results in a
+PipelineResult which generally will be a job descriptor for your data
+processing engine, provides methods for checking its status, canceling it, and
+waiting for it to terminate.</p>
+
+<h2 id="implementing-the-beam-primitives">Implementing the Beam Primitives</h2>
+
+<p>Aside from encoding and persisting data - which presumably your engine 
already
+does in some way or another - most of what you need to do is implement the Beam
+primitives. This section provides a detailed look at each primitive, covering
+what you need to know that might not be obvious and what support code is
+provided.</p>
+
+<p>The primitives are designed for the benefit of pipeline authors, not runner
+authors. Each represents a different conceptual mode of operation (external IO,
+element-wise, grouping, windowing, union) rather than a specific implementation
+decision.  The same primitive may require very different implementation based
+on how the user instantiates it. For example, a <code 
class="highlighter-rouge">ParDo</code> that uses state or
+timers may require key partitioning, a <code 
class="highlighter-rouge">GroupByKey</code> with speculative triggering
+may require a more costly or complex implementation, and <code 
class="highlighter-rouge">Read</code> is completely
+different for bounded and unbounded data.</p>
+
+<h3 id="what-if-you-havent-implemented-some-of-these-features">What if you 
haven’t implemented some of these features?</h3>
+
+<p>That’s OK! You don’t have to do it all at once, and there may even be 
features
+that don’t make sense for your runner to ever support.  We maintain a
+<a href="/documentation/runners/capability-matrix/">capability matrix</a> on 
the Beam site so you can tell
+users what you support. When you receive a <code 
class="highlighter-rouge">Pipeline</code>, you should traverse it
+and determine whether or not you can execute each <code 
class="highlighter-rouge">DoFn</code> that you find. If
+you cannot execute some <code class="highlighter-rouge">DoFn</code> in the 
pipeline (or if there is any other
+requirement that your runner lacks) you should reject the pipeline. In your
+native environment, this may look like throwing an
+<code class="highlighter-rouge">UnsupportedOperationException</code>.  The 
Runner API RPCs will make this explicit,
+for cross-language portability.</p>
+
+<h3 id="implementing-the-pardo-primitive">Implementing the ParDo primitive</h3>
+
+<p>The <code class="highlighter-rouge">ParDo</code> primitive describes 
element-wise transformation for a
+<code class="highlighter-rouge">PCollection</code>.  <code 
class="highlighter-rouge">ParDo</code> is the most complex primitive, because 
it is where any
+per-element processing is described. In addition to very simple operations like
+standard <code class="highlighter-rouge">map</code> or <code 
class="highlighter-rouge">flatMap</code> from functional programming, <code 
class="highlighter-rouge">ParDo</code> also supports
+multiple outputs, side inputs, initialization, flushing, teardown, and stateful
+processing.</p>
+
+<p>The UDF that is applied to each element is called a <code 
class="highlighter-rouge">DoFn</code>. The exact APIs for
+a <code class="highlighter-rouge">DoFn</code> can vary per language/SDK but 
generally follow the same pattern, so we
+can discuss it with pseudocode. I will also often refer to the Java support
+code, since I know it and most of our current and future runners are
+Java-based.</p>
+
+<h4 id="bundles">Bundles</h4>
+
+<p>For correctness, a <code class="highlighter-rouge">DoFn</code> 
<em>should</em> represent an element-wise function, but in
+fact is a long-lived object that processes elements in small groups called
+bundles.</p>
+
+<p>Your runner decides how many elements, and which elements, to include in a
+bundle, and can even decide dynamically in the middle of processing that the
+current bundle has “ended”. How a bundle is processed ties in with the 
rest of
+a DoFn’s lifecycle.</p>
+
+<p>It will generally improve throughput to make the largest bundles possible, 
so
+that initialization and finalization costs are amortized over many elements.
+But if your data is arriving as a stream, then you will want to terminate a
+bundle in order to achieve appropriate latency, so bundles may be just a few
+elements.</p>
+
+<h4 id="the-dofn-lifecycle">The DoFn Lifecycle</h4>
+
+<p>While each language’s SDK is free to make different decisions, the Python 
and
+Java SDKs share an API with the following stages of a DoFn’s lifecycle.</p>
+
+<p>However, if you choose to execute a DoFn directly to improve performance or
+single-language simplicity, then your runner is responsible for implementing
+the following sequence:</p>
+
+<ul>
+  <li><em>Setup</em> - called once per DoFn instance before anything else; 
this has not been
+implemented in the Python SDK so the user can work around just with lazy
+initialization</li>
+  <li><em>StartBundle</em> - called once per bundle as initialization 
(actually, lazy
+initialization is almost always equivalent and more efficient, but this hook
+remains for simplicity for users)</li>
+  <li><em>ProcessElement</em> / <em>OnTimer</em> - called for each element and 
timer activation</li>
+  <li><em>FinishBundle</em> - essentially “flush”; required to be called 
before
+considering elements actually processed</li>
+  <li><em>Teardown</em> - release resources that were used across bundles; 
calling this
+can be best effort due to failures</li>
+</ul>
+
+<h4 id="dofnrunners">DoFnRunner(s)</h4>
+
+<p>This is a support class that has manifestations in both the Java codebase 
and
+the Python codebase.</p>
+
+<p><strong>Java</strong></p>
+
+<p>In Java, the <code class="highlighter-rouge">beam-runners-core-java</code> 
library provides an interface
+<code class="highlighter-rouge">DoFnRunner</code> for bundle processing, with 
implementations for many situations.</p>
+
+<div class="language-java no-toggle highlighter-rouge"><pre 
class="highlight"><code><span class="kd">interface</span> <span 
class="nc">DoFnRunner</span><span class="o">&lt;</span><span 
class="n">InputT</span><span class="o">,</span> <span 
class="n">OutputT</span><span class="o">&gt;</span> <span class="o">{</span>
+  <span class="kt">void</span> <span class="nf">startBundle</span><span 
class="o">();</span>
+  <span class="kt">void</span> <span class="nf">processElement</span><span 
class="o">(</span><span class="n">WindowedValue</span><span 
class="o">&lt;</span><span class="n">InputT</span><span class="o">&gt;</span> 
<span class="n">elem</span><span class="o">);</span>
+  <span class="kt">void</span> <span class="nf">onTimer</span><span 
class="o">(</span><span class="n">String</span> <span 
class="n">timerId</span><span class="o">,</span> <span 
class="n">BoundedWindow</span> <span class="n">window</span><span 
class="o">,</span> <span class="n">Instant</span> <span 
class="n">timestamp</span><span class="o">,</span> <span 
class="n">TimeDomain</span> <span class="n">timeDomain</span><span 
class="o">);</span>
+  <span class="kt">void</span> <span class="nf">finishBundle</span><span 
class="o">();</span>
+<span class="o">}</span>
+</code></pre>
+</div>
+
+<p>There are some implementations and variations of this for different 
scenarios:</p>
+
+<ul>
+  <li><a 
href="https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java";><code
 class="highlighter-rouge">SimpleDoFnRunner</code></a> - 
+not actually simple at all; implements lots of the core functionality of
+<code class="highlighter-rouge">ParDo</code>. This is how most runners execute 
most <code class="highlighter-rouge">DoFns</code>.</li>
+  <li><a 
href="https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java";><code
 class="highlighter-rouge">LateDataDroppingDoFnRunner</code></a> - 
+wraps a <code class="highlighter-rouge">DoFnRunner</code> and drops data from 
expired windows so the wrapped
+<code class="highlighter-rouge">DoFnRunner</code> doesn’t get any unpleasant 
surprises</li>
+  <li><a 
href="https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java";><code
 class="highlighter-rouge">StatefulDoFnRunner</code></a> - 
+handles collecting expired state</li>
+  <li><a 
href="https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java";><code
 class="highlighter-rouge">PushBackSideInputDoFnRunner</code></a> - 
+buffers input while waiting for side inputs to be ready</li>
+</ul>
+
+<p>These are all used heavily in implementations of Java runners. Invocations
+via the <a href="#the-fn-api">Fn API</a> may manifest as another 
implementation of
+<code class="highlighter-rouge">DoFnRunner</code> even though it will be doing 
far more than running a <code class="highlighter-rouge">DoFn</code>.</p>
+
+<p><strong>Python</strong></p>
+
+<p>See the <a 
href="https://beam.apache.org/documentation/sdks/pydoc/2.0.0/apache_beam.runners.html#apache_beam.runners.common.DoFnRunner";>DoFnRunner
 pydoc</a>.</p>
+
+<h4 id="side-inputs">Side Inputs</h4>
+
+<p><em>Main design document:
+<a 
href="https://s.apache.org/beam-side-inputs-1-pager";>https://s.apache.org/beam-side-inputs-1-pager</a></em></p>
+
+<p>A side input is a global view of a window of a <code 
class="highlighter-rouge">PCollection</code>. This distinguishes
+it from the main input, which is processed one element at a time. The SDK/user
+prepares a <code class="highlighter-rouge">PCollection</code> adequately, the 
runner materializes it, and then the
+runner feeds it to the <code class="highlighter-rouge">DoFn</code>. See the</p>
+
+<p>What you will need to implement is to inspect the materialization requested 
for
+the side input, and prepare it appropriately, and corresponding interactions
+when a <code class="highlighter-rouge">DoFn</code> reads the side inputs.</p>
+
+<p>The details and available support code vary by language.</p>
+
+<p><strong>Java</strong></p>
+
+<p>If you are using one of the above <code 
class="highlighter-rouge">DoFnRunner</code> classes, then the interface for
+letting them request side inputs is
+<a 
href="https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputReader.java";><code
 class="highlighter-rouge">SideInputReader</code></a>.
+It is a simple mapping from side input and window to a value. The <code 
class="highlighter-rouge">DoFnRunner</code>
+will perform a mapping with the
+<a 
href="https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java";><code
 class="highlighter-rouge">WindowMappingFn</code></a>
+to request the appropriate window so you do not worry about invoking this UDF.
+When using the Fn API, it will be the SDK harness that maps windows as 
well.</p>
+
+<p>A simple, but not necessarily optimal approach to building a
+<a 
href="https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputReader.java";><code
 class="highlighter-rouge">SideInputReader</code></a>
+is to use a state backend. In our Java support code, this is called
+<a 
href="https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java";><code
 class="highlighter-rouge">StateInternals</code></a>
+and you can build a
+<a 
href="https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java";><code
 class="highlighter-rouge">SideInputHandler</code></a>
+that will use your <code class="highlighter-rouge">StateInternals</code> to 
materialize a <code class="highlighter-rouge">PCollection</code> into the
+appropriate side input view and then yield the value when requested for a
+particular side input and window.</p>
+
+<p>When a side input is needed but the side input has no data associated with 
it
+for a given window, elements in that window must be deferred until the side
+input has some data. The aforementioned
+<a 
href="https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java";><code
 class="highlighter-rouge">PushBackSideInputDoFnRunner</code></a>
+is used to implement this.</p>
+
+<p><strong>Python</strong></p>
+
+<p>In Python, <a 
href="https://beam.apache.org/documentation/sdks/pydoc/2.0.0/apache_beam.transforms.html#apache_beam.transforms.sideinputs.SideInputMap";><code
 class="highlighter-rouge">SideInputMap</code></a> maps
+windows to side input values. The <code 
class="highlighter-rouge">WindowMappingFn</code> manifests as a simple
+function. See
+<a 
href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/sideinputs.py";>sideinputs.py</a>.</p>
+
+<h4 id="state-and-timers">State and Timers</h4>
+
+<p><em>Main design document: <a 
href="https://s.apache.org/beam-state";>https://s.apache.org/beam-state</a></em></p>
+
+<p>When <code class="highlighter-rouge">ParDo</code> includes state and 
timers, its execution on your runner is usually
+very different. See the full details beyond those covered here.</p>
+
+<p>State and timers are partitioned per key and window. You may need or want to
+explicitly shuffle data to support this.</p>
+
+<p><strong>Java</strong></p>
+
+<p>We provide
+<a 
href="https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java";><code
 class="highlighter-rouge">StatefulDoFnRunner</code></a>
+to help with state cleanup. The non-user-facing interface
+<a 
href="https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java";><code
 class="highlighter-rouge">StateInternals</code></a>
+is what a runner generally implements, and then the Beam support code can use
+this to implement user-facing state.</p>
+
+<h4 id="splittable-dofn">Splittable DoFn</h4>
+
+<p><em>Main design document: <a 
href="https://s.apache.org/splittable-do-fn";>https://s.apache.org/splittable-do-fn</a></em></p>
+
+<p>Splittable <code class="highlighter-rouge">DoFn</code> is a generalization 
and combination of <code class="highlighter-rouge">ParDo</code> and <code 
class="highlighter-rouge">Read</code>. It
+is per-element processing where each element the capabilities of being 
“split”
+in the same ways as a <code class="highlighter-rouge">BoundedSource</code> or 
<code class="highlighter-rouge">UnboundedSource</code>. This enables better
+performance for use cases such as a <code 
class="highlighter-rouge">PCollection</code> of names of large files where
+you want to read each of them. Previously they would have to be static data in
+the pipeline or be read in a non-splittable manner.</p>
+
+<p>This feature is still under development, but likely to become the new 
primitive
+for reading. It is best to be aware of it and follow developments.</p>
+
+<h3 id="implementing-the-groupbykey-and-window-primitive">Implementing the 
GroupByKey (and window) primitive</h3>
+
+<p>The <code class="highlighter-rouge">GroupByKey</code> operation (sometimes 
called GBK for short) groups a
+<code class="highlighter-rouge">PCollection</code> of key-value pairs by key 
and window, emitting results according
+to the <code class="highlighter-rouge">PCollection</code>’s triggering 
configuration.</p>
+
+<p>It is quite a bit more elaborate than simply colocating elements with the 
same
+key, and uses many fields from the <code 
class="highlighter-rouge">PCollection</code>’s windowing strategy.</p>
+
+<h4 id="group-by-encoded-bytes">Group By Encoded Bytes</h4>
+
+<p>For both the key and window, your runner sees them as “just bytes”. So 
you need
+to group in a way that is consistent with grouping by those bytes, even if you
+have some special knowledge of the types involved.</p>
+
+<p>The elements you are processing will be key-value pairs, and you’ll need 
to extract
+the keys. For this reason, the format of key-value pairs is standardized and
+shared across all SDKS. See either
+<a 
href="https://beam.apache.org/documentation/sdks/javadoc/2.0.0/org/apache/beam/sdk/coders/KvCoder.html";><code
 class="highlighter-rouge">KvCoder</code></a>
+in Java or
+<a 
href="https://beam.apache.org/documentation/sdks/pydoc/2.0.0/apache_beam.coders.html#apache_beam.coders.coders.TupleCoder.key_coder";><code
 class="highlighter-rouge">TupleCoder</code></a>
+in Python for documentation on the binary format.</p>
+
+<h4 id="window-merging">Window Merging</h4>
+
+<p>As well as grouping by key, your runner must group elements by their 
window. A
+<code class="highlighter-rouge">WindowFn</code> has the option of declaring 
that it merges windows on a per-key
+basis.  For example, session windows for the same key will be merged if they
+overlap. So your runner must invoke the merge method of the <code 
class="highlighter-rouge">WindowFn</code> during
+grouping.</p>
+
+<h4 id="implementing-via-groupbykeyonly--groupalsobywindow">Implementing via 
GroupByKeyOnly + GroupAlsoByWindow</h4>
+
+<p>The Java codebase includes support code for a particularly common way of
+implement the full <code class="highlighter-rouge">GroupByKey</code> 
operation: first group the keys, and then group
+by window. For merging windows, this is essentially required, since merging is
+per key.</p>
+
+<h4 id="dropping-late-data">Dropping late data</h4>
+
+<p><em>Main design document:
+<a 
href="https://s.apache.org/beam-lateness";>https://s.apache.org/beam-lateness</a></em></p>
+
+<p>A window is expired in a <code class="highlighter-rouge">PCollection</code> 
 if the watermark of the input PCollection
+has exceeded the end of the window by at least the input <code 
class="highlighter-rouge">PCollection</code>’s
+allowed lateness.</p>
+
+<p>Data for an expired window can be dropped any time and should be dropped at 
a
+<code class="highlighter-rouge">GroupByKey</code>. If you are using <code 
class="highlighter-rouge">GroupAlsoByWindow</code>, then just before executing
+this transform. You may shuffle less data if you drop data prior to
+<code class="highlighter-rouge">GroupByKeyOnly</code>, but should only safely 
be done for non-merging windows, as a
+window that appears expired may merge to become not expired.</p>
+
+<h4 id="triggering">Triggering</h4>
+
+<p><em>Main design document:
+<a 
href="https://s.apache.org/beam-triggers";>https://s.apache.org/beam-triggers</a></em></p>
+
+<p>The input <code class="highlighter-rouge">PCollection</code>’s trigger 
and accumulation mode specify when and how
+outputs should be emitted from the <code 
class="highlighter-rouge">GroupByKey</code> operation.</p>
+
+<p>In Java, there is a lot of support code for executing triggers in the
+<code class="highlighter-rouge">GroupAlsoByWindow</code> implementations, 
<code class="highlighter-rouge">ReduceFnRunner</code> (legacy name), and
+<code class="highlighter-rouge">TriggerStateMachine</code>, which is an 
obvious way of implementing all triggers as
+an event-driven machine over elements and timers.</p>
+
+<h4 id="timestampcombiner">TimestampCombiner</h4>
+
+<p>When an aggregated output is produced from multiple inputs, the <code 
class="highlighter-rouge">GroupByKey</code>
+operation has to choose a timestamp for the combination. To do so, first the
+WindowFn has a chance to shift timestamps - this is needed to ensure watermarks
+do not prevent progress of windows like sliding windows (the details are beyond
+this doc). Then, the shifted timestamps need to be combined - this is specified
+by a <code class="highlighter-rouge">TimestampCombiner</code>, which can 
either select the minimum or maximum of its
+inputs, or just ignore inputs and choose the end of the window.</p>
+
+<h3 id="implementing-the-window-primitive">Implementing the Window 
primitive</h3>
+
+<p>The window primitive applies a <code 
class="highlighter-rouge">WindowFn</code> UDF to place each input element into
+one or more windows of its output PCollection. Note that the primitive also
+generally configures other aspects of the windowing strategy for a <code 
class="highlighter-rouge">PCollection</code>,
+but the fully constructed graph that your runner receive will already have a
+complete windowing strategy for each <code 
class="highlighter-rouge">PCollection</code>.</p>
+
+<p>To implement this primitive, you need to invoke the provided WindowFn on 
each
+element, which will return some set of windows for that element to be a part of
+in the output <code class="highlighter-rouge">PCollection</code>.</p>
+
+<p><strong>Implementation considerations</strong></p>
+
+<p>A “window” is just a second grouping key that has a “maximum 
timestamp”. It can
+be any arbitrary user-defined type. The <code 
class="highlighter-rouge">WindowFn</code> provides the coder for the
+window type.</p>
+
+<p>Beam’s support code provides <code 
class="highlighter-rouge">WindowedValue</code> which is a compressed
+representation of an element in multiple windows. You may want to do use this,
+or your own compressed representation. Remember that it simply represents
+multiple elements at the same time; there is no such thing as an element “in
+multiple windows”.</p>
+
+<p>For values in the global window, you may want to use an even further 
compressed
+representation that doesn’t bother including the window at all.</p>
+
+<p>In the future, this primitive may be retired as it can be implemented as a
+ParDo if the capabilities of ParDo are enhanced to allow output to new 
windows.</p>
+
+<h3 id="implementing-the-read-primitive">Implementing the Read primitive</h3>
+
+<p>You implement this primitive to read data from an external system. The APIs 
are
+carefully crafted to enable efficient parallel execution. Reading from an
+<code class="highlighter-rouge">UnboundedSource</code> is a bit different than 
reading from a <code class="highlighter-rouge">BoundedSource</code>.</p>
+
+<h4 id="reading-from-an-unboundedsource">Reading from an UnboundedSource</h4>
+
+<p>An <code class="highlighter-rouge">UnboundedSource</code> is a source of 
potentially infinite data; you can think of
+it like a stream. The capabilities are:</p>
+
+<ul>
+  <li><code class="highlighter-rouge">split(int)</code> - your runner should 
call this to get the desired parallelism</li>
+  <li><code class="highlighter-rouge">createReader(...)</code> - call this to 
start reading elements; it is an enhanced iterator that also vends:</li>
+  <li>watermark (for this source) which you should propagate downstream
+timestamps, which you should associate with elements read</li>
+  <li>record identifiers, so you can dedup downstream if needed</li>
+  <li>progress indication of its backlog</li>
+  <li>checkpointing</li>
+  <li><code class="highlighter-rouge">requiresDeduping</code> - this indicates 
that there is some chance that the source
+may emit dupes; your runner should do its best to dedupe based on the
+identifier attached to emitted records</li>
+</ul>
+
+<p>An unbounded source has a custom type of checkpoints and an associated 
coder for serializing them.</p>
+
+<h4 id="reading-from-a-boundedsource">Reading from a BoundedSource</h4>
+
+<p>A <code class="highlighter-rouge">BoundedSource</code> is a source of data 
that you know is finite, such as a static
+collection of log files, or a database table. The capabilities are:</p>
+
+<ul>
+  <li><code class="highlighter-rouge">split(int)</code> - your runner should 
call this to get desired initial parallelism (but you can often steal work 
later)</li>
+  <li><code class="highlighter-rouge">getEstimatedSizeBytes(...)</code> - self 
explanatory</li>
+  <li><code class="highlighter-rouge">createReader(...)</code> - call this to 
start reading elements; it is an enhanced iterator, with also:</li>
+  <li>timestamps to associate with each element read</li>
+  <li><code class="highlighter-rouge">splitAtFraction</code> for dynamic 
splitting to enable work stealing, and other
+methods to support it - see the <a 
href="https://beam.apache.org/blog/2016/05/18/splitAtFraction-method.html";>Beam 
blog post on dynamic work
+rebalancing</a></li>
+</ul>
+
+<p>The <code class="highlighter-rouge">BoundedSource</code> does not report a 
watermark currently. Most of the time, reading
+from a bounded source can be parallelized in ways that result in utterly 
out-of-order
+data, so a watermark is not terribly useful.
+Thus the watermark for the output <code 
class="highlighter-rouge">PCollection</code> from a bounded read should
+remain at the minimum timestamp throughout reading (otherwise data might get
+dropped) and advance to the maximum timestamp when all data is exhausted.</p>
+
+<h3 id="implementing-the-flatten-primitive">Implementing the Flatten 
primitive</h3>
+
+<p>This one is easy - take as input a finite set of <code 
class="highlighter-rouge">PCollections</code> and outputs their
+bag union, keeping windows intact.</p>
+
+<p>For this operation to make sense, it is the SDK’s responsibility to make 
sure
+the windowing strategies are compatible.</p>
+
+<p>Also note that there is no requirement that the coders for all the <code 
class="highlighter-rouge">PCollections</code>
+be the same. If your runner wants to require that (to avoid tedious
+re-encoding) you have to enforce it yourself. Or you could just implement the
+fast path as an optimization.</p>
+
+<h3 id="special-mention-the-combine-composite">Special mention: the Combine 
composite</h3>
+
+<p>A composite transform that is almost always treated specially by a runner is
+<code class="highlighter-rouge">Combine</code> (per key), which applies an 
associative and commutative operator to
+the elements of a <code class="highlighter-rouge">PCollection</code>. This 
composite is not a primitive. It is
+implemented in terms of <code class="highlighter-rouge">ParDo</code> and <code 
class="highlighter-rouge">GroupByKey</code>, so your runner will work
+without treating it - but it does carry additional information that you
+probably want to use for optimizations: the associative-commutative operator,
+known as a <code class="highlighter-rouge">CombineFn</code>.</p>
+
+<h2 id="working-with-pipelines">Working with pipelines</h2>
+
+<p>When you receive a pipeline from a user, you will need to translate it. 
This is
+a tour of the APIs that you’ll use to do it.</p>
+
+<h3 id="traversing-a-pipeline">Traversing a pipeline</h3>
+
+<p>Something you will likely do is to traverse a pipeline, probably to 
translate
+it into primitives for your engine. The general pattern is to write a visitor
+that builds a job specification as it walks the graph of <code 
class="highlighter-rouge">PTransforms</code>.</p>
+
+<p>The entry point for this in Java is
+<a 
href="https://beam.apache.org/documentation/sdks/javadoc/2.0.0/org/apache/beam/sdk/Pipeline.html#traverseTopologically-org.apache.beam.sdk.Pipeline.PipelineVisitor-";><code
 class="highlighter-rouge">Pipeline.traverseTopologically</code></a>
+and
+<a 
href="https://beam.apache.org/documentation/sdks/pydoc/2.0.0/apache_beam.html#apache_beam.pipeline.Pipeline.visit";><code
 class="highlighter-rouge">Pipeline.visit</code></a>
+in Python. See the generated documentation for details.</p>
+
+<h3 id="altering-a-pipeline">Altering a pipeline</h3>
+
+<p>Often, the best way to keep your
+translator simple will be to alter the pipeline prior to translation. Some
+alterations you might perform:</p>
+
+<ul>
+  <li>Elaboration of a Beam primitive into a composite transform that uses
+multiple runner-specific primitives</li>
+  <li>Optimization of a Beam composite into a specialized primitive for your
+runner</li>
+  <li>Replacement of a Beam composite with a different expansion more suitable 
for
+your runner</li>
+</ul>
+
+<p>The Java SDK and the “runners core construction” library (the artifact 
is
+<code class="highlighter-rouge">beam-runners-core-construction-java</code> and 
the namespaces is
+<code 
class="highlighter-rouge">org.apache.beam.runners.core.construction</code>) 
contain helper code for this sort
+of work. In Python, support code is still under development.</p>
+
+<p>All pipeline alteration is done via
+<a 
href="https://beam.apache.org/documentation/sdks/javadoc/2.0.0/org/apache/beam/sdk/Pipeline.html#replaceAll-java.util.List-";><code
 class="highlighter-rouge">Pipeline.replaceAll(PTransformOverride)</code></a>
+method. A
+<a 
href="https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverride.java";><code
 class="highlighter-rouge">PTransformOverride</code></a>
+is a pair of a
+<a 
href="https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformMatcher.java";><code
 class="highlighter-rouge">PTransformMatcher</code></a>
+to select transforms for replacement and a
+<a 
href="https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java";><code
 class="highlighter-rouge">PTransformOverrideFactory</code></a>
+to produce the replacement. All <code 
class="highlighter-rouge">PTransformMatchers</code> that have been needed by
+runners to date are provided. Examples include: matching a specific class,
+matching a <code class="highlighter-rouge">ParDo</code> where the <code 
class="highlighter-rouge">DoFn</code> uses state or timers, etc.</p>
+
+<h2 id="testing-your-runner">Testing your runner</h2>
+
+<p>The Beam Java SDK and Python SDK have suites of runner validation tests. The
+configuration may evolve faster than this document, so check the configuration
+of other Beam runners. But be aware that we have tests and you can use them
+very easily!  To enable these tests in a Java-based runner using Maven, you
+scan the dependencies of the SDK for tests with the JUnit category
+<code class="highlighter-rouge">ValidatesRunner</code>.</p>
+
+<div class="language-xml no-toggle highlighter-rouge"><pre 
class="highlight"><code><span class="nt">&lt;plugin&gt;</span>
+  <span class="nt">&lt;groupId&gt;</span>org.apache.maven.plugins<span 
class="nt">&lt;/groupId&gt;</span>
+  <span class="nt">&lt;artifactId&gt;</span>maven-surefire-plugin<span 
class="nt">&lt;/artifactId&gt;</span>
+  <span class="nt">&lt;executions&gt;</span>
+    <span class="nt">&lt;execution&gt;</span>
+      <span class="nt">&lt;id&gt;</span>validates-runner-tests<span 
class="nt">&lt;/id&gt;</span>
+      <span class="nt">&lt;phase&gt;</span>integration-test<span 
class="nt">&lt;/phase&gt;</span>
+      <span class="nt">&lt;goals&gt;&lt;goal&gt;</span>test<span 
class="nt">&lt;/goal&gt;&lt;/goals&gt;</span>
+      <span class="nt">&lt;configuration&gt;</span>
+        <span 
class="nt">&lt;groups&gt;</span>org.apache.beam.sdk.testing.ValidatesRunner<span
 class="nt">&lt;/groups&gt;</span>
+        <span class="nt">&lt;dependenciesToScan&gt;</span>
+          <span 
class="nt">&lt;dependency&gt;</span>org.apache.beam:beam-sdks-java-core<span 
class="nt">&lt;/dependency&gt;</span>
+        <span class="nt">&lt;/dependenciesToScan&gt;</span>
+        <span class="nt">&lt;systemPropertyVariables&gt;</span>
+          <span class="nt">&lt;beamTestPipelineOptions&gt;</span>
+            [
+              "--runner=MyRunner",
+              … misc test options … 
+            ]
+          <span class="nt">&lt;/beamTestPipelineOptions&gt;</span>
+        <span class="nt">&lt;/systemPropertyVariables&gt;</span>
+      <span class="nt">&lt;/configuration&gt;</span>
+    <span class="nt">&lt;/execution&gt;</span>
+  <span class="nt">&lt;/executions&gt;</span>
+<span class="nt">&lt;/plugin&gt;</span>
+</code></pre>
+</div>
+
+<p>Enable these tests in other languages is unexplored.</p>
+
+<h2 id="integrating-your-runner-nicely-with-sdks">Integrating your runner 
nicely with SDKs</h2>
+
+<p>Whether or not your runner is based in the same language as an SDK (such as
+Java), you will want to provide a shim to invoke it from another SDK if you
+want the users of that SDK (such as Python) to use it.</p>
+
+<h3 id="integrating-with-the-java-sdk">Integrating with the Java SDK</h3>
+
+<h4 id="allowing-users-to-pass-options-to-your-runner">Allowing users to pass 
options to your runner</h4>
+
+<p>The mechanism for configuration is
+<a 
href="https://beam.apache.org/documentation/sdks/javadoc/2.0.0/org/apache/beam/sdk/options/PipelineOptions.html";><code
 class="highlighter-rouge">PipelineOptions</code></a>,
+an interface that works completely differently than normal Java objects. Forget
+what you know, and follow the rules, and <code 
class="highlighter-rouge">PipelineOptions</code> will treat you well.</p>
+
+<p>You must implement a sub-interface for your runner with getters and setters
+with matching names, like so:</p>
+
+<div class="language-java no-toggle highlighter-rouge"><pre 
class="highlight"><code><span class="kd">public</span> <span 
class="kd">interface</span> <span class="nc">MyRunnerOptions</span> <span 
class="kd">extends</span> <span class="n">PipelineOptions</span> <span 
class="o">{</span>
+  <span class="nd">@Description</span><span class="o">(</span><span 
class="s">"The Foo to use with MyRunner"</span><span class="o">)</span>
+  <span class="nd">@Required</span>
+  <span class="kd">public</span> <span class="n">Foo</span> <span 
class="nf">getMyRequiredFoo</span><span class="o">();</span>
+  <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">setMyRequiredFoo</span><span class="o">(</span><span 
class="n">Foo</span> <span class="n">newValue</span><span class="o">);</span>
+ 
+  <span class="nd">@Description</span><span class="o">(</span><span 
class="s">"Enable Baz; on by default"</span><span class="o">)</span>
+  <span class="nd">@Default</span><span class="o">.</span><span 
class="na">Boolean</span><span class="o">(</span><span 
class="kc">true</span><span class="o">)</span>
+  <span class="kd">public</span> <span class="n">Boolean</span> <span 
class="nf">isBazEnabled</span><span class="o">();</span>
+  <span class="kd">public</span> <span class="kt">void</span> <span 
class="nf">setBazEnabled</span><span class="o">(</span><span 
class="n">Boolean</span> <span class="n">newValue</span><span 
class="o">);</span>
+<span class="o">}</span>
+</code></pre>
+</div>
+
+<p>You can set up defaults, etc. See the javadoc for details.  When your 
runner is
+instantiated with a <code class="highlighter-rouge">PipelineOptions</code> 
object, you access your interface by
+<code class="highlighter-rouge">options.as(MyRunnerOptions.class)</code>.</p>
+
+<p>To make these options available on the command line, you register your 
options
+with a <code class="highlighter-rouge">PipelineOptionsRegistrar</code>. It is 
easy if you use <code class="highlighter-rouge">@AutoService</code>:</p>
+
+<div class="language-java no-toggle highlighter-rouge"><pre 
class="highlight"><code><span class="nd">@AutoService</span><span 
class="o">(</span><span class="n">PipelineOptionsRegistrar</span><span 
class="o">.</span><span class="na">class</span><span class="o">)</span>
+<span class="kd">public</span> <span class="kd">static</span> <span 
class="kd">class</span> <span class="nc">MyOptionsRegistrar</span> <span 
class="kd">implements</span> <span class="n">PipelineOptionsRegistrar</span> 
<span class="o">{</span>
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="n">Iterable</span><span 
class="o">&lt;</span><span class="n">Class</span><span class="o">&lt;?</span> 
<span class="kd">extends</span> <span class="n">PipelineOptions</span><span 
class="o">&gt;&gt;</span> <span class="nf">getPipelineOptions</span><span 
class="o">()</span> <span class="o">{</span>
+    <span class="k">return</span> <span class="n">ImmutableList</span><span 
class="o">.&lt;</span><span class="n">Class</span><span class="o">&lt;?</span> 
<span class="kd">extends</span> <span class="n">PipelineOptions</span><span 
class="o">&gt;&gt;</span><span class="n">of</span><span class="o">(</span><span 
class="n">MyRunnerOptions</span><span class="o">.</span><span 
class="na">class</span><span class="o">);</span>
+  <span class="o">}</span>
+<span class="o">}</span>
+</code></pre>
+</div>
+
+<h4 id="registering-your-runner-with-sdks-for-command-line-use">Registering 
your runner with SDKs for command line use</h4>
+
+<p>To make your runner available on the command line, you register your options
+with a <code class="highlighter-rouge">PipelineRunnerRegistrar</code>. It is 
easy if you use <code class="highlighter-rouge">@AutoService</code>:</p>
+
+<div class="language-java no-toggle highlighter-rouge"><pre 
class="highlight"><code><span class="nd">@AutoService</span><span 
class="o">(</span><span class="n">PipelineRunnerRegistrar</span><span 
class="o">.</span><span class="na">class</span><span class="o">)</span>
+<span class="kd">public</span> <span class="kd">static</span> <span 
class="kd">class</span> <span class="nc">MyRunnerRegistrar</span> <span 
class="kd">implements</span> <span class="n">PipelineRunnerRegistrar</span> 
<span class="o">{</span>
+  <span class="nd">@Override</span>
+  <span class="kd">public</span> <span class="n">Iterable</span><span 
class="o">&lt;</span><span class="n">Class</span><span class="o">&lt;?</span> 
<span class="kd">extends</span> <span class="n">PipelineRunner</span><span 
class="o">&gt;&gt;</span> <span class="nf">getPipelineRunners</span><span 
class="o">()</span> <span class="o">{</span>
+    <span class="k">return</span> <span class="n">ImmutableList</span><span 
class="o">.&lt;</span><span class="n">Class</span><span class="o">&lt;?</span> 
<span class="kd">extends</span> <span class="n">PipelineRunner</span><span 
class="o">&gt;&gt;</span><span class="n">of</span><span class="o">(</span><span 
class="n">MyRunner</span><span class="o">.</span><span 
class="na">class</span><span class="o">);</span>
+  <span class="o">}</span>
+<span class="o">}</span>
+</code></pre>
+</div>
+
+<h3 id="integrating-with-the-python-sdk">Integrating with the Python SDK</h3>
+
+<p>In the Python SDK the registration of the code is not automatic. So there 
are
+few things to keep in mind when creating a new runner.</p>
+
+<p>Any dependencies on packages for the new runner should be options so create 
a
+new target in <code class="highlighter-rouge">extra_requires</code> in <code 
class="highlighter-rouge">setup.py</code> that is needed for the new runner.</p>
+
+<p>All runner code should go in it’s own package in <code 
class="highlighter-rouge">apache_beam/runners</code> directory.</p>
+
+<p>Register the new runner in the <code 
class="highlighter-rouge">create_runner</code> function of <code 
class="highlighter-rouge">runner.py</code> so that the
+partial name is matched with the correct class to be used.</p>
+
+<h2 id="writing-an-sdk-independent-runner">Writing an SDK-independent 
runner</h2>
+
+<p>There are two aspects to making your runner SDK-independent, able to run
+pipelines written in other languages: The Fn API and the Runner API.</p>
+
+<h3 id="the-fn-api">The Fn API</h3>
+
+<p><em>Design documents:</em></p>
+
+<ul>
+  <li><em><a 
href="https://s.apache.org/beam-fn-api";>https://s.apache.org/beam-fn-api</a></em></li>
+  <li><em><a 
href="https://s.apache.org/beam-fn-api-processing-a-bundle";>https://s.apache.org/beam-fn-api-processing-a-bundle</a></em></li>
+  <li><em><a 
href="https://s.apache.org/beam-fn-api-send-and-receive-data";>https://s.apache.org/beam-fn-api-send-and-receive-data</a></em></li>
+</ul>
+
+<p>To run a user’s pipeline, you need to be able to invoke their UDFs.  The 
Fn API
+is an RPC interface for the standard UDFs of Beam, implemented using protocol
+buffers over gRPC.</p>
+
+<p>The Fn API includes:</p>
+
+<ul>
+  <li>APIs for registering a subgraph of UDFs</li>
+  <li>APIs for streaming elements of a bundle</li>
+  <li>Shared data formats (key-value pairs, timestamps, iterables, etc)</li>
+</ul>
+
+<p>You are fully welcome to <em>also</em> use the SDK for your language for 
utility code,
+or provide optimized implementations of bundle processing for same-language
+UDFs.</p>
+
+<h3 id="the-runner-api">The Runner API</h3>
+
+<p>The Runner API is an SDK-independent schema for a pipeline along with RPC
+interfaces for launching a pipeline and checking the status of a job. The RPC
+interfaces are still in development so for now we focus on the SDK-agnostic
+representation of a pipeline. By examining a pipeline only through Runner API
+interfaces, you remove your runner’s dependence on the SDK for its language 
for
+pipeline analysis and job translation.</p>
+
+<p>To execute such an SDK-independent pipeline, you will need to support the Fn
+API. UDFs are embedded in the pipeline as a specification of the function
+(often just opaque serialized bytes for a particular language) plus a
+specification of an environment that can execute it (essentially a particular
+SDK). So far, this specification is expected to be a URI for a Docker container
+hosting the SDK’s Fn API harness.</p>
+
+<p>You are fully welcome to <em>also</em> use the SDK for your language, which 
may offer
+useful utility code.</p>
+
+<p>The language-independent definition of a pipeline is described via a 
protocol
+buffers schema, covered below for reference. But your runner <em>should 
not</em>
+directly manipulate protobuf messages.  Instead, the Beam codebase provides
+utilities for working with pipelines so that you don’t need to be aware of
+whether or not the pipeline has ever been serialized or transmitted, or what
+language it may have been written in to begin with.</p>
+
+<p><strong>Java</strong></p>
+
+<p>If your runner is Java-based, the tools to interact with pipelines in an
+SDK-agnostic manner are in the <code 
class="highlighter-rouge">beam-runners-core-construction-java</code>
+artifact, in the <code 
class="highlighter-rouge">org.apache.beam.runners.core.construction</code> 
namespace.
+The utilities are named consistently, like so:</p>
+
+<ul>
+  <li><code class="highlighter-rouge">PTransformTranslation</code> - registry 
of known transforms and standard URNs</li>
+  <li><code class="highlighter-rouge">ParDoTranslation</code> - utilities for 
working with <code class="highlighter-rouge">ParDo</code> in a
+language-independent manner</li>
+  <li><code class="highlighter-rouge">WindowIntoTranslation</code> - same for 
<code class="highlighter-rouge">Window</code></li>
+  <li><code class="highlighter-rouge">FlattenTranslation</code> - same for 
<code class="highlighter-rouge">Flatten</code></li>
+  <li><code class="highlighter-rouge">WindowingStrategyTranslation</code> - 
same for windowing strategies</li>
+  <li><code class="highlighter-rouge">CoderTranslation</code> - same for 
coders</li>
+  <li>… etc, etc …</li>
+</ul>
+
+<p>By inspecting transforms only through these classes, your runner will not
+depend on the particulars of the Java SDK.</p>
+
+<h2 id="the-runner-api-protos">The Runner API protos</h2>
+
+<p><a 
href="https://github.com/apache/beam/blob/master/sdks/common/runner-api/src/main/proto/beam_runner_api.proto";>The
 Runner
+API</a>
+refers to a specific manifestation of the concepts in the Beam model, as a
+protocol buffers schema.  Even though you should not manipulate these messages
+directly, it can be helpful to know the canonical data that makes up a
+pipeline.</p>
+
+<p>Most of the API is exactly the same as the high-level description; you can 
get
+started implementing a runner without understanding all the low-level 
details.</p>
+
+<p>The most important takeaway of the Runner API for you is that it is a
+language-independent definition of a Beam pipeline. You will probably always
+interact via a particular SDK’s support code wrapping these definitions with
+sensible idiomatic APIs, but always be aware that this is the specification and
+any other data is not necessarily inherent to the pipeline, but may be
+SDK-specific enrichments (or bugs!).</p>
+
+<p>The UDFs in the pipeline may be written for any Beam SDK, or even multiple 
in
+the same pipeline. So this is where we will start, taking a bottom-up approach
+to understanding the protocol buffers definitions for UDFs before going back to
+the higher-level, mostly obvious, record definitions.</p>
+
+<h3 id="functionspec-proto"><code 
class="highlighter-rouge">FunctionSpec</code> proto</h3>
+
+<p>The heart of cross-language portability is the <code 
class="highlighter-rouge">FunctionSpec</code>. This is a
+language-independent specification of a function, in the usual programming
+sense that includes side effects, etc.</p>
+
+<div class="language-proto no-toggle highlighter-rouge"><pre 
class="highlight"><code><span class="kd">message</span> <span 
class="nc">FunctionSpec</span> <span class="p">{</span>
+  <span class="kt">string</span> <span class="n">urn</span><span 
class="p">;</span>
+  <span class="n">google.protobuf.Any</span> <span 
class="n">parameter</span><span class="p">;</span>
+<span class="p">}</span>
+</code></pre>
+</div>
+
+<p>A <code class="highlighter-rouge">FunctionSpec</code> includes a URN 
identifying the function as well as an arbitrary
+fixed parameter. For example the (hypothetical) “max” CombineFn might have 
the
+URN <code class="highlighter-rouge">urn:beam:combinefn:max:0.1</code> and a 
parameter that indicates by what
+comparison to take the max.</p>
+
+<p>For most UDFs in a pipeline constructed using a particular language’s 
SDK, the
+URN will indicate that the SDK must interpret it, for example
+<code class="highlighter-rouge">urn:beam:dofn:javasdk:0.1</code> or <code 
class="highlighter-rouge">urn:beam:dofn:pythonsdk:0.1</code>. The parameter
+will contain serialized code, such as a Java-serialized <code 
class="highlighter-rouge">DoFn</code> or a Python
+pickled <code class="highlighter-rouge">DoFn</code>.</p>
+
+<p>A <code class="highlighter-rouge">FunctionSpec</code> is not only for UDFs. 
It is just a generic way to name/specify
+any function. It is also used as the specification for a <code 
class="highlighter-rouge">PTransform</code>. But when
+used in a <code class="highlighter-rouge">PTransform</code> it describes a 
function from <code class="highlighter-rouge">PCollection</code> to <code 
class="highlighter-rouge">PCollection</code>
+and cannot be specific to an SDK because the runner is in charge of evaluating
+transforms and producing <code 
class="highlighter-rouge">PCollections</code>.</p>
+
+<h3 id="sdkfunctionspec-proto"><code 
class="highlighter-rouge">SdkFunctionSpec</code> proto</h3>
+
+<p>When a <code class="highlighter-rouge">FunctionSpec</code> represents a 
UDF, in general only the SDK that serialized
+it will be guaranteed to understand it. So in that case, it will always come
+with an environment that can understand and execute the function. This is
+represented by the <code class="highlighter-rouge">SdkFunctionSpec</code>.</p>
+
+<div class="language-proto no-toggle highlighter-rouge"><pre 
class="highlight"><code><span class="kd">message</span> <span 
class="nc">SdkFunctionSpec</span> <span class="p">{</span>
+  <span class="n">FunctionSpec</span> <span class="n">spec</span><span 
class="p">;</span>
+  <span class="kt">bytes</span> <span class="n">environment_id</span><span 
class="p">;</span>
+<span class="p">}</span>
+</code></pre>
+</div>
+
+<p>In the Runner API, many objects are stored by reference. Here in the
+<code class="highlighter-rouge">environment_id</code> is a pointer, local to 
the pipeline and just made up by the
+SDK that serialized it, that can be dereferenced to yield the actual
+environment proto.</p>
+
+<p>Thus far, an environment is expected to be a Docker container specification 
for
+an SDK harness that can execute the specified UDF.</p>
+
+<h3 id="primitive-transform-payload-protos">Primitive transform payload 
protos</h3>
+
+<p>The payload for the primitive transforms are just proto serializations of 
their
+specifications. Rather than reproduce their full code here, I will just
+highlight the important pieces to show how they fit together.</p>
+
+<p>It is worth emphasizing again that while you probably will not interact
+directly with these payloads, they are the only data that is inherently part of
+the transform.</p>
+
+<h4 id="pardopayload-proto"><code 
class="highlighter-rouge">ParDoPayload</code> proto</h4>
+
+<p>A <code class="highlighter-rouge">ParDo</code> transform carries its <code 
class="highlighter-rouge">DoFn</code> in an <code 
class="highlighter-rouge">SdkFunctionSpec</code> and then
+provides language-independent specifications for its other features - side
+inputs, state declarations, timer declarations, etc.</p>
+
+<div class="language-proto no-toggle highlighter-rouge"><pre 
class="highlight"><code><span class="kd">message</span> <span 
class="nc">ParDoPayload</span> <span class="p">{</span>
+  <span class="n">SdkFunctionSpec</span> <span class="n">do_fn</span><span 
class="p">;</span>
+  <span class="n">map</span><span class="o">&lt;</span><span 
class="kt">string</span><span class="p">,</span> <span 
class="n">SideInput</span><span class="err">&gt;</span> <span 
class="n">side_inputs</span><span class="p">;</span>
+  <span class="n">map</span><span class="o">&lt;</span><span 
class="kt">string</span><span class="p">,</span> <span 
class="n">StateSpec</span><span class="err">&gt;</span> <span 
class="n">state_specs</span><span class="p">;</span>
+  <span class="n">map</span><span class="o">&lt;</span><span 
class="kt">string</span><span class="p">,</span> <span 
class="n">TimerSpec</span><span class="err">&gt;</span> <span 
class="n">timer_specs</span><span class="p">;</span>
+  <span class="o">...</span>
+<span class="p">}</span>
+</code></pre>
+</div>
+
+<h4 id="readpayload-proto"><code class="highlighter-rouge">ReadPayload</code> 
proto</h4>
+
+<p>A <code class="highlighter-rouge">Read</code> transform carries an <code 
class="highlighter-rouge">SdkFunctionSpec</code> for its <code 
class="highlighter-rouge">Source</code> UDF.</p>
+
+<div class="language-proto no-toggle highlighter-rouge"><pre 
class="highlight"><code><span class="kd">message</span> <span 
class="nc">ReadPayload</span> <span class="p">{</span>
+  <span class="n">SdkFunctionSpec</span> <span class="n">source</span><span 
class="p">;</span>
+  <span class="o">...</span>
+<span class="p">}</span>
+</code></pre>
+</div>
+
+<h4 id="windowintopayload-proto"><code 
class="highlighter-rouge">WindowIntoPayload</code> proto</h4>
+
+<p>A <code class="highlighter-rouge">Window</code> transform carries an <code 
class="highlighter-rouge">SdkFunctionSpec</code> for its <code 
class="highlighter-rouge">WindowFn</code> UDF. It is
+part of the Fn API that the runner passes this UDF along and tells the SDK
+harness to use it to assign windows (as opposed to merging).</p>
+
+<div class="language-proto no-toggle highlighter-rouge"><pre 
class="highlight"><code><span class="kd">message</span> <span 
class="nc">WindowIntoPayload</span> <span class="p">{</span>
+  <span class="n">SdkFunctionSpec</span> <span class="n">window_fn</span><span 
class="p">;</span>
+  <span class="o">...</span>
+<span class="p">}</span>
+</code></pre>
+</div>
+
+<h4 id="combinepayload-proto"><code 
class="highlighter-rouge">CombinePayload</code> proto</h4>
+
+<p><code class="highlighter-rouge">Combine</code> is not a primitive. But 
non-primitives are perfectly able to carry
+additional information for better optimization. The most important thing that a
+<code class="highlighter-rouge">Combine</code> transform carries is the <code 
class="highlighter-rouge">CombineFn</code> in an <code 
class="highlighter-rouge">SdkFunctionSpec</code> record.
+In order to effectively carry out the optimizations desired, it is also
+necessary to know the coder for intermediate accumulations, so it also carries
+a reference to this coder.</p>
+
+<div class="language-proto no-toggle highlighter-rouge"><pre 
class="highlight"><code><span class="kd">message</span> <span 
class="nc">CombinePayload</span> <span class="p">{</span>
+  <span class="n">SdkFunctionSpec</span> <span 
class="n">combine_fn</span><span class="p">;</span>
+  <span class="kt">string</span> <span 
class="n">accumulator_coder_id</span><span class="p">;</span>
+  <span class="o">...</span>
+<span class="p">}</span>
+</code></pre>
+</div>
+
+<h3 id="ptransform-proto"><code class="highlighter-rouge">PTransform</code> 
proto</h3>
+
+<p>A <code class="highlighter-rouge">PTransform</code> is a function from 
<code class="highlighter-rouge">PCollection</code> to <code 
class="highlighter-rouge">PCollection</code>. This is
+represented in the proto using a FunctionSpec. Note that this is not an
+<code class="highlighter-rouge">SdkFunctionSpec</code>, since it is the runner 
that observes these. They will never
+be passed back to an SDK harness; they do not represent a UDF.</p>
+
+<div class="language-proto no-toggle highlighter-rouge"><pre 
class="highlight"><code><span class="kd">message</span> <span 
class="nc">PTransform</span> <span class="p">{</span>
+  <span class="n">FunctionSpec</span> <span class="n">spec</span><span 
class="p">;</span>
+  <span class="k">repeated</span> <span class="kt">string</span> <span 
class="n">subtransforms</span><span class="p">;</span>
+ 
+  <span class="c1">// Maps from local string names to PCollection ids
+</span>  <span class="n">map</span><span class="o">&lt;</span><span 
class="kt">string</span><span class="p">,</span> <span 
class="kt">bytes</span><span class="err">&gt;</span> <span 
class="n">inputs</span><span class="p">;</span>
+  <span class="n">map</span><span class="o">&lt;</span><span 
class="kt">string</span><span class="p">,</span> <span 
class="kt">bytes</span><span class="err">&gt;</span> <span 
class="n">outputs</span><span class="p">;</span>
+  <span class="o">...</span>
+<span class="p">}</span>
+</code></pre>
+</div>
+
+<p>A <code class="highlighter-rouge">PTransform</code> may have subtransforms 
if it is a composite, in which case the
+<code class="highlighter-rouge">FunctionSpec</code> may be omitted since the 
subtransforms define its behavior.</p>
+
+<p>The input and output <code class="highlighter-rouge">PCollections</code> 
are unordered and referred to by a local
+name. The SDK decides what this name is, since it will likely be embedded in
+serialized UDFs.</p>
+
+<h3 id="pcollection-proto"><code class="highlighter-rouge">PCollection</code> 
proto</h3>
+
+<p>A <code class="highlighter-rouge">PCollection</code> just stores a coder, 
windowing strategy, and whether or not it
+is bounded.</p>
+
+<div class="language-proto no-toggle highlighter-rouge"><pre 
class="highlight"><code><span class="kd">message</span> <span 
class="nc">PCollection</span> <span class="p">{</span>
+  <span class="kt">string</span> <span class="n">coder_id</span><span 
class="p">;</span>
+  <span class="n">IsBounded</span> <span class="n">is_bounded</span><span 
class="p">;</span>
+  <span class="kt">string</span> <span 
class="n">windowing_strategy_id</span><span class="p">;</span>
+  <span class="o">...</span>
+<span class="p">}</span>
+</code></pre>
+</div>
+
+<h3 id="coder-proto"><code class="highlighter-rouge">Coder</code> proto</h3>
+
+<p>This is a very interesting proto. A coder is a parameterized function that 
may
+only be understood by a particular SDK, hence an <code 
class="highlighter-rouge">SdkFunctionSpec</code>, but also
+may have component coders that fully define it. For example, a <code 
class="highlighter-rouge">ListCoder</code> is
+only a meta-format, while <code 
class="highlighter-rouge">ListCoder(VarIntCoder)</code> is a fully specified 
format.</p>
+
+<div class="language-proto no-toggle highlighter-rouge"><pre 
class="highlight"><code><span class="kd">message</span> <span 
class="nc">Coder</span> <span class="p">{</span>
+  <span class="n">SdkFunctionSpec</span> <span class="n">spec</span><span 
class="p">;</span>
+  <span class="k">repeated</span> <span class="kt">string</span> <span 
class="n">component_coder_ids</span><span class="p">;</span>
+<span class="p">}</span>
+</code></pre>
+</div>
+
+<h2 id="the-runner-api-rpcs">The Runner API RPCs</h2>
+
+<p>While your language’s SDK will probably insulate you from touching the 
Runner
+API protos directly, you may need to implement adapters for your runner, to
+expose it to another language. So this section covers proto that you will
+possibly interact with quite directly.</p>
+
+<p>The specific manner in which the existing runner method calls will be 
expressed
+as RPCs is not implemented as proto yet. This RPC layer is to enable, for
+example, building a pipeline using the Python SDK and launching it on a runner
+that is written in Java. It is expected that a small Python shim will
+communicate with a Java process or service hosting the Runner API.</p>
+
+<p>The RPCs themselves will necessarily follow the existing APIs of 
PipelineRunner
+and PipelineResult, but altered to be the minimal backend channel, versus a
+rich and convenient API.</p>
+
+<h3 id="pipelinerunnerrunpipeline-rpc"><code 
class="highlighter-rouge">PipelineRunner.run(Pipeline)</code> RPC</h3>
+
+<p>This will take the same form, but <code 
class="highlighter-rouge">PipelineOptions</code> will have to be serialized
+to JSON (or a proto <code class="highlighter-rouge">Struct</code>) and passed 
along.</p>
+
+<div class="language-proto no-toggle highlighter-rouge"><pre 
class="highlight"><code><span class="kd">message</span> <span 
class="nc">RunPipelineRequest</span> <span class="p">{</span>
+  <span class="n">Pipeline</span> <span class="n">pipeline</span><span 
class="p">;</span>
+  <span class="n">Struct</span> <span class="n">pipeline_options</span><span 
class="p">;</span>
+<span class="p">}</span>
+</code></pre>
+</div>
+
+<div class="language-proto no-toggle highlighter-rouge"><pre 
class="highlight"><code><span class="kd">message</span> <span 
class="nc">RunPipelineResponse</span> <span class="p">{</span>
+  <span class="kt">bytes</span> <span class="n">pipeline_id</span><span 
class="p">;</span>
+
+  <span class="c1">// TODO: protocol for rejecting pipelines that cannot be 
executed
+</span>  <span class="c1">// by this runner. May just be REJECTED job state 
with error message.
+</span> 
+  <span class="c1">// totally opaque to the SDK; for the shim to interpret
+</span>  <span class="n">Any</span> <span class="n">contents</span><span 
class="p">;</span>
+<span class="p">}</span>
+</code></pre>
+</div>
+
+<h3 id="pipelineresult-aka-job-api"><code 
class="highlighter-rouge">PipelineResult</code> aka “Job API”</h3>
+
+<p>The two core pieces of functionality in this API today are getting the 
state of
+a job and canceling the job. It is very much likely to evolve, for example to
+be generalized to support draining a job (stop reading input and let watermarks
+go to infinity). Today, verifying our test framework benefits (but does not
+depend upon wholly) querying metrics over this channel.</p>
+
+<div class="language-proto no-toggle highlighter-rouge"><pre 
class="highlight"><code><span class="kd">message</span> <span 
class="nc">CancelPipelineRequest</span> <span class="p">{</span>
+  <span class="kt">bytes</span> <span class="n">pipeline_id</span><span 
class="p">;</span>
+  <span class="o">...</span>
+<span class="p">}</span>
+ 
+<span class="kd">message</span> <span class="nc">GetStateRequest</span> <span 
class="p">{</span>
+  <span class="kt">bytes</span> <span class="n">pipeline_id</span><span 
class="p">;</span>
+  <span class="o">...</span>
+<span class="p">}</span>
+ 
+<span class="kd">message</span> <span class="nc">GetStateResponse</span> <span 
class="p">{</span>
+  <span class="n">JobState</span> <span class="n">state</span><span 
class="p">;</span>
+  <span class="o">...</span>
+<span class="p">}</span>
+ 
+<span class="kd">enum</span> <span class="n">JobState</span> <span 
class="p">{</span>
+  <span class="o">...</span>
+<span class="p">}</span>
+</code></pre>
+</div>
+
+
+    </div>
+    <footer class="footer">
+  <div class="footer__contained">
+    <div class="footer__cols">
+      <div class="footer__cols__col">
+        <div class="footer__cols__col__logo">
+          <img src="/images/beam_logo_circle.svg" class="footer__logo" 
alt="Beam logo">
+        </div>
+        <div class="footer__cols__col__logo">
+          <img src="/images/apache_logo_circle.svg" class="footer__logo" 
alt="Apache logo">
+        </div>
+      </div>
+      <div class="footer__cols__col footer__cols__col--md">
+        <div class="footer__cols__col__title">Start</div>
+        <div class="footer__cols__col__link"><a 
href="/get-started/beam-overview/">Overview</a></div>
+        <div class="footer__cols__col__link"><a 
href="/get-started/quickstart-java/">Quickstart (Java)</a></div>
+        <div class="footer__cols__col__link"><a 
href="/get-started/quickstart-py/">Quickstart (Python)</a></div>
+        <div class="footer__cols__col__link"><a 
href="/get-started/downloads/">Downloads</a></div>
+      </div>
+      <div class="footer__cols__col footer__cols__col--md">
+        <div class="footer__cols__col__title">Docs</div>
+        <div class="footer__cols__col__link"><a 
href="/documentation/programming-guide/">Concepts</a></div>
+        <div class="footer__cols__col__link"><a 
href="/documentation/pipelines/design-your-pipeline/">Pipelines</a></div>
+        <div class="footer__cols__col__link"><a 
href="/documentation/runners/capability-matrix/">Runners</a></div>
+      </div>
+      <div class="footer__cols__col footer__cols__col--md">
+        <div class="footer__cols__col__title">Community</div>
+        <div class="footer__cols__col__link"><a 
href="/contribute/">Contribute</a></div>
+        <div class="footer__cols__col__link"><a 
href="/contribute/team/">Team</a></div>
+        <div class="footer__cols__col__link"><a 
href="/contribute/presentation-materials/">Media</a></div>
+

<TRUNCATED>

Reply via email to