alamb commented on code in PR #4522:
URL: https://github.com/apache/arrow-datafusion/pull/4522#discussion_r1040863714
##########
datafusion/core/src/execution/memory_manager/mod.rs:
##########
@@ -97,339 +89,154 @@ impl MemoryManagerConfig {
memory_fraction,
})
}
-
- /// return the maximum size of the memory, in bytes, this config will allow
- fn pool_size(&self) -> usize {
- match self {
- MemoryManagerConfig::Existing(existing) => existing.pool_size,
- MemoryManagerConfig::New {
- max_memory,
- memory_fraction,
- } => (*max_memory as f64 * *memory_fraction) as usize,
- }
- }
-}
-
-fn next_id() -> usize {
- CONSUMER_ID.fetch_add(1, Ordering::SeqCst)
-}
-
-/// Type of the memory consumer
-pub enum ConsumerType {
- /// consumers that can grow its memory usage by requesting more from the
memory manager or
- /// shrinks its memory usage when we can no more assign available memory
to it.
- /// Examples are spillable sorter, spillable hashmap, etc.
- Requesting,
- /// consumers that are not spillable, counting in for only tracking
purpose.
- Tracking,
-}
-
-#[derive(Clone, Debug, Hash, Eq, PartialEq)]
-/// Id that uniquely identifies a Memory Consumer
-pub struct MemoryConsumerId {
- /// partition the consumer belongs to
- pub partition_id: usize,
- /// unique id
- pub id: usize,
-}
-
-impl MemoryConsumerId {
- /// Auto incremented new Id
- pub fn new(partition_id: usize) -> Self {
- let id = next_id();
- Self { partition_id, id }
- }
-}
-
-impl Display for MemoryConsumerId {
- fn fmt(&self, f: &mut Formatter) -> fmt::Result {
- write!(f, "{}:{}", self.partition_id, self.id)
- }
-}
-
-#[async_trait]
-/// A memory consumer that either takes up memory (of type
`ConsumerType::Tracking`)
-/// or grows/shrinks memory usage based on available memory (of type
`ConsumerType::Requesting`).
-pub trait MemoryConsumer: Send + Sync {
- /// Display name of the consumer
- fn name(&self) -> String;
-
- /// Unique id of the consumer
- fn id(&self) -> &MemoryConsumerId;
-
- /// Ptr to MemoryManager
- fn memory_manager(&self) -> Arc<MemoryManager>;
-
- /// Partition that the consumer belongs to
- fn partition_id(&self) -> usize {
- self.id().partition_id
- }
-
- /// Type of the consumer
- fn type_(&self) -> &ConsumerType;
-
- /// Grow memory by `required` to buffer more data in memory,
- /// this may trigger spill before grow when the memory threshold is
- /// reached for this consumer.
- async fn try_grow(&self, required: usize) -> Result<()> {
- let current = self.mem_used();
- debug!(
- "trying to acquire {} whiling holding {} from consumer {}",
- human_readable_size(required),
- human_readable_size(current),
- self.id(),
- );
-
- let can_grow_directly =
- self.memory_manager().can_grow_directly(required, current);
- if !can_grow_directly {
- debug!(
- "Failed to grow memory of {} directly from consumer {},
spilling first ...",
- human_readable_size(required),
- self.id()
- );
- let freed = self.spill().await?;
- self.memory_manager()
- .record_free_then_acquire(freed, required);
- }
- Ok(())
- }
-
- /// Grow without spilling to the disk. It grows the memory directly
- /// so it should be only used when the consumer already allocated the
- /// memory and it is safe to grow without spilling.
- fn grow(&self, required: usize) {
- self.memory_manager().record_free_then_acquire(0, required);
- }
-
- /// Return `freed` memory to the memory manager,
- /// may wake up other requesters waiting for their minimum memory quota.
- fn shrink(&self, freed: usize) {
- self.memory_manager().record_free(freed);
- }
-
- /// Spill in-memory buffers to disk, free memory, return the previous used
- async fn spill(&self) -> Result<usize>;
-
- /// Current memory used by this consumer
- fn mem_used(&self) -> usize;
}
-impl Debug for dyn MemoryConsumer {
- fn fmt(&self, f: &mut Formatter) -> fmt::Result {
- write!(
- f,
- "{}[{}]: {}",
- self.name(),
- self.id(),
- human_readable_size(self.mem_used())
- )
- }
-}
-
-impl Display for dyn MemoryConsumer {
- fn fmt(&self, f: &mut Formatter) -> fmt::Result {
- write!(f, "{}[{}]", self.name(), self.id(),)
- }
-}
-
-/*
-The memory management architecture is the following:
-
-1. User designates max execution memory by setting RuntimeConfig.max_memory
and RuntimeConfig.memory_fraction (float64 between 0..1).
- The actual max memory DataFusion could use `pool_size = max_memory *
memory_fraction`.
-2. The entities that take up memory during its execution are called 'Memory
Consumers'. Operators or others are encouraged to
- register themselves to the memory manager and report its usage through
`mem_used()`.
-3. There are two kinds of consumers:
- - 'Requesting' consumers that would acquire memory during its execution and
release memory through `spill` if no more memory is available.
- - 'Tracking' consumers that exist for reporting purposes to provide a more
accurate memory usage estimation for memory consumers.
-4. Requesting and tracking consumers share the pool. Each controlling consumer
could acquire a maximum of
- (pool_size - all_tracking_used) / active_num_controlling_consumers.
-
- Memory Space for the DataFusion Lib / Process of `pool_size`
-
┌──────────────────────────────────────────────z─────────────────────────────┐
- │ z
│
- │ z
│
- │ Requesting z Tracking
│
- │ Memory Consumers z Memory Consumers
│
- │ z
│
- │ z
│
-
└──────────────────────────────────────────────z─────────────────────────────┘
-*/
-
-/// Manage memory usage during physical plan execution
+/// The memory manager maintains a fixed size pool of memory
+/// from which portions can be allocated
Review Comment:
I think it would be good to expand these comments to explain the allocation
strategy of the manager
```suggestion
/// from which portions can be allocated
///
/// A `MemoryManager` that tracks memory in a cooperative
fashion.
/// `ExecutionPlan` nodes such as `SortExec`, which require large amounts of
memory, register their
/// memory requests with the MemoryManager which then tracks the total
memory that
/// has been allocated across all such nodes.
///
/// The MemoryManager does not guarantee any form of "fairness" -- the
operators that ask for
/// memory first are the ones that are given memory.
///
/// Note that more sophisticated memory policies can be created at a higher
level
/// by dividing available system memory across multiple `MemoryManagers`
and assigning
/// different MemoryManagers to different pools.
```
##########
datafusion/core/src/execution/memory_manager/mod.rs:
##########
@@ -97,339 +89,154 @@ impl MemoryManagerConfig {
memory_fraction,
})
}
-
- /// return the maximum size of the memory, in bytes, this config will allow
- fn pool_size(&self) -> usize {
- match self {
- MemoryManagerConfig::Existing(existing) => existing.pool_size,
- MemoryManagerConfig::New {
- max_memory,
- memory_fraction,
- } => (*max_memory as f64 * *memory_fraction) as usize,
- }
- }
-}
-
-fn next_id() -> usize {
- CONSUMER_ID.fetch_add(1, Ordering::SeqCst)
-}
-
-/// Type of the memory consumer
-pub enum ConsumerType {
- /// consumers that can grow its memory usage by requesting more from the
memory manager or
- /// shrinks its memory usage when we can no more assign available memory
to it.
- /// Examples are spillable sorter, spillable hashmap, etc.
- Requesting,
- /// consumers that are not spillable, counting in for only tracking
purpose.
- Tracking,
-}
-
-#[derive(Clone, Debug, Hash, Eq, PartialEq)]
-/// Id that uniquely identifies a Memory Consumer
-pub struct MemoryConsumerId {
- /// partition the consumer belongs to
- pub partition_id: usize,
- /// unique id
- pub id: usize,
-}
-
-impl MemoryConsumerId {
- /// Auto incremented new Id
- pub fn new(partition_id: usize) -> Self {
- let id = next_id();
- Self { partition_id, id }
- }
-}
-
-impl Display for MemoryConsumerId {
- fn fmt(&self, f: &mut Formatter) -> fmt::Result {
- write!(f, "{}:{}", self.partition_id, self.id)
- }
-}
-
-#[async_trait]
-/// A memory consumer that either takes up memory (of type
`ConsumerType::Tracking`)
-/// or grows/shrinks memory usage based on available memory (of type
`ConsumerType::Requesting`).
-pub trait MemoryConsumer: Send + Sync {
- /// Display name of the consumer
- fn name(&self) -> String;
-
- /// Unique id of the consumer
- fn id(&self) -> &MemoryConsumerId;
-
- /// Ptr to MemoryManager
- fn memory_manager(&self) -> Arc<MemoryManager>;
-
- /// Partition that the consumer belongs to
- fn partition_id(&self) -> usize {
- self.id().partition_id
- }
-
- /// Type of the consumer
- fn type_(&self) -> &ConsumerType;
-
- /// Grow memory by `required` to buffer more data in memory,
- /// this may trigger spill before grow when the memory threshold is
- /// reached for this consumer.
- async fn try_grow(&self, required: usize) -> Result<()> {
- let current = self.mem_used();
- debug!(
- "trying to acquire {} whiling holding {} from consumer {}",
- human_readable_size(required),
- human_readable_size(current),
- self.id(),
- );
-
- let can_grow_directly =
- self.memory_manager().can_grow_directly(required, current);
- if !can_grow_directly {
- debug!(
- "Failed to grow memory of {} directly from consumer {},
spilling first ...",
- human_readable_size(required),
- self.id()
- );
- let freed = self.spill().await?;
- self.memory_manager()
- .record_free_then_acquire(freed, required);
- }
- Ok(())
- }
-
- /// Grow without spilling to the disk. It grows the memory directly
- /// so it should be only used when the consumer already allocated the
- /// memory and it is safe to grow without spilling.
- fn grow(&self, required: usize) {
- self.memory_manager().record_free_then_acquire(0, required);
- }
-
- /// Return `freed` memory to the memory manager,
- /// may wake up other requesters waiting for their minimum memory quota.
- fn shrink(&self, freed: usize) {
- self.memory_manager().record_free(freed);
- }
-
- /// Spill in-memory buffers to disk, free memory, return the previous used
- async fn spill(&self) -> Result<usize>;
-
- /// Current memory used by this consumer
- fn mem_used(&self) -> usize;
}
-impl Debug for dyn MemoryConsumer {
- fn fmt(&self, f: &mut Formatter) -> fmt::Result {
- write!(
- f,
- "{}[{}]: {}",
- self.name(),
- self.id(),
- human_readable_size(self.mem_used())
- )
- }
-}
-
-impl Display for dyn MemoryConsumer {
- fn fmt(&self, f: &mut Formatter) -> fmt::Result {
- write!(f, "{}[{}]", self.name(), self.id(),)
- }
-}
-
-/*
-The memory management architecture is the following:
-
-1. User designates max execution memory by setting RuntimeConfig.max_memory
and RuntimeConfig.memory_fraction (float64 between 0..1).
- The actual max memory DataFusion could use `pool_size = max_memory *
memory_fraction`.
-2. The entities that take up memory during its execution are called 'Memory
Consumers'. Operators or others are encouraged to
- register themselves to the memory manager and report its usage through
`mem_used()`.
-3. There are two kinds of consumers:
- - 'Requesting' consumers that would acquire memory during its execution and
release memory through `spill` if no more memory is available.
- - 'Tracking' consumers that exist for reporting purposes to provide a more
accurate memory usage estimation for memory consumers.
-4. Requesting and tracking consumers share the pool. Each controlling consumer
could acquire a maximum of
- (pool_size - all_tracking_used) / active_num_controlling_consumers.
-
- Memory Space for the DataFusion Lib / Process of `pool_size`
-
┌──────────────────────────────────────────────z─────────────────────────────┐
- │ z
│
- │ z
│
- │ Requesting z Tracking
│
- │ Memory Consumers z Memory Consumers
│
- │ z
│
- │ z
│
-
└──────────────────────────────────────────────z─────────────────────────────┘
-*/
-
-/// Manage memory usage during physical plan execution
+/// The memory manager maintains a fixed size pool of memory
+/// from which portions can be allocated
#[derive(Debug)]
pub struct MemoryManager {
- requesters: Arc<Mutex<HashSet<MemoryConsumerId>>>,
- pool_size: usize,
- requesters_total: Arc<Mutex<usize>>,
- trackers_total: AtomicUsize,
- cv: Condvar,
+ state: Arc<MemoryManagerState>,
}
impl MemoryManager {
/// Create new memory manager based on the configuration
- #[allow(clippy::mutex_atomic)]
pub fn new(config: MemoryManagerConfig) -> Arc<Self> {
- let pool_size = config.pool_size();
-
match config {
MemoryManagerConfig::Existing(manager) => manager,
- MemoryManagerConfig::New { .. } => {
+ MemoryManagerConfig::New {
+ max_memory,
+ memory_fraction,
+ } => {
+ let pool_size = (max_memory as f64 * memory_fraction) as usize;
debug!(
"Creating memory manager with initial size {}",
human_readable_size(pool_size)
);
Arc::new(Self {
- requesters: Arc::new(Mutex::new(HashSet::new())),
- pool_size,
- requesters_total: Arc::new(Mutex::new(0)),
- trackers_total: AtomicUsize::new(0),
- cv: Condvar::new(),
+ state: Arc::new(MemoryManagerState {
+ pool_size,
+ used: AtomicUsize::new(0),
+ }),
})
}
}
}
- fn get_tracker_total(&self) -> usize {
- self.trackers_total.load(Ordering::SeqCst)
- }
-
- pub(crate) fn grow_tracker_usage(&self, delta: usize) {
- self.trackers_total.fetch_add(delta, Ordering::SeqCst);
- }
-
- pub(crate) fn shrink_tracker_usage(&self, delta: usize) {
- let update =
- self.trackers_total
- .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
- if x >= delta {
- Some(x - delta)
- } else {
- None
- }
- });
- update.unwrap_or_else(|_| {
- panic!(
- "Tracker total memory shrink by {} underflow, current value is
",
- delta
- )
- });
+ /// Returns the maximum pool size
+ ///
+ /// Note: this can be less than the amount allocated as a result of
[`MemoryManager::allocate`]
+ pub fn pool_size(&self) -> usize {
+ self.state.pool_size
}
- /// Return the total memory usage for all requesters
- pub fn get_requester_total(&self) -> usize {
- *self.requesters_total.lock()
+ /// Returns the number of allocated bytes
+ ///
+ /// Note: this can exceed the pool size as a result of
[`MemoryManager::allocate`]
+ pub fn allocated(&self) -> usize {
+ self.state.used.load(Ordering::Relaxed)
}
- /// Register a new memory requester
- pub(crate) fn register_requester(&self, requester_id: &MemoryConsumerId) {
- self.requesters.lock().insert(requester_id.clone());
+ /// Returns a new empty allocation identified by `name`
+ pub fn new_tracked_allocation(&self, name: String) -> TrackedAllocation {
Review Comment:
   I think it is worth keeping some sort of `requester_id` as part of this API
so that it can be extended in the future — for example, so that the first
spilling operator doesn't end up with all the memory.
##########
datafusion/core/src/execution/memory_manager/mod.rs:
##########
@@ -97,339 +89,154 @@ impl MemoryManagerConfig {
memory_fraction,
})
}
-
- /// return the maximum size of the memory, in bytes, this config will allow
- fn pool_size(&self) -> usize {
- match self {
- MemoryManagerConfig::Existing(existing) => existing.pool_size,
- MemoryManagerConfig::New {
- max_memory,
- memory_fraction,
- } => (*max_memory as f64 * *memory_fraction) as usize,
- }
- }
-}
-
-fn next_id() -> usize {
- CONSUMER_ID.fetch_add(1, Ordering::SeqCst)
-}
-
-/// Type of the memory consumer
-pub enum ConsumerType {
- /// consumers that can grow its memory usage by requesting more from the
memory manager or
- /// shrinks its memory usage when we can no more assign available memory
to it.
- /// Examples are spillable sorter, spillable hashmap, etc.
- Requesting,
- /// consumers that are not spillable, counting in for only tracking
purpose.
- Tracking,
-}
-
-#[derive(Clone, Debug, Hash, Eq, PartialEq)]
-/// Id that uniquely identifies a Memory Consumer
-pub struct MemoryConsumerId {
- /// partition the consumer belongs to
- pub partition_id: usize,
- /// unique id
- pub id: usize,
-}
-
-impl MemoryConsumerId {
- /// Auto incremented new Id
- pub fn new(partition_id: usize) -> Self {
- let id = next_id();
- Self { partition_id, id }
- }
-}
-
-impl Display for MemoryConsumerId {
- fn fmt(&self, f: &mut Formatter) -> fmt::Result {
- write!(f, "{}:{}", self.partition_id, self.id)
- }
-}
-
-#[async_trait]
-/// A memory consumer that either takes up memory (of type
`ConsumerType::Tracking`)
-/// or grows/shrinks memory usage based on available memory (of type
`ConsumerType::Requesting`).
-pub trait MemoryConsumer: Send + Sync {
- /// Display name of the consumer
- fn name(&self) -> String;
-
- /// Unique id of the consumer
- fn id(&self) -> &MemoryConsumerId;
-
- /// Ptr to MemoryManager
- fn memory_manager(&self) -> Arc<MemoryManager>;
-
- /// Partition that the consumer belongs to
- fn partition_id(&self) -> usize {
- self.id().partition_id
- }
-
- /// Type of the consumer
- fn type_(&self) -> &ConsumerType;
-
- /// Grow memory by `required` to buffer more data in memory,
- /// this may trigger spill before grow when the memory threshold is
- /// reached for this consumer.
- async fn try_grow(&self, required: usize) -> Result<()> {
- let current = self.mem_used();
- debug!(
- "trying to acquire {} whiling holding {} from consumer {}",
- human_readable_size(required),
- human_readable_size(current),
- self.id(),
- );
-
- let can_grow_directly =
- self.memory_manager().can_grow_directly(required, current);
- if !can_grow_directly {
- debug!(
- "Failed to grow memory of {} directly from consumer {},
spilling first ...",
- human_readable_size(required),
- self.id()
- );
- let freed = self.spill().await?;
- self.memory_manager()
- .record_free_then_acquire(freed, required);
- }
- Ok(())
- }
-
- /// Grow without spilling to the disk. It grows the memory directly
- /// so it should be only used when the consumer already allocated the
- /// memory and it is safe to grow without spilling.
- fn grow(&self, required: usize) {
- self.memory_manager().record_free_then_acquire(0, required);
- }
-
- /// Return `freed` memory to the memory manager,
- /// may wake up other requesters waiting for their minimum memory quota.
- fn shrink(&self, freed: usize) {
- self.memory_manager().record_free(freed);
- }
-
- /// Spill in-memory buffers to disk, free memory, return the previous used
- async fn spill(&self) -> Result<usize>;
-
- /// Current memory used by this consumer
- fn mem_used(&self) -> usize;
}
-impl Debug for dyn MemoryConsumer {
- fn fmt(&self, f: &mut Formatter) -> fmt::Result {
- write!(
- f,
- "{}[{}]: {}",
- self.name(),
- self.id(),
- human_readable_size(self.mem_used())
- )
- }
-}
-
-impl Display for dyn MemoryConsumer {
- fn fmt(&self, f: &mut Formatter) -> fmt::Result {
- write!(f, "{}[{}]", self.name(), self.id(),)
- }
-}
-
-/*
-The memory management architecture is the following:
-
-1. User designates max execution memory by setting RuntimeConfig.max_memory
and RuntimeConfig.memory_fraction (float64 between 0..1).
- The actual max memory DataFusion could use `pool_size = max_memory *
memory_fraction`.
-2. The entities that take up memory during its execution are called 'Memory
Consumers'. Operators or others are encouraged to
- register themselves to the memory manager and report its usage through
`mem_used()`.
-3. There are two kinds of consumers:
- - 'Requesting' consumers that would acquire memory during its execution and
release memory through `spill` if no more memory is available.
- - 'Tracking' consumers that exist for reporting purposes to provide a more
accurate memory usage estimation for memory consumers.
-4. Requesting and tracking consumers share the pool. Each controlling consumer
could acquire a maximum of
- (pool_size - all_tracking_used) / active_num_controlling_consumers.
-
- Memory Space for the DataFusion Lib / Process of `pool_size`
-
┌──────────────────────────────────────────────z─────────────────────────────┐
- │ z
│
- │ z
│
- │ Requesting z Tracking
│
- │ Memory Consumers z Memory Consumers
│
- │ z
│
- │ z
│
-
└──────────────────────────────────────────────z─────────────────────────────┘
-*/
-
-/// Manage memory usage during physical plan execution
+/// The memory manager maintains a fixed size pool of memory
+/// from which portions can be allocated
#[derive(Debug)]
pub struct MemoryManager {
- requesters: Arc<Mutex<HashSet<MemoryConsumerId>>>,
- pool_size: usize,
- requesters_total: Arc<Mutex<usize>>,
- trackers_total: AtomicUsize,
- cv: Condvar,
+ state: Arc<MemoryManagerState>,
}
impl MemoryManager {
/// Create new memory manager based on the configuration
- #[allow(clippy::mutex_atomic)]
pub fn new(config: MemoryManagerConfig) -> Arc<Self> {
- let pool_size = config.pool_size();
-
match config {
MemoryManagerConfig::Existing(manager) => manager,
- MemoryManagerConfig::New { .. } => {
+ MemoryManagerConfig::New {
+ max_memory,
+ memory_fraction,
+ } => {
+ let pool_size = (max_memory as f64 * memory_fraction) as usize;
debug!(
"Creating memory manager with initial size {}",
human_readable_size(pool_size)
);
Arc::new(Self {
- requesters: Arc::new(Mutex::new(HashSet::new())),
- pool_size,
- requesters_total: Arc::new(Mutex::new(0)),
- trackers_total: AtomicUsize::new(0),
- cv: Condvar::new(),
+ state: Arc::new(MemoryManagerState {
+ pool_size,
+ used: AtomicUsize::new(0),
+ }),
})
}
}
}
- fn get_tracker_total(&self) -> usize {
- self.trackers_total.load(Ordering::SeqCst)
- }
-
- pub(crate) fn grow_tracker_usage(&self, delta: usize) {
- self.trackers_total.fetch_add(delta, Ordering::SeqCst);
- }
-
- pub(crate) fn shrink_tracker_usage(&self, delta: usize) {
- let update =
- self.trackers_total
- .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
- if x >= delta {
- Some(x - delta)
- } else {
- None
- }
- });
- update.unwrap_or_else(|_| {
- panic!(
- "Tracker total memory shrink by {} underflow, current value is
",
- delta
- )
- });
+ /// Returns the maximum pool size
+ ///
+ /// Note: this can be less than the amount allocated as a result of
[`MemoryManager::allocate`]
+ pub fn pool_size(&self) -> usize {
+ self.state.pool_size
}
- /// Return the total memory usage for all requesters
- pub fn get_requester_total(&self) -> usize {
- *self.requesters_total.lock()
+ /// Returns the number of allocated bytes
+ ///
+ /// Note: this can exceed the pool size as a result of
[`MemoryManager::allocate`]
+ pub fn allocated(&self) -> usize {
+ self.state.used.load(Ordering::Relaxed)
}
- /// Register a new memory requester
- pub(crate) fn register_requester(&self, requester_id: &MemoryConsumerId) {
- self.requesters.lock().insert(requester_id.clone());
+ /// Returns a new empty allocation identified by `name`
+ pub fn new_tracked_allocation(&self, name: String) -> TrackedAllocation {
+ TrackedAllocation::new_empty(name, Arc::clone(&self.state))
}
+}
- fn max_mem_for_requesters(&self) -> usize {
- let trk_total = self.get_tracker_total();
- self.pool_size.saturating_sub(trk_total)
+impl std::fmt::Display for MemoryManager {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ write!(
+ f,
+ "MemoryManager(capacity: {}, allocated: {})",
+ human_readable_size(self.state.pool_size),
+ human_readable_size(self.allocated()),
+ )
}
+}
- /// Grow memory attempt from a consumer, return if we could grant that
much to it
- fn can_grow_directly(&self, required: usize, current: usize) -> bool {
- let num_rqt = self.requesters.lock().len();
- let mut rqt_current_used = self.requesters_total.lock();
- let mut rqt_max = self.max_mem_for_requesters();
-
- let granted;
- loop {
- let max_per_rqt = rqt_max / num_rqt;
- let min_per_rqt = max_per_rqt / 2;
-
- if required + current >= max_per_rqt {
- granted = false;
- break;
- }
+#[derive(Debug)]
+struct MemoryManagerState {
+ pool_size: usize,
+ used: AtomicUsize,
+}
- let remaining =
rqt_max.checked_sub(*rqt_current_used).unwrap_or_default();
- if remaining >= required {
- granted = true;
- *rqt_current_used += required;
- break;
- } else if current < min_per_rqt {
- // if we cannot acquire at lease 1/2n memory, just wait for
others
- // to spill instead spill self frequently with limited total
mem
- debug!(
- "Cannot acquire a minimum amount of {} memory from the
manager of total {}, waiting for others to spill ...",
- human_readable_size(min_per_rqt),
human_readable_size(self.pool_size));
- let now = Instant::now();
- self.cv.wait(&mut rqt_current_used);
- let elapsed = now.elapsed();
- if elapsed > Duration::from_secs(10) {
- warn!("Elapsed on waiting for spilling: {:.2?}", elapsed);
- }
- } else {
- granted = false;
- break;
- }
+/// A [`TrackedAllocation`] tracks a reservation of memory in a
[`MemoryManager`]
+/// that is freed back to the memory pool on drop
+#[derive(Debug)]
+pub struct TrackedAllocation {
+ name: String,
+ size: usize,
+ state: Arc<MemoryManagerState>,
+}
- rqt_max = self.max_mem_for_requesters();
+impl TrackedAllocation {
+ fn new_empty(name: String, state: Arc<MemoryManagerState>) -> Self {
+ Self {
+ name,
+ size: 0,
+ state,
}
+ }
- granted
+ /// Returns the size of this [`TrackedAllocation`] in bytes
+ pub fn size(&self) -> usize {
+ self.size
}
- fn record_free_then_acquire(&self, freed: usize, acquired: usize) {
- let mut requesters_total = self.requesters_total.lock();
- debug!(
- "free_then_acquire: total {}, freed {}, acquired {}",
- human_readable_size(*requesters_total),
- human_readable_size(freed),
- human_readable_size(acquired)
- );
- assert!(*requesters_total >= freed);
- *requesters_total -= freed;
- *requesters_total += acquired;
- self.cv.notify_all();
+ /// Frees all bytes from this allocation returning the number of bytes
freed
+ pub fn free(&mut self) -> usize {
+ let size = self.size;
+ if size != 0 {
+ self.shrink(size)
+ }
+ size
+ }
+
+ /// Frees `capacity` bytes from this allocation
+ ///
+ /// # Panics
+ ///
+ /// Panics if `capacity` exceeds [`Self::size`]
+ pub fn shrink(&mut self, capacity: usize) {
+ let new_size = self.size.checked_sub(capacity).unwrap();
+ self.state.used.fetch_sub(capacity, Ordering::SeqCst);
+ self.size = new_size
+ }
+
+ /// Sets the size of this allocation to `capacity`
+ pub fn resize(&mut self, capacity: usize) {
+ use std::cmp::Ordering;
+ match capacity.cmp(&self.size) {
+ Ordering::Greater => self.grow(capacity - self.size),
+ Ordering::Less => self.shrink(self.size - capacity),
+ _ => {}
+ }
}
- fn record_free(&self, freed: usize) {
- let mut requesters_total = self.requesters_total.lock();
- debug!(
- "free: total {}, freed {}",
- human_readable_size(*requesters_total),
- human_readable_size(freed)
- );
- assert!(*requesters_total >= freed);
- *requesters_total -= freed;
- self.cv.notify_all();
+ /// Increase the size of this by `capacity` bytes
+ pub fn grow(&mut self, capacity: usize) {
+ self.state.used.fetch_add(capacity, Ordering::SeqCst);
+ self.size += capacity;
}
- /// Drop a memory consumer and reclaim the memory
- pub(crate) fn drop_consumer(&self, id: &MemoryConsumerId, mem_used: usize)
{
- // find in requesters first
- {
- let mut requesters = self.requesters.lock();
- if requesters.remove(id) {
- let mut total = self.requesters_total.lock();
- assert!(*total >= mem_used);
- *total -= mem_used;
- self.cv.notify_all();
- return;
- }
- }
- self.shrink_tracker_usage(mem_used);
- self.cv.notify_all();
+ /// Try to increase the size of this [`TrackedAllocation`] by `capacity`
bytes
Review Comment:
```suggestion
/// Try to increase the size of this [`TrackedAllocation`] by `capacity`
bytes,
/// returning a ResourcesExhausted error if such an allocation would
exceed
/// the configured pool size
```
##########
datafusion/core/src/execution/memory_manager/mod.rs:
##########
@@ -97,339 +89,154 @@ impl MemoryManagerConfig {
memory_fraction,
})
}
-
- /// return the maximum size of the memory, in bytes, this config will allow
- fn pool_size(&self) -> usize {
- match self {
- MemoryManagerConfig::Existing(existing) => existing.pool_size,
- MemoryManagerConfig::New {
- max_memory,
- memory_fraction,
- } => (*max_memory as f64 * *memory_fraction) as usize,
- }
- }
-}
-
-fn next_id() -> usize {
- CONSUMER_ID.fetch_add(1, Ordering::SeqCst)
-}
-
-/// Type of the memory consumer
-pub enum ConsumerType {
- /// consumers that can grow its memory usage by requesting more from the
memory manager or
- /// shrinks its memory usage when we can no more assign available memory
to it.
- /// Examples are spillable sorter, spillable hashmap, etc.
- Requesting,
- /// consumers that are not spillable, counting in for only tracking
purpose.
- Tracking,
-}
-
-#[derive(Clone, Debug, Hash, Eq, PartialEq)]
-/// Id that uniquely identifies a Memory Consumer
-pub struct MemoryConsumerId {
- /// partition the consumer belongs to
- pub partition_id: usize,
- /// unique id
- pub id: usize,
-}
-
-impl MemoryConsumerId {
- /// Auto incremented new Id
- pub fn new(partition_id: usize) -> Self {
- let id = next_id();
- Self { partition_id, id }
- }
-}
-
-impl Display for MemoryConsumerId {
- fn fmt(&self, f: &mut Formatter) -> fmt::Result {
- write!(f, "{}:{}", self.partition_id, self.id)
- }
-}
-
-#[async_trait]
-/// A memory consumer that either takes up memory (of type
`ConsumerType::Tracking`)
-/// or grows/shrinks memory usage based on available memory (of type
`ConsumerType::Requesting`).
-pub trait MemoryConsumer: Send + Sync {
- /// Display name of the consumer
- fn name(&self) -> String;
-
- /// Unique id of the consumer
- fn id(&self) -> &MemoryConsumerId;
-
- /// Ptr to MemoryManager
- fn memory_manager(&self) -> Arc<MemoryManager>;
-
- /// Partition that the consumer belongs to
- fn partition_id(&self) -> usize {
- self.id().partition_id
- }
-
- /// Type of the consumer
- fn type_(&self) -> &ConsumerType;
-
- /// Grow memory by `required` to buffer more data in memory,
- /// this may trigger spill before grow when the memory threshold is
- /// reached for this consumer.
- async fn try_grow(&self, required: usize) -> Result<()> {
- let current = self.mem_used();
- debug!(
- "trying to acquire {} whiling holding {} from consumer {}",
- human_readable_size(required),
- human_readable_size(current),
- self.id(),
- );
-
- let can_grow_directly =
- self.memory_manager().can_grow_directly(required, current);
- if !can_grow_directly {
- debug!(
- "Failed to grow memory of {} directly from consumer {},
spilling first ...",
- human_readable_size(required),
- self.id()
- );
- let freed = self.spill().await?;
- self.memory_manager()
- .record_free_then_acquire(freed, required);
- }
- Ok(())
- }
-
- /// Grow without spilling to the disk. It grows the memory directly
- /// so it should be only used when the consumer already allocated the
- /// memory and it is safe to grow without spilling.
- fn grow(&self, required: usize) {
- self.memory_manager().record_free_then_acquire(0, required);
- }
-
- /// Return `freed` memory to the memory manager,
- /// may wake up other requesters waiting for their minimum memory quota.
- fn shrink(&self, freed: usize) {
- self.memory_manager().record_free(freed);
- }
-
- /// Spill in-memory buffers to disk, free memory, return the previous used
- async fn spill(&self) -> Result<usize>;
-
- /// Current memory used by this consumer
- fn mem_used(&self) -> usize;
}
-impl Debug for dyn MemoryConsumer {
- fn fmt(&self, f: &mut Formatter) -> fmt::Result {
- write!(
- f,
- "{}[{}]: {}",
- self.name(),
- self.id(),
- human_readable_size(self.mem_used())
- )
- }
-}
-
-impl Display for dyn MemoryConsumer {
- fn fmt(&self, f: &mut Formatter) -> fmt::Result {
- write!(f, "{}[{}]", self.name(), self.id(),)
- }
-}
-
-/*
-The memory management architecture is the following:
-
-1. User designates max execution memory by setting RuntimeConfig.max_memory
and RuntimeConfig.memory_fraction (float64 between 0..1).
- The actual max memory DataFusion could use `pool_size = max_memory *
memory_fraction`.
-2. The entities that take up memory during its execution are called 'Memory
Consumers'. Operators or others are encouraged to
- register themselves to the memory manager and report its usage through
`mem_used()`.
-3. There are two kinds of consumers:
- - 'Requesting' consumers that would acquire memory during its execution and
release memory through `spill` if no more memory is available.
- - 'Tracking' consumers that exist for reporting purposes to provide a more
accurate memory usage estimation for memory consumers.
-4. Requesting and tracking consumers share the pool. Each controlling consumer
could acquire a maximum of
- (pool_size - all_tracking_used) / active_num_controlling_consumers.
-
- Memory Space for the DataFusion Lib / Process of `pool_size`
-
┌──────────────────────────────────────────────z─────────────────────────────┐
- │ z
│
- │ z
│
- │ Requesting z Tracking
│
- │ Memory Consumers z Memory Consumers
│
- │ z
│
- │ z
│
-
└──────────────────────────────────────────────z─────────────────────────────┘
-*/
-
-/// Manage memory usage during physical plan execution
+/// The memory manager maintains a fixed size pool of memory
+/// from which portions can be allocated
#[derive(Debug)]
pub struct MemoryManager {
- requesters: Arc<Mutex<HashSet<MemoryConsumerId>>>,
- pool_size: usize,
- requesters_total: Arc<Mutex<usize>>,
- trackers_total: AtomicUsize,
- cv: Condvar,
+ state: Arc<MemoryManagerState>,
}
impl MemoryManager {
/// Create new memory manager based on the configuration
- #[allow(clippy::mutex_atomic)]
pub fn new(config: MemoryManagerConfig) -> Arc<Self> {
- let pool_size = config.pool_size();
-
match config {
MemoryManagerConfig::Existing(manager) => manager,
- MemoryManagerConfig::New { .. } => {
+ MemoryManagerConfig::New {
+ max_memory,
+ memory_fraction,
+ } => {
+ let pool_size = (max_memory as f64 * memory_fraction) as usize;
debug!(
"Creating memory manager with initial size {}",
human_readable_size(pool_size)
);
Arc::new(Self {
- requesters: Arc::new(Mutex::new(HashSet::new())),
- pool_size,
- requesters_total: Arc::new(Mutex::new(0)),
- trackers_total: AtomicUsize::new(0),
- cv: Condvar::new(),
+ state: Arc::new(MemoryManagerState {
+ pool_size,
+ used: AtomicUsize::new(0),
+ }),
})
}
}
}
- fn get_tracker_total(&self) -> usize {
- self.trackers_total.load(Ordering::SeqCst)
- }
-
- pub(crate) fn grow_tracker_usage(&self, delta: usize) {
- self.trackers_total.fetch_add(delta, Ordering::SeqCst);
- }
-
- pub(crate) fn shrink_tracker_usage(&self, delta: usize) {
- let update =
- self.trackers_total
- .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
- if x >= delta {
- Some(x - delta)
- } else {
- None
- }
- });
- update.unwrap_or_else(|_| {
- panic!(
- "Tracker total memory shrink by {} underflow, current value is
",
- delta
- )
- });
+ /// Returns the maximum pool size
+ ///
+ /// Note: this can be less than the amount allocated as a result of
[`MemoryManager::allocate`]
+ pub fn pool_size(&self) -> usize {
+ self.state.pool_size
}
- /// Return the total memory usage for all requesters
- pub fn get_requester_total(&self) -> usize {
- *self.requesters_total.lock()
+ /// Returns the number of allocated bytes
+ ///
+ /// Note: this can exceed the pool size as a result of
[`MemoryManager::allocate`]
+ pub fn allocated(&self) -> usize {
+ self.state.used.load(Ordering::Relaxed)
}
- /// Register a new memory requester
- pub(crate) fn register_requester(&self, requester_id: &MemoryConsumerId) {
- self.requesters.lock().insert(requester_id.clone());
+ /// Returns a new empty allocation identified by `name`
+ pub fn new_tracked_allocation(&self, name: String) -> TrackedAllocation {
+ TrackedAllocation::new_empty(name, Arc::clone(&self.state))
}
+}
- fn max_mem_for_requesters(&self) -> usize {
- let trk_total = self.get_tracker_total();
- self.pool_size.saturating_sub(trk_total)
+impl std::fmt::Display for MemoryManager {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ write!(
+ f,
+ "MemoryManager(capacity: {}, allocated: {})",
+ human_readable_size(self.state.pool_size),
+ human_readable_size(self.allocated()),
+ )
}
+}
- /// Grow memory attempt from a consumer, return if we could grant that
much to it
- fn can_grow_directly(&self, required: usize, current: usize) -> bool {
- let num_rqt = self.requesters.lock().len();
- let mut rqt_current_used = self.requesters_total.lock();
- let mut rqt_max = self.max_mem_for_requesters();
-
- let granted;
- loop {
- let max_per_rqt = rqt_max / num_rqt;
- let min_per_rqt = max_per_rqt / 2;
-
- if required + current >= max_per_rqt {
- granted = false;
- break;
- }
+#[derive(Debug)]
+struct MemoryManagerState {
+ pool_size: usize,
+ used: AtomicUsize,
+}
- let remaining =
rqt_max.checked_sub(*rqt_current_used).unwrap_or_default();
- if remaining >= required {
- granted = true;
- *rqt_current_used += required;
- break;
- } else if current < min_per_rqt {
- // if we cannot acquire at lease 1/2n memory, just wait for
others
- // to spill instead spill self frequently with limited total
mem
- debug!(
- "Cannot acquire a minimum amount of {} memory from the
manager of total {}, waiting for others to spill ...",
- human_readable_size(min_per_rqt),
human_readable_size(self.pool_size));
- let now = Instant::now();
- self.cv.wait(&mut rqt_current_used);
- let elapsed = now.elapsed();
- if elapsed > Duration::from_secs(10) {
- warn!("Elapsed on waiting for spilling: {:.2?}", elapsed);
- }
- } else {
- granted = false;
- break;
- }
+/// A [`TrackedAllocation`] tracks a reservation of memory in a
[`MemoryManager`]
+/// that is freed back to the memory pool on drop
+#[derive(Debug)]
+pub struct TrackedAllocation {
+ name: String,
+ size: usize,
+ state: Arc<MemoryManagerState>,
+}
- rqt_max = self.max_mem_for_requesters();
+impl TrackedAllocation {
+ fn new_empty(name: String, state: Arc<MemoryManagerState>) -> Self {
+ Self {
+ name,
+ size: 0,
+ state,
}
+ }
- granted
+ /// Returns the size of this [`TrackedAllocation`] in bytes
+ pub fn size(&self) -> usize {
+ self.size
}
- fn record_free_then_acquire(&self, freed: usize, acquired: usize) {
- let mut requesters_total = self.requesters_total.lock();
- debug!(
- "free_then_acquire: total {}, freed {}, acquired {}",
- human_readable_size(*requesters_total),
- human_readable_size(freed),
- human_readable_size(acquired)
- );
- assert!(*requesters_total >= freed);
- *requesters_total -= freed;
- *requesters_total += acquired;
- self.cv.notify_all();
+ /// Frees all bytes from this allocation returning the number of bytes
freed
+ pub fn free(&mut self) -> usize {
+ let size = self.size;
+ if size != 0 {
+ self.shrink(size)
+ }
+ size
+ }
+
+ /// Frees `capacity` bytes from this allocation
+ ///
+ /// # Panics
+ ///
+ /// Panics if `capacity` exceeds [`Self::size`]
+ pub fn shrink(&mut self, capacity: usize) {
+ let new_size = self.size.checked_sub(capacity).unwrap();
+ self.state.used.fetch_sub(capacity, Ordering::SeqCst);
+ self.size = new_size
+ }
+
+ /// Sets the size of this allocation to `capacity`
+ pub fn resize(&mut self, capacity: usize) {
+ use std::cmp::Ordering;
+ match capacity.cmp(&self.size) {
+ Ordering::Greater => self.grow(capacity - self.size),
+ Ordering::Less => self.shrink(self.size - capacity),
+ _ => {}
+ }
}
- fn record_free(&self, freed: usize) {
- let mut requesters_total = self.requesters_total.lock();
- debug!(
- "free: total {}, freed {}",
- human_readable_size(*requesters_total),
- human_readable_size(freed)
- );
- assert!(*requesters_total >= freed);
- *requesters_total -= freed;
- self.cv.notify_all();
+ /// Increase the size of this by `capacity` bytes
+ pub fn grow(&mut self, capacity: usize) {
+ self.state.used.fetch_add(capacity, Ordering::SeqCst);
+ self.size += capacity;
}
- /// Drop a memory consumer and reclaim the memory
- pub(crate) fn drop_consumer(&self, id: &MemoryConsumerId, mem_used: usize)
{
- // find in requesters first
- {
- let mut requesters = self.requesters.lock();
- if requesters.remove(id) {
- let mut total = self.requesters_total.lock();
- assert!(*total >= mem_used);
- *total -= mem_used;
- self.cv.notify_all();
- return;
- }
- }
- self.shrink_tracker_usage(mem_used);
- self.cv.notify_all();
+ /// Try to increase the size of this [`TrackedAllocation`] by `capacity`
bytes
+ pub fn try_grow(&mut self, capacity: usize) -> Result<()> {
+ let pool_size = self.state.pool_size;
+ self.state
+ .used
+ .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |used| {
Review Comment:
❤️
##########
datafusion/core/tests/order_spill_fuzz.rs:
##########
@@ -110,12 +107,23 @@ async fn run_sort(pool_size: usize, size_spill:
Vec<(usize, bool)>) {
/// with randomized i32 content
fn make_staggered_batches(len: usize) -> Vec<RecordBatch> {
let mut rng = rand::thread_rng();
- let mut input: Vec<i32> = vec![0; len];
- rng.fill(&mut input[..]);
- let input = Int32Array::from_iter_values(input.into_iter());
-
- // split into several record batches
- let batch =
- RecordBatch::try_from_iter(vec![("x", Arc::new(input) as
ArrayRef)]).unwrap();
- stagger_batch_with_seed(batch, 42)
+ let max_batch = 1024;
Review Comment:
I believe stagger batches also adds zero length record batches, which was an
important edge case to cover as I recall
##########
datafusion/core/src/execution/memory_manager/mod.rs:
##########
@@ -97,339 +89,154 @@ impl MemoryManagerConfig {
memory_fraction,
})
}
-
- /// return the maximum size of the memory, in bytes, this config will allow
- fn pool_size(&self) -> usize {
- match self {
- MemoryManagerConfig::Existing(existing) => existing.pool_size,
- MemoryManagerConfig::New {
- max_memory,
- memory_fraction,
- } => (*max_memory as f64 * *memory_fraction) as usize,
- }
- }
-}
-
-fn next_id() -> usize {
- CONSUMER_ID.fetch_add(1, Ordering::SeqCst)
-}
-
-/// Type of the memory consumer
-pub enum ConsumerType {
- /// consumers that can grow its memory usage by requesting more from the
memory manager or
- /// shrinks its memory usage when we can no more assign available memory
to it.
- /// Examples are spillable sorter, spillable hashmap, etc.
- Requesting,
- /// consumers that are not spillable, counting in for only tracking
purpose.
- Tracking,
-}
-
-#[derive(Clone, Debug, Hash, Eq, PartialEq)]
-/// Id that uniquely identifies a Memory Consumer
-pub struct MemoryConsumerId {
- /// partition the consumer belongs to
- pub partition_id: usize,
- /// unique id
- pub id: usize,
-}
-
-impl MemoryConsumerId {
- /// Auto incremented new Id
- pub fn new(partition_id: usize) -> Self {
- let id = next_id();
- Self { partition_id, id }
- }
-}
-
-impl Display for MemoryConsumerId {
- fn fmt(&self, f: &mut Formatter) -> fmt::Result {
- write!(f, "{}:{}", self.partition_id, self.id)
- }
-}
-
-#[async_trait]
-/// A memory consumer that either takes up memory (of type
`ConsumerType::Tracking`)
-/// or grows/shrinks memory usage based on available memory (of type
`ConsumerType::Requesting`).
-pub trait MemoryConsumer: Send + Sync {
- /// Display name of the consumer
- fn name(&self) -> String;
-
- /// Unique id of the consumer
- fn id(&self) -> &MemoryConsumerId;
-
- /// Ptr to MemoryManager
- fn memory_manager(&self) -> Arc<MemoryManager>;
-
- /// Partition that the consumer belongs to
- fn partition_id(&self) -> usize {
- self.id().partition_id
- }
-
- /// Type of the consumer
- fn type_(&self) -> &ConsumerType;
-
- /// Grow memory by `required` to buffer more data in memory,
- /// this may trigger spill before grow when the memory threshold is
- /// reached for this consumer.
- async fn try_grow(&self, required: usize) -> Result<()> {
- let current = self.mem_used();
- debug!(
- "trying to acquire {} whiling holding {} from consumer {}",
- human_readable_size(required),
- human_readable_size(current),
- self.id(),
- );
-
- let can_grow_directly =
- self.memory_manager().can_grow_directly(required, current);
- if !can_grow_directly {
- debug!(
- "Failed to grow memory of {} directly from consumer {},
spilling first ...",
- human_readable_size(required),
- self.id()
- );
- let freed = self.spill().await?;
- self.memory_manager()
- .record_free_then_acquire(freed, required);
- }
- Ok(())
- }
-
- /// Grow without spilling to the disk. It grows the memory directly
- /// so it should be only used when the consumer already allocated the
- /// memory and it is safe to grow without spilling.
- fn grow(&self, required: usize) {
- self.memory_manager().record_free_then_acquire(0, required);
- }
-
- /// Return `freed` memory to the memory manager,
- /// may wake up other requesters waiting for their minimum memory quota.
- fn shrink(&self, freed: usize) {
- self.memory_manager().record_free(freed);
- }
-
- /// Spill in-memory buffers to disk, free memory, return the previous used
- async fn spill(&self) -> Result<usize>;
-
- /// Current memory used by this consumer
- fn mem_used(&self) -> usize;
}
-impl Debug for dyn MemoryConsumer {
- fn fmt(&self, f: &mut Formatter) -> fmt::Result {
- write!(
- f,
- "{}[{}]: {}",
- self.name(),
- self.id(),
- human_readable_size(self.mem_used())
- )
- }
-}
-
-impl Display for dyn MemoryConsumer {
- fn fmt(&self, f: &mut Formatter) -> fmt::Result {
- write!(f, "{}[{}]", self.name(), self.id(),)
- }
-}
-
-/*
-The memory management architecture is the following:
-
-1. User designates max execution memory by setting RuntimeConfig.max_memory
and RuntimeConfig.memory_fraction (float64 between 0..1).
- The actual max memory DataFusion could use `pool_size = max_memory *
memory_fraction`.
-2. The entities that take up memory during its execution are called 'Memory
Consumers'. Operators or others are encouraged to
- register themselves to the memory manager and report its usage through
`mem_used()`.
-3. There are two kinds of consumers:
- - 'Requesting' consumers that would acquire memory during its execution and
release memory through `spill` if no more memory is available.
- - 'Tracking' consumers that exist for reporting purposes to provide a more
accurate memory usage estimation for memory consumers.
-4. Requesting and tracking consumers share the pool. Each controlling consumer
could acquire a maximum of
- (pool_size - all_tracking_used) / active_num_controlling_consumers.
-
- Memory Space for the DataFusion Lib / Process of `pool_size`
-
┌──────────────────────────────────────────────z─────────────────────────────┐
- │ z
│
- │ z
│
- │ Requesting z Tracking
│
- │ Memory Consumers z Memory Consumers
│
- │ z
│
- │ z
│
-
└──────────────────────────────────────────────z─────────────────────────────┘
-*/
-
-/// Manage memory usage during physical plan execution
+/// The memory manager maintains a fixed size pool of memory
+/// from which portions can be allocated
#[derive(Debug)]
pub struct MemoryManager {
- requesters: Arc<Mutex<HashSet<MemoryConsumerId>>>,
- pool_size: usize,
- requesters_total: Arc<Mutex<usize>>,
- trackers_total: AtomicUsize,
- cv: Condvar,
+ state: Arc<MemoryManagerState>,
}
impl MemoryManager {
/// Create new memory manager based on the configuration
- #[allow(clippy::mutex_atomic)]
pub fn new(config: MemoryManagerConfig) -> Arc<Self> {
- let pool_size = config.pool_size();
-
match config {
MemoryManagerConfig::Existing(manager) => manager,
- MemoryManagerConfig::New { .. } => {
+ MemoryManagerConfig::New {
+ max_memory,
+ memory_fraction,
+ } => {
+ let pool_size = (max_memory as f64 * memory_fraction) as usize;
debug!(
"Creating memory manager with initial size {}",
human_readable_size(pool_size)
);
Arc::new(Self {
- requesters: Arc::new(Mutex::new(HashSet::new())),
- pool_size,
- requesters_total: Arc::new(Mutex::new(0)),
- trackers_total: AtomicUsize::new(0),
- cv: Condvar::new(),
+ state: Arc::new(MemoryManagerState {
+ pool_size,
+ used: AtomicUsize::new(0),
+ }),
})
}
}
}
- fn get_tracker_total(&self) -> usize {
- self.trackers_total.load(Ordering::SeqCst)
- }
-
- pub(crate) fn grow_tracker_usage(&self, delta: usize) {
- self.trackers_total.fetch_add(delta, Ordering::SeqCst);
- }
-
- pub(crate) fn shrink_tracker_usage(&self, delta: usize) {
- let update =
- self.trackers_total
- .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
- if x >= delta {
- Some(x - delta)
- } else {
- None
- }
- });
- update.unwrap_or_else(|_| {
- panic!(
- "Tracker total memory shrink by {} underflow, current value is
",
- delta
- )
- });
+ /// Returns the maximum pool size
+ ///
+ /// Note: this can be less than the amount allocated as a result of
[`MemoryManager::allocate`]
+ pub fn pool_size(&self) -> usize {
+ self.state.pool_size
}
- /// Return the total memory usage for all requesters
- pub fn get_requester_total(&self) -> usize {
- *self.requesters_total.lock()
+ /// Returns the number of allocated bytes
+ ///
+ /// Note: this can exceed the pool size as a result of
[`MemoryManager::allocate`]
+ pub fn allocated(&self) -> usize {
+ self.state.used.load(Ordering::Relaxed)
}
- /// Register a new memory requester
- pub(crate) fn register_requester(&self, requester_id: &MemoryConsumerId) {
- self.requesters.lock().insert(requester_id.clone());
+ /// Returns a new empty allocation identified by `name`
+ pub fn new_tracked_allocation(&self, name: String) -> TrackedAllocation {
+ TrackedAllocation::new_empty(name, Arc::clone(&self.state))
}
+}
- fn max_mem_for_requesters(&self) -> usize {
- let trk_total = self.get_tracker_total();
- self.pool_size.saturating_sub(trk_total)
+impl std::fmt::Display for MemoryManager {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ write!(
+ f,
+ "MemoryManager(capacity: {}, allocated: {})",
+ human_readable_size(self.state.pool_size),
+ human_readable_size(self.allocated()),
+ )
}
+}
- /// Grow memory attempt from a consumer, return if we could grant that
much to it
- fn can_grow_directly(&self, required: usize, current: usize) -> bool {
- let num_rqt = self.requesters.lock().len();
- let mut rqt_current_used = self.requesters_total.lock();
- let mut rqt_max = self.max_mem_for_requesters();
-
- let granted;
- loop {
- let max_per_rqt = rqt_max / num_rqt;
- let min_per_rqt = max_per_rqt / 2;
-
- if required + current >= max_per_rqt {
- granted = false;
- break;
- }
+#[derive(Debug)]
+struct MemoryManagerState {
+ pool_size: usize,
+ used: AtomicUsize,
+}
- let remaining =
rqt_max.checked_sub(*rqt_current_used).unwrap_or_default();
- if remaining >= required {
- granted = true;
- *rqt_current_used += required;
- break;
- } else if current < min_per_rqt {
- // if we cannot acquire at lease 1/2n memory, just wait for
others
- // to spill instead spill self frequently with limited total
mem
- debug!(
- "Cannot acquire a minimum amount of {} memory from the
manager of total {}, waiting for others to spill ...",
- human_readable_size(min_per_rqt),
human_readable_size(self.pool_size));
- let now = Instant::now();
- self.cv.wait(&mut rqt_current_used);
- let elapsed = now.elapsed();
- if elapsed > Duration::from_secs(10) {
- warn!("Elapsed on waiting for spilling: {:.2?}", elapsed);
- }
- } else {
- granted = false;
- break;
- }
+/// A [`TrackedAllocation`] tracks a reservation of memory in a
[`MemoryManager`]
+/// that is freed back to the memory pool on drop
+#[derive(Debug)]
+pub struct TrackedAllocation {
+ name: String,
+ size: usize,
+ state: Arc<MemoryManagerState>,
+}
- rqt_max = self.max_mem_for_requesters();
+impl TrackedAllocation {
+ fn new_empty(name: String, state: Arc<MemoryManagerState>) -> Self {
+ Self {
+ name,
+ size: 0,
+ state,
}
+ }
- granted
+ /// Returns the size of this [`TrackedAllocation`] in bytes
+ pub fn size(&self) -> usize {
+ self.size
}
- fn record_free_then_acquire(&self, freed: usize, acquired: usize) {
- let mut requesters_total = self.requesters_total.lock();
- debug!(
- "free_then_acquire: total {}, freed {}, acquired {}",
- human_readable_size(*requesters_total),
- human_readable_size(freed),
- human_readable_size(acquired)
- );
- assert!(*requesters_total >= freed);
- *requesters_total -= freed;
- *requesters_total += acquired;
- self.cv.notify_all();
+ /// Frees all bytes from this allocation returning the number of bytes
freed
+ pub fn free(&mut self) -> usize {
+ let size = self.size;
+ if size != 0 {
+ self.shrink(size)
+ }
+ size
+ }
+
+ /// Frees `capacity` bytes from this allocation
+ ///
+ /// # Panics
+ ///
+ /// Panics if `capacity` exceeds [`Self::size`]
+ pub fn shrink(&mut self, capacity: usize) {
+ let new_size = self.size.checked_sub(capacity).unwrap();
+ self.state.used.fetch_sub(capacity, Ordering::SeqCst);
+ self.size = new_size
+ }
+
+ /// Sets the size of this allocation to `capacity`
+ pub fn resize(&mut self, capacity: usize) {
+ use std::cmp::Ordering;
+ match capacity.cmp(&self.size) {
+ Ordering::Greater => self.grow(capacity - self.size),
+ Ordering::Less => self.shrink(self.size - capacity),
+ _ => {}
+ }
}
- fn record_free(&self, freed: usize) {
- let mut requesters_total = self.requesters_total.lock();
- debug!(
- "free: total {}, freed {}",
- human_readable_size(*requesters_total),
- human_readable_size(freed)
- );
- assert!(*requesters_total >= freed);
- *requesters_total -= freed;
- self.cv.notify_all();
+ /// Increase the size of this by `capacity` bytes
+ pub fn grow(&mut self, capacity: usize) {
+ self.state.used.fetch_add(capacity, Ordering::SeqCst);
+ self.size += capacity;
}
- /// Drop a memory consumer and reclaim the memory
- pub(crate) fn drop_consumer(&self, id: &MemoryConsumerId, mem_used: usize)
{
- // find in requesters first
- {
- let mut requesters = self.requesters.lock();
- if requesters.remove(id) {
- let mut total = self.requesters_total.lock();
- assert!(*total >= mem_used);
- *total -= mem_used;
- self.cv.notify_all();
- return;
- }
- }
- self.shrink_tracker_usage(mem_used);
- self.cv.notify_all();
+ /// Try to increase the size of this [`TrackedAllocation`] by `capacity`
bytes
+ pub fn try_grow(&mut self, capacity: usize) -> Result<()> {
+ let pool_size = self.state.pool_size;
+ self.state
+ .used
+ .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |used| {
+ let new_used = used + capacity;
+ (new_used <= pool_size).then_some(new_used)
+ })
+ .map_err(|used|
DataFusionError::ResourcesExhausted(format!("Failed to allocate additional {}
bytes for {} with {} bytes already allocated - maximum available is {}",
capacity, self.name, self.size, used)))?;
Review Comment:
I think the order of the parameters is incorrect but I may be misreading it
```suggestion
.map_err(|used|
DataFusionError::ResourcesExhausted(format!("Failed to allocate additional {}
bytes for {} with {} bytes already allocated - maximum available is {}",
capacity, self.name, used, self.size)))?;
```
##########
datafusion/core/tests/memory_limit.rs:
##########
@@ -66,23 +68,21 @@ async fn group_by_hash() {
run_limit_test(
// group by dict column
"select count(*) from t GROUP BY service, host, pod, container",
- "Resources exhausted: Cannot spill GroupBy Hash Accumulators",
+ "Resources exhausted: Failed to allocate additional",
+ 1_000,
)
.await
}
/// 50 byte memory limit
-const MEMORY_LIMIT_BYTES: usize = 50;
const MEMORY_FRACTION: f64 = 0.95;
/// runs the specified query against 1000 rows with a 50
/// byte memory limit and no disk manager enabled.
-async fn run_limit_test(query: &str, expected_error: &str) {
- let generator = AccessLogGenerator::new().with_row_limit(Some(1000));
-
- let batches: Vec<RecordBatch> = generator
- // split up into more than one batch, as the size limit in sort is not
enforced until the second batch
- .flat_map(stagger_batch)
Review Comment:
I originally put in `stagger_batch` as sort didn't check memory limits on
the first batch. -- this change makes sense to me.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]