geoffreyclaude commented on PR #14918:
URL: https://github.com/apache/datafusion/pull/14918#issuecomment-2694925444
> I still could not reproduce any improvement with this PR, FWIW. I still
> think it is a good change so i merged it in, but it might be cool to find some
> benchmark results that showed the improvement
@alamb: I've updated your reproducer to run a `COUNT(*)` instead of a
`LIMIT 10`. A `COUNT(*)` only needs to read the metadata from the object
store, since the number of rows is stored in the Parquet metadata, so the query
is effectively "run" during physical plan creation.
Without the fix, physical plan creation takes ~2.5 seconds.
With the fix, it drops to ~700ms.
Logical planning and query execution take the same time in both cases.
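To illustrate why a `COUNT(*)` can be answered from metadata alone, here is a minimal sketch. It assumes each file exposes an optional row count; `FileStats` and `count_from_metadata` are hypothetical names for illustration, not DataFusion APIs.

```rust
/// Hypothetical per-file statistics, standing in for what a Parquet footer records.
#[derive(Debug, Clone, Copy)]
struct FileStats {
    /// Row count from the file metadata, or `None` if it is unknown.
    num_rows: Option<u64>,
}

/// Sums the per-file row counts. Returns `None` as soon as any file lacks a
/// row count, because the total would then no longer be exact.
fn count_from_metadata(files: &[FileStats]) -> Option<u64> {
    files.iter().map(|f| f.num_rows).sum()
}

fn main() {
    // Made-up row counts, for illustration only.
    let files = [
        FileStats { num_rows: Some(1_000) },
        FileStats { num_rows: Some(2_500) },
    ];
    match count_from_metadata(&files) {
        Some(total) => println!("COUNT(*) answered from metadata: {total}"),
        None => println!("row count unknown for some file; a scan would be needed"),
    }
}
```

When every file reports an exact count, the aggregate can be replaced by a constant, which is why the physical plans below contain only a `ProjectionExec` over a `PlaceholderRowExec`.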
<details>
```rust
use arrow::util::pretty::pretty_format_batches;
use datafusion::config::{ParquetOptions, TableParquetOptions};
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::functions_aggregate::expr_fn::count;
use datafusion::logical_expr::utils::COUNT_STAR_EXPANSION;
use datafusion::physical_plan::displayable;
use datafusion::prelude::{Expr, SessionContext};
use futures::StreamExt;
use std::sync::Arc;
use std::time::Instant;
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    // Register an HTTP object store for the public ClickBench dataset host.
    let object_store_url =
        ObjectStoreUrl::parse("https://datasets.clickhouse.com").unwrap();
    let object_store = object_store::http::HttpBuilder::new()
        .with_url(object_store_url.as_str())
        .build()
        .unwrap();
    ctx.register_object_store(object_store_url.as_ref(), Arc::new(object_store));
    // urls are like
    // https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_1.parquet
    //let base_url = ObjectStoreUrl::parse("https://datasets.clickhouse.com").unwrap();
    let paths: Vec<ListingTableUrl> = (1..100)
        .map(|i| format!("https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_{i}.parquet"))
        .map(|url| ListingTableUrl::parse(&url).unwrap())
        .collect();
    // Collect statistics so the row count can be answered from metadata.
    let listing_options =
        ListingOptions::new(Arc::new(ParquetFormat::new())).with_collect_stat(true);
    println!("Creating table / reading statistics....");
    let config = ListingTableConfig::new_with_multi_paths(paths)
        .with_listing_options(listing_options)
        .infer_schema(&ctx.state())
        .await?;
    let start = Instant::now();
    let listing_table = ListingTable::try_new(config).unwrap();
    // Equivalent of `SELECT COUNT(*)` over the listing table.
    let df = ctx
        .read_table(Arc::new(listing_table))?
        .aggregate(vec![], vec![count(Expr::Literal(COUNT_STAR_EXPANSION))])?;
    println!("Done in {:?}", Instant::now() - start);
    let logical_plan_str = df.logical_plan().display_indent().to_string();
    println!("Logical plan:\n{logical_plan_str}");
    // Physical planning is the step whose duration differs between the two runs.
    println!("Creating physical plan...");
    let start = Instant::now();
    let physical_plan = df.create_physical_plan().await?;
    println!("Done in {:?}", Instant::now() - start);
    let physical_plan_str =
        displayable(physical_plan.as_ref()).indent(true).to_string();
    println!("Physical plan:\n{physical_plan_str}");
    println!("Running query...");
    let start = Instant::now();
    let mut result_stream = physical_plan.execute(0, ctx.task_ctx())?;
    let mut batches = vec![];
    while let Some(record) = result_stream.next().await {
        batches.push(record?);
    }
    println!(
        "Got {} batches in {:?}",
        batches.len(),
        Instant::now() - start
    );
    let response_str = pretty_format_batches(&batches)?.to_string();
    println!("Query result:\n{response_str}");
    Ok(())
}
```
</details>
### main
```text
Creating table / reading statistics....
Done in 2.267666ms
Logical plan:
Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
  TableScan: ?table?
Creating physical plan...
Done in 2.510654875s
Physical plan:
ProjectionExec: expr=[98997497 as count(Int64(1))]
  PlaceholderRowExec
Running query...
Got 1 batches in 215.333µs
Query result:
+-----------------+
| count(Int64(1)) |
+-----------------+
| 98997497        |
+-----------------+
```
### fix/concurrent_heads
```text
Creating table / reading statistics....
Done in 2.187417ms
Logical plan:
Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
  TableScan: ?table?
Creating physical plan...
Done in 686.674166ms
Physical plan:
ProjectionExec: expr=[98997497 as count(Int64(1))]
  PlaceholderRowExec
Running query...
Got 1 batches in 191.458µs
Query result:
+-----------------+
| count(Int64(1)) |
+-----------------+
| 98997497        |
+-----------------+
```
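The gap between the two runs is what one would expect when per-file metadata requests overlap instead of running one after another. The sketch below only simulates that effect: `fetch_row_count` is a hypothetical stand-in that sleeps instead of issuing a real request, and it uses OS threads from the standard library to stay dependency-free, which is not necessarily how the PR implements the concurrency.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for one metadata request: sleeps for `latency`
/// and returns a made-up row count for the file.
fn fetch_row_count(file_index: u64, latency: Duration) -> u64 {
    thread::sleep(latency);
    file_index * 10
}

/// Issues the requests one after another; total time grows with `n * latency`.
fn fetch_sequential(n: u64, latency: Duration) -> u64 {
    (1..=n).map(|i| fetch_row_count(i, latency)).sum()
}

/// Issues all requests at once on scoped threads; total time stays close to
/// a single `latency`, plus thread start-up overhead.
fn fetch_concurrent(n: u64, latency: Duration) -> u64 {
    thread::scope(|s| {
        let handles: Vec<_> = (1..=n)
            .map(|i| s.spawn(move || fetch_row_count(i, latency)))
            .collect();
        handles
            .into_iter()
            .map(|h| h.join().unwrap())
            .sum::<u64>()
    })
}

fn main() {
    let latency = Duration::from_millis(20); // assumed per-request latency
    let start = Instant::now();
    let total = fetch_sequential(8, latency);
    println!("sequential: total={total} in {:?}", start.elapsed());
    let start = Instant::now();
    let total = fetch_concurrent(8, latency);
    println!("concurrent: total={total} in {:?}", start.elapsed());
}
```

Both functions return the same total; only the wall-clock time differs, which matches the observation that the plans and results above are identical while planning time drops.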