This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 63d24bf Make `SortPreservingMergeStream` stable on input stream order (#1687)
63d24bf is described below
commit 63d24bf5bd0d34e84ccc10c9bc51b5bb754017b1
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Jan 27 06:50:09 2022 -0500
Make `SortPreservingMergeStream` stable on input stream order (#1687)
---
datafusion/src/physical_plan/sorts/mod.rs | 4 +-
.../physical_plan/sorts/sort_preserving_merge.rs | 100 +++++++++++++++++++++
2 files changed, 103 insertions(+), 1 deletion(-)
diff --git a/datafusion/src/physical_plan/sorts/mod.rs b/datafusion/src/physical_plan/sorts/mod.rs
index b49b583..7855568 100644
--- a/datafusion/src/physical_plan/sorts/mod.rs
+++ b/datafusion/src/physical_plan/sorts/mod.rs
@@ -160,7 +160,9 @@ impl SortKeyCursor {
}
}
- Ok(Ordering::Equal)
+ // Break ties using stream_idx to ensure a predictable
+ // ordering of rows when comparing equal streams.
+ Ok(self.stream_idx.cmp(&other.stream_idx))
}
/// Initialize a collection of comparators for comparing
diff --git a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs
index f950526..2ac468b 100644
--- a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -58,6 +58,27 @@ use crate::physical_plan::{
/// provided each partition of the input plan is sorted with respect to
/// these sort expressions, this operator will yield a single partition
/// that is also sorted with respect to them
+///
+/// ```text
+/// ┌─────────────────────────┐
+/// │ ┌───┬───┬───┬───┐       │
+/// │ │ A │ B │ C │ D │ ...   │──┐
+/// │ └───┴───┴───┴───┘       │  │
+/// └─────────────────────────┘  │  ┌───────────────────┐    ┌───────────────────────────────┐
+///   Stream 1                   │  │                   │    │ ┌───┬───╦═══╦───┬───╦═══╗     │
+///                              ├─▶│SortPreservingMerge│───▶│ │ A │ B ║ B ║ C │ D ║ E ║ ... │
+///                              │  │                   │    │ └───┴─▲─╩═══╩───┴───╩═══╝     │
+/// ┌─────────────────────────┐  │  └───────────────────┘    └─┬─────┴───────────────────────┘
+/// │ ╔═══╦═══╗               │  │
+/// │ ║ B ║ E ║     ...       │──┘                             │
+/// │ ╚═══╩═══╝               │               Note Stable Sort: the merged stream
+/// └─────────────────────────┘               places equal rows from stream 1 first
+///   Stream 2
+///
+///
+///  Input Streams                                                     Output stream
+///    (sorted)                                                          (sorted)
+/// ```
#[derive(Debug)]
pub struct SortPreservingMergeExec {
/// Input plan
@@ -1361,4 +1382,83 @@ mod tests {
Ok(())
}
+
+    #[tokio::test]
+    async fn test_stable_sort() {
+        let runtime = Arc::new(RuntimeEnv::default());
+
+        // Create record batches like:
+        // batch_number | value
+        // -------------+------
+        // 1            | A
+        // 1            | B
+        //
+        // Ensure rows with equal sort keys are emitted in input stream order
+        let partitions: Vec<Vec<RecordBatch>> = (0..10)
+            .map(|batch_number| {
+                let batch_number: Int32Array =
+                    vec![Some(batch_number), Some(batch_number)]
+                        .into_iter()
+                        .collect();
+                let value: StringArray = vec![Some("A"), Some("B")].into_iter().collect();
+
+                let batch = RecordBatch::try_from_iter(vec![
+                    ("batch_number", Arc::new(batch_number) as ArrayRef),
+                    ("value", Arc::new(value) as ArrayRef),
+                ])
+                .unwrap();
+
+                vec![batch]
+            })
+            .collect();
+
+        let schema = partitions[0][0].schema();
+
+        let sort = vec![PhysicalSortExpr {
+            expr: col("value", &schema).unwrap(),
+            options: SortOptions {
+                descending: false,
+                nulls_first: true,
+            },
+        }];
+
+        let exec = MemoryExec::try_new(&partitions, schema, None).unwrap();
+        let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));
+
+        let collected = collect(merge, runtime).await.unwrap();
+        assert_eq!(collected.len(), 1);
+
+        // Expect the data to be sorted by "value" and, within each value,
+        // by "batch_number": that is the order the input streams were fed
+        // in, even though only "value" is in the sort key
+        assert_batches_eq!(
+            &[
+                "+--------------+-------+",
+                "| batch_number | value |",
+                "+--------------+-------+",
+                "| 0            | A     |",
+                "| 1            | A     |",
+                "| 2            | A     |",
+                "| 3            | A     |",
+                "| 4            | A     |",
+                "| 5            | A     |",
+                "| 6            | A     |",
+                "| 7            | A     |",
+                "| 8            | A     |",
+                "| 9            | A     |",
+                "| 0            | B     |",
+                "| 1            | B     |",
+                "| 2            | B     |",
+                "| 3            | B     |",
+                "| 4            | B     |",
+                "| 5            | B     |",
+                "| 6            | B     |",
+                "| 7            | B     |",
+                "| 8            | B     |",
+                "| 9            | B     |",
+                "+--------------+-------+",
+            ],
+            collected.as_slice()
+        );
+    }
}