This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git
The following commit(s) were added to refs/heads/main by this push:
new 4ea183fa feat(rust/sedona): Auto-configure spilled batch in-memory size threshold based on global memory limit (#680)
4ea183fa is described below
commit 4ea183faa01389f52db650ef2dbeb1934fab005e
Author: Kristin Cowalcijk <[email protected]>
AuthorDate: Wed Mar 4 23:33:54 2026 +0800
feat(rust/sedona): Auto-configure spilled batch in-memory size threshold based on global memory limit (#680)
---
rust/sedona/src/context.rs | 48 +++++++++++++++++++++++++++++++++++++++++++++-
1 file changed, 47 insertions(+), 1 deletion(-)
diff --git a/rust/sedona/src/context.rs b/rust/sedona/src/context.rs
index 43a84393..ee70d598 100644
--- a/rust/sedona/src/context.rs
+++ b/rust/sedona/src/context.rs
@@ -28,7 +28,6 @@ use crate::{
};
use arrow_array::RecordBatch;
use async_trait::async_trait;
-use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::datasource::file_format::format_as_file_type;
use datafusion::{
common::plan_err,
@@ -41,6 +40,7 @@ use datafusion::{
prelude::{DataFrame, SessionConfig, SessionContext},
sql::parser::{DFParser, Statement},
};
+use datafusion::{dataframe::DataFrameWriteOptions,
execution::memory_pool::MemoryLimit};
use datafusion_common::not_impl_err;
use datafusion_expr::dml::InsertOp;
use datafusion_expr::sqlparser::dialect::{dialect_from_str, Dialect};
@@ -104,6 +104,7 @@ impl SedonaContext {
// variables.
let session_config =
SessionConfig::from_env()?.with_information_schema(true);
let mut session_config = add_sedona_option_extension(session_config);
+ let target_partitions = session_config.target_partitions();
// Always register the PROJ CrsProvider by default (if PROJ is not
configured
// before it is used an error will be raised).
@@ -115,6 +116,19 @@ impl SedonaContext {
opts.crs_provider =
CrsProviderOption::new(Arc::new(sedona_proj::provider::ProjCrsProvider::default()));
+ // Set the spilled batch in-memory size threshold to 5% of the
per-partition memory limit,
+ // with a minimum of 10MB. Batches larger than this threshold will be
broken into smaller batches
+ // before writing to spill files. This is to avoid overshooting memory
limit when reading super
+ // large spilled batches.
+ const SPILLED_BATCH_THRESHOLD_PERCENT_DIVISOR: usize = 20; // 5% == 1
/ 20
+ const MIN_SPILLED_BATCH_IN_MEMORY_THRESHOLD_BYTES: usize = 10 * 1024 *
1024; // 10MB
+ if let MemoryLimit::Finite(memory_limit) =
runtime_env.memory_pool.memory_limit() {
+ let per_partition_memory_limit =
memory_limit.div_ceil(target_partitions);
+ opts.spatial_join.spilled_batch_in_memory_size_threshold =
per_partition_memory_limit
+ .div_ceil(SPILLED_BATCH_THRESHOLD_PERCENT_DIVISOR)
+ .max(MIN_SPILLED_BATCH_IN_MEMORY_THRESHOLD_BYTES);
+ }
+
#[cfg(feature = "pointcloud")]
let session_config = session_config.with_option_extension(
LasOptions::default()
@@ -835,4 +849,36 @@ mod tests {
SedonaType::WkbView(Edges::Planar,
deserialize_crs("EPSG:3857").unwrap())
);
}
+
+ #[tokio::test]
+ async fn test_auto_configure_spilled_batch_threshold() {
+ use crate::context_builder::SedonaContextBuilder;
+ use sedona_common::option::SedonaOptions;
+
+ // Specify a memory limit (10GB), spilled batch threshold will also be
limited,
+ // but no lower than 10MB due to the minimum floor.
+ let memory_limit: usize = 10 * 1024 * 1024 * 1024;
+ let ctx = SedonaContextBuilder::new()
+ .with_memory_limit(memory_limit)
+ .build()
+ .await
+ .unwrap();
+ let state = ctx.ctx.state();
+ let opts = state
+ .config_options()
+ .extensions
+ .get::<SedonaOptions>()
+ .expect("SedonaOptions not found");
+ assert!(opts.spatial_join.spilled_batch_in_memory_size_threshold >= 10
* 1024 * 1024);
+
+ // Specify no memory limit, spilled batch threshold should be
unlimited (0 is for unlimited)
+ let ctx = SedonaContextBuilder::new().build().await.unwrap();
+ let state = ctx.ctx.state();
+ let opts = state
+ .config_options()
+ .extensions
+ .get::<SedonaOptions>()
+ .expect("SedonaOptions not found");
+ assert_eq!(opts.spatial_join.spilled_batch_in_memory_size_threshold,
0);
+ }
}