yjshen commented on code in PR #4522:
URL: https://github.com/apache/arrow-datafusion/pull/4522#discussion_r1051515241


##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -136,94 +144,83 @@ impl ExternalSorter {
                     // We don't have to call try_grow here, since we have 
already used the
                     // memory (so spilling right here wouldn't help at all for 
the current
                     // operation). But we still have to record it so that 
other requesters
-                    // would know about this unexpected increase in memory 
consuption.
+                    // would know about this unexpected increase in memory 
consumption.
                     let new_size_delta = new_size - size;
-                    self.grow(new_size_delta);
+                    self.allocation.grow(new_size_delta);
                     self.metrics.mem_used().add(new_size_delta);
                 }
                 Ordering::Less => {
                     let size_delta = size - new_size;
-                    self.shrink(size_delta);
+                    self.allocation.shrink(size_delta);
                     self.metrics.mem_used().sub(size_delta);
                 }
                 Ordering::Equal => {}
             }
-            in_mem_batches.push(partial);
+            self.in_mem_batches.push(partial);
         }
         Ok(())
     }
 
     async fn spilled_before(&self) -> bool {

Review Comment:
   ```suggestion
       fn spilled_before(&self) -> bool {
   ```



##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -136,94 +144,83 @@ impl ExternalSorter {
                     // We don't have to call try_grow here, since we have 
already used the
                     // memory (so spilling right here wouldn't help at all for 
the current
                     // operation). But we still have to record it so that 
other requesters
-                    // would know about this unexpected increase in memory 
consuption.
+                    // would know about this unexpected increase in memory 
consumption.
                     let new_size_delta = new_size - size;
-                    self.grow(new_size_delta);
+                    self.allocation.grow(new_size_delta);
                     self.metrics.mem_used().add(new_size_delta);
                 }
                 Ordering::Less => {
                     let size_delta = size - new_size;
-                    self.shrink(size_delta);
+                    self.allocation.shrink(size_delta);
                     self.metrics.mem_used().sub(size_delta);
                 }
                 Ordering::Equal => {}
             }
-            in_mem_batches.push(partial);
+            self.in_mem_batches.push(partial);
         }
         Ok(())
     }
 
     async fn spilled_before(&self) -> bool {
-        let spills = self.spills.lock().await;
-        !spills.is_empty()
+        !self.spills.is_empty()
     }
 
     /// MergeSort in mem batches as well as spills into total order with 
`SortPreservingMergeStream`.
-    async fn sort(&self) -> Result<SendableRecordBatchStream> {
-        let partition = self.partition_id();
+    async fn sort(&mut self) -> Result<SendableRecordBatchStream> {

Review Comment:
   ```suggestion
       fn sort(&mut self) -> Result<SendableRecordBatchStream> {
   ```



##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -136,94 +144,83 @@ impl ExternalSorter {
                     // We don't have to call try_grow here, since we have 
already used the
                     // memory (so spilling right here wouldn't help at all for 
the current
                     // operation). But we still have to record it so that 
other requesters
-                    // would know about this unexpected increase in memory 
consuption.
+                    // would know about this unexpected increase in memory 
consumption.
                     let new_size_delta = new_size - size;
-                    self.grow(new_size_delta);
+                    self.allocation.grow(new_size_delta);
                     self.metrics.mem_used().add(new_size_delta);
                 }
                 Ordering::Less => {
                     let size_delta = size - new_size;
-                    self.shrink(size_delta);
+                    self.allocation.shrink(size_delta);
                     self.metrics.mem_used().sub(size_delta);
                 }
                 Ordering::Equal => {}
             }
-            in_mem_batches.push(partial);
+            self.in_mem_batches.push(partial);
         }
         Ok(())
     }
 
     async fn spilled_before(&self) -> bool {

Review Comment:
   We could also inline this function and remove it.



##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -136,94 +144,83 @@ impl ExternalSorter {
                     // We don't have to call try_grow here, since we have 
already used the
                     // memory (so spilling right here wouldn't help at all for 
the current
                     // operation). But we still have to record it so that 
other requesters
-                    // would know about this unexpected increase in memory 
consuption.
+                    // would know about this unexpected increase in memory 
consumption.
                     let new_size_delta = new_size - size;
-                    self.grow(new_size_delta);
+                    self.allocation.grow(new_size_delta);
                     self.metrics.mem_used().add(new_size_delta);
                 }
                 Ordering::Less => {
                     let size_delta = size - new_size;
-                    self.shrink(size_delta);
+                    self.allocation.shrink(size_delta);
                     self.metrics.mem_used().sub(size_delta);
                 }
                 Ordering::Equal => {}
             }
-            in_mem_batches.push(partial);
+            self.in_mem_batches.push(partial);
         }
         Ok(())
     }
 
     async fn spilled_before(&self) -> bool {
-        let spills = self.spills.lock().await;
-        !spills.is_empty()
+        !self.spills.is_empty()
     }
 
     /// MergeSort in mem batches as well as spills into total order with 
`SortPreservingMergeStream`.
-    async fn sort(&self) -> Result<SendableRecordBatchStream> {
-        let partition = self.partition_id();
+    async fn sort(&mut self) -> Result<SendableRecordBatchStream> {
         let batch_size = self.session_config.batch_size();
-        let mut in_mem_batches = self.in_mem_batches.lock().await;
 
         if self.spilled_before().await {

Review Comment:
   ```suggestion
           if self.spilled_before() {
   ```



##########
datafusion/core/src/execution/runtime_env.rs:
##########
@@ -172,9 +155,9 @@ impl RuntimeConfig {
         self
     }
 
-    /// Customize memory manager
-    pub fn with_memory_manager(mut self, memory_manager: MemoryManagerConfig) 
-> Self {
-        self.memory_manager = memory_manager;
+    /// Customize memory policy
+    pub fn with_memory_policy(mut self, memory_pool: Arc<dyn MemoryPool>) -> 
Self {

Review Comment:
   We could eliminate the word `policy` and explain that different pools come 
with different allocation policies.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to