This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git
The following commit(s) were added to refs/heads/main by this push:
new a64e3a61 feat(python/sedonadb): Expose memory pool and runtime
configuration in Python bindings (#608)
a64e3a61 is described below
commit a64e3a61205de23c9ae32371696a10d692a16ab5
Author: Kristin Cowalcijk <[email protected]>
AuthorDate: Wed Feb 18 22:45:33 2026 +0800
feat(python/sedonadb): Expose memory pool and runtime configuration in
Python bindings (#608)
Co-authored-by: Dewey Dunnington <[email protected]>
---
Cargo.lock | 1 +
python/sedonadb/python/sedonadb/_options.py | 178 ++++++++++++-
python/sedonadb/python/sedonadb/context.py | 54 +++-
python/sedonadb/python/sedonadb/utility.py | 2 +-
python/sedonadb/src/context.rs | 9 +-
rust/sedona-geo-generic-alg/Cargo.toml | 2 +-
rust/sedona/Cargo.toml | 1 +
rust/sedona/src/context_builder.rs | 399 ++++++++++++++++++++++++++++
rust/sedona/src/lib.rs | 2 +
rust/sedona/src/size_parser.rs | 214 +++++++++++++++
sedona-cli/src/main.rs | 166 +-----------
11 files changed, 862 insertions(+), 166 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index e01ec7c0..c4fa1b95 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5117,6 +5117,7 @@ dependencies = [
"geo-types",
"object_store",
"parking_lot",
+ "regex",
"rstest",
"sedona-common",
"sedona-datasource",
diff --git a/python/sedonadb/python/sedonadb/_options.py
b/python/sedonadb/python/sedonadb/_options.py
index b7ac40a0..5da51077 100644
--- a/python/sedonadb/python/sedonadb/_options.py
+++ b/python/sedonadb/python/sedonadb/_options.py
@@ -14,16 +14,82 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-from typing import Optional
+import os
+from typing import Literal, Optional, Union
+
+from sedonadb.utility import sedona # noqa: F401
class Options:
- """Global SedonaDB options"""
+ """Global SedonaDB options
+
+ Options are divided into two categories:
+
+ **Display options** can be changed at any time and affect how results are
+ presented:
+
+ - `interactive`: Enable/disable auto-display of DataFrames.
+ - `width`: Override the detected terminal width.
+
+ **Runtime options** configure the execution environment and must be set
+ *before* the first query is executed (i.e., before the internal context
+ is initialized). Attempting to change these after the context has been
+ created will raise a `RuntimeError`:
+
+ - `memory_limit`: Maximum memory for execution, in bytes or as a
+ human-readable string (e.g., `"4gb"`, `"512m"`).
+ - `temp_dir`: Directory for temporary/spill files.
+ - `memory_pool_type`: Memory pool type (`"greedy"` or `"fair"`).
+ - `unspillable_reserve_ratio`: Fraction of memory reserved for
+ unspillable consumers (only applies to the `"fair"` pool type).
+
+ Examples:
+
+ >>> sd = sedona.db.connect()
+ >>> sd.options.memory_limit = "4gb"
+ >>> sd.options.memory_pool_type = "fair"
+ >>> sd.options.temp_dir = "/tmp/sedona-spill"
+ >>> sd.options.interactive = True
+ >>> sd.sql("SELECT 1 as one")
+ ┌───────┐
+ │ one │
+ │ int64 │
+ ╞═══════╡
+ │ 1 │
+ └───────┘
+ """
     def __init__(self):
+        # Display options (can be changed at any time)
         self._interactive = False
         self._width = None
+
+        # Runtime options (must be set before first query). A value of None
+        # means "not set": the option is left out when the internal context
+        # is created, so the engine's own default applies (0.2 for
+        # unspillable_reserve_ratio).
+        self._memory_limit = None
+        self._temp_dir = None
+        # Same default pool type as the engine-side builder.
+        self._memory_pool_type = "greedy"
+        self._unspillable_reserve_ratio = None
+
+        # Set to True once the internal context is created; after this,
+        # runtime options become read-only.
+        self._runtime_frozen = False
+
+    def freeze_runtime(self) -> None:
+        """Mark runtime options as read-only.
+
+        Called after the internal context has been successfully created.
+        From then on, assigning any runtime option raises `RuntimeError`.
+        """
+        self._runtime_frozen = True
+
+    def _check_runtime_mutable(self, name: str) -> None:
+        """Raise if the runtime option `name` may no longer be changed.
+
+        Args:
+            name: Option name, used only in the error message.
+
+        Raises:
+            RuntimeError: If `freeze_runtime()` has already been called.
+        """
+        if self._runtime_frozen:
+            raise RuntimeError(
+                f"Cannot change '{name}' after the context has been initialized. "
+                "Set this option before executing your first query."
+            )
+
+ # --- Display options ---
+
@property
def interactive(self) -> bool:
"""Use interactive mode
@@ -52,3 +118,111 @@ class Options:
@width.setter
def width(self, value: Optional[int]):
self._width = value
+
+ # --- Runtime options (must be set before first query) ---
+
+    @property
+    def memory_limit(self) -> Union[int, str, None]:
+        """Maximum memory for query execution.
+
+        Accepts an integer (bytes) or a human-readable string such as
+        `"4gb"`, `"512m"`, or `"1.5g"`. When set, a bounded memory pool is
+        created to enforce this limit. Without a memory limit, DataFusion's
+        default unbounded pool is used.
+
+        Strings are not parsed on assignment; they are parsed when the
+        internal context is created, so a malformed string is reported then.
+
+        Must be set before the first query is executed.
+
+        Examples:
+
+            >>> sd = sedona.db.connect()
+            >>> sd.options.memory_limit = "4gb"
+            >>> sd.options.memory_limit = 4 * 1024 * 1024 * 1024  # equivalent
+        """
+        return self._memory_limit
+
+    @memory_limit.setter
+    def memory_limit(self, value: Union[int, str, None]) -> None:
+        """Set the memory limit.
+
+        Raises:
+            RuntimeError: If the context has already been initialized.
+            TypeError: If `value` is not an int, str, or None. Bools are
+                rejected even though `bool` subclasses `int`.
+            ValueError: If `value` is a negative integer.
+        """
+        self._check_runtime_mutable("memory_limit")
+        # bool subclasses int, but True/False are not sizes. Without this
+        # check they would be forwarded to the engine as "True"/"False".
+        if isinstance(value, bool) or (
+            value is not None and not isinstance(value, (int, str))
+        ):
+            raise TypeError(
+                f"memory_limit must be an int, str, or None, got {type(value).__name__}"
+            )
+        # Fail fast here instead of with an opaque parse error at context creation.
+        if isinstance(value, int) and value < 0:
+            raise ValueError(f"memory_limit must be non-negative, got {value}")
+        self._memory_limit = value
+
+    @property
+    def temp_dir(self) -> Optional[str]:
+        """Directory for temporary/spill files.
+
+        When set, disk-based spilling will use this directory. When `None`,
+        DataFusion's default temporary directory is used.
+
+        Must be set before the first query is executed.
+        """
+        return self._temp_dir
+
+    @temp_dir.setter
+    def temp_dir(self, value: "Optional[Union[str, os.PathLike[str]]]") -> None:
+        """Set the spill directory from a str, a path-like object, or None.
+
+        Raises:
+            RuntimeError: If the context has already been initialized.
+            TypeError: If `value` is not a str, PathLike, or None, or if it
+                is a bytes-based path.
+        """
+        self._check_runtime_mutable("temp_dir")
+        if value is None:
+            self._temp_dir = None
+            return
+        if not isinstance(value, (str, os.PathLike)):
+            raise TypeError(
+                f"temp_dir must be a str, PathLike, or None, got {type(value).__name__}"
+            )
+        path = os.fspath(value)
+        # os.fspath() returns bytes for bytes-based PathLike objects. The
+        # engine only takes text options, so reject those here instead of
+        # failing later with an obscure conversion error.
+        if not isinstance(path, str):
+            raise TypeError(
+                f"temp_dir must be a str path, got a {type(path).__name__} path"
+            )
+        self._temp_dir = path
+
+    @property
+    def memory_pool_type(self) -> str:
+        """Memory pool type, either `"greedy"` (the default) or `"fair"`.
+
+        - `"greedy"`: reservations are granted first-come-first-served.
+        - `"fair"`: memory is shared fairly among spillable consumers, and a
+          fraction (see `unspillable_reserve_ratio`) is held back for
+          unspillable consumers.
+
+        Has an effect only when `memory_limit` is set. Must be assigned
+        before the first query is executed.
+        """
+        return self._memory_pool_type
+
+    @memory_pool_type.setter
+    def memory_pool_type(self, value: Literal["greedy", "fair"]) -> None:
+        self._check_runtime_mutable("memory_pool_type")
+        accepted = ("greedy", "fair")
+        if value in accepted:
+            self._memory_pool_type = value
+            return
+        raise ValueError(
+            f"memory_pool_type must be 'greedy' or 'fair', got '{value}'"
+        )
+
+    @property
+    def unspillable_reserve_ratio(self) -> Optional[float]:
+        """Fraction of memory reserved for unspillable consumers (0.0 - 1.0).
+
+        Only applies when `memory_pool_type` is `"fair"` and
+        `memory_limit` is set. Defaults to 0.2 when not explicitly set.
+
+        Must be set before the first query is executed.
+        """
+        return self._unspillable_reserve_ratio
+
+    @unspillable_reserve_ratio.setter
+    def unspillable_reserve_ratio(self, value: Optional[float]) -> None:
+        """Set the reserve ratio; ints are accepted and stored as floats.
+
+        Raises:
+            RuntimeError: If the context has already been initialized.
+            TypeError: If `value` is not a number (bools included) or None.
+            ValueError: If `value` is outside [0.0, 1.0] or is NaN.
+        """
+        self._check_runtime_mutable("unspillable_reserve_ratio")
+        if value is None:
+            self._unspillable_reserve_ratio = None
+            return
+        # bool subclasses int. Reject it so True/False are not silently
+        # accepted as ratios of 1.0/0.0.
+        if isinstance(value, bool) or not isinstance(value, (int, float)):
+            raise TypeError(
+                "unspillable_reserve_ratio must be a number between 0.0 and 1.0 "
+                f"or None, got {type(value).__name__}"
+            )
+        value = float(value)
+        # The chained comparison is False for NaN, so NaN is rejected too.
+        if not (0.0 <= value <= 1.0):
+            raise ValueError(
+                f"unspillable_reserve_ratio must be between 0.0 and 1.0, got {value}"
+            )
+        self._unspillable_reserve_ratio = value
diff --git a/python/sedonadb/python/sedonadb/context.py
b/python/sedonadb/python/sedonadb/context.py
index 1dcb3fad..5ad0a5f5 100644
--- a/python/sedonadb/python/sedonadb/context.py
+++ b/python/sedonadb/python/sedonadb/context.py
@@ -36,6 +36,11 @@ class SedonaContext:
registered tables, and available memory. This is similar to a
Spark SessionContext or a database connection.
+ Runtime configuration (memory limits, spill directory, pool type) can
+ be set via `options` before executing the first query. Once the
+ first query runs, the internal execution context is created and
+ runtime options become read-only.
+
Examples:
>>> sd = sedona.db.connect()
@@ -47,12 +52,47 @@ class SedonaContext:
╞═══════╡
│ 1 │
└───────┘
+
+ Configuring memory limits:
+
+ >>> sd = sedona.db.connect()
+ >>> sd.options.memory_limit = "4gb"
+ >>> sd.options.memory_pool_type = "fair"
"""
def __init__(self):
- self._impl = InternalContext()
+ self.__impl = None
self.options = Options()
+    @property
+    def _impl(self):
+        """Internal Rust context, created lazily on first access.
+
+        Deferring creation lets runtime options (memory_limit, temp_dir,
+        etc.) be configured through `self.options` beforehand. After the
+        context has been created, those runtime options are frozen.
+        """
+        if self.__impl is not None:
+            return self.__impl
+
+        options = self.options
+        # InternalContext takes a dict of strings. Options that are None are
+        # left out; the flag says whether a value is stringified or passed
+        # through unchanged.
+        candidates = (
+            ("memory_limit", options.memory_limit, True),
+            ("temp_dir", options.temp_dir, False),
+            ("memory_pool_type", options.memory_pool_type, False),
+            ("unspillable_reserve_ratio", options.unspillable_reserve_ratio, True),
+        )
+        opts = {
+            key: str(value) if stringify else value
+            for key, value, stringify in candidates
+            if value is not None
+        }
+
+        # Build first and freeze afterwards. If construction raises, the
+        # options stay editable so the caller can correct them and retry.
+        self.__impl = InternalContext(opts)
+        self.options.freeze_runtime()
+        return self.__impl
+
def create_data_frame(self, obj: Any, schema: Any = None) -> DataFrame:
"""Create a DataFrame from an in-memory or protocol-enabled object.
@@ -381,7 +421,17 @@ class SedonaContext:
def connect() -> SedonaContext:
- """Create a new [SedonaContext][sedonadb.context.SedonaContext]"""
+ """Create a new [SedonaContext][sedonadb.context.SedonaContext]
+
+ Runtime configuration (memory limits, spill directory, pool type)
+ can be set via `options` on the returned context before executing
+ the first query::
+
+ sd = sedona.db.connect()
+ sd.options.memory_limit = "4gb"
+ sd.options.memory_pool_type = "fair"
+ sd.options.temp_dir = "/tmp/sedona-spill"
+ """
return SedonaContext()
diff --git a/python/sedonadb/python/sedonadb/utility.py
b/python/sedonadb/python/sedonadb/utility.py
index 38f10a03..8c82510f 100644
--- a/python/sedonadb/python/sedonadb/utility.py
+++ b/python/sedonadb/python/sedonadb/utility.py
@@ -19,7 +19,7 @@
class Sedona:
"""Mock sedona Python module
- The Apache Sedaona Python ecosystem is centered around the apache-sedona
Python
+ The Apache Sedona Python ecosystem is centered around the apache-sedona
Python
package which provides the `sedona` modules. To decouple the maintenance
and
provide fine-grained dependency control for projects that need it, sedonadb
is distributed as a standalone package. This mock `sedona` module lets us
write
diff --git a/python/sedonadb/src/context.rs b/python/sedonadb/src/context.rs
index 1647bd05..113df3b7 100644
--- a/python/sedonadb/src/context.rs
+++ b/python/sedonadb/src/context.rs
@@ -19,6 +19,7 @@ use std::{collections::HashMap, sync::Arc};
use datafusion_expr::ScalarUDFImpl;
use pyo3::prelude::*;
use sedona::context::SedonaContext;
+use sedona::context_builder::SedonaContextBuilder;
use tokio::runtime::Runtime;
use crate::{
@@ -39,7 +40,8 @@ pub struct InternalContext {
#[pymethods]
impl InternalContext {
#[new]
- fn new(py: Python) -> Result<Self, PySedonaError> {
+ #[pyo3(signature = (options=HashMap::new()))]
+ fn new(py: Python, options: HashMap<String, String>) -> Result<Self,
PySedonaError> {
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
@@ -47,7 +49,10 @@ impl InternalContext {
PySedonaError::SedonaPython(format!("Failed to build
multithreaded runtime: {e}"))
})?;
- let inner = wait_for_future(py, &runtime,
SedonaContext::new_local_interactive())??;
+ let builder = SedonaContextBuilder::from_options(&options)
+ .map_err(|e| PySedonaError::SedonaPython(e.to_string()))?;
+
+ let inner = wait_for_future(py, &runtime, builder.build())??;
Ok(Self {
inner,
diff --git a/rust/sedona-geo-generic-alg/Cargo.toml
b/rust/sedona-geo-generic-alg/Cargo.toml
index add91a43..178bb888 100644
--- a/rust/sedona-geo-generic-alg/Cargo.toml
+++ b/rust/sedona-geo-generic-alg/Cargo.toml
@@ -32,7 +32,7 @@ float_next_after = { workspace = true }
geo-traits = { workspace = true }
geo-types = { workspace = true, features = ["approx", "use-rstar_0_12"] }
sedona-geo-traits-ext = { workspace = true }
-log = "0.4.11"
+log = { workspace = true }
num-traits = { workspace = true }
robust = "1.1.0"
rstar = "0.12.0"
diff --git a/rust/sedona/Cargo.toml b/rust/sedona/Cargo.toml
index 199a783e..eefc5b93 100644
--- a/rust/sedona/Cargo.toml
+++ b/rust/sedona/Cargo.toml
@@ -66,6 +66,7 @@ geo-traits = { workspace = true }
geo-types = { workspace = true }
object_store = { workspace = true }
parking_lot = { workspace = true }
+regex = { workspace = true }
sedona-common = { workspace = true }
sedona-datasource = { workspace = true }
sedona-expr = { workspace = true }
diff --git a/rust/sedona/src/context_builder.rs
b/rust/sedona/src/context_builder.rs
new file mode 100644
index 00000000..ad6b79c8
--- /dev/null
+++ b/rust/sedona/src/context_builder.rs
@@ -0,0 +1,399 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{collections::HashMap, num::NonZeroUsize, path::PathBuf, sync::Arc};
+
+use datafusion::{
+ error::{DataFusionError, Result},
+ execution::{
+ disk_manager::{DiskManagerBuilder, DiskManagerMode},
+ memory_pool::{GreedyMemoryPool, MemoryPool, TrackConsumersPool},
+ runtime_env::{RuntimeEnv, RuntimeEnvBuilder},
+ },
+};
+
+use crate::{
+ context::SedonaContext,
+ memory_pool::{SedonaFairSpillPool, DEFAULT_UNSPILLABLE_RESERVE_RATIO},
+ pool_type::PoolType,
+ size_parser,
+};
+
+/// Builder for constructing a [`SedonaContext`] with configurable runtime
+/// environment settings.
+///
+/// This builder centralizes the construction of memory pools, disk managers,
+/// and runtime environments so that the same logic can be reused across the
+/// CLI, Python bindings, ADBC driver, and any future entry points.
+///
+/// # Examples
+///
+/// ```rust,no_run
+/// # async fn example() -> datafusion::error::Result<()> {
+/// use sedona::context_builder::SedonaContextBuilder;
+/// use sedona::pool_type::PoolType;
+///
+/// let ctx = SedonaContextBuilder::new()
+///     .with_memory_limit(4 * 1024 * 1024 * 1024)
+///     .with_pool_type(PoolType::Fair)
+///     .with_temp_dir("/tmp/sedona-spill".to_string())
+///     .build()
+///     .await?;
+/// # Ok(())
+/// # }
+/// ```
+///
+/// String-based configuration (useful for ADBC connection options, etc.):
+///
+/// ```rust,no_run
+/// # async fn example() -> datafusion::error::Result<()> {
+/// use std::collections::HashMap;
+/// use sedona::context_builder::SedonaContextBuilder;
+///
+/// let mut opts = HashMap::new();
+/// opts.insert("memory_limit".to_string(), "4gb".to_string());
+/// opts.insert("memory_pool_type".to_string(), "fair".to_string());
+///
+/// let ctx = SedonaContextBuilder::from_options(&opts)?.build().await?;
+/// # Ok(())
+/// # }
+/// ```
+pub struct SedonaContextBuilder {
+    /// Pool size in bytes. `None` means no memory pool is installed.
+    memory_limit: Option<usize>,
+    /// Spill directory. `None` means the disk manager is not overridden.
+    temp_dir: Option<String>,
+    /// Pool implementation used when `memory_limit` is set.
+    pool_type: PoolType,
+    /// Fraction reserved for unspillable consumers; used only by the fair pool.
+    /// Kept within [0.0, 1.0] by `with_unspillable_reserve_ratio`.
+    unspillable_reserve_ratio: f64,
+}
+
+/// Equivalent to [`SedonaContextBuilder::new`].
+impl Default for SedonaContextBuilder {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl SedonaContextBuilder {
+    /// Create a new builder with default settings.
+    ///
+    /// Defaults:
+    /// - `memory_limit`: `None` (no limit, uses DataFusion's default unbounded pool)
+    /// - `pool_type`: `PoolType::Greedy`
+    /// - `unspillable_reserve_ratio`: `0.2`
+    /// - `temp_dir`: `None` (uses DataFusion's default temp directory)
+    pub fn new() -> Self {
+        Self {
+            memory_limit: None,
+            temp_dir: None,
+            pool_type: PoolType::Greedy,
+            unspillable_reserve_ratio: DEFAULT_UNSPILLABLE_RESERVE_RATIO,
+        }
+    }
+
+    /// Create a builder from string-based key-value options.
+    ///
+    /// Recognized keys:
+    /// - `"memory_limit"`: Memory limit as a human-readable size string
+    ///   (e.g., `"4gb"`, `"512m"`, `"1.5g"`) or plain bytes (e.g.,
+    ///   `"4294967296"`). See [`size_parser::parse_size_string`] for
+    ///   supported suffixes.
+    /// - `"temp_dir"`: Path for temporary/spill files
+    /// - `"memory_pool_type"`: `"greedy"` or `"fair"`
+    /// - `"unspillable_reserve_ratio"`: Float between 0.0 and 1.0
+    ///
+    /// Unrecognized keys are ignored.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`DataFusionError::Configuration`] when a recognized key has a
+    /// value that cannot be parsed, or when `unspillable_reserve_ratio` is
+    /// outside `[0.0, 1.0]`.
+    pub fn from_options(options: &HashMap<String, String>) -> Result<Self> {
+        let mut builder = Self::new();
+
+        if let Some(memory_limit) = options.get("memory_limit") {
+            let limit = size_parser::parse_size_string(memory_limit)?;
+            builder = builder.with_memory_limit(limit);
+        }
+
+        if let Some(temp_dir) = options.get("temp_dir") {
+            builder = builder.with_temp_dir(temp_dir.clone());
+        }
+
+        if let Some(pool_type) = options.get("memory_pool_type") {
+            // PoolType's FromStr error is a String; wrap it as a configuration error.
+            let pt: PoolType = pool_type
+                .parse()
+                .map_err(|e: String| DataFusionError::Configuration(e))?;
+            builder = builder.with_pool_type(pt);
+        }
+
+        if let Some(ratio) = options.get("unspillable_reserve_ratio") {
+            let r: f64 = ratio.parse().map_err(|_| {
+                DataFusionError::Configuration(format!(
+                    "Invalid unspillable_reserve_ratio value '{ratio}': expected a float"
+                ))
+            })?;
+            // Range validation is delegated to the typed setter.
+            builder = builder.with_unspillable_reserve_ratio(r)?;
+        }
+
+        Ok(builder)
+    }
+
+    /// Set the memory limit in bytes.
+    ///
+    /// When set, a memory pool is created to enforce this limit. Without a
+    /// memory limit, DataFusion's default unbounded memory pool is used.
+    pub fn with_memory_limit(mut self, memory_limit: usize) -> Self {
+        self.memory_limit = Some(memory_limit);
+        self
+    }
+
+    /// Set the directory for temporary/spill files.
+    pub fn with_temp_dir(mut self, temp_dir: String) -> Self {
+        self.temp_dir = Some(temp_dir);
+        self
+    }
+
+    /// Set the memory pool type.
+    ///
+    /// - `PoolType::Greedy`: A simple pool that grants reservations on a
+    ///   first-come-first-served basis. This is the default.
+    /// - `PoolType::Fair`: A pool that fairly distributes memory among
+    ///   spillable consumers and reserves a fraction of memory for
+    ///   unspillable consumers (configured via
+    ///   [`with_unspillable_reserve_ratio`](Self::with_unspillable_reserve_ratio)).
+    ///
+    /// Only takes effect when `memory_limit` is set.
+    pub fn with_pool_type(mut self, pool_type: PoolType) -> Self {
+        self.pool_type = pool_type;
+        self
+    }
+
+    /// Set the fraction of memory reserved for unspillable consumers.
+    ///
+    /// Must be between 0.0 and 1.0 (inclusive). Only applies when
+    /// `pool_type` is `PoolType::Fair` and `memory_limit` is set.
+    ///
+    /// Returns an error if the value is out of range.
+    pub fn with_unspillable_reserve_ratio(mut self, ratio: f64) -> Result<Self> {
+        // `contains` is false for NaN, so NaN is rejected as out of range.
+        if !(0.0..=1.0).contains(&ratio) {
+            return Err(DataFusionError::Configuration(format!(
+                "unspillable_reserve_ratio must be between 0.0 and 1.0, got {ratio}"
+            )));
+        }
+        self.unspillable_reserve_ratio = ratio;
+        Ok(self)
+    }
+
+    /// Build a [`RuntimeEnv`] from the current configuration.
+    ///
+    /// This constructs the memory pool and disk manager based on the
+    /// builder settings and returns the resulting runtime environment.
+    pub fn build_runtime_env(&self) -> Result<Arc<RuntimeEnv>> {
+        let mut rt_builder = RuntimeEnvBuilder::new();
+
+        // Without a memory limit no pool is installed, so the runtime keeps
+        // DataFusion's default (unbounded) pool.
+        if let Some(memory_limit) = self.memory_limit {
+            // NOTE(review): presumably the number of largest consumers that
+            // TrackConsumersPool reports when an allocation fails; confirm
+            // against the DataFusion docs.
+            let track_capacity = NonZeroUsize::new(10).expect("track capacity must be non-zero");
+            // Both pool types are wrapped in TrackConsumersPool, so either
+            // choice gets the same consumer tracking.
+            let pool: Arc<dyn MemoryPool> = match self.pool_type {
+                PoolType::Fair => Arc::new(TrackConsumersPool::new(
+                    SedonaFairSpillPool::new(memory_limit, self.unspillable_reserve_ratio),
+                    track_capacity,
+                )),
+                PoolType::Greedy => Arc::new(TrackConsumersPool::new(
+                    GreedyMemoryPool::new(memory_limit),
+                    track_capacity,
+                )),
+            };
+            rt_builder = rt_builder.with_memory_pool(pool);
+        }
+
+        // The disk manager is overridden only when a spill directory was
+        // given; otherwise the default temp directory handling applies.
+        if let Some(ref temp_dir) = self.temp_dir {
+            let dm_builder = DiskManagerBuilder::default()
+                .with_mode(DiskManagerMode::Directories(vec![PathBuf::from(temp_dir)]));
+            rt_builder = rt_builder.with_disk_manager_builder(dm_builder);
+        }
+
+        rt_builder.build_arc()
+    }
+
+    /// Build a [`SedonaContext`] from the current configuration.
+    ///
+    /// This constructs the runtime environment and then creates a fully
+    /// configured interactive `SedonaContext`. Consumes the builder.
+    pub async fn build(self) -> Result<SedonaContext> {
+        let runtime_env = self.build_runtime_env()?;
+        SedonaContext::new_local_interactive_with_runtime_env(runtime_env).await
+    }
+}
+
+// Unit tests for SedonaContextBuilder: defaults, typed setters, string-based
+// option parsing, and runtime/context construction.
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_default_builder() {
+        let builder = SedonaContextBuilder::new();
+        assert!(builder.memory_limit.is_none());
+        assert!(builder.temp_dir.is_none());
+        assert_eq!(builder.pool_type, PoolType::Greedy);
+        // Floats are compared with a tolerance rather than exact equality.
+        assert!(
+            (builder.unspillable_reserve_ratio - DEFAULT_UNSPILLABLE_RESERVE_RATIO).abs()
+                < f64::EPSILON
+        );
+    }
+
+    #[test]
+    fn test_builder_with_methods() {
+        let builder = SedonaContextBuilder::new()
+            .with_memory_limit(1024)
+            .with_temp_dir("/tmp/test".to_string())
+            .with_pool_type(PoolType::Fair)
+            .with_unspillable_reserve_ratio(0.3)
+            .unwrap();
+        assert_eq!(builder.memory_limit, Some(1024));
+        assert_eq!(builder.temp_dir, Some("/tmp/test".to_string()));
+        assert_eq!(builder.pool_type, PoolType::Fair);
+        assert!((builder.unspillable_reserve_ratio - 0.3).abs() < f64::EPSILON);
+    }
+
+    #[test]
+    fn test_invalid_unspillable_reserve_ratio() {
+        let result = SedonaContextBuilder::new().with_unspillable_reserve_ratio(-0.1);
+        assert!(result.is_err());
+
+        let result = SedonaContextBuilder::new().with_unspillable_reserve_ratio(1.1);
+        assert!(result.is_err());
+
+        // Edge cases: 0.0 and 1.0 should be valid
+        let result = SedonaContextBuilder::new().with_unspillable_reserve_ratio(0.0);
+        assert!(result.is_ok());
+
+        let result = SedonaContextBuilder::new().with_unspillable_reserve_ratio(1.0);
+        assert!(result.is_ok());
+    }
+
+    #[test]
+    fn test_from_options() {
+        let mut opts = HashMap::new();
+        opts.insert("memory_limit".to_string(), "4096".to_string());
+        opts.insert("temp_dir".to_string(), "/tmp/spill".to_string());
+        opts.insert("memory_pool_type".to_string(), "fair".to_string());
+        opts.insert("unspillable_reserve_ratio".to_string(), "0.3".to_string());
+
+        let builder = SedonaContextBuilder::from_options(&opts).unwrap();
+        assert_eq!(builder.memory_limit, Some(4096));
+        assert_eq!(builder.temp_dir, Some("/tmp/spill".to_string()));
+        assert_eq!(builder.pool_type, PoolType::Fair);
+        assert!((builder.unspillable_reserve_ratio - 0.3).abs() < f64::EPSILON);
+    }
+
+    #[test]
+    fn test_from_options_empty() {
+        let opts = HashMap::new();
+        let builder = SedonaContextBuilder::from_options(&opts).unwrap();
+        assert!(builder.memory_limit.is_none());
+        assert!(builder.temp_dir.is_none());
+        assert_eq!(builder.pool_type, PoolType::Greedy);
+    }
+
+    #[test]
+    fn test_from_options_invalid_memory_limit() {
+        let mut opts = HashMap::new();
+        opts.insert("memory_limit".to_string(), "not_a_number".to_string());
+        assert!(SedonaContextBuilder::from_options(&opts).is_err());
+    }
+
+    #[test]
+    fn test_from_options_human_readable_memory_limit() {
+        let mut opts = HashMap::new();
+        opts.insert("memory_limit".to_string(), "4gb".to_string());
+        let builder = SedonaContextBuilder::from_options(&opts).unwrap();
+        assert_eq!(builder.memory_limit, Some(4 * 1024 * 1024 * 1024));
+
+        let mut opts = HashMap::new();
+        opts.insert("memory_limit".to_string(), "512m".to_string());
+        let builder = SedonaContextBuilder::from_options(&opts).unwrap();
+        assert_eq!(builder.memory_limit, Some(512 * 1024 * 1024));
+
+        let mut opts = HashMap::new();
+        opts.insert("memory_limit".to_string(), "1.5g".to_string());
+        let builder = SedonaContextBuilder::from_options(&opts).unwrap();
+        assert_eq!(
+            builder.memory_limit,
+            Some((1.5 * 1024.0 * 1024.0 * 1024.0) as usize)
+        );
+    }
+
+    #[test]
+    fn test_from_options_invalid_pool_type() {
+        let mut opts = HashMap::new();
+        opts.insert("memory_pool_type".to_string(), "invalid".to_string());
+        assert!(SedonaContextBuilder::from_options(&opts).is_err());
+    }
+
+    #[test]
+    fn test_from_options_invalid_ratio() {
+        let mut opts = HashMap::new();
+        opts.insert("unspillable_reserve_ratio".to_string(), "2.0".to_string());
+        assert!(SedonaContextBuilder::from_options(&opts).is_err());
+    }
+
+    #[test]
+    fn test_from_options_unrecognized_keys_ignored() {
+        let mut opts = HashMap::new();
+        opts.insert("unknown_key".to_string(), "value".to_string());
+        let builder = SedonaContextBuilder::from_options(&opts).unwrap();
+        assert!(builder.memory_limit.is_none());
+    }
+
+    // The build_runtime_env tests only check that construction succeeds;
+    // they do not inspect the resulting pool.
+    #[test]
+    fn test_build_runtime_env_no_memory_limit() {
+        let builder = SedonaContextBuilder::new();
+        let result = builder.build_runtime_env();
+        assert!(result.is_ok());
+    }
+
+    #[test]
+    fn test_build_runtime_env_with_greedy_pool() {
+        let builder = SedonaContextBuilder::new()
+            .with_memory_limit(1024 * 1024)
+            .with_pool_type(PoolType::Greedy);
+        let result = builder.build_runtime_env();
+        assert!(result.is_ok());
+    }
+
+    #[test]
+    fn test_build_runtime_env_with_fair_pool() {
+        let builder = SedonaContextBuilder::new()
+            .with_memory_limit(1024 * 1024)
+            .with_pool_type(PoolType::Fair)
+            .with_unspillable_reserve_ratio(0.2)
+            .unwrap();
+        let result = builder.build_runtime_env();
+        assert!(result.is_ok());
+    }
+
+    #[tokio::test]
+    async fn test_build_context_default() {
+        let ctx = SedonaContextBuilder::new().build().await;
+        assert!(ctx.is_ok());
+    }
+
+    #[tokio::test]
+    async fn test_build_context_with_memory_limit() {
+        let ctx = SedonaContextBuilder::new()
+            .with_memory_limit(100 * 1024 * 1024)
+            .with_pool_type(PoolType::Fair)
+            .with_unspillable_reserve_ratio(0.2)
+            .unwrap()
+            .build()
+            .await;
+        assert!(ctx.is_ok());
+    }
+}
diff --git a/rust/sedona/src/lib.rs b/rust/sedona/src/lib.rs
index 05ce9898..6752c328 100644
--- a/rust/sedona/src/lib.rs
+++ b/rust/sedona/src/lib.rs
@@ -16,6 +16,7 @@
// under the License.
mod catalog;
pub mod context;
+pub mod context_builder;
mod exec;
pub mod memory_pool;
mod object_storage;
@@ -24,3 +25,4 @@ pub mod random_geometry_provider;
pub mod reader;
pub mod record_batch_reader_provider;
pub mod show;
+pub mod size_parser;
diff --git a/rust/sedona/src/size_parser.rs b/rust/sedona/src/size_parser.rs
new file mode 100644
index 00000000..d85d360f
--- /dev/null
+++ b/rust/sedona/src/size_parser.rs
@@ -0,0 +1,214 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Parse human-readable size strings (e.g., `"4gb"`, `"512m"`, `"1.5g"`) into
+//! byte counts.
+//!
+//! This is used by the CLI (`--memory-limit 4g`), the Python bindings
+//! (`sd.options.memory_limit = "4gb"`), and
+//!
[`SedonaContextBuilder::from_options`](crate::context_builder::SedonaContextBuilder::from_options).
+
+use std::collections::HashMap;
+use std::sync::LazyLock;
+
+use datafusion::error::{DataFusionError, Result};
+
+/// Binary (power-of-1024) size units recognised by [`parse_size_string`].
+#[derive(Debug, Clone, Copy)]
+enum ByteUnit {
+    Byte,
+    KiB,
+    MiB,
+    GiB,
+    TiB,
+}
+
+impl ByteUnit {
+    /// Number of bytes in one of this unit.
+    fn multiplier(&self) -> u64 {
+        let shift: u32 = match self {
+            Self::Byte => 0,
+            Self::KiB => 10,
+            Self::MiB => 20,
+            Self::GiB => 30,
+            Self::TiB => 40,
+        };
+        1u64 << shift
+    }
+}
+
+/// Lower-case suffixes accepted after the number, mapped to their unit.
+static BYTE_SUFFIXES: LazyLock<HashMap<&'static str, ByteUnit>> = LazyLock::new(|| {
+    HashMap::from([
+        ("b", ByteUnit::Byte),
+        ("k", ByteUnit::KiB),
+        ("kb", ByteUnit::KiB),
+        ("m", ByteUnit::MiB),
+        ("mb", ByteUnit::MiB),
+        ("g", ByteUnit::GiB),
+        ("gb", ByteUnit::GiB),
+        ("t", ByteUnit::TiB),
+        ("tb", ByteUnit::TiB),
+    ])
+});
+
+/// Matches a run of digits/dots, optional whitespace, and an optional
+/// lower-case alphabetic suffix. Input is lower-cased before matching.
+static SUFFIX_REGEX: LazyLock<regex::Regex> =
+    LazyLock::new(|| regex::Regex::new(r"^([0-9.]+)\s*([a-z]+)?$").unwrap());
+
+/// Parse a human-readable size string into a byte count.
+///
+/// Accepts formats like `"4gb"`, `"512m"`, `"1.5g"`, `"4096"`, `"100 mb"`.
+/// The suffix is case-insensitive. When no suffix is provided, the value is
+/// treated as bytes. Leading and trailing whitespace is ignored. Fractional
+/// byte counts are truncated.
+///
+/// # Supported suffixes
+///
+/// | Suffix    | Unit     |
+/// |-----------|----------|
+/// | `b`       | Bytes    |
+/// | `k`, `kb` | KiB      |
+/// | `m`, `mb` | MiB      |
+/// | `g`, `gb` | GiB      |
+/// | `t`, `tb` | TiB      |
+///
+/// # Errors
+///
+/// Returns [`DataFusionError::Configuration`] when the string is not a
+/// non-negative number with an optional supported suffix, or when the
+/// resulting size does not fit in a `usize`.
+///
+/// # Examples
+///
+/// ```
+/// use sedona::size_parser::parse_size_string;
+///
+/// assert_eq!(parse_size_string("4gb").unwrap(), 4 * 1024 * 1024 * 1024);
+/// assert_eq!(parse_size_string("512m").unwrap(), 512 * 1024 * 1024);
+/// assert_eq!(parse_size_string("4096").unwrap(), 4096);
+/// ```
+pub fn parse_size_string(size: &str) -> Result<usize> {
+    let invalid = || DataFusionError::Configuration(format!("Invalid size string '{size}'"));
+
+    // Surrounding whitespace (e.g. from config files or env vars) is not
+    // significant. Whitespace between number and suffix is handled by the regex.
+    let lower = size.trim().to_lowercase();
+    let caps = SUFFIX_REGEX.captures(&lower).ok_or_else(invalid)?;
+
+    // The regex admits strings like "1.2.3" or "."; f64 parsing rejects them.
+    let num = caps[1].parse::<f64>().map_err(|_| invalid())?;
+    let suffix = caps.get(2).map_or("b", |m| m.as_str());
+    let unit = BYTE_SUFFIXES.get(suffix).ok_or_else(invalid)?;
+
+    let total_bytes = num * unit.multiplier() as f64;
+    // `usize::MAX as f64` rounds up to 2^64 on 64-bit targets, so `>` against
+    // it would let exactly 2^64 through and saturate. Compare against
+    // 2^BITS with `>=` instead: every finite value below it fits.
+    let limit = 2f64.powi(usize::BITS as i32);
+    if !total_bytes.is_finite() || total_bytes >= limit {
+        return Err(DataFusionError::Configuration(format!(
+            "Size string '{size}' is too large"
+        )));
+    }
+
+    Ok(total_bytes as usize)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn bare_numbers_are_bytes() {
+        assert_eq!(parse_size_string("5").unwrap(), 5);
+        assert_eq!(parse_size_string("100").unwrap(), 100);
+    }
+
+    #[test]
+    fn byte_suffix() {
+        assert_eq!(parse_size_string("5b").unwrap(), 5);
+    }
+
+    #[test]
+    fn kib_suffixes() {
+        assert_eq!(parse_size_string("4k").unwrap(), 4 * 1024);
+        assert_eq!(parse_size_string("4kb").unwrap(), 4 * 1024);
+    }
+
+    #[test]
+    fn mib_suffixes() {
+        assert_eq!(parse_size_string("20m").unwrap(), 20 * 1024 * 1024);
+        assert_eq!(parse_size_string("20mb").unwrap(), 20 * 1024 * 1024);
+    }
+
+    #[test]
+    fn gib_suffixes() {
+        assert_eq!(parse_size_string("2g").unwrap(), 2 * 1024 * 1024 * 1024);
+        assert_eq!(parse_size_string("2gb").unwrap(), 2 * 1024 * 1024 * 1024);
+    }
+
+    #[test]
+    fn tib_suffixes() {
+        assert_eq!(
+            parse_size_string("3t").unwrap(),
+            3 * 1024 * 1024 * 1024 * 1024
+        );
+        assert_eq!(
+            parse_size_string("4tb").unwrap(),
+            4 * 1024 * 1024 * 1024 * 1024
+        );
+    }
+
+    #[test]
+    fn case_insensitive() {
+        assert_eq!(parse_size_string("4K").unwrap(), 4 * 1024);
+        assert_eq!(parse_size_string("4KB").unwrap(), 4 * 1024);
+        assert_eq!(parse_size_string("20M").unwrap(), 20 * 1024 * 1024);
+        assert_eq!(parse_size_string("20MB").unwrap(), 20 * 1024 * 1024);
+        assert_eq!(parse_size_string("2G").unwrap(), 2 * 1024 * 1024 * 1024);
+        assert_eq!(parse_size_string("2GB").unwrap(), 2 * 1024 * 1024 * 1024);
+        assert_eq!(
+            parse_size_string("2T").unwrap(),
+            2 * 1024 * 1024 * 1024 * 1024
+        );
+    }
+
+    #[test]
+    fn decimal_values() {
+        assert_eq!(
+            parse_size_string("1.5g").unwrap(),
+            (1.5 * 1024.0 * 1024.0 * 1024.0) as usize
+        );
+        assert_eq!(
+            parse_size_string("0.5m").unwrap(),
+            (0.5 * 1024.0 * 1024.0) as usize
+        );
+        assert_eq!(
+            parse_size_string("9.5 gb").unwrap(),
+            (9.5 * 1024.0 * 1024.0 * 1024.0) as usize
+        );
+    }
+
+    #[test]
+    fn spaces_between_number_and_suffix() {
+        assert_eq!(parse_size_string("4 k").unwrap(), 4 * 1024);
+        assert_eq!(parse_size_string("20 mb").unwrap(), 20 * 1024 * 1024);
+    }
+
+    #[test]
+    fn surrounding_whitespace_is_ignored() {
+        assert_eq!(parse_size_string("  4gb ").unwrap(), 4 * 1024 * 1024 * 1024);
+        assert_eq!(parse_size_string("\t512\n").unwrap(), 512);
+    }
+
+    #[test]
+    fn invalid_input() {
+        assert!(parse_size_string("invalid").is_err());
+        assert!(parse_size_string("4kbx").is_err());
+        assert!(parse_size_string("-20mb").is_err());
+        assert!(parse_size_string("-100").is_err());
+        assert!(parse_size_string("12k12k").is_err());
+        assert!(parse_size_string("").is_err());
+        assert!(parse_size_string("1.2.3").is_err());
+    }
+
+    #[test]
+    fn overflow() {
+        assert!(parse_size_string("99999999t").is_err());
+    }
+
+    #[cfg(target_pointer_width = "64")]
+    #[test]
+    fn overflow_at_exact_boundary() {
+        // 2^24 TiB == 2^64 bytes, one past usize::MAX.
+        assert!(parse_size_string("16777216t").is_err());
+        // One TiB less fits exactly.
+        assert_eq!(
+            parse_size_string("16777215t").unwrap(),
+            16_777_215usize << 40
+        );
+    }
+}
diff --git a/sedona-cli/src/main.rs b/sedona-cli/src/main.rs
index 79a7a928..c0964d84 100644
--- a/sedona-cli/src/main.rs
+++ b/sedona-cli/src/main.rs
@@ -15,18 +15,13 @@
// specific language governing permissions and limitations
// under the License.
-use std::collections::HashMap;
use std::env;
-use std::num::NonZeroUsize;
use std::path::Path;
use std::process::ExitCode;
-use std::sync::{Arc, LazyLock};
use datafusion::error::{DataFusionError, Result};
-use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryPool,
TrackConsumersPool};
-use datafusion::execution::runtime_env::RuntimeEnvBuilder;
-use sedona::context::SedonaContext;
-use sedona::memory_pool::{SedonaFairSpillPool,
DEFAULT_UNSPILLABLE_RESERVE_RATIO};
+use sedona::context_builder::SedonaContextBuilder;
+use sedona::memory_pool::DEFAULT_UNSPILLABLE_RESERVE_RATIO;
use sedona::pool_type::PoolType;
use sedona_cli::{
exec,
@@ -163,27 +158,13 @@ async fn main_inner() -> Result<()> {
env::set_current_dir(p).unwrap();
};
- let mut rt_builder = RuntimeEnvBuilder::new();
- // set memory pool size
+ let mut builder = SedonaContextBuilder::new()
+ .with_pool_type(args.mem_pool_type.clone())
+ .with_unspillable_reserve_ratio(args.unspillable_reserve_ratio)?;
if let Some(memory_limit) = args.memory_limit {
- // set memory pool type
- let track_capacity = NonZeroUsize::new(10).expect("track capacity must
be non-zero");
- let pool: Arc<dyn MemoryPool> = match args.mem_pool_type {
- PoolType::Fair => Arc::new(TrackConsumersPool::new(
- SedonaFairSpillPool::new(memory_limit,
args.unspillable_reserve_ratio),
- track_capacity,
- )),
- PoolType::Greedy => Arc::new(TrackConsumersPool::new(
- GreedyMemoryPool::new(memory_limit),
- track_capacity,
- )),
- };
-
- rt_builder = rt_builder.with_memory_pool(pool)
+ builder = builder.with_memory_limit(memory_limit);
}
- let runtime_env = rt_builder.build_arc()?;
-
- let ctx =
SedonaContext::new_local_interactive_with_runtime_env(runtime_env).await?;
+ let ctx = builder.build().await?;
let mut print_options = PrintOptions {
format: args.format,
@@ -238,69 +219,8 @@ fn parse_command(command: &str) -> Result<String, String> {
}
}
-#[derive(Debug, Clone, Copy)]
-enum ByteUnit {
- Byte,
- KiB,
- MiB,
- GiB,
- TiB,
-}
-
-impl ByteUnit {
- fn multiplier(&self) -> u64 {
- match self {
- ByteUnit::Byte => 1,
- ByteUnit::KiB => 1 << 10,
- ByteUnit::MiB => 1 << 20,
- ByteUnit::GiB => 1 << 30,
- ByteUnit::TiB => 1 << 40,
- }
- }
-}
-
-fn parse_size_string(size: &str, label: &str) -> Result<usize, String> {
- static BYTE_SUFFIXES: LazyLock<HashMap<&'static str, ByteUnit>> =
LazyLock::new(|| {
- let mut m = HashMap::new();
- m.insert("b", ByteUnit::Byte);
- m.insert("k", ByteUnit::KiB);
- m.insert("kb", ByteUnit::KiB);
- m.insert("m", ByteUnit::MiB);
- m.insert("mb", ByteUnit::MiB);
- m.insert("g", ByteUnit::GiB);
- m.insert("gb", ByteUnit::GiB);
- m.insert("t", ByteUnit::TiB);
- m.insert("tb", ByteUnit::TiB);
- m
- });
-
- static SUFFIX_REGEX: LazyLock<regex::Regex> =
- LazyLock::new(||
regex::Regex::new(r"^([0-9.]+)\s*([a-z]+)?$").unwrap());
-
- let lower = size.to_lowercase();
- if let Some(caps) = SUFFIX_REGEX.captures(&lower) {
- let num_str = caps.get(1).unwrap().as_str();
- let num = num_str
- .parse::<f64>()
- .map_err(|_| format!("Invalid numeric value in {label}
'{size}'"))?;
-
- let suffix = caps.get(2).map(|m| m.as_str()).unwrap_or("b");
- let unit = BYTE_SUFFIXES
- .get(suffix)
- .ok_or_else(|| format!("Invalid {label} '{size}'"))?;
- let total_bytes = num * unit.multiplier() as f64;
- if !total_bytes.is_finite() || total_bytes > usize::MAX as f64 {
- return Err(format!("{label} '{size}' is too large"));
- }
-
- Ok(total_bytes as usize)
- } else {
- Err(format!("Invalid {label} '{size}'"))
- }
-}
-
pub fn extract_memory_pool_size(size: &str) -> Result<usize, String> {
- parse_size_string(size, "memory pool size")
+ sedona::size_parser::parse_size_string(size).map_err(|e| e.to_string())
}
fn validate_unspillable_reserve_ratio(s: &str) -> Result<f64, String> {
@@ -314,73 +234,3 @@ fn validate_unspillable_reserve_ratio(s: &str) -> Result<f64, String> {
}
Ok(value)
}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- fn assert_conversion(input: &str, expected: Result<usize, String>) {
- let result = extract_memory_pool_size(input);
- match expected {
- Ok(v) => assert_eq!(result.unwrap(), v),
- Err(e) => assert_eq!(result.unwrap_err(), e),
- }
- }
-
- #[test]
- fn memory_pool_size() -> Result<(), String> {
- // Test basic sizes without suffix, assumed to be bytes
- assert_conversion("5", Ok(5));
- assert_conversion("100", Ok(100));
-
- // Test various units
- assert_conversion("5b", Ok(5));
- assert_conversion("4k", Ok(4 * 1024));
- assert_conversion("4kb", Ok(4 * 1024));
- assert_conversion("20m", Ok(20 * 1024 * 1024));
- assert_conversion("20mb", Ok(20 * 1024 * 1024));
- assert_conversion("2g", Ok(2 * 1024 * 1024 * 1024));
- assert_conversion("2gb", Ok(2 * 1024 * 1024 * 1024));
- assert_conversion("3t", Ok(3 * 1024 * 1024 * 1024 * 1024));
- assert_conversion("4tb", Ok(4 * 1024 * 1024 * 1024 * 1024));
-
- // Test case insensitivity
- assert_conversion("4K", Ok(4 * 1024));
- assert_conversion("4KB", Ok(4 * 1024));
- assert_conversion("20M", Ok(20 * 1024 * 1024));
- assert_conversion("20MB", Ok(20 * 1024 * 1024));
- assert_conversion("2G", Ok(2 * 1024 * 1024 * 1024));
- assert_conversion("2GB", Ok(2 * 1024 * 1024 * 1024));
- assert_conversion("2T", Ok(2 * 1024 * 1024 * 1024 * 1024));
-
- // Test decimal values
- assert_conversion("1.5g", Ok((1.5 * 1024.0 * 1024.0 * 1024.0) as
usize));
- assert_conversion("0.5m", Ok((0.5 * 1024.0 * 1024.0) as usize));
- assert_conversion("9.5 gb", Ok((9.5 * 1024.0 * 1024.0 * 1024.0) as
usize));
-
- // Test with spaces between number and suffix
- assert_conversion("4 k", Ok(4 * 1024));
- assert_conversion("20 mb", Ok(20 * 1024 * 1024));
-
- // Test invalid input
- assert_conversion(
- "invalid",
- Err("Invalid memory pool size 'invalid'".to_string()),
- );
- assert_conversion("4kbx", Err("Invalid memory pool size
'4kbx'".to_string()));
- assert_conversion("-20mb", Err("Invalid memory pool size
'-20mb'".to_string()));
- assert_conversion("-100", Err("Invalid memory pool size
'-100'".to_string()));
- assert_conversion(
- "12k12k",
- Err("Invalid memory pool size '12k12k'".to_string()),
- );
-
- // Test overflow
- assert_conversion(
- "99999999t",
- Err("memory pool size '99999999t' is too large".to_string()),
- );
-
- Ok(())
- }
-}