This is an automated email from the ASF dual-hosted git repository.
ytyou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 6afd539704 Add disk usage limit configuration to datafusion-cli (#15586)
6afd539704 is described below
commit 6afd5397045840d728de02a5f148b66896b397b3
Author: Jyotir Sai <[email protected]>
AuthorDate: Sat Apr 5 21:52:50 2025 -0600
Add disk usage limit configuration to datafusion-cli (#15586)
* added disk limit option
* run prettier and cargo fmt
* help line update
---
datafusion-cli/src/main.rs | 50 +++++++++++++++++++++++++++++--------
docs/source/user-guide/cli/usage.md | 3 +++
2 files changed, 43 insertions(+), 10 deletions(-)
diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs
index 0b7a98f652..dad2d15f01 100644
--- a/datafusion-cli/src/main.rs
+++ b/datafusion-cli/src/main.rs
@@ -25,6 +25,7 @@ use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::SessionConfig;
use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool,
MemoryPool};
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
+use datafusion::execution::DiskManager;
use datafusion::prelude::SessionContext;
use datafusion_cli::catalog::DynamicObjectStoreCatalog;
use datafusion_cli::functions::ParquetMetadataFunc;
@@ -39,6 +40,7 @@ use datafusion_cli::{
use clap::Parser;
use datafusion::common::config_err;
use datafusion::config::ConfigOptions;
+use datafusion::execution::disk_manager::DiskManagerConfig;
use mimalloc::MiMalloc;
#[global_allocator]
@@ -125,6 +127,14 @@ struct Args {
#[clap(long, help = "Enables console syntax highlighting")]
color: bool,
+
+ #[clap(
+ short = 'd',
+ long,
+ help = "Available disk space for spilling queries (e.g. '10g'),
default to None (uses DataFusion's default value of '100g')",
+ value_parser(extract_disk_limit)
+ )]
+ disk_limit: Option<usize>,
}
#[tokio::main]
@@ -165,6 +175,18 @@ async fn main_inner() -> Result<()> {
rt_builder = rt_builder.with_memory_pool(pool)
}
+ // set disk limit
+ if let Some(disk_limit) = args.disk_limit {
+ let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;
+
+ let disk_manager = Arc::try_unwrap(disk_manager)
+ .expect("DiskManager should be a single instance")
+ .with_max_temp_directory_size(disk_limit.try_into().unwrap())?;
+
+ let disk_config =
DiskManagerConfig::new_existing(Arc::new(disk_manager));
+ rt_builder = rt_builder.with_disk_manager(disk_config);
+ }
+
let runtime_env = rt_builder.build_arc()?;
// enable dynamic file query
@@ -300,7 +322,7 @@ impl ByteUnit {
}
}
-fn extract_memory_pool_size(size: &str) -> Result<usize, String> {
+fn parse_size_string(size: &str, label: &str) -> Result<usize, String> {
static BYTE_SUFFIXES: LazyLock<HashMap<&'static str, ByteUnit>> =
LazyLock::new(|| {
let mut m = HashMap::new();
@@ -322,25 +344,33 @@ fn extract_memory_pool_size(size: &str) -> Result<usize,
String> {
let lower = size.to_lowercase();
if let Some(caps) = SUFFIX_REGEX.captures(&lower) {
let num_str = caps.get(1).unwrap().as_str();
- let num = num_str.parse::<usize>().map_err(|_| {
- format!("Invalid numeric value in memory pool size '{}'", size)
- })?;
+ let num = num_str
+ .parse::<usize>()
+ .map_err(|_| format!("Invalid numeric value in {} '{}'", label,
size))?;
let suffix = caps.get(2).map(|m| m.as_str()).unwrap_or("b");
- let unit = &BYTE_SUFFIXES
+ let unit = BYTE_SUFFIXES
.get(suffix)
- .ok_or_else(|| format!("Invalid memory pool size '{}'", size))?;
- let memory_pool_size = usize::try_from(unit.multiplier())
+ .ok_or_else(|| format!("Invalid {} '{}'", label, size))?;
+ let total_bytes = usize::try_from(unit.multiplier())
.ok()
.and_then(|multiplier| num.checked_mul(multiplier))
- .ok_or_else(|| format!("Memory pool size '{}' is too large",
size))?;
+ .ok_or_else(|| format!("{} '{}' is too large", label, size))?;
- Ok(memory_pool_size)
+ Ok(total_bytes)
} else {
- Err(format!("Invalid memory pool size '{}'", size))
+ Err(format!("Invalid {} '{}'", label, size))
}
}
+/// Parses a human-readable memory pool size (a number with an optional unit
+/// suffix; no suffix is treated as bytes) into a byte count.
+///
+/// Thin wrapper over `parse_size_string` that uses "memory pool size" as the
+/// label in its error messages, e.g. "Invalid memory pool size '...'".
+///
+/// # Errors
+/// Returns `Err(String)` when the input does not match the expected
+/// number-plus-suffix form, the numeric part does not parse as `usize`, the
+/// suffix is not a known unit, or the resulting byte count overflows `usize`.
+pub fn extract_memory_pool_size(size: &str) -> Result<usize, String> {
+ parse_size_string(size, "memory pool size")
+}
+
+/// Parses the `-d` / `--disk-limit` CLI value (e.g. `'10g'`) into a byte count.
+///
+/// Used as the clap `value_parser` for `Args::disk_limit`; it delegates to
+/// `parse_size_string` with "disk limit" as the label in error messages,
+/// e.g. "Invalid disk limit '...'" or "disk limit '...' is too large".
+///
+/// # Errors
+/// Returns `Err(String)` when the input does not match the expected
+/// number-plus-suffix form, the numeric part does not parse as `usize`, the
+/// suffix is not a known unit, or the resulting byte count overflows `usize`.
+pub fn extract_disk_limit(size: &str) -> Result<usize, String> {
+ parse_size_string(size, "disk limit")
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/docs/source/user-guide/cli/usage.md
b/docs/source/user-guide/cli/usage.md
index fb238dad10..68b09d3199 100644
--- a/docs/source/user-guide/cli/usage.md
+++ b/docs/source/user-guide/cli/usage.md
@@ -57,6 +57,9 @@ OPTIONS:
--mem-pool-type <MEM_POOL_TYPE>
Specify the memory pool type 'greedy' or 'fair', default to
'greedy'
+ -d, --disk-limit <DISK_LIMIT>
+ Available disk space for spilling queries (e.g. '10g'), default to None (uses DataFusion's default value of '100g')
+
-p, --data-path <DATA_PATH>
Path to your data, default to current directory
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]