yjshen commented on code in PR #4522:
URL: https://github.com/apache/arrow-datafusion/pull/4522#discussion_r1051515241
##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -136,94 +144,83 @@ impl ExternalSorter {
// We don't have to call try_grow here, since we have
already used the
// memory (so spilling right here wouldn't help at all for
the current
// operation). But we still have to record it so that
other requesters
- // would know about this unexpected increase in memory
consuption.
+ // would know about this unexpected increase in memory
consumption.
let new_size_delta = new_size - size;
- self.grow(new_size_delta);
+ self.allocation.grow(new_size_delta);
self.metrics.mem_used().add(new_size_delta);
}
Ordering::Less => {
let size_delta = size - new_size;
- self.shrink(size_delta);
+ self.allocation.shrink(size_delta);
self.metrics.mem_used().sub(size_delta);
}
Ordering::Equal => {}
}
- in_mem_batches.push(partial);
+ self.in_mem_batches.push(partial);
}
Ok(())
}
async fn spilled_before(&self) -> bool {
Review Comment:
```suggestion
fn spilled_before(&self) -> bool {
```
##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -136,94 +144,83 @@ impl ExternalSorter {
// We don't have to call try_grow here, since we have
already used the
// memory (so spilling right here wouldn't help at all for
the current
// operation). But we still have to record it so that
other requesters
- // would know about this unexpected increase in memory
consuption.
+ // would know about this unexpected increase in memory
consumption.
let new_size_delta = new_size - size;
- self.grow(new_size_delta);
+ self.allocation.grow(new_size_delta);
self.metrics.mem_used().add(new_size_delta);
}
Ordering::Less => {
let size_delta = size - new_size;
- self.shrink(size_delta);
+ self.allocation.shrink(size_delta);
self.metrics.mem_used().sub(size_delta);
}
Ordering::Equal => {}
}
- in_mem_batches.push(partial);
+ self.in_mem_batches.push(partial);
}
Ok(())
}
async fn spilled_before(&self) -> bool {
- let spills = self.spills.lock().await;
- !spills.is_empty()
+ !self.spills.is_empty()
}
/// MergeSort in mem batches as well as spills into total order with
`SortPreservingMergeStream`.
- async fn sort(&self) -> Result<SendableRecordBatchStream> {
- let partition = self.partition_id();
+ async fn sort(&mut self) -> Result<SendableRecordBatchStream> {
Review Comment:
```suggestion
fn sort(&mut self) -> Result<SendableRecordBatchStream> {
```
##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -136,94 +144,83 @@ impl ExternalSorter {
// We don't have to call try_grow here, since we have
already used the
// memory (so spilling right here wouldn't help at all for
the current
// operation). But we still have to record it so that
other requesters
- // would know about this unexpected increase in memory
consuption.
+ // would know about this unexpected increase in memory
consumption.
let new_size_delta = new_size - size;
- self.grow(new_size_delta);
+ self.allocation.grow(new_size_delta);
self.metrics.mem_used().add(new_size_delta);
}
Ordering::Less => {
let size_delta = size - new_size;
- self.shrink(size_delta);
+ self.allocation.shrink(size_delta);
self.metrics.mem_used().sub(size_delta);
}
Ordering::Equal => {}
}
- in_mem_batches.push(partial);
+ self.in_mem_batches.push(partial);
}
Ok(())
}
async fn spilled_before(&self) -> bool {
Review Comment:
We could also inline this function at its call site and remove it.
##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -136,94 +144,83 @@ impl ExternalSorter {
// We don't have to call try_grow here, since we have
already used the
// memory (so spilling right here wouldn't help at all for
the current
// operation). But we still have to record it so that
other requesters
- // would know about this unexpected increase in memory
consuption.
+ // would know about this unexpected increase in memory
consumption.
let new_size_delta = new_size - size;
- self.grow(new_size_delta);
+ self.allocation.grow(new_size_delta);
self.metrics.mem_used().add(new_size_delta);
}
Ordering::Less => {
let size_delta = size - new_size;
- self.shrink(size_delta);
+ self.allocation.shrink(size_delta);
self.metrics.mem_used().sub(size_delta);
}
Ordering::Equal => {}
}
- in_mem_batches.push(partial);
+ self.in_mem_batches.push(partial);
}
Ok(())
}
async fn spilled_before(&self) -> bool {
- let spills = self.spills.lock().await;
- !spills.is_empty()
+ !self.spills.is_empty()
}
/// MergeSort in mem batches as well as spills into total order with
`SortPreservingMergeStream`.
- async fn sort(&self) -> Result<SendableRecordBatchStream> {
- let partition = self.partition_id();
+ async fn sort(&mut self) -> Result<SendableRecordBatchStream> {
let batch_size = self.session_config.batch_size();
- let mut in_mem_batches = self.in_mem_batches.lock().await;
if self.spilled_before().await {
Review Comment:
```suggestion
if self.spilled_before() {
```
##########
datafusion/core/src/execution/runtime_env.rs:
##########
@@ -172,9 +155,9 @@ impl RuntimeConfig {
self
}
- /// Customize memory manager
- pub fn with_memory_manager(mut self, memory_manager: MemoryManagerConfig)
-> Self {
- self.memory_manager = memory_manager;
+ /// Customize memory policy
+ pub fn with_memory_policy(mut self, memory_pool: Arc<dyn MemoryPool>) ->
Self {
Review Comment:
We could eliminate the word `policy` and explain that different pools come
with different allocation policies.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]