geoffreyclaude commented on PR #14918: URL: https://github.com/apache/datafusion/pull/14918#issuecomment-2694925444
> I still could not reproduce any improvement with this PR, FWIW. I still think it is a good change so i merged it in, but it might be cool to find some benchmark results that showed the improvement

@alamb: I've updated your reproducer to do a `COUNT(*)` instead of a `LIMIT(10)`. The `COUNT(*)` only needs to read the metadata from the object store, as the number of rows is stored in the Parquet metadata, and effectively "runs" the query during the physical plan creation.

Without the fix, the physical plan creation takes ~2.5 seconds. With the fix, it drops to ~700ms. The logical planning and query execution take the same time in both.

<details>

```rust
use arrow::util::pretty::pretty_format_batches;
use datafusion::config::{ParquetOptions, TableParquetOptions};
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::functions_aggregate::expr_fn::count;
use datafusion::logical_expr::utils::COUNT_STAR_EXPANSION;
use datafusion::physical_plan::displayable;
use datafusion::prelude::{Expr, SessionContext};
use futures::StreamExt;
use std::sync::Arc;
use std::time::Instant;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    let object_store_url = ObjectStoreUrl::parse("https://datasets.clickhouse.com").unwrap();
    let object_store = object_store::http::HttpBuilder::new()
        .with_url(object_store_url.as_str())
        .build()
        .unwrap();
    ctx.register_object_store(object_store_url.as_ref(), Arc::new(object_store));

    // urls are like
    // https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_1.parquet
    //let base_url = ObjectStoreUrl::parse("https://datasets.clickhouse.com").unwrap();
    let paths: Vec<ListingTableUrl> = (1..100)
        .map(|i| format!("https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_{i}.parquet"))
        .map(|url| ListingTableUrl::parse(&url).unwrap())
        .collect();

    let listing_options =
        ListingOptions::new(Arc::new(ParquetFormat::new())).with_collect_stat(true);

    println!("Creating table / reading statistics....");
    let config = ListingTableConfig::new_with_multi_paths(paths)
        .with_listing_options(listing_options)
        .infer_schema(&ctx.state())
        .await?;

    let start = Instant::now();
    let listing_table = ListingTable::try_new(config).unwrap();
    let df = ctx
        .read_table(Arc::new(listing_table))?
        .aggregate(vec![], vec![count(Expr::Literal(COUNT_STAR_EXPANSION))])?;
    println!("Done in {:?}", Instant::now() - start);

    let logical_plan_str = df.logical_plan().display_indent().to_string();
    println!("Logical plan:\n{logical_plan_str}");

    println!("Creating physical plan...");
    let start = Instant::now();
    let physical_plan = df.create_physical_plan().await?;
    println!("Done in {:?}", Instant::now() - start);

    let physical_plan_str = displayable(physical_plan.as_ref()).indent(true).to_string();
    println!("Physical plan:\n{physical_plan_str}");

    println!("Running query...");
    let start = Instant::now();
    let mut result_stream = physical_plan.execute(0, ctx.task_ctx())?;
    let mut batches = vec![];
    while let Some(record) = result_stream.next().await {
        batches.push(record?);
    }
    println!(
        "Got {} batches in {:?}",
        batches.len(),
        Instant::now() - start
    );

    let response_str = pretty_format_batches(&batches)?.to_string();
    println!("Query result:\n{response_str}");

    Ok(())
}
```

</details>

### main

```text
Creating table / reading statistics....
Done in 2.267666ms
Logical plan:
Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
  TableScan: ?table?
Creating physical plan...
Done in 2.510654875s
Physical plan:
ProjectionExec: expr=[98997497 as count(Int64(1))]
  PlaceholderRowExec
Running query...
Got 1 batches in 215.333µs
Query result:
+-----------------+
| count(Int64(1)) |
+-----------------+
| 98997497        |
+-----------------+
```

### fix/concurrent_heads

```text
Creating table / reading statistics....
Done in 2.187417ms
Logical plan:
Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
  TableScan: ?table?
Creating physical plan...
Done in 686.674166ms
Physical plan:
ProjectionExec: expr=[98997497 as count(Int64(1))]
  PlaceholderRowExec
Running query...
Got 1 batches in 191.458µs
Query result:
+-----------------+
| count(Int64(1)) |
+-----------------+
| 98997497        |
+-----------------+
```
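
For intuition on why the planning time drops while everything else stays flat, here is a minimal std-only sketch of sequential versus concurrent metadata lookups. It is not the code from this PR: `fetch_row_count` is a hypothetical stand-in that only sleeps to simulate per-request latency, the row counts are fake, and it uses scoped OS threads instead of async streams to stay dependency-free.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for one metadata (HEAD-style) request to an object store.
/// It only sleeps to simulate network latency and returns a fake row count.
fn fetch_row_count(file_index: u64, latency: Duration) -> u64 {
    thread::sleep(latency);
    1_000 + file_index
}

/// Issue the requests one after another: total time is roughly n * latency.
fn total_rows_sequential(n: u64, latency: Duration) -> u64 {
    (0..n).map(|i| fetch_row_count(i, latency)).sum()
}

/// Issue all requests at once on scoped threads: total time is roughly one latency.
fn total_rows_concurrent(n: u64, latency: Duration) -> u64 {
    thread::scope(|s| {
        // Collect the handles first so every thread is spawned before any join.
        let handles: Vec<_> = (0..n)
            .map(|i| s.spawn(move || fetch_row_count(i, latency)))
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).sum()
    })
}

fn main() {
    let (n, latency) = (20, Duration::from_millis(25));

    let start = Instant::now();
    let sequential = total_rows_sequential(n, latency);
    println!("sequential: {sequential} rows in {:?}", start.elapsed());

    let start = Instant::now();
    let concurrent = total_rows_concurrent(n, latency);
    println!("concurrent: {concurrent} rows in {:?}", start.elapsed());
}
```

Both variants return the same total; only the wall-clock time differs, which matches the shape of the timings above, where the result is identical and only the physical plan creation step gets faster.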