This is an automated email from the ASF dual-hosted git repository.

akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new f9a9f125d9 Adding order equivalence support on MemoryExec (#7259)
f9a9f125d9 is described below

commit f9a9f125d9926d9748dbb98d0e9c61c02e7475ad
Author: Metehan Yıldırım <[email protected]>
AuthorDate: Fri Aug 11 15:23:27 2023 +0300

    Adding order equivalence support on MemoryExec (#7259)
    
    * Memory equivalence support
    
    * MemoryExec refactor
    
    * Adding test
    
    * Update datafusion/core/src/physical_plan/memory.rs
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * Updating the comment
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 .../src/physical_plan/joins/symmetric_hash_join.rs | 50 ++++++------
 .../core/src/physical_plan/joins/test_utils.rs     | 22 +++---
 datafusion/core/src/physical_plan/memory.rs        | 91 +++++++++++++++++++---
 datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs |  2 +-
 .../fuzz_cases/sort_preserving_repartition_fuzz.rs |  2 +-
 datafusion/core/tests/fuzz_cases/window_fuzz.rs    |  4 +-
 datafusion/core/tests/memory_limit.rs              | 13 ++--
 7 files changed, 123 insertions(+), 61 deletions(-)

diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
index efe7ce503b..5e0ae189ff 100644
--- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
@@ -1520,8 +1520,8 @@ mod tests {
         let (left, right) = create_memory_table(
             left_batch,
             right_batch,
-            Some(left_sorted),
-            Some(right_sorted),
+            vec![left_sorted],
+            vec![right_sorted],
             13,
         )?;
 
@@ -1595,8 +1595,8 @@ mod tests {
         let (left, right) = create_memory_table(
             left_batch,
             right_batch,
-            Some(left_sorted),
-            Some(right_sorted),
+            vec![left_sorted],
+            vec![right_sorted],
             13,
         )?;
 
@@ -1658,7 +1658,8 @@ mod tests {
             build_sides_record_batches(TABLE_SIZE, cardinality)?;
         let left_schema = &left_batch.schema();
         let right_schema = &right_batch.schema();
-        let (left, right) = create_memory_table(left_batch, right_batch, None, None, 13)?;
+        let (left, right) =
+            create_memory_table(left_batch, right_batch, vec![], vec![], 13)?;
 
         let on = vec![(
             Column::new_with_schema("lc1", left_schema)?,
@@ -1709,7 +1710,8 @@ mod tests {
         let (left_batch, right_batch) = build_sides_record_batches(TABLE_SIZE, (11, 21))?;
         let left_schema = &left_batch.schema();
         let right_schema = &right_batch.schema();
-        let (left, right) = create_memory_table(left_batch, right_batch, None, None, 13)?;
+        let (left, right) =
+            create_memory_table(left_batch, right_batch, vec![], vec![], 13)?;
 
         let on = vec![(
             Column::new_with_schema("lc1", left_schema)?,
@@ -1764,8 +1766,8 @@ mod tests {
         let (left, right) = create_memory_table(
             left_batch,
             right_batch,
-            Some(left_sorted),
-            Some(right_sorted),
+            vec![left_sorted],
+            vec![right_sorted],
             13,
         )?;
 
@@ -1828,8 +1830,8 @@ mod tests {
         let (left, right) = create_memory_table(
             left_batch,
             right_batch,
-            Some(left_sorted),
-            Some(right_sorted),
+            vec![left_sorted],
+            vec![right_sorted],
             13,
         )?;
 
@@ -1891,8 +1893,8 @@ mod tests {
         let (left, right) = create_memory_table(
             left_batch,
             right_batch,
-            Some(left_sorted),
-            Some(right_sorted),
+            vec![left_sorted],
+            vec![right_sorted],
             13,
         )?;
 
@@ -1955,8 +1957,8 @@ mod tests {
         let (left, right) = create_memory_table(
             left_batch,
             right_batch,
-            Some(left_sorted),
-            Some(right_sorted),
+            vec![left_sorted],
+            vec![right_sorted],
             13,
         )?;
 
@@ -2015,8 +2017,8 @@ mod tests {
         let (left, right) = create_memory_table(
             left_batch,
             right_batch,
-            Some(left_sorted),
-            Some(right_sorted),
+            vec![left_sorted],
+            vec![right_sorted],
             13,
         )?;
 
@@ -2097,8 +2099,8 @@ mod tests {
         let (left, right) = create_memory_table(
             left_batch,
             right_batch,
-            Some(left_sorted),
-            Some(right_sorted),
+            vec![left_sorted],
+            vec![right_sorted],
             10,
         )?;
 
@@ -2226,8 +2228,8 @@ mod tests {
         let (left, right) = create_memory_table(
             left_batch,
             right_batch,
-            Some(left_sorted),
-            Some(right_sorted),
+            vec![left_sorted],
+            vec![right_sorted],
             13,
         )?;
         let intermediate_schema = Schema::new(vec![
@@ -2310,8 +2312,8 @@ mod tests {
         let (left, right) = create_memory_table(
             left_batch,
             right_batch,
-            Some(left_sorted),
-            Some(right_sorted),
+            vec![left_sorted],
+            vec![right_sorted],
             13,
         )?;
         let intermediate_schema = Schema::new(vec![
@@ -2379,8 +2381,8 @@ mod tests {
         let (left, right) = create_memory_table(
             left_batch,
             right_batch,
-            Some(left_sorted),
-            Some(right_sorted),
+            vec![left_sorted],
+            vec![right_sorted],
             13,
         )?;
 
diff --git a/datafusion/core/src/physical_plan/joins/test_utils.rs b/datafusion/core/src/physical_plan/joins/test_utils.rs
index e786fb5eb5..44610ab09a 100644
--- a/datafusion/core/src/physical_plan/joins/test_utils.rs
+++ b/datafusion/core/src/physical_plan/joins/test_utils.rs
@@ -37,7 +37,7 @@ use datafusion_expr::{JoinType, Operator};
 use datafusion_physical_expr::intervals::test_utils::{
     gen_conjunctive_numerical_expr, gen_conjunctive_temporal_expr,
 };
-use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
+use datafusion_physical_expr::{LexOrdering, PhysicalExpr};
 use rand::prelude::StdRng;
 use rand::{Rng, SeedableRng};
 use std::sync::Arc;
@@ -489,25 +489,21 @@ pub fn build_sides_record_batches(
 pub fn create_memory_table(
     left_batch: RecordBatch,
     right_batch: RecordBatch,
-    left_sorted: Option<Vec<PhysicalSortExpr>>,
-    right_sorted: Option<Vec<PhysicalSortExpr>>,
+    left_sorted: Vec<LexOrdering>,
+    right_sorted: Vec<LexOrdering>,
     batch_size: usize,
 ) -> Result<(Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>)> {
-    let mut left = MemoryExec::try_new(
+    let left = MemoryExec::try_new(
         &[split_record_batches(&left_batch, batch_size)?],
         left_batch.schema(),
         None,
-    )?;
-    if let Some(sorted) = left_sorted {
-        left = left.with_sort_information(sorted);
-    }
-    let mut right = MemoryExec::try_new(
+    )?
+    .with_sort_information(left_sorted);
+    let right = MemoryExec::try_new(
         &[split_record_batches(&right_batch, batch_size)?],
         right_batch.schema(),
         None,
-    )?;
-    if let Some(sorted) = right_sorted {
-        right = right.with_sort_information(sorted);
-    }
+    )?
+    .with_sort_information(right_sorted);
     Ok((Arc::new(left), Arc::new(right)))
 }
diff --git a/datafusion/core/src/physical_plan/memory.rs b/datafusion/core/src/physical_plan/memory.rs
index afc63e7d4c..be8385b586 100644
--- a/datafusion/core/src/physical_plan/memory.rs
+++ b/datafusion/core/src/physical_plan/memory.rs
@@ -30,8 +30,10 @@ use std::any::Any;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
+use crate::physical_plan::ordering_equivalence_properties_helper;
 use datafusion_common::DataFusionError;
 use datafusion_execution::TaskContext;
+use datafusion_physical_expr::{LexOrdering, OrderingEquivalenceProperties};
 use futures::Stream;
 
 /// Execution plan for reading in-memory batches of data
@@ -44,8 +46,8 @@ pub struct MemoryExec {
     projected_schema: SchemaRef,
     /// Optional projection
     projection: Option<Vec<usize>>,
-    // Optional sort information
-    sort_information: Option<Vec<PhysicalSortExpr>>,
+    // Sort information: one or more equivalent orderings
+    sort_information: Vec<LexOrdering>,
 }
 
 impl fmt::Debug for MemoryExec {
@@ -53,7 +55,7 @@ impl fmt::Debug for MemoryExec {
         write!(f, "partitions: [...]")?;
         write!(f, "schema: {:?}", self.projected_schema)?;
         write!(f, "projection: {:?}", self.projection)?;
-        if let Some(sort_info) = &self.sort_information {
+        if let Some(sort_info) = &self.sort_information.get(0) {
             write!(f, ", output_ordering: {:?}", sort_info)?;
         }
         Ok(())
@@ -73,7 +75,7 @@ impl DisplayAs for MemoryExec {
 
                 let output_ordering = self
                     .sort_information
-                    .as_ref()
+                    .first()
                     .map(|output_ordering| {
                         let order_strings: Vec<_> =
                             output_ordering.iter().map(|e| e.to_string()).collect();
@@ -113,7 +115,13 @@ impl ExecutionPlan for MemoryExec {
     }
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        self.sort_information.as_deref()
+        self.sort_information
+            .first()
+            .map(|ordering| ordering.as_slice())
+    }
+
+    fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
+        ordering_equivalence_properties_helper(self.schema(), &self.sort_information)
     }
 
     fn with_new_children(
@@ -161,16 +169,30 @@ impl MemoryExec {
             schema,
             projected_schema,
             projection,
-            sort_information: None,
+            sort_information: vec![],
         })
     }
 
-    /// Set sort information
-    pub fn with_sort_information(
-        mut self,
-        sort_information: Vec<PhysicalSortExpr>,
-    ) -> Self {
-        self.sort_information = Some(sort_information);
+    /// A memory table can be ordered by multiple expressions simultaneously.
+    /// `OrderingEquivalenceProperties` keeps track of expressions that describe the
+    /// global ordering of the schema. These columns are not necessarily same; e.g.
+    /// ```text
+    /// ┌-------┐
+    /// | a | b |
+    /// |---|---|
+    /// | 1 | 9 |
+    /// | 2 | 8 |
+    /// | 3 | 7 |
+    /// | 5 | 5 |
+    /// └---┴---┘
+    /// ```
+    /// where both `a ASC` and `b DESC` can describe the table ordering. With
+    /// `OrderingEquivalenceProperties`, we can keep track of these equivalences
+    /// and treat `a ASC` and `b DESC` as the same ordering requirement
+    /// by outputting the `a ASC` from output_ordering API
+    /// and add `b DESC` into `OrderingEquivalenceProperties`
+    pub fn with_sort_information(mut self, sort_information: Vec<LexOrdering>) -> Self {
+        self.sort_information = sort_information;
         self
     }
 }
@@ -237,3 +259,48 @@ impl RecordBatchStream for MemoryStream {
         self.schema.clone()
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use crate::physical_plan::memory::MemoryExec;
+    use crate::physical_plan::ExecutionPlan;
+    use arrow_schema::{DataType, Field, Schema, SortOptions};
+    use datafusion_physical_expr::expressions::col;
+    use datafusion_physical_expr::PhysicalSortExpr;
+    use std::sync::Arc;
+
+    #[test]
+    fn test_memory_order_eq() -> datafusion_common::Result<()> {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int64, false),
+            Field::new("b", DataType::Int64, false),
+            Field::new("c", DataType::Int64, false),
+        ]));
+        let expected_output_order = vec![
+            PhysicalSortExpr {
+                expr: col("a", &schema)?,
+                options: SortOptions::default(),
+            },
+            PhysicalSortExpr {
+                expr: col("b", &schema)?,
+                options: SortOptions::default(),
+            },
+        ];
+        let expected_order_eq = vec![PhysicalSortExpr {
+            expr: col("c", &schema)?,
+            options: SortOptions::default(),
+        }];
+        let sort_information =
+            vec![expected_output_order.clone(), expected_order_eq.clone()];
+        let mem_exec = MemoryExec::try_new(&[vec![]], schema, None)?
+            .with_sort_information(sort_information);
+
+        assert_eq!(mem_exec.output_ordering().unwrap(), expected_output_order);
+        let order_eq = mem_exec.ordering_equivalence_properties();
+        assert!(order_eq
+            .classes()
+            .iter()
+            .any(|class| class.contains(&expected_order_eq)));
+        Ok(())
+    }
+}
diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
index e27c133412..bdca1f9068 100644
--- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
@@ -94,7 +94,7 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str
     let running_source = Arc::new(
         MemoryExec::try_new(&[input1.clone()], schema.clone(), None)
             .unwrap()
-            .with_sort_information(sort_keys),
+            .with_sort_information(vec![sort_keys]),
     );
 
     let aggregate_expr = vec![Arc::new(Sum::new(
diff --git a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs
index 6b3a633f3c..6304e01c63 100644
--- a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs
@@ -107,7 +107,7 @@ mod sp_repartition_fuzz_tests {
         let running_source = Arc::new(
             MemoryExec::try_new(&[input1.clone()], schema.clone(), None)
                 .unwrap()
-                .with_sort_information(sort_keys.clone()),
+                .with_sort_information(vec![sort_keys.clone()]),
         );
         let hash_exprs = vec![col("c", &schema).unwrap()];
 
diff --git a/datafusion/core/tests/fuzz_cases/window_fuzz.rs b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
index 870baf948b..c28b3440d7 100644
--- a/datafusion/core/tests/fuzz_cases/window_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
@@ -432,7 +432,7 @@ async fn run_window_test(
     ];
     let memory_exec =
         MemoryExec::try_new(&[vec![concat_input_record]], schema.clone(), None).unwrap();
-    let memory_exec = memory_exec.with_sort_information(source_sort_keys.clone());
+    let memory_exec = memory_exec.with_sort_information(vec![source_sort_keys.clone()]);
     let mut exec1 = Arc::new(memory_exec) as Arc<dyn ExecutionPlan>;
     // Table is ordered according to ORDER BY a, b, c In linear test we use PARTITION BY b, ORDER BY a
     // For WindowAggExec  to produce correct result it need table to be ordered by b,a. Hence add a sort.
@@ -460,7 +460,7 @@ async fn run_window_test(
     let exec2 = Arc::new(
         MemoryExec::try_new(&[input1.clone()], schema.clone(), None)
             .unwrap()
-            .with_sort_information(source_sort_keys.clone()),
+            .with_sort_information(vec![source_sort_keys.clone()]),
     );
     let running_window_exec = Arc::new(
         BoundedWindowAggExec::try_new(
diff --git a/datafusion/core/tests/memory_limit.rs b/datafusion/core/tests/memory_limit.rs
index 862f5a8f97..99e6aa5f9f 100644
--- a/datafusion/core/tests/memory_limit.rs
+++ b/datafusion/core/tests/memory_limit.rs
@@ -27,7 +27,7 @@ use datafusion::physical_optimizer::PhysicalOptimizerRule;
 use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::physical_plan::streaming::PartitionStream;
 use datafusion_expr::{Expr, TableType};
-use datafusion_physical_expr::PhysicalSortExpr;
+use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
 use futures::StreamExt;
 use std::any::Any;
 use std::sync::{Arc, OnceLock};
@@ -515,7 +515,7 @@ impl Scenario {
                     descending: false,
                     nulls_first: false,
                 };
-                let sort_information = vec![
+                let sort_information = vec![vec![
                     PhysicalSortExpr {
                         expr: col("a", &schema).unwrap(),
                         options,
@@ -524,7 +524,7 @@ impl Scenario {
                         expr: col("b", &schema).unwrap(),
                         options,
                     },
-                ];
+                ]];
 
                 let table = SortedTableProvider::new(batches, sort_information);
                 Arc::new(table)
@@ -637,14 +637,11 @@ impl PartitionStream for DummyStreamPartition {
 struct SortedTableProvider {
     schema: SchemaRef,
     batches: Vec<Vec<RecordBatch>>,
-    sort_information: Vec<PhysicalSortExpr>,
+    sort_information: Vec<LexOrdering>,
 }
 
 impl SortedTableProvider {
-    fn new(
-        batches: Vec<Vec<RecordBatch>>,
-        sort_information: Vec<PhysicalSortExpr>,
-    ) -> Self {
+    fn new(batches: Vec<Vec<RecordBatch>>, sort_information: Vec<LexOrdering>) -> Self {
         let schema = batches[0][0].schema();
         Self {
             schema,

Reply via email to