tustvold commented on code in PR #4522:
URL: https://github.com/apache/arrow-datafusion/pull/4522#discussion_r1041024552


##########
datafusion/core/src/execution/memory_manager/mod.rs:
##########
@@ -97,339 +89,154 @@ impl MemoryManagerConfig {
             memory_fraction,
         })
     }
-
-    /// return the maximum size of the memory, in bytes, this config will allow
-    fn pool_size(&self) -> usize {
-        match self {
-            MemoryManagerConfig::Existing(existing) => existing.pool_size,
-            MemoryManagerConfig::New {
-                max_memory,
-                memory_fraction,
-            } => (*max_memory as f64 * *memory_fraction) as usize,
-        }
-    }
-}
-
-fn next_id() -> usize {
-    CONSUMER_ID.fetch_add(1, Ordering::SeqCst)
-}
-
-/// Type of the memory consumer
-pub enum ConsumerType {
-    /// consumers that can grow its memory usage by requesting more from the 
memory manager or
-    /// shrinks its memory usage when we can no more assign available memory 
to it.
-    /// Examples are spillable sorter, spillable hashmap, etc.
-    Requesting,
-    /// consumers that are not spillable, counting in for only tracking 
purpose.
-    Tracking,
-}
-
-#[derive(Clone, Debug, Hash, Eq, PartialEq)]
-/// Id that uniquely identifies a Memory Consumer
-pub struct MemoryConsumerId {
-    /// partition the consumer belongs to
-    pub partition_id: usize,
-    /// unique id
-    pub id: usize,
-}
-
-impl MemoryConsumerId {
-    /// Auto incremented new Id
-    pub fn new(partition_id: usize) -> Self {
-        let id = next_id();
-        Self { partition_id, id }
-    }
-}
-
-impl Display for MemoryConsumerId {
-    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
-        write!(f, "{}:{}", self.partition_id, self.id)
-    }
-}
-
-#[async_trait]
-/// A memory consumer that either takes up memory (of type 
`ConsumerType::Tracking`)
-/// or grows/shrinks memory usage based on available memory (of type 
`ConsumerType::Requesting`).
-pub trait MemoryConsumer: Send + Sync {
-    /// Display name of the consumer
-    fn name(&self) -> String;
-
-    /// Unique id of the consumer
-    fn id(&self) -> &MemoryConsumerId;
-
-    /// Ptr to MemoryManager
-    fn memory_manager(&self) -> Arc<MemoryManager>;
-
-    /// Partition that the consumer belongs to
-    fn partition_id(&self) -> usize {
-        self.id().partition_id
-    }
-
-    /// Type of the consumer
-    fn type_(&self) -> &ConsumerType;
-
-    /// Grow memory by `required` to buffer more data in memory,
-    /// this may trigger spill before grow when the memory threshold is
-    /// reached for this consumer.
-    async fn try_grow(&self, required: usize) -> Result<()> {
-        let current = self.mem_used();
-        debug!(
-            "trying to acquire {} whiling holding {} from consumer {}",
-            human_readable_size(required),
-            human_readable_size(current),
-            self.id(),
-        );
-
-        let can_grow_directly =
-            self.memory_manager().can_grow_directly(required, current);
-        if !can_grow_directly {
-            debug!(
-                "Failed to grow memory of {} directly from consumer {}, 
spilling first ...",
-                human_readable_size(required),
-                self.id()
-            );
-            let freed = self.spill().await?;
-            self.memory_manager()
-                .record_free_then_acquire(freed, required);
-        }
-        Ok(())
-    }
-
-    /// Grow without spilling to the disk. It grows the memory directly
-    /// so it should be only used when the consumer already allocated the
-    /// memory and it is safe to grow without spilling.
-    fn grow(&self, required: usize) {
-        self.memory_manager().record_free_then_acquire(0, required);
-    }
-
-    /// Return `freed` memory to the memory manager,
-    /// may wake up other requesters waiting for their minimum memory quota.
-    fn shrink(&self, freed: usize) {
-        self.memory_manager().record_free(freed);
-    }
-
-    /// Spill in-memory buffers to disk, free memory, return the previous used
-    async fn spill(&self) -> Result<usize>;
-
-    /// Current memory used by this consumer
-    fn mem_used(&self) -> usize;
 }
 
