2010YOUY01 commented on code in PR #14975:
URL: https://github.com/apache/datafusion/pull/14975#discussion_r1977146593
##########
datafusion/physical-plan/src/spill.rs:
##########
@@ -54,41 +52,13 @@ pub(crate) fn read_spill_as_stream(
     Ok(builder.build())
 }
 
-/// Spills in-memory `batches` to disk.
-///
-/// Returns total number of the rows spilled to disk.
-pub(crate) fn spill_record_batches(
-    batches: &[RecordBatch],
-    path: PathBuf,
-    schema: SchemaRef,
-) -> Result<(usize, usize)> {
-    let mut writer = IPCStreamWriter::new(path.as_ref(), schema.as_ref())?;
-    for batch in batches {
-        writer.write(batch)?;
-    }
-    writer.finish()?;
-    debug!(
-        "Spilled {} batches of total {} rows to disk, memory released {}",
-        writer.num_batches,
-        writer.num_rows,
-        human_readable_size(writer.num_bytes),
-    );
-    Ok((writer.num_rows, writer.num_bytes))
-}
-
-fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
-    let file = BufReader::new(File::open(path)?);
-    let reader = StreamReader::try_new(file, None)?;
-    for batch in reader {
-        sender
-            .blocking_send(batch.map_err(Into::into))
-            .map_err(|e| exec_datafusion_err!("{e}"))?;
-    }
-    Ok(())
-}
-
 /// Spill the `RecordBatch` to disk as smaller batches
-/// split by `batch_size_rows`
+/// split by `batch_size_rows`.
+#[deprecated(
+    since = "46.0.0",
+    note = "This function is deprecated. Use `datafusion_execution::DiskManager::try_spill_record_batch_by_size` instead. Note this method is mainly used within DataFusion for spilling operators. If you only
+    want the functionality of writing `RecordBatch`es to disk, consider using `arrow::ipc::writer::StreamWriter` instead."
+)]

Review Comment:
   This function doesn't have to be public; note that `spill_record_batches()` is also non-public. I believe this utility function is used mainly for tracking temp files and metrics for the DataFusion executor's internal use. The main functionality for writing `RecordBatch`es to disk lives in `arrow::ipc`, so I expect deprecating it won't cause major issues.
##########
datafusion/core/tests/memory_limit/mod.rs:
##########
@@ -468,6 +470,83 @@ async fn test_stringview_external_sort() {
     let _ = df.collect().await.expect("Query execution failed");
 }
 
+// Tests for disk limit (`max_temp_directory_size` in `DiskManager`)
+// ------------------------------------------------------------------
+
+// Create a new `SessionContext` with a specified disk limit and memory pool limit
+async fn setup_context(
+    disk_limit: u64,
+    memory_pool_limit: usize,
+) -> Result<SessionContext> {
+    let disk_manager = DiskManager::try_new_without_arc(DiskManagerConfig::NewOs)?
+        .with_max_temp_directory_size(disk_limit)?;
+
+    let runtime = RuntimeEnvBuilder::new()

Review Comment:
   We can't use the builder to specify the disk limit for now, because `DiskManagerConfig` is an `enum` rather than a `struct`, so the setup routine is a bit hacky. Changing it will inevitably cause an API change, so I'd prefer to leave that to a separate PR.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org