This is an automated email from the ASF dual-hosted git repository.
zanmato pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 8590bc621a GH-46475: [Documentation][C++][Compute] Consolidate Acero
developer docs (#46476)
8590bc621a is described below
commit 8590bc621a868212bc8a184d0a1022f2ba87fd56
Author: Rossi Sun <[email protected]>
AuthorDate: Wed May 21 08:16:26 2025 -0700
GH-46475: [Documentation][C++][Compute] Consolidate Acero developer docs
(#46476)
### What changes are included in this PR?
1. Move the current Acero Developer's Guide from the user docs to the
developer docs;
2. Make it the main doc, enclosing more detailed docs such as the Swiss
table doc.
### Are these changes tested?
No need.
### Are there any user-facing changes?
None.
* GitHub Issue: #46475
Lead-authored-by: Rossi Sun <[email protected]>
Co-authored-by: Bryce Mecum <[email protected]>
Signed-off-by: Rossi Sun <[email protected]>
---
.../cpp/{streaming_execution.rst => acero.rst} | 6 +-
docs/source/cpp/acero/developer_guide.rst | 692 -----------
docs/source/cpp/user_guide.rst | 2 +-
docs/source/developers/cpp/acero.rst | 1201 +++++++++++---------
.../cpp/{ => acero}/img/swiss_table_1.jpg | Bin
.../cpp/{ => acero}/img/swiss_table_10.jpg | Bin
.../cpp/{ => acero}/img/swiss_table_11.jpg | Bin
.../cpp/{ => acero}/img/swiss_table_2.jpg | Bin
.../cpp/{ => acero}/img/swiss_table_3.jpg | Bin
.../cpp/{ => acero}/img/swiss_table_4.jpg | Bin
.../cpp/{ => acero}/img/swiss_table_5.jpg | Bin
.../cpp/{ => acero}/img/swiss_table_6.jpg | Bin
.../cpp/{ => acero}/img/swiss_table_7.jpg | Bin
.../cpp/{ => acero}/img/swiss_table_8.jpg | Bin
.../cpp/{ => acero}/img/swiss_table_9.jpg | Bin
.../cpp/{acero.rst => acero/swiss_table.rst} | 34 +-
.../{cpp/acero => developers/cpp/img}/async.md | 0
.../{cpp/acero => developers/cpp/img}/async.svg | 0
.../acero => developers/cpp/img}/dist_plan.svg | 0
.../acero => developers/cpp/img}/microbatch.svg | 0
.../{cpp/acero => developers/cpp/img}/ordered.svg | 0
.../{cpp/acero => developers/cpp/img}/pipeline.svg | 0
.../acero => developers/cpp/img}/pipeline_task.svg | 0
docs/source/java/substrait.rst | 4 +-
docs/source/python/api/acero.rst | 2 +-
docs/source/python/integration/substrait.rst | 4 +-
26 files changed, 694 insertions(+), 1251 deletions(-)
diff --git a/docs/source/cpp/streaming_execution.rst b/docs/source/cpp/acero.rst
similarity index 90%
rename from docs/source/cpp/streaming_execution.rst
rename to docs/source/cpp/acero.rst
index 1cd1540428..ddeffffa74 100644
--- a/docs/source/cpp/streaming_execution.rst
+++ b/docs/source/cpp/acero.rst
@@ -27,6 +27,11 @@ Acero: A C++ streaming execution engine
Acero is experimental and a stable API is not yet guaranteed.
+.. note::
+
+ If you are interested in contributing to Acero or learning about its
+ internals, please see the :doc:`Acero Developer's
Guide<../developers/cpp/acero>`.
+
For many complex computations, successive direct :ref:`invocation of
compute functions <invoking-compute-functions>` is not feasible
in either memory or computation time. To facilitate arbitrarily large inputs
@@ -44,4 +49,3 @@ be formulated and executed.
acero/overview
acero/user_guide
acero/substrait
- acero/developer_guide
diff --git a/docs/source/cpp/acero/developer_guide.rst
b/docs/source/cpp/acero/developer_guide.rst
deleted file mode 100644
index 7dd08fe3ce..0000000000
--- a/docs/source/cpp/acero/developer_guide.rst
+++ /dev/null
@@ -1,692 +0,0 @@
-.. Licensed to the Apache Software Foundation (ASF) under one
-.. or more contributor license agreements. See the NOTICE file
-.. distributed with this work for additional information
-.. regarding copyright ownership. The ASF licenses this file
-.. to you under the Apache License, Version 2.0 (the
-.. "License"); you may not use this file except in compliance
-.. with the License. You may obtain a copy of the License at
-
-.. http://www.apache.org/licenses/LICENSE-2.0
-
-.. Unless required by applicable law or agreed to in writing,
-.. software distributed under the License is distributed on an
-.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-.. KIND, either express or implied. See the License for the
-.. specific language governing permissions and limitations
-.. under the License.
-
-.. default-domain:: cpp
-.. highlight:: cpp
-.. cpp:namespace:: arrow::acero
-
-=================
-Developer's Guide
-=================
-
-This page goes into more detail about the design of Acero. It discusses how
-to create custom exec nodes and describes some of the philosophies behind
Acero's
-design and implementation. Finally, it gives an overview of how to extend
Acero
-with new behaviors and how this new behavior can be upstreamed into the core
Arrow
-repository.
-
-Understanding ExecNode
-======================
-
-ExecNode is an abstract class with several pure virtual methods that control
how the node operates:
-
-:func:`ExecNode::StartProducing`
---------------------------------
-
-This method is called once at the start of the plan. Most nodes ignore this
method (any
-necessary initialization should happen in the constructor or Init). However,
source nodes
-will typically provide a custom implementation. Source nodes should schedule
whatever tasks
-are needed to start reading and providing the data. Source nodes are usually
the primary
-creator of tasks in a plan.
-
-.. note::
- The ExecPlan operates on a push-based model. Sources are often pull-based.
For example,
- your source may be an iterator. The source node will typically then
schedule tasks to pull one
- item from the source and push that item into the source's output node (via
``InputReceived``).
-
-Examples
-^^^^^^^^
-
-* In the ``table_source`` node the input table is divided into batches. A
task is created for
- each batch and that task calls ``InputReceived`` on the node's output.
-* In the ``scan`` node a task is created to start listing fragments from the
dataset. Each listing
- task then creates tasks to read batches from the fragment, asynchronously.
When the batch is
- fully read in, a continuation schedules a new task with the exec plan.
This task calls
- ``InputReceived`` on the scan node's output.
-
-:func:`ExecNode::InputReceived`
--------------------------------
-
-This method is called many times during the execution of a plan. It is how
nodes pass data
-to each other. An input node will call InputReceived on its output. Acero's
execution model
-is push-based. Each node pushes data into its output by calling InputReceived
and passing in
-a batch of data.
-
-The InputReceived method is often where the actual work happens for the node.
For example,
-a project node will execute its expressions and create a new expanded output
batch. It will then
-call InputReceived on its output. InputReceived will never be called on a
source node. Sink
-nodes will never call InputReceived. All other nodes will experience both.
-
-Some nodes (often called "pipeline breakers") must accumulate input before
they can generate
-any output. For example, a sort node must accumulate all input before it can
sort the data and
-generate output. In these nodes the InputReceived method will typically place
the data into
-some kind of accumulation queue. If the node doesn't have enough data to
operate then it will
-not call InputReceived on its output. This will then be the end of the current task.
-
-Examples
-^^^^^^^^
-
-* The ``project`` node runs its expressions, using the received batch as input
for the expression.
- A new batch is created from the input batch and the result of the
expressions. The new batch is
- given the same order index as the input batch and the node then calls
``InputReceived`` on its
- output.
-* The ``order_by`` node inserts the batch into an accumulation queue. If this
was the last batch
- then the node will sort everything in the accumulation queue. The node will
then call
- ``InputReceived`` on the output for each batch in the sorted result. A new
batch index will be
- assigned to each batch. Note that this final output step might also occur
as a result of a call
- to ``InputFinished`` (described below).
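-
-To make the push model concrete, here is a minimal sketch of a stateless
-node's data-handling methods. This is illustrative only: the real method
-signatures are declared in ``arrow/acero/exec_plan.h`` and differ in detail.
-
-.. code:: cpp
-
-   #include "arrow/acero/exec_plan.h"
-
-   // Sketch of a pass-through node.  The class stays abstract because the
-   // remaining pure virtual methods are omitted for brevity.
-   class PassThroughNode : public arrow::acero::ExecNode {
-    public:
-     arrow::Status InputReceived(ExecNode* input,
-                                 arrow::compute::ExecBatch batch) override {
-       // Per-batch work (e.g. running expressions) would happen here.  The
-       // node then pushes the batch into its output, usually on the same
-       // thread, extending the current pipeline task.
-       return output_->InputReceived(this, std::move(batch));
-     }
-     arrow::Status InputFinished(ExecNode* input, int total_batches) override {
-       // A stateless node forwards the batch count unchanged.
-       return output_->InputFinished(this, total_batches);
-     }
-   };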
-
-:func:`ExecNode::InputFinished`
--------------------------------
-
-This method will be called once per input. A node will call InputFinished on
its output once it
-knows how many batches it will be sending to that output. Normally this
happens when the node is
-finished working. For example, a scan node will call InputFinished once it
has finished reading
-its files. However, it could call it earlier if it knows (maybe from file
metadata) how many
-batches will be created.
-
-Some nodes will use this signal to trigger some processing. For example, a
sort node needs to
-wait until it has received all of its input before it can sort the data. It
relies on the InputFinished
-call to know this has happened.
-
-Even if a node is not doing any special processing when finished (e.g. a
project node or filter node
-doesn't need to do any end-of-stream processing) that node will still call
InputFinished on its
-output.
-
-.. warning::
- The InputFinished call might arrive before the final call to InputReceived.
In fact, it could
- even be sent out before any calls to InputReceived begin. For example, the
table source node
- always knows exactly how many batches it will be producing. It could
choose to call InputFinished
- before it ever calls InputReceived. If a node needs to do "end-of-stream"
processing then it typically
- uses an AtomicCounter which is a helper class to figure out when all of the
data has arrived.
-
-Examples
-^^^^^^^^
-
-* The ``order_by`` node checks to see if it has already received all its batches.
If it has then it performs
- the sorting step described in the ``InputReceived`` example. Before it
starts sending output data it
- checks to see how many output batches it has (it's possible the batch size
changed as part of the
- accumulating or sorting) and calls ``InputFinished`` on the node's output.
-* The ``fetch`` node, during a call to ``InputReceived``, realizes it has
received all the rows it was
- asked for. It calls ``InputFinished`` on its output immediately (even
though its own ``InputFinished``
- method has not yet been called).
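-
-The following self-contained sketch shows the idea behind such a counter.
-It is a simplified stand-in for Acero's actual ``AtomicCounter`` helper,
-which offers a few more operations:
-
-.. code:: cpp
-
-   #include <atomic>
-
-   // End-of-stream is reached only when (a) the total batch count is known
-   // (InputFinished has arrived) and (b) that many batches have been seen
-   // via InputReceived.  Either event may happen last, so both paths check
-   // for completion, and completion is claimed exactly once.
-   class EndOfStreamCounter {
-    public:
-     // Called from InputReceived; true if this call completed the stream.
-     bool Increment() {
-       int seen = ++batches_seen_;
-       if (seen != total_.load()) return false;
-       return ClaimCompletion();
-     }
-     // Called from InputFinished; true if all batches already arrived.
-     bool SetTotal(int total) {
-       total_.store(total);
-       if (batches_seen_.load() != total) return false;
-       return ClaimCompletion();
-     }
-
-    private:
-     bool ClaimCompletion() { return !done_.exchange(true); }
-     std::atomic<int> batches_seen_{0};
-     std::atomic<int> total_{-1};  // -1 until InputFinished is called
-     std::atomic<bool> done_{false};
-   };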
-
-:func:`ExecNode::PauseProducing` / :func:`ExecNode::ResumeProducing`
----------------------------------------------------------------------
-
-These methods control backpressure. Some nodes may need to pause their input
to avoid accumulating
-too much data. For example, when the user is consuming the plan with a
RecordBatchReader we use a
-SinkNode. The SinkNode places data in a queue that the RecordBatchReader
pulls from (this is a
-conversion from a push-model to a pull-model). If the user is reading the
RecordBatchReader slowly then
-it is possible this queue will start to fill up. For another example we can
consider the write node.
-This node writes data to a filesystem. If the writes are slow then data might
accumulate at the
-write node. As a result, the write node would need to apply backpressure.
-
-When a node realizes that it needs to apply some backpressure it will call
PauseProducing on its input.
-Once the node has enough space to continue it will then call ResumeProducing
on its input. For example,
-the SinkNode would pause when its queue gets too full. As the user continues
to read from the
-RecordBatchReader we can expect the queue to slowly drain. Once the queue has
drained enough then the
-SinkNode can call ResumeProducing.
-
-Source nodes typically need to provide special behavior for PauseProducing and
ResumeProducing. For
-example, a scan node that is reading from a file can pause reading the file.
However, some source nodes
-may not be able to pause in any meaningful way. There is not much point in a
table source node pausing
-because its data is already in memory.
-
-Nodes that are neither source nor sink should still forward backpressure
signals. For example, when
-PauseProducing is called on a project node it should call PauseProducing on
its input. If a node has
-multiple inputs then it should forward the signal to every input.
-
-Examples
-^^^^^^^^
-
-* The ``write`` node, in its ``InputReceived`` method, adds a batch to a
dataset writer's queue. If the
- dataset writer is then full it will return an unfinished future that will
complete when it has more room.
- The ``write`` node then calls ``PauseProducing`` on its input. It then adds
a continuation to the future
- that will call ``ResumeProducing`` on its input.
-* The ``scan`` node uses an :class:`AsyncTaskScheduler` to keep track of all
the tasks it schedules. This
- scheduler is throttled to limit how much concurrent I/O the ``scan`` node is
allowed to perform. When
- ``PauseProducing`` is called then the node will pause the scheduler. This
means that any tasks queued
- behind the throttle will not be submitted. However, any ongoing I/O will
continue (backpressure can't
- take effect immediately). When ``ResumeProducing`` is called the ``scan``
node will unpause the scheduler.
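-
-For a node with inputs, forwarding backpressure can be as simple as the
-following sketch (method signatures approximate; see
-``arrow/acero/exec_plan.h`` for the real declarations):
-
-.. code:: cpp
-
-   // The counter argument lets an implementation discard out-of-date
-   // pause/resume pairs that raced with each other.
-   void PauseProducing(ExecNode* output, int32_t counter) override {
-     for (ExecNode* input : inputs_) {
-       input->PauseProducing(this, counter);
-     }
-   }
-
-   void ResumeProducing(ExecNode* output, int32_t counter) override {
-     for (ExecNode* input : inputs_) {
-       input->ResumeProducing(this, counter);
-     }
-   }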
-
-:func:`ExecNode::StopProducing`
--------------------------------
-
-StopProducing is called when a plan needs to end early. This can happen
because the user cancelled
-the plan and it can happen because an error occurred. Most nodes do not need
to do anything here.
-There is no expectation or requirement that a node sends any remaining data it
has. Any node that
-schedules tasks (e.g. a source node) should stop producing new data.
-
-In addition to plan-wide cancellation, a node may call this method on its
input if it has decided
-that it has received all the data that it needs. However, because of
parallelism, a node may still
-receive a few calls to ``InputReceived`` after it has stopped its input.
-
-If any external resources are used then cleanup should happen as part of this
call.
-
-Examples
-^^^^^^^^
-
-* The ``asofjoin`` node has a dedicated processing thread that communicates
with the main Acero threads
- using a queue. When ``StopProducing`` is called the node inserts a poison
pill into the queue. This
- tells the processing thread to stop immediately. Once the processing thread
stops it marks its external
- task (described below) as completed which allows the plan to finish.
-* The ``fetch`` node, in ``InputReceived``, may decide that it has all the
data it needs. It can then call
- ``StopProducing`` on its input.
-
-Initialization / Construction / Destruction
--------------------------------------------
-
-Simple initialization logic (that cannot error) can be done in the
constructor. If the initialization
-logic may return an invalid status then it can either be done in the exec
node's factory method or
-the ``Init`` method. The factory method is preferred for simple validation.
The ``Init`` method is
-preferred if the initialization might do expensive allocation or other
resource consumption. ``Init`` will
-always be called before ``StartProducing`` is called. Initialization could
also be done in
-``StartProducing`` but keep in mind that other nodes may have started by that
point.
-
-In addition, there is a ``Validate`` method that can be overloaded to provide
custom validation. This
-method is normally called before ``Init`` but after all inputs and outputs
have been added.
-
-Finalization happens today in the destructor. There are a few cases
where that might be slow.
-For example, in the write node, if there was an error during the plan, then we
might close out some open
-files here. Should there be significant finalization that is either
asynchronous or could potentially
-trigger an error then we could introduce a Finalize method to the ExecNode
lifecycle. It hasn't been
-done yet only because it hasn't been needed.
-
-Summary
--------
-
-.. list-table:: ExecNode Lifecycle
- :widths: 20 40 40
- :header-rows: 1
-
- * - Method Name
- - This is called when...
- - A node calls this when...
- * - StartProducing
- - The plan is starting
- - N/A
- * - InputReceived
- - Data is received from the input
- - To send data to the output
- * - InputFinished
- - The input knows how many batches there are
- - The node can tell its output how many batches there are
- * - StopProducing
- - A plan is aborted or an output has enough data
- - A node has all the data it needs
-
-Extending Acero
-===============
-
-Acero instantiates a singleton :class:`ExecFactoryRegistry` which maps between
names and exec node
-factories (methods which create an ExecNode from options). To create a new
ExecNode you can register
-the node with this registry and your node will now be usable by Acero. If you
would like to be able
-to use this node with Substrait plans you will also need to configure the
Substrait registry so that it
-knows how to map Substrait to your custom node.
-
-This means that you can create and add new nodes to Acero without recompiling
Acero from source.
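-
-A minimal registration sketch is shown below. ``MyNode`` and
-``MyNodeOptions`` are hypothetical placeholders for your own ExecNode
-subclass and its options class:
-
-.. code:: cpp
-
-   #include "arrow/acero/exec_plan.h"
-   #include "arrow/acero/options.h"
-
-   namespace acero = arrow::acero;
-
-   // A factory has the shape Acero expects: build a node from the plan,
-   // its inputs, and an options instance.
-   arrow::Result<acero::ExecNode*> MakeMyNode(
-       acero::ExecPlan* plan, std::vector<acero::ExecNode*> inputs,
-       const acero::ExecNodeOptions& options) {
-     return plan->EmplaceNode<MyNode>(plan, std::move(inputs), options);
-   }
-
-   arrow::Status RegisterMyNode() {
-     // Afterwards, a Declaration can refer to the node as "my_node".
-     return acero::default_exec_factory_registry()->AddFactory("my_node",
-                                                               MakeMyNode);
-   }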
-
-Scheduling and Parallelism
-==========================
-
-There are many ways in which data engines can utilize multiple compute
resources (e.g. multiple cores).
-Before we get into the details of Acero's scheduling we will cover a few high
level topics.
-
-Parallel Execution of Plans
----------------------------
-
-Users may want to execute multiple plans concurrently and they are welcome to
do so. However, Acero has no
-concept of inter-plan scheduling. Each plan will attempt to maximize its
usage of compute resources and
-there will likely be contention for CPU, memory, and disk resources. If
plans are using the default CPU &
-I/O thread pools this will be mitigated somewhat since they will share the
same thread pool.
-
-Locally Distributed Plans
--------------------------
-
-A common way to tackle multi-threading is to split the input into partitions
and then create a plan for
-each partition and then merge the results from these plans in some way. For
example, let's assume you
-have 20 files and 10 cores and you want to read and sort all the data. You
could create a plan for every
-2 files to read and sort those files. Then you could create one extra plan
that takes the input from these
-10 child plans and merges the 10 input streams in a sorted fashion.
-
-This approach is popular because it is how queries are distributed across
multiple servers and so it
-is widely supported and well understood. Acero does not do this today but
there is no reason to prevent it.
-Adding shuffle & partition nodes to Acero should be a high priority and would
enable Acero to be used by
-distributed systems. Once that has been done then it should be possible to do
a local shuffle (local
-meaning exchanging between multiple exec plan instances on a single system) if
desired.
-
-.. figure:: dist_plan.svg
-
- A distributed plan can provide parallelism even if the plans themselves run
serially
-
-Pipeline Parallelism
---------------------
-
-Acero attempts to maximize parallelism using pipeline parallelism. As each
batch of data arrives from the
-source we immediately create a task and start processing it. This means we
will likely start processing
-batch X before the processing of batch X-1 has completed. This is very
flexible and powerful. However, it also
-means that properly implementing an ExecNode is difficult.
-
-For example, an ExecNode's InputReceived method should be reentrant. In other
words, it should be expected
-that InputReceived will be called before the previous call to InputReceived
has completed. This means that
-nodes with any kind of mutable state will need mutexes or similar mechanisms
to protect that state from race
-conditions. It also means that tasks can easily get out of order and nodes
should not expect any particular ordering
-of their input (more on this later).
-
-.. figure:: pipeline.svg
-
- An example of pipeline parallelism on a system with 3 CPU threads and 2 I/O
threads
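-
-Because ``InputReceived`` is reentrant, a pipeline breaker typically guards
-its accumulated state with a mutex, as in this sketch (all names are
-illustrative; ``EndOfStreamCounter`` is the helper sketched earlier):
-
-.. code:: cpp
-
-   arrow::Status InputReceived(ExecNode* input,
-                               arrow::compute::ExecBatch batch) override {
-     bool is_last = false;
-     {
-       // Concurrent calls may arrive for different batches, so mutable
-       // state must be protected against races.
-       std::lock_guard<std::mutex> lock(mutex_);
-       accumulation_queue_.push_back(std::move(batch));
-       is_last = counter_.Increment();
-     }
-     // Heavy end-of-stream work happens outside the lock.
-     if (is_last) return ProcessAccumulatedInput();
-     return arrow::Status::OK();
-   }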
-
-Asynchronicity
---------------
-
-Some operations take a long time and may not require the CPU. Reading data
from the filesystem is one example. If we
-only have one thread per core then time will be wasted while we wait for these
operations to complete. There
-are two common solutions to this problem. A synchronous solution is often to
create more threads than there are
-cores with the expectation that some of them will be blocked and that is ok.
This approach tends to be simpler
-but it can lead to excess thread contention and requires fine-tuning.
-
-Another solution is to make the slow operations asynchronous. When the slow
operation starts the caller gives up
-the thread and allows other tasks to run in the meantime. Once the slow
operation finishes then a new task is
-created to take the result and continue processing. This helps to minimize
thread contention but tends to be
-more complex to implement.
-
-Due to a lack of standard C++ async APIs, Acero uses a combination of the two
approaches. Acero has two thread pools.
-The first is the CPU thread pool. This thread pool has one thread per core.
Tasks in this thread pool should never
-block (beyond minor delays for synchronization) and should generally be
actively using CPU as much as possible. Threads
-on the I/O thread pool are expected to spend most of their time idle. They
should avoid doing any CPU-intensive work.
-Their job is basically to wait for data to be available and schedule follow-up
tasks on the CPU thread pool.
-
-.. figure:: async.svg
-
- Arrow achieves asynchronous execution by combining CPU & I/O thread pools
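-
-In code, the handoff between the two pools usually looks something like
-the sketch below. ``ReadSomeBytes`` and ``DecodeBuffer`` are hypothetical
-helpers and the future-related APIs are approximate:
-
-.. code:: cpp
-
-   #include "arrow/io/interfaces.h"
-   #include "arrow/util/future.h"
-   #include "arrow/util/thread_pool.h"
-
-   arrow::Result<std::shared_ptr<arrow::Buffer>> ReadSomeBytes();
-   arrow::Status DecodeBuffer(const std::shared_ptr<arrow::Buffer>& buf);
-
-   arrow::Future<> ReadAndProcess() {
-     arrow::internal::Executor* io =
-         arrow::io::default_io_context().executor();
-     arrow::internal::Executor* cpu = arrow::internal::GetCpuThreadPool();
-     // The I/O pool thread spends its time blocked in ReadSomeBytes; the
-     // continuation is scheduled as a new task on the CPU pool.
-     return arrow::DeferNotOk(io->Submit(ReadSomeBytes))
-         .Then([cpu](const std::shared_ptr<arrow::Buffer>& buffer) {
-           return arrow::DeferNotOk(
-               cpu->Submit([buffer] { return DecodeBuffer(buffer); }));
-         });
-   }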
-
-.. note::
-
- Most nodes in Acero do not need to worry about asynchronicity. They are
fully synchronous and do not spawn tasks.
-
-Task per Pipeline (and sometimes beyond)
-----------------------------------------
-
-An engine could choose to create a thread task for every execution of a node.
However, without careful scheduling,
-this leads to problems with cache locality. For example, let's assume we have
a basic plan consisting of three
-exec nodes, scan, project, and then filter (this is a very common use case).
Now let's assume there are 100 batches.
-In a task-per-operator model we would have tasks like "Scan Batch 5", "Project
Batch 5", and "Filter Batch 5". Each
-of those tasks is potentially going to access the same data. For example,
maybe the ``project`` and ``filter`` nodes need
-to read the same column, a column which is initially created in a decode phase
of the ``scan`` node. To maximize cache
-utilization we would need to carefully schedule our tasks to ensure that all
three of those tasks are run consecutively
-and assigned to the same CPU core.
-
-To avoid this problem we design tasks that run through as many nodes as
possible before the task ends. This sequence
-of nodes is often referred to as a "pipeline" and the nodes that end the
pipeline (and thus end the task) are often
-called "pipeline breakers". Some nodes might even fall somewhere in between.
For example, in a hash join node, when
-we receive a batch on the probe side, and the hash table has been built, we do
not need to end the task and instead keep
-on running. This means that tasks might sometimes end at the join node and
might sometimes continue past the join node.
-
-.. figure:: pipeline_task.svg
-
- A logical view of pipelines in a plan and two tasks, showing that pipeline
boundaries may vary during a plan
-
-
-Thread Pools and Schedulers
----------------------------
-
-The CPU and I/O thread pools are a part of the core Arrow-C++ library. They
contain a FIFO queue of tasks and will
-execute them as a thread is available. For Acero we need additional
capabilities. For this we use the
-AsyncTaskScheduler. In the simplest mode of operation the scheduler simply
submits tasks to an underlying thread pool.
-However, it is also capable of creating sub-schedulers which can apply
throttling, prioritization, and task tracking:
-
- * A throttled scheduler associates a cost with each task. Tasks are only
submitted to the underlying scheduler
- if there is room. If there is not then the tasks are placed in a queue.
The write node uses a throttle of size
- 1 to avoid reentrantly calling the dataset writer (the dataset writer does
its own internal scheduling). A throttled
- scheduler can be manually paused and unpaused. When paused all tasks are
queued and queued tasks will not be submitted
- even if there is room. This can be useful in source nodes to implement
PauseProducing and ResumeProducing.
- * Priority can be applied to throttled schedulers to control the order in
which queued tasks are submitted. If
- there is room a task is submitted immediately (regardless of priority).
However, if the throttle is full then
- the task is queued and subject to prioritization. The scan node throttles
how many read requests it generates
- and prioritizes reading a dataset in order, if possible.
- * A task group can be used to keep track of a collection of tasks and run a
finalization task when all of the
- tasks have completed. This is useful for fork-join style problems. The
write node uses a task group to close
- a file once all outstanding write tasks for the file have completed.
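-
-As a rough sketch, throttled scheduling looks like the following.
-``StartReadingFile`` is hypothetical and the exact signatures should be
-checked against ``arrow/util/async_util.h``:
-
-.. code:: cpp
-
-   #include "arrow/util/async_util.h"
-
-   arrow::Result<arrow::Future<>> StartReadingFile(int index);
-
-   void ScheduleReads(arrow::util::AsyncTaskScheduler* scheduler) {
-     // Allow at most two tasks (each with the default cost of 1) to be
-     // submitted at once; the rest wait in the throttle's queue.
-     auto throttled = arrow::util::ThrottledAsyncTaskScheduler::Make(
-         scheduler, /*max_concurrent_cost=*/2);
-     for (int i = 0; i < 10; i++) {
-       throttled->AddSimpleTask([i] { return StartReadingFile(i); },
-                                "read-file");
-     }
-   }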
-
-There are research and examples out there for different ways to prioritize
tasks in an execution engine. Acero has not
-yet had to address this problem. Let's go through some common situations:
-
- * Engines will often prioritize reading from the build side of a join node
before reading from the probe side. This
- would be more easily handled in Acero by applying backpressure.
- * Another common use case is to control memory accumulation. Engines will
prioritize tasks which are closer to the
- sink node in an effort to relieve memory pressure. However, Acero
currently assumes that spilling will be added
- at pipeline breakers and that memory usage in a plan will be more or less
static (per core) and well below the
- limits of the hardware. This might change if Acero needs to be used in an
environment where there are many compute
- resources and limited memory (e.g. a GPU)
- * Engines will often use work stealing algorithms to prioritize running tasks
on the same core to improve cache
- locality. However, since Acero uses a task-per-pipeline model there isn't
much lost opportunity for cache
- locality that a scheduler could reclaim. Tasks only end when there is
no more work that can be done with the data.
-
-While there is not much prioritization in place in Acero today we do have the
tools to apply it should we need to.
-
-.. note::
- In addition to the AsyncTaskScheduler there is another class called the
TaskScheduler. This class predates the
- AsyncTaskScheduler and was designed to offer task tracking for highly
efficient synchronous fork-join workloads.
- If this specialized purpose meets your needs then you may consider using
it. It would be interesting to profile
- this against the AsyncTaskScheduler and see how closely the two compare.
-
-Intra-node Parallelism
-----------------------
-
-Some nodes can potentially exploit parallelism within a task. For example, in
the scan node we can decode
-columns in parallel. In the hash join node, parallelism is sometimes
exploited for complex tasks such as
-building the hash table. This sort of parallelism is less common but not
necessarily discouraged. Profiling should
-be done first though to ensure that this extra parallelism will be helpful in
your workload.
-
-All Work Happens in Tasks
--------------------------
-
-All work in Acero happens as part of a task. When a plan is started the
AsyncTaskScheduler is created and given an
-initial task. This initial task calls StartProducing on the nodes. Tasks may
schedule additional tasks. For example,
-source nodes will usually schedule tasks during the call to StartProducing.
Pipeline breakers will often schedule tasks
-when they have accumulated all the data they need. Once all tasks in a plan
are finished then the plan is considered
-done.
-
-Some nodes use external threads. These threads must be registered as external
tasks using the BeginExternalTask method.
-For example, the asof join node uses a dedicated processing thread to achieve
serial execution. This dedicated thread
-is registered as an external task. External tasks should be avoided where
possible because they require careful
-handling to avoid deadlock in error situations.
-
-Ordered Execution
-=================
-
-Some nodes either establish an ordering to their outgoing batches or they need
to be able to process batches in order.
-Acero handles ordering using the ``batch_index`` property on an ExecBatch. If
a node has a deterministic output order
-then it should apply a batch index on batches that it emits. For example, the
OrderByNode applies a new ordering to
-batches (regardless of the incoming ordering). The scan node is able to
attach an implicit ordering to batches which
-reflects the order of the rows in the files being scanned.
-
-If a node needs to process data in order then it is a bit more complicated.
Because of the parallel nature of execution
-we cannot guarantee that batches will arrive at a node in order. However,
they can generally be expected to be "mostly
-ordered". As a result, we can insert the batches into a sequencing queue.
The sequencing queue is given a callback which
-is guaranteed to run on the batches, serially, in order. For example, the
fetch node uses a sequencing queue. The callback
-checks to see if we need to include part or all of the batch, and then slices
the batch if needed.
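-
-The following self-contained sketch illustrates the idea behind a
-sequencing queue (it is not Acero's actual implementation):
-
-.. code:: cpp
-
-   #include <cstdint>
-   #include <functional>
-   #include <map>
-   #include <utility>
-
-   // Batches may be inserted in any order; the callback runs serially, in
-   // batch-index order, with no gaps.
-   template <typename Batch>
-   class SequencingQueue {
-    public:
-     explicit SequencingQueue(std::function<void(Batch)> process)
-         : process_(std::move(process)) {}
-
-     // Calls to Insert must be externally synchronized (e.g. by a mutex).
-     void Insert(int64_t index, Batch batch) {
-       pending_.emplace(index, std::move(batch));
-       // Drain every batch that is now ready, in order.
-       while (!pending_.empty() && pending_.begin()->first == next_index_) {
-         process_(std::move(pending_.begin()->second));
-         pending_.erase(pending_.begin());
-         ++next_index_;
-       }
-     }
-
-    private:
-     std::function<void(Batch)> process_;
-     std::map<int64_t, Batch> pending_;
-     int64_t next_index_ = 0;
-   };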
-
-Even if a node does not care about order it should try and maintain the batch
index if it can. The project and filter
-nodes do not care about order but they ensure that output batches keep the
same index as their input batches. The filter
-node will even emit empty batches if it needs to so that it can maintain the
batch order without gaps.
-
-.. figure:: ordered.svg
-
- An example of ordered execution
-
-
-Partitioned Execution
-=====================
-
-A stream is partitioned (or sometimes called segmented) if rows are grouped
together in some way. Currently there is not
-a formal notion of partitioning. However, one is starting to develop (e.g.
segmented aggregation) and we may end up
-introducing a more formal notion of partitions to Acero at some point as well.
-
-Spillover
-=========
-
-Spillover has not yet been implemented in Acero.
-
-Distributed Execution
-=====================
-
-There are certain exec nodes which are useful when an engine is used in a
distributed environment. The terminology
-can vary so we will use the Substrait terminology. An exchange node sends
data to different workers. Often this is
-a partitioned exchange so that Acero is expected to partition each batch and
distribute partitions across N different
-workers. On the other end we have the capture node. This node receives data
from different workers.
-
-These nodes do not exist in Acero today. However, they would be in scope and
we hope to have such nodes someday.
-
-Profiling & Tracing
-===================
-
-Acero's tracing is currently half-implemented and there are major gaps in
profiling tools. However, there has been some
-effort at tracing with OpenTelemetry and most of the necessary pieces are in
place. The main thing currently lacking is
-some kind of effective visualization of the tracing results.
-
-In order to use the tracing that is present today you will need to build
Arrow with ``ARROW_WITH_OPENTELEMETRY=ON``.
-Then you will need to set the environment variable
``ARROW_TRACING_BACKEND=otlp_http``. This will configure OpenTelemetry
-to export trace results (as OTLP) to the HTTP endpoint
http://localhost:4318/v1/traces. You will need to configure an
-OpenTelemetry collector to collect results on that endpoint and you will need
to configure a trace viewer of some kind
-such as Jaeger: https://www.jaegertracing.io/docs/1.21/opentelemetry/
-
-Benchmarking
-============
-
-The most complete macro benchmarking for Acero is provided by
https://github.com/voltrondata-labs/arrowbench
-These include a set of TPC-H benchmarks, executed from the R-dplyr
integration, which are run on every Arrow commit and
-reported to Conbench at https://conbench.ursa.dev/
-
-In addition to these TPC-H benchmarks there are a number of micro-benchmarks
for various nodes (hash-join, asof-join,
-etc.) Finally, the compute functions themselves should mostly have
micro-benchmarks. For more on micro benchmarks you
-can refer to https://arrow.apache.org/docs/developers/benchmarks.html
-
-Any new functionality should include micro benchmarks to avoid regressions.
-
-Bindings
-========
-
-Public API
-----------
-
-The public API for Acero consists of Declaration and the various
DeclarationToXyz methods. In addition, the
-options classes for each node are part of the public API. However, nodes are
extensible and so this API is
-extensible.
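-
-As a quick illustration, a plan built with the public API might look like
-the following sketch (the node names and options follow the documented
-Declaration examples; the projected column ``"a"`` is made up):
-
-.. code:: cpp
-
-   #include "arrow/acero/exec_plan.h"
-   #include "arrow/acero/options.h"
-   #include "arrow/compute/expression.h"
-   #include "arrow/table.h"
-
-   namespace acero = arrow::acero;
-
-   arrow::Result<std::shared_ptr<arrow::Table>> RunPlan(
-       std::shared_ptr<arrow::Table> input) {
-     // Declare a two-node plan: a table source feeding a projection.
-     acero::Declaration plan = acero::Declaration::Sequence({
-         {"table_source", acero::TableSourceNodeOptions(std::move(input))},
-         {"project",
-          acero::ProjectNodeOptions({arrow::compute::field_ref("a")})},
-     });
-     // DeclarationToTable creates, runs, and finalizes the ExecPlan.
-     return acero::DeclarationToTable(std::move(plan));
-   }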
-
-R (dplyr)
----------
-
-Dplyr is an R library for programmatically building queries. The arrow-r
package has dplyr bindings which
-adapt the dplyr API to create Acero execution plans. In addition, there is a
dplyr-substrait backend that
-is in development which could eventually replace the Acero-aware binding.
-
-Python
-------
-
-The pyarrow library binds to Acero in two different ways. First, there is a
direct binding in pyarrow.acero
-which directly binds to the public API. Second, there are a number of compute
utilities like
-pyarrow.Table.group_by which use Acero, though this is invisible to the user.
-
-Java
-----
-
-The Java implementation exposes some capabilities from Arrow datasets. These
use Acero implicitly. There
-are no direct bindings to Acero or Substrait in the Java implementation today.
-
-Design Philosophies
-===================
-
-Engine Independent Compute
---------------------------
-
-If a node requires complex computation then it should encapsulate that work in
abstractions that don't depend on
-any particular engine design. For example, the hash join node uses utilities
such as a row encoder, a hash table,
-and an exec batch builder. Other places share implementations of sequencing
queues and row segmenters. The node
-itself should be kept minimal, simply mapping from Acero to the abstraction.
-
-This helps to decouple designs from Acero's design details and allows them to
be more resilient to changes in the
-engine. It also helps to promote these abstractions as capabilities on their
own. Either for use in other engines
-or for potential new additions to pyarrow as compute utilities.
-
-Make Tasks not Threads
-----------------------
-
-If you need to run something in parallel then you should use thread tasks and
not dedicated threads.
-
- * This keeps the thread count down (reduces thread contention and context
switches)
- * This prevents deadlock (tasks get cancelled automatically in the event of a
failure)
- * This simplifies profiling (Tasks can be easily measured, easier to know
where all the work is)
- * This makes it possible to run without threads (sometimes users are doing
their own threading and
- sometimes we need to run in thread-restricted environments like emscripten)
-
-Note: we do not always follow this advice currently. There is a dedicated
processing thread in the asof join
-node. Dedicated threads are "ok" for experimental use but we'd like to
migrate away from them.
-
-Don't Block on CPU Threads
---------------------------
-
-If you need to run a potentially long running activity that is not actively
using CPU resources (e.g. reading from
-disk, network I/O, waiting on an external library using its own threads) then
you should use asynchronous utilities
-to ensure that you do not block CPU threads.
-
-Don't Reinvent the Wheel
-------------------------
-
-Each node should not be a standalone island of utilities. Where possible,
computation should be pushed
-either into compute functions or into common shared utilities. This is the
only way a project as large as
-this can hope to be maintained.
-
-Avoid Query Optimization
-------------------------
-
-Writing an efficient Acero plan can be challenging. For example, filter
expressions and column selection
-should be pushed down into the scan node so that the data isn't read from
disk. Expressions should be
-simplified and common sub-expressions factored out. The build side of a hash
join node should be the
-smaller of the two inputs.
-
-However, figuring these problems out is a challenge reserved for a query
planner or a query optimizer.
-Creating a query optimizer is a challenging task beyond the scope of Acero.
With adoption of Substrait
-we hope utilities will eventually emerge that solve these problems. As a
result, we generally avoid doing
-any kind of query optimization within Acero. Acero should interpret
declarations as literally as possible.
-This helps reduce maintenance and avoids surprises.
-
-We also realize that this is not always possible. For example, the hash join
node currently detects if there
-is a chain of hash join operators and, if there is, it configures Bloom filters
between the operators. This is
-technically a task that could be left to a query optimizer. However, this
behavior is rather specific to Acero
-and fairly niche and so it is unlikely it will be introduced to an optimizer
anytime soon.
-
-Performance Guidelines
-======================
-
-Batch Size
-----------
-
-Perhaps the most discussed performance criterion is batch size. Acero was
originally
-designed based on research to follow a morsel-batch model. Tasks are created
based on
-a large batch of rows (a morsel). The goal is for the morsel to be large
enough to justify
-the overhead of a task. Within a task the data is further subdivided into
batches.
-Each batch should be small enough to fit comfortably into CPU cache (often the
L2 cache).
-
-This sets up two loops. The outer loop is parallel and the inner loop is not:
-
-.. code:: python
-
- for morsel in dataset: # parallel
- for batch in morsel:
- run_pipeline(batch)
-
-The advantage of this style of execution is that successive nodes (or
successive operations
-within an exec node) that access the same column are likely to benefit from
the cache. It is also
-essential for functions that require random access to data. It maximizes
parallelism while
-minimizing the data transfer from main memory to CPU cache.
-
-.. figure:: microbatch.svg
-
- If multiple passes through the data are needed (or random access) and the
batch is much bigger
- than the cache then performance suffers. Breaking the task into smaller
batches helps improve
- task locality.
-
-The morsel/batch model is reflected in a few places in Acero:
-
- * In most source nodes we will try and grab batches of 1Mi rows. This is
often configurable.
- * In the source node we then iterate and slice off batches of 32Ki rows.
This is not currently
- configurable.
- * The hash join node currently requires that batches contain at most 32Ki
rows as it uses
- 16-bit signed integers as row indices in some places.
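-
-In code, the inner (serial) loop of the Python pseudocode above might look
-like the following sketch, where ``RunPipeline`` is a hypothetical stand-in
-for pushing a batch through the exec nodes:
-
-.. code:: cpp
-
-   #include <algorithm>
-   #include <cstdint>
-
-   #include "arrow/compute/exec.h"
-
-   void RunPipeline(const arrow::compute::ExecBatch& batch);
-
-   void RunPipelineOnMorsel(const arrow::compute::ExecBatch& morsel) {
-     constexpr int64_t kMiniBatchRows = 32 * 1024;
-     // Slice the large morsel into cache-sized mini-batches and push each
-     // one through the pipeline serially.
-     for (int64_t offset = 0; offset < morsel.length;
-          offset += kMiniBatchRows) {
-       int64_t rows = std::min(kMiniBatchRows, morsel.length - offset);
-       RunPipeline(morsel.Slice(offset, rows));
-     }
-   }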
-
-However, this guidance is debatable. Profiling has shown that we do not get
any real benefit
-from moving to a smaller batch size. It seems any advantage we do get is lost
in per-batch
-overhead. Most of this overhead appears to be due to various per-batch
allocations. In addition,
-depending on your hardware, it's not clear that CPU Cache<->RAM will always be
the bottleneck. A
-combination of linear access, pre-fetch, and high CPU<->RAM bandwidth can
alleviate the penalty
-of cache misses.
-
-As a result, this section is included in the guide to provide historical
context, but should not
-be considered binding.
-
-Ongoing & Deprecated Work
-=========================
-
-The following efforts are ongoing. They are described here to explain certain
duplication in the
-code base as well as explain types that are going away.
-
-Scanner v2
-----------
-
-The scanner is currently a node in the datasets module registered with the
factory registry as "scan".
-This node was written prior to Acero and made extensive use of AsyncGenerator
to scan multiple files
-in parallel. Unfortunately, the use of AsyncGenerator made the scan difficult
to profile, difficult
-to debug, and impossible to cancel. A new scan node is in progress. It is
currently registered with
-the name "scan2". The new scan node uses the AsyncTaskScheduler instead of
AsyncGenerator and should
-provide additional features such as the ability to skip rows and handle nested
column projection (for
-formats that support it).
-
-OrderBySink and SelectKSink
----------------------------
-
-These two exec nodes provided custom sink implementations. They were written
before ordered execution
-was added to Acero and were the only way to generate ordered output. However,
they had to be placed
-at the end of a plan and the fact that they were custom sink nodes made them
difficult to describe with
-Declaration. The OrderByNode and FetchNode replace these. They are kept for
the moment until existing
-bindings move away from them.
-
-Upstreaming Changes
-===================
-
-Acero is designed so that it can be extended without recompilation. You can
easily add new compute
-functions and exec nodes without creating a fork or compiling Acero. However,
as you develop new
-features that are generally useful, we hope you will make time to upstream
your changes.
-
-Even though we welcome these changes we have to admit that there is a cost to
this process. Upstreaming
-code requires that the new module behave correctly, but that is typically the
easier part to review.
-More importantly, upstreaming code is a process of transferring the
maintenance burden from yourself to
-the wider Arrow C++ project maintainers. This requires a deep understanding
of the code by maintainers,
-it requires the code be consistent with the style of the project, and it
requires that the code be well
-tested with unit tests to guard against regressions.
-
-Because of this, we highly recommend taking the following steps:
-
-* As you are starting out you should send a message to the mailing list
announcing your intentions and
- design. This will help you determine if there is wider interest in the
feature and others may have
- ideas or suggestions to contribute early on in the process.
-
- * If there is not much interest in the feature then keep in mind that it may
be difficult to eventually
- upstream the change. The maintenance capacity of the team is limited and
we try and prioritize
- features that are in high demand.
-
-* We recommend developing and testing the change on your own fork until you
get it to a point where you
- are fairly confident things are working correctly. If the change is large
then you might also think
- about how you can break up the change into smaller pieces. As you do this
you can share both the larger
- PR (as a draft PR or a branch on your local fork) and the smaller PRs. This
way we can see the context
- of the smaller PRs. However, if you do break things up, smaller PRs should
still ideally stand on their
- own.
-
-* Any PR will need to have the following:
-
- * Unit tests covering the new functionality
-
- * Microbenchmarks if there is any significant compute work going on
-
- * Examples demonstrating how to use the new feature
-
- * Updates to the API reference and this guide
-
- * Passing CI (you can enable GitHub Actions on your fork and that will allow
most CI jobs to run before
- you create your PR)
diff --git a/docs/source/cpp/user_guide.rst b/docs/source/cpp/user_guide.rst
index 6a426b6c10..094859f9c5 100644
--- a/docs/source/cpp/user_guide.rst
+++ b/docs/source/cpp/user_guide.rst
@@ -30,7 +30,7 @@ User Guide
tables
compute
gandiva
- streaming_execution
+ acero
io
ipc
orc
diff --git a/docs/source/developers/cpp/acero.rst
b/docs/source/developers/cpp/acero.rst
index 688cfa9d5d..0492007904 100644
--- a/docs/source/developers/cpp/acero.rst
+++ b/docs/source/developers/cpp/acero.rst
@@ -15,551 +15,686 @@
.. specific language governing permissions and limitations
.. under the License.
-.. highlight:: console
-.. _development-cpp-acero:
+.. default-domain:: cpp
+.. highlight:: cpp
+.. cpp:namespace:: arrow::acero
================
Developing Acero
================
-Swiss Table
-===========
-
-A specialized hash table implementation used to dynamically map combinations of
-key field values to a dense set of integer ids. Ids can later be used in place
-of keys to identify groups of rows with equal keys.
-
-Introduction
-------------
-
-Hash group-by in Arrow uses a variant of a hash table based on a data structure
-called Swiss table. Swiss table uses linear probing. There is an array of slots
-and the information related to inserted keys is stored in these slots. A hash
-function determines the slot where the search for a matching key will start
-during hash table lookup. Then the slots are visited sequentially, wrapping
-around the end of an array, until either a match or an empty slot is found, the
-latter case meaning that there is no match. Swiss table organizes the slots in
-blocks of 8 and has a design that enables data level parallelism at the block
-level. More precisely, it allows for visiting all slots within a block at once
-during lookups, by simply using 64-bit arithmetic. SIMD instructions can
further
-enhance this data level parallelism, allowing multiple blocks related to
-multiple input keys to be processed together using SIMD vectors of 64-bit
-elements. Occupied
-slots within a block are always clustered together. The name Swiss table comes
-from likening the resulting sequences of empty slots to holes in a
-one-dimensional cheese.
-
-Interface
----------
+This page goes into more detail about the design of Acero. It discusses how
+to create custom exec nodes and describes some of the philosophies behind
Acero's
+design and implementation. Finally, it gives an overview of how to extend
Acero
+with new behaviors and how this new behavior can be upstreamed into the core
Arrow
+repository.
+
+Understanding ExecNode
+======================
+
+ExecNode is an abstract class with several pure virtual methods that control
how the node operates:
-A hash table used in query processing for implementing join and group-by
operators
-does not need to provide all of the operations that a general purpose hash
table
-would. Simplified requirements can help achieve a simpler and more efficient
-design. For instance we do not need to be able to remove previously inserted
-keys. It’s an append-only data structure: new keys can be added but old keys
are
-never erased. Also, only a single copy of each key can be inserted - it is like
-``std::map`` in that sense and not ``std::multimap``.
-
-Our Swiss table is fully vectorized. That means that all methods work on
vectors
-of input keys, processing them in batches. Specialized SIMD implementations of
-processing functions are almost always provided for performance critical
-operations. All callback interfaces used from the core hash table code are also
-designed to work on batches of inputs instead of individual keys. The batch
size
-can be almost arbitrary and is selected by the client of the hash table. Batch
-size should be the smallest number of input items, big enough so that the
-benefits of vectorization and SIMD can be fully experienced. Keeping it small
-means less memory used for temporary arrays storing intermediate results of
-computation (vector equivalent of some temporary variables kept on the stack).
-That in turn means smaller space in CPU caches, which also means less impact on
-other memory access intensive operations. We pick 1024 as the default size of
-the batch. We will call it a **mini-batch** to distinguish it from potentially
-other forms of batches used at higher levels in the code, e.g. when scheduling
-work for worker threads or relational operators inside an analytic query.
-
-The main functionality provided by Swiss table is mapping of arbitrarily
complex
-keys to unique integer ids. Let us call it **lookup-or-insert**. Given a
-sequence of key values, return a corresponding sequence of integer ids, such
-that all keys that are equal receive the same id and for K distinct keys the
-integer ids will be assigned from the set of numbers 0 to (K-1). If we find a
-matching key in a hash table for a given input, we return the **key id**
-assigned when the key was first inserted into a hash table. If we fail to find
-an already inserted match, we assign the first unused integer as a key id and
-add a new entry to a hash table. Due to vectorized processing, which may result
-in out-of-order processing of individual inputs, it is not guaranteed that if
-there are two new key values in the same input batch and one of them appears
-earlier in the input sequence, then it will receive a smaller key id.
Additional
-mapping functionality can be built on top of basic mapping to integer key id,
-for instance if we want to assign and perhaps keep updating some values to all
-unique keys, we can keep these values in a resizable vector indexed by obtained
-key id.
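-
-The lookup-or-insert contract can be illustrated with a few lines of
-scalar code, where a plain hash map stands in for the real vectorized
-implementation:
-
-.. code:: cpp
-
-   #include <cstdint>
-   #include <unordered_map>
-   #include <vector>
-
-   // Equal keys receive equal ids; K distinct keys receive ids 0..K-1.
-   template <typename Key>
-   std::vector<uint32_t> LookupOrInsert(
-       const std::vector<Key>& batch,
-       std::unordered_map<Key, uint32_t>* table) {
-     std::vector<uint32_t> key_ids;
-     key_ids.reserve(batch.size());
-     for (const Key& key : batch) {
-       // try_emplace assigns the next unused id only if the key is new.
-       auto it =
-           table->try_emplace(key, static_cast<uint32_t>(table->size()))
-               .first;
-       key_ids.push_back(it->second);
-     }
-     return key_ids;
-   }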
-
-The implementation of Swiss table does not need to have any information related
-to the domain of the keys. It does not use their logical data type or
-information about their physical representation and does not even use pointers
-to keys. All access to keys is delegated to a separate class or classes that
-provide callback functions for three operations:
-
-- computing hashes of keys;
-- checking equality for given pairs of keys;
-- appending a given sequence of keys to a stack maintained outside of Swiss
- table object, so that they can be referenced later on by key ids (key ids
will
- be equal to their positions in the stack).
-
-When passing arguments to callback functions the keys are referenced using
-integer ids. For the left side - that is the keys present in the input
-mini-batch - ordinal positions within that mini-batch are used. For the right
-side - that is the keys inserted into the hash table - these are identified by
-key ids assigned to them and stored inside Swiss table when they were first
-encountered and processed.
-
-Diagram with logical view of information passing in callbacks:
-
-.. image:: img/swiss_table_1.jpg
-
-Hash table values for inserted keys are also stored inside Swiss table. Because
-of that, hash table logic does not need to ever re-evaluate the hash, and there
-is actually no need for a hash function callback. It is enough that the caller
-provides hash values for all entries in the batch when calling
lookup-or-insert.
-
-Basic architecture and organization of data
+:func:`ExecNode::StartProducing`
+--------------------------------
+
+This method is called once at the start of the plan. Most nodes ignore this
method (any
+necessary initialization should happen in the constructor or Init). However,
source nodes
+will typically provide a custom implementation. Source nodes should schedule
whatever tasks
+are needed to start reading and providing the data. Source nodes are usually
the primary
+creator of tasks in a plan.
+
+.. note::
+ The ExecPlan operates on a push-based model. Sources are often pull-based.
For example,
+ your source may be an iterator. The source node will typically then
schedule tasks to pull one
+ item from the source and push that item into the source's output node (via
``InputReceived``).
+
+Examples
+^^^^^^^^
+
+* In the ``table_source`` node the input table is divided into batches. A
task is created for
+ each batch and that task calls ``InputReceived`` on the node's output.
+* In the ``scan`` node a task is created to start listing fragments from the
dataset. Each listing
+ task then creates tasks to read batches from the fragment, asynchronously.
When the batch is
+ fully read in, a continuation schedules a new task with the exec plan.
This task calls
+ ``InputReceived`` on the scan node's output.
+
+:func:`ExecNode::InputReceived`
+-------------------------------
+
+This method is called many times during the execution of a plan. It is how
nodes pass data
+to each other. An input node will call InputReceived on its output. Acero's
execution model
+is push-based. Each node pushes data into its output by calling InputReceived
and passing in
+a batch of data.
+
+The InputReceived method is often where the actual work happens for the node.
For example,
+a project node will execute its expressions and create a new expanded output
batch. It will then
+call InputReceived on its output. InputReceived will never be called on a
source node. Sink
+nodes will never call InputReceived. All other nodes will experience both.
+
+Some nodes (often called "pipeline breakers") must accumulate input before
they can generate
+any output. For example, a sort node must accumulate all input before it can
sort the data and
+generate output. In these nodes the InputReceived method will typically place
the data into
+some kind of accumulation queue. If the node doesn't have enough data to
operate then it will
+not call InputReceived on its output. This will then be the end of the current task.
+
+Examples
+^^^^^^^^
+
+* The ``project`` node runs its expressions, using the received batch as input
for the expression.
+ A new batch is created from the input batch and the result of the
expressions. The new batch is
+ given the same order index as the input batch and the node then calls
``InputReceived`` on its
+ output.
+* The ``order_by`` node inserts the batch into an accumulation queue. If this
was the last batch
+ then the node will sort everything in the accumulation queue. The node will
then call
+ ``InputReceived`` on the output for each batch in the sorted result. A new
batch index will be
+ assigned to each batch. Note that this final output step might also occur
as a result of a call
+ to ``InputFinished`` (described below).
+
+:func:`ExecNode::InputFinished`
+-------------------------------
+
+This method will be called once per input. A node will call InputFinished on
its output once it
+knows how many batches it will be sending to that output. Normally this
happens when the node is
+finished working. For example, a scan node will call InputFinished once it
has finished reading
+its files. However, it could call it earlier if it knows (maybe from file
metadata) how many
+batches will be created.
+
+Some nodes will use this signal to trigger some processing. For example, a
sort node needs to
+wait until it has received all of its input before it can sort the data. It
relies on the InputFinished
+call to know this has happened.
+
+Even if a node is not doing any special processing when finished (e.g. a
project node or filter node
+doesn't need to do any end-of-stream processing) that node will still call
InputFinished on its
+output.
+
+.. warning::
+ The InputFinished call might arrive before the final call to InputReceived.
In fact, it could
+ even be sent out before any calls to InputReceived begin. For example, the
table source node
+ always knows exactly how many batches it will be producing. It could
choose to call InputFinished
+ before it ever calls InputReceived. If a node needs to do "end-of-stream"
processing then it typically
+ uses an AtomicCounter which is a helper class to figure out when all of the
data has arrived.
+
+Examples
+^^^^^^^^
+
+* The ``order_by`` node checks to see if it has already received all its batches.
If it has then it performs
+ the sorting step described in the ``InputReceived`` example. Before it
starts sending output data it
+ checks to see how many output batches it has (it's possible the batch size
changed as part of the
+ accumulating or sorting) and calls ``InputFinished`` on the node's output.
+* The ``fetch`` node, during a call to ``InputReceived``, realizes it has
received all the rows it was
+ asked for. It calls ``InputFinished`` on its output immediately (even
though its own ``InputFinished``
+ method has not yet been called).
+
+:func:`ExecNode::PauseProducing` / :func:`ExecNode::ResumeProducing`
+---------------------------------------------------------------------
+
+These methods control backpressure. Some nodes may need to pause their input
to avoid accumulating
+too much data. For example, when the user is consuming the plan with a
RecordBatchReader we use a
+SinkNode. The SinkNode places data in a queue that the RecordBatchReader
pulls from (this is a
+conversion from a push-model to a pull-model). If the user is reading the
RecordBatchReader slowly then
+it is possible this queue will start to fill up. For another example we can
consider the write node.
+This node writes data to a filesystem. If the writes are slow then data might
accumulate at the
+write node. As a result, the write node would need to apply backpressure.
+
+When a node realizes that it needs to apply some backpressure it will call
PauseProducing on its input.
+Once the node has enough space to continue it will then call ResumeProducing
on its input. For example,
+the SinkNode would pause when its queue gets too full. As the user continues
to read from the
+RecordBatchReader we can expect the queue to slowly drain. Once the queue has
drained enough then the
+SinkNode can call ResumeProducing.
+
+Source nodes typically need to provide special behavior for PauseProducing and
ResumeProducing. For
+example, a scan node that is reading from a file can pause reading the file.
However, some source nodes
+may not be able to pause in any meaningful way. There is not much point in a
table source node pausing
+because its data is already in memory.
+
+Nodes that are neither source nor sink should still forward backpressure signals. For example, when
+PauseProducing is called on a project node it should call PauseProducing on its input. If a node has
+multiple inputs then it should forward the signal to every input.
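+
+The following is a rough sketch of this pattern: a bounded queue that invokes pause and resume
+callbacks as it fills and drains. The class and the thresholds are hypothetical, not Acero's
+actual ``SinkNode``; in a real node the callbacks would be calls to ``PauseProducing`` and
+``ResumeProducing`` on the node's input.
+
+.. code:: cpp
+
+   #include <cstddef>
+   #include <functional>
+   #include <mutex>
+   #include <queue>
+
+   template <typename T>
+   class BackpressureQueue {
+    public:
+     BackpressureQueue(size_t pause_at, size_t resume_at,
+                       std::function<void()> pause, std::function<void()> resume)
+         : pause_at_(pause_at), resume_at_(resume_at),
+           pause_(std::move(pause)), resume_(std::move(resume)) {}
+
+     // Called from InputReceived: enqueue and pause the input if too full.
+     void Push(T item) {
+       std::lock_guard<std::mutex> guard(mutex_);
+       queue_.push(std::move(item));
+       if (!paused_ && queue_.size() >= pause_at_) {
+         paused_ = true;
+         pause_();
+       }
+     }
+
+     // Called by the consumer (e.g. a RecordBatchReader): dequeue and resume
+     // the input once the queue has drained enough.
+     bool Pop(T* out) {
+       std::lock_guard<std::mutex> guard(mutex_);
+       if (queue_.empty()) return false;
+       *out = std::move(queue_.front());
+       queue_.pop();
+       if (paused_ && queue_.size() <= resume_at_) {
+         paused_ = false;
+         resume_();
+       }
+       return true;
+     }
+
+    private:
+     size_t pause_at_;
+     size_t resume_at_;
+     std::function<void()> pause_;
+     std::function<void()> resume_;
+     std::mutex mutex_;
+     std::queue<T> queue_;
+     bool paused_ = false;
+   };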
+
+Examples
+^^^^^^^^
+
+* The ``write`` node, in its ``InputReceived`` method, adds a batch to a
dataset writer's queue. If the
+ dataset writer is then full it will return an unfinished future that will
complete when it has more room.
+ The ``write`` node then calls ``PauseProducing`` on its input. It then adds
a continuation to the future
+ that will call ``ResumeProducing`` on its input.
+* The ``scan`` node uses an :class:`AsyncTaskScheduler` to keep track of all
the tasks it schedules. This
+ scheduler is throttled to limit how much concurrent I/O the ``scan`` node is
allowed to perform. When
+ ``PauseProducing`` is called then the node will pause the scheduler. This
means that any tasks queued
+ behind the throttle will not be submitted. However, any ongoing I/O will
continue (backpressure can't
+ take effect immediately). When ``ResumeProducing`` is called the ``scan``
node will unpause the scheduler.
+
+:func:`ExecNode::StopProducing`
+-------------------------------
+
+StopProducing is called when a plan needs to end early. This can happen because the user cancelled
+the plan or because an error occurred. Most nodes do not need to do anything here.
+There is no expectation or requirement that a node sends any remaining data it has. Any node that
+schedules tasks (e.g. a source node) should stop producing new data.
+
+In addition to plan-wide cancellation, a node may call this method on its
input if it has decided
+that it has received all the data that it needs. However, because of
parallelism, a node may still
+receive a few calls to ``InputReceived`` after it has stopped its input.
+
+If any external resources are used then cleanup should happen as part of this
call.
+
+Examples
+^^^^^^^^
+
+* The ``asofjoin`` node has a dedicated processing thread that communicates with the main Acero threads
+  using a queue. When ``StopProducing`` is called the node inserts a poison pill into the queue. This
+  tells the processing thread to stop immediately. Once the processing thread stops it marks its external
+  task (described below) as completed which allows the plan to finish. (A sketch of this queue pattern
+  follows these examples.)
+* The ``fetch`` node, in ``InputReceived``, may decide that it has all the
data it needs. It can then call
+ ``StopProducing`` on its input.
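+
+The poison pill pattern from the first example can be sketched with a small blocking queue. This
+is illustrative only and is not the actual ``asofjoin`` implementation; here, pushing an empty
+``std::optional`` acts as the poison pill.
+
+.. code:: cpp
+
+   #include <condition_variable>
+   #include <mutex>
+   #include <optional>
+   #include <queue>
+
+   template <typename T>
+   class ProcessingQueue {
+    public:
+     // Push std::nullopt as the poison pill to tell the thread to exit.
+     void Push(std::optional<T> item) {
+       {
+         std::lock_guard<std::mutex> guard(mutex_);
+         queue_.push(std::move(item));
+       }
+       cv_.notify_one();
+     }
+
+     // The dedicated processing thread loops on WaitAndPop until it returns
+     // std::nullopt, then marks its external task as complete.
+     std::optional<T> WaitAndPop() {
+       std::unique_lock<std::mutex> lock(mutex_);
+       cv_.wait(lock, [this] { return !queue_.empty(); });
+       std::optional<T> item = std::move(queue_.front());
+       queue_.pop();
+       return item;
+     }
+
+    private:
+     std::mutex mutex_;
+     std::condition_variable cv_;
+     std::queue<std::optional<T>> queue_;
+   };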
+
+Initialization / Construction / Destruction
-------------------------------------------
-The hash table is an array of **slots**. Slots are grouped in groups of 8
called
-**blocks**. The number of blocks is a power of 2. The empty hash table starts
-with a single block, with all slots empty. Then, as the keys are getting
-inserted and the amount of empty slots is shrinking, at some point resizing of
-the hash table is triggered. The data stored in slots is moved to a new hash
-table that has the double of the number of blocks.
-
-The diagram below shows the basic organization of data in our implementation of
-Swiss table:
-
-.. image:: img/swiss_table_2.jpg
-
-N is the log of the number of blocks, :math:`2^{N+3}` is the number of slots
and
-also the maximum number of inserted keys and hence (N + 3) is the number of
bits
-required to store a key id. We will refer to N as the **size of the hash
table**.
-
-Index of a block within an array will be called **block id**, and similarly
index
-of a slot will be **slot id**. Sometimes we will focus on a single block and
-refer to slots that belong to it by using a **local slot id**, which is an
index
-from 0 to 7.
-
-Every slot can either be **empty** or store data related to a single inserted
-key. There are three pieces of information stored inside a slot:
-
-- status byte,
-- key id,
-- key hash.
-
-Status byte, as the name suggests, stores 8 bits. The highest bit indicates if
-the slot is empty (the highest bit is set) or corresponds to one of inserted
-keys (the highest bit is zero). The remaining 7 bits contain 7 bits of key hash
-that we call a **stamp**. The stamp is used to eliminate some false positives
-when searching for a matching key for a given input. Slot also stores **key
id**,
-which is a non-negative integer smaller than the number of inserted keys, that
is
-used as a reference to the actual inserted key. The last piece of information
-related to an inserted key is its **hash** value. We store hashes for all keys,
-so that they never need to be re-computed. That greatly simplifies some
-operations, like resizing of a hash table, that may not even need to look at
the
-keys at all. For an empty slot, the status byte is 0x80, key id is zero and the
-hash is not used and can be set to any number.
-
-A single block contains 8 slots and can be viewed as a micro-stack of up to 8
-inserted keys. When the first key is inserted into an empty block, it will
occupy
-a slot with local id 0. The second inserted key will go into slot number 1 and
so
-on. We use N highest bits of hash to get an index of a **start block**, when
-searching for a match or an empty slot to insert a previously not seen key when
-that is the case. If the start block contains any empty slots, then the search
-for either a match or place to insert a key will end at that block. We will
call
-such a block an **open block**. A block that is not open is a full block. In
the
-case of full block, the input key related search may continue in the next block
-modulo the number of blocks. If the key is not inserted into its start block,
we
-will refer to it as an **overflow** entry, other entries being
**non-overflow**.
-Overflow entries are slower to process, since they require visiting more than
one
-block, so we want to keep their percentage low. This is done by choosing the
-right **load factor** (percentage of occupied slots in the hash table) at which
-the hash table gets resized and the number of blocks gets doubled. By tuning
this
-value we can control the probability of encountering an overflow entry.
-
-The most interesting part of each block is the set of status bytes of its
slots,
-which is simply a single 64-bit word. The implementation of efficient searches
-across these bytes during lookups require using either leading zero count or
-trailing zero count intrinsic. Since there are cases when only the first one is
-available, in order to take advantage of it, we order the bytes in the 64-bit
-status word so that the first slot within a block uses the highest byte and the
-last one uses the lowest byte (slots are in reversed bytes order). The diagram
-below shows how the information about slots is stored within a 64-bit status
-word:
-
-.. image:: img/swiss_table_3.jpg
-
-Each status byte has a 7-bit fragment of hash value - a **stamp** - and an
empty
-slot bit. Empty slots have status byte equal to 0x80 - the highest bit is set
to
-1 to indicate an empty slot and the lowest bits, which are used by a stamp, are
-set to zero.
-
-The diagram below shows which bits of hash value are used by hash table:
-
-.. image:: img/swiss_table_4.jpg
-
-If a hash table has :math:`2^{N}` blocks, then we use N highest bits of a hash
-to select a start block when searching for a match. The next 7 bits are used as
-a stamp. Using the highest bits to pick a start block means that a range of
hash
-values can be easily mapped to a range of block ids of start blocks for hashes
-in that range. This is useful when resizing a hash table or merging two hash
-tables together.
-
-Interleaving status bytes and key ids
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-Status bytes and key ids for all slots are stored in a single array of bytes.
-They are first grouped by 8 into blocks, then each block of status bytes is
-interleaved with a corresponding block of key ids. Finally key ids are
-represented using the smallest possible number of bits and bit-packed (bits
-representing each next key id start right after the last bit of the previous
key
-id). Note that regardless of the chosen number of bits, a block of bit-packed
-key ids (that is 8 of them) will start and end on the byte boundary.
-
-The diagram below shows the organization of bytes and bits of a single block in
-interleaved array:
-
-.. image:: img/swiss_table_5.jpg
-
-From the size of the hash table we can derive the number K of bits needed in
the
-worst case to encode any key id. K is equal to the number of bits needed to
-represent slot id (number of keys is not greater than the number of slots and
any
-key id is strictly less than the number of keys), which for a hash table of
size
-N (N blocks) equals (N+3). To simplify bit packing and unpacking and avoid
-handling of special cases, we will round up K to full bytes for K > 24 bits.
-
-Status bytes are stored in a single 64-bit word in reverse byte order (the last
-byte corresponds to the slot with local id 0). On the other hand key ids are
-stored in the normal order (the order of slot ids).
-
-Since both status byte and key id for a given slot are stored in the same array
-close to each other, we can expect that most of the lookups will read only one
-CPU cache-line from memory inside Swiss table code (then at least another one
-outside Swiss table to access the bytes of the key for the purpose of
-comparison). Even if we hit an overflow entry, it is still likely to reside on
-the same cache-line as the start block data. Hash values, which are stored
-separately from status byte and key id, are only used when resizing and do not
-impact the lookups outside these events.
+Simple initialization logic (that cannot error) can be done in the
constructor. If the initialization
+logic may return an invalid status then it can either be done in the exec
node's factory method or
+the ``Init`` method. The factory method is preferred for simple validation.
The ``Init`` method is
+preferred if the initialization might do expensive allocation or other
resource consumption. ``Init`` will
+always be called before ``StartProducing`` is called. Initialization could
also be done in
+``StartProducing`` but keep in mind that other nodes may have started by that
point.
+
+In addition, there is a ``Validate`` method that can be overridden to provide custom validation. This
+method is normally called before ``Init`` but after all inputs and outputs have been added.
+
+Finalization currently happens in the destructor. There are a few examples where that might be slow.
+For example, in the write node, if there was an error during the plan, then we might close out some open
+files here. Should there be significant finalization that is either asynchronous or could potentially
+trigger an error then we could introduce a Finalize method to the ExecNode lifecycle. It hasn't been
+done yet only because it hasn't been needed.
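+
+As a minimal sketch, assuming a hypothetical ``MyNode`` with an ``options_.buffer_size`` field and
+a ``staging_buffer_`` member, the split between construction and ``Init`` might look like this:
+
+.. code:: cpp
+
+   #include "arrow/buffer.h"
+   #include "arrow/result.h"
+   #include "arrow/status.h"
+
+   // Cheap, infallible setup stays in the constructor (or the factory
+   // method); allocation that can fail is deferred to Init, which is
+   // guaranteed to run before StartProducing.
+   arrow::Status MyNode::Init() {
+     ARROW_ASSIGN_OR_RAISE(staging_buffer_,
+                           arrow::AllocateResizableBuffer(options_.buffer_size));
+     return arrow::Status::OK();
+   }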
+
+Summary
+-------
+
+.. list-table:: ExecNode Lifecycle
+ :widths: 20 40 40
+ :header-rows: 1
+
+ * - Method Name
+ - This is called when...
+ - A node calls this when...
+ * - StartProducing
+ - The plan is starting
+ - N/A
+ * - InputReceived
+ - Data is received from the input
+ - To send data to the output
+ * - InputFinished
+ - The input knows how many batches there are
+ - The node can tell its output how many batches there are
+ * - StopProducing
+ - A plan is aborted or an output has enough data
+ - A node has all the data it needs
+
+Extending Acero
+===============
+
+Acero instantiates a singleton :class:`ExecFactoryRegistry` which maps between
names and exec node
+factories (methods which create an ExecNode from options). To create a new
ExecNode you can register
+the node with this registry and your node will now be usable by Acero. If you
would like to be able
+to use this node with Substrait plans you will also need to configure the
Substrait registry so that it
+knows how to map Substrait to your custom node.
+
+This means that you can create and add new nodes to Acero without recompiling
Acero from source.
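+
+As a rough sketch, registration looks something like the following (``MyNode`` and its ``Make``
+method are hypothetical; the registry interface reflects ``arrow/acero/exec_plan.h`` at the time
+of writing and is worth double-checking against the headers):
+
+.. code:: cpp
+
+   #include <vector>
+
+   #include "arrow/acero/exec_plan.h"
+   #include "arrow/acero/options.h"
+
+   arrow::Status RegisterMyNode() {
+     arrow::acero::ExecFactoryRegistry* registry =
+         arrow::acero::default_exec_factory_registry();
+     return registry->AddFactory(
+         "my_custom_node",
+         [](arrow::acero::ExecPlan* plan,
+            std::vector<arrow::acero::ExecNode*> inputs,
+            const arrow::acero::ExecNodeOptions& options)
+             -> arrow::Result<arrow::acero::ExecNode*> {
+           // MyNode::Make is assumed to validate options and construct the node.
+           return MyNode::Make(plan, std::move(inputs), options);
+         });
+   }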
+
+Scheduling and Parallelism
+==========================
+
+There are many ways in which data engines can utilize multiple compute resources (e.g. multiple cores).
+Before we get into the details of Acero's scheduling we will cover a few high level topics.
+
+Parallel Execution of Plans
+---------------------------
+
+Users may want to execute multiple plans concurrently and they are welcome to do so. However, Acero has no
+concept of inter-plan scheduling. Each plan will attempt to maximize its usage of compute resources and
+there will likely be contention for CPU, memory, and disk resources. If plans are using the default CPU &
+I/O thread pools this will be mitigated somewhat since they will share the same thread pools.
+
+Locally Distributed Plans
+-------------------------
+
+A common way to tackle multi-threading is to split the input into partitions, create a plan for
+each partition, and then merge the results from these plans in some way. For example, let's assume you
+have 20 files and 10 cores and you want to read and sort all the data. You could create a plan for every
+2 files to read and sort those files. Then you could create one extra plan that takes the input from these
+10 child plans and merges the 10 input streams in a sorted fashion.
+
+This approach is popular because it is how queries are distributed across
multiple servers and so it
+is widely supported and well understood. Acero does not do this today but
there is no reason to prevent it.
+Adding shuffle & partition nodes to Acero should be a high priority and would
enable Acero to be used by
+distributed systems. Once that has been done then it should be possible to do
a local shuffle (local
+meaning exchanging between multiple exec plan instances on a single system) if
desired.
+
+.. figure:: img/dist_plan.svg
+
+ A distributed plan can provide parallelism even if the plans themselves run
serially
+
+Pipeline Parallelism
+--------------------
+
+Acero attempts to maximize parallelism using pipeline parallelism. As each
batch of data arrives from the
+source we immediately create a task and start processing it. This means we
will likely start processing
+batch X before the processing of batch X-1 has completed. This is very
flexible and powerful. However, it also
+means that properly implementing an ExecNode is difficult.
+
+For example, an ExecNode's InputReceived method should be reentrant. In other
words, it should be expected
+that InputReceived will be called before the previous call to InputReceived
has completed. This means that
+nodes with any kind of mutable state will need mutexes or similar mechanisms
to protect that state from race
+conditions. It also means that tasks can easily get out of order and nodes
should not expect any particular ordering
+of their input (more on this later).
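+
+As a sketch, a reentrancy-safe ``InputReceived`` for a hypothetical accumulating node might look
+like this (the member fields, the counter helper, and ``ProcessAccumulated`` are all assumptions
+for illustration; the method signature matches :class:`ExecNode` at the time of writing):
+
+.. code:: cpp
+
+   #include <mutex>
+
+   #include "arrow/acero/exec_plan.h"
+
+   arrow::Status MyAccumulatingNode::InputReceived(
+       arrow::acero::ExecNode* input, arrow::compute::ExecBatch batch) {
+     bool is_last_batch;
+     {
+       // Two calls may be running at once; guard all mutable state.
+       std::lock_guard<std::mutex> guard(mutex_);
+       accumulated_.push_back(std::move(batch));
+       is_last_batch = counter_.RecordBatchArrived();
+     }
+     // Do the heavy end-of-stream work outside the lock.
+     if (is_last_batch) return ProcessAccumulated();
+     return arrow::Status::OK();
+   }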
+
+.. figure:: img/pipeline.svg
+
+ An example of pipeline parallelism on a system with 3 CPU threads and 2 I/O
threads
+
+Asynchronicity
+--------------
+
+Some operations take a long time and may not require the CPU. Reading data from the filesystem is one
+example. If we only have one thread per core then time will be wasted while we wait for these operations
+to complete. There are two common solutions to this problem. The synchronous solution is typically to
+create more threads than there are cores, with the expectation that some of them will be blocked and that
+is ok. This approach tends to be simpler but it can lead to excess thread contention and requires
+fine-tuning.
+
+Another solution is to make the slow operations asynchronous. When the slow
operation starts the caller gives up
+the thread and allows other tasks to run in the meantime. Once the slow
operation finishes then a new task is
+created to take the result and continue processing. This helps to minimize
thread contention but tends to be
+more complex to implement.
+
+Due to a lack of standard C++ async APIs, Acero uses a combination of the two
approaches. Acero has two thread pools.
+The first is the CPU thread pool. This thread pool has one thread per core.
Tasks in this thread pool should never
+block (beyond minor delays for synchronization) and should generally be
actively using CPU as much as possible. Threads
+on the I/O thread pool are expected to spend most of their time idle. They should avoid doing any CPU-intensive work.
+Their job is basically to wait for data to be available and schedule follow-up
tasks on the CPU thread pool.
+
+.. figure:: img/async.svg
+
+ Arrow achieves asynchronous execution by combining CPU & I/O thread pools
+
+.. note::
+
+ Most nodes in Acero do not need to worry about asynchronicity. They are
fully synchronous and do not spawn tasks.
+
+Task per Pipeline (and sometimes beyond)
+----------------------------------------
+
+An engine could choose to create a thread task for every execution of a node.
However, without careful scheduling,
+this leads to problems with cache locality. For example, let's assume we have
a basic plan consisting of three
+exec nodes, scan, project, and then filter (this is a very common use case).
Now let's assume there are 100 batches.
+In a task-per-operator model we would have tasks like "Scan Batch 5", "Project Batch 5", and "Filter Batch 5". Each
+of those tasks is potentially going to access the same data. For example, maybe the ``project`` and ``filter`` nodes need
+to read the same column, a column which is initially created in a decode phase of the ``scan`` node. To maximize cache
+utilization we would need to carefully schedule our tasks to ensure that all three of those tasks are run consecutively
+and assigned to the same CPU core.
+
+To avoid this problem we design tasks that run through as many nodes as
possible before the task ends. This sequence
+of nodes is often referred to as a "pipeline" and the nodes that end the
pipeline (and thus end the task) are often
+called "pipeline breakers". Some nodes might even fall somewhere in between.
For example, in a hash join node, when
+we receive a batch on the probe side, and the hash table has been built, we do
not need to end the task and instead keep
+on running. This means that tasks might sometimes end at the join node and
might sometimes continue past the join node.
+
+.. figure:: img/pipeline_task.svg
+
+ A logical view of pipelines in a plan and two tasks, showing that pipeline
boundaries may vary during a plan
+
+
+Thread Pools and Schedulers
+---------------------------
+
+The CPU and I/O thread pools are a part of the core Arrow-C++ library. They
contain a FIFO queue of tasks and will
+execute them as a thread is available. For Acero we need additional
capabilities. For this we use the
+AsyncTaskScheduler. In the simplest mode of operation the scheduler simply
submits tasks to an underlying thread pool.
+However, it is also capable of creating sub-schedulers which can apply
throttling, prioritization, and task tracking:
+
+ * A throttled scheduler associates a cost with each task. Tasks are only
submitted to the underlying scheduler
+ if there is room. If there is not then the tasks are placed in a queue.
The write node uses a throttle of size
+ 1 to avoid reentrantly calling the dataset writer (the dataset writer does
its own internal scheduling). A throttled
+ scheduler can be manually paused and unpaused. When paused all tasks are
queued and queued tasks will not be submitted
+ even if there is room. This can be useful in source nodes to implement
PauseProducing and ResumeProducing.
+ * Priority can be applied to throttled schedulers to control the order in
which queued tasks are submitted. If
+ there is room a task is submitted immediately (regardless of priority).
However, if the throttle is full then
+ the task is queued and subject to prioritization. The scan node throttles
how many read requests it generates
+ and prioritizes reading a dataset in order, if possible.
+ * A task group can be used to keep track of a collection of tasks and run a
finalization task when all of the
+ tasks have completed. This is useful for fork-join style problems. The
write node uses a task group to close
+ a file once all outstanding write tasks for the file have completed.
+
+There is plenty of research on, and there are examples of, different ways to prioritize tasks in an execution
+engine. Acero has not yet had to address this problem. Let's go through some common situations:
+
+ * Engines will often prioritize reading from the build side of a join node
before reading from the probe side. This
+ would be more easily handled in Acero by applying backpressure.
+ * Another common use case is to control memory accumulation. Engines will
prioritize tasks which are closer to the
+ sink node in an effort to relieve memory pressure. However, Acero
currently assumes that spilling will be added
+ at pipeline breakers and that memory usage in a plan will be more or less
static (per core) and well below the
+   limits of the hardware. This might change if Acero needs to be used in an environment where there are many compute
+   resources and limited memory (e.g. a GPU).
+ * Engines will often use work stealing algorithms to prioritize running tasks on the same core to improve cache
+   locality. However, since Acero uses a task-per-pipeline model there isn't much lost cache locality for a
+   scheduler to reclaim. Tasks only end when there is no more work that can be done with the data.
+
+While there is not much prioritization in place in Acero today, we do have the tools to apply it should we need to.
.. note::
- Improvement to consider:
- In addition to the Swiss table data, we need to store an array of inserted
- keys, one for each key id. If keys are of fixed length, then the address of
- the bytes of the key can be calculated by multiplying key id by the common
- length of the key. If keys are of varying length, then there will be an
- additional array with an offset of each key within the array of concatenated
- bytes of keys. That means that any key comparison during lookup will involve
- 3 arrays: one to get key id, one to get key offset and final one with bytes
of
- the key. This could be reduced to 2 array lookups if we stored key offset
- instead of key id interleaved with slot status bytes. Offset indexed by key
id
- and stored in its own array becomes offset indexed by slot id and stored
- interleaved with slot status bytes. At the same time key id indexed by slot
id
- and interleaved with slot status bytes before becomes key id referenced
using
- offset and stored with key bytes. There may be a slight increase in the
total
- size of memory needed by the hash table, equal to the difference in the
number
- of bits used to store offset and those used to store key id, multiplied by
the
- number of slots, but that should be a small fraction of the total size.
-
-32-bit hash vs 64-bit hash
-~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-Currently we use 32-bit hash values in Swiss table code and 32-bit integers as
-key ids. For the robust implementation, sooner or later we will need to support
-64-bit hash and 64-bit key ids. When we use 32-bit hash, it means that we run
-out of hash bits when hash table size N is greater than 25 (25 bits of hash
-needed to select a block and 7 bits needed to generate a stamp byte reach 32
-total bits). When the number of inserted keys exceeds the maximal number of
keys
-stored in a hash table of size 25 (which is at least :math:`2^{24}`), the
chance
-of false positives during lookups will start quickly growing. 32-bit hash
should
-not be used with more than about 16 million inserted keys.
-
-Low memory footprint and low chance of hash collisions
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-Swiss table is a good choice of a hash table for modern hardware, because it
-combines lookups that can take advantage of special CPU instructions with space
-efficiency and low chance of hash collisions.
-
-Space efficiency is important for performance, because the cost of random array
-accesses, often dominating the lookup cost for larger hash tables, increases
with
-the size of the arrays. This happens due to limited space of CPU caches. Let us
-look at what is the amortized additional storage cost for a key in a hash table
-apart from the essential cost of storing data of all those keys. Furthermore,
we
-can skip the storage of hash values, since these are only used during
infrequent
-hash table resize operations (should not have a big impact on CPU cache usage
in
-normal cases).
-
-Half full hash table of size N will use 2 status bytes per inserted key
(because
-for every filled slot there is one empty slot) and 2*(N+3) bits for key id
-(again, one for the occupied slot and one for the empty). For N = 16 for
-instance this is slightly under 7 bytes per inserted key.
-
-Swiss table also has a low probability of false positives leading to wasted key
-comparisons. Here is some rationale behind why this should be the case. Hash
-table of size N can contain up to :math:`2^{N+3}` keys. Search for a match
-involves (N + 7) hash bits: N to select a start block and 7 to use as a stamp.
-There are always at least 16 times more combinations of used hash bits than
-there are keys in the hash table (32 times more if the hash table is half
full).
-These numbers mean that the probability of false positives resulting from a
-search for a matching slot should be low. That corresponds to an expected
number
-of comparisons per lookup being close to 1 for keys already present and 0 for
-new keys.
-
-Lookup
+ In addition to the AsyncTaskScheduler there is another class called the
TaskScheduler. This class predates the
+ AsyncTaskScheduler and was designed to offer task tracking for highly
efficient synchronous fork-join workloads.
+ If this specialized purpose meets your needs then you may consider using
it. It would be interesting to profile
+ this against the AsyncTaskScheduler and see how closely the two compare.
+
+Intra-node Parallelism
+----------------------
+
+Some nodes can potentially exploit parallelism within a task. For example, in
the scan node we can decode
+columns in parallel. In the hash join node, parallelism is sometimes
exploited for complex tasks such as
+building the hash table. This sort of parallelism is less common but not
necessarily discouraged. Profiling should
+be done first though to ensure that this extra parallelism will be helpful in
your workload.
+
+All Work Happens in Tasks
+-------------------------
+
+All work in Acero happens as part of a task. When a plan is started the
AsyncTaskScheduler is created and given an
+initial task. This initial task calls StartProducing on the nodes. Tasks may
schedule additional tasks. For example,
+source nodes will usually schedule tasks during the call to StartProducing.
Pipeline breakers will often schedule tasks
+when they have accumulated all the data they need. Once all tasks in a plan
are finished then the plan is considered
+done.
+
+Some nodes use external threads. These threads must be registered as external
tasks using the BeginExternalTask method.
+For example, the asof join node uses a dedicated processing thread to achieve
serial execution. This dedicated thread
+is registered as an external task. External tasks should be avoided where
possible because they require careful
+handling to avoid deadlock in error situations.
+
+Ordered Execution
+=================
+
+Some nodes either establish an ordering to their outgoing batches or they need
to be able to process batches in order.
+Acero handles ordering using the ``batch_index`` property on an ExecBatch. If
a node has a deterministic output order
+then it should apply a batch index on batches that it emits. For example, the
OrderByNode applies a new ordering to
+batches (regardless of the incoming ordering). The scan node is able to
attach an implicit ordering to batches which
+reflects the order of the rows in the files being scanned.
+
+If a node needs to process data in order then it is a bit more complicated.
Because of the parallel nature of execution
+we cannot guarantee that batches will arrive at a node in order. However,
they can generally be expected to be "mostly
+ordered". As a result, we can insert the batches into a sequencing queue.
The sequencing queue is given a callback which
+is guaranteed to run on the batches, serially, in order. For example, the
fetch node uses a sequencing queue. The callback
+checks to see if we need to include part or all of the batch, and then slices
the batch if needed.
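+
+A minimal sketch of a sequencing queue follows. It is illustrative only (Acero has its own
+utility for this): batches arrive mostly ordered, are buffered by ``batch_index``, and the
+callback runs serially, in order, on each batch as soon as its predecessors have been processed.
+
+.. code:: cpp
+
+   #include <cstdint>
+   #include <functional>
+   #include <map>
+
+   template <typename Batch>
+   class SequencingQueue {
+    public:
+     explicit SequencingQueue(std::function<void(Batch)> process)
+         : process_(std::move(process)) {}
+
+     // Assumes single-threaded use for brevity; a real version needs a lock.
+     void Insert(int64_t batch_index, Batch batch) {
+       pending_.emplace(batch_index, std::move(batch));
+       // Drain every batch that is now next in line.
+       while (!pending_.empty() && pending_.begin()->first == next_) {
+         process_(std::move(pending_.begin()->second));
+         pending_.erase(pending_.begin());
+         ++next_;
+       }
+     }
+
+    private:
+     std::function<void(Batch)> process_;
+     std::map<int64_t, Batch> pending_;
+     int64_t next_ = 0;
+   };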
+
+Even if a node does not care about order it should try and maintain the batch
index if it can. The project and filter
+nodes do not care about order but they ensure that output batches keep the
same index as their input batches. The filter
+node will even emit empty batches if it needs to so that it can maintain the
batch order without gaps.
+
+.. figure:: img/ordered.svg
+
+ An example of ordered execution
+
+
+Partitioned Execution
+=====================
+
+A stream is partitioned (or sometimes called segmented) if rows are grouped together in some way. Currently there is no
+formal notion of partitioning. However, one is starting to develop (e.g. segmented aggregation) and we may end up
+introducing a more formal notion of partitions to Acero at some point as well.
+
+Spillover
+=========
+
+Spillover has not yet been implemented in Acero.
+
+Distributed Execution
+=====================
+
+There are certain exec nodes which are useful when an engine is used in a
distributed environment. The terminology
+can vary so we will use the Substrait terminology. An exchange node sends
data to different workers. Often this is
+a partitioned exchange so that Acero is expected to partition each batch and
distribute partitions across N different
+workers. On the other end we have the capture node. This node receives data
from different workers.
+
+These nodes do not exist in Acero today. However, they would be in scope and
we hope to have such nodes someday.
+
+Profiling & Tracing
+===================
+
+Acero's tracing is currently half-implemented and there are major gaps in profiling tools. However, there has been some
+effort at tracing with OpenTelemetry and most of the necessary pieces are in place. The main thing currently lacking is
+some kind of effective visualization of the tracing results.
+
+In order to use the tracing that is present today you will need to build Arrow with ``ARROW_WITH_OPENTELEMETRY=ON``.
+Then you will need to set the environment variable ``ARROW_TRACING_BACKEND=otlp_http``. This will configure OpenTelemetry
+to export trace results (as OTLP) to the HTTP endpoint http://localhost:4318/v1/traces. You will need to configure an
+OpenTelemetry collector to collect results on that endpoint and you will need to configure a trace viewer of some kind
+such as Jaeger: https://www.jaegertracing.io/docs/1.21/opentelemetry/
+
+Benchmarking
+============
+
+The most complete macro benchmarking for Acero is provided by
https://github.com/voltrondata-labs/arrowbench
+These include a set of TPC-H benchmarks, executed from the R-dplyr
integration, which are run on every Arrow commit and
+reported to Conbench at https://conbench.ursa.dev/
+
+In addition to these TPC-H benchmarks there are a number of micro-benchmarks for various nodes (hash-join,
+asof-join, etc.). Finally, the compute functions themselves should mostly have micro-benchmarks. For more on
+micro-benchmarks you can refer to https://arrow.apache.org/docs/developers/benchmarks.html
+
+Any new functionality should include micro benchmarks to avoid regressions.
+
+Bindings
+========
+
+Public API
+----------
+
+The public API for Acero consists of Declaration and the various DeclarationToXyz methods. In addition,
+the options classes for each node are part of the public API. However, nodes are extensible and so this
+API is extensible as well.
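+
+As a brief sketch of that API (the node names and options reflect the current headers but are
+worth verifying; ``table`` and ``predicate`` are assumed inputs):
+
+.. code:: cpp
+
+   #include "arrow/acero/exec_plan.h"
+   #include "arrow/acero/options.h"
+   #include "arrow/compute/expression.h"
+   #include "arrow/table.h"
+
+   arrow::Result<std::shared_ptr<arrow::Table>> RunPlan(
+       std::shared_ptr<arrow::Table> table, arrow::compute::Expression predicate) {
+     // Declare a small source -> filter plan ...
+     arrow::acero::Declaration source{
+         "table_source", arrow::acero::TableSourceNodeOptions{std::move(table)}};
+     arrow::acero::Declaration filter{
+         "filter", {std::move(source)},
+         arrow::acero::FilterNodeOptions{std::move(predicate)}};
+     // ... and run it to completion as a single table.
+     return arrow::acero::DeclarationToTable(std::move(filter));
+   }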
+
+R (dplyr)
+---------
+
+dplyr is an R library for programmatically building queries. The arrow R package has dplyr bindings which
+adapt the dplyr API to create Acero execution plans. In addition, there is a dplyr-substrait backend in
+development which could eventually replace the Acero-aware bindings.
+
+Python
------
-Lookup-or-insert operation, given a hash of a key, finds a list of candidate
-slots with corresponding keys that are likely to be equal to the input key. The
-list may be empty, which means that the key does not exist yet in the hash
-table. If it is not empty, then the callback function for key comparison is
-called for each next candidate to verify that there is indeed a match. False
-positives get rejected and we end up either finding an actual match or an empty
-slot, which means that the key is new to the hash table. New keys get assigned
-next available integers as key ids, and are appended to the set of keys stored
in
-the hash table. As a result of inserting new keys to the hash table, the
density
-of occupied slots may reach an upper limit, at which point the hash table will
be
-resized and will afterwards have twice as many slots. That is in summary
-lookup-or-insert functionality, but the actual implementation is a bit more
-involved, because of vectorization of the processing and various optimizations
-for common cases.
+The pyarrow library binds to Acero in two different ways. First, there is a direct binding in pyarrow.acero
+which exposes the public API described above. Second, there are a number of compute utilities, like
+pyarrow.Table.group_by, which use Acero, though this is invisible to the user.
-Search within a single block
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+Java
+----
-There are three possible cases that can occur when searching for a match for a
-given key (that is, for a given stamp of a key) within a single block,
-illustrated below.
-
-1. There is a matching stamp in the block of status bytes:
-
-.. image:: img/swiss_table_6.jpg
-
-2. There is no matching stamp in the block, but there is an empty slot in the
- block:
-
-.. image:: img/swiss_table_7.jpg
-
-3. There is no matching stamp in the block and the block is full (there are no
- empty slots left):
-
-.. image:: img/swiss_table_8.jpg
-
-64-bit arithmetic can be used to search for a matching slot within the entire
-single block at once, without iterating over all slots in it. Following is an
-example of a sequence of steps to find the first status byte for a given stamp,
-returning the first empty slot on miss if the block is not full or 8 (one past
-maximum local slot id) otherwise.
-
-Following is a sketch of the possible steps to execute when searching for the
-matching stamp in a single block.
-
-| *Example will use input stamp 0x5E and a 64-bit status bytes word with one
empty
- slot:*
-| *0x 4B17 5E3A 5E2B 1180*
-
-1. [1 instruction] Replicate stamp to all bytes by multiplying it by 0x 0101
0101
- 0101 0101.
-
- | *We obtain: 0x 5E5E 5E5E 5E5E 5E5E.*
-
-2. [1 instruction] XOR replicated stamp with status bytes word. Bytes
corresponding
- to a matching stamp will be 0, bytes corresponding to empty slots will have
a
- value between 128 and 255, bytes corresponding to non-matching non-empty
slots
- will have a value between 1 and 127.
-
- | *We obtain: 0x 1549 0064 0075 4FDE.*
-
-3. [2 instructions] In the next step we want to have information about a match
in
- the highest bit of each byte. We can ignore here empty slot bytes, because
they
- will be taken care of at a later step. Set the highest bit in each byte (OR
with
- 0x 8080 8080 8080 8080) and then subtract 1 from each byte (subtract 0x
0101 0101
- 0101 0101 from 64-bit word). Now if a byte corresponds to a non-empty slot
then
- the highest bit 0 indicates a match and 1 indicates a miss.
-
- | *We obtain: 0x 95C9 80E4 80F5 CFDE,*
- | *then 0x 94C8 7FE3 7FF4 CEDD.*
-
-4. [3 instructions] In the next step we want to obtain in each byte one of two
- values: 0x80 if it is either an empty slot or a match, 0x00 otherwise. We do
- it in three steps: NOT the result of the previous step to change the meaning
- of the highest bit; OR with the original status word to set highest bit in a
- byte to 1 for empty slots; mask out everything other than the highest bits
in
- all bytes (AND with 0x 8080 8080 8080 8080).
-
- | *We obtain: 6B37 801C 800B 3122,*
- | *then 6B37 DE3E DE2B 31A2,*
- | *finally 0x0000 8000 8000 0080.*
-
-5. [2 instructions] Finally, use leading zero bits count and divide it by 8 to
- find an index of the last byte that corresponds either to a match or an
empty
- slot. If the leading zero count intrinsic returns 64 for a 64-bit input
zero,
- then after dividing by 8 we will also get the desired answer in case of a
full
- block without any matches.
-
- | *We obtain: 16,*
- | *then 2 (index of the first slot within the block that matches the
stamp).*
-
-If SIMD instructions with 64-bit lanes are available, multiple single block
-searches for different keys can be executed together. For instance AVX2
-instruction set allows to process quadruplets of 64-bit values in a single
-instruction, four searches at once.
-
-Complete search potentially across multiple blocks
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-Full implementation of a search for a matching key may involve visiting
multiple
-blocks beginning with the start block selected based on the hash of the key. We
-move to the next block modulo the number of blocks, whenever we do not find a
-match in the current block and the current block is full. The search may also
-involve visiting one or more slots in each block. Visiting in this case means
-calling a comparison callback to verify the match whenever a slot with a
matching
-stamp is encountered. Eventually the search stops when either:
-
-- the matching key is found in one of the slots matching the stamp, or
-- an empty slot is reached. This is illustrated in the diagram below:
-
-.. image:: img/swiss_table_9.jpg
-
-Optimistic processing with two passes
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-Hash table lookups may have high cost in the pessimistic case, when we
encounter
-cases of hash collisions and full blocks that lead to visiting further blocks.
In
-the majority of cases we can expect an optimistic situation - the start block
is
-not full, so we will only visit this one block, and all stamps in the block are
-different, so we will need at most one comparison to find a match. We can
expect
-about 90% of the key lookups for an existing key to go through the optimistic
-path of processing. For that reason it pays off to optimize especially for this
-90% of inputs.
-
-Lookups in Swiss table are split into two passes over an input batch of keys.
The
-**first pass: fast-path lookup**, is a highly optimized, vectorized,
-SIMD-friendly, branch-free code that fully handles optimistic cases. The
**second
-pass: slow-path lookup**, is normally executed only for the selection of inputs
-that have not been finished in the first pass, although it can also be called
-directly on all of the inputs, skipping fast-path lookup. It handles all
special
-cases and inserts but in order to be robust it is not as efficient as
fast-path.
-Slow-path lookup does not need to repeat the work done in fast-path lookup - it
-can use the state reached at the end of fast-path lookup as a starting point.
-
-Fast-path lookup implements search only for the first stamp match and only
within
-the start block. It only makes sense when we already have at least one key
-inserted into the hash table, since it does not handle inserts. It takes a
vector
-of key hashes as an input and based on it outputs three pieces of information
for
-each key:
-
-- Key id corresponding to the slot in which a matching stamp was found. Any
valid
- key id if a matching stamp was not found.
-- A flag indicating if a match was found or not.
-- Slot id of a slot from which slow-path should pick up the search if the first
- match was either not found or it turns out to be false positive after
- evaluating key comparison.
+The Java implementation exposes some capabilities from Arrow datasets. These
use Acero implicitly. There
+are no direct bindings to Acero or Substrait in the Java implementation today.
-.. note::
- Improvement to consider: precomputing 1st pass lookup results.
-
- If the hash table is small, the number of inserted keys is small, we could
- further simplify and speed-up the first pass by storing in a lookup table
- pre-computed results for all combinations of hash bits. Let us consider the
- case of Swiss table of size 5 that has 256 slots and up to 128 inserted
keys.
- Only 12 bits of hash are used by lookup in that case: 5 to select a block, 7
- to create a stamp. For all :math:`2^{12}` combinations of those bits we
could
- keep the result of first pass lookup in an array. Key id and a match
- indicating flag can use one byte: 7 bits for key id and 1 bit for the flag.
- Note that slot id is only needed if we go into 2nd pass lookup, so it can be
- stored separately and likely only accessed by a small subset of keys.
- Fast-path lookup becomes almost a single fetch of result from a 4KB array.
- Lookup arrays used to implement this need to be kept in sync with the main
- copy of data about slots, which requires extra care during inserts. Since
the
- number of entries in lookup arrays is much higher than the number of slots,
- this technique only makes sense for small hash tables.
-
-Dense comparisons
-~~~~~~~~~~~~~~~~~
-
-If there is at least one key inserted into a hash table, then every slot
contains
-a key id value that corresponds to some actual key that can be used in
-comparison. That is because empty slots are initialized with 0 as their key id.
-After the fast-path lookup we get a match-found flag for each input. If it is
-set, then we need to run a comparison of the input key with the key in the hash
-table identified by key id returned by fast-path code. The comparison will
verify
-that there is a true match between the keys. We only need to do this for a
-subset of inputs that have a match candidate, but since we have key id values
-corresponding to some real key for all inputs, we may as well execute
-comparisons on all inputs unconditionally. If the majority (e.g. more than 80%)
-of the keys have a match candidate, the cost of evaluating comparison for the
-remaining fraction of keys but without filtering may actually be cheaper than
the
-cost of running evaluation only for required keys while referencing filter
-information. This can be seen as a variant of general preconditioning
techniques
-used to avoid diverging conditional branches in the code. It may be used, based
-on some heuristic, to verify matches reported by fast-path lookups and is
-referred to as **dense comparisons**.
-
-Resizing
---------
-
-New hash table is initialized as empty and has only a single block with a space
-for only a few key entries. Doubling of the hash table size becomes necessary
as
-more keys get inserted. It is invoked during the 2nd pass of the lookups, which
-also handles inserts. It happens immediately after the number of inserted keys
-reaches a specific upper limit decided based on a current size of the hash
table.
-There may still be unprocessed entries from the input mini-batch after
resizing,
-so the 2nd pass of the lookup is restarted right after, with the bigger hash
-table and the remaining subset of unprocessed entries.
-
-Current policy, that should work reasonably well, is to resize a small hash
table
-(up to 8KB) when it is 50% full. Larger hash tables are resized when 75% full.
-We want to keep size in memory as small as possible, while maintaining a low
-probability of blocks becoming full.
-
-When discussing resizing we will be talking about **resize source** and
**resize
-target** tables. The diagram below shows how the same hash bits are interpreted
-differently by the source and the target.
-
-.. image:: img/swiss_table_10.jpg
-
-For a given hash, if a start block id was L in the source table, it will be
-either (2*L+0) or (2*L+1) in the target table. Based on that we can expect data
-access locality when migrating the data between the tables.
-
-Resizing is cheap also thanks to the fact that hash values for keys in the hash
-table are kept together with other slot data and do not need to be recomputed.
-That means that resizing procedure does not ever need to access the actual
bytes
-of the key.
-
-1st pass
-~~~~~~~~
-
-Based on the hash value for a given slot we can tell whether this slot contains
-an overflow or non-overflow entry. In the first pass we go over all source
slots
-in sequence, filter out overflow entries and move to the target table all other
-entries. Non-overflow entries from a block L will be distributed between blocks
-(2*L+0) and (2*L+1) of the target table. None of these target blocks can
-overflow, since they will be accommodating at most 8 input entries during this
-pass.
-
-For every non-overflow entry, the highest bit of a stamp in the source slot
-decides whether it will go to the left or to the right target block. It is
-further possible to avoid any conditional branches in this partitioning code,
so
-that the result is friendly to the CPU execution pipeline.
-
-.. image:: img/swiss_table_11.jpg
-
-2nd pass
-~~~~~~~~
-
-In the second pass of resizing, we scan all source slots again, this time
-focusing only on the overflow entries that were all skipped in the 1st pass. We
-simply reinsert them in the target table using generic insertion code with one
-exception. Since we know that all the source keys are different, there is no
-need to search for a matching stamp or run key comparisons (or look at the key
-values). We just need to find the first open block beginning with the start
-block in the target table and use its first empty slot as the insert
-destination.
-
-We expect overflow entries to be rare and therefore the relative cost of that
-pass should stay low.
+Design Philosophies
+===================
+
+Engine Independent Compute
+--------------------------
+
+If a node requires complex computation then it should encapsulate that work in abstractions that don't depend on
+any particular engine design. For example, the hash join node uses utilities such as a row encoder, a hash table,
+and an exec batch builder. Other places share implementations of sequencing queues and row segmenters. The node
+itself should be kept minimal, simply mapping from Acero to the abstraction.
+
+This helps to decouple designs from Acero's design details and allows them to be more resilient to changes in the
+engine. It also helps to promote these abstractions as capabilities in their own right, either for use in other
+engines or as potential new additions to pyarrow as compute utilities.
+
+Make Tasks not Threads
+----------------------
+
+If you need to run something in parallel then you should use thread tasks and
not dedicated threads.
+
+ * This keeps the thread count down (reduces thread contention and context
switches)
+ * This prevents deadlock (tasks get cancelled automatically in the event of a
failure)
+ * This simplifies profiling (Tasks can be easily measured, easier to know
where all the work is)
+ * This makes it possible to run without threads (sometimes users are doing
their own threading and
+ sometimes we need to run in thread-restricted environments like emscripten)
+
+Note: we do not always follow this advice currently. There is a dedicated
process thread in the asof join
+node. Dedicated threads are "ok" for experimental use but we'd like to
migrate away from them.
+
+Don't Block on CPU Threads
+--------------------------
+
+If you need to run a potentially long running activity that is not actively
using CPU resources (e.g. reading from
+disk, network I/O, waiting on an external library using its own threads) then
you should use asynchronous utilities
+to ensure that you do not block CPU threads.
+
+Don't Reinvent the Wheel
+------------------------
+
+Each node should not be a standalone island of utilities. Where possible,
computation should be pushed
+either into compute functions or into common shared utilities. This is the
only way a project as large as
+this can hope to be maintained.
+
+Avoid Query Optimization
+------------------------
+
+Writing an efficient Acero plan can be challenging. For example, filter
expressions and column selection
+should be pushed down into the scan node so that the data isn't read from
disk. Expressions should be
+simplified and common sub-expressions factored out. The build side of a hash
join node should be the
+smaller of the two inputs.
+
+However, figuring these problems out is a challenge reserved for a query
planner or a query optimizer.
+Creating a query optimizer is a challenging task beyond the scope of Acero.
With adoption of Substrait
+we hope utilities will eventually emerge that solve these problems. As a
result, we generally avoid doing
+any kind of query optimization within Acero. Acero should interpret
declarations as literally as possible.
+This helps reduce maintenance and avoids surprises.
+
+We also realize that this is not always possible. For example, the hash join node currently detects if there
+is a chain of hash join operators and, if there is, it configures Bloom filters between the operators. This is
+technically a task that could be left to a query optimizer. However, this behavior is rather specific to Acero
+and fairly niche and so it is unlikely it will be introduced to an optimizer anytime soon.
+
+Performance Guidelines
+======================
+
+Batch Size
+----------
+
+Perhaps the most discussed performance criterion is batch size. Acero was originally
+designed based on research to follow a morsel-batch model. Tasks are created based on
+a large batch of rows (a morsel). The goal is for the morsel to be large enough to justify
+the overhead of a task. Within a task the data is further subdivided into batches.
+Each batch should be small enough to fit comfortably into CPU cache (often the L2 cache).
+
+This sets up two loops. The outer loop is parallel and the inner loop is not:
+
+.. code:: python
+
+   for morsel in dataset:  # parallel
+       for batch in morsel:
+           run_pipeline(batch)
+
+The advantage of this style of execution is that successive nodes (or
successive operations
+within an exec node) that access the same column are likely to benefit from
cache. It also
+is essential for functions that require random access to data. It maximizes
parallelism while
+minimizing the data transfer from main memory to CPU cache.
+
+.. figure:: img/microbatch.svg
+
+   If multiple passes through the data are needed (or random access) and the batch is much bigger
+   than the cache then performance suffers. Breaking the task into smaller batches helps improve
+   task locality.
+
+The morsel/batch model is reflected in a few places in Acero:
+
+ * In most source nodes we will try and grab batches of 1Mi rows. This is
often configurable.
+ * In the source node we then iterate and slice off batches of 32Ki rows.
This is not currently
+ configurable.
+ * The hash join node currently requires that batches contain at most 32Ki rows as it uses
+   16-bit signed integers as row indices in some places.
+
+However, this guidance is debatable. Profiling has shown that we do not get any real benefit
+from moving to a smaller batch size. It seems any advantage we do get is lost
in per-batch
+overhead. Most of this overhead appears to be due to various per-batch
allocations. In addition,
+depending on your hardware, it's not clear that CPU Cache<->RAM will always be
the bottleneck. A
+combination of linear access, pre-fetch, and high CPU<->RAM bandwidth can
alleviate the penalty
+of cache misses.
+
+As a result, this section is included in the guide to provide historical
context, but should not
+be considered binding.
+
+Ongoing & Deprecated Work
+=========================
+
+The following efforts are ongoing. They are described here to explain certain
duplication in the
+code base as well as explain types that are going away.
+
+Scanner v2
+----------
+
+The scanner is currently a node in the datasets module registered with the
factory registry as "scan".
+This node was written prior to Acero and made extensive use of AsyncGenerator
to scan multiple files
+in parallel. Unfortunately, the use of AsyncGenerator made the scan difficult
to profile, difficult
+to debug, and impossible to cancel. A new scan node is in progress. It is currently registered with
+the name "scan2". The new scan node uses the AsyncTaskScheduler instead of AsyncGenerator and should
+provide additional features such as the ability to skip rows and handle nested column projection (for
+formats that support it).
+
+OrderBySink and SelectKSink
+---------------------------
+
+These two exec nodes provided custom sink implementations. They were written
before ordered execution
+was added to Acero and were the only way to generate ordered output. However,
they had to be placed
+at the end of a plan and the fact that they were custom sink nodes made them
difficult to describe with
+Declaration. The OrderByNode and FetchNode replace these. These are kept at
the moment until existing
+bindings move away from them.
+
+Upstreaming Changes
+===================
+
+Acero is designed so that it can be extended without recompilation. You can
easily add new compute
+functions and exec nodes without creating a fork or compiling Acero. However,
as you develop new
+features that are generally useful, we hope you will make time to upstream
your changes.
+
+Even though we welcome these changes we have to admit that there is a cost to
this process. Upstreaming
+code requires that the new module behave correctly, but that is typically the
easier part to review.
+More importantly, upstreaming code is a process of transferring the maintenance burden from yourself to
+the wider Arrow C++ project maintainers. This requires a deep understanding of the code by maintainers,
+it requires the code be consistent with the style of the project, and it requires that the code be well
+tested with unit tests to guard against regressions.
+
+Because of this, we highly recommend taking the following steps:
+
+* As you are starting out you should send a message to the mailing list
announcing your intentions and
+ design. This will help you determine if there is wider interest in the
feature and others may have
+ ideas or suggestions to contribute early on in the process.
+
+ * If there is not much interest in the feature then keep in mind that it may
be difficult to eventually
+ upstream the change. The maintenance capacity of the team is limited and
we try and prioritize
+ features that are in high demand.
+
+* We recommend developing and testing the change on your own fork until you
get it to a point where you
+ are fairly confident things are working correctly. If the change is large
then you might also think
+ about how you can break up the change into smaller pieces. As you do this
you can share both the larger
+ PR (as a draft PR or a branch on your local fork) and the smaller PRs. This
way we can see the context
+ of the smaller PRs. However, if you do break things up, smaller PRs should
still ideally stand on their
+ own.
+
+* Any PR will need to have the following:
+
+  * Unit tests covering the new functionality
+
+ * Microbenchmarks if there is any significant compute work going on
+
+ * Examples demonstrating how to use the new feature
+
+ * Updates to the API reference and this guide
+
+ * Passing CI (you can enable GitHub Actions on your fork and that will allow
most CI jobs to run before
+ you create your PR)
+
+Others
+======
+
+.. toctree::
+ :maxdepth: 2
+
+ acero/swiss_table
diff --git a/docs/source/developers/cpp/img/swiss_table_1.jpg
b/docs/source/developers/cpp/acero/img/swiss_table_1.jpg
similarity index 100%
rename from docs/source/developers/cpp/img/swiss_table_1.jpg
rename to docs/source/developers/cpp/acero/img/swiss_table_1.jpg
diff --git a/docs/source/developers/cpp/img/swiss_table_10.jpg
b/docs/source/developers/cpp/acero/img/swiss_table_10.jpg
similarity index 100%
rename from docs/source/developers/cpp/img/swiss_table_10.jpg
rename to docs/source/developers/cpp/acero/img/swiss_table_10.jpg
diff --git a/docs/source/developers/cpp/img/swiss_table_11.jpg
b/docs/source/developers/cpp/acero/img/swiss_table_11.jpg
similarity index 100%
rename from docs/source/developers/cpp/img/swiss_table_11.jpg
rename to docs/source/developers/cpp/acero/img/swiss_table_11.jpg
diff --git a/docs/source/developers/cpp/img/swiss_table_2.jpg
b/docs/source/developers/cpp/acero/img/swiss_table_2.jpg
similarity index 100%
rename from docs/source/developers/cpp/img/swiss_table_2.jpg
rename to docs/source/developers/cpp/acero/img/swiss_table_2.jpg
diff --git a/docs/source/developers/cpp/img/swiss_table_3.jpg
b/docs/source/developers/cpp/acero/img/swiss_table_3.jpg
similarity index 100%
rename from docs/source/developers/cpp/img/swiss_table_3.jpg
rename to docs/source/developers/cpp/acero/img/swiss_table_3.jpg
diff --git a/docs/source/developers/cpp/img/swiss_table_4.jpg
b/docs/source/developers/cpp/acero/img/swiss_table_4.jpg
similarity index 100%
rename from docs/source/developers/cpp/img/swiss_table_4.jpg
rename to docs/source/developers/cpp/acero/img/swiss_table_4.jpg
diff --git a/docs/source/developers/cpp/img/swiss_table_5.jpg
b/docs/source/developers/cpp/acero/img/swiss_table_5.jpg
similarity index 100%
rename from docs/source/developers/cpp/img/swiss_table_5.jpg
rename to docs/source/developers/cpp/acero/img/swiss_table_5.jpg
diff --git a/docs/source/developers/cpp/img/swiss_table_6.jpg
b/docs/source/developers/cpp/acero/img/swiss_table_6.jpg
similarity index 100%
rename from docs/source/developers/cpp/img/swiss_table_6.jpg
rename to docs/source/developers/cpp/acero/img/swiss_table_6.jpg
diff --git a/docs/source/developers/cpp/img/swiss_table_7.jpg
b/docs/source/developers/cpp/acero/img/swiss_table_7.jpg
similarity index 100%
rename from docs/source/developers/cpp/img/swiss_table_7.jpg
rename to docs/source/developers/cpp/acero/img/swiss_table_7.jpg
diff --git a/docs/source/developers/cpp/img/swiss_table_8.jpg
b/docs/source/developers/cpp/acero/img/swiss_table_8.jpg
similarity index 100%
rename from docs/source/developers/cpp/img/swiss_table_8.jpg
rename to docs/source/developers/cpp/acero/img/swiss_table_8.jpg
diff --git a/docs/source/developers/cpp/img/swiss_table_9.jpg
b/docs/source/developers/cpp/acero/img/swiss_table_9.jpg
similarity index 100%
rename from docs/source/developers/cpp/img/swiss_table_9.jpg
rename to docs/source/developers/cpp/acero/img/swiss_table_9.jpg
diff --git a/docs/source/developers/cpp/acero.rst
b/docs/source/developers/cpp/acero/swiss_table.rst
similarity index 98%
copy from docs/source/developers/cpp/acero.rst
copy to docs/source/developers/cpp/acero/swiss_table.rst
index 688cfa9d5d..b590102306 100644
--- a/docs/source/developers/cpp/acero.rst
+++ b/docs/source/developers/cpp/acero/swiss_table.rst
@@ -16,12 +16,8 @@
.. under the License.
.. highlight:: console
-.. _development-cpp-acero:
-
-================
-Developing Acero
-================
+===========
Swiss Table
===========
@@ -30,7 +26,7 @@ key field values to a dense set of integer ids. Ids can later
be used in place
of keys to identify groups of rows with equal keys.
Introduction
-------------
+============
Hash group-by in Arrow uses a variant of a hash table based on a data structure
called Swiss table. Swiss table uses linear probing. There is an array of slots
@@ -49,7 +45,7 @@ from likening resulting sequences of empty slots to holes in
a one dimensional
cheese.
Interface
----------
+=========
Hash table used in query processing for implementing join and group-by
operators
does not need to provide all of the operations that a general purpose hash
table
@@ -121,7 +117,7 @@ is actually no need for a hash function callback. It is
enough that the caller
provides hash values for all entries in the batch when calling
lookup-or-insert.
Basic architecture and organization of data
--------------------------------------------
+===========================================
The hash table is an array of **slots**. Slots are grouped in groups of 8
called
**blocks**. The number of blocks is a power of 2. The empty hash table starts
@@ -210,7 +206,7 @@ in that range. This is useful when resizing a hash table or
merging two hash
tables together.
Interleaving status bytes and key ids
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+-------------------------------------
Status bytes and key ids for all slots are stored in a single array of bytes.
They are first grouped by 8 into blocks, then each block of status bytes is
@@ -265,7 +261,7 @@ impact the lookups outside these events.
number of slots, but that should be a small fraction of the total size.
32-bit hash vs 64-bit hash
-~~~~~~~~~~~~~~~~~~~~~~~~~~
+--------------------------
Currently we use 32-bit hash values in Swiss table code and 32-bit integers as
key ids. For the robust implementation, sooner or later we will need to support
@@ -278,7 +274,7 @@ of false positives during lookups will start quickly
growing. 32-bit hash should
not be used with more than about 16 million inserted keys.
Low memory footprint and low chance of hash collisions
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+------------------------------------------------------
Swiss table is a good choice of a hash table for modern hardware, because it
combines lookups that can take advantage of special CPU instructions with space
@@ -310,7 +306,7 @@ of comparisons per lookup being close to 1 for keys already
present and 0 for
new keys.
Lookup
-------
+======
Lookup-or-insert operation, given a hash of a key, finds a list of candidate
slots with corresponding keys that are likely to be equal to the input key. The
@@ -328,7 +324,7 @@ involved, because of vectorization of the processing and
various optimizations
for common cases.
Search within a single block
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+----------------------------
There are three possible cases that can occur when searching for a match for a
given key (that is, for a given stamp of a key) within a single block,
@@ -409,7 +405,7 @@ instruction set allows to process quadruplets of 64-bit
values in a single
instruction, four searches at once.
Complete search potentially across multiple blocks
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+--------------------------------------------------
Full implementation of a search for a matching key may involve visiting
multiple
blocks beginning with the start block selected based on the hash of the key. We
@@ -425,7 +421,7 @@ stamp is encountered. Eventually the search stops when
either:
.. image:: img/swiss_table_9.jpg
Optimistic processing with two passes
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+-------------------------------------
Hash table lookups may have high cost in the pessimistic case, when we
encounter
cases of hash collisions and full blocks that lead to visiting further blocks.
In
@@ -479,7 +475,7 @@ each key:
this technique only makes sense for small hash tables.
Dense comparisons
-~~~~~~~~~~~~~~~~~
+-----------------
If there is at least one key inserted into a hash table, then every slot
contains
a key id value that corresponds to some actual key that can be used in
@@ -500,7 +496,7 @@ on some heuristic, to verify matches reported by fast-path
lookups and is
referred to as **dense comparisons**.
Resizing
---------
+========
New hash table is initialized as empty and has only a single block with a space
for only a few key entries. Doubling of the hash table size becomes necessary
as
@@ -532,7 +528,7 @@ That means that resizing procedure does not ever need to
access the actual bytes
of the key.
1st pass
-~~~~~~~~
+--------
Based on the hash value for a given slot we can tell whether this slot contains
an overflow or non-overflow entry. In the first pass we go over all source
slots
@@ -550,7 +546,7 @@ that the result is friendly to the CPU execution pipeline.
.. image:: img/swiss_table_11.jpg
2nd pass
-~~~~~~~~
+--------
In the second pass of resizing, we scan all source slots again, this time
focusing only on the overflow entries that were all skipped in the 1st pass. We
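
To make the within-block search described in this document concrete, here is
a scalar sketch of the stamp-matching step (the function name and the SWAR
bit trick are illustrative only; Arrow's actual implementation uses
specialized SIMD paths):

.. code-block:: cpp

   #include <cstdint>
   #include <cstring>

   // Find the first slot in one 8-slot block whose status byte equals the
   // given 7-bit stamp. Empty slots hold 0x80, so they never match a stamp.
   // Assumes a little-endian host (block_status[0] is the lowest byte) and
   // a compiler providing __builtin_ctzll (GCC/Clang).
   int FirstStampMatch(const uint8_t block_status[8], uint8_t stamp) {
     uint64_t word;
     std::memcpy(&word, block_status, sizeof(word));
     uint64_t pattern = 0x0101010101010101ULL * stamp;  // stamp in every byte
     uint64_t diff = word ^ pattern;                    // zero byte <=> match
     // SWAR zero-byte detector: sets the high bit of every zero byte. False
     // positives can only occur above a true match, so the lowest hit wins.
     uint64_t match =
         (diff - 0x0101010101010101ULL) & ~diff & 0x8080808080808080ULL;
     if (match == 0) return -1;           // no candidate in this block
     return __builtin_ctzll(match) >> 3;  // slot index of the first candidate
   }

A stamp hit is only a candidate: the caller must still compare the full key
stored for that slot's key id before treating it as a real match.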
diff --git a/docs/source/cpp/acero/async.md
b/docs/source/developers/cpp/img/async.md
similarity index 100%
rename from docs/source/cpp/acero/async.md
rename to docs/source/developers/cpp/img/async.md
diff --git a/docs/source/cpp/acero/async.svg
b/docs/source/developers/cpp/img/async.svg
similarity index 100%
rename from docs/source/cpp/acero/async.svg
rename to docs/source/developers/cpp/img/async.svg
diff --git a/docs/source/cpp/acero/dist_plan.svg
b/docs/source/developers/cpp/img/dist_plan.svg
similarity index 100%
rename from docs/source/cpp/acero/dist_plan.svg
rename to docs/source/developers/cpp/img/dist_plan.svg
diff --git a/docs/source/cpp/acero/microbatch.svg
b/docs/source/developers/cpp/img/microbatch.svg
similarity index 100%
rename from docs/source/cpp/acero/microbatch.svg
rename to docs/source/developers/cpp/img/microbatch.svg
diff --git a/docs/source/cpp/acero/ordered.svg
b/docs/source/developers/cpp/img/ordered.svg
similarity index 100%
rename from docs/source/cpp/acero/ordered.svg
rename to docs/source/developers/cpp/img/ordered.svg
diff --git a/docs/source/cpp/acero/pipeline.svg
b/docs/source/developers/cpp/img/pipeline.svg
similarity index 100%
rename from docs/source/cpp/acero/pipeline.svg
rename to docs/source/developers/cpp/img/pipeline.svg
diff --git a/docs/source/cpp/acero/pipeline_task.svg
b/docs/source/developers/cpp/img/pipeline_task.svg
similarity index 100%
rename from docs/source/cpp/acero/pipeline_task.svg
rename to docs/source/developers/cpp/img/pipeline_task.svg
diff --git a/docs/source/java/substrait.rst b/docs/source/java/substrait.rst
index fa20dbd61d..5048552613 100644
--- a/docs/source/java/substrait.rst
+++ b/docs/source/java/substrait.rst
@@ -19,7 +19,7 @@
Substrait
=========
-The ``arrow-dataset`` module can execute Substrait_ plans via the :doc:`Acero
<../cpp/streaming_execution>`
+The ``arrow-dataset`` module can execute Substrait_ plans via the :doc:`Acero
<../cpp/acero>`
query engine.
.. contents::
@@ -199,5 +199,5 @@ This Java program:
.. _`Substrait`: https://substrait.io/
.. _`Substrait Java`: https://github.com/substrait-io/substrait-java
-.. _`Acero`: https://arrow.apache.org/docs/cpp/streaming_execution.html
+.. _`Acero`: https://arrow.apache.org/docs/cpp/acero.html
.. _`Extended Expression`:
https://github.com/substrait-io/substrait/blob/main/site/docs/expressions/extended_expression.md
diff --git a/docs/source/python/api/acero.rst b/docs/source/python/api/acero.rst
index a8107f3cc1..b75fc33061 100644
--- a/docs/source/python/api/acero.rst
+++ b/docs/source/python/api/acero.rst
@@ -46,7 +46,7 @@ and to execute this efficiently in a batched manner.
.. seealso::
- :doc:`Acero C++ user guide <../../cpp/streaming_execution>`
+ :doc:`Acero C++ user guide <../../cpp/acero>`
:ref:`api.substrait`
Alternative way to run Acero from a standardized Substrait plan.
diff --git a/docs/source/python/integration/substrait.rst
b/docs/source/python/integration/substrait.rst
index eaa6151e4d..8602e073f5 100644
--- a/docs/source/python/integration/substrait.rst
+++ b/docs/source/python/integration/substrait.rst
@@ -23,7 +23,7 @@ The ``arrow-substrait`` module implements support for the
Substrait_ format,
enabling conversion to and from Arrow objects.
The ``arrow-dataset`` module can execute Substrait_ plans via the
-:doc:`Acero <../cpp/streaming_execution>` query engine.
+:doc:`Acero <../cpp/acero>` query engine.
.. contents::
@@ -245,5 +245,5 @@ the expressions can be passed to the dataset scanner in the
form of
.. _`Substrait`: https://substrait.io/
.. _`Substrait Python`: https://github.com/substrait-io/substrait-python
-.. _`Acero`: https://arrow.apache.org/docs/cpp/streaming_execution.html
+.. _`Acero`: https://arrow.apache.org/docs/cpp/acero.html
.. _`Extended Expression`:
https://github.com/substrait-io/substrait/blob/main/site/docs/expressions/extended_expression.md