This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 2a2e147984 Use interleave in BatchBuilder (~10% faster merge) (#5894)
2a2e147984 is described below

commit 2a2e147984088d0c224c440279a4f3122b8ad38e
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Fri Apr 7 14:57:07 2023 +0100

    Use interleave in BatchBuilder (~10% faster merge) (#5894)
    
    * Use interleave in BatchBuilder
    
    * Fix clippy
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/core/src/physical_plan/sorts/builder.rs | 135 ++++++++-------------
 datafusion/core/src/physical_plan/sorts/merge.rs   |   4 +-
 2 files changed, 52 insertions(+), 87 deletions(-)

diff --git a/datafusion/core/src/physical_plan/sorts/builder.rs b/datafusion/core/src/physical_plan/sorts/builder.rs
index a1941963b6..52e896334e 100644
--- a/datafusion/core/src/physical_plan/sorts/builder.rs
+++ b/datafusion/core/src/physical_plan/sorts/builder.rs
@@ -16,52 +16,62 @@
 // under the License.
 
 use crate::common::Result;
-use crate::physical_plan::sorts::index::RowIndex;
-use arrow::array::{make_array, MutableArrayData};
+use arrow::compute::interleave;
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
-use std::collections::VecDeque;
+
+#[derive(Debug, Copy, Clone, Default)]
+struct BatchCursor {
+    /// The index into BatchBuilder::batches
+    batch_idx: usize,
+    /// The row index within the given batch
+    row_idx: usize,
+}
 
 /// Provides an API to incrementally build a [`RecordBatch`] from partitioned [`RecordBatch`]
 #[derive(Debug)]
 pub struct BatchBuilder {
     /// The schema of the RecordBatches yielded by this stream
     schema: SchemaRef,
-    /// For each input stream maintain a dequeue of RecordBatches
-    ///
-    /// Exhausted batches will be popped off the front once all
-    /// their rows have been yielded to the output
-    batches: Vec<VecDeque<RecordBatch>>,
 
-    /// The accumulated row indexes for the next record batch
-    indices: Vec<RowIndex>,
+    /// Maintain a list of [`RecordBatch`] and their corresponding stream
+    batches: Vec<(usize, RecordBatch)>,
+
+    /// The current [`BatchCursor`] for each stream
+    cursors: Vec<BatchCursor>,
+
+    /// The accumulated stream indexes from which to pull rows
+    /// Consists of a tuple of `(batch_idx, row_idx)`
+    indices: Vec<(usize, usize)>,
 }
 
 impl BatchBuilder {
     /// Create a new [`BatchBuilder`] with the provided `stream_count` and `batch_size`
     pub fn new(schema: SchemaRef, stream_count: usize, batch_size: usize) -> Self {
-        let batches = (0..stream_count).map(|_| VecDeque::new()).collect();
-
         Self {
             schema,
-            batches,
+            batches: Vec::with_capacity(stream_count * 2),
+            cursors: vec![BatchCursor::default(); stream_count],
             indices: Vec::with_capacity(batch_size),
         }
     }
 
     /// Append a new batch in `stream_idx`
     pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) {
-        self.batches[stream_idx].push_back(batch)
+        let batch_idx = self.batches.len();
+        self.batches.push((stream_idx, batch));
+        self.cursors[stream_idx] = BatchCursor {
+            batch_idx,
+            row_idx: 0,
+        }
     }
 
-    /// Push `row_idx` from the most recently appended batch in `stream_idx`
-    pub fn push_row(&mut self, stream_idx: usize, row_idx: usize) {
-        let batch_idx = self.batches[stream_idx].len() - 1;
-        self.indices.push(RowIndex {
-            stream_idx,
-            batch_idx,
-            row_idx,
-        });
+    /// Append the next row from `stream_idx`
+    pub fn push_row(&mut self, stream_idx: usize) {
+        let cursor = &mut self.cursors[stream_idx];
+        let row_idx = cursor.row_idx;
+        cursor.row_idx += 1;
+        self.indices.push((cursor.batch_idx, row_idx));
     }
 
     /// Returns the number of in-progress rows in this [`BatchBuilder`]
@@ -89,65 +99,16 @@ impl BatchBuilder {
             return Ok(None);
         }
 
-        // Mapping from stream index to the index of the first buffer from that stream
-        let mut buffer_idx = 0;
-        let mut stream_to_buffer_idx = Vec::with_capacity(self.batches.len());
-
-        for batches in &self.batches {
-            stream_to_buffer_idx.push(buffer_idx);
-            buffer_idx += batches.len();
-        }
-
-        let columns = self
-            .schema
-            .fields()
-            .iter()
-            .enumerate()
-            .map(|(column_idx, field)| {
-                let arrays = self
+        let columns = (0..self.schema.fields.len())
+            .map(|column_idx| {
+                let arrays: Vec<_> = self
                     .batches
                     .iter()
-                    .flat_map(|batch| {
-                        batch.iter().map(|batch| batch.column(column_idx).data())
-                    })
+                    .map(|(_, batch)| batch.column(column_idx).as_ref())
                     .collect();
-
-                let mut array_data = MutableArrayData::new(
-                    arrays,
-                    field.is_nullable(),
-                    self.indices.len(),
-                );
-
-                let first = &self.indices[0];
-                let mut buffer_idx =
-                    stream_to_buffer_idx[first.stream_idx] + first.batch_idx;
-                let mut start_row_idx = first.row_idx;
-                let mut end_row_idx = start_row_idx + 1;
-
-                for row_index in self.indices.iter().skip(1) {
-                    let next_buffer_idx =
-                        stream_to_buffer_idx[row_index.stream_idx] + row_index.batch_idx;
-
-                    if next_buffer_idx == buffer_idx && row_index.row_idx == end_row_idx {
-                        // subsequent row in same batch
-                        end_row_idx += 1;
-                        continue;
-                    }
-
-                    // emit current batch of rows for current buffer
-                    array_data.extend(buffer_idx, start_row_idx, end_row_idx);
-
-                    // start new batch of rows
-                    buffer_idx = next_buffer_idx;
-                    start_row_idx = row_index.row_idx;
-                    end_row_idx = start_row_idx + 1;
-                }
-
-                // emit final batch of rows
-                array_data.extend(buffer_idx, start_row_idx, end_row_idx);
-                make_array(array_data.freeze())
+                Ok(interleave(&arrays, &self.indices)?)
             })
-            .collect();
+            .collect::<Result<Vec<_>>>()?;
 
         self.indices.clear();
 
@@ -155,16 +116,20 @@ impl BatchBuilder {
         // is finished. This means all remaining rows from all but the last batch
         // for each stream have been yielded to the newly created record batch
         //
-        // Additionally as `in_progress` has been drained, there are no longer
-        // any RowIndex's reliant on the batch indexes
-        //
         // We can therefore drop all but the last batch for each stream
-        for batches in &mut self.batches {
-            if batches.len() > 1 {
-                // Drain all but the last batch
-                batches.drain(0..(batches.len() - 1));
+        let mut batch_idx = 0;
+        let mut retained = 0;
+        self.batches.retain(|(stream_idx, _)| {
+            let stream_cursor = &mut self.cursors[*stream_idx];
+            let retain = stream_cursor.batch_idx == batch_idx;
+            batch_idx += 1;
+
+            if retain {
+                stream_cursor.batch_idx = retained;
+                retained += 1;
             }
-        }
+            retain
+        });
 
         Ok(Some(RecordBatch::try_new(self.schema.clone(), columns)?))
     }
diff --git a/datafusion/core/src/physical_plan/sorts/merge.rs b/datafusion/core/src/physical_plan/sorts/merge.rs
index d6b43db82b..296028fac7 100644
--- a/datafusion/core/src/physical_plan/sorts/merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/merge.rs
@@ -177,9 +177,9 @@ impl<C: Cursor> SortPreservingMergeStream<C> {
 
             let stream_idx = self.loser_tree[0];
             let cursor = self.cursors[stream_idx].as_mut();
-            if let Some(row_idx) = cursor.and_then(Cursor::advance) {
+            if cursor.and_then(Cursor::advance).is_some() {
                 self.loser_tree_adjusted = false;
-                self.in_progress.push_row(stream_idx, row_idx);
+                self.in_progress.push_row(stream_idx);
                 if self.in_progress.len() < self.batch_size {
                     continue;
                 }

Reply via email to