This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 2a2e147984 Use interleave in BatchBuilder (~10% faster merge) (#5894)
2a2e147984 is described below
commit 2a2e147984088d0c224c440279a4f3122b8ad38e
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Fri Apr 7 14:57:07 2023 +0100
Use interleave in BatchBuilder (~10% faster merge) (#5894)
* Use interleave in BatchBuilder
* Fix clippy
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/core/src/physical_plan/sorts/builder.rs | 135 ++++++++-------------
datafusion/core/src/physical_plan/sorts/merge.rs | 4 +-
2 files changed, 52 insertions(+), 87 deletions(-)
diff --git a/datafusion/core/src/physical_plan/sorts/builder.rs b/datafusion/core/src/physical_plan/sorts/builder.rs
index a1941963b6..52e896334e 100644
--- a/datafusion/core/src/physical_plan/sorts/builder.rs
+++ b/datafusion/core/src/physical_plan/sorts/builder.rs
@@ -16,52 +16,62 @@
// under the License.
use crate::common::Result;
-use crate::physical_plan::sorts::index::RowIndex;
-use arrow::array::{make_array, MutableArrayData};
+use arrow::compute::interleave;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
-use std::collections::VecDeque;
+
+#[derive(Debug, Copy, Clone, Default)]
+struct BatchCursor {
+ /// The index into BatchBuilder::batches
+ batch_idx: usize,
+ /// The row index within the given batch
+ row_idx: usize,
+}
/// Provides an API to incrementally build a [`RecordBatch`] from partitioned [`RecordBatch`]
#[derive(Debug)]
pub struct BatchBuilder {
/// The schema of the RecordBatches yielded by this stream
schema: SchemaRef,
- /// For each input stream maintain a dequeue of RecordBatches
- ///
- /// Exhausted batches will be popped off the front once all
- /// their rows have been yielded to the output
- batches: Vec<VecDeque<RecordBatch>>,
- /// The accumulated row indexes for the next record batch
- indices: Vec<RowIndex>,
+ /// Maintain a list of [`RecordBatch`] and their corresponding stream
+ batches: Vec<(usize, RecordBatch)>,
+
+ /// The current [`BatchCursor`] for each stream
+ cursors: Vec<BatchCursor>,
+
+ /// The accumulated stream indexes from which to pull rows
+ /// Consists of a tuple of `(batch_idx, row_idx)`
+ indices: Vec<(usize, usize)>,
}
impl BatchBuilder {
/// Create a new [`BatchBuilder`] with the provided `stream_count` and `batch_size`
pub fn new(schema: SchemaRef, stream_count: usize, batch_size: usize) ->
Self {
- let batches = (0..stream_count).map(|_| VecDeque::new()).collect();
-
Self {
schema,
- batches,
+ batches: Vec::with_capacity(stream_count * 2),
+ cursors: vec![BatchCursor::default(); stream_count],
indices: Vec::with_capacity(batch_size),
}
}
/// Append a new batch in `stream_idx`
pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) {
- self.batches[stream_idx].push_back(batch)
+ let batch_idx = self.batches.len();
+ self.batches.push((stream_idx, batch));
+ self.cursors[stream_idx] = BatchCursor {
+ batch_idx,
+ row_idx: 0,
+ }
}
- /// Push `row_idx` from the most recently appended batch in `stream_idx`
- pub fn push_row(&mut self, stream_idx: usize, row_idx: usize) {
- let batch_idx = self.batches[stream_idx].len() - 1;
- self.indices.push(RowIndex {
- stream_idx,
- batch_idx,
- row_idx,
- });
+ /// Append the next row from `stream_idx`
+ pub fn push_row(&mut self, stream_idx: usize) {
+ let cursor = &mut self.cursors[stream_idx];
+ let row_idx = cursor.row_idx;
+ cursor.row_idx += 1;
+ self.indices.push((cursor.batch_idx, row_idx));
}
/// Returns the number of in-progress rows in this [`BatchBuilder`]
@@ -89,65 +99,16 @@ impl BatchBuilder {
return Ok(None);
}
- // Mapping from stream index to the index of the first buffer from that stream
- let mut buffer_idx = 0;
- let mut stream_to_buffer_idx = Vec::with_capacity(self.batches.len());
-
- for batches in &self.batches {
- stream_to_buffer_idx.push(buffer_idx);
- buffer_idx += batches.len();
- }
-
- let columns = self
- .schema
- .fields()
- .iter()
- .enumerate()
- .map(|(column_idx, field)| {
- let arrays = self
+ let columns = (0..self.schema.fields.len())
+ .map(|column_idx| {
+ let arrays: Vec<_> = self
.batches
.iter()
- .flat_map(|batch| {
- batch.iter().map(|batch|
batch.column(column_idx).data())
- })
+ .map(|(_, batch)| batch.column(column_idx).as_ref())
.collect();
-
- let mut array_data = MutableArrayData::new(
- arrays,
- field.is_nullable(),
- self.indices.len(),
- );
-
- let first = &self.indices[0];
- let mut buffer_idx =
- stream_to_buffer_idx[first.stream_idx] + first.batch_idx;
- let mut start_row_idx = first.row_idx;
- let mut end_row_idx = start_row_idx + 1;
-
- for row_index in self.indices.iter().skip(1) {
- let next_buffer_idx =
- stream_to_buffer_idx[row_index.stream_idx] +
row_index.batch_idx;
-
- if next_buffer_idx == buffer_idx && row_index.row_idx ==
end_row_idx {
- // subsequent row in same batch
- end_row_idx += 1;
- continue;
- }
-
- // emit current batch of rows for current buffer
- array_data.extend(buffer_idx, start_row_idx, end_row_idx);
-
- // start new batch of rows
- buffer_idx = next_buffer_idx;
- start_row_idx = row_index.row_idx;
- end_row_idx = start_row_idx + 1;
- }
-
- // emit final batch of rows
- array_data.extend(buffer_idx, start_row_idx, end_row_idx);
- make_array(array_data.freeze())
+ Ok(interleave(&arrays, &self.indices)?)
})
- .collect();
+ .collect::<Result<Vec<_>>>()?;
self.indices.clear();
@@ -155,16 +116,20 @@ impl BatchBuilder {
// is finished. This means all remaining rows from all but the last batch
// for each stream have been yielded to the newly created record batch
//
- // Additionally as `in_progress` has been drained, there are no longer
- // any RowIndex's reliant on the batch indexes
- //
// We can therefore drop all but the last batch for each stream
- for batches in &mut self.batches {
- if batches.len() > 1 {
- // Drain all but the last batch
- batches.drain(0..(batches.len() - 1));
+ let mut batch_idx = 0;
+ let mut retained = 0;
+ self.batches.retain(|(stream_idx, _)| {
+ let stream_cursor = &mut self.cursors[*stream_idx];
+ let retain = stream_cursor.batch_idx == batch_idx;
+ batch_idx += 1;
+
+ if retain {
+ stream_cursor.batch_idx = retained;
+ retained += 1;
}
- }
+ retain
+ });
Ok(Some(RecordBatch::try_new(self.schema.clone(), columns)?))
}
diff --git a/datafusion/core/src/physical_plan/sorts/merge.rs b/datafusion/core/src/physical_plan/sorts/merge.rs
index d6b43db82b..296028fac7 100644
--- a/datafusion/core/src/physical_plan/sorts/merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/merge.rs
@@ -177,9 +177,9 @@ impl<C: Cursor> SortPreservingMergeStream<C> {
let stream_idx = self.loser_tree[0];
let cursor = self.cursors[stream_idx].as_mut();
- if let Some(row_idx) = cursor.and_then(Cursor::advance) {
+ if cursor.and_then(Cursor::advance).is_some() {
self.loser_tree_adjusted = false;
- self.in_progress.push_row(stream_idx, row_idx);
+ self.in_progress.push_row(stream_idx);
if self.in_progress.len() < self.batch_size {
continue;
}