This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new a63751494 fix: Sort with a lot of repetition values (#2182)
a63751494 is described below

commit a63751494361f0b4ab78b8c085e32aed43c681dd
Author: Yijie Shen <[email protected]>
AuthorDate: Sat Apr 9 16:36:14 2022 +0800

    fix: Sort with a lot of repetition values (#2182)
    
    * fix: Sort with a lot of repetition values
    
    * clear insteal of drain
    
    * Use unstable
---
 datafusion/core/src/physical_plan/sorts/sort.rs    |  69 ++++++++++++++-------
 .../core/tests/parquet/repeat_much.snappy.parquet  | Bin 0 -> 1261 bytes
 datafusion/core/tests/sql/order.rs                 |  23 +++++++
 3 files changed, 69 insertions(+), 23 deletions(-)

diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs 
b/datafusion/core/src/physical_plan/sorts/sort.rs
index 163a608b3..1d13efd14 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -426,8 +426,7 @@ impl Iterator for SortedIterator {
         // Combine adjacent indexes from the same batch to make a slice,
         // for more efficient `extend` later.
         let mut last_batch_idx = 0;
-        let mut start_row_idx = 0;
-        let mut len = 0;
+        let mut indices_in_batch = vec![];
 
         let mut slices = vec![];
         for i in 0..current_size {
@@ -435,43 +434,67 @@ impl Iterator for SortedIterator {
             let c_index = self.indices.value(p) as usize;
             let ci = self.composite[c_index];
 
-            if len == 0 {
+            if indices_in_batch.is_empty() {
                 last_batch_idx = ci.batch_idx;
-                start_row_idx = ci.row_idx;
-                len = 1;
+                indices_in_batch.push(ci.row_idx);
             } else if ci.batch_idx == last_batch_idx {
-                len += 1;
-                // since we have pre-sort each of the incoming batches,
-                // so if we witnessed a wrong order of indexes from the same 
batch,
-                // it must be of the same key with the row pointed by 
start_row_index.
-                start_row_idx = min(start_row_idx, ci.row_idx);
+                indices_in_batch.push(ci.row_idx);
             } else {
-                slices.push(CompositeSlice {
-                    batch_idx: last_batch_idx,
-                    start_row_idx,
-                    len,
-                });
+                group_indices(last_batch_idx, &mut indices_in_batch, &mut 
slices);
                 last_batch_idx = ci.batch_idx;
-                start_row_idx = ci.row_idx;
-                len = 1;
+                indices_in_batch.push(ci.row_idx);
             }
         }
 
         assert!(
-            len > 0,
+            !indices_in_batch.is_empty(),
             "There should have at least one record in a sort output slice."
         );
-        slices.push(CompositeSlice {
-            batch_idx: last_batch_idx,
-            start_row_idx,
-            len,
-        });
+        group_indices(last_batch_idx, &mut indices_in_batch, &mut slices);
 
         self.pos += current_size;
         Some(slices)
     }
 }
 
+/// Group continuous indices into a slice for better `extend` performance
+fn group_indices(
+    batch_idx: u32,
+    positions: &mut Vec<u32>,
+    output: &mut Vec<CompositeSlice>,
+) {
+    positions.sort_unstable();
+    let mut last_pos = 0;
+    let mut run_length = 0;
+    for pos in positions.iter() {
+        if run_length == 0 {
+            last_pos = *pos;
+            run_length = 1;
+        } else if *pos == last_pos + 1 {
+            run_length += 1;
+            last_pos = *pos;
+        } else {
+            output.push(CompositeSlice {
+                batch_idx,
+                start_row_idx: last_pos + 1 - run_length,
+                len: run_length as usize,
+            });
+            last_pos = *pos;
+            run_length = 1;
+        }
+    }
+    assert!(
+        run_length > 0,
+        "There should have at least one record in a sort output slice."
+    );
+    output.push(CompositeSlice {
+        batch_idx,
+        start_row_idx: last_pos + 1 - run_length,
+        len: run_length as usize,
+    });
+    positions.clear()
+}
+
 /// Stream of sorted record batches
 struct SortedSizedRecordBatchStream {
     schema: SchemaRef,
diff --git a/datafusion/core/tests/parquet/repeat_much.snappy.parquet 
b/datafusion/core/tests/parquet/repeat_much.snappy.parquet
new file mode 100644
index 000000000..f1066acbd
Binary files /dev/null and 
b/datafusion/core/tests/parquet/repeat_much.snappy.parquet differ
diff --git a/datafusion/core/tests/sql/order.rs 
b/datafusion/core/tests/sql/order.rs
index 2683f20e7..d500fbb89 100644
--- a/datafusion/core/tests/sql/order.rs
+++ b/datafusion/core/tests/sql/order.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use super::*;
+use fuzz_utils::{batches_to_vec, partitions_to_sorted_vec};
 
 #[tokio::test]
 async fn test_sort_unprojected_col() -> Result<()> {
@@ -198,3 +199,25 @@ async fn sort_empty() -> Result<()> {
     assert_eq!(results.len(), 0);
     Ok(())
 }
+
+#[tokio::test]
+async fn sort_with_lots_of_repetition_values() -> Result<()> {
+    let ctx = SessionContext::new();
+    let filename = "tests/parquet/repeat_much.snappy.parquet";
+
+    ctx.register_parquet("rep", filename, ParquetReadOptions::default())
+        .await?;
+    let sql = "select a from rep order by a";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let actual = batches_to_vec(&actual);
+
+    let sql1 = "select a from rep";
+    let expected = execute_to_batches(&ctx, sql1).await;
+    let expected = partitions_to_sorted_vec(&[expected]);
+
+    assert_eq!(actual.len(), expected.len());
+    for i in 0..actual.len() {
+        assert_eq!(actual[i], expected[i]);
+    }
+    Ok(())
+}

Reply via email to