adriangb commented on PR #18192: URL: https://github.com/apache/datafusion/pull/18192#issuecomment-3447736026
> Most users do not have this problem to start with.

I'd argue that most users _do_ have this problem. Consider a query like:

```sql
SELECT * FROM 'file.parquet' WHERE id IN (1, 2, 3, 4, 5, ...);
```

This PR improves memory usage for this query by avoiding duplicating the `InList` expression in both the `FilterExec` and the `ParquetSource` when deserializing. Here are flame graphs from `main` and `dedupe-expr`, respectively:

<img width="6036" height="3008" alt="image" src="https://github.com/user-attachments/assets/a596f941-18e3-464e-b8a9-f3732351fceb" />
<img width="6020" height="3002" alt="image" src="https://github.com/user-attachments/assets/c63d1608-1a7d-467c-b325-d5fe05c1319d" />

Raw data: [pprof.zip](https://github.com/user-attachments/files/23144387/pprof.zip)

I generated this by adding the following example to `datafusion-examples`:

```rust
use datafusion::{common::Result, prelude::*};
use datafusion_proto::bytes::{physical_plan_from_bytes, physical_plan_to_bytes};
use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};

#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

#[allow(non_upper_case_globals)]
#[export_name = "malloc_conf"]
pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:19\0";

#[tokio::main]
async fn main() -> Result<()> {
    let mut prof_ctl = jemalloc_pprof::PROF_CTL.as_ref().unwrap().lock().await;
    prof_ctl.activate().unwrap();

    let _plan = {
        let ctx = SessionContext::new();
        let batches = ctx
            .sql("SELECT c FROM generate_series(1, 1000000) t(c)")
            .await?
            .collect()
            .await?;
        let file = std::fs::File::create("test.parquet")?;
        let props = WriterProperties::builder()
            // limit row group sizes so that we have useful statistics
            .set_max_row_group_size(4096)
            .build();
        let mut writer = ArrowWriter::try_new(file, batches[0].schema(), Some(props))?;
        for batch in &batches {
            writer.write(batch)?;
        }
        writer.close()?;

        let mut df = ctx
            .read_parquet("test.parquet", ParquetReadOptions::default())
            .await?;
        df = df.filter(col("c").in_list((1_000..10_000).map(|v| lit(v)).collect(), false))?;
        let plan = df.create_physical_plan().await?;
        physical_plan_from_bytes(&physical_plan_to_bytes(plan)?, &ctx.task_ctx())?
    };

    let pprof = prof_ctl.dump_pprof().unwrap();
    std::fs::write("proto_memory.pprof", pprof).unwrap();
    Ok(())
}
```

Full diff: [diff.txt](https://github.com/user-attachments/files/23144394/diff.txt)

It looks like this was able to deduplicate the `InList` expression, cutting memory use after deserialization from 5.7 MB to 4 MB (roughly a 30% reduction). Not every plan will see savings that large, but I think many plans will save at least a small amount of memory, and some will save even more. This also doesn't account for the CPU cycles saved by not deserializing the same expression multiple times.
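For anyone curious about the mechanics, here is a minimal, self-contained sketch of the general interning technique: cache decoded values keyed by their serialized bytes, so the second consumer of the same encoded expression gets a cheap `Arc` clone instead of a fresh allocation. This is illustrative only, not the code in this PR; the `Interner` type and the byte encoding below are made up for the example.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// A minimal interning cache: the first time a given byte encoding is seen,
/// the decoded value is stored; later lookups with the same bytes return a
/// cheap `Arc` clone instead of decoding (and allocating) again.
struct Interner<T> {
    cache: HashMap<Vec<u8>, Arc<T>>,
}

impl<T> Interner<T> {
    fn new() -> Self {
        Self { cache: HashMap::new() }
    }

    /// Decode `bytes` with `decode`, or reuse a previously decoded value.
    fn intern(&mut self, bytes: &[u8], decode: impl FnOnce(&[u8]) -> T) -> Arc<T> {
        if let Some(existing) = self.cache.get(bytes) {
            return Arc::clone(existing);
        }
        let value = Arc::new(decode(bytes));
        self.cache.insert(bytes.to_vec(), Arc::clone(&value));
        value
    }
}

fn main() {
    let mut interner: Interner<Vec<i64>> = Interner::new();

    // Stand-in for the serialized `InList` expression that appears in both
    // the `FilterExec` and the `ParquetSource` of the same plan.
    let encoded: Vec<u8> = (1_000i64..10_000).flat_map(|v| v.to_le_bytes()).collect();

    // A toy decoder: reads back the little-endian i64 list.
    let decode = |bytes: &[u8]| -> Vec<i64> {
        bytes
            .chunks_exact(8)
            .map(|c| i64::from_le_bytes(c.try_into().unwrap()))
            .collect()
    };

    // "FilterExec" and "ParquetSource" both deserialize the same expression...
    let a = interner.intern(&encoded, decode);
    let b = interner.intern(&encoded, decode);

    // ...but only one copy of the 9,000-element list is actually allocated.
    assert!(Arc::ptr_eq(&a, &b));
    println!("deduplicated: {} values, one allocation", a.len());
}
```

The actual implementation in this PR differs in its details, but the effect is the same: both plan nodes end up holding clones of a single `Arc`'d expression rather than two independent copies, which is what the memory drop above reflects.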
