yjshen commented on code in PR #4522: URL: https://github.com/apache/arrow-datafusion/pull/4522#discussion_r1051515241
########## datafusion/core/src/physical_plan/sorts/sort.rs: ########## @@ -136,94 +144,83 @@ impl ExternalSorter { // We don't have to call try_grow here, since we have already used the // memory (so spilling right here wouldn't help at all for the current // operation). But we still have to record it so that other requesters - // would know about this unexpected increase in memory consuption. + // would know about this unexpected increase in memory consumption. let new_size_delta = new_size - size; - self.grow(new_size_delta); + self.allocation.grow(new_size_delta); self.metrics.mem_used().add(new_size_delta); } Ordering::Less => { let size_delta = size - new_size; - self.shrink(size_delta); + self.allocation.shrink(size_delta); self.metrics.mem_used().sub(size_delta); } Ordering::Equal => {} } - in_mem_batches.push(partial); + self.in_mem_batches.push(partial); } Ok(()) } async fn spilled_before(&self) -> bool { Review Comment: ```suggestion fn spilled_before(&self) -> bool { ``` ########## datafusion/core/src/physical_plan/sorts/sort.rs: ########## @@ -136,94 +144,83 @@ impl ExternalSorter { // We don't have to call try_grow here, since we have already used the // memory (so spilling right here wouldn't help at all for the current // operation). But we still have to record it so that other requesters - // would know about this unexpected increase in memory consuption. + // would know about this unexpected increase in memory consumption. 
let new_size_delta = new_size - size; - self.grow(new_size_delta); + self.allocation.grow(new_size_delta); self.metrics.mem_used().add(new_size_delta); } Ordering::Less => { let size_delta = size - new_size; - self.shrink(size_delta); + self.allocation.shrink(size_delta); self.metrics.mem_used().sub(size_delta); } Ordering::Equal => {} } - in_mem_batches.push(partial); + self.in_mem_batches.push(partial); } Ok(()) } async fn spilled_before(&self) -> bool { - let spills = self.spills.lock().await; - !spills.is_empty() + !self.spills.is_empty() } /// MergeSort in mem batches as well as spills into total order with `SortPreservingMergeStream`. - async fn sort(&self) -> Result<SendableRecordBatchStream> { - let partition = self.partition_id(); + async fn sort(&mut self) -> Result<SendableRecordBatchStream> { Review Comment: ```suggestion fn sort(&mut self) -> Result<SendableRecordBatchStream> { ``` ########## datafusion/core/src/physical_plan/sorts/sort.rs: ########## @@ -136,94 +144,83 @@ impl ExternalSorter { // We don't have to call try_grow here, since we have already used the // memory (so spilling right here wouldn't help at all for the current // operation). But we still have to record it so that other requesters - // would know about this unexpected increase in memory consuption. + // would know about this unexpected increase in memory consumption. 
let new_size_delta = new_size - size; - self.grow(new_size_delta); + self.allocation.grow(new_size_delta); self.metrics.mem_used().add(new_size_delta); } Ordering::Less => { let size_delta = size - new_size; - self.shrink(size_delta); + self.allocation.shrink(size_delta); self.metrics.mem_used().sub(size_delta); } Ordering::Equal => {} } - in_mem_batches.push(partial); + self.in_mem_batches.push(partial); } Ok(()) } async fn spilled_before(&self) -> bool { Review Comment: And we could inline and remove this function ########## datafusion/core/src/physical_plan/sorts/sort.rs: ########## @@ -136,94 +144,83 @@ impl ExternalSorter { // We don't have to call try_grow here, since we have already used the // memory (so spilling right here wouldn't help at all for the current // operation). But we still have to record it so that other requesters - // would know about this unexpected increase in memory consuption. + // would know about this unexpected increase in memory consumption. let new_size_delta = new_size - size; - self.grow(new_size_delta); + self.allocation.grow(new_size_delta); self.metrics.mem_used().add(new_size_delta); } Ordering::Less => { let size_delta = size - new_size; - self.shrink(size_delta); + self.allocation.shrink(size_delta); self.metrics.mem_used().sub(size_delta); } Ordering::Equal => {} } - in_mem_batches.push(partial); + self.in_mem_batches.push(partial); } Ok(()) } async fn spilled_before(&self) -> bool { - let spills = self.spills.lock().await; - !spills.is_empty() + !self.spills.is_empty() } /// MergeSort in mem batches as well as spills into total order with `SortPreservingMergeStream`. 
- async fn sort(&self) -> Result<SendableRecordBatchStream> { - let partition = self.partition_id(); + async fn sort(&mut self) -> Result<SendableRecordBatchStream> { let batch_size = self.session_config.batch_size(); - let mut in_mem_batches = self.in_mem_batches.lock().await; if self.spilled_before().await { Review Comment: ```suggestion if self.spilled_before() { ``` ########## datafusion/core/src/execution/runtime_env.rs: ########## @@ -172,9 +155,9 @@ impl RuntimeConfig { self } - /// Customize memory manager - pub fn with_memory_manager(mut self, memory_manager: MemoryManagerConfig) -> Self { - self.memory_manager = memory_manager; + /// Customize memory policy + pub fn with_memory_policy(mut self, memory_pool: Arc<dyn MemoryPool>) -> Self { Review Comment: We could eliminate the word `policy` and explain that different pools come with different allocation policies. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org