-impl Debug for dyn MemoryConsumer {
-    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
-        write!(
-            f,
-            "{}[{}]: {}",
-            self.name(),
-            self.id(),
-            human_readable_size(self.mem_used())
-        )
-    }
-}
-
-impl Display for dyn MemoryConsumer {
-    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
-        write!(f, "{}[{}]", self.name(), self.id(),)
-    }
-}
-
-/*
-The memory management architecture is the following:
-
-1. User designates max execution memory by setting RuntimeConfig.max_memory 
and RuntimeConfig.memory_fraction (float64 between 0..1).
-   The actual max memory DataFusion could use `pool_size =  max_memory * 
memory_fraction`.
-2. The entities that take up memory during its execution are called 'Memory 
Consumers'. Operators or others are encouraged to
-   register themselves to the memory manager and report its usage through 
`mem_used()`.
-3. There are two kinds of consumers:
-   - 'Requesting' consumers that would acquire memory during its execution and 
release memory through `spill` if no more memory is available.
-   - 'Tracking' consumers that exist for reporting purposes to provide a more 
accurate memory usage estimation for memory consumers.
-4. Requesting and tracking consumers share the pool. Each controlling consumer 
could acquire a maximum of
-   (pool_size - all_tracking_used) / active_num_controlling_consumers.
-
-            Memory Space for the DataFusion Lib / Process of `pool_size`
-   
┌──────────────────────────────────────────────z─────────────────────────────┐
-   │                                              z                            
 │
-   │                                              z                            
 │
-   │               Requesting                     z          Tracking          
 │
-   │            Memory Consumers                  z       Memory Consumers     
 │
-   │                                              z                            
 │
-   │                                              z                            
 │
-   
└──────────────────────────────────────────────z─────────────────────────────┘
-*/
-
-/// Manage memory usage during physical plan execution
+/// The memory manager maintains a fixed size pool of memory
+/// from which portions can be allocated
 #[derive(Debug)]
 pub struct MemoryManager {
-    requesters: Arc<Mutex<HashSet<MemoryConsumerId>>>,
-    pool_size: usize,
-    requesters_total: Arc<Mutex<usize>>,
-    trackers_total: AtomicUsize,
-    cv: Condvar,
+    state: Arc<MemoryManagerState>,
 }
 
 impl MemoryManager {
     /// Create new memory manager based on the configuration
-    #[allow(clippy::mutex_atomic)]
     pub fn new(config: MemoryManagerConfig) -> Arc<Self> {
-        let pool_size = config.pool_size();
-
         match config {
             MemoryManagerConfig::Existing(manager) => manager,
-            MemoryManagerConfig::New { .. } => {
+            MemoryManagerConfig::New {
+                max_memory,
+                memory_fraction,
+            } => {
+                let pool_size = (max_memory as f64 * memory_fraction) as usize;
                 debug!(
                     "Creating memory manager with initial size {}",
                     human_readable_size(pool_size)
                 );
 
                 Arc::new(Self {
-                    requesters: Arc::new(Mutex::new(HashSet::new())),
-                    pool_size,
-                    requesters_total: Arc::new(Mutex::new(0)),
-                    trackers_total: AtomicUsize::new(0),
-                    cv: Condvar::new(),
+                    state: Arc::new(MemoryManagerState {
+                        pool_size,
+                        used: AtomicUsize::new(0),
+                    }),
                 })
             }
         }
     }
 
-    fn get_tracker_total(&self) -> usize {
-        self.trackers_total.load(Ordering::SeqCst)
-    }
-
-    pub(crate) fn grow_tracker_usage(&self, delta: usize) {
-        self.trackers_total.fetch_add(delta, Ordering::SeqCst);
-    }
-
-    pub(crate) fn shrink_tracker_usage(&self, delta: usize) {
-        let update =
-            self.trackers_total
-                .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
-                    if x >= delta {
-                        Some(x - delta)
-                    } else {
-                        None
-                    }
-                });
-        update.unwrap_or_else(|_| {
-            panic!(
-                "Tracker total memory shrink by {} underflow, current value is 
",
-                delta
-            )
-        });
+    /// Returns the maximum pool size
+    ///
+    /// Note: this can be less than the amount allocated as a result of 
[`MemoryManager::allocate`]
+    pub fn pool_size(&self) -> usize {
+        self.state.pool_size
     }
 
-    /// Return the total memory usage for all requesters
-    pub fn get_requester_total(&self) -> usize {
-        *self.requesters_total.lock()
+    /// Returns the number of allocated bytes
+    ///
+    /// Note: this can exceed the pool size as a result of 
[`MemoryManager::allocate`]
+    pub fn allocated(&self) -> usize {
+        self.state.used.load(Ordering::Relaxed)
     }
 
-    /// Register a new memory requester
-    pub(crate) fn register_requester(&self, requester_id: &MemoryConsumerId) {
-        self.requesters.lock().insert(requester_id.clone());
+    /// Returns a new empty allocation identified by `name`
+    pub fn new_tracked_allocation(&self, name: String) -> TrackedAllocation {

Review Comment:
   I think I would rather punt on this until we have such a system in place; I 
am skeptical that such a system would make use of IDs and not just an Arc



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to