yjshen commented on code in PR #2132:
URL: https://github.com/apache/arrow-datafusion/pull/2132#discussion_r842252967
##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -105,13 +107,21 @@ impl ExternalSorter {
         }
     }
-    async fn insert_batch(&self, input: RecordBatch) -> Result<()> {
+    async fn insert_batch(
+        &self,
+        input: RecordBatch,
+        tracking_metrics: &MemTrackingMetrics,
+    ) -> Result<()> {
         if input.num_rows() > 0 {
             let size = batch_byte_size(&input);
             self.try_grow(size).await?;
             self.metrics.mem_used().add(size);
             let mut in_mem_batches = self.in_mem_batches.lock().await;
-            in_mem_batches.push(input);
+            // NB timer records time taken on drop, so there are no
+            // calls to `timer.done()` below.
+            let _timer = tracking_metrics.elapsed_compute().timer();
+            let partial = sort_batch(input, self.schema.clone(), &self.expr)?;
Review Comment:
Yes, I think the performance gains are twofold:
- Sorting and reordering the batch happen in the same thread, while the data
is likely still in cache, as you mentioned.
- The other is the memory access pattern during the final output phase: we
access the columns of each batch serially, so the sort-order materialization
we do for each incoming column turns purely randomized gathering into
sequential access of each column across all the batches, which yields better
cache behavior (see the sketch below).
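For anyone following along, here is a minimal, self-contained sketch of what
per-batch pre-sorting with the arrow compute kernels looks like. It is not the
actual `sort_batch` in this PR; the helper name `sort_one_batch`, the
two-column schema, and sorting by the first column only are made up for
illustration:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, StringArray};
use arrow::compute::{lexsort_to_indices, take, SortColumn, SortOptions};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Sort one batch by its first column (the assumed sort key in this sketch).
fn sort_one_batch(batch: &RecordBatch) -> Result<RecordBatch, ArrowError> {
    // Compute the permutation that sorts the batch by the key column.
    let indices = lexsort_to_indices(
        &[SortColumn {
            values: batch.column(0).clone(),
            options: Some(SortOptions::default()),
        }],
        None,
    )?;

    // Reorder every column with the same indices: one sequential `take`
    // pass per column while the batch is still hot, instead of random
    // row-wise gathering later at output time.
    let sorted_columns = batch
        .columns()
        .iter()
        .map(|c| take(c.as_ref(), &indices, None))
        .collect::<Result<Vec<ArrayRef>, ArrowError>>()?;

    RecordBatch::try_new(batch.schema(), sorted_columns)
}

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("key", DataType::Int32, false),
        Field::new("value", DataType::Utf8, false),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int32Array::from(vec![3, 1, 2])) as ArrayRef,
            Arc::new(StringArray::from(vec!["c", "a", "b"])) as ArrayRef,
        ],
    )?;

    let sorted = sort_one_batch(&batch)?;
    assert_eq!(sorted.num_rows(), 3);
    Ok(())
}
```

Because every column is reordered with one sequential `take` pass while the
batch is presumably still cache-resident, the final output phase only has to
walk already-sorted columns instead of gathering rows at random.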
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]