zeodtr commented on issue #7698:
URL: https://github.com/apache/arrow-datafusion/issues/7698#issuecomment-1815885644
I've tried to optimize the logical planning and optimization routines.
As a result, for a wide aggregation query, the planning + optimization time
was reduced from 49 seconds to 0.8 seconds.
The details are as follows:
* The version of DataFusion: 31.0.0 (with small in-house modifications)
* The query:
  * a SELECT query
  * ~3000 aggregation functions in the SELECT clause
  * a FROM clause with one table of 3617 columns
  * 16 GROUP BY columns
The optimizations below are cumulative: each step builds on the previous ones, and the elapsed times drop accordingly. All times are in milliseconds.
None of the steps requires deep knowledge of plan building.
**No code change**
* elapsed time after creating a logical plan: 11468
* elapsed time after optimization: 48734
**Optimization 1: In `DFField`, precompute `qualifier_name` in the `new...()` functions, store it in a member variable, and use it in `qualified_name()`**
* elapsed time after creating a logical plan: 10571
* elapsed time after optimization: 29375
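The idea behind this step can be sketched with a standalone struct (names are illustrative, not DataFusion's actual `DFField`):

```rust
/// Illustrative sketch: build the qualified name once, in the constructor,
/// instead of formatting a fresh `String` on every `qualified_name()` call.
pub struct Field {
    qualifier: Option<String>,
    name: String,
    // Precomputed in `new()`; `qualified_name()` becomes a cheap accessor.
    qualified_name: String,
}

impl Field {
    pub fn new(qualifier: Option<&str>, name: &str) -> Self {
        let qualified_name = match qualifier {
            Some(q) => format!("{q}.{name}"),
            None => name.to_string(),
        };
        Self {
            qualifier: qualifier.map(String::from),
            name: name.to_string(),
            qualified_name,
        }
    }

    pub fn qualified_name(&self) -> &str {
        &self.qualified_name
    }
}

fn main() {
    let f = Field::new(Some("t1"), "c0");
    println!("{} / {:?} / {}", f.name, f.qualifier, f.qualified_name());
}
```

With thousands of columns, each `qualified_name()` call in the old scheme paid for a `format!` allocation; here that cost is paid once per field.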
**Optimization 2: Apply https://github.com/apache/arrow-datafusion/pull/7870
(Use btree to search fields in DFSchema)**
* elapsed time after creating a logical plan: 2429
* elapsed time after optimization: 20307
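PR #7870 replaces linear scans over the field list with a tree-based index. Roughly, the idea looks like this using std's `BTreeMap` (the struct below is a simplified stand-in, not DataFusion's `DFSchema`):

```rust
use std::collections::BTreeMap;

/// Simplified stand-in for a schema: a field list plus a sorted index
/// from field name to position, so a lookup is O(log n) instead of a
/// scan over all ~3600 columns.
pub struct Schema {
    fields: Vec<String>,
    index: BTreeMap<String, usize>,
}

impl Schema {
    pub fn new(fields: Vec<String>) -> Self {
        let index = fields
            .iter()
            .enumerate()
            .map(|(i, name)| (name.clone(), i))
            .collect();
        Self { fields, index }
    }

    /// O(log n) lookup through the btree index.
    pub fn index_of(&self, name: &str) -> Option<usize> {
        self.index.get(name).copied()
    }

    pub fn field(&self, i: usize) -> &str {
        &self.fields[i]
    }
}

fn main() {
    let schema = Schema::new(vec!["a".to_string(), "b".to_string(), "c".to_string()]);
    println!("{:?} / {}", schema.index_of("b"), schema.field(2));
}
```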
**Optimization 3: Change `DFField`'s `qualified_name()` to return `Arc<String>` instead of `String`**
And update the rest of the code accordingly, to avoid `clone()`ing `String`s.
* elapsed time after creating a logical plan: 2285
* elapsed time after optimization: 15503
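The effect of the `Arc<String>` change can be sketched in isolation (illustrative struct, not the real `DFField`): cloning an `Arc` only bumps a reference count, while cloning a `String` copies the bytes.

```rust
use std::sync::Arc;

/// Illustrative: the qualified name lives behind an `Arc`, so handing it
/// out to callers is a pointer copy plus a refcount increment, not a
/// byte-for-byte string copy.
pub struct Field {
    qualified_name: Arc<String>,
}

impl Field {
    pub fn new(qualifier: &str, name: &str) -> Self {
        Self {
            qualified_name: Arc::new(format!("{qualifier}.{name}")),
        }
    }

    pub fn qualified_name(&self) -> Arc<String> {
        Arc::clone(&self.qualified_name)
    }
}

fn main() {
    let f = Field::new("t1", "c0");
    let a = f.qualified_name();
    let b = f.qualified_name();
    // Both handles share one allocation; no string data was copied.
    println!("{} {}", Arc::ptr_eq(&a, &b), a);
}
```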
**Optimization 4: precompute `using_columns` in
`logical_plan::builder::project()`**
Like this:
```rust
let using_columns = plan.using_columns()?;
for e in expr {
    let e = e.into();
    match e {
        Expr::Wildcard => {
            projected_expr.extend(expand_wildcard(input_schema, &plan, None)?)
        }
        Expr::QualifiedWildcard { ref qualifier } => projected_expr
            .extend(expand_qualified_wildcard(qualifier, input_schema, None)?),
        _ => projected_expr.push(columnize_expr(
            normalize_col_with_using_columns(e, &plan, &using_columns)?,
            input_schema,
        )),
    }
}
```
And implement `expr_rewriter::normalize_col_with_using_columns()` and `logical_plan::builder::normalize_with_using_columns()`, which receive `using_columns` as an argument instead of recomputing it for every expression.
* elapsed time after creating a logical plan: 1491
* elapsed time after optimization: 14376
**Optimization 5: In `DFSchema::merge()` check `duplicated_field` with
`bool`-based functions instead of `Error`-based functions**
Like this:
```rust
let duplicated_field = match field.qualifier() {
    Some(q) => self.has_field_with_qualified_name(q, field.name()),
    // for unqualified columns, check as unqualified name
    None => self.has_field_with_unqualified_name(field.name()),
};
```
And implement `has_field_with_unqualified_name()` and `has_field_with_qualified_name()`, which return `bool` without involving an `Error`. Since a missing field is not an error condition here, returning `bool` is more appropriate anyway.
Additionally, implement `get_index_of_column_by_name()`, which returns `Option<usize>` instead of `Result<Option<usize>>`, for the functions above to use.
`field_not_found()` and `unqualified_field_not_found()` are heavy when used on a wide table, since they list every valid field name in the `Error`, so they must be avoided when not necessary.
* elapsed time after creating a logical plan: 899
* elapsed time after optimization: 6538
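The helpers might look roughly like this (a simplified stand-in schema; the real `DFSchema` is more involved):

```rust
/// Illustrative: existence checks that return `bool`/`Option` instead of
/// constructing a potentially expensive `Error` on the miss path.
pub struct Schema {
    // (optional qualifier, field name)
    fields: Vec<(Option<String>, String)>,
}

impl Schema {
    pub fn get_index_of_column_by_name(&self, qualifier: Option<&str>, name: &str) -> Option<usize> {
        self.fields
            .iter()
            .position(|(q, n)| q.as_deref() == qualifier && n == name)
    }

    pub fn has_field_with_qualified_name(&self, qualifier: &str, name: &str) -> bool {
        self.get_index_of_column_by_name(Some(qualifier), name).is_some()
    }

    pub fn has_field_with_unqualified_name(&self, name: &str) -> bool {
        self.fields.iter().any(|(_, n)| n == name)
    }
}

fn main() {
    let s = Schema {
        fields: vec![(Some("t".into()), "a".into()), (None, "b".into())],
    };
    println!(
        "{} {}",
        s.has_field_with_qualified_name("t", "a"),
        s.has_field_with_unqualified_name("b")
    );
}
```

The miss path now does no allocation at all, which matters when `merge()` probes thousands of candidate fields.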
**Optimization 6: In `expr::utils::columnize()` use bool-based functions
instead of `Error`-based functions**
Similar to optimization 5.
Like this:
```rust
_ => match e.display_name() {
    Ok(name) => match input_schema.get_field_with_unqualified_name(&name) {
        Some(field) => Expr::Column(field.qualified_column()),
        // expression not provided as input, do not convert to a column reference
        None => e,
    },
    Err(_) => e,
},
```
And implement `get_field_with_unqualified_name()` in `DFSchema`, which returns `Option<&DFField>` instead of `Result<&DFField>`. Since a missing field is not an error condition here, returning `Option` is more appropriate anyway.
* elapsed time after creating a logical plan: 442
* elapsed time after optimization: 6033
**Optimization 7: Use `IndexSet` in `expr::utils::find_exprs_in_exprs()`**
Like this:
```rust
fn find_exprs_in_exprs<F>(exprs: &[Expr], test_fn: &F) -> Vec<Expr>
where
F: Fn(&Expr) -> bool,
{
exprs
.iter()
.flat_map(|expr| find_exprs_in_expr(expr, test_fn))
.fold(IndexSet::new(), |mut acc, expr| {
acc.insert(expr);
acc
})
.into_iter()
.collect()
}
```
`IndexSet` comes from the https://docs.rs/indexmap/latest/indexmap/ crate; it deduplicates like a `HashSet` while preserving insertion order.
* elapsed time after creating a logical plan: 391
* elapsed time after optimization: 5889
**Optimization 8: In `logical_plan::plan::calc_func_dependencies_for_project()`, return early when there are no functional dependencies**
Like this:
```rust
fn calc_func_dependencies_for_project(
    exprs: &[Expr],
    input: &LogicalPlan,
) -> Result<FunctionalDependencies> {
    let input_schema = input.schema();
    if !input_schema.has_functional_dependencies() {
        return Ok(FunctionalDependencies::empty());
    }
    // ... existing logic ...
}
```
And implement `DFSchema::has_functional_dependencies()` and
`FunctionalDependencies::is_empty()` for it.
`calc_func_dependencies_for_project()` performs a heavy computation to build `proj_indices` before it even calls `project_functional_dependencies()`; that work is wasted when there are no functional dependencies, which is the common case, since functional dependencies are rare in my opinion.
More generally, functional-dependency-related functions take arguments that are expensive to compute before it is even known whether they are needed. It would be great if these functions received an `FnOnce` closure instead of precomputed data, so the heavy work could be skipped when not required.
* elapsed time after creating a logical plan: 219
* elapsed time after optimization: 845
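The `FnOnce` suggestion can be sketched generically: pass a closure for the expensive input and only call it after the cheap check passes (the function and parameter names below are illustrative, not an actual DataFusion API):

```rust
/// Illustrative: accept a closure that computes the expensive input only
/// when it is actually needed, instead of taking precomputed data.
fn project_dependencies<F>(has_dependencies: bool, proj_indices: F) -> Vec<usize>
where
    F: FnOnce() -> Vec<usize>,
{
    if !has_dependencies {
        // Early return: the closure is never called, so the heavy
        // computation is skipped entirely on the common path.
        return Vec::new();
    }
    proj_indices()
}

fn main() {
    // Common path: the expensive computation never runs.
    let out = project_dependencies(false, || -> Vec<usize> {
        panic!("should not be evaluated");
    });
    println!("{out:?}");

    // Rare path: the closure is evaluated once.
    let out = project_dependencies(true, || vec![0, 2, 5]);
    println!("{out:?}");
}
```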
Now, the resulting 0.8 seconds is acceptable to me.