This is an automated email from the ASF dual-hosted git repository.

bkietz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 8650c23  ARROW-13227: [Documentation][Compute] Document ExecNode
8650c23 is described below

commit 8650c23116730f31b844591d9f8eba0530b6f172
Author: Benjamin Kietzman <[email protected]>
AuthorDate: Fri Oct 15 14:43:16 2021 -0400

    ARROW-13227: [Documentation][Compute] Document ExecNode
    
    Closes #11309 from bkietz/13227-Document-ExecNode-ExecPla
    
    Authored-by: Benjamin Kietzman <[email protected]>
    Signed-off-by: Benjamin Kietzman <[email protected]>
---
 cpp/src/arrow/compute/exec/doc/exec_node.md | 147 -------------
 docs/source/cpp/compute.rst                 |   2 +
 docs/source/cpp/getting_started.rst         |   1 +
 docs/source/cpp/simple_graph.svg            | 139 +++++++++++++
 docs/source/cpp/streaming_execution.rst     | 308 ++++++++++++++++++++++++++++
 5 files changed, 450 insertions(+), 147 deletions(-)

diff --git a/cpp/src/arrow/compute/exec/doc/exec_node.md b/cpp/src/arrow/compute/exec/doc/exec_node.md
deleted file mode 100644
index 797cc87..0000000
--- a/cpp/src/arrow/compute/exec/doc/exec_node.md
+++ /dev/null
@@ -1,147 +0,0 @@
-<!---
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing,
-  software distributed under the License is distributed on an
-  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  KIND, either express or implied.  See the License for the
-  specific language governing permissions and limitations
-  under the License.
--->
-
-# ExecNodes and logical operators
-
-`ExecNode`s are intended to implement individual logical operators
-in a streaming execution graph. Each node receives batches from
-upstream nodes (inputs), processes them in some way, then pushes
-results to downstream nodes (outputs). `ExecNode`s are owned and
-(to an extent) coordinated by an `ExecPlan`.
-
-> Terminology: "operator" and "node" are mostly interchangeable, like
-> "Interface" and "Abstract Base Class" in C++: the latter is a formal
-> and specific bit of code which implements the abstract concept.
-
-## Types of logical operators
-
-Each of these will have at least one corresponding concrete
-`ExecNode`. Where possible, compatible implementations of a
-logical operator will *not* be exposed as independent subclasses
-of `ExecNode`. Instead we prefer that they be
-encapsulated internally by a single subclass of `ExecNode`
-to permit switching between them during a query.
-
-- Scan: materializes in-memory batches from storage (e.g. Parquet
-  files, flight stream, ...)
-- Filter: evaluates an `Expression` on each input batch and outputs
-  a copy with any rows excluded for which the filter did not return
-  `true`.
-- Project: evaluates `Expression`s on each input batch to produce
-  the columns of an output batch.
-- Grouped Aggregate: identify groups based on one or more key columns
-  in each input batch, then update aggregates corresponding to those
-  groups. Note that this is a pipeline breaker; it will wait for its
-  inputs to complete before outputting any batches.
-- Union: merge two or more streams of batches into a single stream
-  of batches.
-- Write: write each batch to storage.
-- ToTable: Collect batches into a `Table` with stable row ordering where
-  possible.
-
-### Not in scope for Arrow 5.0:
-
-- Join: perform an inner, left, outer, semi, or anti join given some
-  join predicates.
-- Sort: accumulate all input batches into a single table, reorder its
-  rows by some sorting condition, then stream the sorted table out as
-  batches.
-- Top-K: retrieve a limited subset of rows from a table as though it
-  were in sorted order.
-
-For example, a dataset scan with only a filter and a
-projection will correspond to a fairly trivial graph:
-
-```
-ScanNode -> FilterNode -> ProjectNode -> ToTableNode
-```
-
-A scan node loads batches from disk and pushes to a filter node.
-The filter node excludes some rows based on an `Expression` then
-pushes filtered batches to a project node. The project node
-materializes new columns based on `Expression`s then pushes those
-batches to a table collection node. The table collection node
-assembles these batches into a `Table` which is handed off as the
-result of the `ExecPlan`.
-
-## Parallelism, pipelines
-
-The execution graph is orthogonal to parallelism; any
-node may push to any other node from any thread. A scan node delivers
-each batch on some thread, after which the batch passes through
-each node in the example graph above without ever leaving that thread
-(memory or other resource pressure permitting).
-
-The example graph above happens to be simple enough that processing
-of any batch by any node is independent of other nodes and other
-batches; it is a pipeline. Note that there is no explicit `Pipeline`
-class; pipelined execution is an emergent property of some subgraphs.
-
-Nodes which do not share this property (pipeline breakers) are
-responsible for deciding when they have received sufficient input,
-when they can start emitting output, etc. For example a `GroupByNode`
-will wait for its input to be exhausted before it begins pushing
-batches to its own outputs.
-
-Parallelism is "seeded" by `ScanNode` (or other source nodes): it
-owns a reference to the thread pool on which the graph is executing
-and fans out pushes to its outputs across that pool. A subsequent
-`ProjectNode` will process each batch immediately after it is handed
-off by the `ScanNode`; no explicit scheduling is required.
-Eventually, individual nodes may internally
-parallelize processing of individual batches (for example, if a
-`FilterNode`'s filter expression is slow). This decision is also left
-up to each `ExecNode` implementation.
-
-# ExecNode interface and usage
-
-`ExecNode`s are constructed using one of the available factory
-functions, such as `arrow::compute::MakeFilterNode`
-or `arrow::dataset::MakeScanNode`. Any inputs to an `ExecNode`
-must be provided when the node is constructed, so the first
-nodes to be constructed are source nodes with no inputs
-such as `ScanNode`.
-
-The batches yielded by an `ExecNode` always conform precisely
-to its output schema. NB: no by-name field lookups or type
-checks are performed during execution. The output schema
-is usually derived from the output schemas of inputs. For
-example a `FilterNode`'s output schema is always identical to
-that of its input since batches are only modified by exclusion
-of some rows.
-
-An `ExecNode` will begin producing batches when
-`node->StartProducing()` is invoked and will proceed until stopped
-with `node->StopProducing()`. Started nodes may not be destroyed
-until stopped. `ExecNode`s are not currently restartable.
-An `ExecNode` pushes batches to its outputs by passing each batch
-to `output->InputReceived()`. It signals exhaustion by invoking
-`output->InputFinished()`.
-
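-For illustration, a sketch of this push protocol from inside a hypothetical
-source node (`batches` and `total_batches` are illustrative names, and the
-argument shapes shown here are a sketch rather than exact signatures):
-
-```
-// push each batch to every downstream node
-for (const ExecBatch& batch : batches) {
-  for (ExecNode* output : outputs_) {
-    output->InputReceived(this, batch);
-  }
-}
-// then signal exhaustion, reporting how many batches were produced
-for (ExecNode* output : outputs_) {
-  output->InputFinished(this, total_batches);
-}
-```
-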
-Error recovery is permitted within a node. For example, if evaluation
-of an `Expression` runs out of memory the governing node may
-try that evaluation again after some memory has been freed up.
-If a node experiences an error from which it cannot recover (for
-example an IO error while parsing a CSV file) then it reports this
-with `output->ErrorReceived()`. An error which escapes the scope of
-a single node should not be considered recoverable (no `FilterNode`
-should `try/catch` the IO error above).
-
diff --git a/docs/source/cpp/compute.rst b/docs/source/cpp/compute.rst
index 4131097..dd56960 100644
--- a/docs/source/cpp/compute.rst
+++ b/docs/source/cpp/compute.rst
@@ -50,6 +50,8 @@ both array (chunked or not) and scalar inputs, however some will mandate
 either.  For example, ``sort_indices`` requires its first and only
 input to be an array.
 
+.. _invoking-compute-functions:
+
 Invoking functions
 ------------------
 
diff --git a/docs/source/cpp/getting_started.rst b/docs/source/cpp/getting_started.rst
index 3c7b7f9..36ea480 100644
--- a/docs/source/cpp/getting_started.rst
+++ b/docs/source/cpp/getting_started.rst
@@ -31,6 +31,7 @@ User Guide
    datatypes
    tables
    compute
+   streaming_execution
    io
    ipc
    parquet
diff --git a/docs/source/cpp/simple_graph.svg b/docs/source/cpp/simple_graph.svg
new file mode 100644
index 0000000..d875072
--- /dev/null
+++ b/docs/source/cpp/simple_graph.svg
@@ -0,0 +1,139 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<svg width="320pt" height="404pt"
+ viewBox="0.00 0.00 388.02 404.00" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink">
+<g id="graph0" class="graph" transform="scale(1 1) rotate(0) translate(4 400)">
+<title>G</title>
+<polygon fill="#ffffff" stroke="transparent" points="-4,4 -4,-400 384.0173,-400 384.0173,4 -4,4"/>
+<!-- scan lineitem -->
+<g id="node1" class="node">
+<title>scan lineitem</title>
+<ellipse fill="none" stroke="#000000" cx="62.2569" cy="-378" rx="62.0148" ry="18"/>
+<text text-anchor="middle" x="62.2569" y="-373.8" font-family="Times,serif" font-size="14.00" fill="#000000">scan lineitem</text>
+</g>
+<!-- filter -->
+<g id="node2" class="node">
+<title>filter</title>
+<ellipse fill="none" stroke="#000000" cx="86.2569" cy="-306" rx="29.6089" ry="18"/>
+<text text-anchor="middle" x="86.2569" y="-301.8" font-family="Times,serif" font-size="14.00" fill="#000000">filter</text>
+</g>
+<!-- scan lineitem&#45;&gt;filter -->
+<g id="edge1" class="edge">
+<title>scan lineitem&#45;&gt;filter</title>
+<path fill="none" stroke="#000000" d="M68.3132,-359.8314C70.9767,-351.8406 74.163,-342.2819 77.1065,-333.4514"/>
+<polygon fill="#000000" stroke="#000000" points="80.4439,-334.5071 80.2858,-323.9134 73.8031,-332.2934 80.4439,-334.5071"/>
+</g>
+<!-- join -->
+<g id="node3" class="node">
+<title>join</title>
+<ellipse fill="none" stroke="#000000" cx="184.2569" cy="-234" rx="27" ry="18"/>
+<text text-anchor="middle" x="184.2569" y="-229.8" font-family="Times,serif" font-size="14.00" fill="#000000">join</text>
+</g>
+<!-- filter&#45;&gt;join -->
+<g id="edge2" class="edge">
+<title>filter&#45;&gt;join</title>
+<path fill="none" stroke="#000000" d="M105.6186,-291.7751C120.5341,-280.8168 141.3184,-265.5467 157.7735,-253.4572"/>
+<polygon fill="#000000" stroke="#000000" points="159.9433,-256.2062 165.9299,-247.4648 155.7988,-250.565 159.9433,-256.2062"/>
+</g>
+<!-- join again -->
+<g id="node4" class="node">
+<title>join again</title>
+<ellipse fill="none" stroke="#000000" cx="231.2569" cy="-162" rx="49.2784" ry="18"/>
+<text text-anchor="middle" x="231.2569" y="-157.8" font-family="Times,serif" font-size="14.00" fill="#000000">join again</text>
+</g>
+<!-- join&#45;&gt;join again -->
+<g id="edge3" class="edge">
+<title>join&#45;&gt;join again</title>
+<path fill="none" stroke="#000000" d="M195.1578,-217.3008C200.8051,-208.6496 207.8305,-197.8873 214.1788,-188.1623"/>
+<polygon fill="#000000" stroke="#000000" points="217.224,-189.9002 219.7594,-179.6132 211.3623,-186.0738 217.224,-189.9002"/>
+</g>
+<!-- filter again -->
+<g id="node9" class="node">
+<title>filter again</title>
+<ellipse fill="none" stroke="#000000" cx="231.2569" cy="-90" rx="53.2645" ry="18"/>
+<text text-anchor="middle" x="231.2569" y="-85.8" font-family="Times,serif" font-size="14.00" fill="#000000">filter again</text>
+</g>
+<!-- join again&#45;&gt;filter again -->
+<g id="edge8" class="edge">
+<title>join again&#45;&gt;filter again</title>
+<path fill="none" stroke="#000000" d="M231.2569,-143.8314C231.2569,-136.131 231.2569,-126.9743 231.2569,-118.4166"/>
+<polygon fill="#000000" stroke="#000000" points="234.757,-118.4132 231.2569,-108.4133 227.757,-118.4133 234.757,-118.4132"/>
+</g>
+<!-- scan orders -->
+<g id="node5" class="node">
+<title>scan orders</title>
+<ellipse fill="none" stroke="#000000" cx="197.2569" cy="-378" rx="54.9752" ry="18"/>
+<text text-anchor="middle" x="197.2569" y="-373.8" font-family="Times,serif" font-size="14.00" fill="#000000">scan orders</text>
+</g>
+<!-- project -->
+<g id="node6" class="node">
+<title>project</title>
+<ellipse fill="none" stroke="#000000" cx="184.2569" cy="-306" rx="37.6986" ry="18"/>
+<text text-anchor="middle" x="184.2569" y="-301.8" font-family="Times,serif" font-size="14.00" fill="#000000">project</text>
+</g>
+<!-- scan orders&#45;&gt;project -->
+<g id="edge4" class="edge">
+<title>scan orders&#45;&gt;project</title>
+<path fill="none" stroke="#000000" d="M193.9765,-359.8314C192.5861,-352.131 190.9329,-342.9743 189.3877,-334.4166"/>
+<polygon fill="#000000" stroke="#000000" points="192.8028,-333.6322 187.5816,-324.4133 185.9142,-334.8761 192.8028,-333.6322"/>
+</g>
+<!-- project&#45;&gt;join -->
+<g id="edge5" class="edge">
+<title>project&#45;&gt;join</title>
+<path fill="none" stroke="#000000" d="M184.2569,-287.8314C184.2569,-280.131 184.2569,-270.9743 184.2569,-262.4166"/>
+<polygon fill="#000000" stroke="#000000" points="187.757,-262.4132 184.2569,-252.4133 180.757,-262.4133 187.757,-262.4132"/>
+</g>
+<!-- scan customers -->
+<g id="node7" class="node">
+<title>scan customers</title>
+<ellipse fill="none" stroke="#000000" cx="310.2569" cy="-306" rx="69.5216" ry="18"/>
+<text text-anchor="middle" x="310.2569" y="-301.8" font-family="Times,serif" font-size="14.00" fill="#000000">scan customers</text>
+</g>
+<!-- aggregate -->
+<g id="node8" class="node">
+<title>aggregate</title>
+<ellipse fill="none" stroke="#000000" cx="294.2569" cy="-234" rx="48.6346" ry="18"/>
+<text text-anchor="middle" x="294.2569" y="-229.8" font-family="Times,serif" font-size="14.00" fill="#000000">aggregate</text>
+</g>
+<!-- scan customers&#45;&gt;aggregate -->
+<g id="edge6" class="edge">
+<title>scan customers&#45;&gt;aggregate</title>
+<path fill="none" stroke="#000000" d="M306.2195,-287.8314C304.5083,-280.131 302.4735,-270.9743 300.5717,-262.4166"/>
+<polygon fill="#000000" stroke="#000000" points="303.9348,-261.4159 298.3488,-252.4133 297.1015,-262.9344 303.9348,-261.4159"/>
+</g>
+<!-- aggregate&#45;&gt;join again -->
+<g id="edge7" class="edge">
+<title>aggregate&#45;&gt;join again</title>
+<path fill="none" stroke="#000000" d="M279.0064,-216.5708C271.1906,-207.6385 261.5369,-196.6056 252.9595,-186.8029"/>
+<polygon fill="#000000" stroke="#000000" points="255.5861,-184.4897 246.367,-179.2687 250.3181,-189.0993 255.5861,-184.4897"/>
+</g>
+<!-- write to disk -->
+<g id="node10" class="node">
+<title>write to disk</title>
+<ellipse fill="none" stroke="#000000" cx="231.2569" cy="-18" rx="59.1276" ry="18"/>
+<text text-anchor="middle" x="231.2569" y="-13.8" font-family="Times,serif" font-size="14.00" fill="#000000">write to disk</text>
+</g>
+<!-- filter again&#45;&gt;write to disk -->
+<g id="edge9" class="edge">
+<title>filter again&#45;&gt;write to disk</title>
+<path fill="none" stroke="#000000" d="M231.2569,-71.8314C231.2569,-64.131 231.2569,-54.9743 231.2569,-46.4166"/>
+<polygon fill="#000000" stroke="#000000" points="234.757,-46.4132 231.2569,-36.4133 227.757,-46.4133 234.757,-46.4132"/>
+</g>
+</g>
+</svg>
diff --git a/docs/source/cpp/streaming_execution.rst b/docs/source/cpp/streaming_execution.rst
new file mode 100644
index 0000000..5e5b29a
--- /dev/null
+++ b/docs/source/cpp/streaming_execution.rst
@@ -0,0 +1,308 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements.  See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership.  The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied.  See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+.. default-domain:: cpp
+.. highlight:: cpp
+.. cpp:namespace:: arrow::compute
+
+==========================
+Streaming execution engine
+==========================
+
+.. warning::
+
+    The streaming execution engine is experimental, and a stable API
+    is not yet guaranteed.
+
+Motivation
+----------
+
+For many complex computations, successive direct :ref:`invocation of
+compute functions <invoking-compute-functions>` is not feasible
+in either memory or computation time. Doing so causes all intermediate
+data to be fully materialized. To facilitate arbitrarily large inputs
+and more efficient resource usage, Arrow also provides a streaming query
+engine with which computations can be formulated and executed.
+
+.. figure:: simple_graph.svg
+
+   An example graph of a streaming execution workflow.
+
+:class:`ExecNode` is provided to reify the graph of operations in a query.
+Batches of data (:struct:`ExecBatch`) flow along edges of the graph from
+node to node. Structuring the API around streams of batches allows the
+working set for each node to be tuned for optimal performance independent
+of any other nodes in the graph. Each :class:`ExecNode` processes batches
+as they are pushed to it along an edge of the graph by upstream nodes
+(its inputs), and pushes batches along an edge of the graph to downstream
+nodes (its outputs) as they are finalized.
+
+.. seealso::
+
+  `SHAIKHHA, A., DASHTI, M., & KOCH, C.
+  (2018). Push versus pull-based loop fusion in query engines.
+  Journal of Functional Programming, 28.
+  <https://doi.org/10.1017/s0956796818000102>`_
+
+Overview
+--------
+
+:class:`ExecNode`
+  Each node in the graph is an implementation of the :class:`ExecNode` interface.
+
+:class:`ExecPlan`
+  A set of :class:`ExecNode` instances is contained and (to an extent)
+  coordinated by an :class:`ExecPlan`.
+
+:class:`ExecFactoryRegistry`
+  Instances of :class:`ExecNode` are constructed by factory functions held
+  in an :class:`ExecFactoryRegistry`.
+
+:class:`ExecNodeOptions`
+  Heterogeneous parameters for factories of :class:`ExecNode` are bundled in an
+  :class:`ExecNodeOptions`.
+
+:struct:`Declaration`
+  ``dplyr``-inspired helper for efficient construction of an :class:`ExecPlan`.
+
+:struct:`ExecBatch`
+  A lightweight container for a single chunk of data in the Arrow format. In
+  contrast to :class:`RecordBatch`, :struct:`ExecBatch` is intended for use
+  exclusively in a streaming execution context (for example, it doesn't have a
+  corresponding Python binding). Furthermore, columns which happen to have a
+  constant value may be represented by a :class:`Scalar` instead of an
+  :class:`Array`. In addition, :struct:`ExecBatch` may carry
+  execution-relevant properties including a guaranteed-true-filter
+  for :class:`Expression` simplification.
+
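+A minimal sketch of constructing an :struct:`ExecBatch` directly, assuming
+``ExecBatch::Make`` from ``arrow/compute/exec.h`` and an existing ``Datum``
+named ``some_array`` which holds an :class:`Array`::
+
+    // A Scalar column models a value which is constant across all rows of
+    // the batch; the batch's length is inferred from the Array column.
+    ARROW_ASSIGN_OR_RAISE(
+        ExecBatch batch,
+        ExecBatch::Make({some_array, Datum(std::make_shared<Int32Scalar>(42))}));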
+
+An example :class:`ExecNode` implementation which simply passes all input
+batches through unchanged::
+
+    class PassthruNode : public ExecNode {
+     public:
+      // InputReceived is the main entry point for ExecNodes. It is invoked
+      // by an input of this node to push a batch here for processing.
+      void InputReceived(ExecNode* input, ExecBatch batch) override {
+        // Since this is a passthru node we simply push the batch to our
+        // only output here.
+        outputs_[0]->InputReceived(this, batch);
+      }
+
+      // ErrorReceived is called by an input of this node to report an error.
+      // ExecNodes should always forward errors to their outputs unless they
+      // are able to fully handle the error (this is rare).
+      void ErrorReceived(ExecNode* input, Status error) override {
+        outputs_[0]->ErrorReceived(this, error);
+      }
+
+      // InputFinished is used to signal how many batches will ultimately arrive.
+      // It may be called with any ordering relative to InputReceived/ErrorReceived.
+      void InputFinished(ExecNode* input, int total_batches) override {
+        outputs_[0]->InputFinished(this, total_batches);
+      }
+
+      // ExecNodes may request that their inputs throttle production of batches
+      // until they are ready for more, or stop production if no further batches
+      // are required.  These signals should typically be forwarded to the inputs
+      // of the ExecNode.
+      void ResumeProducing(ExecNode* output) override { inputs_[0]->ResumeProducing(this); }
+      void PauseProducing(ExecNode* output) override { inputs_[0]->PauseProducing(this); }
+      void StopProducing(ExecNode* output) override { inputs_[0]->StopProducing(this); }
+
+      // An ExecNode has a single output schema to which all its batches conform.
+      using ExecNode::output_schema;
+
+      // ExecNodes carry basic introspection for debugging purposes
+      const char* kind_name() const override { return "PassthruNode"; }
+      using ExecNode::label;
+      using ExecNode::SetLabel;
+      using ExecNode::ToString;
+
+      // An ExecNode holds references to its inputs and outputs, so it is possible
+      // to walk the graph of execution if necessary.
+      using ExecNode::inputs;
+      using ExecNode::outputs;
+
+      // StartProducing() and StopProducing() are invoked by an ExecPlan to
+      // coordinate the graph-wide execution state.  These do not need to be
+      // forwarded to inputs or outputs.
+      Status StartProducing() override { return Status::OK(); }
+      void StopProducing() override {}
+      Future<> finished() override { return inputs_[0]->finished(); }
+    };
+
+Note that each method which is associated with an edge of the graph must be
+invoked with an ``ExecNode*`` to identify the node which invoked it. For
+example, in an :class:`ExecNode` which implements ``JOIN`` this tagging might
+be used to differentiate between batches from the left or right inputs, as in
+the sketch below.
+``InputReceived``, ``ErrorReceived``, and ``InputFinished`` may only be invoked
+by the inputs of a node, while ``ResumeProducing``, ``PauseProducing``, and
+``StopProducing`` may only be invoked by the outputs of a node.
+
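+For instance, a hypothetical two-input node might dispatch on that tag as
+follows (a sketch only; ``BufferLeft``/``BufferRight`` are illustrative
+helpers, not Arrow APIs)::
+
+    class HypotheticalJoinNode : public ExecNode {
+     public:
+      void InputReceived(ExecNode* input, ExecBatch batch) override {
+        // inputs_ preserves construction order: inputs_[0] is this node's
+        // "left" input and inputs_[1] its "right" input.
+        if (input == inputs_[0]) {
+          BufferLeft(std::move(batch));
+        } else {
+          BufferRight(std::move(batch));
+        }
+      }
+      // ... remaining ExecNode methods as in PassthruNode above ...
+    };
+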
+:class:`ExecPlan` contains the associated instances of :class:`ExecNode`
+and is used to start and stop execution of all nodes and for querying/awaiting
+their completion::
+
+    // construct an ExecPlan first to hold your nodes
+    ARROW_ASSIGN_OR_RAISE(auto plan, ExecPlan::Make(default_exec_context()));
+
+    // ... add nodes to your ExecPlan
+
+    // start all nodes in the graph
+    ARROW_RETURN_NOT_OK(plan->StartProducing());
+
+    SetUserCancellationCallback([plan] {
+      // stop all nodes in the graph
+      plan->StopProducing();
+    });
+
+    // `complete` will be marked finished when all nodes have run to completion
+    // or acknowledged a StopProducing() signal. The ExecPlan should be kept
+    // alive until this future is marked finished.
+    Future<> complete = plan->finished();
+
+
+Constructing ``ExecPlan``s
+--------------------------
+
+.. warning::
+
+    The following will be superseded by construction from Compute IR; see ARROW-14074.
+
+None of the concrete implementations of :class:`ExecNode` are exposed
+in headers, so they can't be constructed directly outside the
+translation unit where they are defined. Instead, factories to
+create them are provided in an extensible registry. This structure
+provides a number of benefits:
+
+- It enforces consistent construction.
+- It decouples implementations from consumers of the interface
+  (for example: we have two classes for scalar and grouped aggregate;
+  we can choose which to construct within the single factory by
+  checking whether grouping keys are provided).
+- It expedites integration with out-of-library extensions. For example,
+  "scan" nodes are implemented in the separate ``libarrow_dataset.so`` library.
+- Since the class is not referenceable outside the translation unit in which it
+  is defined, compilers can optimize more aggressively.
+
+Factories of :class:`ExecNode` can be retrieved by name from the registry.
+The default registry is available through
+:func:`arrow::compute::default_exec_factory_registry()`
+and can be queried for the built-in factories::
+
+    // get the factory for "filter" nodes:
+    ARROW_ASSIGN_OR_RAISE(auto make_filter,
+                          default_exec_factory_registry()->GetFactory("filter"));
+
+    // factories take three arguments:
+    ARROW_ASSIGN_OR_RAISE(ExecNode* filter_node, make_filter(
+        // the ExecPlan which should own this node
+        plan.get(),
+
+        // nodes which will send batches to this node (inputs)
+        {scan_node},
+
+        // parameters unique to "filter" nodes
+        FilterNodeOptions{filter_expression}));
+
+    // alternative shorthand:
+    ARROW_ASSIGN_OR_RAISE(filter_node, MakeExecNode("filter",
+        plan.get(), {scan_node}, FilterNodeOptions{filter_expression}));
+
+Factories can also be added to the default registry as long as they are
+convertible to ``std::function<Result<ExecNode*>(
+ExecPlan*, std::vector<ExecNode*>, const ExecNodeOptions&)>``.
+
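+For example, a sketch of registering the ``PassthruNode`` from above, assuming
+it declares a constructor taking ``(ExecPlan*, std::vector<ExecNode*>)``::
+
+    ARROW_RETURN_NOT_OK(default_exec_factory_registry()->AddFactory(
+        "passthru",
+        [](ExecPlan* plan, std::vector<ExecNode*> inputs,
+           const ExecNodeOptions& options) -> Result<ExecNode*> {
+          // options is unused here; a real factory would validate it.
+          // EmplaceNode constructs the node inside the plan (which owns it)
+          // and returns a non-owning pointer.
+          return plan->EmplaceNode<PassthruNode>(plan, std::move(inputs));
+        }));
+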
+To build an :class:`ExecPlan` representing a simple pipeline which
+reads from a :class:`RecordBatchReader` then filters, projects, and
+writes to disk::
+
+    std::shared_ptr<RecordBatchReader> reader = GetStreamOfBatches();
+    ExecNode* source_node = *MakeExecNode("source", plan.get(), {},
+                                          SourceNodeOptions::FromReader(
+                                              reader,
+                                              GetCpuThreadPool()));
+
+    ExecNode* filter_node = *MakeExecNode("filter", plan.get(), {source_node},
+                                          FilterNodeOptions{
+                                            greater(field_ref("score"), literal(3))
+                                          });
+
+    ExecNode* project_node = *MakeExecNode("project", plan.get(), {filter_node},
+                                           ProjectNodeOptions{
+                                             {add(field_ref("score"), literal(1))},
+                                             {"score + 1"}
+                                           });
+
+    arrow::dataset::internal::Initialize();
+    MakeExecNode("write", plan.get(), {project_node},
+                 WriteNodeOptions{/*base_dir=*/"/dat", /*...*/});
+
+:struct:`Declaration` is a `dplyr <https://dplyr.tidyverse.org>`_-inspired
+helper which further decreases the boilerplate associated with populating
+an :class:`ExecPlan` from C++::
+
+    arrow::dataset::internal::Initialize();
+
+    std::shared_ptr<RecordBatchReader> reader = GetStreamOfBatches();
+    ASSERT_OK(Declaration::Sequence(
+                  {
+                      {"source", SourceNodeOptions::FromReader(
+                           reader,
+                           GetCpuThreadPool())},
+                      {"filter", FilterNodeOptions{
+                           greater(field_ref("score"), literal(3))}},
+                      {"project", ProjectNodeOptions{
+                           {add(field_ref("score"), literal(1))},
+                           {"score + 1"}}},
+                      {"write", WriteNodeOptions{/*base_dir=*/"/dat", /*...*/}},
+                  })
+                  .AddToPlan(plan.get()));
+
+Note that a source node can wrap anything which resembles a stream of batches.
+For example, `PR#11032 <https://github.com/apache/arrow/pull/11032>`_ adds
+support for use of a `DuckDB <https://duckdb.org>`_ query as a source node.
+Similarly, a sink node can wrap anything which absorbs a stream of batches.
+In the example above we're writing completed batches to disk. However, we can
+also collect these in memory into a :class:`Table` or forward them to a
+:class:`RecordBatchReader` as an out-of-graph stream.
+This flexibility allows an :class:`ExecPlan` to be used as streaming middleware
+between any endpoints which support Arrow formatted batches.
+
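+As a sketch of the in-memory case: a ``"sink"`` node exposes the stream through
+an asynchronous generator, which can then be wrapped as a
+:class:`RecordBatchReader` (``SinkNodeOptions`` and ``MakeGeneratorReader`` are
+assumed as declared in ``arrow/compute/exec/exec_plan.h``; ``project_node`` is
+from the pipeline above)::
+
+    arrow::AsyncGenerator<arrow::util::optional<ExecBatch>> sink_gen;
+
+    MakeExecNode("sink", plan.get(), {project_node}, SinkNodeOptions{&sink_gen});
+
+    // pull completed batches out of the graph as a RecordBatchReader
+    std::shared_ptr<RecordBatchReader> reader = MakeGeneratorReader(
+        project_node->output_schema(), std::move(sink_gen), default_memory_pool());
+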
+An :class:`arrow::dataset::Dataset` can also be wrapped as a source node which
+pushes all the dataset's batches into an :class:`ExecPlan`. This factory is added
+to the default registry with the name ``"scan"`` by calling
+``arrow::dataset::internal::Initialize()``::
+
+    arrow::dataset::internal::Initialize();
+
+    std::shared_ptr<Dataset> dataset = GetDataset();
+
+    ASSERT_OK(Declaration::Sequence(
+                  {
+                      {"scan", ScanNodeOptions{dataset,
+                         /* push down predicate, projection, ... */}},
+                      {"filter", FilterNodeOptions{/* ... */}},
+                      // ...
+                  })
+                  .AddToPlan(plan.get()));
+
+Datasets may be scanned multiple times; just make multiple scan
+nodes from that dataset (useful for a self-join, for example).
+Note that producing two scan nodes like this will perform all
+reads and decodes twice, as in the sketch below.
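+
+A sketch of that pattern (``scan_options`` is an illustrative
+:class:`arrow::dataset::ScanOptions`; both nodes share one ``dataset``)::
+
+    ExecNode* lhs = *MakeExecNode("scan", plan.get(), {},
+                                  ScanNodeOptions{dataset, scan_options});
+    ExecNode* rhs = *MakeExecNode("scan", plan.get(), {},
+                                  ScanNodeOptions{dataset, scan_options});
+    // ... feed lhs and rhs to downstream nodes, e.g. both sides of a join ...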
