This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new bd3ee23  perf: improve performance of `SortPreservingMergeExec` 
operator (#722)
bd3ee23 is described below

commit bd3ee23520a3e6f135891ec32d96fcea7ee2bb55
Author: Edd Robinson <[email protected]>
AuthorDate: Mon Jul 19 13:13:22 2021 +0100

    perf: improve performance of `SortPreservingMergeExec` operator (#722)
    
    * perf: re-use Array comparators
    
    This commit stores built Arrow comparators for two arrays on each of the 
sort key cursors, resulting in a significant reduction in the cost associated 
with merging record batches using the `SortPreservingMerge` operator.
    
    Benchmarks improved as follows:
    
    ```
    ⇒  critcmp master pr
    group                               master                                 
pr
    -----                               ------                                 
--
    interleave_batches                  1.83   623.8±12.41µs        ? ?/sec    
1.00    341.2±6.98µs        ? ?/sec
    merge_batches_no_overlap_large      1.56    400.6±4.94µs        ? ?/sec    
1.00    256.3±6.57µs        ? ?/sec
    merge_batches_no_overlap_small      1.63   425.1±24.88µs        ? ?/sec    
1.00    261.1±7.46µs        ? ?/sec
    merge_batches_small_into_large      1.18    228.0±3.95µs        ? ?/sec    
1.00    193.6±2.86µs        ? ?/sec
    merge_batches_some_overlap_large    1.68   505.4±10.27µs        ? ?/sec    
1.00    301.3±6.63µs        ? ?/sec
    merge_batches_some_overlap_small    1.64    515.7±5.21µs        ? ?/sec    
1.00   314.6±12.66µs        ? ?/sec
    ```
    
    * test: test more than two partitions
---
 .../src/physical_plan/sort_preserving_merge.rs     | 182 ++++++++++++++++-----
 1 file changed, 145 insertions(+), 37 deletions(-)

diff --git a/datafusion/src/physical_plan/sort_preserving_merge.rs 
b/datafusion/src/physical_plan/sort_preserving_merge.rs
index 0949c3c..b4bcc29 100644
--- a/datafusion/src/physical_plan/sort_preserving_merge.rs
+++ b/datafusion/src/physical_plan/sort_preserving_merge.rs
@@ -24,6 +24,7 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
+use arrow::array::DynComparator;
 use arrow::{
     array::{make_array as make_arrow_array, ArrayRef, MutableArrayData},
     compute::SortOptions,
@@ -35,6 +36,7 @@ use async_trait::async_trait;
 use futures::channel::mpsc;
 use futures::stream::FusedStream;
 use futures::{Stream, StreamExt};
+use hashbrown::HashMap;
 
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::{
@@ -176,34 +178,60 @@ impl ExecutionPlan for SortPreservingMergeExec {
     }
 }
 
-/// A `SortKeyCursor` is created from a `RecordBatch`, and a set of 
`PhysicalExpr` that when
-/// evaluated on the `RecordBatch` yield the sort keys.
+/// A `SortKeyCursor` is created from a `RecordBatch`, and a set of
+/// `PhysicalExpr` that when evaluated on the `RecordBatch` yield the sort 
keys.
 ///
 /// Additionally it maintains a row cursor that can be advanced through the 
rows
 /// of the provided `RecordBatch`
 ///
-/// `SortKeyCursor::compare` can then be used to compare the sort key pointed 
to by this
-/// row cursor, with that of another `SortKeyCursor`
-#[derive(Debug, Clone)]
+/// `SortKeyCursor::compare` can then be used to compare the sort key pointed 
to
+/// by this row cursor, with that of another `SortKeyCursor`. A cursor stores
+/// a row comparator for each other cursor that it is compared to.
 struct SortKeyCursor {
     columns: Vec<ArrayRef>,
-    batch: RecordBatch,
     cur_row: usize,
     num_rows: usize,
+
+    // An index uniquely identifying the record batch scanned by this cursor.
+    batch_idx: usize,
+    batch: RecordBatch,
+
+    // A collection of comparators that compare rows in this cursor's batch to
+    // the cursors in other batches. Other batches are uniquely identified by
+    // their batch_idx.
+    batch_comparators: HashMap<usize, Vec<DynComparator>>,
+}
+
+impl<'a> std::fmt::Debug for SortKeyCursor {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("SortKeyCursor")
+            .field("columns", &self.columns)
+            .field("cur_row", &self.cur_row)
+            .field("num_rows", &self.num_rows)
+            .field("batch_idx", &self.batch_idx)
+            .field("batch", &self.batch)
+            .field("batch_comparators", &"<FUNC>")
+            .finish()
+    }
 }
 
 impl SortKeyCursor {
-    fn new(batch: RecordBatch, sort_key: &[Arc<dyn PhysicalExpr>]) -> 
Result<Self> {
+    fn new(
+        batch_idx: usize,
+        batch: RecordBatch,
+        sort_key: &[Arc<dyn PhysicalExpr>],
+    ) -> Result<Self> {
         let columns = sort_key
             .iter()
             .map(|expr| 
Ok(expr.evaluate(&batch)?.into_array(batch.num_rows())))
             .collect::<Result<_>>()?;
-
         Ok(Self {
             cur_row: 0,
             num_rows: batch.num_rows(),
             columns,
             batch,
+            batch_idx,
+            batch_comparators: HashMap::new(),
         })
     }
 
@@ -220,7 +248,7 @@ impl SortKeyCursor {
 
     /// Compares the sort key pointed to by this instance's row cursor with 
that of another
     fn compare(
-        &self,
+        &mut self,
         other: &SortKeyCursor,
         options: &[SortOptions],
     ) -> Result<Ordering> {
@@ -246,7 +274,19 @@ impl SortKeyCursor {
             .zip(other.columns.iter())
             .zip(options.iter());
 
-        for ((l, r), sort_options) in zipped {
+        // Recall or initialise a collection of comparators for comparing
+        // columnar arrays of this cursor and "other".
+        let cmp = self
+            .batch_comparators
+            .entry(other.batch_idx)
+            .or_insert_with(|| Vec::with_capacity(other.columns.len()));
+
+        for (i, ((l, r), sort_options)) in zipped.enumerate() {
+            if i >= cmp.len() {
+                // initialise comparators as potentially needed
+                cmp.push(arrow::array::build_compare(l.as_ref(), r.as_ref())?);
+            }
+
             match (l.is_valid(self.cur_row), r.is_valid(other.cur_row)) {
                 (false, true) if sort_options.nulls_first => return 
Ok(Ordering::Less),
                 (false, true) => return Ok(Ordering::Greater),
@@ -255,15 +295,11 @@ impl SortKeyCursor {
                 }
                 (true, false) => return Ok(Ordering::Less),
                 (false, false) => {}
-                (true, true) => {
-                    // TODO: Building the predicate each time is sub-optimal
-                    let c = arrow::array::build_compare(l.as_ref(), 
r.as_ref())?;
-                    match c(self.cur_row, other.cur_row) {
-                        Ordering::Equal => {}
-                        o if sort_options.descending => return Ok(o.reverse()),
-                        o => return Ok(o),
-                    }
-                }
+                (true, true) => match cmp[i](self.cur_row, other.cur_row) {
+                    Ordering::Equal => {}
+                    o if sort_options.descending => return Ok(o.reverse()),
+                    o => return Ok(o),
+                },
             }
         }
 
@@ -304,6 +340,9 @@ struct SortPreservingMergeStream {
     target_batch_size: usize,
     /// If the stream has encountered an error
     aborted: bool,
+
+    /// An index to uniquely identify the input stream batch
+    next_batch_index: usize,
 }
 
 impl SortPreservingMergeStream {
@@ -313,15 +352,21 @@ impl SortPreservingMergeStream {
         expressions: &[PhysicalSortExpr],
         target_batch_size: usize,
     ) -> Self {
+        let cursors = (0..streams.len())
+            .into_iter()
+            .map(|_| VecDeque::new())
+            .collect();
+
         Self {
             schema,
-            cursors: vec![Default::default(); streams.len()],
+            cursors,
             streams,
             column_expressions: expressions.iter().map(|x| 
x.expr.clone()).collect(),
             sort_options: expressions.iter().map(|x| x.options).collect(),
             target_batch_size,
             aborted: false,
             in_progress: vec![],
+            next_batch_index: 0,
         }
     }
 
@@ -352,12 +397,17 @@ impl SortPreservingMergeStream {
                 return Poll::Ready(Err(e));
             }
             Some(Ok(batch)) => {
-                let cursor = match SortKeyCursor::new(batch, 
&self.column_expressions) {
+                let cursor = match SortKeyCursor::new(
+                    self.next_batch_index, // assign this batch an ID
+                    batch,
+                    &self.column_expressions,
+                ) {
                     Ok(cursor) => cursor,
                     Err(e) => {
                         return 
Poll::Ready(Err(ArrowError::ExternalError(Box::new(e))));
                     }
                 };
+                self.next_batch_index += 1;
                 self.cursors[idx].push_back(cursor)
             }
         }
@@ -367,17 +417,17 @@ impl SortPreservingMergeStream {
 
     /// Returns the index of the next stream to pull a row from, or None
     /// if all cursors for all streams are exhausted
-    fn next_stream_idx(&self) -> Result<Option<usize>> {
-        let mut min_cursor: Option<(usize, &SortKeyCursor)> = None;
-        for (idx, candidate) in self.cursors.iter().enumerate() {
-            if let Some(candidate) = candidate.back() {
+    fn next_stream_idx(&mut self) -> Result<Option<usize>> {
+        let mut min_cursor: Option<(usize, &mut SortKeyCursor)> = None;
+        for (idx, candidate) in self.cursors.iter_mut().enumerate() {
+            if let Some(candidate) = candidate.back_mut() {
                 if candidate.is_finished() {
                     continue;
                 }
 
                 match min_cursor {
                     None => min_cursor = Some((idx, candidate)),
-                    Some((_, min)) => {
+                    Some((_, ref mut min)) => {
                         if min.compare(candidate, &self.sort_options)?
                             == Ordering::Greater
                         {
@@ -599,8 +649,7 @@ mod tests {
         let b2 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", 
c)]).unwrap();
 
         _test_merge(
-            b1,
-            b2,
+            &[vec![b1], vec![b2]],
             &[
                 "+----+---+-------------------------------+",
                 "| a  | b | c                             |",
@@ -646,8 +695,7 @@ mod tests {
         let b2 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", 
c)]).unwrap();
 
         _test_merge(
-            b1,
-            b2,
+            &[vec![b1], vec![b2]],
             &[
                 "+-----+---+-------------------------------+",
                 "| a   | b | c                             |",
@@ -693,8 +741,7 @@ mod tests {
         let b2 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", 
c)]).unwrap();
 
         _test_merge(
-            b1,
-            b2,
+            &[vec![b1], vec![b2]],
             &[
                 "+----+---+-------------------------------+",
                 "| a  | b | c                             |",
@@ -715,8 +762,71 @@ mod tests {
         .await;
     }
 
-    async fn _test_merge(b1: RecordBatch, b2: RecordBatch, exp: &[&str]) {
-        let schema = b1.schema();
+    #[tokio::test]
+    async fn test_merge_three_partitions() {
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3]));
+        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
+            Some("a"),
+            Some("b"),
+            Some("c"),
+            Some("d"),
+            Some("f"),
+        ]));
+        let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![8, 7, 
6, 5, 8]));
+        let b1 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", 
c)]).unwrap();
+
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 70, 90, 30]));
+        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
+            Some("e"),
+            Some("g"),
+            Some("h"),
+            Some("i"),
+            Some("j"),
+        ]));
+        let c: ArrayRef =
+            Arc::new(TimestampNanosecondArray::from(vec![40, 60, 20, 20, 60]));
+        let b2 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", 
c)]).unwrap();
+
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![100, 200, 700, 900, 
300]));
+        let b: ArrayRef = Arc::new(StringArray::from_iter(vec![
+            Some("f"),
+            Some("g"),
+            Some("h"),
+            Some("i"),
+            Some("j"),
+        ]));
+        let c: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![4, 6, 
2, 2, 6]));
+        let b3 = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", 
c)]).unwrap();
+
+        _test_merge(
+            &[vec![b1], vec![b2], vec![b3]],
+            &[
+                "+-----+---+-------------------------------+",
+                "| a   | b | c                             |",
+                "+-----+---+-------------------------------+",
+                "| 1   | a | 1970-01-01 00:00:00.000000008 |",
+                "| 2   | b | 1970-01-01 00:00:00.000000007 |",
+                "| 7   | c | 1970-01-01 00:00:00.000000006 |",
+                "| 9   | d | 1970-01-01 00:00:00.000000005 |",
+                "| 10  | e | 1970-01-01 00:00:00.000000040 |",
+                "| 100 | f | 1970-01-01 00:00:00.000000004 |",
+                "| 3   | f | 1970-01-01 00:00:00.000000008 |",
+                "| 200 | g | 1970-01-01 00:00:00.000000006 |",
+                "| 20  | g | 1970-01-01 00:00:00.000000060 |",
+                "| 700 | h | 1970-01-01 00:00:00.000000002 |",
+                "| 70  | h | 1970-01-01 00:00:00.000000020 |",
+                "| 900 | i | 1970-01-01 00:00:00.000000002 |",
+                "| 90  | i | 1970-01-01 00:00:00.000000020 |",
+                "| 300 | j | 1970-01-01 00:00:00.000000006 |",
+                "| 30  | j | 1970-01-01 00:00:00.000000060 |",
+                "+-----+---+-------------------------------+",
+            ],
+        )
+        .await;
+    }
+
+    async fn _test_merge(partitions: &[Vec<RecordBatch>], exp: &[&str]) {
+        let schema = partitions[0][0].schema();
         let sort = vec![
             PhysicalSortExpr {
                 expr: col("b", &schema).unwrap(),
@@ -727,12 +837,10 @@ mod tests {
                 options: Default::default(),
             },
         ];
-        let exec = MemoryExec::try_new(&[vec![b1], vec![b2]], schema, 
None).unwrap();
+        let exec = MemoryExec::try_new(partitions, schema, None).unwrap();
         let merge = Arc::new(SortPreservingMergeExec::new(sort, 
Arc::new(exec), 1024));
 
         let collected = collect(merge).await.unwrap();
-        assert_eq!(collected.len(), 1);
-
         assert_batches_eq!(exp, collected.as_slice());
     }
 

Reply via email to