http://git-wip-us.apache.org/repos/asf/beam-site/blob/5c993c61/content/contribute/runner-guide/index.html ---------------------------------------------------------------------- diff --git a/content/contribute/runner-guide/index.html b/content/contribute/runner-guide/index.html new file mode 100644 index 0000000..2dc6917 --- /dev/null +++ b/content/contribute/runner-guide/index.html @@ -0,0 +1,1375 @@ +<!DOCTYPE html> +<html lang="en"> + <head> + <meta charset="utf-8"> + <meta http-equiv="X-UA-Compatible" content="IE=edge"> + <meta name="viewport" content="width=device-width, initial-scale=1"> + <title>Runner Authoring Guide</title> + <meta name="description" content="Apache Beam is an open source, unified model and set of language-specific SDKs for defining and executing data processing workflows, and also data ingestion and integration flows, supporting Enterprise Integration Patterns (EIPs) and Domain Specific Languages (DSLs). Dataflow pipelines simplify the mechanics of large-scale batch and streaming data processing and can run on a number of runtimes like Apache Flink, Apache Spark, and Google Cloud Dataflow (a cloud service). Beam also brings DSL in different languages, allowing users to easily implement their data integration processes. 
+"> + <link href="https://fonts.googleapis.com/css?family=Roboto:100,300,400" rel="stylesheet"> + <link rel="stylesheet" href="/css/site.css"> + <script src="https://ajax.googleapis.com/ajax/libs/jquery/2.2.0/jquery.min.js"></script> + <script src="/js/bootstrap.min.js"></script> + <script src="/js/language-switch.js"></script> + <link rel="canonical" href="https://beam.apache.org/contribute/runner-guide/" data-proofer-ignore> + <link rel="shortcut icon" type="image/x-icon" href="/images/favicon.ico"> + <link rel="alternate" type="application/rss+xml" title="Apache Beam" href="https://beam.apache.org/feed.xml"> + <script> + (function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){ + (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o), + m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m) + })(window,document,'script','//www.google-analytics.com/analytics.js','ga'); + ga('create', 'UA-73650088-1', 'auto'); + ga('send', 'pageview'); + </script> +</head> + + <body class="body "> + <nav class="header navbar navbar-fixed-top"> + <div class="navbar-header"> + <a href="/" class="navbar-brand" > + <img alt="Brand" style="height: 25px" src="/images/beam_logo_navbar.png"> + </a> + <button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#navbar" aria-expanded="false" aria-controls="navbar"> + <span class="sr-only">Toggle navigation</span> + <span class="icon-bar"></span> + <span class="icon-bar"></span> + <span class="icon-bar"></span> + </button> + </div> + <div id="navbar" class="navbar-collapse collapse"> + <ul class="nav navbar-nav"> + <li class="dropdown"> + <a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">Get Started <span class="caret"></span></a> + <ul class="dropdown-menu"> + <li><a href="/get-started/beam-overview/">Beam Overview</a></li> + <li><a href="/get-started/quickstart-java/">Quickstart 
- Java</a></li> + <li><a href="/get-started/quickstart-py/">Quickstart - Python</a></li> + <li role="separator" class="divider"></li> + <li class="dropdown-header">Example Walkthroughs</li> + <li><a href="/get-started/wordcount-example/">WordCount</a></li> + <li><a href="/get-started/mobile-gaming-example/">Mobile Gaming</a></li> + <li role="separator" class="divider"></li> + <li class="dropdown-header">Resources</li> + <li><a href="/get-started/downloads">Downloads</a></li> + <li><a href="/get-started/support">Support</a></li> + </ul> + </li> + <li class="dropdown"> + <a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">Documentation <span class="caret"></span></a> + <ul class="dropdown-menu"> + <li><a href="/documentation">Using the Documentation</a></li> + <li role="separator" class="divider"></li> + <li class="dropdown-header">Beam Concepts</li> + <li><a href="/documentation/programming-guide/">Programming Guide</a></li> + <li><a href="/documentation/resources/">Additional Resources</a></li> + <li role="separator" class="divider"></li> + <li class="dropdown-header">Pipeline Fundamentals</li> + <li><a href="/documentation/pipelines/design-your-pipeline/">Design Your Pipeline</a></li> + <li><a href="/documentation/pipelines/create-your-pipeline/">Create Your Pipeline</a></li> + <li><a href="/documentation/pipelines/test-your-pipeline/">Test Your Pipeline</a></li> + <li><a href="/documentation/io/io-toc/">Pipeline I/O</a></li> + <li role="separator" class="divider"></li> + <li class="dropdown-header">SDKs</li> + <li><a href="/documentation/sdks/java/">Java SDK</a></li> + <li><a href="/documentation/sdks/javadoc/2.0.0/" target="_blank">Java SDK API Reference <img src="/images/external-link-icon.png" + width="14" height="14" + alt="External link."></a> + </li> + <li><a href="/documentation/sdks/python/">Python SDK</a></li> + <li><a href="/documentation/sdks/pydoc/2.0.0/" target="_blank">Python SDK API 
Reference <img src="/images/external-link-icon.png" + width="14" height="14" + alt="External link."></a> + </li> + <li role="separator" class="divider"></li> + <li class="dropdown-header">Runners</li> + <li><a href="/documentation/runners/capability-matrix/">Capability Matrix</a></li> + <li><a href="/documentation/runners/direct/">Direct Runner</a></li> + <li><a href="/documentation/runners/apex/">Apache Apex Runner</a></li> + <li><a href="/documentation/runners/flink/">Apache Flink Runner</a></li> + <li><a href="/documentation/runners/spark/">Apache Spark Runner</a></li> + <li><a href="/documentation/runners/dataflow/">Cloud Dataflow Runner</a></li> + </ul> + </li> + <li class="dropdown"> + <a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false">Contribute <span class="caret"></span></a> + <ul class="dropdown-menu"> + <li><a href="/contribute">Get Started Contributing</a></li> + <li role="separator" class="divider"></li> + <li class="dropdown-header">Guides</li> + <li><a href="/contribute/contribution-guide/">Contribution Guide</a></li> + <li><a href="/contribute/testing/">Testing Guide</a></li> + <li><a href="/contribute/release-guide/">Release Guide</a></li> + <li><a href="/contribute/ptransform-style-guide/">PTransform Style Guide</a></li> + <li><a href="/contribute/runner-guide/">Runner Authoring Guide</a></li> + <li role="separator" class="divider"></li> + <li class="dropdown-header">Technical References</li> + <li><a href="/contribute/design-principles/">Design Principles</a></li> + <li><a href="/contribute/work-in-progress/">Ongoing Projects</a></li> + <li><a href="/contribute/source-repository/">Source Repository</a></li> + <li role="separator" class="divider"></li> + <li class="dropdown-header">Promotion</li> + <li><a href="/contribute/presentation-materials/">Presentation Materials</a></li> + <li><a href="/contribute/logos/">Logos and Design</a></li> + <li role="separator" class="divider"></li> 
+ <li><a href="/contribute/maturity-model/">Maturity Model</a></li> + <li><a href="/contribute/team/">Team</a></li> + </ul> + </li> + + <li><a href="/blog">Blog</a></li> + </ul> + <ul class="nav navbar-nav navbar-right"> + <li class="dropdown"> + <a href="#" class="dropdown-toggle" data-toggle="dropdown" role="button" aria-haspopup="true" aria-expanded="false"><img src="https://www.apache.org/foundation/press/kit/feather_small.png" alt="Apache Logo" style="height:20px;"><span class="caret"></span></a> + <ul class="dropdown-menu dropdown-menu-right"> + <li><a href="http://www.apache.org/">ASF Homepage</a></li> + <li><a href="http://www.apache.org/licenses/">License</a></li> + <li><a href="http://www.apache.org/security/">Security</a></li> + <li><a href="http://www.apache.org/foundation/thanks.html">Thanks</a></li> + <li><a href="http://www.apache.org/foundation/sponsorship.html">Sponsorship</a></li> + <li><a href="https://www.apache.org/foundation/policies/conduct">Code of Conduct</a></li> + </ul> + </li> + </ul> + </div><!--/.nav-collapse --> +</nav> + + <div class="body__contained"> + <h1 id="runner-authoring-guide">Runner Authoring Guide</h1> + +<p>This guide walks through how to implement a new runner. It is aimed at someone +who has a data processing system and wants to use it to execute a Beam +pipeline. The guide starts from the basics, to help you evaluate the work +ahead. 
Then the sections become more and more detailed, to be a resource +throughout the development of your runner.</p> + +<p>Topics covered:</p> + +<ul id="markdown-toc"> + <li><a href="#basics-of-the-beam-model" id="markdown-toc-basics-of-the-beam-model">Basics of the Beam model</a> <ul> + <li><a href="#pipeline" id="markdown-toc-pipeline">Pipeline</a></li> + <li><a href="#ptransforms" id="markdown-toc-ptransforms">PTransforms</a></li> + <li><a href="#pcollections" id="markdown-toc-pcollections">PCollections</a> <ul> + <li><a href="#bounded-vs-unbounded" id="markdown-toc-bounded-vs-unbounded">Bounded vs Unbounded</a></li> + <li><a href="#timestamps" id="markdown-toc-timestamps">Timestamps</a></li> + <li><a href="#watermarks" id="markdown-toc-watermarks">Watermarks</a></li> + <li><a href="#windowed-elements" id="markdown-toc-windowed-elements">Windowed elements</a></li> + <li><a href="#coder" id="markdown-toc-coder">Coder</a></li> + <li><a href="#windowing-strategy" id="markdown-toc-windowing-strategy">Windowing Strategy</a></li> + </ul> + </li> + <li><a href="#user-defined-functions-udfs" id="markdown-toc-user-defined-functions-udfs">User-Defined Functions (UDFs)</a></li> + <li><a href="#runner" id="markdown-toc-runner">Runner</a></li> + </ul> + </li> + <li><a href="#implementing-the-beam-primitives" id="markdown-toc-implementing-the-beam-primitives">Implementing the Beam Primitives</a> <ul> + <li><a href="#what-if-you-havent-implemented-some-of-these-features" id="markdown-toc-what-if-you-havent-implemented-some-of-these-features">What if you haven’t implemented some of these features?</a></li> + <li><a href="#implementing-the-pardo-primitive" id="markdown-toc-implementing-the-pardo-primitive">Implementing the ParDo primitive</a> <ul> + <li><a href="#bundles" id="markdown-toc-bundles">Bundles</a></li> + <li><a href="#the-dofn-lifecycle" id="markdown-toc-the-dofn-lifecycle">The DoFn Lifecycle</a></li> + <li><a href="#dofnrunners" 
id="markdown-toc-dofnrunners">DoFnRunner(s)</a></li> + <li><a href="#side-inputs" id="markdown-toc-side-inputs">Side Inputs</a></li> + <li><a href="#state-and-timers" id="markdown-toc-state-and-timers">State and Timers</a></li> + <li><a href="#splittable-dofn" id="markdown-toc-splittable-dofn">Splittable DoFn</a></li> + </ul> + </li> + <li><a href="#implementing-the-groupbykey-and-window-primitive" id="markdown-toc-implementing-the-groupbykey-and-window-primitive">Implementing the GroupByKey (and window) primitive</a> <ul> + <li><a href="#group-by-encoded-bytes" id="markdown-toc-group-by-encoded-bytes">Group By Encoded Bytes</a></li> + <li><a href="#window-merging" id="markdown-toc-window-merging">Window Merging</a></li> + <li><a href="#implementing-via-groupbykeyonly--groupalsobywindow" id="markdown-toc-implementing-via-groupbykeyonly--groupalsobywindow">Implementing via GroupByKeyOnly + GroupAlsoByWindow</a></li> + <li><a href="#dropping-late-data" id="markdown-toc-dropping-late-data">Dropping late data</a></li> + <li><a href="#triggering" id="markdown-toc-triggering">Triggering</a></li> + <li><a href="#timestampcombiner" id="markdown-toc-timestampcombiner">TimestampCombiner</a></li> + </ul> + </li> + <li><a href="#implementing-the-window-primitive" id="markdown-toc-implementing-the-window-primitive">Implementing the Window primitive</a></li> + <li><a href="#implementing-the-read-primitive" id="markdown-toc-implementing-the-read-primitive">Implementing the Read primitive</a> <ul> + <li><a href="#reading-from-an-unboundedsource" id="markdown-toc-reading-from-an-unboundedsource">Reading from an UnboundedSource</a></li> + <li><a href="#reading-from-a-boundedsource" id="markdown-toc-reading-from-a-boundedsource">Reading from a BoundedSource</a></li> + </ul> + </li> + <li><a href="#implementing-the-flatten-primitive" id="markdown-toc-implementing-the-flatten-primitive">Implementing the Flatten primitive</a></li> + <li><a href="#special-mention-the-combine-composite" 
id="markdown-toc-special-mention-the-combine-composite">Special mention: the Combine composite</a></li> + </ul> + </li> + <li><a href="#working-with-pipelines" id="markdown-toc-working-with-pipelines">Working with pipelines</a> <ul> + <li><a href="#traversing-a-pipeline" id="markdown-toc-traversing-a-pipeline">Traversing a pipeline</a></li> + <li><a href="#altering-a-pipeline" id="markdown-toc-altering-a-pipeline">Altering a pipeline</a></li> + </ul> + </li> + <li><a href="#testing-your-runner" id="markdown-toc-testing-your-runner">Testing your runner</a></li> + <li><a href="#integrating-your-runner-nicely-with-sdks" id="markdown-toc-integrating-your-runner-nicely-with-sdks">Integrating your runner nicely with SDKs</a> <ul> + <li><a href="#integrating-with-the-java-sdk" id="markdown-toc-integrating-with-the-java-sdk">Integrating with the Java SDK</a> <ul> + <li><a href="#allowing-users-to-pass-options-to-your-runner" id="markdown-toc-allowing-users-to-pass-options-to-your-runner">Allowing users to pass options to your runner</a></li> + <li><a href="#registering-your-runner-with-sdks-for-command-line-use" id="markdown-toc-registering-your-runner-with-sdks-for-command-line-use">Registering your runner with SDKs for command line use</a></li> + </ul> + </li> + <li><a href="#integrating-with-the-python-sdk" id="markdown-toc-integrating-with-the-python-sdk">Integrating with the Python SDK</a></li> + </ul> + </li> + <li><a href="#writing-an-sdk-independent-runner" id="markdown-toc-writing-an-sdk-independent-runner">Writing an SDK-independent runner</a> <ul> + <li><a href="#the-fn-api" id="markdown-toc-the-fn-api">The Fn API</a></li> + <li><a href="#the-runner-api" id="markdown-toc-the-runner-api">The Runner API</a></li> + </ul> + </li> + <li><a href="#the-runner-api-protos" id="markdown-toc-the-runner-api-protos">The Runner API protos</a> <ul> + <li><a href="#functionspec-proto" id="markdown-toc-functionspec-proto"><code class="highlighter-rouge">FunctionSpec</code> 
proto</a></li> + <li><a href="#sdkfunctionspec-proto" id="markdown-toc-sdkfunctionspec-proto"><code class="highlighter-rouge">SdkFunctionSpec</code> proto</a></li> + <li><a href="#primitive-transform-payload-protos" id="markdown-toc-primitive-transform-payload-protos">Primitive transform payload protos</a> <ul> + <li><a href="#pardopayload-proto" id="markdown-toc-pardopayload-proto"><code class="highlighter-rouge">ParDoPayload</code> proto</a></li> + <li><a href="#readpayload-proto" id="markdown-toc-readpayload-proto"><code class="highlighter-rouge">ReadPayload</code> proto</a></li> + <li><a href="#windowintopayload-proto" id="markdown-toc-windowintopayload-proto"><code class="highlighter-rouge">WindowIntoPayload</code> proto</a></li> + <li><a href="#combinepayload-proto" id="markdown-toc-combinepayload-proto"><code class="highlighter-rouge">CombinePayload</code> proto</a></li> + </ul> + </li> + <li><a href="#ptransform-proto" id="markdown-toc-ptransform-proto"><code class="highlighter-rouge">PTransform</code> proto</a></li> + <li><a href="#pcollection-proto" id="markdown-toc-pcollection-proto"><code class="highlighter-rouge">PCollection</code> proto</a></li> + <li><a href="#coder-proto" id="markdown-toc-coder-proto"><code class="highlighter-rouge">Coder</code> proto</a></li> + </ul> + </li> + <li><a href="#the-runner-api-rpcs" id="markdown-toc-the-runner-api-rpcs">The Runner API RPCs</a> <ul> + <li><a href="#pipelinerunnerrunpipeline-rpc" id="markdown-toc-pipelinerunnerrunpipeline-rpc"><code class="highlighter-rouge">PipelineRunner.run(Pipeline)</code> RPC</a></li> + <li><a href="#pipelineresult-aka-job-api" id="markdown-toc-pipelineresult-aka-job-api"><code class="highlighter-rouge">PipelineResult</code> aka “Job API”</a></li> + </ul> + </li> +</ul> + +<h2 id="basics-of-the-beam-model">Basics of the Beam model</h2> + +<p>Suppose you have a data processing engine that can pretty easily process graphs +of operations. 
You want to integrate it with the Beam ecosystem to get access +to other languages, great event time processing, and a library of connectors. +You need to know the core vocabulary:</p> + +<ul> + <li><a href="#pipeline"><em>Pipeline</em></a> - A pipeline is a graph of transformations that a user constructs +that defines the data processing they want to do.</li> + <li><a href="#pcollections"><em>PCollection</em></a> - Data being processed in a pipeline is part of a PCollection.</li> + <li><a href="#ptransforms"><em>PTransforms</em></a> - The operations executed within a pipeline. These are best +thought of as operations on PCollections.</li> + <li><em>SDK</em> - A language-specific library for pipeline authors (we often call them +“users” even though we have many kinds of users) to build transforms, +construct their pipelines and submit them to a runner.</li> + <li><em>Runner</em> - You are going to write a piece of software called a runner that +takes a Beam pipeline and executes it using the capabilities of your data +processing engine.</li> +</ul> + +<p>These concepts may be very similar to your processing engine’s concepts. Since +Beam’s design is for cross-language operation and reusable libraries of +transforms, there are some special features worth highlighting.</p> + +<h3 id="pipeline">Pipeline</h3> + +<p>A pipeline in Beam is a graph of PTransforms operating on PCollections. A +pipeline is constructed by a user in their SDK of choice, and makes its way to +your runner either via the SDK directly or via the Runner API’s (forthcoming) +RPC interfaces.</p> + +<h3 id="ptransforms">PTransforms</h3> + +<p>In Beam, a PTransform can be one of the five primitives or it can be a +composite transform encapsulating a subgraph. 
The primitives are:</p> + +<ul> + <li><a href="#implementing-the-read-primitive"><em>Read</em></a> - parallel connectors to external +systems</li> + <li><a href="#implementing-the-pardo-primitive"><em>ParDo</em></a> - per-element processing</li> + <li><a href="#implementing-the-groupbykey-and-window-primitive"><em>GroupByKey</em></a> - +aggregating elements per key and window</li> + <li><a href="#implementing-the-flatten-primitive"><em>Flatten</em></a> - union of PCollections</li> + <li><a href="#implementing-the-window-primitive"><em>Window</em></a> - set the windowing strategy +for a PCollection</li> +</ul> + +<p>When implementing a runner, these are the operations you need to implement. +Composite transforms may or may not be important to your runner. If you expose +a UI, maintaining some of the composite structure will make the pipeline easier +for a user to understand. But preserving that structure does not change the result of processing.</p> + +<h3 id="pcollections">PCollections</h3> + +<p>A PCollection is an unordered bag of elements. Your runner will be responsible +for storing these elements. There are some major aspects of a PCollection to +note:</p> + +<h4 id="bounded-vs-unbounded">Bounded vs Unbounded</h4> + +<p>A PCollection may be bounded or unbounded.</p> + +<ul> + <li><em>Bounded</em> - it is finite and you know it, as in batch use cases</li> + <li><em>Unbounded</em> - it may never end; you don’t know, as in streaming use cases</li> +</ul> + +<p>These derive from the intuitions of batch and stream processing, but the two +are unified in Beam, and bounded and unbounded PCollections can coexist in the +same pipeline. If your runner can only support bounded PCollections, you’ll +need to reject pipelines that contain unbounded PCollections. 
If your +runner is only really targeting streams, there are adapters in our support code +to convert everything to APIs targeting unbounded data.</p> + +<h4 id="timestamps">Timestamps</h4> + +<p>Every element in a PCollection has a timestamp associated with it.</p> + +<p>When you execute a primitive connector to some storage system, that connector +is responsible for providing initial timestamps. Your runner will need to +propagate and aggregate timestamps. If the timestamp is not important, as with +certain batch processing jobs where elements do not denote events, the timestamps will be +the minimum representable timestamp, often referred to colloquially as +“negative infinity”.</p> + +<h4 id="watermarks">Watermarks</h4> + +<p>Every PCollection has to have a watermark that estimates how complete the +PCollection is.</p> + +<p>The watermark is a guess that “we’ll never see an element with an earlier +timestamp”. Sources of data are responsible for producing a watermark. Your +runner needs to implement watermark propagation as PCollections are processed, +merged, and partitioned.</p> + +<p>The contents of a PCollection are complete when a watermark advances to +“infinity”. In this manner, you may discover that an unbounded PCollection is +finite.</p> + +<h4 id="windowed-elements">Windowed elements</h4> + +<p>Every element in a PCollection resides in a window. No element resides in +multiple windows (two elements can be equal except for their window, but they +are not the same).</p> + +<p>When elements are read from the outside world they arrive in the global window. +When they are written to the outside world, they are effectively placed back +into the global window (any writing transform that doesn’t take this +perspective probably risks data loss).</p> + +<p>A window has a maximum timestamp, and when the watermark exceeds this plus +user-specified allowed lateness, the window is expired. 
All data related +to an expired window may be discarded at any time.</p> + +<h4 id="coder">Coder</h4> + +<p>Every PCollection has a coder, a specification of the binary format of the elements.</p> + +<p>In Beam, the user’s pipeline may be written in a language other than the +language of the runner. There is no expectation that the runner can actually +deserialize user data. So the Beam model operates principally on encoded data - +“just bytes”. Each PCollection has a declared encoding for its elements, called +a coder. A coder has a URN that identifies the encoding, and may have +additional sub-coders (for example, a coder for lists may contain a coder for +the elements of the list). Language-specific serialization techniques can be, and +frequently are, used, but there are a few key formats - such as key-value pairs +and timestamps - that are standardized so that your runner can understand them.</p> + +<h4 id="windowing-strategy">Windowing Strategy</h4> + +<p>Every PCollection has a windowing strategy, a specification of essential +information for grouping and triggering operations.</p> + +<p>The details will be discussed below when we discuss the +<a href="#implementing-the-window-primitive">Window</a> primitive, which sets up the +windowing strategy, and the +<a href="#implementing-the-groupbykey-and-window-primitive">GroupByKey</a> primitive, +which has behavior governed by the windowing strategy.</p> + +<h3 id="user-defined-functions-udfs">User-Defined Functions (UDFs)</h3> + +<p>Beam has seven varieties of user-defined function (UDF). 
A Beam pipeline +may contain UDFs written in a language other than your runner’s, or even multiple +languages in the same pipeline (see the <a href="#the-runner-api">Runner API</a>), so the +definitions are language-independent (see the <a href="#the-fn-api">Fn API</a>).</p> + +<p>The UDFs of Beam are:</p> + +<ul> + <li><em>DoFn</em> - per-element processing function (used in ParDo)</li> + <li><em>WindowFn</em> - places elements in windows and merges windows (used in Window +and GroupByKey)</li> + <li><em>Source</em> - emits data read from external sources, including initial and +dynamic splitting for parallelism (used in Read)</li> + <li><em>ViewFn</em> - adapts a materialized PCollection to a particular interface (used +in side inputs)</li> + <li><em>WindowMappingFn</em> - maps one element’s window to another, and specifies +bounds on how far in the past the result window will be (used in side +inputs)</li> + <li><em>CombineFn</em> - associative and commutative aggregation (used in Combine and +state)</li> + <li><em>Coder</em> - encodes user data; some coders have standard formats and are not really UDFs</li> +</ul> + +<p>The various types of user-defined functions will be described further alongside +the primitives that use them.</p> + +<h3 id="runner">Runner</h3> + +<p>The term “runner” is used for a couple of things. It generally refers to the +software that takes a Beam pipeline and executes it somehow. Often, this is the +translation code that you write. It usually also includes some customized +operators for your data processing engine, and is sometimes used to refer to +the full stack.</p> + +<p>A runner has just a single method <code class="highlighter-rouge">run(Pipeline)</code>. 
From here on, I will often +use code font for proper nouns in our APIs, whether or not the identifiers +match across all SDKs.</p> + +<p>The <code class="highlighter-rouge">run(Pipeline)</code> method should be asynchronous and result in a +PipelineResult, which generally will be a job descriptor for your data +processing engine and provides methods for checking its status, canceling it, and +waiting for it to terminate.</p> + +<h2 id="implementing-the-beam-primitives">Implementing the Beam Primitives</h2> + +<p>Aside from encoding and persisting data - which presumably your engine already +does in some way or another - most of what you need to do is implement the Beam +primitives. This section provides a detailed look at each primitive, covering +what you need to know that might not be obvious and what support code is +provided.</p> + +<p>The primitives are designed for the benefit of pipeline authors, not runner +authors. Each represents a different conceptual mode of operation (external IO, +element-wise, grouping, windowing, union) rather than a specific implementation +decision. The same primitive may require very different implementations based +on how the user instantiates it. For example, a <code class="highlighter-rouge">ParDo</code> that uses state or +timers may require key partitioning, a <code class="highlighter-rouge">GroupByKey</code> with speculative triggering +may require a more costly or complex implementation, and <code class="highlighter-rouge">Read</code> is completely +different for bounded and unbounded data.</p> + +<h3 id="what-if-you-havent-implemented-some-of-these-features">What if you haven’t implemented some of these features?</h3> + +<p>That’s OK! You don’t have to do it all at once, and there may even be features +that don’t make sense for your runner to ever support. We maintain a +<a href="/documentation/runners/capability-matrix/">capability matrix</a> on the Beam site so you can tell +users what you support. 
When you receive a <code class="highlighter-rouge">Pipeline</code>, you should traverse it +and determine whether or not you can execute each <code class="highlighter-rouge">DoFn</code> that you find. If +you cannot execute some <code class="highlighter-rouge">DoFn</code> in the pipeline (or if there is any other +requirement that your runner lacks) you should reject the pipeline. In your +native environment, this may look like throwing an +<code class="highlighter-rouge">UnsupportedOperationException</code>. The Runner API RPCs will make this explicit, +for cross-language portability.</p> + +<h3 id="implementing-the-pardo-primitive">Implementing the ParDo primitive</h3> + +<p>The <code class="highlighter-rouge">ParDo</code> primitive describes element-wise transformation for a +<code class="highlighter-rouge">PCollection</code>. <code class="highlighter-rouge">ParDo</code> is the most complex primitive, because it is where any +per-element processing is described. In addition to very simple operations like +standard <code class="highlighter-rouge">map</code> or <code class="highlighter-rouge">flatMap</code> from functional programming, <code class="highlighter-rouge">ParDo</code> also supports +multiple outputs, side inputs, initialization, flushing, teardown, and stateful +processing.</p> + +<p>The UDF that is applied to each element is called a <code class="highlighter-rouge">DoFn</code>. The exact APIs for +a <code class="highlighter-rouge">DoFn</code> can vary per language/SDK but generally follow the same pattern, so we +can discuss it with pseudocode. 
I will also often refer to the Java support +code, since I know it and most of our current and future runners are +Java-based.</p> + +<h4 id="bundles">Bundles</h4> + +<p>For correctness, a <code class="highlighter-rouge">DoFn</code> <em>should</em> represent an element-wise function, but in +fact is a long-lived object that processes elements in small groups called +bundles.</p> + +<p>Your runner decides how many elements, and which elements, to include in a +bundle, and can even decide dynamically in the middle of processing that the +current bundle has “ended”. How a bundle is processed ties in with the rest of +a DoFn’s lifecycle.</p> + +<p>It will generally improve throughput to make the largest bundles possible, so +that initialization and finalization costs are amortized over many elements. +But if your data is arriving as a stream, then you will want to terminate a +bundle in order to achieve appropriate latency, so bundles may be just a few +elements.</p> + +<h4 id="the-dofn-lifecycle">The DoFn Lifecycle</h4> + +<p>While each language’s SDK is free to make different decisions, the Python and +Java SDKs share an API with the following stages of a DoFn’s lifecycle.</p> + +<p>However, if you choose to execute a DoFn directly to improve performance or +single-language simplicity, then your runner is responsible for implementing +the following sequence:</p> + +<ul> + <li><em>Setup</em> - called once per DoFn instance before anything else; this has not been +implemented in the Python SDK, so users can work around its absence with lazy +initialization</li> + <li><em>StartBundle</em> - called once per bundle as initialization (actually, lazy +initialization is almost always equivalent and more efficient, but this hook +remains for simplicity for users)</li> + <li><em>ProcessElement</em> / <em>OnTimer</em> - called for each element and timer activation</li> + <li><em>FinishBundle</em> - essentially “flush”; required to be called before +considering elements actually 
processed</li> + <li><em>Teardown</em> - release resources that were used across bundles; calling this +is best-effort, since failures may prevent it from being called</li> +</ul> + +<h4 id="dofnrunners">DoFnRunner(s)</h4> + +<p>This is a support class that has manifestations in both the Java codebase and +the Python codebase.</p> + +<p><strong>Java</strong></p> + +<p>In Java, the <code class="highlighter-rouge">beam-runners-core-java</code> library provides an interface +<code class="highlighter-rouge">DoFnRunner</code> for bundle processing, with implementations for many situations.</p> + +<div class="language-java no-toggle highlighter-rouge"><pre class="highlight"><code><span class="kd">interface</span> <span class="nc">DoFnRunner</span><span class="o">&lt;</span><span class="n">InputT</span><span class="o">,</span> <span class="n">OutputT</span><span class="o">&gt;</span> <span class="o">{</span> + <span class="kt">void</span> <span class="nf">startBundle</span><span class="o">();</span> + <span class="kt">void</span> <span class="nf">processElement</span><span class="o">(</span><span class="n">WindowedValue</span><span class="o">&lt;</span><span class="n">InputT</span><span class="o">&gt;</span> <span class="n">elem</span><span class="o">);</span> + <span class="kt">void</span> <span class="nf">onTimer</span><span class="o">(</span><span class="n">String</span> <span class="n">timerId</span><span class="o">,</span> <span class="n">BoundedWindow</span> <span class="n">window</span><span class="o">,</span> <span class="n">Instant</span> <span class="n">timestamp</span><span class="o">,</span> <span class="n">TimeDomain</span> <span class="n">timeDomain</span><span class="o">);</span> + <span class="kt">void</span> <span class="nf">finishBundle</span><span class="o">();</span> +<span class="o">}</span> +</code></pre> +</div> + +<p>There are some implementations and variations of this for different scenarios:</p> + +<ul> + <li><a 
href="https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java"><code class="highlighter-rouge">SimpleDoFnRunner</code></a> - +not actually simple at all; implements lots of the core functionality of +<code class="highlighter-rouge">ParDo</code>. This is how most runners execute most <code class="highlighter-rouge">DoFns</code>.</li> + <li><a href="https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java"><code class="highlighter-rouge">LateDataDroppingDoFnRunner</code></a> - +wraps a <code class="highlighter-rouge">DoFnRunner</code> and drops data from expired windows so the wrapped +<code class="highlighter-rouge">DoFnRunner</code> doesn’t get any unpleasant surprises</li> + <li><a href="https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java"><code class="highlighter-rouge">StatefulDoFnRunner</code></a> - +handles collecting expired state</li> + <li><a href="https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java"><code class="highlighter-rouge">PushbackSideInputDoFnRunner</code></a> - +buffers input while waiting for side inputs to be ready</li> +</ul> + +<p>These are all used heavily in implementations of Java runners. 
Invocations +via the <a href="#the-fn-api">Fn API</a> may manifest as another implementation of +<code class="highlighter-rouge">DoFnRunner</code> even though it will be doing far more than running a <code class="highlighter-rouge">DoFn</code>.</p> + +<p><strong>Python</strong></p> + +<p>See the <a href="https://beam.apache.org/documentation/sdks/pydoc/2.0.0/apache_beam.runners.html#apache_beam.runners.common.DoFnRunner">DoFnRunner pydoc</a>.</p> + +<h4 id="side-inputs">Side Inputs</h4> + +<p><em>Main design document: +<a href="https://s.apache.org/beam-side-inputs-1-pager">https://s.apache.org/beam-side-inputs-1-pager</a></em></p> + +<p>A side input is a global view of a window of a <code class="highlighter-rouge">PCollection</code>. This distinguishes +it from the main input, which is processed one element at a time. The SDK/user +prepares a <code class="highlighter-rouge">PCollection</code> adequately, the runner materializes it, and then the +runner feeds it to the <code class="highlighter-rouge">DoFn</code>. See the main design document above for details.</p> + +<p>What you will need to implement is inspecting the materialization requested for +the side input, preparing it appropriately, and handling the corresponding interactions +when a <code class="highlighter-rouge">DoFn</code> reads the side inputs.</p> + +<p>The details and available support code vary by language.</p> + +<p><strong>Java</strong></p> + +<p>If you are using one of the above <code class="highlighter-rouge">DoFnRunner</code> classes, then the interface for +letting them request side inputs is +<a href="https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputReader.java"><code class="highlighter-rouge">SideInputReader</code></a>. +It is a simple mapping from side input and window to a value. 
The <code class="highlighter-rouge">DoFnRunner</code> +will perform a mapping with the +<a href="https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java"><code class="highlighter-rouge">WindowMappingFn</code></a> +to request the appropriate window so you do not need to worry about invoking this UDF. +When using the Fn API, it will be the SDK harness that maps windows as well.</p> + +<p>A simple, but not necessarily optimal, approach to building a +<a href="https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputReader.java"><code class="highlighter-rouge">SideInputReader</code></a> +is to use a state backend. In our Java support code, this is called +<a href="https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java"><code class="highlighter-rouge">StateInternals</code></a> +and you can build a +<a href="https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java"><code class="highlighter-rouge">SideInputHandler</code></a> +that will use your <code class="highlighter-rouge">StateInternals</code> to materialize a <code class="highlighter-rouge">PCollection</code> into the +appropriate side input view and then yield the value when requested for a +particular side input and window.</p> + +<p>When a side input is needed but the side input has no data associated with it +for a given window, elements in that window must be deferred until the side +input has some data. 
The aforementioned +<a href="https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java"><code class="highlighter-rouge">PushbackSideInputDoFnRunner</code></a> +is used to implement this.</p> + +<p><strong>Python</strong></p> + +<p>In Python, <a href="https://beam.apache.org/documentation/sdks/pydoc/2.0.0/apache_beam.transforms.html#apache_beam.transforms.sideinputs.SideInputMap"><code class="highlighter-rouge">SideInputMap</code></a> maps +windows to side input values. The <code class="highlighter-rouge">WindowMappingFn</code> manifests as a simple +function. See +<a href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/sideinputs.py">sideinputs.py</a>.</p> + +<h4 id="state-and-timers">State and Timers</h4> + +<p><em>Main design document: <a href="https://s.apache.org/beam-state">https://s.apache.org/beam-state</a></em></p> + +<p>When <code class="highlighter-rouge">ParDo</code> includes state and timers, its execution on your runner is usually +very different. See the design document for full details beyond those covered here.</p> + +<p>State and timers are partitioned per key and window. You may need or want to +explicitly shuffle data to support this.</p> + +<p><strong>Java</strong></p> + +<p>We provide +<a href="https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java"><code class="highlighter-rouge">StatefulDoFnRunner</code></a> +to help with state cleanup. 
The non-user-facing interface +<a href="https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java"><code class="highlighter-rouge">StateInternals</code></a> +is what a runner generally implements, and then the Beam support code can use +this to implement user-facing state.</p> + +<h4 id="splittable-dofn">Splittable DoFn</h4> + +<p><em>Main design document: <a href="https://s.apache.org/splittable-do-fn">https://s.apache.org/splittable-do-fn</a></em></p> + +<p>Splittable <code class="highlighter-rouge">DoFn</code> is a generalization and combination of <code class="highlighter-rouge">ParDo</code> and <code class="highlighter-rouge">Read</code>. It +is per-element processing where each element has the capability of being “split” +in the same ways as a <code class="highlighter-rouge">BoundedSource</code> or <code class="highlighter-rouge">UnboundedSource</code>. This enables better +performance for use cases such as a <code class="highlighter-rouge">PCollection</code> of names of large files where +you want to read each of them. Previously they would have to be static data in +the pipeline or be read in a non-splittable manner.</p> + +<p>This feature is still under development, but likely to become the new primitive +for reading. 
It is best to be aware of it and follow developments.</p> + +<h3 id="implementing-the-groupbykey-and-window-primitive">Implementing the GroupByKey (and window) primitive</h3> + +<p>The <code class="highlighter-rouge">GroupByKey</code> operation (sometimes called GBK for short) groups a +<code class="highlighter-rouge">PCollection</code> of key-value pairs by key and window, emitting results according +to the <code class="highlighter-rouge">PCollection</code>’s triggering configuration.</p> + +<p>It is quite a bit more elaborate than simply colocating elements with the same +key, and uses many fields from the <code class="highlighter-rouge">PCollection</code>’s windowing strategy.</p> + +<h4 id="group-by-encoded-bytes">Group By Encoded Bytes</h4> + +<p>For both the key and window, your runner sees them as “just bytes”. So you need +to group in a way that is consistent with grouping by those bytes, even if you +have some special knowledge of the types involved.</p> + +<p>The elements you are processing will be key-value pairs, and you’ll need to extract +the keys. For this reason, the format of key-value pairs is standardized and +shared across all SDKs. See either +<a href="https://beam.apache.org/documentation/sdks/javadoc/2.0.0/org/apache/beam/sdk/coders/KvCoder.html"><code class="highlighter-rouge">KvCoder</code></a> +in Java or +<a href="https://beam.apache.org/documentation/sdks/pydoc/2.0.0/apache_beam.coders.html#apache_beam.coders.coders.TupleCoder.key_coder"><code class="highlighter-rouge">TupleCoder</code></a> +in Python for documentation on the binary format.</p> + +<h4 id="window-merging">Window Merging</h4> + +<p>As well as grouping by key, your runner must group elements by their window. A +<code class="highlighter-rouge">WindowFn</code> has the option of declaring that it merges windows on a per-key +basis. For example, session windows for the same key will be merged if they +overlap. 
So your runner must invoke the merge method of the <code class="highlighter-rouge">WindowFn</code> during +grouping.</p> + +<h4 id="implementing-via-groupbykeyonly--groupalsobywindow">Implementing via GroupByKeyOnly + GroupAlsoByWindow</h4> + +<p>The Java codebase includes support code for a particularly common way of +implementing the full <code class="highlighter-rouge">GroupByKey</code> operation: first group the keys, and then group +by window. For merging windows, this is essentially required, since merging is +per key.</p> + +<h4 id="dropping-late-data">Dropping late data</h4> + +<p><em>Main design document: +<a href="https://s.apache.org/beam-lateness">https://s.apache.org/beam-lateness</a></em></p> + +<p>A window is expired in a <code class="highlighter-rouge">PCollection</code> if the watermark of the input PCollection +has exceeded the end of the window by at least the input <code class="highlighter-rouge">PCollection</code>’s +allowed lateness.</p> + +<p>Data for an expired window can be dropped any time and should be dropped at a +<code class="highlighter-rouge">GroupByKey</code>. If you are using <code class="highlighter-rouge">GroupAlsoByWindow</code>, then drop it just before executing +this transform. 
You may shuffle less data if you drop data prior to +<code class="highlighter-rouge">GroupByKeyOnly</code>, but this can only be done safely for non-merging windows, as a +window that appears expired may merge to become not expired.</p> + +<h4 id="triggering">Triggering</h4> + +<p><em>Main design document: +<a href="https://s.apache.org/beam-triggers">https://s.apache.org/beam-triggers</a></em></p> + +<p>The input <code class="highlighter-rouge">PCollection</code>’s trigger and accumulation mode specify when and how +outputs should be emitted from the <code class="highlighter-rouge">GroupByKey</code> operation.</p> + +<p>In Java, there is a lot of support code for executing triggers in the +<code class="highlighter-rouge">GroupAlsoByWindow</code> implementations, <code class="highlighter-rouge">ReduceFnRunner</code> (legacy name), and +<code class="highlighter-rouge">TriggerStateMachine</code>, which is an obvious way of implementing all triggers as +an event-driven machine over elements and timers.</p> + +<h4 id="timestampcombiner">TimestampCombiner</h4> + +<p>When an aggregated output is produced from multiple inputs, the <code class="highlighter-rouge">GroupByKey</code> +operation has to choose a timestamp for the combination. To do so, first the +WindowFn has a chance to shift timestamps - this is needed to ensure watermarks +do not prevent progress of windows like sliding windows (the details are beyond +this doc). Then, the shifted timestamps need to be combined - this is specified +by a <code class="highlighter-rouge">TimestampCombiner</code>, which can either select the minimum or maximum of its +inputs, or just ignore inputs and choose the end of the window.</p> + +<h3 id="implementing-the-window-primitive">Implementing the Window primitive</h3> + +<p>The window primitive applies a <code class="highlighter-rouge">WindowFn</code> UDF to place each input element into +one or more windows of its output PCollection. 
Note that the primitive also +generally configures other aspects of the windowing strategy for a <code class="highlighter-rouge">PCollection</code>, +but the fully constructed graph that your runner receives will already have a +complete windowing strategy for each <code class="highlighter-rouge">PCollection</code>.</p> + +<p>To implement this primitive, you need to invoke the provided WindowFn on each +element, which will return some set of windows for that element to be a part of +in the output <code class="highlighter-rouge">PCollection</code>.</p> + +<p><strong>Implementation considerations</strong></p> + +<p>A “window” is just a second grouping key that has a “maximum timestamp”. It can +be any arbitrary user-defined type. The <code class="highlighter-rouge">WindowFn</code> provides the coder for the +window type.</p> + +<p>Beam’s support code provides <code class="highlighter-rouge">WindowedValue</code> which is a compressed +representation of an element in multiple windows. You may want to use this, +or your own compressed representation. Remember that it simply represents +multiple elements at the same time; there is no such thing as an element “in +multiple windows”.</p> + +<p>For values in the global window, you may want to use an even further compressed +representation that doesn’t bother including the window at all.</p> + +<p>In the future, this primitive may be retired as it can be implemented as a +ParDo if the capabilities of ParDo are enhanced to allow output to new windows.</p> + +<h3 id="implementing-the-read-primitive">Implementing the Read primitive</h3> + +<p>You implement this primitive to read data from an external system. The APIs are +carefully crafted to enable efficient parallel execution. 
Reading from an +<code class="highlighter-rouge">UnboundedSource</code> is a bit different from reading from a <code class="highlighter-rouge">BoundedSource</code>.</p> + +<h4 id="reading-from-an-unboundedsource">Reading from an UnboundedSource</h4> + +<p>An <code class="highlighter-rouge">UnboundedSource</code> is a source of potentially infinite data; you can think of +it like a stream. The capabilities are:</p> + +<ul> + <li><code class="highlighter-rouge">split(int)</code> - your runner should call this to get the desired parallelism</li> + <li><code class="highlighter-rouge">createReader(...)</code> - call this to start reading elements; it is an enhanced iterator that also vends: + <ul> + <li>watermark (for this source) which you should propagate downstream</li> + <li>timestamps, which you should associate with elements read</li> + <li>record identifiers, so you can dedupe downstream if needed</li> + <li>progress indication of its backlog</li> + <li>checkpointing</li> + </ul> + </li> + <li><code class="highlighter-rouge">requiresDeduping</code> - this indicates that there is some chance that the source +may emit dupes; your runner should do its best to dedupe based on the +identifier attached to emitted records</li> +</ul> + +<p>An unbounded source has a custom type of checkpoints and an associated coder for serializing them.</p> + +<h4 id="reading-from-a-boundedsource">Reading from a BoundedSource</h4> + +<p>A <code class="highlighter-rouge">BoundedSource</code> is a source of data that you know is finite, such as a static +collection of log files, or a database table. 
The capabilities are:</p> + +<ul> + <li><code class="highlighter-rouge">split(int)</code> - your runner should call this to get desired initial parallelism (but you can often steal work later)</li> + <li><code class="highlighter-rouge">getEstimatedSizeBytes(...)</code> - self-explanatory</li> + <li><code class="highlighter-rouge">createReader(...)</code> - call this to start reading elements; it is an enhanced iterator that also provides: + <ul> + <li>timestamps to associate with each element read</li> + <li><code class="highlighter-rouge">splitAtFraction</code> for dynamic splitting to enable work stealing, and other +methods to support it - see the <a href="https://beam.apache.org/blog/2016/05/18/splitAtFraction-method.html">Beam blog post on dynamic work +rebalancing</a></li> + </ul> + </li> +</ul> + +<p>The <code class="highlighter-rouge">BoundedSource</code> does not report a watermark currently. Most of the time, reading +from a bounded source can be parallelized in ways that result in utterly out-of-order +data, so a watermark is not terribly useful. +Thus the watermark for the output <code class="highlighter-rouge">PCollection</code> from a bounded read should +remain at the minimum timestamp throughout reading (otherwise data might get +dropped) and advance to the maximum timestamp when all data is exhausted.</p> + +<h3 id="implementing-the-flatten-primitive">Implementing the Flatten primitive</h3> + +<p>This one is easy: it takes as input a finite set of <code class="highlighter-rouge">PCollections</code> and outputs their +bag union, keeping windows intact.</p> + +<p>For this operation to make sense, it is the SDK’s responsibility to make sure +the windowing strategies are compatible.</p> + +<p>Also note that there is no requirement that the coders for all the <code class="highlighter-rouge">PCollections</code> +be the same. If your runner wants to require that (to avoid tedious +re-encoding) you have to enforce it yourself. 
Or you could just implement the +fast path as an optimization.</p> + +<h3 id="special-mention-the-combine-composite">Special mention: the Combine composite</h3> + +<p>A composite transform that is almost always treated specially by a runner is +<code class="highlighter-rouge">Combine</code> (per key), which applies an associative and commutative operator to +the elements of a <code class="highlighter-rouge">PCollection</code>. This composite is not a primitive. It is +implemented in terms of <code class="highlighter-rouge">ParDo</code> and <code class="highlighter-rouge">GroupByKey</code>, so your runner will work +without treating it specially - but it does carry additional information that you +probably want to use for optimizations: the associative-commutative operator, +known as a <code class="highlighter-rouge">CombineFn</code>.</p> + +<h2 id="working-with-pipelines">Working with pipelines</h2> + +<p>When you receive a pipeline from a user, you will need to translate it. This is +a tour of the APIs that you’ll use to do it.</p> + +<h3 id="traversing-a-pipeline">Traversing a pipeline</h3> + +<p>Something you will likely do is to traverse a pipeline, probably to translate +it into primitives for your engine. The general pattern is to write a visitor +that builds a job specification as it walks the graph of <code class="highlighter-rouge">PTransforms</code>.</p> + +<p>The entry point for this in Java is +<a href="https://beam.apache.org/documentation/sdks/javadoc/2.0.0/org/apache/beam/sdk/Pipeline.html#traverseTopologically-org.apache.beam.sdk.Pipeline.PipelineVisitor-"><code class="highlighter-rouge">Pipeline.traverseTopologically</code></a> +and +<a href="https://beam.apache.org/documentation/sdks/pydoc/2.0.0/apache_beam.html#apache_beam.pipeline.Pipeline.visit"><code class="highlighter-rouge">Pipeline.visit</code></a> +in Python. 
See the generated documentation for details.</p> + +<h3 id="altering-a-pipeline">Altering a pipeline</h3> + +<p>Often, the best way to keep your +translator simple will be to alter the pipeline prior to translation. Some +alterations you might perform:</p> + +<ul> + <li>Elaboration of a Beam primitive into a composite transform that uses +multiple runner-specific primitives</li> + <li>Optimization of a Beam composite into a specialized primitive for your +runner</li> + <li>Replacement of a Beam composite with a different expansion more suitable for +your runner</li> +</ul> + +<p>The Java SDK and the “runners core construction” library (the artifact is +<code class="highlighter-rouge">beam-runners-core-construction-java</code> and the namespace is +<code class="highlighter-rouge">org.apache.beam.runners.core.construction</code>) contain helper code for this sort +of work. In Python, support code is still under development.</p> + +<p>All pipeline alteration is done via the +<a href="https://beam.apache.org/documentation/sdks/javadoc/2.0.0/org/apache/beam/sdk/Pipeline.html#replaceAll-java.util.List-"><code class="highlighter-rouge">Pipeline.replaceAll</code></a> +method, which takes a list of overrides. A +<a href="https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverride.java"><code class="highlighter-rouge">PTransformOverride</code></a> +is a pair of a +<a href="https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformMatcher.java"><code class="highlighter-rouge">PTransformMatcher</code></a> +to select transforms for replacement and a +<a href="https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java"><code class="highlighter-rouge">PTransformOverrideFactory</code></a> +to produce the replacement. 
All <code class="highlighter-rouge">PTransformMatchers</code> that have been needed by +runners to date are provided. Examples include: matching a specific class, +matching a <code class="highlighter-rouge">ParDo</code> where the <code class="highlighter-rouge">DoFn</code> uses state or timers, etc.</p> + +<h2 id="testing-your-runner">Testing your runner</h2> + +<p>The Beam Java SDK and Python SDK have suites of runner validation tests. The +configuration may evolve faster than this document, so check the configuration +of other Beam runners. But be aware that we have tests and you can use them +very easily! To enable these tests in a Java-based runner using Maven, you +scan the dependencies of the SDK for tests with the JUnit category +<code class="highlighter-rouge">ValidatesRunner</code>.</p> + +<div class="language-xml no-toggle highlighter-rouge"><pre class="highlight"><code><span class="nt"><plugin></span> + <span class="nt"><groupId></span>org.apache.maven.plugins<span class="nt"></groupId></span> + <span class="nt"><artifactId></span>maven-surefire-plugin<span class="nt"></artifactId></span> + <span class="nt"><executions></span> + <span class="nt"><execution></span> + <span class="nt"><id></span>validates-runner-tests<span class="nt"></id></span> + <span class="nt"><phase></span>integration-test<span class="nt"></phase></span> + <span class="nt"><goals><goal></span>test<span class="nt"></goal></goals></span> + <span class="nt"><configuration></span> + <span class="nt"><groups></span>org.apache.beam.sdk.testing.ValidatesRunner<span class="nt"></groups></span> + <span class="nt"><dependenciesToScan></span> + <span class="nt"><dependency></span>org.apache.beam:beam-sdks-java-core<span class="nt"></dependency></span> + <span class="nt"></dependenciesToScan></span> + <span class="nt"><systemPropertyVariables></span> + <span class="nt"><beamTestPipelineOptions></span> + [ + "--runner=MyRunner", + … misc test options … + ] + <span 
class="nt"></beamTestPipelineOptions></span> + <span class="nt"></systemPropertyVariables></span> + <span class="nt"></configuration></span> + <span class="nt"></execution></span> + <span class="nt"></executions></span> +<span class="nt"></plugin></span> +</code></pre> +</div> + +<p>Enabling these tests in other languages is unexplored.</p> + +<h2 id="integrating-your-runner-nicely-with-sdks">Integrating your runner nicely with SDKs</h2> + +<p>Whether or not your runner is based in the same language as an SDK (such as +Java), you will want to provide a shim to invoke it from another SDK if you +want the users of that SDK (such as Python) to use it.</p> + +<h3 id="integrating-with-the-java-sdk">Integrating with the Java SDK</h3> + +<h4 id="allowing-users-to-pass-options-to-your-runner">Allowing users to pass options to your runner</h4> + +<p>The mechanism for configuration is +<a href="https://beam.apache.org/documentation/sdks/javadoc/2.0.0/org/apache/beam/sdk/options/PipelineOptions.html"><code class="highlighter-rouge">PipelineOptions</code></a>, +an interface that works completely differently from normal Java objects. 
Forget +what you know, and follow the rules, and <code class="highlighter-rouge">PipelineOptions</code> will treat you well.</p> + +<p>You must implement a sub-interface for your runner with getters and setters +with matching names, like so:</p> + +<div class="language-java no-toggle highlighter-rouge"><pre class="highlight"><code><span class="kd">public</span> <span class="kd">interface</span> <span class="nc">MyRunnerOptions</span> <span class="kd">extends</span> <span class="n">PipelineOptions</span> <span class="o">{</span> + <span class="nd">@Description</span><span class="o">(</span><span class="s">"The Foo to use with MyRunner"</span><span class="o">)</span> + <span class="nd">@Required</span> + <span class="kd">public</span> <span class="n">Foo</span> <span class="nf">getMyRequiredFoo</span><span class="o">();</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">setMyRequiredFoo</span><span class="o">(</span><span class="n">Foo</span> <span class="n">newValue</span><span class="o">);</span> + + <span class="nd">@Description</span><span class="o">(</span><span class="s">"Enable Baz; on by default"</span><span class="o">)</span> + <span class="nd">@Default</span><span class="o">.</span><span class="na">Boolean</span><span class="o">(</span><span class="kc">true</span><span class="o">)</span> + <span class="kd">public</span> <span class="kt">boolean</span> <span class="nf">isBazEnabled</span><span class="o">();</span> + <span class="kd">public</span> <span class="kt">void</span> <span class="nf">setBazEnabled</span><span class="o">(</span><span class="kt">boolean</span> <span class="n">newValue</span><span class="o">);</span> +<span class="o">}</span> +</code></pre> +</div> + +<p>You can set up defaults, etc. See the javadoc for details. 
When your runner is +instantiated with a <code class="highlighter-rouge">PipelineOptions</code> object, you access your interface by +<code class="highlighter-rouge">options.as(MyRunnerOptions.class)</code>.</p> + +<p>To make these options available on the command line, you register your options +with a <code class="highlighter-rouge">PipelineOptionsRegistrar</code>. It is easy if you use <code class="highlighter-rouge">@AutoService</code>:</p> + +<div class="language-java no-toggle highlighter-rouge"><pre class="highlight"><code><span class="nd">@AutoService</span><span class="o">(</span><span class="n">PipelineOptionsRegistrar</span><span class="o">.</span><span class="na">class</span><span class="o">)</span> +<span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">MyOptionsRegistrar</span> <span class="kd">implements</span> <span class="n">PipelineOptionsRegistrar</span> <span class="o">{</span> + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="n">Iterable</span><span class="o"><</span><span class="n">Class</span><span class="o"><?</span> <span class="kd">extends</span> <span class="n">PipelineOptions</span><span class="o">>></span> <span class="nf">getPipelineOptions</span><span class="o">()</span> <span class="o">{</span> + <span class="k">return</span> <span class="n">ImmutableList</span><span class="o">.<</span><span class="n">Class</span><span class="o"><?</span> <span class="kd">extends</span> <span class="n">PipelineOptions</span><span class="o">>></span><span class="n">of</span><span class="o">(</span><span class="n">MyRunnerOptions</span><span class="o">.</span><span class="na">class</span><span class="o">);</span> + <span class="o">}</span> +<span class="o">}</span> +</code></pre> +</div> + +<h4 id="registering-your-runner-with-sdks-for-command-line-use">Registering your runner with SDKs for command line use</h4> + +<p>To make your runner available on the command line, 
you register your runner +with a <code class="highlighter-rouge">PipelineRunnerRegistrar</code>. It is easy if you use <code class="highlighter-rouge">@AutoService</code>:</p> + +<div class="language-java no-toggle highlighter-rouge"><pre class="highlight"><code><span class="nd">@AutoService</span><span class="o">(</span><span class="n">PipelineRunnerRegistrar</span><span class="o">.</span><span class="na">class</span><span class="o">)</span> +<span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">MyRunnerRegistrar</span> <span class="kd">implements</span> <span class="n">PipelineRunnerRegistrar</span> <span class="o">{</span> + <span class="nd">@Override</span> + <span class="kd">public</span> <span class="n">Iterable</span><span class="o"><</span><span class="n">Class</span><span class="o"><?</span> <span class="kd">extends</span> <span class="n">PipelineRunner</span><span class="o">>></span> <span class="nf">getPipelineRunners</span><span class="o">()</span> <span class="o">{</span> + <span class="k">return</span> <span class="n">ImmutableList</span><span class="o">.<</span><span class="n">Class</span><span class="o"><?</span> <span class="kd">extends</span> <span class="n">PipelineRunner</span><span class="o">>></span><span class="n">of</span><span class="o">(</span><span class="n">MyRunner</span><span class="o">.</span><span class="na">class</span><span class="o">);</span> + <span class="o">}</span> +<span class="o">}</span> +</code></pre> +</div> + +<h3 id="integrating-with-the-python-sdk">Integrating with the Python SDK</h3> + +<p>In the Python SDK the registration of the code is not automatic. So there are 
So there are +few things to keep in mind when creating a new runner.</p> + +<p>Any dependencies on packages for the new runner should be options so create a +new target in <code class="highlighter-rouge">extra_requires</code> in <code class="highlighter-rouge">setup.py</code> that is needed for the new runner.</p> + +<p>All runner code should go in itâs own package in <code class="highlighter-rouge">apache_beam/runners</code> directory.</p> + +<p>Register the new runner in the <code class="highlighter-rouge">create_runner</code> function of <code class="highlighter-rouge">runner.py</code> so that the +partial name is matched with the correct class to be used.</p> + +<h2 id="writing-an-sdk-independent-runner">Writing an SDK-independent runner</h2> + +<p>There are two aspects to making your runner SDK-independent, able to run +pipelines written in other languages: The Fn API and the Runner API.</p> + +<h3 id="the-fn-api">The Fn API</h3> + +<p><em>Design documents:</em></p> + +<ul> + <li><em><a href="https://s.apache.org/beam-fn-api">https://s.apache.org/beam-fn-api</a></em></li> + <li><em><a href="https://s.apache.org/beam-fn-api-processing-a-bundle">https://s.apache.org/beam-fn-api-processing-a-bundle</a></em></li> + <li><em><a href="https://s.apache.org/beam-fn-api-send-and-receive-data">https://s.apache.org/beam-fn-api-send-and-receive-data</a></em></li> +</ul> + +<p>To run a userâs pipeline, you need to be able to invoke their UDFs. 
The Fn API +is an RPC interface for the standard UDFs of Beam, implemented using protocol +buffers over gRPC.</p> + +<p>The Fn API includes:</p> + +<ul> + <li>APIs for registering a subgraph of UDFs</li> + <li>APIs for streaming elements of a bundle</li> + <li>Shared data formats (key-value pairs, timestamps, iterables, etc.)</li> +</ul> + +<p>You are fully welcome to <em>also</em> use the SDK for your language for utility code, +or provide optimized implementations of bundle processing for same-language +UDFs.</p> + +<h3 id="the-runner-api">The Runner API</h3> + +<p>The Runner API is an SDK-independent schema for a pipeline along with RPC +interfaces for launching a pipeline and checking the status of a job. The RPC +interfaces are still in development so for now we focus on the SDK-agnostic +representation of a pipeline. By examining a pipeline only through Runner API +interfaces, you remove your runner’s dependence on the SDK for its language for +pipeline analysis and job translation.</p> + +<p>To execute such an SDK-independent pipeline, you will need to support the Fn +API. UDFs are embedded in the pipeline as a specification of the function +(often just opaque serialized bytes for a particular language) plus a +specification of an environment that can execute it (essentially a particular +SDK). So far, this specification is expected to be a URI for a Docker container +hosting the SDK’s Fn API harness.</p> + +<p>You are fully welcome to <em>also</em> use the SDK for your language, which may offer +useful utility code.</p> + +<p>The language-independent definition of a pipeline is described via a protocol +buffers schema, covered below for reference. But your runner <em>should not</em> +directly manipulate protobuf messages. 
Instead, the Beam codebase provides +utilities for working with pipelines so that you don't need to be aware of +whether or not the pipeline has ever been serialized or transmitted, or what +language it may have been written in to begin with.</p> + +<p><strong>Java</strong></p> + +<p>If your runner is Java-based, the tools to interact with pipelines in an +SDK-agnostic manner are in the <code class="highlighter-rouge">beam-runners-core-construction-java</code> +artifact, in the <code class="highlighter-rouge">org.apache.beam.runners.core.construction</code> namespace. +The utilities are named consistently, like so:</p> + +<ul> + <li><code class="highlighter-rouge">PTransformTranslation</code> - registry of known transforms and standard URNs</li> + <li><code class="highlighter-rouge">ParDoTranslation</code> - utilities for working with <code class="highlighter-rouge">ParDo</code> in a +language-independent manner</li> + <li><code class="highlighter-rouge">WindowIntoTranslation</code> - same for <code class="highlighter-rouge">Window</code></li> + <li><code class="highlighter-rouge">FlattenTranslation</code> - same for <code class="highlighter-rouge">Flatten</code></li> + <li><code class="highlighter-rouge">WindowingStrategyTranslation</code> - same for windowing strategies</li> + <li><code class="highlighter-rouge">CoderTranslation</code> - same for coders</li> + <li>… etc, etc …</li> +</ul> + +<p>By inspecting transforms only through these classes, your runner will not +depend on the particulars of the Java SDK.</p> + +<h2 id="the-runner-api-protos">The Runner API protos</h2> + +<p><a href="https://github.com/apache/beam/blob/master/sdks/common/runner-api/src/main/proto/beam_runner_api.proto">The Runner +API</a> +refers to a specific manifestation of the concepts in the Beam model, as a +protocol buffers schema. 
Even though you should not manipulate these messages +directly, it can be helpful to know the canonical data that makes up a +pipeline.</p> + +<p>Most of the API is exactly the same as the high-level description; you can get +started implementing a runner without understanding all the low-level details.</p> + +<p>The most important takeaway of the Runner API for you is that it is a +language-independent definition of a Beam pipeline. You will probably always +interact via a particular SDK's support code wrapping these definitions with +sensible idiomatic APIs, but always be aware that this is the specification and +any other data is not necessarily inherent to the pipeline, but may be +SDK-specific enrichments (or bugs!).</p> + +<p>The UDFs in the pipeline may be written for any Beam SDK, or even for multiple SDKs in +the same pipeline. So this is where we will start, taking a bottom-up approach +to understanding the protocol buffers definitions for UDFs before going back to +the higher-level, mostly obvious, record definitions.</p> + +<h3 id="functionspec-proto"><code class="highlighter-rouge">FunctionSpec</code> proto</h3> + +<p>The heart of cross-language portability is the <code class="highlighter-rouge">FunctionSpec</code>. This is a +language-independent specification of a function, in the usual programming +sense that includes side effects, etc.</p> + +<div class="language-proto no-toggle highlighter-rouge"><pre class="highlight"><code><span class="kd">message</span> <span class="nc">FunctionSpec</span> <span class="p">{</span> + <span class="kt">string</span> <span class="n">urn</span><span class="p">;</span> + <span class="n">google.protobuf.Any</span> <span class="n">parameter</span><span class="p">;</span> +<span class="p">}</span> +</code></pre> +</div> + +<p>A <code class="highlighter-rouge">FunctionSpec</code> includes a URN identifying the function as well as an arbitrary +fixed parameter. 
For example, the (hypothetical) “max” CombineFn might have the +URN <code class="highlighter-rouge">urn:beam:combinefn:max:0.1</code> and a parameter that indicates by what +comparison to take the max.</p> + +<p>For most UDFs in a pipeline constructed using a particular language's SDK, the +URN will indicate that the SDK must interpret it, for example +<code class="highlighter-rouge">urn:beam:dofn:javasdk:0.1</code> or <code class="highlighter-rouge">urn:beam:dofn:pythonsdk:0.1</code>. The parameter +will contain serialized code, such as a Java-serialized <code class="highlighter-rouge">DoFn</code> or a Python +pickled <code class="highlighter-rouge">DoFn</code>.</p> + +<p>A <code class="highlighter-rouge">FunctionSpec</code> is not only for UDFs. It is just a generic way to name/specify +any function. It is also used as the specification for a <code class="highlighter-rouge">PTransform</code>. But when +used in a <code class="highlighter-rouge">PTransform</code>, it describes a function from <code class="highlighter-rouge">PCollection</code> to <code class="highlighter-rouge">PCollection</code> +and cannot be specific to an SDK because the runner is in charge of evaluating +transforms and producing <code class="highlighter-rouge">PCollections</code>.</p> + +<h3 id="sdkfunctionspec-proto"><code class="highlighter-rouge">SdkFunctionSpec</code> proto</h3> + +<p>When a <code class="highlighter-rouge">FunctionSpec</code> represents a UDF, in general only the SDK that serialized +it will be guaranteed to understand it. So in that case, it will always come +with an environment that can understand and execute the function. 
This is +represented by the <code class="highlighter-rouge">SdkFunctionSpec</code>.</p> + +<div class="language-proto no-toggle highlighter-rouge"><pre class="highlight"><code><span class="kd">message</span> <span class="nc">SdkFunctionSpec</span> <span class="p">{</span> + <span class="n">FunctionSpec</span> <span class="n">spec</span><span class="p">;</span> + <span class="kt">bytes</span> <span class="n">environment_id</span><span class="p">;</span> +<span class="p">}</span> +</code></pre> +</div> + +<p>In the Runner API, many objects are stored by reference. Here, the +<code class="highlighter-rouge">environment_id</code> is a pointer, local to the pipeline and just made up by the +SDK that serialized it, that can be dereferenced to yield the actual +environment proto.</p> + +<p>Thus far, an environment is expected to be a Docker container specification for +an SDK harness that can execute the specified UDF.</p> + +<h3 id="primitive-transform-payload-protos">Primitive transform payload protos</h3> + +<p>The payloads for the primitive transforms are just proto serializations of their +specifications. 
Rather than reproduce their full code here, I will just +highlight the important pieces to show how they fit together.</p> + +<p>It is worth emphasizing again that while you probably will not interact +directly with these payloads, they are the only data that is inherently part of +the transform.</p> + +<h4 id="pardopayload-proto"><code class="highlighter-rouge">ParDoPayload</code> proto</h4> + +<p>A <code class="highlighter-rouge">ParDo</code> transform carries its <code class="highlighter-rouge">DoFn</code> in an <code class="highlighter-rouge">SdkFunctionSpec</code> and then +provides language-independent specifications for its other features - side +inputs, state declarations, timer declarations, etc.</p> + +<div class="language-proto no-toggle highlighter-rouge"><pre class="highlight"><code><span class="kd">message</span> <span class="nc">ParDoPayload</span> <span class="p">{</span> + <span class="n">SdkFunctionSpec</span> <span class="n">do_fn</span><span class="p">;</span> + <span class="n">map</span><span class="o">&lt;</span><span class="kt">string</span><span class="p">,</span> <span class="n">SideInput</span><span class="err">&gt;</span> <span class="n">side_inputs</span><span class="p">;</span> + <span class="n">map</span><span class="o">&lt;</span><span class="kt">string</span><span class="p">,</span> <span class="n">StateSpec</span><span class="err">&gt;</span> <span class="n">state_specs</span><span class="p">;</span> + <span class="n">map</span><span class="o">&lt;</span><span class="kt">string</span><span class="p">,</span> <span class="n">TimerSpec</span><span class="err">&gt;</span> <span class="n">timer_specs</span><span class="p">;</span> + <span class="o">...</span> +<span class="p">}</span> +</code></pre> +</div> + +<h4 id="readpayload-proto"><code class="highlighter-rouge">ReadPayload</code> proto</h4> + +<p>A <code class="highlighter-rouge">Read</code> transform carries an <code class="highlighter-rouge">SdkFunctionSpec</code> for its <code 
class="highlighter-rouge">Source</code> UDF.</p> + +<div class="language-proto no-toggle highlighter-rouge"><pre class="highlight"><code><span class="kd">message</span> <span class="nc">ReadPayload</span> <span class="p">{</span> + <span class="n">SdkFunctionSpec</span> <span class="n">source</span><span class="p">;</span> + <span class="o">...</span> +<span class="p">}</span> +</code></pre> +</div> + +<h4 id="windowintopayload-proto"><code class="highlighter-rouge">WindowIntoPayload</code> proto</h4> + +<p>A <code class="highlighter-rouge">Window</code> transform carries an <code class="highlighter-rouge">SdkFunctionSpec</code> for its <code class="highlighter-rouge">WindowFn</code> UDF. It is +part of the Fn API that the runner passes this UDF along and tells the SDK +harness to use it to assign windows (as opposed to merging).</p> + +<div class="language-proto no-toggle highlighter-rouge"><pre class="highlight"><code><span class="kd">message</span> <span class="nc">WindowIntoPayload</span> <span class="p">{</span> + <span class="n">SdkFunctionSpec</span> <span class="n">window_fn</span><span class="p">;</span> + <span class="o">...</span> +<span class="p">}</span> +</code></pre> +</div> + +<h4 id="combinepayload-proto"><code class="highlighter-rouge">CombinePayload</code> proto</h4> + +<p><code class="highlighter-rouge">Combine</code> is not a primitive. But non-primitives are perfectly able to carry +additional information for better optimization. The most important thing that a +<code class="highlighter-rouge">Combine</code> transform carries is the <code class="highlighter-rouge">CombineFn</code> in an <code class="highlighter-rouge">SdkFunctionSpec</code> record. 
+In order to effectively carry out the optimizations desired, it is also +necessary to know the coder for intermediate accumulations, so it also carries +a reference to this coder.</p> + +<div class="language-proto no-toggle highlighter-rouge"><pre class="highlight"><code><span class="kd">message</span> <span class="nc">CombinePayload</span> <span class="p">{</span> + <span class="n">SdkFunctionSpec</span> <span class="n">combine_fn</span><span class="p">;</span> + <span class="kt">string</span> <span class="n">accumulator_coder_id</span><span class="p">;</span> + <span class="o">...</span> +<span class="p">}</span> +</code></pre> +</div> + +<h3 id="ptransform-proto"><code class="highlighter-rouge">PTransform</code> proto</h3> + +<p>A <code class="highlighter-rouge">PTransform</code> is a function from <code class="highlighter-rouge">PCollection</code> to <code class="highlighter-rouge">PCollection</code>. This is +represented in the proto using a FunctionSpec. Note that this is not an +<code class="highlighter-rouge">SdkFunctionSpec</code>, since it is the runner that observes these. 
They will never +be passed back to an SDK harness; they do not represent a UDF.</p> + +<div class="language-proto no-toggle highlighter-rouge"><pre class="highlight"><code><span class="kd">message</span> <span class="nc">PTransform</span> <span class="p">{</span> + <span class="n">FunctionSpec</span> <span class="n">spec</span><span class="p">;</span> + <span class="k">repeated</span> <span class="kt">string</span> <span class="n">subtransforms</span><span class="p">;</span> + + <span class="c1">// Maps from local string names to PCollection ids +</span> <span class="n">map</span><span class="o">&lt;</span><span class="kt">string</span><span class="p">,</span> <span class="kt">bytes</span><span class="err">&gt;</span> <span class="n">inputs</span><span class="p">;</span> + <span class="n">map</span><span class="o">&lt;</span><span class="kt">string</span><span class="p">,</span> <span class="kt">bytes</span><span class="err">&gt;</span> <span class="n">outputs</span><span class="p">;</span> + <span class="o">...</span> +<span class="p">}</span> +</code></pre> +</div> + +<p>A <code class="highlighter-rouge">PTransform</code> may have subtransforms if it is a composite, in which case the +<code class="highlighter-rouge">FunctionSpec</code> may be omitted since the subtransforms define its behavior.</p> + +<p>The input and output <code class="highlighter-rouge">PCollections</code> are unordered and referred to by a local +name. 
The SDK decides what this name is, since it will likely be embedded in +serialized UDFs.</p> + +<h3 id="pcollection-proto"><code class="highlighter-rouge">PCollection</code> proto</h3> + +<p>A <code class="highlighter-rouge">PCollection</code> just stores a coder, windowing strategy, and whether or not it +is bounded.</p> + +<div class="language-proto no-toggle highlighter-rouge"><pre class="highlight"><code><span class="kd">message</span> <span class="nc">PCollection</span> <span class="p">{</span> + <span class="kt">string</span> <span class="n">coder_id</span><span class="p">;</span> + <span class="n">IsBounded</span> <span class="n">is_bounded</span><span class="p">;</span> + <span class="kt">string</span> <span class="n">windowing_strategy_id</span><span class="p">;</span> + <span class="o">...</span> +<span class="p">}</span> +</code></pre> +</div> + +<h3 id="coder-proto"><code class="highlighter-rouge">Coder</code> proto</h3> + +<p>This is a very interesting proto. A coder is a parameterized function that may +only be understood by a particular SDK, hence an <code class="highlighter-rouge">SdkFunctionSpec</code>, but also +may have component coders that fully define it. 
For example, a <code class="highlighter-rouge">ListCoder</code> is +only a meta-format, while <code class="highlighter-rouge">ListCoder(VarIntCoder)</code> is a fully specified format.</p> + +<div class="language-proto no-toggle highlighter-rouge"><pre class="highlight"><code><span class="kd">message</span> <span class="nc">Coder</span> <span class="p">{</span> + <span class="n">SdkFunctionSpec</span> <span class="n">spec</span><span class="p">;</span> + <span class="k">repeated</span> <span class="kt">string</span> <span class="n">component_coder_ids</span><span class="p">;</span> +<span class="p">}</span> +</code></pre> +</div> + +<h2 id="the-runner-api-rpcs">The Runner API RPCs</h2> + +<p>While your language's SDK will probably insulate you from touching the Runner +API protos directly, you may need to implement adapters for your runner, to +expose it to another language. So this section covers protos that you will +possibly interact with quite directly.</p> + +<p>The specific manner in which the existing runner method calls will be expressed +as RPCs is not implemented as proto yet. This RPC layer is intended to enable, for +example, building a pipeline using the Python SDK and launching it on a runner +that is written in Java. 
It is expected that a small Python shim will +communicate with a Java process or service hosting the Runner API.</p> + +<p>The RPCs themselves will necessarily follow the existing APIs of PipelineRunner +and PipelineResult, but altered to be the minimal backend channel, versus a +rich and convenient API.</p> + +<h3 id="pipelinerunnerrunpipeline-rpc"><code class="highlighter-rouge">PipelineRunner.run(Pipeline)</code> RPC</h3> + +<p>This will take the same form, but <code class="highlighter-rouge">PipelineOptions</code> will have to be serialized +to JSON (or a proto <code class="highlighter-rouge">Struct</code>) and passed along.</p> + +<div class="language-proto no-toggle highlighter-rouge"><pre class="highlight"><code><span class="kd">message</span> <span class="nc">RunPipelineRequest</span> <span class="p">{</span> + <span class="n">Pipeline</span> <span class="n">pipeline</span><span class="p">;</span> + <span class="n">Struct</span> <span class="n">pipeline_options</span><span class="p">;</span> +<span class="p">}</span> +</code></pre> +</div> + +<div class="language-proto no-toggle highlighter-rouge"><pre class="highlight"><code><span class="kd">message</span> <span class="nc">RunPipelineResponse</span> <span class="p">{</span> + <span class="kt">bytes</span> <span class="n">pipeline_id</span><span class="p">;</span> + + <span class="c1">// TODO: protocol for rejecting pipelines that cannot be executed +</span> <span class="c1">// by this runner. May just be REJECTED job state with error message. +</span> + <span class="c1">// totally opaque to the SDK; for the shim to interpret +</span> <span class="n">Any</span> <span class="n">contents</span><span class="p">;</span> +<span class="p">}</span> +</code></pre> +</div> + +<h3 id="pipelineresult-aka-job-api"><code class="highlighter-rouge">PipelineResult</code> aka “Job API”</h3> + +<p>The two core pieces of functionality in this API today are getting the state of +a job and canceling the job. 
It is very likely to evolve, for example to +be generalized to support draining a job (stop reading input and let watermarks +go to infinity). Today, our test framework's verification benefits from (but does not +wholly depend upon) querying metrics over this channel.</p> + +<div class="language-proto no-toggle highlighter-rouge"><pre class="highlight"><code><span class="kd">message</span> <span class="nc">CancelPipelineRequest</span> <span class="p">{</span> + <span class="kt">bytes</span> <span class="n">pipeline_id</span><span class="p">;</span> + <span class="o">...</span> +<span class="p">}</span> + +<span class="kd">message</span> <span class="nc">GetStateRequest</span> <span class="p">{</span> + <span class="kt">bytes</span> <span class="n">pipeline_id</span><span class="p">;</span> + <span class="o">...</span> +<span class="p">}</span> + +<span class="kd">message</span> <span class="nc">GetStateResponse</span> <span class="p">{</span> + <span class="n">JobState</span> <span class="n">state</span><span class="p">;</span> + <span class="o">...</span> +<span class="p">}</span> + +<span class="kd">enum</span> <span class="n">JobState</span> <span class="p">{</span> + <span class="o">...</span> +<span class="p">}</span> +</code></pre> +</div> + + + </div> + <footer class="footer"> + <div class="footer__contained"> + <div class="footer__cols"> + <div class="footer__cols__col"> + <div class="footer__cols__col__logo"> + <img src="/images/beam_logo_circle.svg" class="footer__logo" alt="Beam logo"> + </div> + <div class="footer__cols__col__logo"> + <img src="/images/apache_logo_circle.svg" class="footer__logo" alt="Apache logo"> + </div> + </div> + <div class="footer__cols__col footer__cols__col--md"> + <div class="footer__cols__col__title">Start</div> + <div class="footer__cols__col__link"><a href="/get-started/beam-overview/">Overview</a></div> + <div class="footer__cols__col__link"><a href="/get-started/quickstart-java/">Quickstart (Java)</a></div> + <div 
class="footer__cols__col__link"><a href="/get-started/quickstart-py/">Quickstart (Python)</a></div> + <div class="footer__cols__col__link"><a href="/get-started/downloads/">Downloads</a></div> + </div> + <div class="footer__cols__col footer__cols__col--md"> + <div class="footer__cols__col__title">Docs</div> + <div class="footer__cols__col__link"><a href="/documentation/programming-guide/">Concepts</a></div> + <div class="footer__cols__col__link"><a href="/documentation/pipelines/design-your-pipeline/">Pipelines</a></div> + <div class="footer__cols__col__link"><a href="/documentation/runners/capability-matrix/">Runners</a></div> + </div> + <div class="footer__cols__col footer__cols__col--md"> + <div class="footer__cols__col__title">Community</div> + <div class="footer__cols__col__link"><a href="/contribute/">Contribute</a></div> + <div class="footer__cols__col__link"><a href="/contribute/team/">Team</a></div> + <div class="footer__cols__col__link"><a href="/contribute/presentation-materials/">Media</a></div> +
<TRUNCATED>