tustvold commented on code in PR #4522:
URL: https://github.com/apache/arrow-datafusion/pull/4522#discussion_r1051033895
##########
datafusion/core/src/execution/memory_manager/mod.rs:
##########
@@ -17,419 +17,187 @@
//! Manages all available memory during query execution
-use crate::error::{DataFusionError, Result};
-use async_trait::async_trait;
-use hashbrown::HashSet;
-use log::{debug, warn};
-use parking_lot::{Condvar, Mutex};
-use std::fmt;
-use std::fmt::{Debug, Display, Formatter};
-use std::sync::atomic::{AtomicUsize, Ordering};
+use crate::error::Result;
use std::sync::Arc;
-use std::time::{Duration, Instant};
+mod pool;
pub mod proxy;
-static CONSUMER_ID: AtomicUsize = AtomicUsize::new(0);
-
-#[derive(Debug, Clone)]
-/// Configuration information for memory management
-pub enum MemoryManagerConfig {
- /// Use the existing [MemoryManager]
- Existing(Arc<MemoryManager>),
-
- /// Create a new [MemoryManager] that will use up to some
- /// fraction of total system memory.
- New {
- /// Max execution memory allowed for DataFusion. Defaults to
- /// `usize::MAX`, which will not attempt to limit the memory
- /// used during plan execution.
- max_memory: usize,
-
- /// The fraction of `max_memory` that the memory manager will
- /// use for execution.
- ///
- /// The purpose of this config is to set aside memory for
- /// untracked data structures, and imprecise size estimation
- /// during memory acquisition. Defaults to 0.7
- memory_fraction: f64,
- },
-}
+pub use pool::*;
-impl Default for MemoryManagerConfig {
- fn default() -> Self {
- Self::New {
- max_memory: usize::MAX,
- memory_fraction: 0.7,
- }
- }
-}
+/// The pool of memory from which [`MemoryManager`] and [`TrackedAllocation`]
allocate
+pub trait MemoryPool: Send + Sync + std::fmt::Debug {
+ /// Records the creation of a new [`TrackedAllocation`] with
[`AllocationOptions`]
+ fn allocate(&self, _options: &AllocationOptions) {}
-impl MemoryManagerConfig {
- /// Create a new memory [MemoryManager] with no limit on the
- /// memory used
- pub fn new() -> Self {
- Default::default()
- }
+ /// Records the destruction of a [`TrackedAllocation`] with
[`AllocationOptions`]
+ fn free(&self, _options: &AllocationOptions) {}
- /// Create a configuration based on an existing [MemoryManager]
- pub fn new_existing(existing: Arc<MemoryManager>) -> Self {
- Self::Existing(existing)
- }
+ /// Infallibly grow the provided `allocation` by `additional` bytes
+ ///
+ /// This must always succeed
+ fn grow(&self, allocation: &TrackedAllocation, additional: usize);
- /// Create a new [MemoryManager] with a `max_memory` and `fraction`
- pub fn try_new_limit(max_memory: usize, memory_fraction: f64) ->
Result<Self> {
- if max_memory == 0 {
- return Err(DataFusionError::Plan(format!(
- "invalid max_memory. Expected greater than 0, got {}",
- max_memory
- )));
- }
- if !(memory_fraction > 0f64 && memory_fraction <= 1f64) {
- return Err(DataFusionError::Plan(format!(
- "invalid fraction. Expected greater than 0 and less than 1.0,
got {}",
- memory_fraction
- )));
- }
+ /// Infallibly shrink the provided `allocation` by `shrink` bytes
+ fn shrink(&self, allocation: &TrackedAllocation, shrink: usize);
- Ok(Self::New {
- max_memory,
- memory_fraction,
- })
- }
+ /// Attempt to grow the provided `allocation` by `additional` bytes
+ ///
+ /// On error the `allocation` will not be increased in size
+ fn try_grow(&self, allocation: &TrackedAllocation, additional: usize) ->
Result<()>;
- /// return the maximum size of the memory, in bytes, this config will allow
- fn pool_size(&self) -> usize {
- match self {
- MemoryManagerConfig::Existing(existing) => existing.pool_size,
- MemoryManagerConfig::New {
- max_memory,
- memory_fraction,
- } => (*max_memory as f64 * *memory_fraction) as usize,
- }
- }
+ /// Return the total amount of memory allocated
+ fn allocated(&self) -> usize;
}
-fn next_id() -> usize {
- CONSUMER_ID.fetch_add(1, Ordering::SeqCst)
+/// A cooperative MemoryManager which tracks memory in a cooperative fashion.
+/// `ExecutionPlan` nodes such as `SortExec`, which require large amounts of
memory
+/// register their memory requests with the MemoryManager which then tracks
the total
+/// memory that has been allocated across all such nodes.
+///
+/// The associated [`MemoryPool`] determines how to respond to memory
allocation
+/// requests, and any associated fairness control
+#[derive(Debug)]
+pub struct MemoryManager {
+ pool: Arc<dyn MemoryPool>,
}
-/// Type of the memory consumer
-pub enum ConsumerType {
- /// consumers that can grow its memory usage by requesting more from the
memory manager or
- /// shrinks its memory usage when we can no more assign available memory
to it.
- /// Examples are spillable sorter, spillable hashmap, etc.
- Requesting,
- /// consumers that are not spillable, counting in for only tracking
purpose.
- Tracking,
-}
+impl MemoryManager {
+ /// Create new memory manager based on the configuration
+ pub fn new(pool: Arc<dyn MemoryPool>) -> Self {
+ Self { pool }
+ }
-#[derive(Clone, Debug, Hash, Eq, PartialEq)]
-/// Id that uniquely identifies a Memory Consumer
-pub struct MemoryConsumerId {
- /// partition the consumer belongs to
- pub partition_id: usize,
- /// unique id
- pub id: usize,
-}
+ /// Returns the number of allocated bytes
+ ///
+ /// Note: this can exceed the pool size as a result of
[`MemoryManager::allocate`]
+ pub fn allocated(&self) -> usize {
+ self.pool.allocated()
+ }
-impl MemoryConsumerId {
- /// Auto incremented new Id
- pub fn new(partition_id: usize) -> Self {
- let id = next_id();
- Self { partition_id, id }
+ /// Returns a new empty allocation identified by `name`
+ pub fn new_allocation(&self, name: String) -> TrackedAllocation {
+ self.new_allocation_with_options(AllocationOptions::new(name))
}
-}
-impl Display for MemoryConsumerId {
- fn fmt(&self, f: &mut Formatter) -> fmt::Result {
- write!(f, "{}:{}", self.partition_id, self.id)
+ /// Returns a new empty allocation with the provided [`AllocationOptions`]
+ pub fn new_allocation_with_options(
+ &self,
+ options: AllocationOptions,
+ ) -> TrackedAllocation {
+ TrackedAllocation::new_empty(options, Arc::clone(&self.pool))
}
}
-#[async_trait]
-/// A memory consumer that either takes up memory (of type
`ConsumerType::Tracking`)
-/// or grows/shrinks memory usage based on available memory (of type
`ConsumerType::Requesting`).
-pub trait MemoryConsumer: Send + Sync {
- /// Display name of the consumer
- fn name(&self) -> String;
-
- /// Unique id of the consumer
- fn id(&self) -> &MemoryConsumerId;
-
- /// Ptr to MemoryManager
- fn memory_manager(&self) -> Arc<MemoryManager>;
-
- /// Partition that the consumer belongs to
- fn partition_id(&self) -> usize {
- self.id().partition_id
- }
+/// Options associated with a [`TrackedAllocation`]
+#[derive(Debug)]
+pub struct AllocationOptions {
Review Comment:
This is designed to be an extension point, so that we can later add more
allocation options and, correspondingly, more sophisticated MemoryPool implementations.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]