Dandandan commented on code in PR #2132:
URL: https://github.com/apache/arrow-datafusion/pull/2132#discussion_r842148343
##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -268,36 +282,220 @@ impl MemoryConsumer for ExternalSorter {
/// consume the non-empty `sorted_bathes` and do in_mem_sort
fn in_mem_partial_sort(
- buffered_batches: &mut Vec<RecordBatch>,
+ buffered_batches: &mut Vec<BatchWithSortArray>,
schema: SchemaRef,
expressions: &[PhysicalSortExpr],
+ batch_size: usize,
tracking_metrics: MemTrackingMetrics,
) -> Result<SendableRecordBatchStream> {
assert_ne!(buffered_batches.len(), 0);
+ if buffered_batches.len() == 1 {
+ let result = buffered_batches.pop();
+ Ok(Box::pin(SizedRecordBatchStream::new(
+ schema,
+ vec![Arc::new(result.unwrap().sorted_batch)],
+ tracking_metrics,
+ )))
+ } else {
+ let (sorted_arrays, batches): (Vec<Vec<ArrayRef>>, Vec<RecordBatch>) =
+ buffered_batches
+ .drain(..)
+ .into_iter()
+ .map(|b| {
+ let BatchWithSortArray {
+ sort_arrays,
+ sorted_batch: batch,
+ } = b;
+ (sort_arrays, batch)
+ })
+ .unzip();
+
+ let sorted_iter = {
+ // NB timer records time taken on drop, so there are no
+ // calls to `timer.done()` below.
+ let _timer = tracking_metrics.elapsed_compute().timer();
+ get_sorted_iter(&sorted_arrays, expressions, batch_size)?
+ };
+ Ok(Box::pin(SortedSizedRecordBatchStream::new(
+ schema,
+ batches,
+ sorted_iter,
+ tracking_metrics,
+ )))
+ }
+}
- let result = {
- // NB timer records time taken on drop, so there are no
- // calls to `timer.done()` below.
- let _timer = tracking_metrics.elapsed_compute().timer();
+#[derive(Debug, Copy, Clone)]
+struct CompositeIndex {
+ batch_idx: u32,
+ row_idx: u32,
+}
- let pre_sort = if buffered_batches.len() == 1 {
- buffered_batches.pop()
- } else {
- let batches = buffered_batches.drain(..).collect::<Vec<_>>();
- // combine all record batches into one for each column
- common::combine_batches(&batches, schema.clone())?
- };
+/// Get sorted iterator by sort concatenated `SortColumn`s
+fn get_sorted_iter(
+ sort_arrays: &[Vec<ArrayRef>],
+ expr: &[PhysicalSortExpr],
+ batch_size: usize,
+) -> Result<SortedIterator> {
+ let row_indices = sort_arrays
+ .iter()
+ .enumerate()
+ .flat_map(|(i, arrays)| {
+ (0..arrays[0].len())
+ .map(|r| CompositeIndex {
+ // since we original use UInt32Array to index the combined mono batch,
+ // component record batches won't overflow as well,
+ // use u32 here for space efficiency.
+ batch_idx: i as u32,
+ row_idx: r as u32,
+ })
+ .collect::<Vec<CompositeIndex>>()
+ })
+ .collect::<Vec<CompositeIndex>>();
+
+ let sort_columns = expr
+ .iter()
+ .enumerate()
+ .map(|(i, expr)| {
+ let columns_i = sort_arrays
+ .iter()
+ .map(|cs| cs[i].as_ref())
+ .collect::<Vec<&dyn Array>>();
+ Ok(SortColumn {
+ values: concat(columns_i.as_slice())?,
+ options: Some(expr.options),
+ })
+ })
+ .collect::<Result<Vec<_>>>()?;
+ let indices = lexsort_to_indices(&sort_columns, None)?;
- pre_sort
- .map(|batch| sort_batch(batch, schema.clone(), expressions))
- .transpose()?
- };
+ Ok(SortedIterator::new(indices, row_indices, batch_size))
+}
- Ok(Box::pin(SizedRecordBatchStream::new(
- schema,
- vec![Arc::new(result.unwrap())],
- tracking_metrics,
- )))
+struct SortedIterator {
+ pos: usize,
+ indices: UInt32Array,
+ composite: Vec<CompositeIndex>,
+ batch_size: usize,
+ length: usize,
+}
+
+impl SortedIterator {
+ fn new(
+ indices: UInt32Array,
+ composite: Vec<CompositeIndex>,
+ batch_size: usize,
+ ) -> Self {
+ let length = composite.len();
+ Self {
+ pos: 0,
+ indices,
+ composite,
+ batch_size,
+ length,
+ }
+ }
+
+ fn memory_size(&self) -> usize {
+ std::mem::size_of_val(self)
+ + self.indices.get_array_memory_size()
+ + std::mem::size_of_val(&self.composite[..])
+ }
+}
+
+impl Iterator for SortedIterator {
+ type Item = Vec<CompositeIndex>;
+
+ /// Emit a max of `batch_size` positions each time
+ fn next(&mut self) -> Option<Self::Item> {
+ if self.pos >= self.length {
+ return None;
+ }
+
+ let current_size = min(self.batch_size, self.length - self.pos);
+ let mut result = Vec::with_capacity(current_size);
+ for i in 0..current_size {
+ let p = self.pos + i;
+ let c_index = self.indices.value(p) as usize;
+ result.push(self.composite[c_index])
+ }
+ self.pos += current_size;
+ Some(result)
+ }
+}
+
+/// Stream of sorted record batches
+struct SortedSizedRecordBatchStream {
+ schema: SchemaRef,
+ batches: Vec<RecordBatch>,
+ sorted_iter: SortedIterator,
+ num_cols: usize,
+ metrics: MemTrackingMetrics,
+}
+
+impl SortedSizedRecordBatchStream {
+ /// new
+ pub fn new(
+ schema: SchemaRef,
+ batches: Vec<RecordBatch>,
+ sorted_iter: SortedIterator,
+ metrics: MemTrackingMetrics,
+ ) -> Self {
+ let size = batches.iter().map(batch_byte_size).sum::<usize>()
+ + sorted_iter.memory_size();
+ metrics.init_mem_used(size);
+ let num_cols = batches[0].num_columns();
+ SortedSizedRecordBatchStream {
+ schema,
+ batches,
+ sorted_iter,
+ num_cols,
+ metrics,
+ }
+ }
+}
+
+impl Stream for SortedSizedRecordBatchStream {
+ type Item = ArrowResult<RecordBatch>;
+
+ fn poll_next(
+ mut self: std::pin::Pin<&mut Self>,
+ _: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ match self.sorted_iter.next() {
+ None => Poll::Ready(None),
+ Some(combined) => {
+ let mut output = Vec::with_capacity(self.num_cols);
+ for i in 0..self.num_cols {
+ let arrays = self
+ .batches
+ .iter()
+ .map(|b| b.column(i).data())
+ .collect::<Vec<_>>();
+ let mut mutable =
+ MutableArrayData::new(arrays, false, combined.len());
+ for x in combined.iter() {
+ // we cannot extend with slice here, since the lexsort is unstable
+ mutable.extend(
Review Comment:
I think `MutableArrayData` is optimized more for copying larger chunks of
data than for copying single rows at a time. I guess optimizing this case
would need some work on the Arrow side (or could we use a different
construction here instead?).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]