This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 7153fac Improve configuration and resource use of `MemoryManager` and
`DiskManager` (#1668)
7153fac is described below
commit 7153fac7d8ff77c71a67cd2ba3ec55d94a4cd794
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue Jan 25 16:34:24 2022 -0500
Improve configuration and resource use of `MemoryManager` and `DiskManager`
(#1668)
* Improve configuration and resource use of `MemoryManager` and
`DiskManager`
* fmt
---
ballista/rust/executor/src/executor.rs | 8 +-
datafusion/src/execution/context.rs | 94 +++++++++++++++-
datafusion/src/execution/disk_manager.rs | 106 +++++++++++++-----
datafusion/src/execution/memory_manager.rs | 165 ++++++++++++++++++++++++-----
datafusion/src/execution/mod.rs | 3 +
datafusion/src/execution/runtime_env.rs | 81 ++++++--------
datafusion/src/physical_plan/sorts/sort.rs | 10 +-
7 files changed, 355 insertions(+), 112 deletions(-)
diff --git a/ballista/rust/executor/src/executor.rs
b/ballista/rust/executor/src/executor.rs
index e7479bd..6bf1aeb 100644
--- a/ballista/rust/executor/src/executor.rs
+++ b/ballista/rust/executor/src/executor.rs
@@ -24,9 +24,10 @@ use ballista_core::execution_plans::ShuffleWriterExec;
use ballista_core::serde::protobuf;
use ballista_core::serde::scheduler::ExecutorSpecification;
use datafusion::error::DataFusionError;
-use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{ExecutionPlan, Partitioning};
+use datafusion::prelude::ExecutionConfig;
/// Ballista executor
pub struct Executor {
@@ -87,9 +88,8 @@ impl Executor {
))
}?;
- let runtime_config =
- RuntimeConfig::new().with_local_dirs(vec![self.work_dir.clone()]);
- let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
+ let config =
ExecutionConfig::new().with_temp_file_path(self.work_dir.clone());
+ let runtime = Arc::new(RuntimeEnv::new(config.runtime)?);
let partitions = exec.execute_shuffle_write(part, runtime).await?;
diff --git a/datafusion/src/execution/context.rs
b/datafusion/src/execution/context.rs
index ceea83d..61cbf3a 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -39,7 +39,6 @@ use crate::{
},
};
use log::debug;
-use std::fs;
use std::path::Path;
use std::string::String;
use std::sync::Arc;
@@ -47,6 +46,7 @@ use std::{
collections::{HashMap, HashSet},
sync::Mutex,
};
+use std::{fs, path::PathBuf};
use futures::{StreamExt, TryStreamExt};
use tokio::task::{self, JoinHandle};
@@ -94,7 +94,12 @@ use chrono::{DateTime, Utc};
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
-use super::options::{AvroReadOptions, CsvReadOptions};
+use super::{
+ disk_manager::DiskManagerConfig,
+ memory_manager::MemoryManagerConfig,
+ options::{AvroReadOptions, CsvReadOptions},
+ DiskManager, MemoryManager,
+};
/// ExecutionContext is the main interface for executing queries with
DataFusion. The context
/// provides the following functionality:
@@ -195,6 +200,11 @@ impl ExecutionContext {
}
}
+ /// Return the [RuntimeEnv] used to run queries with this
[ExecutionContext]
+ pub fn runtime_env(&self) -> Arc<RuntimeEnv> {
+ self.state.lock().unwrap().runtime_env.clone()
+ }
+
/// Creates a dataframe that will execute a SQL query.
///
/// This method is `async` because queries of type `CREATE EXTERNAL TABLE`
@@ -718,7 +728,7 @@ impl ExecutionContext {
let path = path.as_ref();
// create directory to contain the CSV files (one per partition)
let fs_path = Path::new(path);
- let runtime = self.state.lock().unwrap().runtime_env.clone();
+ let runtime = self.runtime_env();
match fs::create_dir(fs_path) {
Ok(()) => {
let mut tasks = vec![];
@@ -758,7 +768,7 @@ impl ExecutionContext {
let path = path.as_ref();
// create directory to contain the Parquet files (one per partition)
let fs_path = Path::new(path);
- let runtime = self.state.lock().unwrap().runtime_env.clone();
+ let runtime = self.runtime_env();
match fs::create_dir(fs_path) {
Ok(()) => {
let mut tasks = vec![];
@@ -1057,6 +1067,48 @@ impl ExecutionConfig {
self.runtime = config;
self
}
+
+ /// Use an existing [MemoryManager]
+ pub fn with_existing_memory_manager(mut self, existing:
Arc<MemoryManager>) -> Self {
+ self.runtime = self
+ .runtime
+ .with_memory_manager(MemoryManagerConfig::new_existing(existing));
+ self
+ }
+
+ /// Limit the total memory used while running the DataFusion
+ /// plan to `max_memory * memory_fraction` bytes.
+ ///
+ /// Note DataFusion does not yet respect this limit in all cases.
+ pub fn with_memory_limit(
+ mut self,
+ max_memory: usize,
+ memory_fraction: f64,
+ ) -> Result<Self> {
+ self.runtime =
+ self.runtime
+ .with_memory_manager(MemoryManagerConfig::try_new_limit(
+ max_memory,
+ memory_fraction,
+ )?);
+ Ok(self)
+ }
+
+ /// Use an existing [DiskManager]
+ pub fn with_existing_disk_manager(mut self, existing: Arc<DiskManager>) ->
Self {
+ self.runtime = self
+ .runtime
+ .with_disk_manager(DiskManagerConfig::new_existing(existing));
+ self
+ }
+
+ /// Use the specified path to create any needed temporary files
+ pub fn with_temp_file_path(mut self, path: impl Into<PathBuf>) -> Self {
+ self.runtime = self
+ .runtime
+
.with_disk_manager(DiskManagerConfig::new_specified(vec![path.into()]));
+ self
+ }
}
/// Holds per-execution properties and data (such as starting timestamps, etc).
@@ -1246,6 +1298,40 @@ mod tests {
use tempfile::TempDir;
use test::*;
+ #[tokio::test]
+ async fn shared_memory_and_disk_manager() {
+ // Demonstrate the ability to share DiskManager and
+ // MemoryManager between two different executions.
+ let ctx1 = ExecutionContext::new();
+
+ // configure with same memory / disk manager
+ let memory_manager = ctx1.runtime_env().memory_manager.clone();
+ let disk_manager = ctx1.runtime_env().disk_manager.clone();
+ let config = ExecutionConfig::new()
+ .with_existing_memory_manager(memory_manager.clone())
+ .with_existing_disk_manager(disk_manager.clone());
+
+ let ctx2 = ExecutionContext::with_config(config);
+
+ assert!(std::ptr::eq(
+ Arc::as_ptr(&memory_manager),
+ Arc::as_ptr(&ctx1.runtime_env().memory_manager)
+ ));
+ assert!(std::ptr::eq(
+ Arc::as_ptr(&memory_manager),
+ Arc::as_ptr(&ctx2.runtime_env().memory_manager)
+ ));
+
+ assert!(std::ptr::eq(
+ Arc::as_ptr(&disk_manager),
+ Arc::as_ptr(&ctx1.runtime_env().disk_manager)
+ ));
+ assert!(std::ptr::eq(
+ Arc::as_ptr(&disk_manager),
+ Arc::as_ptr(&ctx2.runtime_env().disk_manager)
+ ));
+ }
+
#[test]
fn optimize_explain() {
let schema = Schema::new(vec![Field::new("id", DataType::Int32,
false)]);
diff --git a/datafusion/src/execution/disk_manager.rs
b/datafusion/src/execution/disk_manager.rs
index c4a6b1d..c98df3b 100644
--- a/datafusion/src/execution/disk_manager.rs
+++ b/datafusion/src/execution/disk_manager.rs
@@ -19,30 +19,86 @@
//! hashed among the directories listed in RuntimeConfig::local_dirs.
use crate::error::{DataFusionError, Result};
-use log::info;
+use log::{debug, info};
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use std::collections::hash_map::DefaultHasher;
use std::fs::File;
use std::hash::{Hash, Hasher};
use std::path::{Path, PathBuf};
+use std::sync::Arc;
use tempfile::{Builder, TempDir};
+/// Configuration for temporary disk access
+#[derive(Debug, Clone)]
+pub enum DiskManagerConfig {
+ /// Use the provided [DiskManager] instance
+ Existing(Arc<DiskManager>),
+
+ /// Create a new [DiskManager] that creates temporary files within
+ /// a temporary directory chosen by the OS
+ NewOs,
+
+ /// Create a new [DiskManager] that creates temporary files within
+ /// the specified directories
+ NewSpecified(Vec<PathBuf>),
+}
+
+impl Default for DiskManagerConfig {
+ fn default() -> Self {
+ Self::NewOs
+ }
+}
+
+impl DiskManagerConfig {
+ /// Create temporary files in a temporary directory chosen by the OS
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ /// Create temporary files using the provided disk manager
+ pub fn new_existing(existing: Arc<DiskManager>) -> Self {
+ Self::Existing(existing)
+ }
+
+ /// Create temporary files in the specified directories
+ pub fn new_specified(paths: Vec<PathBuf>) -> Self {
+ Self::NewSpecified(paths)
+ }
+}
+
/// Manages files generated during query execution, e.g. spill files generated
/// while processing dataset larger than available memory.
+#[derive(Debug)]
pub struct DiskManager {
local_dirs: Vec<TempDir>,
}
impl DiskManager {
- /// Create local dirs inside user provided dirs through conf
- pub fn new(conf_dirs: &[String]) -> Result<Self> {
- let local_dirs = create_local_dirs(conf_dirs)?;
- info!(
- "Created local dirs {:?} as DataFusion working directory",
- local_dirs
- );
- Ok(Self { local_dirs })
+ /// Create a DiskManager given the configuration
+ pub fn try_new(config: DiskManagerConfig) -> Result<Arc<Self>> {
+ match config {
+ DiskManagerConfig::Existing(manager) => Ok(manager),
+ DiskManagerConfig::NewOs => {
+ let tempdir =
tempfile::tempdir().map_err(DataFusionError::IoError)?;
+
+ debug!(
+ "Created directory {:?} as DataFusion working directory",
+ tempdir
+ );
+ Ok(Arc::new(Self {
+ local_dirs: vec![tempdir],
+ }))
+ }
+ DiskManagerConfig::NewSpecified(conf_dirs) => {
+ let local_dirs = create_local_dirs(conf_dirs)?;
+ info!(
+ "Created local dirs {:?} as DataFusion working directory",
+ local_dirs
+ );
+ Ok(Arc::new(Self { local_dirs }))
+ }
+ }
}
/// Create a file in conf dirs in randomized manner and return the file
path
@@ -52,20 +108,18 @@ impl DiskManager {
}
/// Setup local dirs by creating one new dir in each of the given dirs
-fn create_local_dirs(local_dir: &[String]) -> Result<Vec<TempDir>> {
- local_dir
+fn create_local_dirs(local_dirs: Vec<PathBuf>) -> Result<Vec<TempDir>> {
+ local_dirs
.iter()
- .map(|root| create_dir(root, "datafusion-"))
+ .map(|root| {
+ Builder::new()
+ .prefix("datafusion-")
+ .tempdir_in(root)
+ .map_err(DataFusionError::IoError)
+ })
.collect()
}
-fn create_dir(root: &str, prefix: &str) -> Result<TempDir> {
- Builder::new()
- .prefix(prefix)
- .tempdir_in(root)
- .map_err(DataFusionError::IoError)
-}
-
fn get_file(file_name: &str, local_dirs: &[TempDir]) -> String {
let mut hasher = DefaultHasher::new();
file_name.hash(&mut hasher);
@@ -98,8 +152,8 @@ fn rand_name() -> String {
#[cfg(test)]
mod tests {
+ use super::*;
use crate::error::Result;
- use crate::execution::disk_manager::{get_file, DiskManager};
use tempfile::TempDir;
#[test]
@@ -107,13 +161,13 @@ mod tests {
let local_dir1 = TempDir::new()?;
let local_dir2 = TempDir::new()?;
let local_dir3 = TempDir::new()?;
- let local_dirs = vec![
- local_dir1.path().to_str().unwrap().to_string(),
- local_dir2.path().to_str().unwrap().to_string(),
- local_dir3.path().to_str().unwrap().to_string(),
- ];
+ let config = DiskManagerConfig::new_specified(vec![
+ local_dir1.path().into(),
+ local_dir2.path().into(),
+ local_dir3.path().into(),
+ ]);
- let dm = DiskManager::new(&local_dirs)?;
+ let dm = DiskManager::try_new(config)?;
let actual = dm.create_tmp_file()?;
let name = actual.rsplit_once(std::path::MAIN_SEPARATOR).unwrap().1;
diff --git a/datafusion/src/execution/memory_manager.rs
b/datafusion/src/execution/memory_manager.rs
index caa597b..32f7975 100644
--- a/datafusion/src/execution/memory_manager.rs
+++ b/datafusion/src/execution/memory_manager.rs
@@ -17,7 +17,7 @@
//! Manages all available memory during query execution
-use crate::error::Result;
+use crate::error::{DataFusionError, Result};
use async_trait::async_trait;
use hashbrown::HashMap;
use log::info;
@@ -28,6 +28,84 @@ use std::sync::{Arc, Condvar, Mutex, Weak};
static CONSUMER_ID: AtomicUsize = AtomicUsize::new(0);
+#[derive(Debug, Clone)]
+/// Configuration information for memory management
+pub enum MemoryManagerConfig {
+ /// Use the existing [MemoryManager]
+ Existing(Arc<MemoryManager>),
+
+ /// Create a new [MemoryManager] that will use up to some
+ /// fraction of total system memory.
+ New {
+ /// Max execution memory allowed for DataFusion. Defaults to
+ /// `usize::MAX`, which will not attempt to limit the memory
+ /// used during plan execution.
+ max_memory: usize,
+
+ /// The fraction of `max_memory` that the memory manager will
+ /// use for execution.
+ ///
+ /// The purpose of this config is to set aside memory for
+ /// untracked data structures, and imprecise size estimation
+ /// during memory acquisition. Defaults to 0.7
+ memory_fraction: f64,
+ },
+}
+
+impl Default for MemoryManagerConfig {
+ fn default() -> Self {
+ Self::New {
+ max_memory: usize::MAX,
+ memory_fraction: 0.7,
+ }
+ }
+}
+
+impl MemoryManagerConfig {
+ /// Create a configuration for a new [MemoryManager] with no
+ /// limit on the memory used
+ pub fn new() -> Self {
+ Default::default()
+ }
+
+ /// Create a configuration based on an existing [MemoryManager]
+ pub fn new_existing(existing: Arc<MemoryManager>) -> Self {
+ Self::Existing(existing)
+ }
+
+ /// Create a configuration for a new [MemoryManager] limited by `max_memory` and `memory_fraction`
+ pub fn try_new_limit(max_memory: usize, memory_fraction: f64) ->
Result<Self> {
+ if max_memory == 0 {
+ return Err(DataFusionError::Plan(format!(
+ "invalid max_memory. Expected greater than 0, got {}",
+ max_memory
+ )));
+ }
+ if !(memory_fraction > 0f64 && memory_fraction <= 1f64) {
+ return Err(DataFusionError::Plan(format!(
+ "invalid fraction. Expected greater than 0 and less than 1.0,
got {}",
+ memory_fraction
+ )));
+ }
+
+ Ok(Self::New {
+ max_memory,
+ memory_fraction,
+ })
+ }
+
+ /// return the maximum size of the memory, in bytes, this config will allow
+ fn pool_size(&self) -> usize {
+ match self {
+ MemoryManagerConfig::Existing(existing) => existing.pool_size,
+ MemoryManagerConfig::New {
+ max_memory,
+ memory_fraction,
+ } => (*max_memory as f64 * *memory_fraction) as usize,
+ }
+ }
+}
+
fn next_id() -> usize {
CONSUMER_ID.fetch_add(1, Ordering::SeqCst)
}
@@ -165,6 +243,7 @@ The memory management architecture is the following:
*/
/// Manage memory usage during physical plan execution
+#[derive(Debug)]
pub struct MemoryManager {
requesters: Arc<Mutex<HashMap<MemoryConsumerId, Weak<dyn
MemoryConsumer>>>>,
trackers: Arc<Mutex<HashMap<MemoryConsumerId, Weak<dyn MemoryConsumer>>>>,
@@ -174,19 +253,27 @@ pub struct MemoryManager {
}
impl MemoryManager {
- /// Create new memory manager based on max available pool_size
+ /// Create new memory manager based on the configuration
#[allow(clippy::mutex_atomic)]
- pub fn new(pool_size: usize) -> Self {
- info!(
- "Creating memory manager with initial size {}",
- human_readable_size(pool_size)
- );
- Self {
- requesters: Arc::new(Mutex::new(HashMap::new())),
- trackers: Arc::new(Mutex::new(HashMap::new())),
- pool_size,
- requesters_total: Arc::new(Mutex::new(0)),
- cv: Condvar::new(),
+ pub fn new(config: MemoryManagerConfig) -> Arc<Self> {
+ let pool_size = config.pool_size();
+
+ match config {
+ MemoryManagerConfig::Existing(manager) => manager,
+ MemoryManagerConfig::New { .. } => {
+ info!(
+ "Creating memory manager with initial size {}",
+ human_readable_size(pool_size)
+ );
+
+ Arc::new(Self {
+ requesters: Arc::new(Mutex::new(HashMap::new())),
+ trackers: Arc::new(Mutex::new(HashMap::new())),
+ pool_size,
+ requesters_total: Arc::new(Mutex::new(0)),
+ cv: Condvar::new(),
+ })
+ }
}
}
@@ -328,10 +415,8 @@ fn human_readable_size(size: usize) -> String {
#[cfg(test)]
mod tests {
+ use super::*;
use crate::error::Result;
- use crate::execution::memory_manager::{
- ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager,
- };
use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use async_trait::async_trait;
use std::sync::atomic::{AtomicUsize, Ordering};
@@ -438,11 +523,10 @@ mod tests {
}
#[tokio::test]
- async fn basic_functionalities() -> Result<()> {
+ async fn basic_functionalities() {
let config = RuntimeConfig::new()
- .with_memory_fraction(1.0)
- .with_max_execution_memory(100);
- let runtime = Arc::new(RuntimeEnv::new(config)?);
+ .with_memory_manager(MemoryManagerConfig::try_new_limit(100,
1.0).unwrap());
+ let runtime = Arc::new(RuntimeEnv::new(config).unwrap());
let tracker1 = Arc::new(DummyTracker::new(0, runtime.clone(), 5));
runtime.register_consumer(&(tracker1.clone() as Arc<dyn
MemoryConsumer>));
@@ -463,8 +547,8 @@ mod tests {
runtime.register_consumer(&(requester1.clone() as Arc<dyn
MemoryConsumer>));
// first requester entered, should be able to use any of the remaining
80
- requester1.do_with_mem(40).await?;
- requester1.do_with_mem(10).await?;
+ requester1.do_with_mem(40).await.unwrap();
+ requester1.do_with_mem(10).await.unwrap();
assert_eq!(requester1.get_spills(), 0);
assert_eq!(requester1.mem_used(), 50);
assert_eq!(*runtime.memory_manager.requesters_total.lock().unwrap(),
50);
@@ -472,17 +556,46 @@ mod tests {
let requester2 = Arc::new(DummyRequester::new(0, runtime.clone()));
runtime.register_consumer(&(requester2.clone() as Arc<dyn
MemoryConsumer>));
- requester2.do_with_mem(20).await?;
- requester2.do_with_mem(30).await?;
+ requester2.do_with_mem(20).await.unwrap();
+ requester2.do_with_mem(30).await.unwrap();
assert_eq!(requester2.get_spills(), 1);
assert_eq!(requester2.mem_used(), 30);
- requester1.do_with_mem(10).await?;
+ requester1.do_with_mem(10).await.unwrap();
assert_eq!(requester1.get_spills(), 1);
assert_eq!(requester1.mem_used(), 10);
assert_eq!(*runtime.memory_manager.requesters_total.lock().unwrap(),
40);
+ }
- Ok(())
+ #[tokio::test]
+ #[should_panic(expected = "invalid max_memory. Expected greater than 0,
got 0")]
+ async fn test_try_new_with_limit_0() {
+ MemoryManagerConfig::try_new_limit(0, 1.0).unwrap();
+ }
+
+ #[tokio::test]
+ #[should_panic(
+ expected = "invalid fraction. Expected greater than 0 and less than
1.0, got -9.6"
+ )]
+ async fn test_try_new_with_limit_neg_fraction() {
+ MemoryManagerConfig::try_new_limit(100, -9.6).unwrap();
+ }
+
+ #[tokio::test]
+ #[should_panic(
+ expected = "invalid fraction. Expected greater than 0 and less than
1.0, got 9.6"
+ )]
+ async fn test_try_new_with_limit_too_large() {
+ MemoryManagerConfig::try_new_limit(100, 9.6).unwrap();
+ }
+
+ #[tokio::test]
+ async fn test_try_new_with_limit_pool_size() {
+ let config = MemoryManagerConfig::try_new_limit(100, 0.5).unwrap();
+ assert_eq!(config.pool_size(), 50);
+
+ let config = MemoryManagerConfig::try_new_limit(100000, 0.1).unwrap();
+ assert_eq!(config.pool_size(), 10000);
}
}
diff --git a/datafusion/src/execution/mod.rs b/datafusion/src/execution/mod.rs
index 0c92627..e3b42ae 100644
--- a/datafusion/src/execution/mod.rs
+++ b/datafusion/src/execution/mod.rs
@@ -23,3 +23,6 @@ pub(crate) mod disk_manager;
pub mod memory_manager;
pub mod options;
pub mod runtime_env;
+
+pub use disk_manager::DiskManager;
+pub use memory_manager::{MemoryConsumer, MemoryConsumerId, MemoryManager};
diff --git a/datafusion/src/execution/runtime_env.rs
b/datafusion/src/execution/runtime_env.rs
index 1e1aecd..cdcd1f7 100644
--- a/datafusion/src/execution/runtime_env.rs
+++ b/datafusion/src/execution/runtime_env.rs
@@ -18,17 +18,25 @@
//! Execution runtime environment that tracks memory, disk and various
configurations
//! that are used during physical plan execution.
-use crate::error::Result;
-use crate::execution::disk_manager::DiskManager;
-use crate::execution::memory_manager::{MemoryConsumer, MemoryConsumerId,
MemoryManager};
+use crate::{
+ error::Result,
+ execution::{
+ disk_manager::{DiskManager, DiskManagerConfig},
+ memory_manager::{
+ MemoryConsumer, MemoryConsumerId, MemoryManager,
MemoryManagerConfig,
+ },
+ },
+};
+
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
#[derive(Clone)]
-/// Execution runtime environment
+/// Execution runtime environment. This structure is passed to the
+/// physical plans when they are run.
pub struct RuntimeEnv {
- /// Runtime configuration
- pub config: RuntimeConfig,
+ /// Default batch size while creating new batches
+ pub batch_size: usize,
/// Runtime memory management
pub memory_manager: Arc<MemoryManager>,
/// Manage temporary files during query execution
@@ -44,20 +52,22 @@ impl Debug for RuntimeEnv {
impl RuntimeEnv {
/// Create env based on configuration
pub fn new(config: RuntimeConfig) -> Result<Self> {
- let memory_manager = Arc::new(MemoryManager::new(
- (config.max_memory as f64 * config.memory_fraction) as usize,
- ));
- let disk_manager = Arc::new(DiskManager::new(&config.local_dirs)?);
- Ok(Self {
- config,
+ let RuntimeConfig {
+ batch_size,
memory_manager,
disk_manager,
+ } = config;
+
+ Ok(Self {
+ batch_size,
+ memory_manager: MemoryManager::new(memory_manager),
+ disk_manager: DiskManager::try_new(disk_manager)?,
})
}
/// Get execution batch size based on config
pub fn batch_size(&self) -> usize {
- self.config.batch_size
+ self.batch_size
}
/// Register the consumer to get it tracked
@@ -84,16 +94,10 @@ pub struct RuntimeConfig {
/// for buffer-in-memory batches since creating tiny batches would results
/// in too much metadata memory consumption.
pub batch_size: usize,
- /// Max execution memory allowed for DataFusion.
- /// Defaults to `usize::MAX`
- pub max_memory: usize,
- /// The fraction of total memory used for execution.
- /// The purpose of this config is to set aside memory for untracked data
structures,
- /// and imprecise size estimation during memory acquisition.
- /// Defaults to 0.7
- pub memory_fraction: f64,
- /// Local dirs to store temporary files during execution.
- pub local_dirs: Vec<String>,
+ /// DiskManager to manage temporary disk file usage
+ pub disk_manager: DiskManagerConfig,
+ /// MemoryManager to limit access to memory
+ pub memory_manager: MemoryManagerConfig,
}
impl RuntimeConfig {
@@ -110,40 +114,25 @@ impl RuntimeConfig {
self
}
- /// Customize exec size
- pub fn with_max_execution_memory(mut self, max_memory: usize) -> Self {
- assert!(max_memory > 0);
- self.max_memory = max_memory;
+ /// Customize disk manager
+ pub fn with_disk_manager(mut self, disk_manager: DiskManagerConfig) ->
Self {
+ self.disk_manager = disk_manager;
self
}
- /// Customize exec memory fraction
- pub fn with_memory_fraction(mut self, fraction: f64) -> Self {
- assert!(fraction > 0f64 && fraction <= 1f64);
- self.memory_fraction = fraction;
- self
- }
-
- /// Customize exec size
- pub fn with_local_dirs(mut self, local_dirs: Vec<String>) -> Self {
- assert!(!local_dirs.is_empty());
- self.local_dirs = local_dirs;
+ /// Customize memory manager
+ pub fn with_memory_manager(mut self, memory_manager: MemoryManagerConfig)
-> Self {
+ self.memory_manager = memory_manager;
self
}
}
impl Default for RuntimeConfig {
fn default() -> Self {
- let tmp_dir = tempfile::tempdir().unwrap();
- let path = tmp_dir.path().to_str().unwrap().to_string();
- std::mem::forget(tmp_dir);
-
Self {
batch_size: 8192,
- // Effectively "no limit"
- max_memory: usize::MAX,
- memory_fraction: 0.7,
- local_dirs: vec![path],
+ disk_manager: DiskManagerConfig::default(),
+ memory_manager: MemoryManagerConfig::default(),
}
}
}
diff --git a/datafusion/src/physical_plan/sorts/sort.rs
b/datafusion/src/physical_plan/sorts/sort.rs
index 456023f..2933a5b 100644
--- a/datafusion/src/physical_plan/sorts/sort.rs
+++ b/datafusion/src/physical_plan/sorts/sort.rs
@@ -564,7 +564,7 @@ async fn do_sort(
mod tests {
use super::*;
use crate::datasource::object_store::local::LocalFileSystem;
- use crate::execution::runtime_env::RuntimeConfig;
+ use crate::execution::context::ExecutionConfig;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::expressions::col;
use crate::physical_plan::memory::MemoryExec;
@@ -648,11 +648,9 @@ mod tests {
#[tokio::test]
async fn test_sort_spill() -> Result<()> {
- let config = RuntimeConfig::new()
- .with_memory_fraction(1.0)
- // trigger spill there will be 4 batches with 5.5KB for each
- .with_max_execution_memory(12288);
- let runtime = Arc::new(RuntimeEnv::new(config)?);
+ // trigger a spill: there will be 4 batches of 5.5KB each
+ let config = ExecutionConfig::new().with_memory_limit(12288, 1.0)?;
+ let runtime = Arc::new(RuntimeEnv::new(config.runtime)?);
let schema = test_util::aggr_test_schema();
let partitions = 4;