This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git


The following commit(s) were added to refs/heads/main by this push:
     new a64e3a61 feat(python/sedonadb): Expose memory pool and runtime configuration in Python bindings (#608)
a64e3a61 is described below

commit a64e3a61205de23c9ae32371696a10d692a16ab5
Author: Kristin Cowalcijk <[email protected]>
AuthorDate: Wed Feb 18 22:45:33 2026 +0800

    feat(python/sedonadb): Expose memory pool and runtime configuration in Python bindings (#608)
    
    Co-authored-by: Dewey Dunnington <[email protected]>
---
 Cargo.lock                                  |   1 +
 python/sedonadb/python/sedonadb/_options.py | 178 ++++++++++++-
 python/sedonadb/python/sedonadb/context.py  |  54 +++-
 python/sedonadb/python/sedonadb/utility.py  |   2 +-
 python/sedonadb/src/context.rs              |   9 +-
 rust/sedona-geo-generic-alg/Cargo.toml      |   2 +-
 rust/sedona/Cargo.toml                      |   1 +
 rust/sedona/src/context_builder.rs          | 399 ++++++++++++++++++++++++++++
 rust/sedona/src/lib.rs                      |   2 +
 rust/sedona/src/size_parser.rs              | 214 +++++++++++++++
 sedona-cli/src/main.rs                      | 166 +-----------
 11 files changed, 862 insertions(+), 166 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index e01ec7c0..c4fa1b95 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5117,6 +5117,7 @@ dependencies = [
  "geo-types",
  "object_store",
  "parking_lot",
+ "regex",
  "rstest",
  "sedona-common",
  "sedona-datasource",
diff --git a/python/sedonadb/python/sedonadb/_options.py b/python/sedonadb/python/sedonadb/_options.py
index b7ac40a0..5da51077 100644
--- a/python/sedonadb/python/sedonadb/_options.py
+++ b/python/sedonadb/python/sedonadb/_options.py
@@ -14,16 +14,82 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from typing import Optional
+import os
+from typing import Literal, Optional, Union
+
+from sedonadb.utility import sedona  # noqa: F401
 
 
 class Options:
-    """Global SedonaDB options"""
+    """Global SedonaDB options
+
+    Options are divided into two categories:
+
+    **Display options** can be changed at any time and affect how results are
+    presented:
+
+    - `interactive`: Enable/disable auto-display of DataFrames.
+    - `width`: Override the detected terminal width.
+
+    **Runtime options** configure the execution environment and must be set
+    *before* the first query is executed (i.e., before the internal context
+    is initialized). Attempting to change these after the context has been
+    created will raise a `RuntimeError`:
+
+    - `memory_limit`: Maximum memory for execution, in bytes or as a
+      human-readable string (e.g., `"4gb"`, `"512m"`).
+    - `temp_dir`: Directory for temporary/spill files.
+    - `memory_pool_type`: Memory pool type (`"greedy"` or `"fair"`).
+    - `unspillable_reserve_ratio`: Fraction of memory reserved for
+      unspillable consumers (only applies to the `"fair"` pool type).
+
+    Examples:
+
+        >>> sd = sedona.db.connect()
+        >>> sd.options.memory_limit = "4gb"
+        >>> sd.options.memory_pool_type = "fair"
+        >>> sd.options.temp_dir = "/tmp/sedona-spill"
+        >>> sd.options.interactive = True
+        >>> sd.sql("SELECT 1 as one")
+        ┌───────┐
+        │  one  │
+        │ int64 │
+        ╞═══════╡
+        │     1 │
+        └───────┘
+    """
 
     def __init__(self):
+        # Display options (can be changed at any time)
         self._interactive = False
         self._width = None
 
+        # Runtime options (must be set before first query)
+        self._memory_limit = None
+        self._temp_dir = None
+        self._memory_pool_type = "greedy"
+        self._unspillable_reserve_ratio = None
+
+        # Set to True once the internal context is created; after this,
+        # runtime options become read-only.
+        self._runtime_frozen = False
+
+    def freeze_runtime(self) -> None:
+        """Mark runtime options as read-only.
+
+        Called after the internal context has been successfully created.
+        """
+        self._runtime_frozen = True
+
+    def _check_runtime_mutable(self, name: str) -> None:
+        if self._runtime_frozen:
+            raise RuntimeError(
+                f"Cannot change '{name}' after the context has been initialized. "
+                f"Set this option before executing your first query."
+            )
+
+    # --- Display options ---
+
     @property
     def interactive(self) -> bool:
         """Use interactive mode
@@ -52,3 +118,111 @@ class Options:
     @width.setter
     def width(self, value: Optional[int]):
         self._width = value
+
+    # --- Runtime options (must be set before first query) ---
+
+    @property
+    def memory_limit(self) -> Union[int, str, None]:
+        """Maximum memory for query execution.
+
+        Accepts an integer (bytes) or a human-readable string such as
+        `"4gb"`, `"512m"`, or `"1.5g"`. When set, a bounded memory pool is
+        created to enforce this limit. Without a memory limit, DataFusion's
+        default unbounded pool is used.
+
+        Must be set before the first query is executed.
+
+        Examples:
+
+            >>> sd = sedona.db.connect()
+            >>> sd.options.memory_limit = "4gb"
+            >>> sd.options.memory_limit = 4 * 1024 * 1024 * 1024  # equivalent
+        """
+        return self._memory_limit
+
+    @memory_limit.setter
+    def memory_limit(self, value: Union[int, str, None]) -> None:
+        self._check_runtime_mutable("memory_limit")
+        if value is not None and not isinstance(value, (int, str)):
+            raise TypeError(
+                f"memory_limit must be an int, str, or None, got {type(value).__name__}"
+            )
+        self._memory_limit = value
+
+    @property
+    def temp_dir(self) -> Optional[str]:
+        """Directory for temporary/spill files.
+
+        When set, disk-based spilling will use this directory. When `None`,
+        DataFusion's default temporary directory is used.
+
+        Must be set before the first query is executed.
+        """
+        return self._temp_dir
+
+    @temp_dir.setter
+    def temp_dir(self, value: "Optional[Union[str, os.PathLike[str]]]") -> None:
+        self._check_runtime_mutable("temp_dir")
+        if value is None:
+            self._temp_dir = None
+        elif isinstance(value, os.PathLike):
+            self._temp_dir = os.fspath(value)
+        elif isinstance(value, str):
+            self._temp_dir = value
+        else:
+            raise TypeError(
+                f"temp_dir must be a str, PathLike, or None, got {type(value).__name__}"
+            )
+
+    @property
+    def memory_pool_type(self) -> str:
+        """Memory pool type: `"greedy"` or `"fair"`.
+
+        - `"greedy"`: A simple pool that grants reservations on a
+          first-come-first-served basis. This is the default.
+        - `"fair"`: A pool that fairly distributes memory among spillable
+          consumers and reserves a fraction for unspillable consumers
+          (configured via `unspillable_reserve_ratio`).
+
+        Only takes effect when `memory_limit` is set.
+        Must be set before the first query is executed.
+        """
+        return self._memory_pool_type
+
+    @memory_pool_type.setter
+    def memory_pool_type(self, value: Literal["greedy", "fair"]) -> None:
+        self._check_runtime_mutable("memory_pool_type")
+        if value not in ("greedy", "fair"):
+            raise ValueError(
+                f"memory_pool_type must be 'greedy' or 'fair', got '{value}'"
+            )
+        self._memory_pool_type = value
+
+    @property
+    def unspillable_reserve_ratio(self) -> Optional[float]:
+        """Fraction of memory reserved for unspillable consumers (0.0 - 1.0).
+
+        Only applies when `memory_pool_type` is `"fair"` and
+        `memory_limit` is set. Defaults to 0.2 when not explicitly set.
+
+        Must be set before the first query is executed.
+        """
+        return self._unspillable_reserve_ratio
+
+    @unspillable_reserve_ratio.setter
+    def unspillable_reserve_ratio(self, value: Optional[float]) -> None:
+        self._check_runtime_mutable("unspillable_reserve_ratio")
+        if value is None:
+            self._unspillable_reserve_ratio = None
+            return
+        if not isinstance(value, (int, float)):
+            raise TypeError(
+                "unspillable_reserve_ratio must be a number between 0.0 and 1.0 "
+                f"or None, got {type(value).__name__}"
+            )
+        value = float(value)
+        if not (0.0 <= value <= 1.0):
+            raise ValueError(
+                f"unspillable_reserve_ratio must be between 0.0 and 1.0, got {value}"
+            )
+        self._unspillable_reserve_ratio = value
diff --git a/python/sedonadb/python/sedonadb/context.py b/python/sedonadb/python/sedonadb/context.py
index 1dcb3fad..5ad0a5f5 100644
--- a/python/sedonadb/python/sedonadb/context.py
+++ b/python/sedonadb/python/sedonadb/context.py
@@ -36,6 +36,11 @@ class SedonaContext:
     registered tables, and available memory. This is similar to a
     Spark SessionContext or a database connection.
 
+    Runtime configuration (memory limits, spill directory, pool type) can
+    be set via `options` before executing the first query.  Once the
+    first query runs, the internal execution context is created and
+    runtime options become read-only.
+
     Examples:
 
         >>> sd = sedona.db.connect()
@@ -47,12 +52,47 @@ class SedonaContext:
         ╞═══════╡
         │     1 │
         └───────┘
+
+    Configuring memory limits:
+
+        >>> sd = sedona.db.connect()
+        >>> sd.options.memory_limit = "4gb"
+        >>> sd.options.memory_pool_type = "fair"
     """
 
     def __init__(self):
-        self._impl = InternalContext()
+        self.__impl = None
         self.options = Options()
 
+    @property
+    def _impl(self):
+        """Lazily initialize the internal Rust context on first use.
+
+        This allows runtime options (memory_limit, temp_dir, etc.) to be
+        configured via `self.options` before the context is created.
+        Once created, runtime options are frozen.
+        """
+        if self.__impl is None:
+            # Build a dict[str, str] of non-None runtime options
+            opts = {}
+            if self.options.memory_limit is not None:
+                opts["memory_limit"] = str(self.options.memory_limit)
+            if self.options.temp_dir is not None:
+                opts["temp_dir"] = self.options.temp_dir
+            if self.options.memory_pool_type is not None:
+                opts["memory_pool_type"] = self.options.memory_pool_type
+            if self.options.unspillable_reserve_ratio is not None:
+                opts["unspillable_reserve_ratio"] = str(
+                    self.options.unspillable_reserve_ratio
+                )
+
+            # Create the context first, then freeze options. If creation
+            # fails the user can still correct options and retry.
+            impl = InternalContext(opts)
+            self.__impl = impl
+            self.options.freeze_runtime()
+        return self.__impl
+
     def create_data_frame(self, obj: Any, schema: Any = None) -> DataFrame:
         """Create a DataFrame from an in-memory or protocol-enabled object.
 
@@ -381,7 +421,17 @@ class SedonaContext:
 
 
 def connect() -> SedonaContext:
-    """Create a new [SedonaContext][sedonadb.context.SedonaContext]"""
+    """Create a new [SedonaContext][sedonadb.context.SedonaContext]
+
+    Runtime configuration (memory limits, spill directory, pool type)
+    can be set via `options` on the returned context before executing
+    the first query::
+
+        sd = sedona.db.connect()
+        sd.options.memory_limit = "4gb"
+        sd.options.memory_pool_type = "fair"
+        sd.options.temp_dir = "/tmp/sedona-spill"
+    """
     return SedonaContext()
 
 
diff --git a/python/sedonadb/python/sedonadb/utility.py b/python/sedonadb/python/sedonadb/utility.py
index 38f10a03..8c82510f 100644
--- a/python/sedonadb/python/sedonadb/utility.py
+++ b/python/sedonadb/python/sedonadb/utility.py
@@ -19,7 +19,7 @@
 class Sedona:
     """Mock sedona Python module
 
-    The Apache Sedaona Python ecosystem is centered around the apache-sedona Python
+    The Apache Sedona Python ecosystem is centered around the apache-sedona Python
     package which provides the `sedona` modules. To decouple the maintenance and
     provide fine-grained dependency control for projects that need it, sedonadb
     is distributed as a standalone package. This mock `sedona` module lets us write
diff --git a/python/sedonadb/src/context.rs b/python/sedonadb/src/context.rs
index 1647bd05..113df3b7 100644
--- a/python/sedonadb/src/context.rs
+++ b/python/sedonadb/src/context.rs
@@ -19,6 +19,7 @@ use std::{collections::HashMap, sync::Arc};
 use datafusion_expr::ScalarUDFImpl;
 use pyo3::prelude::*;
 use sedona::context::SedonaContext;
+use sedona::context_builder::SedonaContextBuilder;
 use tokio::runtime::Runtime;
 
 use crate::{
@@ -39,7 +40,8 @@ pub struct InternalContext {
 #[pymethods]
 impl InternalContext {
     #[new]
-    fn new(py: Python) -> Result<Self, PySedonaError> {
+    #[pyo3(signature = (options=HashMap::new()))]
+    fn new(py: Python, options: HashMap<String, String>) -> Result<Self, PySedonaError> {
         let runtime = tokio::runtime::Builder::new_multi_thread()
             .enable_all()
             .build()
@@ -47,7 +49,10 @@ impl InternalContext {
                 PySedonaError::SedonaPython(format!("Failed to build multithreaded runtime: {e}"))
             })?;
 
-        let inner = wait_for_future(py, &runtime, SedonaContext::new_local_interactive())??;
+        let builder = SedonaContextBuilder::from_options(&options)
+            .map_err(|e| PySedonaError::SedonaPython(e.to_string()))?;
+
+        let inner = wait_for_future(py, &runtime, builder.build())??;
 
         Ok(Self {
             inner,
diff --git a/rust/sedona-geo-generic-alg/Cargo.toml b/rust/sedona-geo-generic-alg/Cargo.toml
index add91a43..178bb888 100644
--- a/rust/sedona-geo-generic-alg/Cargo.toml
+++ b/rust/sedona-geo-generic-alg/Cargo.toml
@@ -32,7 +32,7 @@ float_next_after = { workspace = true }
 geo-traits = { workspace = true }
 geo-types = { workspace = true, features = ["approx", "use-rstar_0_12"] }
 sedona-geo-traits-ext = { workspace = true }
-log = "0.4.11"
+log = { workspace = true }
 num-traits = { workspace = true }
 robust = "1.1.0"
 rstar = "0.12.0"
diff --git a/rust/sedona/Cargo.toml b/rust/sedona/Cargo.toml
index 199a783e..eefc5b93 100644
--- a/rust/sedona/Cargo.toml
+++ b/rust/sedona/Cargo.toml
@@ -66,6 +66,7 @@ geo-traits = { workspace = true }
 geo-types = { workspace = true }
 object_store = { workspace = true }
 parking_lot = { workspace = true }
+regex = { workspace = true }
 sedona-common = { workspace = true }
 sedona-datasource = { workspace = true }
 sedona-expr = { workspace = true }
diff --git a/rust/sedona/src/context_builder.rs b/rust/sedona/src/context_builder.rs
new file mode 100644
index 00000000..ad6b79c8
--- /dev/null
+++ b/rust/sedona/src/context_builder.rs
@@ -0,0 +1,399 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{collections::HashMap, num::NonZeroUsize, path::PathBuf, sync::Arc};
+
+use datafusion::{
+    error::{DataFusionError, Result},
+    execution::{
+        disk_manager::{DiskManagerBuilder, DiskManagerMode},
+        memory_pool::{GreedyMemoryPool, MemoryPool, TrackConsumersPool},
+        runtime_env::{RuntimeEnv, RuntimeEnvBuilder},
+    },
+};
+
+use crate::{
+    context::SedonaContext,
+    memory_pool::{SedonaFairSpillPool, DEFAULT_UNSPILLABLE_RESERVE_RATIO},
+    pool_type::PoolType,
+    size_parser,
+};
+
+/// Builder for constructing a [`SedonaContext`] with configurable runtime
+/// environment settings.
+///
+/// This builder centralizes the construction of memory pools, disk managers,
+/// and runtime environments so that the same logic can be reused across the
+/// CLI, Python bindings, ADBC driver, and any future entry points.
+///
+/// # Examples
+///
+/// ```rust,no_run
+/// # async fn example() -> datafusion::error::Result<()> {
+/// use sedona::context_builder::SedonaContextBuilder;
+/// use sedona::pool_type::PoolType;
+///
+/// let ctx = SedonaContextBuilder::new()
+///     .with_memory_limit(4 * 1024 * 1024 * 1024)
+///     .with_pool_type(PoolType::Fair)
+///     .with_temp_dir("/tmp/sedona-spill".to_string())
+///     .build()
+///     .await?;
+/// # Ok(())
+/// # }
+/// ```
+///
+/// String-based configuration (useful for ADBC connection options, etc.):
+///
+/// ```rust,no_run
+/// # async fn example() -> datafusion::error::Result<()> {
+/// use std::collections::HashMap;
+/// use sedona::context_builder::SedonaContextBuilder;
+///
+/// let mut opts = HashMap::new();
+/// opts.insert("memory_limit".to_string(), "4gb".to_string());
+/// opts.insert("memory_pool_type".to_string(), "fair".to_string());
+///
+/// let ctx = SedonaContextBuilder::from_options(&opts)?.build().await?;
+/// # Ok(())
+/// # }
+/// ```
+pub struct SedonaContextBuilder {
+    memory_limit: Option<usize>,
+    temp_dir: Option<String>,
+    pool_type: PoolType,
+    unspillable_reserve_ratio: f64,
+}
+
+impl Default for SedonaContextBuilder {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl SedonaContextBuilder {
+    /// Create a new builder with default settings.
+    ///
+    /// Defaults:
+    /// - `memory_limit`: `None` (no limit, uses DataFusion's default unbounded pool)
+    /// - `pool_type`: `PoolType::Greedy`
+    /// - `unspillable_reserve_ratio`: `0.2`
+    /// - `temp_dir`: `None` (uses DataFusion's default temp directory)
+    pub fn new() -> Self {
+        Self {
+            memory_limit: None,
+            temp_dir: None,
+            pool_type: PoolType::Greedy,
+            unspillable_reserve_ratio: DEFAULT_UNSPILLABLE_RESERVE_RATIO,
+        }
+    }
+
+    /// Create a builder from string-based key-value options.
+    ///
+    /// Recognized keys:
+    /// - `"memory_limit"`: Memory limit as a human-readable size string
+    ///   (e.g., `"4gb"`, `"512m"`, `"1.5g"`) or plain bytes (e.g.,
+    ///   `"4294967296"`). See [`size_parser::parse_size_string`] for
+    ///   supported suffixes.
+    /// - `"temp_dir"`: Path for temporary/spill files
+    /// - `"memory_pool_type"`: `"greedy"` or `"fair"`
+    /// - `"unspillable_reserve_ratio"`: Float between 0.0 and 1.0
+    ///
+    /// Unrecognized keys are ignored.
+    pub fn from_options(options: &HashMap<String, String>) -> Result<Self> {
+        let mut builder = Self::new();
+
+        if let Some(memory_limit) = options.get("memory_limit") {
+            let limit = size_parser::parse_size_string(memory_limit)?;
+            builder = builder.with_memory_limit(limit);
+        }
+
+        if let Some(temp_dir) = options.get("temp_dir") {
+            builder = builder.with_temp_dir(temp_dir.clone());
+        }
+
+        if let Some(pool_type) = options.get("memory_pool_type") {
+            let pt: PoolType = pool_type
+                .parse()
+                .map_err(|e: String| DataFusionError::Configuration(e))?;
+            builder = builder.with_pool_type(pt);
+        }
+
+        if let Some(ratio) = options.get("unspillable_reserve_ratio") {
+            let r: f64 = ratio.parse().map_err(|_| {
+                DataFusionError::Configuration(format!(
+                    "Invalid unspillable_reserve_ratio value '{ratio}': expected a float"
+                ))
+            })?;
+            builder = builder.with_unspillable_reserve_ratio(r)?;
+        }
+
+        Ok(builder)
+    }
+
+    /// Set the memory limit in bytes.
+    ///
+    /// When set, a memory pool is created to enforce this limit. Without a
+    /// memory limit, DataFusion's default unbounded memory pool is used.
+    pub fn with_memory_limit(mut self, memory_limit: usize) -> Self {
+        self.memory_limit = Some(memory_limit);
+        self
+    }
+
+    /// Set the directory for temporary/spill files.
+    pub fn with_temp_dir(mut self, temp_dir: String) -> Self {
+        self.temp_dir = Some(temp_dir);
+        self
+    }
+
+    /// Set the memory pool type.
+    ///
+    /// - `PoolType::Greedy`: A simple pool that grants reservations on a
+    ///   first-come-first-served basis. This is the default.
+    /// - `PoolType::Fair`: A pool that fairly distributes memory among
+    ///   spillable consumers and reserves a fraction of memory for
+    ///   unspillable consumers (configured via
+    ///   [`with_unspillable_reserve_ratio`](Self::with_unspillable_reserve_ratio)).
+    ///
+    /// Only takes effect when `memory_limit` is set.
+    pub fn with_pool_type(mut self, pool_type: PoolType) -> Self {
+        self.pool_type = pool_type;
+        self
+    }
+
+    /// Set the fraction of memory reserved for unspillable consumers.
+    ///
+    /// Must be between 0.0 and 1.0 (inclusive). Only applies when
+    /// `pool_type` is `PoolType::Fair` and `memory_limit` is set.
+    ///
+    /// Returns an error if the value is out of range.
+    pub fn with_unspillable_reserve_ratio(mut self, ratio: f64) -> Result<Self> {
+        if !(0.0..=1.0).contains(&ratio) {
+            return Err(DataFusionError::Configuration(format!(
+                "unspillable_reserve_ratio must be between 0.0 and 1.0, got {ratio}"
+            )));
+        }
+        self.unspillable_reserve_ratio = ratio;
+        Ok(self)
+    }
+
+    /// Build a [`RuntimeEnv`] from the current configuration.
+    ///
+    /// This constructs the memory pool and disk manager based on the
+    /// builder settings and returns the resulting runtime environment.
+    pub fn build_runtime_env(&self) -> Result<Arc<RuntimeEnv>> {
+        let mut rt_builder = RuntimeEnvBuilder::new();
+
+        if let Some(memory_limit) = self.memory_limit {
+            let track_capacity = NonZeroUsize::new(10).expect("track capacity must be non-zero");
+            let pool: Arc<dyn MemoryPool> = match self.pool_type {
+                PoolType::Fair => Arc::new(TrackConsumersPool::new(
+                    SedonaFairSpillPool::new(memory_limit, self.unspillable_reserve_ratio),
+                    track_capacity,
+                )),
+                PoolType::Greedy => Arc::new(TrackConsumersPool::new(
+                    GreedyMemoryPool::new(memory_limit),
+                    track_capacity,
+                )),
+            };
+            rt_builder = rt_builder.with_memory_pool(pool);
+        }
+
+        if let Some(ref temp_dir) = self.temp_dir {
+            let dm_builder = DiskManagerBuilder::default()
+                .with_mode(DiskManagerMode::Directories(vec![PathBuf::from(temp_dir)]));
+            rt_builder = rt_builder.with_disk_manager_builder(dm_builder);
+        }
+
+        rt_builder.build_arc()
+    }
+
+    /// Build a [`SedonaContext`] from the current configuration.
+    ///
+    /// This constructs the runtime environment and then creates a fully
+    /// configured interactive `SedonaContext`.
+    pub async fn build(self) -> Result<SedonaContext> {
+        let runtime_env = self.build_runtime_env()?;
+        SedonaContext::new_local_interactive_with_runtime_env(runtime_env).await
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_default_builder() {
+        let builder = SedonaContextBuilder::new();
+        assert!(builder.memory_limit.is_none());
+        assert!(builder.temp_dir.is_none());
+        assert_eq!(builder.pool_type, PoolType::Greedy);
+        assert!(
+            (builder.unspillable_reserve_ratio - DEFAULT_UNSPILLABLE_RESERVE_RATIO).abs()
+                < f64::EPSILON
+        );
+    }
+
+    #[test]
+    fn test_builder_with_methods() {
+        let builder = SedonaContextBuilder::new()
+            .with_memory_limit(1024)
+            .with_temp_dir("/tmp/test".to_string())
+            .with_pool_type(PoolType::Fair)
+            .with_unspillable_reserve_ratio(0.3)
+            .unwrap();
+        assert_eq!(builder.memory_limit, Some(1024));
+        assert_eq!(builder.temp_dir, Some("/tmp/test".to_string()));
+        assert_eq!(builder.pool_type, PoolType::Fair);
+        assert!((builder.unspillable_reserve_ratio - 0.3).abs() < f64::EPSILON);
+    }
+
+    #[test]
+    fn test_invalid_unspillable_reserve_ratio() {
+        let result = SedonaContextBuilder::new().with_unspillable_reserve_ratio(-0.1);
+        assert!(result.is_err());
+
+        let result = SedonaContextBuilder::new().with_unspillable_reserve_ratio(1.1);
+        assert!(result.is_err());
+
+        // Edge cases: 0.0 and 1.0 should be valid
+        let result = SedonaContextBuilder::new().with_unspillable_reserve_ratio(0.0);
+        assert!(result.is_ok());
+
+        let result = SedonaContextBuilder::new().with_unspillable_reserve_ratio(1.0);
+        assert!(result.is_ok());
+    }
+
+    #[test]
+    fn test_from_options() {
+        let mut opts = HashMap::new();
+        opts.insert("memory_limit".to_string(), "4096".to_string());
+        opts.insert("temp_dir".to_string(), "/tmp/spill".to_string());
+        opts.insert("memory_pool_type".to_string(), "fair".to_string());
+        opts.insert("unspillable_reserve_ratio".to_string(), "0.3".to_string());
+
+        let builder = SedonaContextBuilder::from_options(&opts).unwrap();
+        assert_eq!(builder.memory_limit, Some(4096));
+        assert_eq!(builder.temp_dir, Some("/tmp/spill".to_string()));
+        assert_eq!(builder.pool_type, PoolType::Fair);
+        assert!((builder.unspillable_reserve_ratio - 0.3).abs() < f64::EPSILON);
+    }
+
+    #[test]
+    fn test_from_options_empty() {
+        let opts = HashMap::new();
+        let builder = SedonaContextBuilder::from_options(&opts).unwrap();
+        assert!(builder.memory_limit.is_none());
+        assert!(builder.temp_dir.is_none());
+        assert_eq!(builder.pool_type, PoolType::Greedy);
+    }
+
+    #[test]
+    fn test_from_options_invalid_memory_limit() {
+        let mut opts = HashMap::new();
+        opts.insert("memory_limit".to_string(), "not_a_number".to_string());
+        assert!(SedonaContextBuilder::from_options(&opts).is_err());
+    }
+
+    #[test]
+    fn test_from_options_human_readable_memory_limit() {
+        let mut opts = HashMap::new();
+        opts.insert("memory_limit".to_string(), "4gb".to_string());
+        let builder = SedonaContextBuilder::from_options(&opts).unwrap();
+        assert_eq!(builder.memory_limit, Some(4 * 1024 * 1024 * 1024));
+
+        let mut opts = HashMap::new();
+        opts.insert("memory_limit".to_string(), "512m".to_string());
+        let builder = SedonaContextBuilder::from_options(&opts).unwrap();
+        assert_eq!(builder.memory_limit, Some(512 * 1024 * 1024));
+
+        let mut opts = HashMap::new();
+        opts.insert("memory_limit".to_string(), "1.5g".to_string());
+        let builder = SedonaContextBuilder::from_options(&opts).unwrap();
+        assert_eq!(
+            builder.memory_limit,
+            Some((1.5 * 1024.0 * 1024.0 * 1024.0) as usize)
+        );
+    }
+
+    #[test]
+    fn test_from_options_invalid_pool_type() {
+        let mut opts = HashMap::new();
+        opts.insert("memory_pool_type".to_string(), "invalid".to_string());
+        assert!(SedonaContextBuilder::from_options(&opts).is_err());
+    }
+
+    #[test]
+    fn test_from_options_invalid_ratio() {
+        let mut opts = HashMap::new();
+        opts.insert("unspillable_reserve_ratio".to_string(), "2.0".to_string());
+        assert!(SedonaContextBuilder::from_options(&opts).is_err());
+    }
+
+    #[test]
+    fn test_from_options_unrecognized_keys_ignored() {
+        let mut opts = HashMap::new();
+        opts.insert("unknown_key".to_string(), "value".to_string());
+        let builder = SedonaContextBuilder::from_options(&opts).unwrap();
+        assert!(builder.memory_limit.is_none());
+    }
+
+    #[test]
+    fn test_build_runtime_env_no_memory_limit() {
+        let builder = SedonaContextBuilder::new();
+        let result = builder.build_runtime_env();
+        assert!(result.is_ok());
+    }
+
+    #[test]
+    fn test_build_runtime_env_with_greedy_pool() {
+        let builder = SedonaContextBuilder::new()
+            .with_memory_limit(1024 * 1024)
+            .with_pool_type(PoolType::Greedy);
+        let result = builder.build_runtime_env();
+        assert!(result.is_ok());
+    }
+
+    #[test]
+    fn test_build_runtime_env_with_fair_pool() {
+        let builder = SedonaContextBuilder::new()
+            .with_memory_limit(1024 * 1024)
+            .with_pool_type(PoolType::Fair)
+            .with_unspillable_reserve_ratio(0.2)
+            .unwrap();
+        let result = builder.build_runtime_env();
+        assert!(result.is_ok());
+    }
+
+    #[tokio::test]
+    async fn test_build_context_default() {
+        let ctx = SedonaContextBuilder::new().build().await;
+        assert!(ctx.is_ok());
+    }
+
+    #[tokio::test]
+    async fn test_build_context_with_memory_limit() {
+        let ctx = SedonaContextBuilder::new()
+            .with_memory_limit(100 * 1024 * 1024)
+            .with_pool_type(PoolType::Fair)
+            .with_unspillable_reserve_ratio(0.2)
+            .unwrap()
+            .build()
+            .await;
+        assert!(ctx.is_ok());
+    }
+}
diff --git a/rust/sedona/src/lib.rs b/rust/sedona/src/lib.rs
index 05ce9898..6752c328 100644
--- a/rust/sedona/src/lib.rs
+++ b/rust/sedona/src/lib.rs
@@ -16,6 +16,7 @@
 // under the License.
 mod catalog;
 pub mod context;
+pub mod context_builder;
 mod exec;
 pub mod memory_pool;
 mod object_storage;
@@ -24,3 +25,4 @@ pub mod random_geometry_provider;
 pub mod reader;
 pub mod record_batch_reader_provider;
 pub mod show;
+pub mod size_parser;
diff --git a/rust/sedona/src/size_parser.rs b/rust/sedona/src/size_parser.rs
new file mode 100644
index 00000000..d85d360f
--- /dev/null
+++ b/rust/sedona/src/size_parser.rs
@@ -0,0 +1,214 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Parse human-readable size strings (e.g., `"4gb"`, `"512m"`, `"1.5g"`) into
+//! byte counts.
+//!
+//! This is used by the CLI (`--memory-limit 4g`), the Python bindings
+//! (`sd.options.memory_limit = "4gb"`), and
+//! [`SedonaContextBuilder::from_options`](crate::context_builder::SedonaContextBuilder::from_options).
+
+use std::collections::HashMap;
+use std::sync::LazyLock;
+
+use datafusion::error::{DataFusionError, Result};
+
+/// Size units recognized by the parser. The number of bytes each variant
+/// stands for is given by [`ByteUnit::multiplier`].
+#[derive(Debug, Clone, Copy)]
+enum ByteUnit {
+    /// A single byte.
+    Byte,
+    /// 2^10 bytes.
+    KiB,
+    /// 2^20 bytes.
+    MiB,
+    /// 2^30 bytes.
+    GiB,
+    /// 2^40 bytes.
+    TiB,
+}
+
+impl ByteUnit {
+    /// Number of bytes represented by one of this unit (a power of two).
+    fn multiplier(&self) -> u64 {
+        // Each step up the unit ladder is another factor of 2^10.
+        let shift = match self {
+            ByteUnit::Byte => 0,
+            ByteUnit::KiB => 10,
+            ByteUnit::MiB => 20,
+            ByteUnit::GiB => 30,
+            ByteUnit::TiB => 40,
+        };
+        1u64 << shift
+    }
+}
+
+/// Lookup table from a lowercase unit suffix to the [`ByteUnit`] it denotes.
+static BYTE_SUFFIXES: LazyLock<HashMap<&'static str, ByteUnit>> = LazyLock::new(|| {
+    HashMap::from([
+        ("b", ByteUnit::Byte),
+        ("k", ByteUnit::KiB),
+        ("kb", ByteUnit::KiB),
+        ("m", ByteUnit::MiB),
+        ("mb", ByteUnit::MiB),
+        ("g", ByteUnit::GiB),
+        ("gb", ByteUnit::GiB),
+        ("t", ByteUnit::TiB),
+        ("tb", ByteUnit::TiB),
+    ])
+});
+
+/// Anchored pattern for a whole size string: capture group 1 is a run of
+/// ASCII digits and dots, followed by optional whitespace, then an optional
+/// run of lowercase letters in capture group 2. Group 1 may still be an
+/// invalid number (e.g. `"1.2.3"`), so callers must parse it separately.
+static SUFFIX_REGEX: LazyLock<regex::Regex> =
+    LazyLock::new(|| regex::Regex::new(r"^([0-9.]+)\s*([a-z]+)?$").unwrap());
+
+/// Parse a human-readable size string into a byte count.
+///
+/// Accepts formats like `"4gb"`, `"512m"`, `"1.5g"`, `"4096"`, `"100 mb"`.
+/// The suffix is case-insensitive. When no suffix is provided, the value is
+/// treated as bytes. Fractional byte counts are truncated toward zero.
+///
+/// # Supported suffixes
+///
+/// | Suffix    | Unit     |
+/// |-----------|----------|
+/// | `b`       | Bytes    |
+/// | `k`, `kb` | KiB      |
+/// | `m`, `mb` | MiB      |
+/// | `g`, `gb` | GiB      |
+/// | `t`, `tb` | TiB      |
+///
+/// # Errors
+///
+/// Returns [`DataFusionError::Configuration`] when the string does not match
+/// the expected `<number>[<suffix>]` shape, when the number cannot be parsed,
+/// when the suffix is unknown, or when the resulting byte count does not fit
+/// in a `usize`.
+///
+/// # Examples
+///
+/// ```
+/// use sedona::size_parser::parse_size_string;
+///
+/// assert_eq!(parse_size_string("4gb").unwrap(), 4 * 1024 * 1024 * 1024);
+/// assert_eq!(parse_size_string("512m").unwrap(), 512 * 1024 * 1024);
+/// assert_eq!(parse_size_string("4096").unwrap(), 4096);
+/// ```
+pub fn parse_size_string(size: &str) -> Result<usize> {
+    // Every "malformed input" path reports the same message, quoting the
+    // caller's original (non-lowercased) string.
+    let invalid = || DataFusionError::Configuration(format!("Invalid size string '{size}'"));
+
+    let lower = size.to_lowercase();
+    let caps = SUFFIX_REGEX.captures(&lower).ok_or_else(invalid)?;
+
+    // Group 1 is mandatory in the pattern, so it is present whenever the
+    // regex matched; it can still fail to parse (e.g. "1.2.3" or ".").
+    let num = caps[1].parse::<f64>().map_err(|_| invalid())?;
+
+    // A missing suffix means plain bytes.
+    let suffix = caps.get(2).map_or("b", |m| m.as_str());
+    let unit = BYTE_SUFFIXES.get(suffix).ok_or_else(invalid)?;
+
+    let total_bytes = num * unit.multiplier() as f64;
+
+    // `usize::MAX as f64` rounds up to 2^64 on 64-bit targets, so comparing
+    // with `>` against it would let exactly 2^64 through and the cast below
+    // would silently saturate to `usize::MAX`. 2^BITS is exactly
+    // representable as an f64 and is the first value that does not fit.
+    let limit = 2f64.powi(usize::BITS as i32);
+    if !total_bytes.is_finite() || total_bytes >= limit {
+        return Err(DataFusionError::Configuration(format!(
+            "Size string '{size}' is too large"
+        )));
+    }
+
+    // Truncates any fractional part toward zero.
+    Ok(total_bytes as usize)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    const KIB: usize = 1024;
+    const MIB: usize = 1024 * KIB;
+    const GIB: usize = 1024 * MIB;
+    const TIB: usize = 1024 * GIB;
+
+    /// Assert that every `(input, expected_bytes)` pair parses to the expected byte count.
+    fn assert_parses(cases: &[(&str, usize)]) {
+        for (input, expected) in cases {
+            assert_eq!(
+                parse_size_string(input).unwrap(),
+                *expected,
+                "input: {input:?}"
+            );
+        }
+    }
+
+    #[test]
+    fn bare_numbers_are_bytes() {
+        assert_parses(&[("5", 5), ("100", 100)]);
+    }
+
+    #[test]
+    fn byte_suffix() {
+        assert_parses(&[("5b", 5)]);
+    }
+
+    #[test]
+    fn kib_suffixes() {
+        assert_parses(&[("4k", 4 * KIB), ("4kb", 4 * KIB)]);
+    }
+
+    #[test]
+    fn mib_suffixes() {
+        assert_parses(&[("20m", 20 * MIB), ("20mb", 20 * MIB)]);
+    }
+
+    #[test]
+    fn gib_suffixes() {
+        assert_parses(&[("2g", 2 * GIB), ("2gb", 2 * GIB)]);
+    }
+
+    #[test]
+    fn tib_suffixes() {
+        assert_parses(&[("3t", 3 * TIB), ("4tb", 4 * TIB)]);
+    }
+
+    #[test]
+    fn case_insensitive() {
+        assert_parses(&[
+            ("4K", 4 * KIB),
+            ("4KB", 4 * KIB),
+            ("20M", 20 * MIB),
+            ("20MB", 20 * MIB),
+            ("2G", 2 * GIB),
+            ("2GB", 2 * GIB),
+            ("2T", 2 * TIB),
+        ]);
+    }
+
+    #[test]
+    fn decimal_values() {
+        assert_parses(&[
+            ("1.5g", (1.5 * GIB as f64) as usize),
+            ("0.5m", (0.5 * MIB as f64) as usize),
+            ("9.5 gb", (9.5 * GIB as f64) as usize),
+        ]);
+    }
+
+    #[test]
+    fn spaces_between_number_and_suffix() {
+        assert_parses(&[("4 k", 4 * KIB), ("20 mb", 20 * MIB)]);
+    }
+
+    #[test]
+    fn invalid_input() {
+        // Non-numeric text, unknown suffixes, negative numbers and repeated
+        // number/suffix pairs must all be rejected.
+        for input in ["invalid", "4kbx", "-20mb", "-100", "12k12k"] {
+            assert!(parse_size_string(input).is_err(), "input: {input:?}");
+        }
+    }
+
+    #[test]
+    fn overflow() {
+        assert!(parse_size_string("99999999t").is_err());
+    }
+}
diff --git a/sedona-cli/src/main.rs b/sedona-cli/src/main.rs
index 79a7a928..c0964d84 100644
--- a/sedona-cli/src/main.rs
+++ b/sedona-cli/src/main.rs
@@ -15,18 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::HashMap;
 use std::env;
-use std::num::NonZeroUsize;
 use std::path::Path;
 use std::process::ExitCode;
-use std::sync::{Arc, LazyLock};
 
 use datafusion::error::{DataFusionError, Result};
-use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryPool, 
TrackConsumersPool};
-use datafusion::execution::runtime_env::RuntimeEnvBuilder;
-use sedona::context::SedonaContext;
-use sedona::memory_pool::{SedonaFairSpillPool, 
DEFAULT_UNSPILLABLE_RESERVE_RATIO};
+use sedona::context_builder::SedonaContextBuilder;
+use sedona::memory_pool::DEFAULT_UNSPILLABLE_RESERVE_RATIO;
 use sedona::pool_type::PoolType;
 use sedona_cli::{
     exec,
@@ -163,27 +158,13 @@ async fn main_inner() -> Result<()> {
         env::set_current_dir(p).unwrap();
     };
 
-    let mut rt_builder = RuntimeEnvBuilder::new();
-    // set memory pool size
+    let mut builder = SedonaContextBuilder::new()
+        .with_pool_type(args.mem_pool_type.clone())
+        .with_unspillable_reserve_ratio(args.unspillable_reserve_ratio)?;
     if let Some(memory_limit) = args.memory_limit {
-        // set memory pool type
-        let track_capacity = NonZeroUsize::new(10).expect("track capacity must 
be non-zero");
-        let pool: Arc<dyn MemoryPool> = match args.mem_pool_type {
-            PoolType::Fair => Arc::new(TrackConsumersPool::new(
-                SedonaFairSpillPool::new(memory_limit, 
args.unspillable_reserve_ratio),
-                track_capacity,
-            )),
-            PoolType::Greedy => Arc::new(TrackConsumersPool::new(
-                GreedyMemoryPool::new(memory_limit),
-                track_capacity,
-            )),
-        };
-
-        rt_builder = rt_builder.with_memory_pool(pool)
+        builder = builder.with_memory_limit(memory_limit);
     }
-    let runtime_env = rt_builder.build_arc()?;
-
-    let ctx = 
SedonaContext::new_local_interactive_with_runtime_env(runtime_env).await?;
+    let ctx = builder.build().await?;
 
     let mut print_options = PrintOptions {
         format: args.format,
@@ -238,69 +219,8 @@ fn parse_command(command: &str) -> Result<String, String> {
     }
 }
 
-#[derive(Debug, Clone, Copy)]
-enum ByteUnit {
-    Byte,
-    KiB,
-    MiB,
-    GiB,
-    TiB,
-}
-
-impl ByteUnit {
-    fn multiplier(&self) -> u64 {
-        match self {
-            ByteUnit::Byte => 1,
-            ByteUnit::KiB => 1 << 10,
-            ByteUnit::MiB => 1 << 20,
-            ByteUnit::GiB => 1 << 30,
-            ByteUnit::TiB => 1 << 40,
-        }
-    }
-}
-
-fn parse_size_string(size: &str, label: &str) -> Result<usize, String> {
-    static BYTE_SUFFIXES: LazyLock<HashMap<&'static str, ByteUnit>> = 
LazyLock::new(|| {
-        let mut m = HashMap::new();
-        m.insert("b", ByteUnit::Byte);
-        m.insert("k", ByteUnit::KiB);
-        m.insert("kb", ByteUnit::KiB);
-        m.insert("m", ByteUnit::MiB);
-        m.insert("mb", ByteUnit::MiB);
-        m.insert("g", ByteUnit::GiB);
-        m.insert("gb", ByteUnit::GiB);
-        m.insert("t", ByteUnit::TiB);
-        m.insert("tb", ByteUnit::TiB);
-        m
-    });
-
-    static SUFFIX_REGEX: LazyLock<regex::Regex> =
-        LazyLock::new(|| 
regex::Regex::new(r"^([0-9.]+)\s*([a-z]+)?$").unwrap());
-
-    let lower = size.to_lowercase();
-    if let Some(caps) = SUFFIX_REGEX.captures(&lower) {
-        let num_str = caps.get(1).unwrap().as_str();
-        let num = num_str
-            .parse::<f64>()
-            .map_err(|_| format!("Invalid numeric value in {label} 
'{size}'"))?;
-
-        let suffix = caps.get(2).map(|m| m.as_str()).unwrap_or("b");
-        let unit = BYTE_SUFFIXES
-            .get(suffix)
-            .ok_or_else(|| format!("Invalid {label} '{size}'"))?;
-        let total_bytes = num * unit.multiplier() as f64;
-        if !total_bytes.is_finite() || total_bytes > usize::MAX as f64 {
-            return Err(format!("{label} '{size}' is too large"));
-        }
-
-        Ok(total_bytes as usize)
-    } else {
-        Err(format!("Invalid {label} '{size}'"))
-    }
-}
-
+/// Parse a memory pool size string by delegating to
+/// `sedona::size_parser::parse_size_string`; on failure the error is
+/// returned as its `to_string()` text.
 pub fn extract_memory_pool_size(size: &str) -> Result<usize, String> {
-    parse_size_string(size, "memory pool size")
+    sedona::size_parser::parse_size_string(size).map_err(|e| e.to_string())
 }
 
 fn validate_unspillable_reserve_ratio(s: &str) -> Result<f64, String> {
@@ -314,73 +234,3 @@ fn validate_unspillable_reserve_ratio(s: &str) -> 
Result<f64, String> {
     }
     Ok(value)
 }
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    fn assert_conversion(input: &str, expected: Result<usize, String>) {
-        let result = extract_memory_pool_size(input);
-        match expected {
-            Ok(v) => assert_eq!(result.unwrap(), v),
-            Err(e) => assert_eq!(result.unwrap_err(), e),
-        }
-    }
-
-    #[test]
-    fn memory_pool_size() -> Result<(), String> {
-        // Test basic sizes without suffix, assumed to be bytes
-        assert_conversion("5", Ok(5));
-        assert_conversion("100", Ok(100));
-
-        // Test various units
-        assert_conversion("5b", Ok(5));
-        assert_conversion("4k", Ok(4 * 1024));
-        assert_conversion("4kb", Ok(4 * 1024));
-        assert_conversion("20m", Ok(20 * 1024 * 1024));
-        assert_conversion("20mb", Ok(20 * 1024 * 1024));
-        assert_conversion("2g", Ok(2 * 1024 * 1024 * 1024));
-        assert_conversion("2gb", Ok(2 * 1024 * 1024 * 1024));
-        assert_conversion("3t", Ok(3 * 1024 * 1024 * 1024 * 1024));
-        assert_conversion("4tb", Ok(4 * 1024 * 1024 * 1024 * 1024));
-
-        // Test case insensitivity
-        assert_conversion("4K", Ok(4 * 1024));
-        assert_conversion("4KB", Ok(4 * 1024));
-        assert_conversion("20M", Ok(20 * 1024 * 1024));
-        assert_conversion("20MB", Ok(20 * 1024 * 1024));
-        assert_conversion("2G", Ok(2 * 1024 * 1024 * 1024));
-        assert_conversion("2GB", Ok(2 * 1024 * 1024 * 1024));
-        assert_conversion("2T", Ok(2 * 1024 * 1024 * 1024 * 1024));
-
-        // Test decimal values
-        assert_conversion("1.5g", Ok((1.5 * 1024.0 * 1024.0 * 1024.0) as 
usize));
-        assert_conversion("0.5m", Ok((0.5 * 1024.0 * 1024.0) as usize));
-        assert_conversion("9.5 gb", Ok((9.5 * 1024.0 * 1024.0 * 1024.0) as 
usize));
-
-        // Test with spaces between number and suffix
-        assert_conversion("4 k", Ok(4 * 1024));
-        assert_conversion("20 mb", Ok(20 * 1024 * 1024));
-
-        // Test invalid input
-        assert_conversion(
-            "invalid",
-            Err("Invalid memory pool size 'invalid'".to_string()),
-        );
-        assert_conversion("4kbx", Err("Invalid memory pool size 
'4kbx'".to_string()));
-        assert_conversion("-20mb", Err("Invalid memory pool size 
'-20mb'".to_string()));
-        assert_conversion("-100", Err("Invalid memory pool size 
'-100'".to_string()));
-        assert_conversion(
-            "12k12k",
-            Err("Invalid memory pool size '12k12k'".to_string()),
-        );
-
-        // Test overflow
-        assert_conversion(
-            "99999999t",
-            Err("memory pool size '99999999t' is too large".to_string()),
-        );
-
-        Ok(())
-    }
-}


Reply via email to