geoffreyclaude commented on PR #14918:
URL: https://github.com/apache/datafusion/pull/14918#issuecomment-2694925444

   > I still could not reproduce any improvement with this PR, FWIW. I still 
think it is a good change so i merged it in, but it might be cool to find some 
benchmark results that showed the improvement
   
   @alamb: I've updated your reproducer to do a `COUNT(*)` instead of a 
`LIMIT(10)`. The `COUNT(*)` only needs to read the metadata from the object 
store, as the number of rows is stored in the Parquet metadata, and effectively 
"runs" the query during the physical plan creation.
   
   Without the fix, the physical plan creation takes ~2.5 seconds.
   With the fix, it drops to ~700ms. 
   
   The logical planning and query execution take the same time in both.
   
   <details>
   
   ```rust
   use arrow::util::pretty::pretty_format_batches;
   use datafusion::config::{ParquetOptions, TableParquetOptions};
   use datafusion::datasource::file_format::parquet::ParquetFormat;
   use datafusion::datasource::listing::{
       ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
   };
   use datafusion::execution::object_store::ObjectStoreUrl;
   use datafusion::functions_aggregate::expr_fn::count;
   use datafusion::logical_expr::utils::COUNT_STAR_EXPANSION;
   use datafusion::physical_plan::displayable;
   use datafusion::prelude::{Expr, SessionContext};
   use futures::StreamExt;
   use std::sync::Arc;
   use std::time::Instant;
   
   #[tokio::main]
   async fn main() -> datafusion::error::Result<()> {
       let ctx = SessionContext::new();
       let object_store_url =
           ObjectStoreUrl::parse("https://datasets.clickhouse.com";).unwrap();
       let object_store = object_store::http::HttpBuilder::new()
           .with_url(object_store_url.as_str())
           .build()
           .unwrap();
   
       ctx.register_object_store(object_store_url.as_ref(), 
Arc::new(object_store));
   
       // urls are like
       // 
https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_1.parquet'
       //let base_url = 
ObjectStoreUrl::parse("https://datasets.clickhouse.com";).unwrap();
       let paths: Vec<ListingTableUrl> = (1..100).map(|i| 
format!("https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_{i}.parquet";))
           .map(|url| ListingTableUrl::parse(&url).unwrap())
           .collect();
   
       let listing_options =
           
ListingOptions::new(Arc::new(ParquetFormat::new())).with_collect_stat(true);
   
       println!("Creating table / reading statistics....");
       let config = ListingTableConfig::new_with_multi_paths(paths)
           .with_listing_options(listing_options)
           .infer_schema(&ctx.state())
           .await?;
       let start = Instant::now();
       let listing_table = ListingTable::try_new(config).unwrap();
       let df = ctx
           .read_table(Arc::new(listing_table))?
           .aggregate(vec![], 
vec![count(Expr::Literal(COUNT_STAR_EXPANSION))])?;
       println!("Done in {:?}", Instant::now() - start);
   
       let logical_plan_str = df.logical_plan().display_indent().to_string();
       println!("Logical plan:\n{logical_plan_str}");
   
       println!("Creating physical plan...");
   
       let start = Instant::now();
       let physical_plan = df.create_physical_plan().await?;
       println!("Done in {:?}", Instant::now() - start);
   
       let physical_plan_str = 
displayable(physical_plan.as_ref()).indent(true).to_string();
       println!("Physical plan:\n{physical_plan_str}");
   
       println!("Running query...");
       let start = Instant::now();
       let mut result_stream = physical_plan.execute(0, ctx.task_ctx())?;
       let mut batches = vec![];
   
       while let Some(record) = result_stream.next().await {
           batches.push(record?);
       }
       println!(
           "Got {} batches in  {:?}",
           batches.len(),
           Instant::now() - start
       );
   
       let response_str = pretty_format_batches(&batches)?.to_string();
       println!("Query result:\n{response_str}");
   
       Ok(())
   }
    ```
   </details>
   
   ### main
   ```text
   Creating table / reading statistics....
   Done in 2.267666ms
   Logical plan:
   Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
     TableScan: ?table?
   Creating physical plan...
   Done in 2.510654875s
   Physical plan:
   ProjectionExec: expr=[98997497 as count(Int64(1))]
     PlaceholderRowExec
   
   Running query...
   Got 1 batches in  215.333µs
   Query result:
   +-----------------+
   | count(Int64(1)) |
   +-----------------+
   | 98997497        |
   +-----------------+
   
   ```
   
   ### fix/concurrent_heads
   ```text
   Creating table / reading statistics....
   Done in 2.187417ms
   Logical plan:
   Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]]
     TableScan: ?table?
   Creating physical plan...
   Done in 686.674166ms
   Physical plan:
   ProjectionExec: expr=[98997497 as count(Int64(1))]
     PlaceholderRowExec
   
   Running query...
   Got 1 batches in  191.458µs
   Query result:
   +-----------------+
   | count(Int64(1)) |
   +-----------------+
   | 98997497        |
   +-----------------+
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to