alamb commented on code in PR #2132:
URL: https://github.com/apache/arrow-datafusion/pull/2132#discussion_r842072399
##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -268,36 +282,220 @@ impl MemoryConsumer for ExternalSorter {
/// consume the non-empty `sorted_bathes` and do in_mem_sort
fn in_mem_partial_sort(
- buffered_batches: &mut Vec<RecordBatch>,
+ buffered_batches: &mut Vec<BatchWithSortArray>,
schema: SchemaRef,
expressions: &[PhysicalSortExpr],
+ batch_size: usize,
tracking_metrics: MemTrackingMetrics,
) -> Result<SendableRecordBatchStream> {
assert_ne!(buffered_batches.len(), 0);
+ if buffered_batches.len() == 1 {
+ let result = buffered_batches.pop();
+ Ok(Box::pin(SizedRecordBatchStream::new(
+ schema,
+ vec![Arc::new(result.unwrap().sorted_batch)],
+ tracking_metrics,
+ )))
+ } else {
+ let (sorted_arrays, batches): (Vec<Vec<ArrayRef>>, Vec<RecordBatch>) =
+ buffered_batches
+ .drain(..)
+ .into_iter()
+ .map(|b| {
+ let BatchWithSortArray {
+ sort_arrays,
+ sorted_batch: batch,
+ } = b;
+ (sort_arrays, batch)
+ })
+ .unzip();
+
+ let sorted_iter = {
+ // NB timer records time taken on drop, so there are no
+ // calls to `timer.done()` below.
+ let _timer = tracking_metrics.elapsed_compute().timer();
+ get_sorted_iter(&sorted_arrays, expressions, batch_size)?
+ };
+ Ok(Box::pin(SortedSizedRecordBatchStream::new(
+ schema,
+ batches,
+ sorted_iter,
+ tracking_metrics,
+ )))
+ }
+}
- let result = {
- // NB timer records time taken on drop, so there are no
- // calls to `timer.done()` below.
- let _timer = tracking_metrics.elapsed_compute().timer();
+#[derive(Debug, Copy, Clone)]
+struct CompositeIndex {
+ batch_idx: u32,
+ row_idx: u32,
+}
- let pre_sort = if buffered_batches.len() == 1 {
- buffered_batches.pop()
- } else {
- let batches = buffered_batches.drain(..).collect::<Vec<_>>();
- // combine all record batches into one for each column
- common::combine_batches(&batches, schema.clone())?
- };
+/// Get sorted iterator by sort concatenated `SortColumn`s
+fn get_sorted_iter(
+ sort_arrays: &[Vec<ArrayRef>],
+ expr: &[PhysicalSortExpr],
+ batch_size: usize,
+) -> Result<SortedIterator> {
+ let row_indices = sort_arrays
+ .iter()
+ .enumerate()
+ .flat_map(|(i, arrays)| {
+ (0..arrays[0].len())
+ .map(|r| CompositeIndex {
+ // since we original use UInt32Array to index the combined
mono batch,
+ // component record batches won't overflow as well,
Review Comment:
👍 Yeah, I agree this approach is no more prone to overflow than the
implementation on `master`.
##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -268,36 +282,220 @@ impl MemoryConsumer for ExternalSorter {
/// consume the non-empty `sorted_bathes` and do in_mem_sort
fn in_mem_partial_sort(
- buffered_batches: &mut Vec<RecordBatch>,
+ buffered_batches: &mut Vec<BatchWithSortArray>,
schema: SchemaRef,
expressions: &[PhysicalSortExpr],
+ batch_size: usize,
tracking_metrics: MemTrackingMetrics,
) -> Result<SendableRecordBatchStream> {
assert_ne!(buffered_batches.len(), 0);
+ if buffered_batches.len() == 1 {
+ let result = buffered_batches.pop();
+ Ok(Box::pin(SizedRecordBatchStream::new(
+ schema,
+ vec![Arc::new(result.unwrap().sorted_batch)],
+ tracking_metrics,
+ )))
+ } else {
+ let (sorted_arrays, batches): (Vec<Vec<ArrayRef>>, Vec<RecordBatch>) =
+ buffered_batches
+ .drain(..)
+ .into_iter()
+ .map(|b| {
+ let BatchWithSortArray {
+ sort_arrays,
+ sorted_batch: batch,
+ } = b;
+ (sort_arrays, batch)
+ })
+ .unzip();
+
+ let sorted_iter = {
+ // NB timer records time taken on drop, so there are no
+ // calls to `timer.done()` below.
+ let _timer = tracking_metrics.elapsed_compute().timer();
+ get_sorted_iter(&sorted_arrays, expressions, batch_size)?
+ };
+ Ok(Box::pin(SortedSizedRecordBatchStream::new(
+ schema,
+ batches,
+ sorted_iter,
+ tracking_metrics,
+ )))
+ }
+}
- let result = {
- // NB timer records time taken on drop, so there are no
- // calls to `timer.done()` below.
- let _timer = tracking_metrics.elapsed_compute().timer();
+#[derive(Debug, Copy, Clone)]
+struct CompositeIndex {
+ batch_idx: u32,
+ row_idx: u32,
+}
- let pre_sort = if buffered_batches.len() == 1 {
- buffered_batches.pop()
- } else {
- let batches = buffered_batches.drain(..).collect::<Vec<_>>();
- // combine all record batches into one for each column
- common::combine_batches(&batches, schema.clone())?
- };
+/// Get sorted iterator by sort concatenated `SortColumn`s
+fn get_sorted_iter(
+ sort_arrays: &[Vec<ArrayRef>],
+ expr: &[PhysicalSortExpr],
+ batch_size: usize,
+) -> Result<SortedIterator> {
+ let row_indices = sort_arrays
+ .iter()
+ .enumerate()
+ .flat_map(|(i, arrays)| {
+ (0..arrays[0].len())
+ .map(|r| CompositeIndex {
+ // since we original use UInt32Array to index the combined
mono batch,
+ // component record batches won't overflow as well,
+ // use u32 here for space efficiency.
+ batch_idx: i as u32,
+ row_idx: r as u32,
+ })
+ .collect::<Vec<CompositeIndex>>()
+ })
+ .collect::<Vec<CompositeIndex>>();
+
+ let sort_columns = expr
+ .iter()
+ .enumerate()
+ .map(|(i, expr)| {
+ let columns_i = sort_arrays
+ .iter()
+ .map(|cs| cs[i].as_ref())
+ .collect::<Vec<&dyn Array>>();
+ Ok(SortColumn {
+ values: concat(columns_i.as_slice())?,
+ options: Some(expr.options),
+ })
+ })
+ .collect::<Result<Vec<_>>>()?;
+ let indices = lexsort_to_indices(&sort_columns, None)?;
- pre_sort
- .map(|batch| sort_batch(batch, schema.clone(), expressions))
- .transpose()?
- };
+ Ok(SortedIterator::new(indices, row_indices, batch_size))
+}
- Ok(Box::pin(SizedRecordBatchStream::new(
- schema,
- vec![Arc::new(result.unwrap())],
- tracking_metrics,
- )))
+struct SortedIterator {
+ pos: usize,
+ indices: UInt32Array,
+ composite: Vec<CompositeIndex>,
+ batch_size: usize,
+ length: usize,
+}
Review Comment:
```suggestion
struct SortedIterator {
/// Current logical position in the iterator
pos: usize,
/// Indexes into the input representing the correctly sorted total output
indices: UInt32Array,
/// Map each logical input index to where it can be found in the
/// sorted input batches
composite: Vec<CompositeIndex>,
/// Maximum batch size to produce
batch_size: usize,
/// total length of the iterator
length: usize,
}
```
##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -268,36 +282,220 @@ impl MemoryConsumer for ExternalSorter {
/// consume the non-empty `sorted_bathes` and do in_mem_sort
fn in_mem_partial_sort(
- buffered_batches: &mut Vec<RecordBatch>,
+ buffered_batches: &mut Vec<BatchWithSortArray>,
schema: SchemaRef,
expressions: &[PhysicalSortExpr],
+ batch_size: usize,
tracking_metrics: MemTrackingMetrics,
) -> Result<SendableRecordBatchStream> {
assert_ne!(buffered_batches.len(), 0);
+ if buffered_batches.len() == 1 {
+ let result = buffered_batches.pop();
+ Ok(Box::pin(SizedRecordBatchStream::new(
+ schema,
+ vec![Arc::new(result.unwrap().sorted_batch)],
+ tracking_metrics,
+ )))
+ } else {
+ let (sorted_arrays, batches): (Vec<Vec<ArrayRef>>, Vec<RecordBatch>) =
+ buffered_batches
+ .drain(..)
+ .into_iter()
+ .map(|b| {
+ let BatchWithSortArray {
+ sort_arrays,
+ sorted_batch: batch,
+ } = b;
+ (sort_arrays, batch)
+ })
+ .unzip();
+
+ let sorted_iter = {
+ // NB timer records time taken on drop, so there are no
+ // calls to `timer.done()` below.
+ let _timer = tracking_metrics.elapsed_compute().timer();
+ get_sorted_iter(&sorted_arrays, expressions, batch_size)?
+ };
+ Ok(Box::pin(SortedSizedRecordBatchStream::new(
+ schema,
+ batches,
+ sorted_iter,
+ tracking_metrics,
+ )))
+ }
+}
- let result = {
- // NB timer records time taken on drop, so there are no
- // calls to `timer.done()` below.
- let _timer = tracking_metrics.elapsed_compute().timer();
+#[derive(Debug, Copy, Clone)]
+struct CompositeIndex {
+ batch_idx: u32,
+ row_idx: u32,
+}
- let pre_sort = if buffered_batches.len() == 1 {
- buffered_batches.pop()
- } else {
- let batches = buffered_batches.drain(..).collect::<Vec<_>>();
- // combine all record batches into one for each column
- common::combine_batches(&batches, schema.clone())?
- };
+/// Get sorted iterator by sort concatenated `SortColumn`s
+fn get_sorted_iter(
+ sort_arrays: &[Vec<ArrayRef>],
+ expr: &[PhysicalSortExpr],
+ batch_size: usize,
+) -> Result<SortedIterator> {
+ let row_indices = sort_arrays
+ .iter()
+ .enumerate()
+ .flat_map(|(i, arrays)| {
+ (0..arrays[0].len())
+ .map(|r| CompositeIndex {
+ // since we original use UInt32Array to index the combined
mono batch,
+ // component record batches won't overflow as well,
+ // use u32 here for space efficiency.
+ batch_idx: i as u32,
+ row_idx: r as u32,
+ })
+ .collect::<Vec<CompositeIndex>>()
+ })
+ .collect::<Vec<CompositeIndex>>();
+
+ let sort_columns = expr
+ .iter()
+ .enumerate()
+ .map(|(i, expr)| {
+ let columns_i = sort_arrays
+ .iter()
+ .map(|cs| cs[i].as_ref())
+ .collect::<Vec<&dyn Array>>();
+ Ok(SortColumn {
+ values: concat(columns_i.as_slice())?,
+ options: Some(expr.options),
+ })
+ })
+ .collect::<Result<Vec<_>>>()?;
+ let indices = lexsort_to_indices(&sort_columns, None)?;
- pre_sort
- .map(|batch| sort_batch(batch, schema.clone(), expressions))
- .transpose()?
- };
+ Ok(SortedIterator::new(indices, row_indices, batch_size))
Review Comment:
One idea for further improving performance here is to reorder
`row_indices` using `indices` up front, so that you end up with a single
`Vec<CompositeIndex>` already in output order. That may be better for cache
locality, and it releases a small amount of memory (the separate `indices`
array) sooner.
More interestingly, with a single ordered `Vec` you could then coalesce
consecutive `CompositeIndex` entries that refer to contiguous rows of the same
input `RecordBatch` into one range. That could reduce the number of calls to
`MutableArrayData::extend`, which copies a whole `start..end` range per call.
##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -105,13 +107,21 @@ impl ExternalSorter {
}
}
- async fn insert_batch(&self, input: RecordBatch) -> Result<()> {
+ async fn insert_batch(
+ &self,
+ input: RecordBatch,
+ tracking_metrics: &MemTrackingMetrics,
+ ) -> Result<()> {
if input.num_rows() > 0 {
let size = batch_byte_size(&input);
self.try_grow(size).await?;
self.metrics.mem_used().add(size);
let mut in_mem_batches = self.in_mem_batches.lock().await;
- in_mem_batches.push(input);
+ // NB timer records time taken on drop, so there are no
+ // calls to `timer.done()` below.
+ let _timer = tracking_metrics.elapsed_compute().timer();
+ let partial = sort_batch(input, self.schema.clone(), &self.expr)?;
Review Comment:
It makes sense that sorting a batch on the same thread that produced it (while
its data is likely still in the CPU cache) improves performance. Nice find, @yjshen!
cc @tustvold, who has been observing similar effects while working on
scheduling I/O and CPU decoding.
##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -268,36 +282,220 @@ impl MemoryConsumer for ExternalSorter {
/// consume the non-empty `sorted_bathes` and do in_mem_sort
fn in_mem_partial_sort(
- buffered_batches: &mut Vec<RecordBatch>,
+ buffered_batches: &mut Vec<BatchWithSortArray>,
schema: SchemaRef,
expressions: &[PhysicalSortExpr],
+ batch_size: usize,
tracking_metrics: MemTrackingMetrics,
) -> Result<SendableRecordBatchStream> {
assert_ne!(buffered_batches.len(), 0);
+ if buffered_batches.len() == 1 {
+ let result = buffered_batches.pop();
+ Ok(Box::pin(SizedRecordBatchStream::new(
+ schema,
+ vec![Arc::new(result.unwrap().sorted_batch)],
+ tracking_metrics,
+ )))
+ } else {
+ let (sorted_arrays, batches): (Vec<Vec<ArrayRef>>, Vec<RecordBatch>) =
+ buffered_batches
+ .drain(..)
+ .into_iter()
+ .map(|b| {
+ let BatchWithSortArray {
+ sort_arrays,
+ sorted_batch: batch,
+ } = b;
+ (sort_arrays, batch)
+ })
+ .unzip();
+
+ let sorted_iter = {
+ // NB timer records time taken on drop, so there are no
+ // calls to `timer.done()` below.
+ let _timer = tracking_metrics.elapsed_compute().timer();
+ get_sorted_iter(&sorted_arrays, expressions, batch_size)?
+ };
+ Ok(Box::pin(SortedSizedRecordBatchStream::new(
+ schema,
+ batches,
+ sorted_iter,
+ tracking_metrics,
+ )))
+ }
+}
- let result = {
- // NB timer records time taken on drop, so there are no
- // calls to `timer.done()` below.
- let _timer = tracking_metrics.elapsed_compute().timer();
+#[derive(Debug, Copy, Clone)]
+struct CompositeIndex {
+ batch_idx: u32,
+ row_idx: u32,
+}
- let pre_sort = if buffered_batches.len() == 1 {
- buffered_batches.pop()
- } else {
- let batches = buffered_batches.drain(..).collect::<Vec<_>>();
- // combine all record batches into one for each column
- common::combine_batches(&batches, schema.clone())?
- };
+/// Get sorted iterator by sort concatenated `SortColumn`s
+fn get_sorted_iter(
+ sort_arrays: &[Vec<ArrayRef>],
+ expr: &[PhysicalSortExpr],
+ batch_size: usize,
+) -> Result<SortedIterator> {
+ let row_indices = sort_arrays
+ .iter()
+ .enumerate()
+ .flat_map(|(i, arrays)| {
+ (0..arrays[0].len())
+ .map(|r| CompositeIndex {
+ // since we original use UInt32Array to index the combined
mono batch,
+ // component record batches won't overflow as well,
+ // use u32 here for space efficiency.
+ batch_idx: i as u32,
+ row_idx: r as u32,
+ })
+ .collect::<Vec<CompositeIndex>>()
+ })
+ .collect::<Vec<CompositeIndex>>();
+
+ let sort_columns = expr
+ .iter()
+ .enumerate()
+ .map(|(i, expr)| {
+ let columns_i = sort_arrays
+ .iter()
+ .map(|cs| cs[i].as_ref())
+ .collect::<Vec<&dyn Array>>();
+ Ok(SortColumn {
+ values: concat(columns_i.as_slice())?,
+ options: Some(expr.options),
+ })
+ })
+ .collect::<Result<Vec<_>>>()?;
+ let indices = lexsort_to_indices(&sort_columns, None)?;
- pre_sort
- .map(|batch| sort_batch(batch, schema.clone(), expressions))
- .transpose()?
- };
+ Ok(SortedIterator::new(indices, row_indices, batch_size))
+}
- Ok(Box::pin(SizedRecordBatchStream::new(
- schema,
- vec![Arc::new(result.unwrap())],
- tracking_metrics,
- )))
+struct SortedIterator {
+ pos: usize,
+ indices: UInt32Array,
+ composite: Vec<CompositeIndex>,
+ batch_size: usize,
+ length: usize,
+}
+
+impl SortedIterator {
+ fn new(
+ indices: UInt32Array,
+ composite: Vec<CompositeIndex>,
+ batch_size: usize,
+ ) -> Self {
+ let length = composite.len();
+ Self {
+ pos: 0,
+ indices,
+ composite,
+ batch_size,
+ length,
+ }
+ }
+
+ fn memory_size(&self) -> usize {
+ std::mem::size_of_val(self)
+ + self.indices.get_array_memory_size()
+ + std::mem::size_of_val(&self.composite[..])
+ }
+}
+
+impl Iterator for SortedIterator {
+ type Item = Vec<CompositeIndex>;
+
+ /// Emit a max of `batch_size` positions each time
+ fn next(&mut self) -> Option<Self::Item> {
+ if self.pos >= self.length {
+ return None;
+ }
+
+ let current_size = min(self.batch_size, self.length - self.pos);
+ let mut result = Vec::with_capacity(current_size);
+ for i in 0..current_size {
+ let p = self.pos + i;
+ let c_index = self.indices.value(p) as usize;
+ result.push(self.composite[c_index])
+ }
+ self.pos += current_size;
+ Some(result)
+ }
+}
+
+/// Stream of sorted record batches
+struct SortedSizedRecordBatchStream {
+ schema: SchemaRef,
+ batches: Vec<RecordBatch>,
+ sorted_iter: SortedIterator,
+ num_cols: usize,
+ metrics: MemTrackingMetrics,
+}
+
+impl SortedSizedRecordBatchStream {
+ /// new
+ pub fn new(
+ schema: SchemaRef,
+ batches: Vec<RecordBatch>,
+ sorted_iter: SortedIterator,
+ metrics: MemTrackingMetrics,
+ ) -> Self {
+ let size = batches.iter().map(batch_byte_size).sum::<usize>()
+ + sorted_iter.memory_size();
+ metrics.init_mem_used(size);
+ let num_cols = batches[0].num_columns();
+ SortedSizedRecordBatchStream {
+ schema,
+ batches,
+ sorted_iter,
+ num_cols,
+ metrics,
+ }
+ }
+}
+
+impl Stream for SortedSizedRecordBatchStream {
+ type Item = ArrowResult<RecordBatch>;
+
+ fn poll_next(
+ mut self: std::pin::Pin<&mut Self>,
+ _: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ match self.sorted_iter.next() {
+ None => Poll::Ready(None),
+ Some(combined) => {
+ let mut output = Vec::with_capacity(self.num_cols);
+ for i in 0..self.num_cols {
+ let arrays = self
+ .batches
+ .iter()
+ .map(|b| b.column(i).data())
+ .collect::<Vec<_>>();
+ let mut mutable =
Review Comment:
This is very clever 👍 👍
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]