This is an automated email from the ASF dual-hosted git repository. paleolimbot pushed a commit to branch branch-0.3.0 in repository https://gitbox.apache.org/repos/asf/sedona-db.git
commit 5e89955762afc03d0f8ad79f72bc3bca6a17b6f9 Author: Kristin Cowalcijk <[email protected]> AuthorDate: Wed Mar 4 23:33:54 2026 +0800 feat(rust/sedona): Auto-configure spilled batch in-memory size threshold based on global memory limit (#680) --- rust/sedona/src/context.rs | 48 +++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 47 insertions(+), 1 deletion(-) diff --git a/rust/sedona/src/context.rs b/rust/sedona/src/context.rs index 1664173a..1cfabd3a 100644 --- a/rust/sedona/src/context.rs +++ b/rust/sedona/src/context.rs @@ -28,7 +28,6 @@ use crate::{ }; use arrow_array::RecordBatch; use async_trait::async_trait; -use datafusion::dataframe::DataFrameWriteOptions; use datafusion::datasource::file_format::format_as_file_type; use datafusion::{ common::plan_err, @@ -41,6 +40,7 @@ use datafusion::{ prelude::{DataFrame, SessionConfig, SessionContext}, sql::parser::{DFParser, Statement}, }; +use datafusion::{dataframe::DataFrameWriteOptions, execution::memory_pool::MemoryLimit}; use datafusion_common::not_impl_err; use datafusion_expr::dml::InsertOp; use datafusion_expr::sqlparser::dialect::{dialect_from_str, Dialect}; @@ -107,6 +107,7 @@ impl SedonaContext { // variables. let session_config = SessionConfig::from_env()?.with_information_schema(true); let mut session_config = add_sedona_option_extension(session_config); + let target_partitions = session_config.target_partitions(); // Always register the PROJ CrsProvider by default (if PROJ is not configured // before it is used an error will be raised). @@ -118,6 +119,19 @@ impl SedonaContext { opts.crs_provider = CrsProviderOption::new(Arc::new(sedona_proj::provider::ProjCrsProvider::default())); + // Set the spilled batch in-memory size threshold to 5% of the per-partition memory limit, + // with a minimum of 10MB. Batches larger than this threshold will be broken into smaller batches + // before writing to spill files. This is to avoid overshooting memory limit when reading super + // large spilled batches. + const SPILLED_BATCH_THRESHOLD_PERCENT_DIVISOR: usize = 20; // 5% == 1 / 20 + const MIN_SPILLED_BATCH_IN_MEMORY_THRESHOLD_BYTES: usize = 10 * 1024 * 1024; // 10MB + if let MemoryLimit::Finite(memory_limit) = runtime_env.memory_pool.memory_limit() { + let per_partition_memory_limit = memory_limit.div_ceil(target_partitions); + opts.spatial_join.spilled_batch_in_memory_size_threshold = per_partition_memory_limit + .div_ceil(SPILLED_BATCH_THRESHOLD_PERCENT_DIVISOR) + .max(MIN_SPILLED_BATCH_IN_MEMORY_THRESHOLD_BYTES); + } + #[cfg(feature = "pointcloud")] let session_config = session_config.with_option_extension( PointcloudOptions::default() @@ -838,4 +852,36 @@ mod tests { SedonaType::WkbView(Edges::Planar, deserialize_crs("EPSG:3857").unwrap()) ); } + + #[tokio::test] + async fn test_auto_configure_spilled_batch_threshold() { + use crate::context_builder::SedonaContextBuilder; + use sedona_common::option::SedonaOptions; + + // Specify a memory limit (10GB), spilled batch threshold will also be limited, + // but no lower than 10MB due to the minimum floor. + let memory_limit: usize = 10 * 1024 * 1024 * 1024; + let ctx = SedonaContextBuilder::new() + .with_memory_limit(memory_limit) + .build() + .await + .unwrap(); + let state = ctx.ctx.state(); + let opts = state + .config_options() + .extensions + .get::<SedonaOptions>() + .expect("SedonaOptions not found"); + assert!(opts.spatial_join.spilled_batch_in_memory_size_threshold >= 10 * 1024 * 1024); + + // Specify no memory limit, spilled batch threshold should be unlimited (0 is for unlimited) + let ctx = SedonaContextBuilder::new().build().await.unwrap(); + let state = ctx.ctx.state(); + let opts = state + .config_options() + .extensions + .get::<SedonaOptions>() + .expect("SedonaOptions not found"); + assert_eq!(opts.spatial_join.spilled_batch_in_memory_size_threshold, 0); + } }
