This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 2503bda50c feat: Add memory pool configuration to `datafusion-cli` (#7424)
2503bda50c is described below

commit 2503bda50ced1bc983280e0b7783a482bc1d5714
Author: Alex Huang <[email protected]>
AuthorDate: Mon Aug 28 18:43:38 2023 +0800

    feat: Add memory pool configuration to `datafusion-cli` (#7424)
    
    * support memory-limit
    
    * update doc
    
    * avoid unwrap
    
    * support memory pool type setting
    
    * format doc
    
    * fix else case
    
    * update doc
    
    * refactor
---
 datafusion-cli/src/main.rs    | 92 +++++++++++++++++++++++++++++++++++++++++--
 docs/source/user-guide/cli.md | 20 +++++-----
 2 files changed, 100 insertions(+), 12 deletions(-)

diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs
index aea499d603..8429738a09 100644
--- a/datafusion-cli/src/main.rs
+++ b/datafusion-cli/src/main.rs
@@ -18,6 +18,7 @@
 use clap::Parser;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::context::SessionConfig;
+use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool};
 use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 use datafusion::prelude::SessionContext;
 use datafusion_cli::catalog::DynamicFileCatalog;
@@ -27,11 +28,30 @@ use datafusion_cli::{
 use mimalloc::MiMalloc;
 use std::env;
 use std::path::Path;
+use std::str::FromStr;
 use std::sync::Arc;
 
 #[global_allocator]
 static GLOBAL: MiMalloc = MiMalloc;
 
+/// Memory pool kind chosen with `--mem-pool-type`; names are matched case-insensitively.
+#[derive(PartialEq, Debug, Clone, Copy)]
+enum PoolType {
+    Greedy,
+    Fair,
+}
+
+impl FromStr for PoolType {
+    type Err = String;
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s.to_ascii_lowercase().as_str() {
+            "greedy" => Ok(PoolType::Greedy),
+            "fair" => Ok(PoolType::Fair),
+            _ => Err(format!("Invalid memory pool type '{}'", s)),
+        }
+    }
+}
+
 #[derive(Debug, Parser, PartialEq)]
 #[clap(author, version, about, long_about= None)]
 struct Args {
@@ -59,6 +79,14 @@ struct Args {
     )]
     command: Vec<String>,
 
+    #[clap(
+        short = 'm',
+        long,
+        help = "The memory pool limitation (e.g. '10g'), default to None (no 
limit)",
+        validator(is_valid_memory_pool_size)
+    )]
+    memory_limit: Option<String>,
+
     #[clap(
         short,
         long,
@@ -87,6 +115,12 @@ struct Args {
         help = "Reduce printing other than the results and work quietly"
     )]
     quiet: bool,
+
+    #[clap(
+        long,
+        help = "Specify the memory pool type 'greedy' or 'fair', default to 
'greedy'"
+    )]
+    mem_pool_type: Option<PoolType>,
 }
 
 #[tokio::main]
@@ -109,7 +143,29 @@ pub async fn main() -> Result<()> {
         session_config = session_config.with_batch_size(batch_size);
     };
 
-    let runtime_env = create_runtime_env()?;
+    let rn_config = RuntimeConfig::new();
+    let rn_config =
+        // set memory pool size
+        if let Some(memory_limit) = args.memory_limit {
+            let memory_limit = 
extract_memory_pool_size(&memory_limit).unwrap();
+            // set memory pool type
+            if let Some(mem_pool_type) = args.mem_pool_type {
+                match mem_pool_type {
+                    PoolType::Greedy => rn_config
+                        
.with_memory_pool(Arc::new(GreedyMemoryPool::new(memory_limit))),
+                    PoolType::Fair => rn_config
+                        
.with_memory_pool(Arc::new(FairSpillPool::new(memory_limit))),
+                }
+            } else {
+                rn_config
+                
.with_memory_pool(Arc::new(GreedyMemoryPool::new(memory_limit)))
+            }
+        } else {
+            rn_config
+        };
+
+    let runtime_env = create_runtime_env(rn_config.clone())?;
+
     let mut ctx =
         SessionContext::with_config_rt(session_config.clone(), 
Arc::new(runtime_env));
     ctx.refresh_catalogs().await?;
