This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 2503bda50c feat: Add memory pool configuration to `datafusion-cli`
(#7424)
2503bda50c is described below
commit 2503bda50ced1bc983280e0b7783a482bc1d5714
Author: Alex Huang <[email protected]>
AuthorDate: Mon Aug 28 18:43:38 2023 +0800
feat: Add memory pool configuration to `datafusion-cli` (#7424)
* support memory-limit
* update doc
* avoid unwrap
* support memory pool type setting
* format doc
* fix else case
* update doc
* refactor
---
datafusion-cli/src/main.rs | 92 +++++++++++++++++++++++++++++++++++++++++--
docs/source/user-guide/cli.md | 20 +++++-----
2 files changed, 100 insertions(+), 12 deletions(-)
diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs
index aea499d603..8429738a09 100644
--- a/datafusion-cli/src/main.rs
+++ b/datafusion-cli/src/main.rs
@@ -18,6 +18,7 @@
use clap::Parser;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::SessionConfig;
+use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool};
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::SessionContext;
use datafusion_cli::catalog::DynamicFileCatalog;
@@ -27,11 +28,30 @@ use datafusion_cli::{
use mimalloc::MiMalloc;
use std::env;
use std::path::Path;
+use std::str::FromStr;
use std::sync::Arc;
#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;
+/// Memory pool implementation selectable with `--mem-pool-type`.
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+enum PoolType {
+    /// Backed by `GreedyMemoryPool`.
+    Greedy,
+    /// Backed by `FairSpillPool`.
+    Fair,
+}
+
+impl FromStr for PoolType {
+    type Err = String;
+
+    /// Parses a pool type name, ignoring ASCII case, so `greedy`, `Greedy`
+    /// and `GREEDY` are all accepted (likewise for `fair`).
+    ///
+    /// # Errors
+    /// Returns `Invalid memory pool type '<input>'` for any other value.
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        if s.eq_ignore_ascii_case("greedy") {
+            Ok(PoolType::Greedy)
+        } else if s.eq_ignore_ascii_case("fair") {
+            Ok(PoolType::Fair)
+        } else {
+            Err(format!("Invalid memory pool type '{}'", s))
+        }
+    }
+}
+
#[derive(Debug, Parser, PartialEq)]
#[clap(author, version, about, long_about= None)]
struct Args {
@@ -59,6 +79,14 @@ struct Args {
)]
command: Vec<String>,
+ #[clap(
+ short = 'm',
+ long,
+ help = "The memory pool limitation (e.g. '10g'), default to None (no
limit)",
+ validator(is_valid_memory_pool_size)
+ )]
+ memory_limit: Option<String>,
+
#[clap(
short,
long,
@@ -87,6 +115,12 @@ struct Args {
help = "Reduce printing other than the results and work quietly"
)]
quiet: bool,
+
+ #[clap(
+ long,
+ help = "Specify the memory pool type 'greedy' or 'fair', default to
'greedy'"
+ )]
+ mem_pool_type: Option<PoolType>,
}
#[tokio::main]
@@ -109,7 +143,29 @@ pub async fn main() -> Result<()> {
session_config = session_config.with_batch_size(batch_size);
};
- let runtime_env = create_runtime_env()?;
+ let rn_config = RuntimeConfig::new();
+ let rn_config =
+ // set memory pool size
+ if let Some(memory_limit) = args.memory_limit {
+ let memory_limit =
extract_memory_pool_size(&memory_limit).unwrap();
+ // set memory pool type
+ if let Some(mem_pool_type) = args.mem_pool_type {
+ match mem_pool_type {
+ PoolType::Greedy => rn_config
+
.with_memory_pool(Arc::new(GreedyMemoryPool::new(memory_limit))),
+ PoolType::Fair => rn_config
+
.with_memory_pool(Arc::new(FairSpillPool::new(memory_limit))),
+ }
+ } else {
+ rn_config
+
.with_memory_pool(Arc::new(GreedyMemoryPool::new(memory_limit)))
+ }
+ } else {
+ rn_config
+ };
+
+ let runtime_env = create_runtime_env(rn_config.clone())?;
+
let mut ctx =
SessionContext::with_config_rt(session_config.clone(),
Arc::new(runtime_env));
ctx.refresh_catalogs().await?;
@@ -162,8 +218,7 @@ pub async fn main() -> Result<()> {
Ok(())
}
-fn create_runtime_env() -> Result<RuntimeEnv> {
- let rn_config = RuntimeConfig::new();
+/// Builds a [`RuntimeEnv`] from `rn_config`, which `main` prepares with the
+/// memory pool selected on the command line (if `--memory-limit` was given).
+fn create_runtime_env(rn_config: RuntimeConfig) -> Result<RuntimeEnv> {
RuntimeEnv::new(rn_config)
}
@@ -189,3 +244,34 @@ fn is_valid_batch_size(size: &str) -> Result<(), String> {
_ => Err(format!("Invalid batch size '{}'", size)),
}
}
+
+/// Clap validator for `--memory-limit`: succeeds for any value that
+/// `extract_memory_pool_size` can parse and otherwise returns its error message.
+fn is_valid_memory_pool_size(size: &str) -> Result<(), String> {
+    extract_memory_pool_size(size).map(|_| ())
+}
+
+/// Parses a human-readable memory size such as `512m` or `10g` into bytes.
+///
+/// An optional trailing `m`/`M` (MiB) or `g`/`G` (GiB) suffix scales the
+/// value; without a suffix the number is taken as a byte count.
+///
+/// # Errors
+/// Returns `Invalid memory pool size '<input>'` (quoting the full input) when
+/// the input is empty, when the part before the suffix is not a positive
+/// integer, or when the scaled size does not fit in a `usize`.
+fn extract_memory_pool_size(size: &str) -> Result<usize, String> {
+    // Always report the caller's full input, not the suffix-stripped digits.
+    let invalid = || format!("Invalid memory pool size '{}'", size);
+
+    // The recognised suffixes are single ASCII bytes, so dropping the final
+    // byte always lands on a char boundary.
+    let (digits, factor): (&str, usize) = match size.chars().last() {
+        Some('m' | 'M') => (&size[..size.len() - 1], 1024 * 1024),
+        Some('g' | 'G') => (&size[..size.len() - 1], 1024 * 1024 * 1024),
+        Some(_) => (size, 1),
+        None => return Err(invalid()),
+    };
+
+    match digits.parse::<usize>() {
+        // `checked_mul` rejects inputs such as `99999999999g` whose byte count
+        // would overflow (a panic in debug builds, silent wrap in release).
+        Ok(n) if n > 0 => n.checked_mul(factor).ok_or_else(invalid),
+        _ => Err(invalid()),
+    }
+}
diff --git a/docs/source/user-guide/cli.md b/docs/source/user-guide/cli.md
index 3d869d5a7e..e3a8cd74c3 100644
--- a/docs/source/user-guide/cli.md
+++ b/docs/source/user-guide/cli.md
@@ -118,15 +118,17 @@ USAGE:
datafusion-cli [OPTIONS]
OPTIONS:
- -c, --batch-size <BATCH_SIZE> The batch size of each query, or use
DataFusion default
- -f, --file <FILE>... Execute commands from file(s), then exit
- --format <FORMAT> [default: table] [possible values: csv,
tsv, table, json,
- nd-json]
- -h, --help Print help information
- -p, --data-path <DATA_PATH> Path to your data, default to current
directory
- -q, --quiet Reduce printing other than the results
and work quietly
- -r, --rc <RC>... Run the provided files on startup instead
of ~/.datafusionrc
- -V, --version Print version information
+ -c, --batch-size <BATCH_SIZE> The batch size of each query, or
use DataFusion default
+ -f, --file <FILE>... Execute commands from file(s),
then exit
+ --format <FORMAT> [default: table] [possible values:
csv, tsv, table, json,
+ nd-json]
+ -h, --help Print help information
+ -m, --memory-limit <MEMORY_LIMIT> The memory pool limitation (e.g.
'10g'), default to None (no limit)
+ --mem-pool-type <MEM_POOL_TYPE> Specify the memory pool type
'greedy' or 'fair', default to 'greedy'
+ -p, --data-path <DATA_PATH> Path to your data, default to
current directory
+ -q, --quiet Reduce printing other than the
results and work quietly
+ -r, --rc <RC>... Run the provided files on startup
instead of ~/.datafusionrc
+ -V, --version Print version information
```
## Selecting files directly