mslapek commented on PR #5745: URL: https://github.com/apache/arrow-datafusion/pull/5745#issuecomment-1485376378
@mingmwang Thank you for your observation. Let's run an experiment.
## Experiment
Copied `arrow-datafusion/datafusion/core/tests/data/aggregate_simple.csv` to
`aggregate_simple2.csv` in the same directory.
Script:
```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();

    // Register the original CSV file and its copy as two separate tables.
    ctx.register_csv(
        "my_table",
        "datafusion/core/tests/data/aggregate_simple.csv",
        CsvReadOptions::default(),
    )
    .await?;
    ctx.register_csv(
        "my_table2",
        "datafusion/core/tests/data/aggregate_simple2.csv",
        CsvReadOptions::default(),
    )
    .await?;

    // Count the rows of the UNION ALL of both tables.
    let sql = "SELECT COUNT(*) FROM (SELECT * from my_table UNION ALL SELECT * from my_table2)";
    let df = ctx.sql(sql).await?;

    // Print the verbose plans first, then the query result.
    df.clone().explain(true, false)?.show().await?;
    df.show().await?;
    Ok(())
}
```
The optimized logical and physical plans are shown below.
## Without `push_down_aggregate`
*Explain after optimizations*
```
...
| logical_plan  | Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]
|               |   Union
|               |     TableScan: my_table projection=[c1]
|               |     Projection: my_table2.c1 AS my_table.c1
|               |       TableScan: my_table2 projection=[c1]
...
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]
|               |   CoalescePartitionsExec
|               |     AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]
|               |       RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2
|               |         UnionExec
|               |           CsvExec: files={1 group: [[/home/hello/repo/arrow-datafusion/datafusion/core/tests/data/aggregate_simple.csv]]}, has_header=true, limit=None, projection=[c1]
|               |           ProjectionExec: expr=[c1@0 as my_table.c1]
|               |             CsvExec: files={1 group: [[/home/hello/repo/arrow-datafusion/datafusion/core/tests/data/aggregate_simple2.csv]]}, has_header=true, limit=None, projection=[c1]
```
The rows of both CSV files are naively mixed together by `RoundRobinBatch`,
only to be counted at the end! 😧
> actually the partial agg is already pushed down.
❗️ No, it isn't...
## With `push_down_aggregate`
*Explain after optimizations*
```
...
| logical_plan  | Projection: SUM(COUNT(aggr_1)) AS COUNT(UInt8(1))
|               |   Aggregate: groupBy=[[]], aggr=[[SUM(COUNT(aggr_1))]]
|               |     Union
|               |       Aggregate: groupBy=[[]], aggr=[[COUNT(aggr_1)]]
|               |         Projection: UInt8(1) AS aggr_1
|               |           TableScan: my_table projection=[c1]
|               |       Aggregate: groupBy=[[]], aggr=[[COUNT(aggr_1)]]
|               |         Projection: UInt8(1) AS aggr_1
|               |           TableScan: my_table2 projection=[c1]
...
| physical_plan | ProjectionExec: expr=[SUM(COUNT(aggr_1))@0 as COUNT(UInt8(1))]
|               |   AggregateExec: mode=Final, gby=[], aggr=[SUM(COUNT(aggr_1))]
|               |     CoalescePartitionsExec
|               |       AggregateExec: mode=Partial, gby=[], aggr=[SUM(COUNT(aggr_1))]
|               |         UnionExec
|               |           AggregateExec: mode=Final, gby=[], aggr=[COUNT(aggr_1)]
|               |             CoalescePartitionsExec
|               |               AggregateExec: mode=Partial, gby=[], aggr=[COUNT(aggr_1)]
|               |                 ProjectionExec: expr=[1 as aggr_1]
|               |                   RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
|               |                     CsvExec: files={1 group: [[/home/hello/repo/arrow-datafusion/datafusion/core/tests/data/aggregate_simple.csv]]}, has_header=true, limit=None, projection=[c1]
|               |           AggregateExec: mode=Final, gby=[], aggr=[COUNT(aggr_1)]
|               |             CoalescePartitionsExec
|               |               AggregateExec: mode=Partial, gby=[], aggr=[COUNT(aggr_1)]
|               |                 ProjectionExec: expr=[1 as aggr_1]
|               |                   RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
|               |                     CsvExec: files={1 group: [[/home/hello/repo/arrow-datafusion/datafusion/core/tests/data/aggregate_simple2.csv]]}, has_header=true, limit=None, projection=[c1]
```
The contents of the two files are never mixed.
Only two rows reach `AggregateExec: mode=Partial, gby=[], aggr=[SUM(COUNT(aggr_1))]`,
one from each branch of the union.
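The identity behind the rewritten plan can be sketched in plain Rust, without DataFusion: counting the rows of a `UNION ALL` gives the same result as counting each branch separately and summing the two counts. This is only an illustration of the arithmetic, not the rule's implementation; the function names and the sample data below are hypothetical stand-ins for the two CSV files.

```rust
/// Mirrors the plan without the rule: concatenate both inputs, then count.
fn count_after_union(a: &[u32], b: &[u32]) -> usize {
    a.iter().chain(b.iter()).count()
}

/// Mirrors the rewritten plan: each branch is counted on its own, and the
/// outer aggregate only sums the per-branch counts.
fn sum_of_branch_counts(a: &[u32], b: &[u32]) -> usize {
    // One partial count per UNION ALL branch, i.e. two rows in total.
    let partial_counts = [a.len(), b.len()];
    partial_counts.iter().sum()
}

fn main() {
    // Hypothetical stand-ins for the two tables; the values are made up.
    let my_table = vec![1, 2, 3, 4, 5];
    let my_table2 = vec![6, 7, 8];

    let direct = count_after_union(&my_table, &my_table2);
    let pushed_down = sum_of_branch_counts(&my_table, &my_table2);

    // Both strategies must agree on the final count.
    assert_eq!(direct, pushed_down);
    println!("direct = {direct}, pushed down = {pushed_down}");
}
```

In the sketch, `partial_counts` plays the role of the two rows that feed the outer `SUM(COUNT(aggr_1))` aggregate.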