@@ -162,8 +218,7 @@ pub async fn main() -> Result<()> {
     Ok(())
 }
 
-fn create_runtime_env() -> Result<RuntimeEnv> {
-    let rn_config = RuntimeConfig::new();
+/// Creates the `RuntimeEnv` from a caller-prepared `RuntimeConfig` (e.g. one whose
+/// memory pool was set from `--memory-limit`); errors from `RuntimeEnv::new` pass through.
+fn create_runtime_env(rn_config: RuntimeConfig) -> Result<RuntimeEnv> {
     RuntimeEnv::new(rn_config)
 }
 
@@ -189,3 +244,34 @@ fn is_valid_batch_size(size: &str) -> Result<(), String> {
         _ => Err(format!("Invalid batch size '{}'", size)),
     }
 }
+
+/// clap validator for `--memory-limit`.
+///
+/// Accepts exactly the inputs that `extract_memory_pool_size` parses successfully.
+fn is_valid_memory_pool_size(size: &str) -> Result<(), String> {
+    extract_memory_pool_size(size).map(|_| ())
+}
+
+/// Parses a memory size such as `512m` or `10g` into a byte count.
+///
+/// An optional `k`/`m`/`g` suffix (either case) multiplies by 1024, 1024^2 or
+/// 1024^3; without a suffix the value is used as-is. Zero, malformed input and
+/// results that overflow `usize` are rejected with an error quoting the input.
+fn extract_memory_pool_size(size: &str) -> Result<usize, String> {
+    let invalid = || format!("Invalid memory pool size '{}'", size);
+    // The suffixes are single-byte ASCII, so slicing off the last byte always
+    // lands on a char boundary.
+    let (digits, factor): (&str, usize) = match size.chars().last() {
+        Some('k' | 'K') => (&size[..size.len() - 1], 1024),
+        Some('m' | 'M') => (&size[..size.len() - 1], 1024 * 1024),
+        Some('g' | 'G') => (&size[..size.len() - 1], 1024 * 1024 * 1024),
+        Some(_) => (size, 1),
+        None => return Err(invalid()),
+    };
+
+    match digits.parse::<usize>() {
+        // checked_mul: e.g. "99999999999g" must be rejected, not overflow.
+        Ok(n) if n > 0 => n.checked_mul(factor).ok_or_else(invalid),
+        _ => Err(invalid()),
+    }
+}
diff --git a/docs/source/user-guide/cli.md b/docs/source/user-guide/cli.md
index 3d869d5a7e..e3a8cd74c3 100644
--- a/docs/source/user-guide/cli.md
+++ b/docs/source/user-guide/cli.md
@@ -118,15 +118,17 @@ USAGE:
     datafusion-cli [OPTIONS]
 
 OPTIONS:
-    -c, --batch-size <BATCH_SIZE>    The batch size of each query, or use DataFusion default
-    -f, --file <FILE>...             Execute commands from file(s), then exit
-        --format <FORMAT>            [default: table] [possible values: csv, tsv, table, json,
-                                     nd-json]
-    -h, --help                       Print help information
-    -p, --data-path <DATA_PATH>      Path to your data, default to current directory
-    -q, --quiet                      Reduce printing other than the results and work quietly
-    -r, --rc <RC>...                 Run the provided files on startup instead of ~/.datafusionrc
-    -V, --version                    Print version information
+    -c, --batch-size <BATCH_SIZE>           The batch size of each query, or use DataFusion default
+    -f, --file <FILE>...                    Execute commands from file(s), then exit
+        --format <FORMAT>                   [default: table] [possible values: csv, tsv, table, json,
+                                            nd-json]
+    -h, --help                              Print help information
+    -m, --memory-limit <MEMORY_LIMIT>       The memory pool limitation (e.g. '10g'), default to None (no limit)
+        --mem-pool-type <MEM_POOL_TYPE>     Specify the memory pool type 'greedy' or 'fair', default to 'greedy'
+    -p, --data-path <DATA_PATH>             Path to your data, default to current directory
+    -q, --quiet                             Reduce printing other than the results and work quietly
+    -r, --rc <RC>...                        Run the provided files on startup instead of ~/.datafusionrc
+    -V, --version                           Print version information
 ```
 
 ## Selecting files directly

Reply via email to