alamb commented on code in PR #14975:
URL: https://github.com/apache/datafusion/pull/14975#discussion_r2004211990
##########
datafusion/execution/src/disk_manager.rs:
##########
@@ -144,6 +219,94 @@ impl DiskManager {
.map_err(DataFusionError::IoError)?,
})
}
+
+ /// Write record batches to a temporary file, and return the spill file handle.
Review Comment:
I am not sure about putting this method on the `DiskManager` itself. I
think managing the contents of the spill files is something that will likely
get more sophisticated (e.g. if we want to explore using mmap and other
techniques to speed up reading).
I have been meaning to file tickets about this and I will do so shortly.
What would you think about introducing a new struct that would be
responsible for managing the spill files for a particular operation?
Something like this perhaps:
```rust
struct SpillFiles {
    env: Arc<RuntimeEnv>,
    files: Vec<RefCountedTempFile>,
    ...
}
...
impl SpillFiles {
    pub fn try_spill_record_batches(
        &self,
        batches: &[RecordBatch],
        request_description: &str,
        caller_spill_metrics: &mut SpillMetrics,
    ) -> Result<()> {..}
}
```
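To make the idea a bit more concrete, here is a minimal, std-only sketch of a per-operation owner of spill files. Everything in it is an assumption for illustration: `Metrics` stands in for `SpillMetrics`, plain byte slices stand in for `RecordBatch`, file paths stand in for `RefCountedTempFile`, and the method takes `&mut self` rather than `&self` to avoid interior mutability. It is not the DataFusion API.

```rust
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::PathBuf;

// Hypothetical stand-in for `SpillMetrics`.
#[derive(Default)]
struct Metrics {
    spill_count: usize,
    spilled_bytes: usize,
}

// Hypothetical per-operation owner of spill files.
struct SpillFiles {
    dir: PathBuf,
    files: Vec<PathBuf>,
}

impl SpillFiles {
    fn new(dir: PathBuf) -> Self {
        Self { dir, files: Vec::new() }
    }

    // Write all batches into one new file and record it in `files`.
    // The naming scheme is simplistic: two `SpillFiles` sharing a
    // directory inside one process could collide.
    fn try_spill(&mut self, batches: &[&[u8]], metrics: &mut Metrics) -> io::Result<()> {
        let name = format!("spill-{}-{}.bin", std::process::id(), self.files.len());
        let path = self.dir.join(name);
        let mut file = File::create(&path)?;
        // Track the file before writing so `Drop` removes it even if a write fails.
        self.files.push(path);
        for batch in batches {
            file.write_all(batch)?;
            metrics.spilled_bytes += batch.len();
        }
        metrics.spill_count += 1;
        Ok(())
    }
}

// Remove every spill file when the owning operation is finished.
impl Drop for SpillFiles {
    fn drop(&mut self) {
        for path in &self.files {
            let _ = fs::remove_file(path);
        }
    }
}
```

The point of the shape is that file lifetime, layout, and metrics live in one place per operation, so a later change to how spill files are read or written would not need to touch `DiskManager`.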
##########
datafusion/execution/src/disk_manager.rs:
##########
@@ -93,14 +115,67 @@ impl DiskManager {
);
Ok(Arc::new(Self {
local_dirs: Mutex::new(Some(local_dirs)),
+ max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+ used_disk_space: Count::default(),
}))
}
DiskManagerConfig::Disabled => Ok(Arc::new(Self {
local_dirs: Mutex::new(None),
+ max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+ used_disk_space: Count::default(),
})),
}
}
+ pub fn try_new_without_arc(config: DiskManagerConfig) -> Result<Self> {
Review Comment:
I don't understand the need for this function and find the name confusing.
It seems like the only difference is that it will error if it doesn't have
exclusive access to the disk manager. I have an alternate suggestion above
##########
datafusion/core/tests/memory_limit/mod.rs:
##########
@@ -468,6 +470,83 @@ async fn test_stringview_external_sort() {
let _ = df.collect().await.expect("Query execution failed");
}
+// Tests for disk limit (`max_temp_directory_size` in `DiskManager`)
+// ------------------------------------------------------------------
+
+// Create a new `SessionContext` with speicified disk limit and memory pool limit
+async fn setup_context(
+ disk_limit: u64,
+ memory_pool_limit: usize,
+) -> Result<SessionContext> {
+ let disk_manager = DiskManager::try_new_without_arc(DiskManagerConfig::NewOs)?
+ .with_max_temp_directory_size(disk_limit)?;
+
+ let runtime = RuntimeEnvBuilder::new()
Review Comment:
I think you could do this instead of adding a new function:
```rust
let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;
let disk_manager = Arc::try_unwrap(disk_manager)
.expect("DiskManager should be unique")
.with_max_temp_directory_size(disk_limit)?;
```
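For reference, a self-contained sketch of why this works: `Arc::try_unwrap` returns the inner value only while the `Arc` has exactly one strong reference, which is always the case right after construction. `Manager` and `build_with_limit` below are hypothetical stand-ins, not DataFusion types, and the starting limit of 100 is an arbitrary placeholder.

```rust
use std::sync::Arc;

// Hypothetical stand-in for `DiskManager`, reduced to the one field the demo needs.
#[derive(Debug)]
struct Manager {
    max_temp_directory_size: u64,
}

impl Manager {
    // Mirrors the shape of `try_new`: the value comes back already wrapped in an `Arc`.
    fn try_new() -> Result<Arc<Self>, String> {
        Ok(Arc::new(Self { max_temp_directory_size: 100 }))
    }

    // Consuming builder-style setter, like `with_max_temp_directory_size` in the diff.
    fn with_max_temp_directory_size(mut self, limit: u64) -> Result<Self, String> {
        self.max_temp_directory_size = limit;
        Ok(self)
    }
}

// Unwrap a freshly created `Arc`, apply the limit, and wrap the value again.
fn build_with_limit(limit: u64) -> Result<Arc<Manager>, String> {
    let manager = Manager::try_new()?;
    let manager = Arc::try_unwrap(manager)
        // Fails only if another clone of the `Arc` is still alive.
        .map_err(|_| "Manager should be unique".to_string())?
        .with_max_temp_directory_size(limit)?;
    Ok(Arc::new(manager))
}
```

If a second clone of the `Arc` exists, `Arc::try_unwrap` hands the `Arc` back as the error value instead of the inner `Manager`, which is the failure the `.expect(...)` in the suggestion guards against.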
##########
datafusion/execution/src/disk_manager.rs:
##########
@@ -93,14 +115,67 @@ impl DiskManager {
);
Ok(Arc::new(Self {
local_dirs: Mutex::new(Some(local_dirs)),
+ max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+ used_disk_space: Count::default(),
}))
}
DiskManagerConfig::Disabled => Ok(Arc::new(Self {
local_dirs: Mutex::new(None),
+ max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+ used_disk_space: Count::default(),
})),
}
}
+ pub fn try_new_without_arc(config: DiskManagerConfig) -> Result<Self> {
+ match config {
+ DiskManagerConfig::Existing(manager) => {
+ Arc::try_unwrap(manager).map_err(|_| {
+ DataFusionError::Internal("Failed to unwrap Arc".to_string())
+ })
+ }
+ DiskManagerConfig::NewOs => Ok(Self {
+ local_dirs: Mutex::new(Some(vec![])),
+ max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+ used_disk_space: Count::default(),
+ }),
+ DiskManagerConfig::NewSpecified(conf_dirs) => {
+ let local_dirs = create_local_dirs(conf_dirs)?;
+ debug!(
+ "Created local dirs {:?} as DataFusion working directory",
+ local_dirs
+ );
+ Ok(Self {
+ local_dirs: Mutex::new(Some(local_dirs)),
+ max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+ used_disk_space: Count::default(),
+ })
+ }
+ DiskManagerConfig::Disabled => Ok(Self {
+ local_dirs: Mutex::new(None),
+ max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
+ used_disk_space: Count::default(),
+ }),
+ }
+ }
+
+ /// Set the maximum amount of data (in bytes) stored inside the temporary directories.
+ pub fn with_max_temp_directory_size(
Review Comment:
I would expect that this function should be on the `DiskManagerConfig`
rather than `DiskManager` 🤔
Then you won't have to add the new `try_unwrap` either.
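A rough sketch of that alternative, under stated assumptions: `ManagerConfig`, `Manager`, and `DEFAULT_LIMIT` are hypothetical stand-ins (the real `DiskManagerConfig` is an enum and the real default value is not shown in this diff). The limit is set on the config with a builder method, and the constructor copies it over before wrapping the manager in an `Arc`, so no unwrapping is needed.

```rust
use std::sync::Arc;

// Hypothetical default; the value of `DEFAULT_MAX_TEMP_DIRECTORY_SIZE` is not shown in the diff.
const DEFAULT_LIMIT: u64 = 100 * 1024 * 1024;

// Hypothetical config: the limit travels with the config, not the manager.
struct ManagerConfig {
    max_temp_directory_size: u64,
}

impl ManagerConfig {
    fn new() -> Self {
        Self { max_temp_directory_size: DEFAULT_LIMIT }
    }

    // Builder-style setter on the config.
    fn with_max_temp_directory_size(mut self, limit: u64) -> Self {
        self.max_temp_directory_size = limit;
        self
    }
}

// Hypothetical stand-in for `DiskManager`.
struct Manager {
    max_temp_directory_size: u64,
}

impl Manager {
    // The limit is read from the config before the `Arc` is created.
    fn try_new(config: ManagerConfig) -> Result<Arc<Self>, String> {
        Ok(Arc::new(Self {
            max_temp_directory_size: config.max_temp_directory_size,
        }))
    }
}
```

With this shape the test helper would reduce to a single `try_new` call on a configured config value.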