This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new a63751494 fix: Sort with a lot of repetition values (#2182)
a63751494 is described below
commit a63751494361f0b4ab78b8c085e32aed43c681dd
Author: Yijie Shen <[email protected]>
AuthorDate: Sat Apr 9 16:36:14 2022 +0800
fix: Sort with a lot of repetition values (#2182)
* fix: Sort with a lot of repetition values
* clear instead of drain
* Use unstable
---
datafusion/core/src/physical_plan/sorts/sort.rs | 69 ++++++++++++++-------
.../core/tests/parquet/repeat_much.snappy.parquet | Bin 0 -> 1261 bytes
datafusion/core/tests/sql/order.rs | 23 +++++++
3 files changed, 69 insertions(+), 23 deletions(-)
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs
b/datafusion/core/src/physical_plan/sorts/sort.rs
index 163a608b3..1d13efd14 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -426,8 +426,7 @@ impl Iterator for SortedIterator {
// Combine adjacent indexes from the same batch to make a slice,
// for more efficient `extend` later.
let mut last_batch_idx = 0;
- let mut start_row_idx = 0;
- let mut len = 0;
+ let mut indices_in_batch = vec![];
let mut slices = vec![];
for i in 0..current_size {
@@ -435,43 +434,67 @@ impl Iterator for SortedIterator {
let c_index = self.indices.value(p) as usize;
let ci = self.composite[c_index];
- if len == 0 {
+ if indices_in_batch.is_empty() {
last_batch_idx = ci.batch_idx;
- start_row_idx = ci.row_idx;
- len = 1;
+ indices_in_batch.push(ci.row_idx);
} else if ci.batch_idx == last_batch_idx {
- len += 1;
- // since we have pre-sort each of the incoming batches,
- // so if we witnessed a wrong order of indexes from the same batch,
- // it must be of the same key with the row pointed by start_row_index.
- start_row_idx = min(start_row_idx, ci.row_idx);
+ indices_in_batch.push(ci.row_idx);
} else {
- slices.push(CompositeSlice {
- batch_idx: last_batch_idx,
- start_row_idx,
- len,
- });
+ group_indices(last_batch_idx, &mut indices_in_batch, &mut slices);
last_batch_idx = ci.batch_idx;
- start_row_idx = ci.row_idx;
- len = 1;
+ indices_in_batch.push(ci.row_idx);
}
}
assert!(
- len > 0,
+ !indices_in_batch.is_empty(),
"There should have at least one record in a sort output slice."
);
- slices.push(CompositeSlice {
- batch_idx: last_batch_idx,
- start_row_idx,
- len,
- });
+ group_indices(last_batch_idx, &mut indices_in_batch, &mut slices);
self.pos += current_size;
Some(slices)
}
}
+/// Group continuous indices into a slice for better `extend` performance
+fn group_indices(
+ batch_idx: u32,
+ positions: &mut Vec<u32>,
+ output: &mut Vec<CompositeSlice>,
+) {
+ positions.sort_unstable();
+ let mut last_pos = 0;
+ let mut run_length = 0;
+ for pos in positions.iter() {
+ if run_length == 0 {
+ last_pos = *pos;
+ run_length = 1;
+ } else if *pos == last_pos + 1 {
+ run_length += 1;
+ last_pos = *pos;
+ } else {
+ output.push(CompositeSlice {
+ batch_idx,
+ start_row_idx: last_pos + 1 - run_length,
+ len: run_length as usize,
+ });
+ last_pos = *pos;
+ run_length = 1;
+ }
+ }
+ assert!(
+ run_length > 0,
+ "There should have at least one record in a sort output slice."
+ );
+ output.push(CompositeSlice {
+ batch_idx,
+ start_row_idx: last_pos + 1 - run_length,
+ len: run_length as usize,
+ });
+ positions.clear()
+}
+
/// Stream of sorted record batches
struct SortedSizedRecordBatchStream {
schema: SchemaRef,
diff --git a/datafusion/core/tests/parquet/repeat_much.snappy.parquet
b/datafusion/core/tests/parquet/repeat_much.snappy.parquet
new file mode 100644
index 000000000..f1066acbd
Binary files /dev/null and b/datafusion/core/tests/parquet/repeat_much.snappy.parquet differ
diff --git a/datafusion/core/tests/sql/order.rs
b/datafusion/core/tests/sql/order.rs
index 2683f20e7..d500fbb89 100644
--- a/datafusion/core/tests/sql/order.rs
+++ b/datafusion/core/tests/sql/order.rs
@@ -16,6 +16,7 @@
// under the License.
use super::*;
+use fuzz_utils::{batches_to_vec, partitions_to_sorted_vec};
#[tokio::test]
async fn test_sort_unprojected_col() -> Result<()> {
@@ -198,3 +199,25 @@ async fn sort_empty() -> Result<()> {
assert_eq!(results.len(), 0);
Ok(())
}
+
+#[tokio::test]
+async fn sort_with_lots_of_repetition_values() -> Result<()> {
+ let ctx = SessionContext::new();
+ let filename = "tests/parquet/repeat_much.snappy.parquet";
+
+ ctx.register_parquet("rep", filename, ParquetReadOptions::default())
+ .await?;
+ let sql = "select a from rep order by a";
+ let actual = execute_to_batches(&ctx, sql).await;
+ let actual = batches_to_vec(&actual);
+
+ let sql1 = "select a from rep";
+ let expected = execute_to_batches(&ctx, sql1).await;
+ let expected = partitions_to_sorted_vec(&[expected]);
+
+ assert_eq!(actual.len(), expected.len());
+ for i in 0..actual.len() {
+ assert_eq!(actual[i], expected[i]);
+ }
+ Ok(())
+}