This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 63d24bf  Make `SortPreservingMergeStream` stable on input stream order (#1687)
63d24bf is described below

commit 63d24bf5bd0d34e84ccc10c9bc51b5bb754017b1
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Jan 27 06:50:09 2022 -0500

    Make `SortPreservingMergeStream` stable on input stream order (#1687)
---
 datafusion/src/physical_plan/sorts/mod.rs          |   4 +-
 .../physical_plan/sorts/sort_preserving_merge.rs   | 100 +++++++++++++++++++++
 2 files changed, 103 insertions(+), 1 deletion(-)

diff --git a/datafusion/src/physical_plan/sorts/mod.rs b/datafusion/src/physical_plan/sorts/mod.rs
index b49b583..7855568 100644
--- a/datafusion/src/physical_plan/sorts/mod.rs
+++ b/datafusion/src/physical_plan/sorts/mod.rs
@@ -160,7 +160,9 @@ impl SortKeyCursor {
             }
         }
 
-        Ok(Ordering::Equal)
+        // Break ties using stream_idx to ensure a predictable
+        // ordering of rows when comparing equal streams.
+        Ok(self.stream_idx.cmp(&other.stream_idx))
     }
 
     /// Initialize a collection of comparators for comparing
diff --git a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs
index f950526..2ac468b 100644
--- a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -58,6 +58,27 @@ use crate::physical_plan::{
 /// provided each partition of the input plan is sorted with respect to
 /// these sort expressions, this operator will yield a single partition
 /// that is also sorted with respect to them
+///
+/// ```text
+/// ┌─────────────────────────┐
+/// │ ┌───┬───┬───┬───┐       │
+/// │ │ A │ B │ C │ D │ ...   │──┐
+/// │ └───┴───┴───┴───┘       │  │
+/// └─────────────────────────┘  │  ┌───────────────────┐    ┌───────────────────────────────┐
+///   Stream 1                   │  │                   │    │ ┌───┬───╦═══╦───┬───╦═══╗     │
+///                              ├─▶│SortPreservingMerge│───▶│ │ A │ B ║ B ║ C │ D ║ E ║ ... │
+///                              │  │                   │    │ └───┴─▲─╩═══╩───┴───╩═══╝     │
+/// ┌─────────────────────────┐  │  └───────────────────┘    └─┬─────┴───────────────────────┘
+/// │ ╔═══╦═══╗               │  │
+/// │ ║ B ║ E ║     ...       │──┘                             │
+/// │ ╚═══╩═══╝               │              Note Stable Sort: the merged stream
+/// └─────────────────────────┘                places equal rows from stream 1 first
+///   Stream 2
+///
+///
+///  Input Streams                                             Output stream
+///    (sorted)                                                  (sorted)
+/// ```
 #[derive(Debug)]
 pub struct SortPreservingMergeExec {
     /// Input plan
@@ -1361,4 +1382,83 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_stable_sort() {
+        let runtime = Arc::new(RuntimeEnv::default());
+
+        // Create record batches like:
+        // batch_number |value
+        // -------------+------
+        //    1         | A
+        //    1         | B
+        //
+        // Ensure that the output is in the same order the batches were fed
+        let partitions: Vec<Vec<RecordBatch>> = (0..10)
+            .map(|batch_number| {
+                let batch_number: Int32Array =
+                    vec![Some(batch_number), Some(batch_number)]
+                        .into_iter()
+                        .collect();
+                let value: StringArray = vec![Some("A"), Some("B")].into_iter().collect();
+
+                let batch = RecordBatch::try_from_iter(vec![
+                    ("batch_number", Arc::new(batch_number) as ArrayRef),
+                    ("value", Arc::new(value) as ArrayRef),
+                ])
+                .unwrap();
+
+                vec![batch]
+            })
+            .collect();
+
+        let schema = partitions[0][0].schema();
+
+        let sort = vec![PhysicalSortExpr {
+            expr: col("value", &schema).unwrap(),
+            options: SortOptions {
+                descending: false,
+                nulls_first: true,
+            },
+        }];
+
+        let exec = MemoryExec::try_new(&partitions, schema, None).unwrap();
+        let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));
+
+        let collected = collect(merge, runtime).await.unwrap();
+        assert_eq!(collected.len(), 1);
+
+        // Expect the data to be sorted by "value" and, within equal
+        // values, by "batch_number" (because that was the order it was
+        // fed in, even though only "value" is in the sort key)
+        assert_batches_eq!(
+            &[
+                "+--------------+-------+",
+                "| batch_number | value |",
+                "+--------------+-------+",
+                "| 0            | A     |",
+                "| 1            | A     |",
+                "| 2            | A     |",
+                "| 3            | A     |",
+                "| 4            | A     |",
+                "| 5            | A     |",
+                "| 6            | A     |",
+                "| 7            | A     |",
+                "| 8            | A     |",
+                "| 9            | A     |",
+                "| 0            | B     |",
+                "| 1            | B     |",
+                "| 2            | B     |",
+                "| 3            | B     |",
+                "| 4            | B     |",
+                "| 5            | B     |",
+                "| 6            | B     |",
+                "| 7            | B     |",
+                "| 8            | B     |",
+                "| 9            | B     |",
+                "+--------------+-------+",
+            ],
+            collected.as_slice()
+        );
+    }
 }

Reply via email to