Andy Grove created ARROW-9464:
---------------------------------
Summary: [Rust] [DataFusion] Physical plan refactor to support
async and optimization rules
Key: ARROW-9464
URL: https://issues.apache.org/jira/browse/ARROW-9464
Project: Apache Arrow
Issue Type: Improvement
Components: Rust, Rust - DataFusion
Reporter: Andy Grove
I would like to propose a refactor of the physical/execution planning based on
experience I have had implementing distributed execution in Ballista.
This will likely need subtasks but here is an overview of the changes I am
proposing.
>> 1. Introduce enum to represent physical plan.
By wrapping the execution plan structs in an enum, we make it possible to build
a tree representing the physical plan just like we do with the logical plan.
This makes it easy to print physical plans and also to apply transformations to
them.
{code:java}
pub enum PhysicalPlan {
/// Projection.
Projection(Arc<ProjectionExec>),
/// Filter a.k.a predicate.
Filter(Arc<FilterExec>),
/// Hash aggregate
HashAggregate(Arc<HashAggregateExec>),
/// Performs a hash join of two child relations by first shuffling the data
/// using the join keys.
ShuffledHashJoin(ShuffledHashJoinExec),
/// Performs a shuffle that will result in the desired partitioning.
ShuffleExchange(Arc<ShuffleExchangeExec>),
/// Reads results from a ShuffleExchange
ShuffleReader(Arc<ShuffleReaderExec>),
/// Scans a partitioned data source
ParquetScan(Arc<ParquetScanExec>),
/// Scans an in-memory table
InMemoryTableScan(Arc<InMemoryTableScanExec>),
}{code}
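To illustrate why the enum makes printing straightforward, here is a minimal sketch that walks such a tree and renders it as indented text. The operator structs are heavily simplified stand-ins, and their fields (path, predicate, columns, child) as well as the helper names children, describe, format_tree and example_plan are assumptions made for this example, not part of the proposal.

```rust
use std::sync::Arc;

// Hypothetical, simplified operator structs. The real operators carry
// schemas, expressions, and so on; these fields are for illustration only.
pub struct ParquetScanExec {
    pub path: String,
}

pub struct FilterExec {
    pub predicate: String,
    pub child: Arc<PhysicalPlan>,
}

pub struct ProjectionExec {
    pub columns: Vec<String>,
    pub child: Arc<PhysicalPlan>,
}

// A reduced version of the proposed enum with three of its variants.
pub enum PhysicalPlan {
    Projection(Arc<ProjectionExec>),
    Filter(Arc<FilterExec>),
    ParquetScan(Arc<ParquetScanExec>),
}

impl PhysicalPlan {
    /// Child plans of this node (empty for leaf operators).
    pub fn children(&self) -> Vec<Arc<PhysicalPlan>> {
        match self {
            PhysicalPlan::Projection(p) => vec![p.child.clone()],
            PhysicalPlan::Filter(f) => vec![f.child.clone()],
            PhysicalPlan::ParquetScan(_) => vec![],
        }
    }

    /// One-line description of this operator.
    fn describe(&self) -> String {
        match self {
            PhysicalPlan::Projection(p) => format!("Projection: {}", p.columns.join(", ")),
            PhysicalPlan::Filter(f) => format!("Filter: {}", f.predicate),
            PhysicalPlan::ParquetScan(s) => format!("ParquetScan: {}", s.path),
        }
    }

    /// Render the plan as an indented tree, one operator per line.
    pub fn format_tree(&self) -> String {
        let mut out = String::new();
        self.fmt_indent(0, &mut out);
        out
    }

    fn fmt_indent(&self, depth: usize, out: &mut String) {
        out.push_str(&"  ".repeat(depth));
        out.push_str(&self.describe());
        out.push('\n');
        for child in self.children() {
            child.fmt_indent(depth + 1, out);
        }
    }
}

/// Builds Projection -> Filter -> ParquetScan for demonstration purposes.
pub fn example_plan() -> PhysicalPlan {
    let scan = Arc::new(PhysicalPlan::ParquetScan(Arc::new(ParquetScanExec {
        path: "data.parquet".to_string(),
    })));
    let filter = Arc::new(PhysicalPlan::Filter(Arc::new(FilterExec {
        predicate: "a > 1".to_string(),
        child: scan,
    })));
    PhysicalPlan::Projection(Arc::new(ProjectionExec {
        columns: vec!["a".to_string(), "b".to_string()],
        child: filter,
    }))
}
```

Calling example_plan().format_tree() yields three lines, Projection, Filter and ParquetScan, each indented two spaces deeper than its parent. A transformation rule could use the same match-and-recurse shape, returning a rebuilt node instead of a string.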
>> 2. Introduce physical plan optimization rule to insert "shuffle" operators
We should extend the ExecutionPlan trait so that each operator can specify its
input and output partitioning needs, and then have an optimization rule that
can insert any repartitioning or reordering steps required.
For example, these are the methods to be added to ExecutionPlan. This design is
based on Apache Spark.
{code:java}
/// Specifies how data is partitioned across different nodes in the cluster
fn output_partitioning(&self) -> Partitioning {
Partitioning::UnknownPartitioning(0)
}
/// Specifies the data distribution requirements of all the children for this
/// operator
fn required_child_distribution(&self) -> Distribution {
Distribution::UnspecifiedDistribution
}
/// Specifies how data is ordered in each partition
fn output_ordering(&self) -> Option<Vec<SortOrder>> {
None
}
/// Specifies the sort ordering requirements of all the children for this
/// operator
fn required_child_ordering(&self) -> Option<Vec<Vec<SortOrder>>> {
None
}
{code}
A good example of applying this rule would be in the case of hash aggregates
where we perform a partial aggregate in parallel across partitions and then
coalesce the results and apply a final hash aggregate.
Another example would be a SortMergeExec specifying the sort order required for
its children.
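The hash aggregate case could be sketched roughly as follows. This is a toy model rather than the proposed implementation: the Plan enum, the SinglePartition variant, and the helpers partition_count, satisfies, enforce and insert_shuffles are assumptions made for illustration. Only the method names output_partitioning and required_child_distribution, and the variants UnknownPartitioning and UnspecifiedDistribution, come from the proposal above.

```rust
#[derive(Debug, Clone, PartialEq)]
pub enum Partitioning {
    /// Partitioning scheme unknown; only the partition count is known.
    UnknownPartitioning(usize),
}

impl Partitioning {
    pub fn partition_count(&self) -> usize {
        match self {
            Partitioning::UnknownPartitioning(n) => *n,
        }
    }
}

#[derive(Debug, Clone, PartialEq)]
pub enum Distribution {
    /// The operator accepts any input partitioning.
    UnspecifiedDistribution,
    /// Hypothetical variant: all input rows must arrive in one partition.
    SinglePartition,
}

// Hypothetical, simplified plan tree used only for this sketch.
#[derive(Debug, Clone, PartialEq)]
pub enum Plan {
    Scan { partitions: usize },
    PartialAggregate { input: Box<Plan> },
    FinalAggregate { input: Box<Plan> },
    ShuffleExchange { partitioning: Partitioning, input: Box<Plan> },
}

impl Plan {
    pub fn output_partitioning(&self) -> Partitioning {
        match self {
            Plan::Scan { partitions } => Partitioning::UnknownPartitioning(*partitions),
            Plan::PartialAggregate { input } => input.output_partitioning(),
            Plan::FinalAggregate { input } => input.output_partitioning(),
            Plan::ShuffleExchange { partitioning, .. } => partitioning.clone(),
        }
    }

    pub fn required_child_distribution(&self) -> Distribution {
        match self {
            // The final aggregate must see every partial result together.
            Plan::FinalAggregate { .. } => Distribution::SinglePartition,
            _ => Distribution::UnspecifiedDistribution,
        }
    }
}

/// Does the child's output partitioning meet the parent's requirement?
fn satisfies(partitioning: &Partitioning, required: &Distribution) -> bool {
    match required {
        Distribution::UnspecifiedDistribution => true,
        Distribution::SinglePartition => partitioning.partition_count() == 1,
    }
}

/// Wrap the child in a ShuffleExchange only when the requirement is not met.
fn enforce(child: Plan, required: &Distribution) -> Plan {
    if satisfies(&child.output_partitioning(), required) {
        child
    } else {
        Plan::ShuffleExchange {
            partitioning: Partitioning::UnknownPartitioning(1),
            input: Box::new(child),
        }
    }
}

/// Bottom-up rule: optimize each child first, then enforce this node's requirement.
pub fn insert_shuffles(plan: Plan) -> Plan {
    let required = plan.required_child_distribution();
    match plan {
        Plan::Scan { partitions } => Plan::Scan { partitions },
        Plan::PartialAggregate { input } => Plan::PartialAggregate {
            input: Box::new(enforce(insert_shuffles(*input), &required)),
        },
        Plan::FinalAggregate { input } => Plan::FinalAggregate {
            input: Box::new(enforce(insert_shuffles(*input), &required)),
        },
        Plan::ShuffleExchange { partitioning, input } => Plan::ShuffleExchange {
            partitioning,
            input: Box::new(enforce(insert_shuffles(*input), &required)),
        },
    }
}
```

Given FinalAggregate over PartialAggregate over a four-partition Scan, the rule places a single-partition ShuffleExchange between the two aggregates. A plan whose scan already has one partition is returned unchanged. An ordering rule for something like SortMergeExec could follow the same pattern, comparing output_ordering against required_child_ordering.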
>> 3. Make execution async
The execution plan trait should use the async keyword. This will require adding
dependencies on async_trait and smol. This allows us to remove much of the
manual thread management and have more efficient execution.
The main benefits of these changes are:
# Simplify implementation of physical operators, because the optimizer will
take care of repartitioning concerns
# The ability to print a physical query plan
# More efficient query execution because of the use of async
# Easier for projects like Ballista to use DataFusion and add their own
optimization rules e.g. replacing repartitioning steps with distributed
equivalents