This is an automated email from the ASF dual-hosted git repository.

zanmato pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8590bc621a GH-46475: [Documentation][C++][Compute] Consolidate Acero developer docs (#46476)
8590bc621a is described below

commit 8590bc621a868212bc8a184d0a1022f2ba87fd56
Author: Rossi Sun <[email protected]>
AuthorDate: Wed May 21 08:16:26 2025 -0700

    GH-46475: [Documentation][C++][Compute] Consolidate Acero developer docs (#46476)
    
    ### What changes are included in this PR?
    
    1. Move the current Acero Developer's Guide from the user docs to the developer docs;
    2. Make it the main doc, enclosing more detailed docs such as the Swiss table doc.
    
    ### Are these changes tested?
    
    No need.
    
    ### Are there any user-facing changes?
    
    None.
    * GitHub Issue: #46475
    
    Lead-authored-by: Rossi Sun <[email protected]>
    Co-authored-by: Bryce Mecum <[email protected]>
    Signed-off-by: Rossi Sun <[email protected]>
---
 .../cpp/{streaming_execution.rst => acero.rst}     |    6 +-
 docs/source/cpp/acero/developer_guide.rst          |  692 -----------
 docs/source/cpp/user_guide.rst                     |    2 +-
 docs/source/developers/cpp/acero.rst               | 1201 +++++++++++---------
 .../cpp/{ => acero}/img/swiss_table_1.jpg          |  Bin
 .../cpp/{ => acero}/img/swiss_table_10.jpg         |  Bin
 .../cpp/{ => acero}/img/swiss_table_11.jpg         |  Bin
 .../cpp/{ => acero}/img/swiss_table_2.jpg          |  Bin
 .../cpp/{ => acero}/img/swiss_table_3.jpg          |  Bin
 .../cpp/{ => acero}/img/swiss_table_4.jpg          |  Bin
 .../cpp/{ => acero}/img/swiss_table_5.jpg          |  Bin
 .../cpp/{ => acero}/img/swiss_table_6.jpg          |  Bin
 .../cpp/{ => acero}/img/swiss_table_7.jpg          |  Bin
 .../cpp/{ => acero}/img/swiss_table_8.jpg          |  Bin
 .../cpp/{ => acero}/img/swiss_table_9.jpg          |  Bin
 .../cpp/{acero.rst => acero/swiss_table.rst}       |   34 +-
 .../{cpp/acero => developers/cpp/img}/async.md     |    0
 .../{cpp/acero => developers/cpp/img}/async.svg    |    0
 .../acero => developers/cpp/img}/dist_plan.svg     |    0
 .../acero => developers/cpp/img}/microbatch.svg    |    0
 .../{cpp/acero => developers/cpp/img}/ordered.svg  |    0
 .../{cpp/acero => developers/cpp/img}/pipeline.svg |    0
 .../acero => developers/cpp/img}/pipeline_task.svg |    0
 docs/source/java/substrait.rst                     |    4 +-
 docs/source/python/api/acero.rst                   |    2 +-
 docs/source/python/integration/substrait.rst       |    4 +-
 26 files changed, 694 insertions(+), 1251 deletions(-)

diff --git a/docs/source/cpp/streaming_execution.rst b/docs/source/cpp/acero.rst
similarity index 90%
rename from docs/source/cpp/streaming_execution.rst
rename to docs/source/cpp/acero.rst
index 1cd1540428..ddeffffa74 100644
--- a/docs/source/cpp/streaming_execution.rst
+++ b/docs/source/cpp/acero.rst
@@ -27,6 +27,11 @@ Acero: A C++ streaming execution engine
 
     Acero is experimental and a stable API is not yet guaranteed.
 
+.. note::
+
+    If you are interested in contributing to Acero or learning about its
+    internals, please see the :doc:`Acero Developer's Guide<../developers/cpp/acero>`.
+
 For many complex computations, successive direct :ref:`invocation of
 compute functions <invoking-compute-functions>` is not feasible
 in either memory or computation time. To facilitate arbitrarily large inputs
@@ -44,4 +49,3 @@ be formulated and executed.
    acero/overview
    acero/user_guide
    acero/substrait
-   acero/developer_guide
diff --git a/docs/source/cpp/acero/developer_guide.rst b/docs/source/cpp/acero/developer_guide.rst
deleted file mode 100644
index 7dd08fe3ce..0000000000
--- a/docs/source/cpp/acero/developer_guide.rst
+++ /dev/null
@@ -1,692 +0,0 @@
-.. Licensed to the Apache Software Foundation (ASF) under one
-.. or more contributor license agreements.  See the NOTICE file
-.. distributed with this work for additional information
-.. regarding copyright ownership.  The ASF licenses this file
-.. to you under the Apache License, Version 2.0 (the
-.. "License"); you may not use this file except in compliance
-.. with the License.  You may obtain a copy of the License at
-
-..   http://www.apache.org/licenses/LICENSE-2.0
-
-.. Unless required by applicable law or agreed to in writing,
-.. software distributed under the License is distributed on an
-.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-.. KIND, either express or implied.  See the License for the
-.. specific language governing permissions and limitations
-.. under the License.
-
-.. default-domain:: cpp
-.. highlight:: cpp
-.. cpp:namespace:: arrow::acero
-
-=================
-Developer's Guide
-=================
-
-This page goes into more detail about the design of Acero.  It discusses how
-to create custom exec nodes and describes some of the philosophies behind Acero's
-design and implementation.  Finally, it gives an overview of how to extend Acero
-with new behaviors and how this new behavior can be upstreamed into the core Arrow
-repository.
-
-Understanding ExecNode
-======================
-
-ExecNode is an abstract class with several pure virtual methods that control how the node operates:
-
-:func:`ExecNode::StartProducing`
---------------------------------
-
-This method is called once at the start of the plan.  Most nodes ignore this method (any
-necessary initialization should happen in the constructor or Init).  However, source nodes
-will typically provide a custom implementation.  Source nodes should schedule whatever tasks
-are needed to start reading and providing the data.  Source nodes are usually the primary
-creator of tasks in a plan.
-
-.. note::
-   The ExecPlan operates on a push-based model.  Sources are often pull-based.  For example,
-   your source may be an iterator.  The source node will typically then schedule tasks to pull one
-   item from the source and push that item into the source's output node (via ``InputReceived``).
-
-Examples
-^^^^^^^^
-
-* In the ``table_source`` node the input table is divided into batches.  A task is created for
-  each batch and that task calls ``InputReceived`` on the node's output.
-* In the ``scan`` node a task is created to start listing fragments from the dataset.  Each listing
-  task then creates tasks to read batches from the fragment, asynchronously.  When the batch is
-  fully read in, a continuation schedules a new task with the exec plan.  This task calls
-  ``InputReceived`` on the scan node's output.
-
-:func:`ExecNode::InputReceived`
--------------------------------
-
-This method is called many times during the execution of a plan.  It is how nodes pass data
-to each other.  An input node will call InputReceived on its output.  Acero's execution model
-is push-based.  Each node pushes data into its output by calling InputReceived and passing in
-a batch of data.
-
-The InputReceived method is often where the actual work happens for the node.  For example,
-a project node will execute its expressions and create a new expanded output batch.  It will then
-call InputReceived on its output.  InputReceived will never be called on a source node.  Sink
-nodes will never call InputReceived.  All other nodes will experience both.
-
-Some nodes (often called "pipeline breakers") must accumulate input before they can generate
-any output.  For example, a sort node must accumulate all input before it can sort the data and
-generate output.  In these nodes the InputReceived method will typically place the data into
-some kind of accumulation queue.  If the node doesn't have enough data to operate then it will
-not call InputReceived on its output.  This will then be the end of the current task.
-
-Examples
-^^^^^^^^
-
-* The ``project`` node runs its expressions, using the received batch as input for the expression.
-  A new batch is created from the input batch and the result of the expressions.  The new batch is
-  given the same order index as the input batch and the node then calls ``InputReceived`` on its
-  output.
-* The ``order_by`` node inserts the batch into an accumulation queue.  If this was the last batch
-  then the node will sort everything in the accumulation queue.  The node will then call
-  ``InputReceived`` on the output for each batch in the sorted result.  A new batch index will be
-  assigned to each batch.  Note that this final output step might also occur as a result of a call
-  to ``InputFinished`` (described below).
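The accumulate-then-emit pattern described above can be sketched in standalone C++.  This is an illustration only, not the actual Acero ``ExecNode`` API: ``MiniSortNode``, ``Batch``, and the ``is_last`` flag are hypothetical stand-ins (a real node learns the batch count via ``InputFinished``).

```cpp
#include <algorithm>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical stand-in for a batch: a handful of integer rows.
using Batch = std::vector<int>;

// A toy "pipeline breaker": it accumulates every input batch and only
// pushes sorted output downstream once the last batch has arrived.
class MiniSortNode {
 public:
  explicit MiniSortNode(std::function<void(const Batch&)> output)
      : output_(std::move(output)) {}

  // Called once per batch; calls may be reentrant, so the mutable
  // accumulation state is protected by a mutex.
  void InputReceived(const Batch& batch, bool is_last) {
    std::lock_guard<std::mutex> lock(mutex_);
    rows_.insert(rows_.end(), batch.begin(), batch.end());
    if (is_last) {
      // All input has arrived: sort and emit a single output batch.
      std::sort(rows_.begin(), rows_.end());
      output_(rows_);
    }
    // Otherwise: do nothing; this ends the current task.
  }

 private:
  std::mutex mutex_;
  Batch rows_;
  std::function<void(const Batch&)> output_;
};
```

The key point mirrored from the text: a call that merely accumulates produces no downstream call, so the task simply ends there.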
-
-:func:`ExecNode::InputFinished`
--------------------------------
-
-This method will be called once per input.  A node will call InputFinished on its output once it
-knows how many batches it will be sending to that output.  Normally this happens when the node is
-finished working.  For example, a scan node will call InputFinished once it has finished reading
-its files.  However, it could call it earlier if it knows (maybe from file metadata) how many
-batches will be created.
-
-Some nodes will use this signal to trigger some processing.  For example, a sort node needs to
-wait until it has received all of its input before it can sort the data.  It relies on the InputFinished
-call to know this has happened.
-
-Even if a node is not doing any special processing when finished (e.g. a project node or filter node
-doesn't need to do any end-of-stream processing) that node will still call InputFinished on its
-output.
-
-.. warning::
-   The InputFinished call might arrive before the final call to InputReceived.  In fact, it could
-   even be sent out before any calls to InputReceived begin.  For example, the table source node
-   always knows exactly how many batches it will be producing.  It could choose to call InputFinished
-   before it ever calls InputReceived.  If a node needs to do "end-of-stream" processing then it typically
-   uses an AtomicCounter, which is a helper class, to figure out when all of the data has arrived.
-
-Examples
-^^^^^^^^
-
-* The ``order_by`` node checks to see if it has already received all its batches.  If it has then it performs
-  the sorting step described in the ``InputReceived`` example.  Before it starts sending output data it
-  checks to see how many output batches it has (it's possible the batch size changed as part of the
-  accumulating or sorting) and calls ``InputFinished`` on the node's output.
-* The ``fetch`` node, during a call to ``InputReceived``, realizes it has received all the rows it was
-  asked for.  It calls ``InputFinished`` on its output immediately (even though its own ``InputFinished``
-  method has not yet been called).
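The end-of-stream bookkeeping the warning describes can be sketched with plain atomics.  This ``FinishDetector`` is a simplified illustration of the AtomicCounter idea, not Arrow's actual helper: the stream is complete only once the total is known *and* that many batches have arrived, in either order.

```cpp
#include <atomic>

// Toy end-of-stream detector: InputFinished (the total) and the final
// InputReceived (the arrivals) can come in either order, and exactly one
// of the two calls should observe completion.
class FinishDetector {
 public:
  // Returns true if this arrival completed the stream.
  bool OnBatchArrived() {
    int arrived = ++arrived_;
    return arrived == total_.load();
  }

  // Returns true if learning the total completed the stream (all batches
  // had already arrived before the count became known).
  bool OnTotalKnown(int total) {
    total_.store(total);
    return arrived_.load() == total;
  }

 private:
  std::atomic<int> arrived_{0};
  std::atomic<int> total_{-1};  // -1 means "not yet known"
};
```

The real helper has to handle races between these two paths more carefully; this sketch only shows the two orderings a node must be prepared for.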
-
-:func:`ExecNode::PauseProducing` / :func:`ExecNode::ResumeProducing`
----------------------------------------------------------------------
-
-These methods control backpressure.  Some nodes may need to pause their input to avoid accumulating
-too much data.  For example, when the user is consuming the plan with a RecordBatchReader we use a
-SinkNode.  The SinkNode places data in a queue that the RecordBatchReader pulls from (this is a
-conversion from a push model to a pull model).  If the user is reading the RecordBatchReader slowly then
-it is possible this queue will start to fill up.  For another example we can consider the write node.
-This node writes data to a filesystem.  If the writes are slow then data might accumulate at the
-write node.  As a result, the write node would need to apply backpressure.
-
-When a node realizes that it needs to apply some backpressure it will call PauseProducing on its input.
-Once the node has enough space to continue it will then call ResumeProducing on its input.  For example,
-the SinkNode would pause when its queue gets too full.  As the user continues to read from the
-RecordBatchReader we can expect the queue to slowly drain.  Once the queue has drained enough then the
-SinkNode can call ResumeProducing.
-
-Source nodes typically need to provide special behavior for PauseProducing and ResumeProducing.  For
-example, a scan node that is reading from a file can pause reading the file.  However, some source nodes
-may not be able to pause in any meaningful way.  There is not much point in a table source node pausing
-because its data is already in memory.
-
-Nodes that are neither source nor sink should still forward backpressure signals.  For example, when
-PauseProducing is called on a project node it should call PauseProducing on its input.  If a node has
-multiple inputs then it should forward the signal to every input.
-
-Examples
-^^^^^^^^
-
-* The ``write`` node, in its ``InputReceived`` method, adds a batch to a dataset writer's queue.  If the
-  dataset writer is then full it will return an unfinished future that will complete when it has more room.
-  The ``write`` node then calls ``PauseProducing`` on its input.  It then adds a continuation to the future
-  that will call ``ResumeProducing`` on its input.
-* The ``scan`` node uses an :class:`AsyncTaskScheduler` to keep track of all the tasks it schedules.  This
-  scheduler is throttled to limit how much concurrent I/O the ``scan`` node is allowed to perform.  When
-  ``PauseProducing`` is called then the node will pause the scheduler.  This means that any tasks queued
-  behind the throttle will not be submitted.  However, any ongoing I/O will continue (backpressure can't
-  take effect immediately).  When ``ResumeProducing`` is called the ``scan`` node will unpause the scheduler.
-
-:func:`ExecNode::StopProducing`
--------------------------------
-
-StopProducing is called when a plan needs to end early.  This can happen because the user cancelled
-the plan or because an error occurred.  Most nodes do not need to do anything here.
-There is no expectation or requirement that a node sends any remaining data it has.  Any node that
-schedules tasks (e.g. a source node) should stop producing new data.
-
-In addition to plan-wide cancellation, a node may call this method on its input if it has decided
-that it has received all the data that it needs.  However, because of parallelism, a node may still
-receive a few calls to ``InputReceived`` after it has stopped its input.
-
-If any external resources are used then cleanup should happen as part of this call.
-
-Examples
-^^^^^^^^
-
-* The ``asofjoin`` node has a dedicated processing thread that communicates with the main Acero threads
-  using a queue.  When ``StopProducing`` is called the node inserts a poison pill into the queue.  This
-  tells the processing thread to stop immediately.  Once the processing thread stops it marks its external
-  task (described below) as completed, which allows the plan to finish.
-* The ``fetch`` node, in ``InputReceived``, may decide that it has all the data it needs.  It can then call
-  ``StopProducing`` on its input.
-
-Initialization / Construction / Destruction
--------------------------------------------
-
-Simple initialization logic (that cannot error) can be done in the constructor.  If the initialization
-logic may return an invalid status then it can either be done in the exec node's factory method or
-the ``Init`` method.  The factory method is preferred for simple validation.  The ``Init`` method is
-preferred if the initialization might do expensive allocation or other resource consumption.  ``Init`` will
-always be called before ``StartProducing`` is called.  Initialization could also be done in
-``StartProducing`` but keep in mind that other nodes may have started by that point.
-
-In addition, there is a ``Validate`` method that can be overridden to provide custom validation.  This
-method is normally called before ``Init`` but after all inputs and outputs have been added.
-
-Finalization happens today in the destructor.  There are a few examples where that might be slow.
-For example, in the write node, if there was an error during the plan, then we might close out some open
-files here.  Should there be significant finalization that is either asynchronous or could potentially
-trigger an error then we could introduce a Finalize method to the ExecNode lifecycle.  It hasn't been
-done yet only because it hasn't been needed.
-
-Summary
--------
-
-.. list-table:: ExecNode Lifecycle
-   :widths: 20 40 40
-   :header-rows: 1
-
-   * - Method Name
-     - This is called when...
-     - A node calls this when...
-   * - StartProducing
-     - The plan is starting
-     - N/A
-   * - InputReceived
-     - Data is received from the input
-     - To send data to the output
-   * - InputFinished
-     - The input knows how many batches there are
-     - The node can tell its output how many batches there are
-   * - StopProducing
-     - A plan is aborted or an output has enough data
-     - A node has all the data it needs
-
-Extending Acero
-===============
-
-Acero instantiates a singleton :class:`ExecFactoryRegistry` which maps between names and exec node
-factories (methods which create an ExecNode from options).  To create a new ExecNode you can register
-the node with this registry and your node will now be usable by Acero.  If you would like to be able
-to use this node with Substrait plans you will also need to configure the Substrait registry so that it
-knows how to map Substrait to your custom node.
-
-This means that you can create and add new nodes to Acero without recompiling Acero from source.
-
-Scheduling and Parallelism
-==========================
-
-There are many ways in which data engines can utilize multiple compute resources (e.g. multiple cores).
-Before we get into the details of Acero's scheduling we will cover a few high level topics.
-
-Parallel Execution of Plans
----------------------------
-
-Users may want to execute multiple plans concurrently and they are welcome to do so.  However, Acero has no
-concept of inter-plan scheduling.  Each plan will attempt to maximize its usage of compute resources and
-there will likely be contention for CPU, memory, and disk resources.  If plans are using the default CPU &
-I/O thread pools this will be mitigated somewhat since they will share the same thread pools.
-
-Locally Distributed Plans
--------------------------
-
-A common way to tackle multi-threading is to split the input into partitions, create a plan for
-each partition, and then merge the results from these plans in some way.  For example, let's assume you
-have 20 files and 10 cores and you want to read and sort all the data.  You could create a plan for every
-2 files to read and sort those files.  Then you could create one extra plan that takes the input from these
-10 child plans and merges the 10 input streams in a sorted fashion.
-
-This approach is popular because it is how queries are distributed across multiple servers and so it
-is widely supported and well understood.  Acero does not do this today but there is no reason to prevent it.
-Adding shuffle & partition nodes to Acero should be a high priority and would enable Acero to be used by
-distributed systems.  Once that has been done then it should be possible to do a local shuffle (local
-meaning exchanging between multiple exec plan instances on a single system) if desired.
-
-.. figure:: dist_plan.svg
-
   A distributed plan can provide parallelism even if the plans themselves run serially
-
-Pipeline Parallelism
---------------------
-
-Acero attempts to maximize parallelism using pipeline parallelism.  As each batch of data arrives from the
-source we immediately create a task and start processing it.  This means we will likely start processing
-batch X before the processing of batch X-1 has completed.  This is very flexible and powerful.  However, it also
-means that properly implementing an ExecNode is difficult.
-
-For example, an ExecNode's InputReceived method should be reentrant.  In other words, it should be expected
-that InputReceived will be called before the previous call to InputReceived has completed.  This means that
-nodes with any kind of mutable state will need mutexes or similar mechanisms to protect that state from race
-conditions.  It also means that tasks can easily get out of order and nodes should not expect any particular ordering
-of their input (more on this later).
-
-.. figure:: pipeline.svg
-
   An example of pipeline parallelism on a system with 3 CPU threads and 2 I/O threads
-
-Asynchronicity
---------------
-
-Some operations take a long time and may not require the CPU.  Reading data from the filesystem is one example.  If we
-only have one thread per core then time will be wasted while we wait for these operations to complete.  There
-are two common solutions to this problem.  A synchronous solution is often to create more threads than there are
-cores with the expectation that some of them will be blocked and that is ok.  This approach tends to be simpler
-but it can lead to excess thread contention and requires fine-tuning.
-
-Another solution is to make the slow operations asynchronous.  When the slow operation starts the caller gives up
-the thread and allows other tasks to run in the meantime.  Once the slow operation finishes then a new task is
-created to take the result and continue processing.  This helps to minimize thread contention but tends to be
-more complex to implement.
-
-Due to a lack of standard C++ async APIs, Acero uses a combination of the two approaches.  Acero has two thread pools.
-The first is the CPU thread pool.  This thread pool has one thread per core.  Tasks in this thread pool should never
-block (beyond minor delays for synchronization) and should generally be actively using CPU as much as possible.  Threads
-on the I/O thread pool are expected to spend most of their time idle.  They should avoid doing any CPU-intensive work.
-Their job is basically to wait for data to be available and schedule follow-up tasks on the CPU thread pool.
-
-.. figure:: async.svg
-
-   Arrow achieves asynchronous execution by combining CPU & I/O thread pools
-
-.. note::
-
-   Most nodes in Acero do not need to worry about asynchronicity.  They are fully synchronous and do not spawn tasks.
-
-Task per Pipeline (and sometimes beyond)
-----------------------------------------
-
-An engine could choose to create a thread task for every execution of a node.  However, without careful scheduling,
-this leads to problems with cache locality.  For example, let's assume we have a basic plan consisting of three
-exec nodes: scan, project, and then filter (this is a very common use case).  Now let's assume there are 100 batches.
-In a task-per-operator model we would have tasks like "Scan Batch 5", "Project Batch 5", and "Filter Batch 5".  Each
-of those tasks is potentially going to access the same data.  For example, maybe the ``project`` and ``filter`` nodes need
-to read the same column, a column which is initially created in a decode phase of the ``scan`` node.  To maximize cache
-utilization we would need to carefully schedule our tasks to ensure that all three of those tasks are run consecutively
-and assigned to the same CPU core.
-
-To avoid this problem we design tasks that run through as many nodes as possible before the task ends.  This sequence
-of nodes is often referred to as a "pipeline" and the nodes that end the pipeline (and thus end the task) are often
-called "pipeline breakers".  Some nodes might even fall somewhere in between.  For example, in a hash join node, when
-we receive a batch on the probe side, and the hash table has been built, we do not need to end the task and instead keep
-on running.  This means that tasks might sometimes end at the join node and might sometimes continue past the join node.
-
-.. figure:: pipeline_task.svg
-
   A logical view of pipelines in a plan and two tasks, showing that pipeline boundaries may vary during a plan
-
-
-Thread Pools and Schedulers
----------------------------
-
-The CPU and I/O thread pools are a part of the core Arrow-C++ library.  They contain a FIFO queue of tasks and will
-execute them as a thread is available.  For Acero we need additional capabilities.  For this we use the
-AsyncTaskScheduler.  In the simplest mode of operation the scheduler simply submits tasks to an underlying thread pool.
-However, it is also capable of creating sub-schedulers which can apply throttling, prioritization, and task tracking:
-
- * A throttled scheduler associates a cost with each task.  Tasks are only submitted to the underlying scheduler
-   if there is room.  If there is not then the tasks are placed in a queue.  The write node uses a throttle of size
-   1 to avoid reentrantly calling the dataset writer (the dataset writer does its own internal scheduling).  A throttled
-   scheduler can be manually paused and unpaused.  When paused all tasks are queued and queued tasks will not be submitted
-   even if there is room.  This can be useful in source nodes to implement PauseProducing and ResumeProducing.
- * Priority can be applied to throttled schedulers to control the order in which queued tasks are submitted.  If
-   there is room a task is submitted immediately (regardless of priority).  However, if the throttle is full then
-   the task is queued and subject to prioritization.  The scan node throttles how many read requests it generates
-   and prioritizes reading a dataset in order, if possible.
- * A task group can be used to keep track of a collection of tasks and run a finalization task when all of the
-   tasks have completed.  This is useful for fork-join style problems.  The write node uses a task group to close
-   a file once all outstanding write tasks for the file have completed.
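The throttling behavior in the first bullet can be sketched in standalone C++.  This ``ThrottledScheduler`` is a toy illustration of the cost/capacity idea, not the real AsyncTaskScheduler; for simplicity it runs tasks inline instead of submitting them to a thread pool:

```cpp
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>

// Toy throttled scheduler: each task carries a cost; tasks run while
// there is capacity, otherwise they wait in a FIFO queue until earlier
// tasks finish and release their cost.
class ThrottledScheduler {
 public:
  explicit ThrottledScheduler(std::size_t capacity) : capacity_(capacity) {}

  void Submit(std::size_t cost, std::function<void()> task) {
    if (in_flight_ + cost <= capacity_) {
      in_flight_ += cost;
      task();  // a real scheduler would submit this to a thread pool
    } else {
      queue_.push_back({cost, std::move(task)});
    }
  }

  // Called when a running task of the given cost completes; drains as
  // many queued tasks as now fit under the capacity.
  void TaskFinished(std::size_t cost) {
    in_flight_ -= cost;
    while (!queue_.empty() && in_flight_ + queue_.front().cost <= capacity_) {
      Pending next = std::move(queue_.front());
      queue_.pop_front();
      in_flight_ += next.cost;
      next.task();
    }
  }

  std::size_t queued() const { return queue_.size(); }

 private:
  struct Pending {
    std::size_t cost;
    std::function<void()> task;
  };
  std::size_t capacity_;
  std::size_t in_flight_ = 0;
  std::deque<Pending> queue_;
};
```

A capacity of 1, as the write node uses, serializes all submitted tasks; pausing (not shown) would simply stop the drain loop from running.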
-
-There are research papers and examples out there for different ways to prioritize tasks in an execution engine.  Acero has not
-yet had to address this problem.  Let's go through some common situations:
-
- * Engines will often prioritize reading from the build side of a join node before reading from the probe side.  This
-   would be more easily handled in Acero by applying backpressure.
- * Another common use case is to control memory accumulation.  Engines will prioritize tasks which are closer to the
-   sink node in an effort to relieve memory pressure.  However, Acero currently assumes that spilling will be added
-   at pipeline breakers and that memory usage in a plan will be more or less static (per core) and well below the
-   limits of the hardware.  This might change if Acero needs to be used in an environment where there are many compute
-   resources and limited memory (e.g. a GPU).
- * Engines will often use work stealing algorithms to prioritize running tasks on the same core to improve cache
-   locality.  However, since Acero uses a task-per-pipeline model there isn't much lost opportunity for cache
-   locality that a scheduler could reclaim.  Tasks only end when there is no more work that can be done with the data.
-
-While there is not much prioritization in place in Acero today we do have the tools to apply it should we need to.
-
-.. note::
-   In addition to the AsyncTaskScheduler there is another class called the TaskScheduler.  This class predates the
-   AsyncTaskScheduler and was designed to offer task tracking for highly efficient synchronous fork-join workloads.
-   If this specialized purpose meets your needs then you may consider using it.  It would be interesting to profile
-   this against the AsyncTaskScheduler and see how closely the two compare.
-
-Intra-node Parallelism
-----------------------
-
-Some nodes can potentially exploit parallelism within a task.  For example, in the scan node we can decode
-columns in parallel.  In the hash join node, parallelism is sometimes exploited for complex tasks such as
-building the hash table.  This sort of parallelism is less common but not necessarily discouraged.  Profiling should
-be done first though to ensure that this extra parallelism will be helpful in your workload.
-
-All Work Happens in Tasks
--------------------------
-
-All work in Acero happens as part of a task.  When a plan is started the AsyncTaskScheduler is created and given an
-initial task.  This initial task calls StartProducing on the nodes.  Tasks may schedule additional tasks.  For example,
-source nodes will usually schedule tasks during the call to StartProducing.  Pipeline breakers will often schedule tasks
-when they have accumulated all the data they need.  Once all tasks in a plan are finished then the plan is considered
-done.
-
-Some nodes use external threads.  These threads must be registered as external tasks using the BeginExternalTask method.
-For example, the asof join node uses a dedicated processing thread to achieve serial execution.  This dedicated thread
-is registered as an external task.  External tasks should be avoided where possible because they require careful
-handling to avoid deadlock in error situations.
-
-Ordered Execution
-=================
-
-Some nodes either establish an ordering to their outgoing batches or they need to be able to process batches in order.
-Acero handles ordering using the ``batch_index`` property on an ExecBatch.  If a node has a deterministic output order
-then it should apply a batch index on batches that it emits.  For example, the OrderByNode applies a new ordering to
-batches (regardless of the incoming ordering).  The scan node is able to attach an implicit ordering to batches which
-reflects the order of the rows in the files being scanned.
-
-If a node needs to process data in order then it is a bit more complicated.  Because of the parallel nature of execution
-we cannot guarantee that batches will arrive at a node in order.  However, they can generally be expected to be "mostly
-ordered".  As a result, we can insert the batches into a sequencing queue.  The sequencing queue is given a callback which
-is guaranteed to run on the batches, serially, in order.  For example, the fetch node uses a sequencing queue.  The callback
-checks to see if we need to include part or all of the batch, and then slices the batch if needed.
-
-Even if a node does not care about order it should try to maintain the batch index if it can.  The project and filter
-nodes do not care about order but they ensure that output batches keep the same index as their input batches.  The filter
-node will even emit empty batches if it needs to so that it can maintain the batch order without gaps.
-
-.. figure:: ordered.svg
-
-   An example of ordered execution
-
-
-Partitioned Execution
-=====================
-
-A stream is partitioned (or sometimes called segmented) if rows are grouped together in some way.  Currently there is not
-a formal notion of partitioning.  However, one is starting to develop (e.g. segmented aggregation) and we may end up
-introducing a more formal notion of partitions to Acero at some point as well.
-
-Spillover
-=========
-
-Spillover has not yet been implemented in Acero.
-
-Distributed Execution
-=====================
-
-There are certain exec nodes which are useful when an engine is used in a distributed environment.  The terminology
-can vary so we will use the Substrait terminology.  An exchange node sends data to different workers.  Often this is
-a partitioned exchange so that Acero is expected to partition each batch and distribute partitions across N different
-workers.  On the other end we have the capture node.  This node receives data from different workers.
-
-These nodes do not exist in Acero today.  However, they would be in scope and we hope to have such nodes someday.
-
-Profiling & Tracing
-===================
-
-Acero's tracing is currently half-implemented and there are major gaps in profiling tools.  However, there has been some
-effort at tracing with OpenTelemetry and most of the necessary pieces are in place.  The main thing currently lacking is
-some kind of effective visualization of the tracing results.
-
-In order to use the tracing that is present today you will need to build Arrow with ``ARROW_WITH_OPENTELEMETRY=ON``.
-Then you will need to set the environment variable 
``ARROW_TRACING_BACKEND=otlp_http``.  This will configure open telemetry
-to export trace results (as OTLP) to the HTTP endpoint 
http://localhost:4318/v1/traces.  You will need to configure an
-open telemetry collector to collect results on that endpoint and you will need 
to configure a trace viewer of some kind
-such as Jaeger: https://www.jaegertracing.io/docs/1.21/opentelemetry/
-
-Benchmarking
-============
-
-The most complete macro benchmarking for Acero is provided by 
https://github.com/voltrondata-labs/arrowbench
-These include a set of TPC-H benchmarks, executed from the R-dplyr 
integration, which are run on every Arrow commit and
-reported to Conbench at https://conbench.ursa.dev/
-
-In addition to these TPC-H benchmarks there are a number of micro-benchmarks 
for various nodes (hash-join, asof-join,
-etc.)  Finally, the compute functions themselves should mostly have 
micro-benchmarks.  For more on micro benchmarks you
-can refer to https://arrow.apache.org/docs/developers/benchmarks.html
-
-Any new functionality should include micro benchmarks to avoid regressions.
-
-Bindings
-========
-
-Public API
-----------
-
-The public API for Acero consists of Declaration and the various 
DeclarationToXyz methods.  In addition the
-options classes for each node are part of the public API.  However, nodes are 
extensible and so this API is
-extensible.
-
-R (dplyr)
----------
-
-Dplyr is an R library for programmatically building queries.  The arrow-r 
package has dplyr bindings which
-adapt the dplyr API to create Acero execution plans.  In addition, there is a 
dplyr-substrait backend that
-is in development which could eventually replace the Acero-aware binding.
-
-Python
-------
-
-The pyarrow library binds to Acero in two different ways.  First, there is a 
direct binding in pyarrow.acero
-which directly binds to the public API.  Second, there are a number of compute 
utilities like
-pyarrow.Table.group_by which uses Acero, though this is invisible to the user.
-
-Java
-----
-
-The Java implementation exposes some capabilities from Arrow datasets.  These 
use Acero implicitly.  There
-are no direct bindings to Acero or Substrait in the Java implementation today.
-
-Design Philosophies
-===================
-
-Engine Independent Compute
---------------------------
-
-If a node requires complex computation then it should encapsulate that work in 
abstractions that don't depend on
-any particular engine design.  For example, the hash join node uses utilities 
such as a row encoder, a hash table,
-and an exec batch builder.  Other places share implementations of sequencing 
queues and row segmenters.  The node
-itself should be kept minimal, simply mapping from Acero to the abstraction.
-
-This helps to decouple designs from Acero's design details and allows them to 
be more resilient to changes in the
-engine.  It also helps to promote these abstractions as capabilities on their 
own.  Either for use in other engines
-or for potential new additions to pyarrow as compute utilities.
-
-Make Tasks not Threads
-----------------------
-
-If you need to run something in parallel then you should use thread tasks and 
not dedicated threads.
-
- * This keeps the thread count down (reduces thread contention and context 
switches)
- * This prevents deadlock (tasks get cancelled automatically in the event of a 
failure)
- * This simplifies profiling (Tasks can be easily measured, easier to know 
where all the work is)
- * This makes it possible to run without threads (sometimes users are doing 
their own threading and
-   sometimes we need to run in thread-restricted environments like emscripten)
-
-Note: we do not always follow this advice currently.  There is a dedicated 
process thread in the asof join
-node.  Dedicated threads are "ok" for experimental use but we'd like to 
migrate away from them.
-
-Don't Block on CPU Threads
---------------------------
-
-If you need to run a potentially long running activity that is not actively 
using CPU resources (e.g. reading from
-disk, network I/O, waiting on an external library using its own threads) then 
you should use asynchronous utilities
-to ensure that you do not block CPU threads.
-
-Don't Reinvent the Wheel
-------------------------
-
-Each node should not be a standalone island of utilities.  Where possible, 
computation should be pushed
-either into compute functions or into common shared utilities.  This is the 
only way a project as large as
-this can hope to be maintained.
-
-Avoid Query Optimization
-------------------------
-
-Writing an efficient Acero plan can be challenging.  For example, filter 
expressions and column selection
-should be pushed down into the scan node so that the data isn't read from 
disk.  Expressions should be
-simplified and common sub-expressions factored out.  The build side of a hash 
join node should be the
-smaller of the two inputs.
-
-However, figuring these problems out is a challenge reserved for a query 
planner or a query optimizer.
-Creating a query optimizer is a challenging task beyond the scope of Acero.  
With adoption of Substrait
-we hope utilities will eventually emerge that solve these problems.  As a 
result, we generally avoid doing
-any kind of query optimization within Acero.  Acero should interpret 
declarations as literally as possible.
-This helps reduce maintenance and avoids surprises.
-
-We also realize that this is not always possible.  For example, the hash join 
node currently detects if there
-is a chain of hash join operators and, if there is, it configures bloom filters between the operators.  This is
-technically a task that could be left to a query optimizer.  However, this 
behavior is rather specific to Acero
-and fairly niche and so it is unlikely it will be introduced to an optimizer 
anytime soon.
-
-Performance Guidelines
-======================
-
-Batch Size
-----------
-
-Perhaps the most discussed performance criteria is batch size.  Acero was 
originally
-designed based on research to follow a morsel-batch model.  Tasks are created 
based on
-a large batch of rows (a morsel).  The goal is for the morsel to be large 
enough to justify
-the overhead of a task.  Within a task the data is further subdivided into 
batches.
-Each batch should be small enough to fit comfortably into CPU cache (often the L2 cache).
-
-This sets up two loops.  The outer loop is parallel and the inner loop is not:
-
-.. code:: python
-
-  for morsel in dataset: # parallel
-    for batch in morsel:
-      run_pipeline(batch)
-
-The advantage of this style of execution is that successive nodes (or 
successive operations
-within an exec node) that access the same column are likely to benefit from 
cache.  It also
-is essential for functions that require random access to data.  It maximizes 
parallelism while
-minimizing the data transfer from main memory to CPU cache.
-
-.. figure:: microbatch.svg
-
-   If multiple passes through the data are needed (or random access) and the batch is much bigger
-   than the cache then performance suffers.  Breaking the task into smaller batches helps improve
-   task locality.
-
-The morsel/batch model is reflected in a few places in Acero:
-
- * In most source nodes we will try to grab batches of 1Mi rows.  This is often configurable.
- * In the source node we then iterate and slice off batches of 32Ki rows.  
This is not currently
-   configurable.
- * The hash join node currently requires that batches contain at most 32Ki rows as it uses
-   16-bit signed integers as row indices in some places.
-
-However, this guidance is debatable.  Profiling has shown that we do not get any real benefit
-from moving to a smaller batch size.  It seems any advantage we do get is lost 
in per-batch
-overhead.  Most of this overhead appears to be due to various per-batch 
allocations.  In addition,
-depending on your hardware, it's not clear that CPU Cache<->RAM will always be 
the bottleneck.  A
-combination of linear access, pre-fetch, and high CPU<->RAM bandwidth can 
alleviate the penalty
-of cache misses.
-
-As a result, this section is included in the guide to provide historical 
context, but should not
-be considered binding.
-
-Ongoing & Deprecated Work
-=========================
-
-The following efforts are ongoing.  They are described here to explain certain 
duplication in the
-code base as well as explain types that are going away.
-
-Scanner v2
-----------
-
-The scanner is currently a node in the datasets module registered with the 
factory registry as "scan".
-This node was written prior to Acero and made extensive use of AsyncGenerator 
to scan multiple files
-in parallel.  Unfortunately, the use of AsyncGenerator made the scan difficult 
to profile, difficult
-to debug, and impossible to cancel.  A new scan node is in progress.  It is 
currently registered with
-the name "scan2".  The new scan node uses the AsyncTaskScheduler instead of 
AsyncGenerator and should
-provide additional features such as the ability to skip rows and handle nested column projection (for
-formats that support it).
-
-OrderBySink and SelectKSink
----------------------------
-
-These two exec nodes provided custom sink implementations.  They were written 
before ordered execution
-was added to Acero and were the only way to generate ordered output.  However, 
they had to be placed
-at the end of a plan and the fact that they were custom sink nodes made them 
difficult to describe with
-Declaration.  The OrderByNode and FetchNode replace these.  These are kept at 
the moment until existing
-bindings move away from them.
-
-Upstreaming Changes
-===================
-
-Acero is designed so that it can be extended without recompilation.  You can easily add new compute
-functions and exec nodes without creating a fork or recompiling Acero.  However, 
as you develop new
-features that are generally useful, we hope you will make time to upstream 
your changes.
-
-Even though we welcome these changes we have to admit that there is a cost to 
this process.  Upstreaming
-code requires that the new module behave correctly, but that is typically the 
easier part to review.
-More importantly, upstreaming code is a process of transferring the 
maintenance burden from yourself to
-the wider Arrow C++ project maintainers.  This requires a deep understanding 
of the code by maintainers,
-it requires the code be consistent with the style of the project, and it 
requires that the code be well
-tested with unit tests to guard against regressions.
-
-Because of this, we highly recommend taking the following steps:
-
-* As you are starting out you should send a message to the mailing list 
announcing your intentions and
-  design.  This will help you determine if there is wider interest in the 
feature and others may have
-  ideas or suggestions to contribute early on in the process.
-
-  * If there is not much interest in the feature then keep in mind that it may 
be difficult to eventually
-    upstream the change.  The maintenance capacity of the team is limited and 
we try and prioritize
-    features that are in high demand.
-
-* We recommend developing and testing the change on your own fork until you 
get it to a point where you
-  are fairly confident things are working correctly.  If the change is large 
then you might also think
-  about how you can break up the change into smaller pieces.  As you do this 
you can share both the larger
-  PR (as a draft PR or a branch on your local fork) and the smaller PRs.  This 
way we can see the context
-  of the smaller PRs.  However, if you do break things up, smaller PRs should 
still ideally stand on their
-  own.
-
-* Any PR will need to have the following:
-
-  * Unit tests covering the new functionality
-
-  * Microbenchmarks if there is any significant compute work going on
-
-  * Examples demonstrating how to use the new feature
-
-  * Updates to the API reference and this guide
-
-  * Passing CI (you can enable GitHub Actions on your fork and that will allow 
most CI jobs to run before
-    you create your PR)
diff --git a/docs/source/cpp/user_guide.rst b/docs/source/cpp/user_guide.rst
index 6a426b6c10..094859f9c5 100644
--- a/docs/source/cpp/user_guide.rst
+++ b/docs/source/cpp/user_guide.rst
@@ -30,7 +30,7 @@ User Guide
    tables
    compute
    gandiva
-   streaming_execution
+   acero
    io
    ipc
    orc
diff --git a/docs/source/developers/cpp/acero.rst 
b/docs/source/developers/cpp/acero.rst
index 688cfa9d5d..0492007904 100644
--- a/docs/source/developers/cpp/acero.rst
+++ b/docs/source/developers/cpp/acero.rst
@@ -15,551 +15,686 @@
 .. specific language governing permissions and limitations
 .. under the License.
 
-.. highlight:: console
-.. _development-cpp-acero:
+.. default-domain:: cpp
+.. highlight:: cpp
+.. cpp:namespace:: arrow::acero
 
 ================
 Developing Acero
 ================
 
-Swiss Table
-===========
-
-A specialized hash table implementation used to dynamically map combinations of
-key field values to a dense set of integer ids. Ids can later be used in place
-of keys to identify groups of rows with equal keys.
-
-Introduction
-------------
-
-Hash group-by in Arrow uses a variant of a hash table based on a data structure
-called Swiss table. Swiss table uses linear probing. There is an array of slots
-and the information related to inserted keys is stored in these slots. A hash
-function determines the slot where the search for a matching key will start
-during hash table lookup. Then the slots are visited sequentially, wrapping
-around the end of an array, until either a match or an empty slot is found, the
-latter case meaning that there is no match.  Swiss table organizes the slots in
-blocks of 8 and has a design that enables data level parallelism at the block
-level. More precisely, it allows for visiting all slots within a block at once
-during lookups, by simply using 64-bit arithmetic. SIMD instructions can 
further
-enhance this data level parallelism allowing to process multiple blocks related
-to multiple input keys together using SIMD vectors of 64-bit elements. Occupied
-slots within a block are always clustered together. The name Swiss table comes
-from likening resulting sequences of empty slots to holes in a one dimensional
-cheese.
-
-Interface
----------
+This page goes into more detail about the design of Acero.  It discusses how
+to create custom exec nodes and describes some of the philosophies behind Acero's
+design and implementation.  Finally, it gives an overview of how to extend Acero
+with new behaviors and how those behaviors can be upstreamed into the core Arrow
+repository.
+
+Understanding ExecNode
+======================
+
+ExecNode is an abstract class with several pure virtual methods that control 
how the node operates:
 
-A hash table used in query processing for implementing join and group-by operators
-does not need to provide all of the operations that a general purpose hash table
-would. Simplified requirements can help achieve a simpler and more efficient
-design. For instance we do not need to be able to remove previously inserted
-keys. It’s an append-only data structure: new keys can be added but old keys 
are
-never erased. Also, only a single copy of each key can be inserted - it is like
-``std::map`` in that sense and not ``std::multimap``.
-
-Our Swiss table is fully vectorized. That means that all methods work on 
vectors
-of input keys processing them in batches. Specialized SIMD implementations of
-processing functions are almost always provided for performance critical
-operations. All callback interfaces used from the core hash table code are also
-designed to work on batches of inputs instead of individual keys. The batch 
size
-can be almost arbitrary and is selected by the client of the hash table. Batch
-size should be the smallest number of input items, big enough so that the
-benefits of vectorization and SIMD can be fully experienced. Keeping it small
-means less memory used for temporary arrays storing intermediate results of
-computation (vector equivalent of some temporary variables kept on the stack).
-That in turn means smaller space in CPU caches, which also means less impact on
-other memory access intensive operations. We pick 1024 as the default size of
-the batch. We will call it a **mini-batch** to distinguish it from potentially
-other forms of batches used at higher levels in the code, e.g. when scheduling
-work for worker threads or relational operators inside an analytic query.
-
-The main functionality provided by Swiss table is mapping of arbitrarily 
complex
-keys to unique integer ids. Let us call it **lookup-or-insert**. Given a
-sequence of key values, return a corresponding sequence of integer ids, such
-that all keys that are equal receive the same id and for K distinct keys the
-integer ids will be assigned from the set of numbers 0 to (K-1). If we find a
-matching key in a hash table for a given input, we return the **key id**
-assigned when the key was first inserted into a hash table. If we fail to find
-an already inserted match, we assign the first unused integer as a key id and
-add a new entry to a hash table. Due to vectorized processing, which may result
-in out-of-order processing of individual inputs, it is not guaranteed that if
-there are two new key values in the same input batch and one of them appears
-earlier in the input sequence, then it will receive a smaller key id. 
Additional
-mapping functionality can be built on top of basic mapping to integer key id,
-for instance if we want to assign and perhaps keep updating some values to all
-unique keys, we can keep these values in a resizable vector indexed by obtained
-key id.
-
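The lookup-or-insert contract can be illustrated with a deliberately simplified, scalar sketch (the real implementation is vectorized, works on mini-batches, and delegates hashing and key comparison to callbacks; the function name here is illustrative):

```python
def lookup_or_insert(table, keys):
    """Toy lookup-or-insert: return a dense integer id for each key.
    Equal keys receive equal ids, and K distinct keys receive the ids
    0..(K-1).  Unlike the real vectorized implementation, this scalar
    version also happens to assign ids in first-appearance order."""
    ids = []
    for key in keys:
        if key not in table:
            table[key] = len(table)  # first unused integer becomes the key id
        ids.append(table[key])
    return ids

table = {}
ids = lookup_or_insert(table, ["x", "y", "x", "z"])
# ids == [0, 1, 0, 2]
```

Per-key values (e.g. group-by aggregates) can then live in a resizable vector indexed by the returned key id, exactly as described above.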
-The implementation of Swiss table does not need to have any information related
-to the domain of the keys. It does not use their logical data type or
-information about their physical representation and does not even use pointers
-to keys. All access to keys is delegated to a separate class or classes that
-provide callback functions for three operations:
-
-- computing hashes of keys;
-- checking equality for given pairs of keys;
-- appending a given sequence of keys to a stack maintained outside of Swiss
-  table object, so that they can be referenced later on by key ids (key ids 
will
-  be equal to their positions in the stack).
-
-When passing arguments to callback functions the keys are referenced using
-integer ids. For the left side - that is the keys present in the input
-mini-batch - ordinal positions within that mini-batch are used. For the right
-side - that is the keys inserted into the hash table - these are identified by
-key ids assigned to them and stored inside Swiss table when they were first
-encountered and processed.
-
-Diagram with logical view of information passing in callbacks:
-
-.. image:: img/swiss_table_1.jpg
-
-Hash table values for inserted keys are also stored inside Swiss table. Because
-of that, hash table logic does not need to ever re-evaluate the hash, and there
-is actually no need for a hash function callback. It is enough that the caller
-provides hash values for all entries in the batch when calling 
lookup-or-insert.
-
-Basic architecture and organization of data
+:func:`ExecNode::StartProducing`
+--------------------------------
+
+This method is called once at the start of the plan.  Most nodes ignore this 
method (any
+necessary initialization should happen in the constructor or Init).  However, 
source nodes
+will typically provide a custom implementation.  Source nodes should schedule 
whatever tasks
+are needed to start reading and providing the data.  Source nodes are usually 
the primary
+creator of tasks in a plan.
+
+.. note::
+   The ExecPlan operates on a push-based model.  Sources are often pull-based. 
 For example,
+   your source may be an iterator.  The source node will typically then 
schedule tasks to pull one
+   item from the source and push that item into the source's output node (via 
``InputReceived``).
+
+Examples
+^^^^^^^^
+
+* In the ``table_source`` node the input table is divided into batches.  A 
task is created for
+  each batch and that task calls ``InputReceived`` on the node's output.
+* In the ``scan`` node a task is created to start listing fragments from the 
dataset.  Each listing
+  task then creates tasks to read batches from the fragment, asynchronously.  When a batch is
+  fully read in, a continuation schedules a new task with the exec plan.  This task calls
+  ``InputReceived`` on the scan node's output.
+
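The push model that ``StartProducing`` kicks off can be sketched with a toy source node (all class and method names here are illustrative stand-ins for the C++ API): the source wraps a pull-based input and schedules one task per batch, and each task pushes its batch downstream via ``InputReceived``.

```python
class CollectingSink:
    """Stand-in for the source's output node."""
    def __init__(self):
        self.batches = []

    def input_received(self, index, batch):
        self.batches.append((index, batch))

class ToySourceNode:
    """Toy push-model source: start_producing schedules one task per
    input batch; each task pushes its batch into the output node."""
    def __init__(self, items, output, task_queue):
        self.items = items
        self.output = output
        self.task_queue = task_queue

    def start_producing(self):
        # The underlying input is pull-based (a list here); each scheduled
        # task pulls one batch and pushes it downstream.
        for index, batch in enumerate(self.items):
            self.task_queue.append(
                lambda i=index, b=batch: self.output.input_received(i, b))

tasks = []
sink = CollectingSink()
ToySourceNode(["b0", "b1"], sink, tasks).start_producing()
for task in tasks:  # the plan's scheduler would run these, possibly in parallel
    task()
# sink.batches == [(0, "b0"), (1, "b1")]
```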
+:func:`ExecNode::InputReceived`
+-------------------------------
+
+This method is called many times during the execution of a plan.  It is how 
nodes pass data
+to each other.  An input node will call InputReceived on its output.  Acero's 
execution model
+is push-based.  Each node pushes data into its output by calling InputReceived 
and passing in
+a batch of data.
+
+The InputReceived method is often where the actual work happens for the node.  
For example,
+a project node will execute its expressions and create a new expanded output 
batch.  It will then
+call InputReceived on its output.  InputReceived will never be called on a 
source node.  Sink
+nodes will never call InputReceived.  All other nodes will experience both.
+
+Some nodes (often called "pipeline breakers") must accumulate input before 
they can generate
+any output.  For example, a sort node must accumulate all input before it can 
sort the data and
+generate output.  In these nodes the InputReceived method will typically place 
the data into
+some kind of accumulation queue.  If the node doesn't yet have enough data to operate then it will
+not call InputReceived on its output.  This will then be the end of the current task.
+
+Examples
+^^^^^^^^
+
+* The ``project`` node runs its expressions, using the received batch as input 
for the expression.
+  A new batch is created from the input batch and the result of the 
expressions.  The new batch is
+  given the same order index as the input batch and the node then calls 
``InputReceived`` on its
+  output.
+* The ``order_by`` node inserts the batch into an accumulation queue.  If this 
was the last batch
+  then the node will sort everything in the accumulation queue.  The node will 
then call
+  ``InputReceived`` on the output for each batch in the sorted result.  A new 
batch index will be
+  assigned to each batch.  Note that this final output step might also occur 
as a result of a call
+  to ``InputFinished`` (described below).
+
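The pipeline-breaker pattern described above can be sketched with a toy ``order_by``-style node (names are illustrative, not the C++ API).  ``input_received`` only accumulates; output is produced later, in a separate step that in Acero is triggered either by the last ``InputReceived`` or by ``InputFinished``:

```python
class ToyOrderByNode:
    """Toy pipeline breaker: input_received only accumulates; sorted
    output is emitted once all input is known to have arrived."""

    def __init__(self, output):
        self.output = output
        self.accumulated = []

    def input_received(self, batch):
        self.accumulated.extend(batch)  # no output yet: end of this task

    def finish(self):
        # Stand-in for the "all input has arrived" trigger.
        for row in sorted(self.accumulated):
            self.output.append(row)  # stand-in for InputReceived on the output

out = []
node = ToyOrderByNode(out)
node.input_received([3, 1])
node.input_received([2])
assert out == []  # nothing is emitted while accumulating
node.finish()
# out == [1, 2, 3]
```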
+:func:`ExecNode::InputFinished`
+-------------------------------
+
+This method will be called once per input.  A node will call InputFinished on 
its output once it
+knows how many batches it will be sending to that output.  Normally this 
happens when the node is
+finished working.  For example, a scan node will call InputFinished once it 
has finished reading
+its files.  However, it could call it earlier if it knows (maybe from file 
metadata) how many
+batches will be created.
+
+Some nodes will use this signal to trigger some processing.  For example, a sort node needs to
+wait until it has received all of its input before it can sort the data.  It 
relies on the InputFinished
+call to know this has happened.
+
+Even if a node is not doing any special processing when finished (e.g. a 
project node or filter node
+doesn't need to do any end-of-stream processing) that node will still call 
InputFinished on its
+output.
+
+.. warning::
+   The InputFinished call might arrive before the final call to InputReceived. 
 In fact, it could
+   even be sent out before any calls to InputReceived begin.  For example, the 
table source node
+   always knows exactly how many batches it will be producing.  It could 
choose to call InputFinished
+   before it ever calls InputReceived.  If a node needs to do "end-of-stream" 
processing then it typically
+   uses an AtomicCounter which is a helper class to figure out when all of the 
data has arrived.
+
+Examples
+^^^^^^^^
+
+* The ``order_by`` node checks to see if it has already received all its batches.  If it has then it performs
+  the sorting step described in the ``InputReceived`` example.  Before it 
starts sending output data it
+  checks to see how many output batches it has (it's possible the batch size 
changed as part of the
+  accumulating or sorting) and calls ``InputFinished`` on the node's output.
+* The ``fetch`` node, during a call to ``InputReceived`` realizes it has 
received all the rows it was
+  asked for.  It calls ``InputFinished`` on its output immediately (even 
though its own ``InputFinished``
+  method has not yet been called).
+
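Because ``InputFinished`` and the final ``InputReceived`` can arrive in either order, end-of-stream detection needs to combine both signals.  A minimal sketch of the counter idea (a toy model of the helper described above, not its actual interface):

```python
import threading

class AtomicCounter:
    """Toy end-of-stream counter: the stream is complete only when the
    total is known (InputFinished) AND that many batches have been seen
    (InputReceived), whichever signal arrives last."""

    def __init__(self):
        self.lock = threading.Lock()
        self.seen = 0
        self.total = None

    def increment(self):
        """Called on each InputReceived; True if this was the last batch."""
        with self.lock:
            self.seen += 1
            return self.seen == self.total

    def set_total(self, total):
        """Called on InputFinished; True if all batches already arrived."""
        with self.lock:
            self.total = total
            return self.seen == self.total

counter = AtomicCounter()
done_on_finish = counter.set_total(2)  # InputFinished arrives first...
assert not done_on_finish
counter.increment()
done = counter.increment()  # ...the last InputReceived completes the stream
# done is True
```

Exactly one of the two calls reports completion, so the end-of-stream processing runs exactly once regardless of signal ordering.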
+:func:`ExecNode::PauseProducing` / :func:`ExecNode::ResumeProducing`
+---------------------------------------------------------------------
+
+These methods control backpressure.  Some nodes may need to pause their input 
to avoid accumulating
+too much data.  For example, when the user is consuming the plan with a 
RecordBatchReader we use a
+SinkNode.  The SinkNode places data in a queue that the RecordBatchReader 
pulls from (this is a
+conversion from a push-model to a pull-model).  If the user is reading the 
RecordBatchReader slowly then
+it is possible this queue will start to fill up.  For another example we can 
consider the write node.
+This node writes data to a filesystem.  If the writes are slow then data might 
accumulate at the
+write node.  As a result, the write node would need to apply backpressure.
+
+When a node realizes that it needs to apply some backpressure it will call 
PauseProducing on its input.
+Once the node has enough space to continue it will then call ResumeProducing 
on its input.  For example,
+the SinkNode would pause when its queue gets too full.  As the user continues 
to read from the
+RecordBatchReader we can expect the queue to slowly drain.  Once the queue has 
drained enough then the
+SinkNode can call ResumeProducing.
+
+Source nodes typically need to provide special behavior for PauseProducing and 
ResumeProducing.  For
+example, a scan node that is reading from a file can pause reading the file.  
However, some source nodes
+may not be able to pause in any meaningful way.  There is not much point in a 
table source node pausing
+because its data is already in memory.
+
+Nodes that are neither source nor sink should still forward backpressure signals.  For example, when
+PauseProducing is called on a project node it should call PauseProducing on 
its input.  If a node has
+multiple inputs then it should forward the signal to every input.
+
+Examples
+^^^^^^^^
+
+* The ``write`` node, in its ``InputReceived`` method, adds a batch to a 
dataset writer's queue.  If the
+  dataset writer is then full it will return an unfinished future that will 
complete when it has more room.
+  The ``write`` node then calls ``PauseProducing`` on its input.  It then adds 
a continuation to the future
+  that will call ``ResumeProducing`` on its input.
+* The ``scan`` node uses an :class:`AsyncTaskScheduler` to keep track of all 
the tasks it schedules.  This
+  scheduler is throttled to limit how much concurrent I/O the ``scan`` node is 
allowed to perform.  When
+  ``PauseProducing`` is called then the node will pause the scheduler.  This 
means that any tasks queued
+  behind the throttle will not be submitted.  However, any ongoing I/O will 
continue (backpressure can't
+  take effect immediately).  When ``ResumeProducing`` is called the ``scan`` 
node will unpause the scheduler.
+
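The sink-side backpressure described above can be sketched with a toy bounded queue (the class names, water marks, and methods are illustrative assumptions, not the SinkNode's real interface): the queue pauses its input when it crosses a high-water mark and resumes it once the consumer drains it below a low-water mark.

```python
class RecordingInput:
    """Stand-in for the sink's input node; records backpressure calls."""
    def __init__(self):
        self.calls = []
    def pause_producing(self):
        self.calls.append("pause")
    def resume_producing(self):
        self.calls.append("resume")

class BackpressureQueue:
    """Toy sink queue: pause the input when too full, resume once the
    consumer (e.g. a RecordBatchReader) has drained it far enough."""
    def __init__(self, input_node, pause_at=4, resume_at=2):
        self.input_node = input_node
        self.pause_at = pause_at
        self.resume_at = resume_at
        self.queue = []

    def input_received(self, batch):
        self.queue.append(batch)
        if len(self.queue) >= self.pause_at:
            self.input_node.pause_producing()

    def pop(self):  # called by the consumer
        batch = self.queue.pop(0)
        if len(self.queue) <= self.resume_at:
            self.input_node.resume_producing()
        return batch

src = RecordingInput()
sink = BackpressureQueue(src)
for b in range(4):
    sink.input_received(b)  # the 4th batch crosses the high-water mark
sink.pop()
sink.pop()                  # draining to 2 crosses the low-water mark
# src.calls == ["pause", "resume"]
```

Separating the pause and resume thresholds avoids rapid pause/resume flapping around a single boundary.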
+:func:`ExecNode::StopProducing`
+-------------------------------
+
+StopProducing is called when a plan needs to end early.  This can happen because the user cancelled
+the plan or because an error occurred.  Most nodes do not need to do anything here.
+There is no expectation or requirement that a node sends any remaining data it 
has.  Any node that
+schedules tasks (e.g. a source node) should stop producing new data.
+
+In addition to plan-wide cancellation, a node may call this method on its 
input if it has decided
+that it has received all the data that it needs.  However, because of 
parallelism, a node may still
+receive a few calls to ``InputReceived`` after it has stopped its input.
+
+If any external resources are used then cleanup should happen as part of this 
call.
+
+Examples
+^^^^^^^^
+
+* The ``asofjoin`` node has a dedicated processing thread that communicates with the main Acero threads
+  using a queue.  When ``StopProducing`` is called the node inserts a poison 
pill into the queue.  This
+  tells the processing thread to stop immediately.  Once the processing thread 
stops it marks its external
+  task (described below) as completed which allows the plan to finish.
+* The ``fetch`` node, in ``InputReceived``, may decide that it has all the 
data it needs.  It can then call
+  ``StopProducing`` on its input.
+
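The poison-pill technique mentioned in the ``asofjoin`` example can be sketched with the standard library (a generic illustration of the pattern, not Acero's code): a sentinel object pushed into the queue tells the worker thread to exit immediately rather than wait for more input.

```python
import queue
import threading

POISON_PILL = object()  # sentinel telling the worker to stop immediately

def processing_thread(q, processed):
    while True:
        item = q.get()
        if item is POISON_PILL:
            break  # exit promptly instead of blocking on more input
        processed.append(item)

q = queue.Queue()
processed = []
worker = threading.Thread(target=processing_thread, args=(q, processed))
worker.start()
q.put("batch-0")
q.put(POISON_PILL)  # what StopProducing would insert on early shutdown
worker.join()
# worker exited after handling "batch-0"
```

Without the pill, the worker would block forever in ``q.get()`` and the plan could never finish on cancellation.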
+Initialization / Construction / Destruction
 -------------------------------------------
 
-The hash table is an array of **slots**. Slots are grouped in groups of 8 
called
-**blocks**. The number of blocks is a power of 2. The empty hash table starts
-with a single block, with all slots empty. Then, as the keys are getting
-inserted and the amount of empty slots is shrinking, at some point resizing of
-the hash table is triggered. The data stored in slots is moved to a new hash
-table that has the double of the number of blocks.
-
-The diagram below shows the basic organization of data in our implementation of
-Swiss table:
-
-.. image:: img/swiss_table_2.jpg
-
-N is the log of the number of blocks, :math:`2^{N+3}` is the number of slots and
-also the maximum number of inserted keys and hence (N + 3) is the number of 
bits
-required to store a key id. We will refer to N as the **size of the hash 
table**.
-
-Index of a block within an array will be called **block id**, and similarly 
index
-of a slot will be **slot id**. Sometimes we will focus on a single block and
-refer to slots that belong to it by using a **local slot id**, which is an 
index
-from 0 to 7.
-
-Every slot can either be **empty** or store data related to a single inserted
-key. There are three pieces of information stored inside a slot:
-
-- status byte,
-- key id,
-- key hash.
-
-Status byte, as the name suggests, stores 8 bits. The highest bit indicates if
-the slot is empty (the highest bit is set) or corresponds to one of inserted
-keys (the highest bit is zero). The remaining 7 bits contain 7 bits of key hash
-that we call a **stamp**. The stamp is used to eliminate some false positives
-when searching for a matching key for a given input. Slot also stores **key 
id**,
-which is a non-negative integer smaller than the number of inserted keys, that 
is
-used as a reference to the actual inserted key. The last piece of information
-related to an inserted key is its **hash** value. We store hashes for all keys,
-so that they never need to be re-computed. That greatly simplifies some
-operations, like resizing of a hash table, that may not even need to look at 
the
-keys at all. For an empty slot, the status byte is 0x80, key id is zero and the
-hash is not used and can be set to any number.
-
-A single block contains 8 slots and can be viewed as a micro-stack of up to 8
-inserted keys. When the first key is inserted into an empty block, it will occupy
-the slot with local id 0. The second inserted key will go into slot number 1 and so
-on. We use the N highest bits of the hash to get the index of a **start block** when
-searching for a match, or for an empty slot in which to insert a previously unseen
-key. If the start block contains any empty slots, then the search for either a
-match or a place to insert a key will end at that block. We will call such a block
-an **open block**. A block that is not open is a full block. In the case of a full
-block, the search related to the input key may continue in the next block, modulo
-the number of blocks. If the key is not inserted into its start block, we will
-refer to it as an **overflow** entry, other entries being **non-overflow**.
-Overflow entries are slower to process, since they require visiting more than one
-block, so we want to keep their percentage low. This is done by choosing the
-right **load factor** (percentage of occupied slots in the hash table) at which
-the hash table gets resized and the number of blocks gets doubled. By tuning this
-value we can control the probability of encountering an overflow entry.
-
-The most interesting part of each block is the set of status bytes of its slots,
-which is simply a single 64-bit word. Implementing efficient searches across
-these bytes during lookups requires using either a leading zero count or
-trailing zero count intrinsic. Since there are cases when only the first one is
-available, in order to take advantage of it, we order the bytes in the 64-bit
-status word so that the first slot within a block uses the highest byte and the
-last one uses the lowest byte (slots are in reversed byte order). The diagram
-below shows how the information about slots is stored within a 64-bit status
-word:
-
-.. image:: img/swiss_table_3.jpg
-
-Each status byte has a 7-bit fragment of hash value - a **stamp** - and an empty
-slot bit. Empty slots have status byte equal to 0x80 - the highest bit is set to
-1 to indicate an empty slot and the lowest bits, which are used by a stamp, are
-set to zero.
-
-The diagram below shows which bits of the hash value are used by the hash table:
-
-.. image:: img/swiss_table_4.jpg
-
-If a hash table has :math:`2^{N}` blocks, then we use the N highest bits of a hash
-to select a start block when searching for a match. The next 7 bits are used as
-a stamp. Using the highest bits to pick a start block means that a range of hash
-values can be easily mapped to a range of block ids of start blocks for hashes
-in that range. This is useful when resizing a hash table or merging two hash
-tables together.
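
The split of hash bits described above can be sketched in a few lines. This is an
illustrative standalone sketch, not Arrow's actual code; the function names are
hypothetical:

```cpp
#include <cassert>
#include <cstdint>

// For a hash table of size N (2^N blocks), the start block id comes from the
// N highest bits of the 32-bit hash; the 7-bit stamp comes from the next 7 bits.
inline uint32_t StartBlockId(uint32_t hash, int N) {
  return hash >> (32 - N);
}

inline uint32_t Stamp(uint32_t hash, int N) {
  return (hash >> (32 - N - 7)) & 0x7F;
}
```

Because the highest bits select the block, sorting keys by hash also sorts them by
start block id, which is what makes resizing and merging easy.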
-
-Interleaving status bytes and key ids
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-Status bytes and key ids for all slots are stored in a single array of bytes.
-They are first grouped by 8 into blocks, then each block of status bytes is
-interleaved with a corresponding block of key ids. Finally, key ids are
-represented using the smallest possible number of bits and bit-packed (the bits
-representing each next key id start right after the last bit of the previous key
-id). Note that regardless of the chosen number of bits, a block of bit-packed
-key ids (that is, 8 of them) will start and end on a byte boundary.
-
-The diagram below shows the organization of bytes and bits of a single block in
-interleaved array:
-
-.. image:: img/swiss_table_5.jpg
-
-From the size of the hash table we can derive the number K of bits needed in the
-worst case to encode any key id. K is equal to the number of bits needed to
-represent a slot id (the number of keys is not greater than the number of slots and any
-key id is strictly less than the number of keys), which for a hash table of size
-N (:math:`2^{N}` blocks) equals (N+3). To simplify bit packing and unpacking and avoid
-handling of special cases, we round up K to full bytes for K > 24 bits.
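
The rule for K can be expressed directly. A minimal sketch (hypothetical helper
name, not Arrow's code):

```cpp
#include <cassert>

// Number of bits used to bit-pack key ids for a hash table of size N:
// K = N + 3 (enough for any slot id), rounded up to a whole number of bytes
// once it exceeds 24 bits, to keep bit packing/unpacking simple.
inline int KeyIdBits(int N) {
  int k = N + 3;
  return k > 24 ? ((k + 7) / 8) * 8 : k;
}
```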
-
-Status bytes are stored in a single 64-bit word in reverse byte order (the last
-byte corresponds to the slot with local id 0). On the other hand, key ids are
-stored in the normal order (the order of slot ids).
-
-Since both status byte and key id for a given slot are stored in the same array
-close to each other, we can expect that most of the lookups will read only one
-CPU cache-line from memory inside Swiss table code (then at least another one
-outside Swiss table to access the bytes of the key for the purpose of
-comparison). Even if we hit an overflow entry, it is still likely to reside on
-the same cache-line as the start block data. Hash values, which are stored
-separately from status byte and key id, are only used when resizing and do not
-impact the lookups outside these events.
+Simple initialization logic (that cannot error) can be done in the constructor.  If the initialization
+logic may return an invalid status then it can either be done in the exec node's factory method or
+the ``Init`` method.  The factory method is preferred for simple validation.  The ``Init`` method is
+preferred if the initialization might do expensive allocation or other resource consumption.  ``Init`` will
+always be called before ``StartProducing`` is called.  Initialization could also be done in
+``StartProducing`` but keep in mind that other nodes may have started by that point.
+
+In addition, there is a ``Validate`` method that can be overloaded to provide custom validation.  This
+method is normally called before ``Init`` but after all inputs and outputs have been added.
+
+Finalization happens today in the destructor.  There are a few cases where that might be slow.
+For example, in the write node, if there was an error during the plan, then we might close out some open
+files here.  Should there be significant finalization that is either asynchronous or could potentially
+trigger an error then we could introduce a ``Finalize`` method to the ExecNode lifecycle.  It hasn't been
+done yet only because it hasn't been needed.
+
+Summary
+-------
+
+.. list-table:: ExecNode Lifecycle
+   :widths: 20 40 40
+   :header-rows: 1
+
+   * - Method Name
+     - This is called when...
+     - A node calls this when...
+   * - StartProducing
+     - The plan is starting
+     - N/A
+   * - InputReceived
+     - Data is received from the input
+     - To send data to the output
+   * - InputFinished
+     - The input knows how many batches there are
+     - The node can tell its output how many batches there are
+   * - StopProducing
+     - A plan is aborted or an output has enough data
+     - A node has all the data it needs
+
+Extending Acero
+===============
+
+Acero instantiates a singleton :class:`ExecFactoryRegistry` which maps between names and exec node
+factories (methods which create an ExecNode from options).  To create a new ExecNode you can register
+the node with this registry and your node will now be usable by Acero.  If you would like to be able
+to use this node with Substrait plans you will also need to configure the Substrait registry so that it
+knows how to map Substrait to your custom node.
+
+This means that you can create and add new nodes to Acero without recompiling Acero from source.
+
+Scheduling and Parallelism
+==========================
+
+There are many ways in which data engines can utilize multiple compute resources (e.g. multiple cores).
+Before we get into the details of Acero's scheduling we will cover a few high level topics.
+
+Parallel Execution of Plans
+---------------------------
+
+Users may want to execute multiple plans concurrently and they are welcome to do so.  However, Acero has no
+concept of inter-plan scheduling.  Each plan will attempt to maximize its usage of compute resources and
+there will likely be contention for CPU, memory, and disk resources.  If plans are using the default CPU &
+I/O thread pools this will be mitigated somewhat since they will share the same thread pool.
+
+Locally Distributed Plans
+-------------------------
+
+A common way to tackle multi-threading is to split the input into partitions, create a plan for
+each partition, and then merge the results from these plans in some way.  For example, let's assume you
+have 20 files and 10 cores and you want to read and sort all the data.  You could create a plan for every
+2 files that reads and sorts those files.  Then you could create one extra plan that takes the input from these
+10 child plans and merges the 10 input streams in a sorted fashion.
+
+This approach is popular because it is how queries are distributed across multiple servers and so it
+is widely supported and well understood.  Acero does not do this today but there is no reason to prevent it.
+Adding shuffle & partition nodes to Acero should be a high priority and would enable Acero to be used by
+distributed systems.  Once that has been done then it should be possible to do a local shuffle (local
+meaning exchanging between multiple exec plan instances on a single system) if desired.
+
+.. figure:: img/dist_plan.svg
+
+   A distributed plan can provide parallelism even if the plans themselves run serially
+
+Pipeline Parallelism
+--------------------
+
+Acero attempts to maximize parallelism using pipeline parallelism.  As each batch of data arrives from the
+source we immediately create a task and start processing it.  This means we will likely start processing
+batch X before the processing of batch X-1 has completed.  This is very flexible and powerful.  However, it also
+means that properly implementing an ExecNode is difficult.
+
+For example, an ExecNode's InputReceived method should be reentrant.  In other words, it should be expected
+that InputReceived will be called before the previous call to InputReceived has completed.  This means that
+nodes with any kind of mutable state will need mutexes or similar mechanisms to protect that state from race
+conditions.  It also means that tasks can easily get out of order and nodes should not expect any particular ordering
+of their input (more on this later).
+
+.. figure:: img/pipeline.svg
+
+   An example of pipeline parallelism on a system with 3 CPU threads and 2 I/O threads
+
+Asynchronicity
+--------------
+
+Some operations take a long time and may not require the CPU.  Reading data from the filesystem is one example.  If we
+only have one thread per core then time will be wasted while we wait for these operations to complete.  There
+are two common solutions to this problem.  A synchronous solution is often to create more threads than there are
+cores with the expectation that some of them will be blocked and that is ok.  This approach tends to be simpler
+but it can lead to excess thread contention and requires fine-tuning.
+
+Another solution is to make the slow operations asynchronous.  When the slow operation starts the caller gives up
+the thread and allows other tasks to run in the meantime.  Once the slow operation finishes then a new task is
+created to take the result and continue processing.  This helps to minimize thread contention but tends to be
+more complex to implement.
+
+Due to a lack of standard C++ async APIs, Acero uses a combination of the two approaches.  Acero has two thread pools.
+The first is the CPU thread pool.  This thread pool has one thread per core.  Tasks in this thread pool should never
+block (beyond minor delays for synchronization) and should generally be actively using CPU as much as possible.  Threads
+on the I/O thread pool are expected to spend most of their time idle.  They should avoid doing any CPU-intensive work.
+Their job is basically to wait for data to be available and schedule follow-up tasks on the CPU thread pool.
+
+.. figure:: img/async.svg
+
+   Arrow achieves asynchronous execution by combining CPU & I/O thread pools
+
+.. note::
+
+   Most nodes in Acero do not need to worry about asynchronicity.  They are fully synchronous and do not spawn tasks.
+
+Task per Pipeline (and sometimes beyond)
+----------------------------------------
+
+An engine could choose to create a thread task for every execution of a node.  However, without careful scheduling,
+this leads to problems with cache locality.  For example, let's assume we have a basic plan consisting of three
+exec nodes, scan, project, and then filter (this is a very common use case).  Now let's assume there are 100 batches.
+In a task-per-operator model we would have tasks like "Scan Batch 5", "Project Batch 5", and "Filter Batch 5".  Each
+of those tasks is potentially going to access the same data.  For example, maybe the ``project`` and ``filter`` nodes need
+to read the same column, a column which is initially created in a decode phase of the ``scan`` node.  To maximize cache
+utilization we would need to carefully schedule our tasks to ensure that all three of those tasks are run consecutively
+and assigned to the same CPU core.
+
+To avoid this problem we design tasks that run through as many nodes as possible before the task ends.  This sequence
+of nodes is often referred to as a "pipeline" and the nodes that end the pipeline (and thus end the task) are often
+called "pipeline breakers".  Some nodes might even fall somewhere in between.  For example, in a hash join node, when
+we receive a batch on the probe side, and the hash table has been built, we do not need to end the task and instead keep
+on running.  This means that tasks might sometimes end at the join node and might sometimes continue past the join node.
+
+.. figure:: img/pipeline_task.svg
+
+   A logical view of pipelines in a plan and two tasks, showing that pipeline boundaries may vary during a plan
+
+
+Thread Pools and Schedulers
+---------------------------
+
+The CPU and I/O thread pools are a part of the core Arrow-C++ library.  They contain a FIFO queue of tasks and will
+execute them as a thread is available.  For Acero we need additional capabilities.  For this we use the
+AsyncTaskScheduler.  In the simplest mode of operation the scheduler simply submits tasks to an underlying thread pool.
+However, it is also capable of creating sub-schedulers which can apply throttling, prioritization, and task tracking:
+
+ * A throttled scheduler associates a cost with each task.  Tasks are only submitted to the underlying scheduler
+   if there is room.  If there is not then the tasks are placed in a queue.  The write node uses a throttle of size
+   1 to avoid reentrantly calling the dataset writer (the dataset writer does its own internal scheduling).  A throttled
+   scheduler can be manually paused and unpaused.  When paused all tasks are queued and queued tasks will not be submitted
+   even if there is room.  This can be useful in source nodes to implement PauseProducing and ResumeProducing.
+ * Priority can be applied to throttled schedulers to control the order in which queued tasks are submitted.  If
+   there is room a task is submitted immediately (regardless of priority).  However, if the throttle is full then
+   the task is queued and subject to prioritization.  The scan node throttles how many read requests it generates
+   and prioritizes reading a dataset in order, if possible.
+ * A task group can be used to keep track of a collection of tasks and run a finalization task when all of the
+   tasks have completed.  This is useful for fork-join style problems.  The write node uses a task group to close
+   a file once all outstanding write tasks for the file have completed.
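
The throttling behavior described in the first bullet can be illustrated with a small
standalone sketch.  This is not the real AsyncTaskScheduler (the class name and
methods here are hypothetical); it only demonstrates the cost/capacity idea:

```cpp
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>
#include <vector>

// A throttle that runs tasks while total outstanding cost fits within a
// capacity, and queues the rest in FIFO order.  Finishing a task frees its
// cost and drains the queue.
class Throttle {
 public:
  explicit Throttle(std::size_t capacity) : capacity_(capacity) {}

  // Run the task now if there is room for its cost, otherwise queue it.
  void Submit(std::size_t cost, std::function<void()> task) {
    if (in_flight_ + cost <= capacity_) {
      in_flight_ += cost;
      task();
    } else {
      queued_.emplace_back(cost, std::move(task));
    }
  }

  // Mark `cost` worth of work finished, then submit queued tasks that now fit.
  void Finish(std::size_t cost) {
    in_flight_ -= cost;
    while (!queued_.empty() && in_flight_ + queued_.front().first <= capacity_) {
      auto [c, task] = std::move(queued_.front());
      queued_.pop_front();
      in_flight_ += c;
      task();
    }
  }

 private:
  std::size_t capacity_;
  std::size_t in_flight_ = 0;
  std::deque<std::pair<std::size_t, std::function<void()>>> queued_;
};
```

With a capacity of 1 this behaves like the write node's throttle: a second task
submitted while the first is outstanding waits until ``Finish`` is called.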
+
+There are research results and examples out there for different ways to prioritize tasks in an execution engine.  Acero has not
+yet had to address this problem.  Let's go through some common situations:
+
+ * Engines will often prioritize reading from the build side of a join node before reading from the probe side.  This
+   would be more easily handled in Acero by applying backpressure.
+ * Another common use case is to control memory accumulation.  Engines will prioritize tasks which are closer to the
+   sink node in an effort to relieve memory pressure.  However, Acero currently assumes that spilling will be added
+   at pipeline breakers and that memory usage in a plan will be more or less static (per core) and well below the
+   limits of the hardware.  This might change if Acero needs to be used in an environment where there are many compute
+   resources and limited memory (e.g. a GPU).
+ * Engines will often use work stealing algorithms to prioritize running tasks on the same core to improve cache
+   locality.  However, since Acero uses a task-per-pipeline model there isn't much lost opportunity for cache
+   locality that a scheduler could reclaim.  Tasks only end when there is no more work that can be done with the data.
+
+While there is not much prioritization in place in Acero today we do have the tools to apply it should we need to.
 
 .. note::
-   Improvement to consider:
-   In addition to the Swiss table data, we need to store an array of inserted
-   keys, one for each key id. If keys are of fixed length, then the address of
-   the bytes of the key can be calculated by multiplying key id by the common
-   length of the key. If keys are of varying length, then there will be an
-   additional array with an offset of each key within the array of concatenated
-   bytes of keys. That means that any key comparison during lookup will involve
-   3 arrays: one to get key id, one to get key offset and a final one with bytes of
-   the key. This could be reduced to 2 array lookups if we stored key offset
-   instead of key id interleaved with slot status bytes. Offset indexed by key id
-   and stored in its own array becomes offset indexed by slot id and stored
-   interleaved with slot status bytes. At the same time key id indexed by slot id
-   and interleaved with slot status bytes before becomes key id referenced using
-   offset and stored with key bytes. There may be a slight increase in the total
-   size of memory needed by the hash table, equal to the difference in the number
-   of bits used to store offset and those used to store key id, multiplied by the
-   number of slots, but that should be a small fraction of the total size.
-
-32-bit hash vs 64-bit hash
-~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-Currently we use 32-bit hash values in the Swiss table code and 32-bit integers as
-key ids. For a robust implementation, sooner or later we will need to support
-64-bit hashes and 64-bit key ids. When we use a 32-bit hash, it means that we run
-out of hash bits when the hash table size N is greater than 25 (25 bits of hash
-needed to select a block and 7 bits needed to generate a stamp byte reach 32
-total bits). When the number of inserted keys exceeds the maximal number of keys
-stored in a hash table of size 25 (which is at least :math:`2^{24}`), the chance
-of false positives during lookups will start growing quickly. A 32-bit hash should
-not be used with more than about 16 million inserted keys.
-
-Low memory footprint and low chance of hash collisions
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-Swiss table is a good choice of hash table for modern hardware, because it
-combines lookups that can take advantage of special CPU instructions with space
-efficiency and a low chance of hash collisions.
-
-Space efficiency is important for performance, because the cost of random array
-accesses, often dominating the lookup cost for larger hash tables, increases with
-the size of the arrays. This happens due to the limited space of CPU caches. Let us
-look at the amortized additional storage cost for a key in a hash table, apart
-from the essential cost of storing the data of all those keys. Furthermore, we
-can skip the storage of hash values, since these are only used during infrequent
-hash table resize operations (they should not have a big impact on CPU cache usage in
-normal cases).
-
-A half full hash table of size N will use 2 status bytes per inserted key (because
-for every filled slot there is one empty slot) and 2*(N+3) bits for key id
-(again, one for the occupied slot and one for the empty). For N = 16 for
-instance this is slightly under 7 bytes per inserted key.
-
-Swiss table also has a low probability of false positives leading to wasted key
-comparisons. Here is some rationale behind why this should be the case. A hash
-table of size N can contain up to :math:`2^{N+3}` keys. Search for a match
-involves (N + 7) hash bits: N to select a start block and 7 to use as a stamp.
-There are always at least 16 times more combinations of used hash bits than
-there are keys in the hash table (32 times more if the hash table is half full).
-These numbers mean that the probability of false positives resulting from a
-search for a matching slot should be low. That corresponds to an expected number
-of comparisons per lookup being close to 1 for keys already present and 0 for
-new keys.
-
-Lookup
+   In addition to the AsyncTaskScheduler there is another class called the TaskScheduler.  This class predates the
+   AsyncTaskScheduler and was designed to offer task tracking for highly efficient synchronous fork-join workloads.
+   If this specialized purpose meets your needs then you may consider using it.  It would be interesting to profile
+   this against the AsyncTaskScheduler and see how closely the two compare.
+
+Intra-node Parallelism
+----------------------
+
+Some nodes can potentially exploit parallelism within a task.  For example, in the scan node we can decode
+columns in parallel.  In the hash join node, parallelism is sometimes exploited for complex tasks such as
+building the hash table.  This sort of parallelism is less common but not necessarily discouraged.  Profiling should
+be done first though to ensure that this extra parallelism will be helpful in your workload.
+
+All Work Happens in Tasks
+-------------------------
+
+All work in Acero happens as part of a task.  When a plan is started the AsyncTaskScheduler is created and given an
+initial task.  This initial task calls StartProducing on the nodes.  Tasks may schedule additional tasks.  For example,
+source nodes will usually schedule tasks during the call to StartProducing.  Pipeline breakers will often schedule tasks
+when they have accumulated all the data they need.  Once all tasks in a plan are finished then the plan is considered
+done.
+
+Some nodes use external threads.  These threads must be registered as external tasks using the BeginExternalTask method.
+For example, the asof join node uses a dedicated processing thread to achieve serial execution.  This dedicated thread
+is registered as an external task.  External tasks should be avoided where possible because they require careful
+handling to avoid deadlock in error situations.
+
+Ordered Execution
+=================
+
+Some nodes either establish an ordering to their outgoing batches or they need to be able to process batches in order.
+Acero handles ordering using the ``batch_index`` property on an ExecBatch.  If a node has a deterministic output order
+then it should apply a batch index on batches that it emits.  For example, the OrderByNode applies a new ordering to
+batches (regardless of the incoming ordering).  The scan node is able to attach an implicit ordering to batches which
+reflects the order of the rows in the files being scanned.
+
+If a node needs to process data in order then it is a bit more complicated.  Because of the parallel nature of execution
+we cannot guarantee that batches will arrive at a node in order.  However, they can generally be expected to be "mostly
+ordered".  As a result, we can insert the batches into a sequencing queue.  The sequencing queue is given a callback which
+is guaranteed to run on the batches, serially, in order.  For example, the fetch node uses a sequencing queue.  The callback
+checks to see if we need to include part or all of the batch, and then slices the batch if needed.
+
+Even if a node does not care about order it should try to maintain the batch index if it can.  The project and filter
+nodes do not care about order but they ensure that output batches keep the same index as their input batches.  The filter
+node will even emit empty batches if it needs to so that it can maintain the batch order without gaps.
+
+.. figure:: img/ordered.svg
+
+   An example of ordered execution
+
+
+Partitioned Execution
+=====================
+
+A stream is partitioned (or sometimes called segmented) if rows are grouped together in some way.  Currently there is not
+a formal notion of partitioning.  However, one is starting to develop (e.g. segmented aggregation) and we may end up
+introducing a more formal notion of partitions to Acero at some point as well.
+
+Spillover
+=========
+
+Spillover has not yet been implemented in Acero.
+
+Distributed Execution
+=====================
+
+There are certain exec nodes which are useful when an engine is used in a distributed environment.  The terminology
+can vary so we will use the Substrait terminology.  An exchange node sends data to different workers.  Often this is
+a partitioned exchange so that Acero is expected to partition each batch and distribute partitions across N different
+workers.  On the other end we have the capture node.  This node receives data from different workers.
+
+These nodes do not exist in Acero today.  However, they would be in scope and we hope to have such nodes someday.
+
+Profiling & Tracing
+===================
+
+Acero's tracing is currently half-implemented and there are major gaps in profiling tools.  However, there has been some
+effort at tracing with open telemetry and most of the necessary pieces are in place.  The main thing currently lacking is
+some kind of effective visualization of the tracing results.
+
+In order to use the tracing that is present today you will need to build Arrow with ``ARROW_WITH_OPENTELEMETRY=ON``.
+Then you will need to set the environment variable ``ARROW_TRACING_BACKEND=otlp_http``.  This will configure open telemetry
+to export trace results (as OTLP) to the HTTP endpoint http://localhost:4318/v1/traces.  You will need to configure an
+open telemetry collector to collect results on that endpoint and you will need to configure a trace viewer of some kind
+such as Jaeger: https://www.jaegertracing.io/docs/1.21/opentelemetry/
+
+Benchmarking
+============
+
+The most complete macro benchmarking for Acero is provided by https://github.com/voltrondata-labs/arrowbench.
+These include a set of TPC-H benchmarks, executed from the R-dplyr integration, which are run on every Arrow commit and
+reported to Conbench at https://conbench.ursa.dev/
+
+In addition to these TPC-H benchmarks there are a number of micro-benchmarks for various nodes (hash-join, asof-join,
+etc.).  Finally, the compute functions themselves should mostly have micro-benchmarks.  For more on micro benchmarks you
+can refer to https://arrow.apache.org/docs/developers/benchmarks.html
+
+Any new functionality should include micro benchmarks to avoid regressions.
+
+Bindings
+========
+
+Public API
+----------
+
+The public API for Acero consists of Declaration and the various DeclarationToXyz methods.  In addition the
+options classes for each node are part of the public API.  However, nodes are extensible and so this API is
+extensible.
+
+R (dplyr)
+---------
+
+Dplyr is an R library for programmatically building queries.  The arrow-r package has dplyr bindings which
+adapt the dplyr API to create Acero execution plans.  In addition, there is a dplyr-substrait backend that
+is in development which could eventually replace the Acero-aware binding.
+
+Python
 ------
 
-The lookup-or-insert operation, given a hash of a key, finds a list of candidate
-slots with corresponding keys that are likely to be equal to the input key. The
-list may be empty, which means that the key does not exist yet in the hash
-table. If it is not empty, then the callback function for key comparison is
-called for each next candidate to verify that there is indeed a match. False
-positives get rejected and we end up either finding an actual match or an empty
-slot, which means that the key is new to the hash table. New keys get assigned
-the next available integers as key ids, and are appended to the set of keys stored in
-the hash table. As a result of inserting new keys into the hash table, the density
-of occupied slots may reach an upper limit, at which point the hash table will be
-resized and will afterwards have twice as many slots. That is the lookup-or-insert
-functionality in summary, but the actual implementation is a bit more involved,
-because of vectorization of the processing and various optimizations for common
-cases.
+The pyarrow library binds to Acero in two different ways.  First, there is a direct binding in pyarrow.acero
+which directly binds to the public API.  Second, there are a number of compute utilities like
+pyarrow.Table.group_by which use Acero, though this is invisible to the user.
 
-Search within a single block
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+Java
+----
 
-There are three possible cases that can occur when searching for a match for a
-given key (that is, for a given stamp of a key) within a single block,
-illustrated below.
-
-1. There is a matching stamp in the block of status bytes:
-
-.. image:: img/swiss_table_6.jpg
-
-2. There is no matching stamp in the block, but there is an empty slot in the
-   block:
-
-.. image:: img/swiss_table_7.jpg
-
-3. There is no matching stamp in the block and the block is full (there are no
-   empty slots left):
-
-.. image:: img/swiss_table_8.jpg
-
-64-bit arithmetic can be used to search for a matching slot within the entire
-block at once, without iterating over all slots in it. Following is a sketch of
-the possible steps to execute when searching for the first status byte matching
-a given stamp, returning the first empty slot on a miss if the block is not
-full, or 8 (one past the maximum local slot id) otherwise.
-
-| *Example will use input stamp 0x5E and a 64-bit status bytes word with one empty
-  slot:*
-| *0x 4B17 5E3A 5E2B 1180*
-
-1. [1 instruction] Replicate the stamp to all bytes by multiplying it by 0x 0101
-   0101 0101 0101.
-
-   | *We obtain: 0x 5E5E 5E5E 5E5E 5E5E.*
-
-2. [1 instruction] XOR the replicated stamp with the status bytes word. Bytes
-   corresponding to a matching stamp will be 0, bytes corresponding to empty
-   slots will have a value between 128 and 255, bytes corresponding to
-   non-matching non-empty slots will have a value between 1 and 127.
-
-   | *We obtain: 0x 1549 0064 0075 4FDE.*
-
-3. [2 instructions] In the next step we want to have information about a match in
-   the highest bit of each byte. We can ignore empty slot bytes here, because they
-   will be taken care of at a later step. Set the highest bit in each byte (OR with
-   0x 8080 8080 8080 8080) and then subtract 1 from each byte (subtract 0x 0101 0101
-   0101 0101 from the 64-bit word). Now if a byte corresponds to a non-empty slot then
-   the highest bit 0 indicates a match and 1 indicates a miss.
-
-   | *We obtain: 0x 95C9 80E4 80F5 CFDE,*
-   | *then 0x 94C8 7FE3 7FF4 CEDD.*
-
-4. [3 instructions] In the next step we want to obtain in each byte one of two
-   values: 0x80 if it is either an empty slot or a match, 0x00 otherwise. We do
-   it in three steps: NOT the result of the previous step to change the meaning
-   of the highest bit; OR with the original status word to set the highest bit in a
-   byte to 1 for empty slots; mask out everything other than the highest bits in
-   all bytes (AND with 0x 8080 8080 8080 8080).
-
-   | *We obtain: 0x 6B37 801C 800B 3122,*
-   | *then 0x 6B37 DE3E DE2B 31A2,*
-   | *finally 0x 0000 8000 8000 0080.*
-
-5. [2 instructions] Finally, use a leading zero bits count and divide it by 8 to
-   find the index of the last byte that corresponds either to a match or an empty
-   slot. If the leading zero count intrinsic returns 64 for a 64-bit input zero,
-   then after dividing by 8 we will also get the desired answer in the case of a
-   full block without any matches.
-
-   | *We obtain: 16,*
-   | *then 2 (the index of the first slot within the block that matches the stamp).*
-
-If SIMD instructions with 64-bit lanes are available, multiple single block
-searches for different keys can be executed together. For instance AVX2
-instruction set allows to process quadruplets of 64-bit values in a single
-instruction, four searches at once.
-
-Complete search potentially across multiple blocks
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-Full implementation of a search for a matching key may involve visiting 
multiple
-blocks beginning with the start block selected based on the hash of the key. We
-move to the next block modulo the number of blocks, whenever we do not find a
-match in the current block and the current block is full. The search may also
-involve visiting one or more slots in each block. Visiting in this case means
-calling a comparison callback to verify the match whenever a slot with a 
matching
-stamp is encountered. Eventually the search stops when either:
-
-- the matching key is found in one of the slots matching the stamp, or
-- an empty slot is reached. This is illustrated in the diagram below:
-
-.. image:: img/swiss_table_9.jpg
-
-Optimistic processing with two passes
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-Hash table lookups may have high cost in the pessimistic case, when we 
encounter
-cases of hash collisions and full blocks that lead to visiting further blocks. 
In
-the majority of cases we can expect an optimistic situation - the start block 
is
-not full, so we will only visit this one block, and all stamps in the block are
-different, so we will need at most one comparison to find a match. We can 
expect
-about 90% of the key lookups for an existing key to go through the optimistic
-path of processing. For that reason it pays off to optimize especially for this
-90% of inputs.
-
-Lookups in Swiss table are split into two passes over an input batch of keys. 
The
-**first pass: fast-path lookup**, is a highly optimized, vectorized,
-SIMD-friendly, branch-free code that fully handles optimistic cases. The 
**second
-pass: slow-path lookup**, is normally executed only for the selection of inputs
-that have not been finished in the first pass, although it can also be called
-directly on all of the inputs, skipping fast-path lookup. It handles all 
special
-cases and inserts but in order to be robust it is not as efficient as 
fast-path.
-Slow-path lookup does not need to repeat the work done in fast-path lookup - it
-can use the state reached at the end of fast-path lookup as a starting point.
-
-Fast-path lookup implements search only for the first stamp match and only 
within
-the start block. It only makes sense when we already have at least one key
-inserted into the hash table, since it does not handle inserts. It takes a 
vector
-of key hashes as an input and based on it outputs three pieces of information 
for
-each key:
-
-- Key id corresponding to the slot in which a matching stamp was found. Any 
valid
-  key id if a matching stamp was not found.
-- A flag indicating if a match was found or not.
-- Slot id of a slot from which slow-path should pick up the search if the first
-  match was either not found or it turns out to be false positive after
-  evaluating key comparison.
+The Java implementation exposes some capabilities from Arrow datasets.  These 
use Acero implicitly.  There
+are no direct bindings to Acero or Substrait in the Java implementation today.
 
-.. note::
-   Improvement to consider: precomputing 1st pass lookup results.
-
-   If the hash table is small, the number of inserted keys is small, we could
-   further simplify and speed-up the first pass by storing in a lookup table
-   pre-computed results for all combinations of hash bits. Let us consider the
-   case of Swiss table of size 5 that has 256 slots and up to 128 inserted 
keys.
-   Only 12 bits of hash are used by lookup in that case: 5 to select a block, 7
-   to create a stamp. For all :math:`2^{12}` combinations of those bits we 
could
-   keep the result of first pass lookup in an array. Key id and a match
-   indicating flag can use one byte: 7 bits for key id and 1 bit for the flag.
-   Note that slot id is only needed if we go into 2nd pass lookup, so it can be
-   stored separately and likely only accessed by a small subset of keys.
-   Fast-path lookup becomes almost a single fetch of result from a 4KB array.
-   Lookup arrays used to implement this need to be kept in sync with the main
-   copy of data about slots, which requires extra care during inserts. Since 
the
-   number of entries in lookup arrays is much higher than the number of slots,
-   this technique only makes sense for small hash tables.
-
-Dense comparisons
-~~~~~~~~~~~~~~~~~
-
-If there is at least one key inserted into a hash table, then every slot 
contains
-a key id value that corresponds to some actual key that can be used in
-comparison. That is because empty slots are initialized with 0 as their key id.
-After the fast-path lookup we get a match-found flag for each input. If it is
-set, then we need to run a comparison of the input key with the key in the hash
-table identified by key id returned by fast-path code. The comparison will 
verify
-that there is a true match between the keys. We only need to do this for a
-subset of inputs that have a match candidate, but since we have key id values
-corresponding to some real key for all inputs, we may as well execute
-comparisons on all inputs unconditionally. If the majority (e.g. more than 80%)
-of the keys have a match candidate, the cost of evaluating comparison for the
-remaining fraction of keys but without filtering may actually be cheaper than 
the
-cost of running evaluation only for required keys while referencing filter
-information. This can be seen as a variant of general preconditioning 
techniques
-used to avoid diverging conditional branches in the code. It may be used, based
-on some heuristic, to verify matches reported by fast-path lookups and is
-referred to as **dense comparisons**.
-
-Resizing
---------
-
-New hash table is initialized as empty and has only a single block with a space
-for only a few key entries. Doubling of the hash table size becomes necessary 
as
-more keys get inserted. It is invoked during the 2nd pass of the lookups, which
-also handles inserts. It happens immediately after the number of inserted keys
-reaches a specific upper limit decided based on a current size of the hash 
table.
-There may still be unprocessed entries from the input mini-batch after 
resizing,
-so the 2nd pass of the lookup is restarted right after, with the bigger hash
-table and the remaining subset of unprocessed entries.
-
-Current policy, that should work reasonably well, is to resize a small hash 
table
-(up to 8KB) when it is 50% full. Larger hash tables are resized when 75% full.
-We want to keep size in memory as small as possible, while maintaining a low
-probability of blocks becoming full.
-
-When discussing resizing we will be talking about **resize source** and 
**resize
-target** tables. The diagram below shows how the same hash bits are interpreted
-differently by the source and the target.
-
-.. image:: img/swiss_table_10.jpg
-
-For a given hash, if a start block id was L in the source table, it will be
-either (2*L+0) or (2*L+1) in the target table. Based on that we can expect data
-access locality when migrating the data between the tables.
-
-Resizing is cheap also thanks to the fact that hash values for keys in the hash
-table are kept together with other slot data and do not need to be recomputed.
-That means that resizing procedure does not ever need to access the actual 
bytes
-of the key.
-
-1st pass
-~~~~~~~~
-
-Based on the hash value for a given slot we can tell whether this slot contains
-an overflow or non-overflow entry. In the first pass we go over all source 
slots
-in sequence, filter out overflow entries and move to the target table all other
-entries. Non-overflow entries from a block L will be distributed between blocks
-(2*L+0) and (2*L+1) of the target table. None of these target blocks can
-overflow, since they will be accommodating at most 8 input entries during this
-pass.
-
-For every non-overflow entry, the highest bit of a stamp in the source slot
-decides whether it will go to the left or to the right target block. It is
-further possible to avoid any conditional branches in this partitioning code, 
so
-that the result is friendly to the CPU execution pipeline.
-
-.. image:: img/swiss_table_11.jpg
-
-2nd pass
-~~~~~~~~
-
-In the second pass of resizing, we scan all source slots again, this time
-focusing only on the overflow entries that were all skipped in the 1st pass. We
-simply reinsert them in the target table using generic insertion code with one
-exception. Since we know that all the source keys are different, there is no
-need to search for a matching stamp or run key comparisons (or look at the key
-values). We just need to find the first open block beginning with the start
-block in the target table and use its first empty slot as the insert
-destination.
-
-We expect overflow entries to be rare and therefore the relative cost of that
-pass should stay low.
+Design Philosophies
+===================
+
+Engine Independent Compute
+--------------------------
+
+If a node requires complex computation then it should encapsulate that work in
+abstractions that don't depend on any particular engine design.  For example,
+the hash join node uses utilities such as a row encoder, a hash table, and an
+exec batch builder.  Other places share implementations of sequencing queues
+and row segmenters.  The node itself should be kept minimal, simply mapping
+from Acero to the abstraction.
+
+This helps to decouple designs from Acero's implementation details and makes
+them more resilient to changes in the engine.  It also helps to promote these
+abstractions as capabilities in their own right, whether for use in other
+engines or as potential new additions to pyarrow as compute utilities.
+
+Make Tasks not Threads
+----------------------
+
+If you need to run something in parallel then you should use thread tasks and
+not dedicated threads.
+
+ * This keeps the thread count down (reduces thread contention and context
+   switches)
+ * This prevents deadlock (tasks get cancelled automatically in the event of
+   a failure)
+ * This simplifies profiling (tasks can be easily measured, making it easier
+   to know where all the work is happening)
+ * This makes it possible to run without threads (sometimes users are doing
+   their own threading and sometimes we need to run in thread-restricted
+   environments like emscripten)
+
+Note: we do not always follow this advice currently.  There is a dedicated 
process thread in the asof join
+node.  Dedicated threads are "ok" for experimental use but we'd like to 
migrate away from them.
+
+Don't Block on CPU Threads
+--------------------------
+
+If you need to run a potentially long running activity that is not actively 
using CPU resources (e.g. reading from
+disk, network I/O, waiting on an external library using its own threads) then 
you should use asynchronous utilities
+to ensure that you do not block CPU threads.
+
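The same idea can be illustrated outside of C++.  The following is a minimal Python ``asyncio`` sketch (not Acero's actual async utilities, whose APIs differ): long-running I/O is awaited rather than allowed to block a worker, so the scheduler stays free to run other tasks in the meantime.

```python
import asyncio

async def read_block(path):
    # Simulated non-blocking read: while we await, the event loop (playing
    # the role of a CPU thread pool here) is free to run other tasks.
    await asyncio.sleep(0.01)
    return f"data from {path}"

async def main():
    # Many reads in flight concurrently; none of them blocks a worker.
    return await asyncio.gather(*(read_block(f"file{i}") for i in range(3)))

results = asyncio.run(main())
```

All three simulated reads overlap, so the total wait is roughly that of a single read rather than three in sequence.
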
+Don't Reinvent the Wheel
+------------------------
+
+Each node should not be a standalone island of utilities.  Where possible, 
computation should be pushed
+either into compute functions or into common shared utilities.  This is the 
only way a project as large as
+this can hope to be maintained.
+
+Avoid Query Optimization
+------------------------
+
+Writing an efficient Acero plan can be challenging.  For example, filter 
expressions and column selection
+should be pushed down into the scan node so that the data isn't read from 
disk.  Expressions should be
+simplified and common sub-expressions factored out.  The build side of a hash 
join node should be the
+smaller of the two inputs.
+
+However, figuring these problems out is a challenge reserved for a query 
planner or a query optimizer.
+Creating a query optimizer is a challenging task beyond the scope of Acero.  
With adoption of Substrait
+we hope utilities will eventually emerge that solve these problems.  As a 
result, we generally avoid doing
+any kind of query optimization within Acero.  Acero should interpret 
declarations as literally as possible.
+This helps reduce maintenance and avoids surprises.
+
+We also realize that this is not always possible.  For example, the hash join
+node currently detects whether it is part of a chain of hash join operators
+and, if so, configures bloom filters between the operators.  This is
+technically a task that could be left to a query optimizer.  However, the
+behavior is rather specific to Acero and fairly niche, so it is unlikely to
+be introduced to an optimizer anytime soon.
+
+Performance Guidelines
+======================
+
+Batch Size
+----------
+
+Perhaps the most discussed performance criterion is batch size.  Acero was
+originally designed, based on research, to follow a morsel-batch model.
+Tasks are created from a large batch of rows (a morsel).  The goal is for the
+morsel to be large enough to justify the overhead of a task.  Within a task
+the data is further subdivided into batches.  Each batch should be small
+enough to fit comfortably into CPU cache (often the L2 cache).
+
+This sets up two loops.  The outer loop is parallel and the inner loop is not:
+
+.. code:: python
+
+  for morsel in dataset: # parallel
+    for batch in morsel:
+      run_pipeline(batch)
+
+The advantage of this style of execution is that successive nodes (or
+successive operations within an exec node) that access the same column are
+likely to benefit from the cache.  It is also essential for functions that
+require random access to data.  It maximizes parallelism while minimizing
+data transfer from main memory to CPU cache.
+
+.. figure:: img/microbatch.svg
+
+   If multiple passes through the data are needed (or random access is
+   required) and the batch is much bigger than the cache, performance
+   suffers.  Breaking the task into smaller batches helps improve task
+   locality.
+
+The morsel/batch model is reflected in a few places in Acero:
+
+ * In most source nodes we will try to grab batches of 1Mi rows.  This is
+   often configurable.
+ * In the source node we then iterate and slice off batches of 32Ki rows.
+   This is not currently configurable.
+ * The hash join node currently requires that batches contain at most 32Ki
+   rows, as it uses 16-bit signed integers as row indices in some places.
+
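The slicing described above can be sketched in plain Python.  The 1Mi and 32Ki figures come from the text; the list stands in for a real columnar batch, and the helper is purely illustrative:

```python
MORSEL_SIZE = 1024 * 1024   # ~1Mi rows per morsel (outer, parallel loop)
BATCH_SIZE = 32 * 1024      # 32Ki rows per batch (inner, cache-sized loop)

def iter_batches(morsel, batch_size=BATCH_SIZE):
    # Slice a morsel into cache-sized batches; slicing is cheap compared to
    # the per-batch compute work.
    for offset in range(0, len(morsel), batch_size):
        yield morsel[offset:offset + batch_size]

morsel = list(range(100_000))            # one morsel of 100,000 rows
batches = list(iter_batches(morsel))
# 100,000 rows -> three full 32Ki batches plus a 1,696-row remainder
```

Each batch produced by the inner loop is then run through the whole pipeline before the next one is touched, which is what keeps the working set cache-resident.
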
+However, this guidance is debatable.  Profiling has shown that we do not get
+any real benefit from moving to a smaller batch size.  It seems any advantage
+we do get is lost in per-batch overhead, most of which appears to be due to
+various per-batch allocations.  In addition, depending on your hardware, it
+is not clear that CPU cache<->RAM transfer will always be the bottleneck.  A
+combination of linear access, prefetching, and high CPU<->RAM bandwidth can
+alleviate the penalty of cache misses.
+
+As a result, this section is included in the guide to provide historical 
context, but should not
+be considered binding.
+
+Ongoing & Deprecated Work
+=========================
+
+The following efforts are ongoing.  They are described here to explain certain 
duplication in the
+code base as well as explain types that are going away.
+
+Scanner v2
+----------
+
+The scanner is currently a node in the datasets module, registered with the
+factory registry as "scan".  This node was written prior to Acero and made
+extensive use of AsyncGenerator to scan multiple files in parallel.
+Unfortunately, the use of AsyncGenerator made the scan difficult to profile,
+difficult to debug, and impossible to cancel.  A new scan node is in
+progress.  It is currently registered with the name "scan2".  The new scan
+node uses the AsyncTaskScheduler instead of AsyncGenerator and should provide
+additional features such as the ability to skip rows and handle nested column
+projection (for formats that support it).
+
+OrderBySink and SelectKSink
+---------------------------
+
+These two exec nodes provided custom sink implementations.  They were written 
before ordered execution
+was added to Acero and were the only way to generate ordered output.  However, 
they had to be placed
+at the end of a plan and the fact that they were custom sink nodes made them 
difficult to describe with
+Declaration.  The OrderByNode and FetchNode replace these.  These are kept at 
the moment until existing
+bindings move away from them.
+
+Upstreaming Changes
+===================
+
+Acero is designed so that it can be extended without recompilation.  You can 
easily add new compute
+functions and exec nodes without creating a fork or compiling Acero.  However, 
as you develop new
+features that are generally useful, we hope you will make time to upstream 
your changes.
+
+Even though we welcome these changes, we have to admit that there is a cost
+to this process.  Upstreaming code requires that the new module behave
+correctly, but that is typically the easier part to review.  More
+importantly, upstreaming code is a process of transferring the maintenance
+burden from yourself to the wider Arrow C++ project maintainers.  This
+requires a deep understanding of the code by maintainers, it requires that
+the code be consistent with the style of the project, and it requires that
+the code be well tested with unit tests to guard against regressions.
+
+Because of this, we highly recommend taking the following steps:
+
+* As you are starting out you should send a message to the mailing list 
announcing your intentions and
+  design.  This will help you determine if there is wider interest in the 
feature and others may have
+  ideas or suggestions to contribute early on in the process.
+
+  * If there is not much interest in the feature then keep in mind that it
+    may be difficult to eventually upstream the change.  The maintenance
+    capacity of the team is limited and we try to prioritize features that
+    are in high demand.
+
+* We recommend developing and testing the change on your own fork until you 
get it to a point where you
+  are fairly confident things are working correctly.  If the change is large 
then you might also think
+  about how you can break up the change into smaller pieces.  As you do this 
you can share both the larger
+  PR (as a draft PR or a branch on your local fork) and the smaller PRs.  This 
way we can see the context
+  of the smaller PRs.  However, if you do break things up, smaller PRs should 
still ideally stand on their
+  own.
+
+* Any PR will need to have the following:
+
+  * Unit tests covering the new functionality
+
+  * Microbenchmarks if there is any significant compute work going on
+
+  * Examples demonstrating how to use the new feature
+
+  * Updates to the API reference and this guide
+
+  * Passing CI (you can enable GitHub Actions on your fork and that will allow 
most CI jobs to run before
+    you create your PR)
+
+Others
+======
+
+.. toctree::
+   :maxdepth: 2
+
+   acero/swiss_table
diff --git a/docs/source/developers/cpp/img/swiss_table_1.jpg 
b/docs/source/developers/cpp/acero/img/swiss_table_1.jpg
similarity index 100%
rename from docs/source/developers/cpp/img/swiss_table_1.jpg
rename to docs/source/developers/cpp/acero/img/swiss_table_1.jpg
diff --git a/docs/source/developers/cpp/img/swiss_table_10.jpg 
b/docs/source/developers/cpp/acero/img/swiss_table_10.jpg
similarity index 100%
rename from docs/source/developers/cpp/img/swiss_table_10.jpg
rename to docs/source/developers/cpp/acero/img/swiss_table_10.jpg
diff --git a/docs/source/developers/cpp/img/swiss_table_11.jpg 
b/docs/source/developers/cpp/acero/img/swiss_table_11.jpg
similarity index 100%
rename from docs/source/developers/cpp/img/swiss_table_11.jpg
rename to docs/source/developers/cpp/acero/img/swiss_table_11.jpg
diff --git a/docs/source/developers/cpp/img/swiss_table_2.jpg 
b/docs/source/developers/cpp/acero/img/swiss_table_2.jpg
similarity index 100%
rename from docs/source/developers/cpp/img/swiss_table_2.jpg
rename to docs/source/developers/cpp/acero/img/swiss_table_2.jpg
diff --git a/docs/source/developers/cpp/img/swiss_table_3.jpg 
b/docs/source/developers/cpp/acero/img/swiss_table_3.jpg
similarity index 100%
rename from docs/source/developers/cpp/img/swiss_table_3.jpg
rename to docs/source/developers/cpp/acero/img/swiss_table_3.jpg
diff --git a/docs/source/developers/cpp/img/swiss_table_4.jpg 
b/docs/source/developers/cpp/acero/img/swiss_table_4.jpg
similarity index 100%
rename from docs/source/developers/cpp/img/swiss_table_4.jpg
rename to docs/source/developers/cpp/acero/img/swiss_table_4.jpg
diff --git a/docs/source/developers/cpp/img/swiss_table_5.jpg 
b/docs/source/developers/cpp/acero/img/swiss_table_5.jpg
similarity index 100%
rename from docs/source/developers/cpp/img/swiss_table_5.jpg
rename to docs/source/developers/cpp/acero/img/swiss_table_5.jpg
diff --git a/docs/source/developers/cpp/img/swiss_table_6.jpg 
b/docs/source/developers/cpp/acero/img/swiss_table_6.jpg
similarity index 100%
rename from docs/source/developers/cpp/img/swiss_table_6.jpg
rename to docs/source/developers/cpp/acero/img/swiss_table_6.jpg
diff --git a/docs/source/developers/cpp/img/swiss_table_7.jpg 
b/docs/source/developers/cpp/acero/img/swiss_table_7.jpg
similarity index 100%
rename from docs/source/developers/cpp/img/swiss_table_7.jpg
rename to docs/source/developers/cpp/acero/img/swiss_table_7.jpg
diff --git a/docs/source/developers/cpp/img/swiss_table_8.jpg 
b/docs/source/developers/cpp/acero/img/swiss_table_8.jpg
similarity index 100%
rename from docs/source/developers/cpp/img/swiss_table_8.jpg
rename to docs/source/developers/cpp/acero/img/swiss_table_8.jpg
diff --git a/docs/source/developers/cpp/img/swiss_table_9.jpg 
b/docs/source/developers/cpp/acero/img/swiss_table_9.jpg
similarity index 100%
rename from docs/source/developers/cpp/img/swiss_table_9.jpg
rename to docs/source/developers/cpp/acero/img/swiss_table_9.jpg
diff --git a/docs/source/developers/cpp/acero.rst 
b/docs/source/developers/cpp/acero/swiss_table.rst
similarity index 98%
copy from docs/source/developers/cpp/acero.rst
copy to docs/source/developers/cpp/acero/swiss_table.rst
index 688cfa9d5d..b590102306 100644
--- a/docs/source/developers/cpp/acero.rst
+++ b/docs/source/developers/cpp/acero/swiss_table.rst
@@ -16,12 +16,8 @@
 .. under the License.
 
 .. highlight:: console
-.. _development-cpp-acero:
-
-================
-Developing Acero
-================
 
+===========
 Swiss Table
 ===========
 
@@ -30,7 +26,7 @@ key field values to a dense set of integer ids. Ids can later 
be used in place
 of keys to identify groups of rows with equal keys.
 
 Introduction
-------------
+============
 
 Hash group-by in Arrow uses a variant of a hash table based on a data structure
 called Swiss table. Swiss table uses linear probing. There is an array of slots
@@ -49,7 +45,7 @@ from likening resulting sequences of empty slots to holes in 
a one dimensional
 cheese.
 
 Interface
----------
+=========
 
 Hash table used in query processing for implementing join and group-by 
operators
 does not need to provide all of the operations that a general purpose hash 
table
@@ -121,7 +117,7 @@ is actually no need for a hash function callback. It is 
enough that the caller
 provides hash values for all entries in the batch when calling 
lookup-or-insert.
 
 Basic architecture and organization of data
--------------------------------------------
+===========================================
 
 The hash table is an array of **slots**. Slots are grouped in groups of 8 
called
 **blocks**. The number of blocks is a power of 2. The empty hash table starts
@@ -210,7 +206,7 @@ in that range. This is useful when resizing a hash table or 
merging two hash
 tables together.
 
 Interleaving status bytes and key ids
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+-------------------------------------
 
 Status bytes and key ids for all slots are stored in a single array of bytes.
 They are first grouped by 8 into blocks, then each block of status bytes is
@@ -265,7 +261,7 @@ impact the lookups outside these events.
    number of slots, but that should be a small fraction of the total size.
 
 32-bit hash vs 64-bit hash
-~~~~~~~~~~~~~~~~~~~~~~~~~~
+--------------------------
 
 Currently we use 32-bit hash values in Swiss table code and 32-bit integers as
 key ids. For the robust implementation, sooner or later we will need to support
@@ -278,7 +274,7 @@ of false positives during lookups will start quickly 
growing. 32-bit hash should
 not be used with more than about 16 million inserted keys.
 
 Low memory footprint and low chance of hash collisions
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+------------------------------------------------------
 
 Swiss table is a good choice of a hash table for modern hardware, because it
 combines lookups that can take advantage of special CPU instructions with space
@@ -310,7 +306,7 @@ of comparisons per lookup being close to 1 for keys already 
present and 0 for
 new keys.
 
 Lookup
-------
+======
 
 Lookup-or-insert operation, given a hash of a key, finds a list of candidate
 slots with corresponding keys that are likely to be equal to the input key. The
@@ -328,7 +324,7 @@ involved, because of vectorization of the processing and 
various optimizations
 for common cases.
 
 Search within a single block
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+----------------------------
 
 There are three possible cases that can occur when searching for a match for a
 given key (that is, for a given stamp of a key) within a single block,
@@ -409,7 +405,7 @@ instruction set allows to process quadruplets of 64-bit 
values in a single
 instruction, four searches at once.
 
 Complete search potentially across multiple blocks
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+--------------------------------------------------
 
 Full implementation of a search for a matching key may involve visiting 
multiple
 blocks beginning with the start block selected based on the hash of the key. We
@@ -425,7 +421,7 @@ stamp is encountered. Eventually the search stops when 
either:
 .. image:: img/swiss_table_9.jpg
 
 Optimistic processing with two passes
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+-------------------------------------
 
 Hash table lookups may have high cost in the pessimistic case, when we 
encounter
 cases of hash collisions and full blocks that lead to visiting further blocks. 
In
@@ -479,7 +475,7 @@ each key:
    this technique only makes sense for small hash tables.
 
 Dense comparisons
-~~~~~~~~~~~~~~~~~
+-----------------
 
 If there is at least one key inserted into a hash table, then every slot 
contains
 a key id value that corresponds to some actual key that can be used in
@@ -500,7 +496,7 @@ on some heuristic, to verify matches reported by fast-path 
lookups and is
 referred to as **dense comparisons**.
 
 Resizing
---------
+========
 
 New hash table is initialized as empty and has only a single block with a space
 for only a few key entries. Doubling of the hash table size becomes necessary 
as
@@ -532,7 +528,7 @@ That means that resizing procedure does not ever need to 
access the actual bytes
 of the key.
 
 1st pass
-~~~~~~~~
+--------
 
 Based on the hash value for a given slot we can tell whether this slot contains
 an overflow or non-overflow entry. In the first pass we go over all source 
slots
@@ -550,7 +546,7 @@ that the result is friendly to the CPU execution pipeline.
 .. image:: img/swiss_table_11.jpg
 
 2nd pass
-~~~~~~~~
+--------
 
 In the second pass of resizing, we scan all source slots again, this time
 focusing only on the overflow entries that were all skipped in the 1st pass. We
diff --git a/docs/source/cpp/acero/async.md 
b/docs/source/developers/cpp/img/async.md
similarity index 100%
rename from docs/source/cpp/acero/async.md
rename to docs/source/developers/cpp/img/async.md
diff --git a/docs/source/cpp/acero/async.svg 
b/docs/source/developers/cpp/img/async.svg
similarity index 100%
rename from docs/source/cpp/acero/async.svg
rename to docs/source/developers/cpp/img/async.svg
diff --git a/docs/source/cpp/acero/dist_plan.svg 
b/docs/source/developers/cpp/img/dist_plan.svg
similarity index 100%
rename from docs/source/cpp/acero/dist_plan.svg
rename to docs/source/developers/cpp/img/dist_plan.svg
diff --git a/docs/source/cpp/acero/microbatch.svg 
b/docs/source/developers/cpp/img/microbatch.svg
similarity index 100%
rename from docs/source/cpp/acero/microbatch.svg
rename to docs/source/developers/cpp/img/microbatch.svg
diff --git a/docs/source/cpp/acero/ordered.svg 
b/docs/source/developers/cpp/img/ordered.svg
similarity index 100%
rename from docs/source/cpp/acero/ordered.svg
rename to docs/source/developers/cpp/img/ordered.svg
diff --git a/docs/source/cpp/acero/pipeline.svg 
b/docs/source/developers/cpp/img/pipeline.svg
similarity index 100%
rename from docs/source/cpp/acero/pipeline.svg
rename to docs/source/developers/cpp/img/pipeline.svg
diff --git a/docs/source/cpp/acero/pipeline_task.svg 
b/docs/source/developers/cpp/img/pipeline_task.svg
similarity index 100%
rename from docs/source/cpp/acero/pipeline_task.svg
rename to docs/source/developers/cpp/img/pipeline_task.svg
diff --git a/docs/source/java/substrait.rst b/docs/source/java/substrait.rst
index fa20dbd61d..5048552613 100644
--- a/docs/source/java/substrait.rst
+++ b/docs/source/java/substrait.rst
@@ -19,7 +19,7 @@
 Substrait
 =========
 
-The ``arrow-dataset`` module can execute Substrait_ plans via the :doc:`Acero 
<../cpp/streaming_execution>`
+The ``arrow-dataset`` module can execute Substrait_ plans via the :doc:`Acero 
<../cpp/acero>`
 query engine.
 
 .. contents::
@@ -199,5 +199,5 @@ This Java program:
 
 .. _`Substrait`: https://substrait.io/
 .. _`Substrait Java`: https://github.com/substrait-io/substrait-java
-.. _`Acero`: https://arrow.apache.org/docs/cpp/streaming_execution.html
+.. _`Acero`: https://arrow.apache.org/docs/cpp/acero.html
 .. _`Extended Expression`: 
https://github.com/substrait-io/substrait/blob/main/site/docs/expressions/extended_expression.md
diff --git a/docs/source/python/api/acero.rst b/docs/source/python/api/acero.rst
index a8107f3cc1..b75fc33061 100644
--- a/docs/source/python/api/acero.rst
+++ b/docs/source/python/api/acero.rst
@@ -46,7 +46,7 @@ and to execute this efficiently in a batched manner.
 
 .. seealso::
 
-   :doc:`Acero C++ user guide <../../cpp/streaming_execution>`
+   :doc:`Acero C++ user guide <../../cpp/acero>`
 
    :ref:`api.substrait`
       Alternative way to run Acero from a standardized Substrait plan.
diff --git a/docs/source/python/integration/substrait.rst 
b/docs/source/python/integration/substrait.rst
index eaa6151e4d..8602e073f5 100644
--- a/docs/source/python/integration/substrait.rst
+++ b/docs/source/python/integration/substrait.rst
@@ -23,7 +23,7 @@ The ``arrow-substrait`` module implements support for the 
Substrait_ format,
 enabling conversion to and from Arrow objects.
 
 The ``arrow-dataset`` module can execute Substrait_ plans via the
-:doc:`Acero <../cpp/streaming_execution>` query engine.
+:doc:`Acero <../cpp/acero>` query engine.
 
 .. contents::
 
@@ -245,5 +245,5 @@ the expressions can be passed to the dataset scanner in the 
form of
 
 .. _`Substrait`: https://substrait.io/
 .. _`Substrait Python`: https://github.com/substrait-io/substrait-python
-.. _`Acero`: https://arrow.apache.org/docs/cpp/streaming_execution.html
+.. _`Acero`: https://arrow.apache.org/docs/cpp/acero.html
 .. _`Extended Expression`: 
https://github.com/substrait-io/substrait/blob/main/site/docs/expressions/extended_expression.md
