This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git


The following commit(s) were added to refs/heads/main by this push:
     new 4ea183fa feat(rust/sedona): Auto-configure spilled batch in-memory size threshold based on global memory limit (#680)
4ea183fa is described below

commit 4ea183faa01389f52db650ef2dbeb1934fab005e
Author: Kristin Cowalcijk <[email protected]>
AuthorDate: Wed Mar 4 23:33:54 2026 +0800

    feat(rust/sedona): Auto-configure spilled batch in-memory size threshold based on global memory limit (#680)
---
 rust/sedona/src/context.rs | 48 +++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 47 insertions(+), 1 deletion(-)

diff --git a/rust/sedona/src/context.rs b/rust/sedona/src/context.rs
index 43a84393..ee70d598 100644
--- a/rust/sedona/src/context.rs
+++ b/rust/sedona/src/context.rs
@@ -28,7 +28,6 @@ use crate::{
 };
 use arrow_array::RecordBatch;
 use async_trait::async_trait;
-use datafusion::dataframe::DataFrameWriteOptions;
 use datafusion::datasource::file_format::format_as_file_type;
 use datafusion::{
     common::plan_err,
@@ -41,6 +40,7 @@ use datafusion::{
     prelude::{DataFrame, SessionConfig, SessionContext},
     sql::parser::{DFParser, Statement},
 };
+use datafusion::{dataframe::DataFrameWriteOptions, 
execution::memory_pool::MemoryLimit};
 use datafusion_common::not_impl_err;
 use datafusion_expr::dml::InsertOp;
 use datafusion_expr::sqlparser::dialect::{dialect_from_str, Dialect};
@@ -104,6 +104,7 @@ impl SedonaContext {
         // variables.
         let session_config = 
SessionConfig::from_env()?.with_information_schema(true);
         let mut session_config = add_sedona_option_extension(session_config);
+        let target_partitions = session_config.target_partitions();
 
         // Always register the PROJ CrsProvider by default (if PROJ is not 
configured
         // before it is used an error will be raised).
@@ -115,6 +116,19 @@ impl SedonaContext {
         opts.crs_provider =
             
CrsProviderOption::new(Arc::new(sedona_proj::provider::ProjCrsProvider::default()));
 
+        // Set the spilled batch in-memory size threshold to 5% of the 
per-partition memory limit,
+        // with a minimum of 10MB. Batches larger than this threshold will be 
broken into smaller batches
+        // before writing to spill files. This is to avoid overshooting memory 
limit when reading super
+        // large spilled batches.
+        const SPILLED_BATCH_THRESHOLD_PERCENT_DIVISOR: usize = 20; // 5% == 1 
/ 20
+        const MIN_SPILLED_BATCH_IN_MEMORY_THRESHOLD_BYTES: usize = 10 * 1024 * 
1024; // 10MB
+        if let MemoryLimit::Finite(memory_limit) = 
runtime_env.memory_pool.memory_limit() {
+            let per_partition_memory_limit = 
memory_limit.div_ceil(target_partitions);
+            opts.spatial_join.spilled_batch_in_memory_size_threshold = 
per_partition_memory_limit
+                .div_ceil(SPILLED_BATCH_THRESHOLD_PERCENT_DIVISOR)
+                .max(MIN_SPILLED_BATCH_IN_MEMORY_THRESHOLD_BYTES);
+        }
+
         #[cfg(feature = "pointcloud")]
         let session_config = session_config.with_option_extension(
             LasOptions::default()
@@ -835,4 +849,36 @@ mod tests {
             SedonaType::WkbView(Edges::Planar, 
deserialize_crs("EPSG:3857").unwrap())
         );
     }
+
+    #[tokio::test]
+    async fn test_auto_configure_spilled_batch_threshold() {
+        use crate::context_builder::SedonaContextBuilder;
+        use sedona_common::option::SedonaOptions;
+
+        // Specify a memory limit (10GB), spilled batch threshold will also be 
limited,
+        // but no lower than 10MB due to the minimum floor.
+        let memory_limit: usize = 10 * 1024 * 1024 * 1024;
+        let ctx = SedonaContextBuilder::new()
+            .with_memory_limit(memory_limit)
+            .build()
+            .await
+            .unwrap();
+        let state = ctx.ctx.state();
+        let opts = state
+            .config_options()
+            .extensions
+            .get::<SedonaOptions>()
+            .expect("SedonaOptions not found");
+        assert!(opts.spatial_join.spilled_batch_in_memory_size_threshold >= 10 
* 1024 * 1024);
+
+        // Specify no memory limit, spilled batch threshold should be 
unlimited (0 is for unlimited)
+        let ctx = SedonaContextBuilder::new().build().await.unwrap();
+        let state = ctx.ctx.state();
+        let opts = state
+            .config_options()
+            .extensions
+            .get::<SedonaOptions>()
+            .expect("SedonaOptions not found");
+        assert_eq!(opts.spatial_join.spilled_batch_in_memory_size_threshold, 
0);
+    }
 }

Reply via email to