This is an automated email from the ASF dual-hosted git repository.

ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 6ecb6cd78d Preserve all of the valid orderings during merging. (#8169)
6ecb6cd78d is described below

commit 6ecb6cd78dcd0508d9c5e8543275cd67ea4bbad6
Author: Mustafa Akur <[email protected]>
AuthorDate: Wed Nov 15 11:04:29 2023 +0300

    Preserve all of the valid orderings during merging. (#8169)
    
    * Preserve all of the valid orderings during merging.
    
    * Update datafusion/physical-expr/src/equivalence.rs
    
    Co-authored-by: Mehmet Ozan Kabak <[email protected]>
    
    * Address reviews
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
 .../fuzz_cases/sort_preserving_repartition_fuzz.rs | 276 ++++++++++++++++++++-
 datafusion/physical-expr/src/equivalence.rs        |  31 ++-
 datafusion/physical-plan/src/repartition/mod.rs    |   3 -
 .../src/sorts/sort_preserving_merge.rs             |   3 +-
 datafusion/sqllogictest/test_files/window.slt      |  45 ++++
 5 files changed, 335 insertions(+), 23 deletions(-)

diff --git 
a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs 
b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs
index 818698d6c0..5bc29ba1c2 100644
--- a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs
@@ -17,22 +17,272 @@
 
 #[cfg(test)]
 mod sp_repartition_fuzz_tests {
-    use arrow::compute::concat_batches;
-    use arrow_array::{ArrayRef, Int64Array, RecordBatch};
-    use arrow_schema::SortOptions;
-    use datafusion::physical_plan::memory::MemoryExec;
-    use datafusion::physical_plan::repartition::RepartitionExec;
-    use 
datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
-    use datafusion::physical_plan::{collect, ExecutionPlan, Partitioning};
-    use datafusion::prelude::SessionContext;
-    use datafusion_execution::config::SessionConfig;
-    use datafusion_physical_expr::expressions::col;
-    use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
-    use rand::rngs::StdRng;
-    use rand::{Rng, SeedableRng};
     use std::sync::Arc;
+
+    use arrow::compute::{concat_batches, lexsort, SortColumn};
+    use arrow_array::{ArrayRef, Int64Array, RecordBatch, UInt64Array};
+    use arrow_schema::{DataType, Field, Schema, SchemaRef, SortOptions};
+
+    use datafusion::physical_plan::{
+        collect,
+        memory::MemoryExec,
+        metrics::{BaselineMetrics, ExecutionPlanMetricsSet},
+        repartition::RepartitionExec,
+        sorts::sort_preserving_merge::SortPreservingMergeExec,
+        sorts::streaming_merge::streaming_merge,
+        stream::RecordBatchStreamAdapter,
+        ExecutionPlan, Partitioning,
+    };
+    use datafusion::prelude::SessionContext;
+    use datafusion_common::Result;
+    use datafusion_execution::{
+        config::SessionConfig, memory_pool::MemoryConsumer, 
SendableRecordBatchStream,
+    };
+    use datafusion_physical_expr::{
+        expressions::{col, Column},
+        EquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
+    };
     use test_utils::add_empty_batches;
 
+    use itertools::izip;
+    use rand::{rngs::StdRng, seq::SliceRandom, Rng, SeedableRng};
+
+    // Generate a schema which consists of 6 columns (a, b, c, d, e, f)
+    fn create_test_schema() -> Result<SchemaRef> {
+        let a = Field::new("a", DataType::Int32, true);
+        let b = Field::new("b", DataType::Int32, true);
+        let c = Field::new("c", DataType::Int32, true);
+        let d = Field::new("d", DataType::Int32, true);
+        let e = Field::new("e", DataType::Int32, true);
+        let f = Field::new("f", DataType::Int32, true);
+        let schema = Arc::new(Schema::new(vec![a, b, c, d, e, f]));
+
+        Ok(schema)
+    }
+
+    /// Construct a schema with random ordering
+    /// among column a, b, c, d
+    /// where
+    /// Column [a=f] (e.g they are aliases).
+    /// Column e is constant.
+    fn create_random_schema(seed: u64) -> Result<(SchemaRef, 
EquivalenceProperties)> {
+        let test_schema = create_test_schema()?;
+        let col_a = &col("a", &test_schema)?;
+        let col_b = &col("b", &test_schema)?;
+        let col_c = &col("c", &test_schema)?;
+        let col_d = &col("d", &test_schema)?;
+        let col_e = &col("e", &test_schema)?;
+        let col_f = &col("f", &test_schema)?;
+        let col_exprs = [col_a, col_b, col_c, col_d, col_e, col_f];
+
+        let mut eq_properties = 
EquivalenceProperties::new(test_schema.clone());
+        // Define a and f are aliases
+        eq_properties.add_equal_conditions(col_a, col_f);
+        // Column e has constant value.
+        eq_properties = eq_properties.add_constants([col_e.clone()]);
+
+        // Randomly order columns for sorting
+        let mut rng = StdRng::seed_from_u64(seed);
+        let mut remaining_exprs = col_exprs[0..4].to_vec(); // only a, b, c, d 
are sorted
+
+        let options_asc = SortOptions {
+            descending: false,
+            nulls_first: false,
+        };
+
+        while !remaining_exprs.is_empty() {
+            let n_sort_expr = rng.gen_range(0..remaining_exprs.len() + 1);
+            remaining_exprs.shuffle(&mut rng);
+
+            let ordering = remaining_exprs
+                .drain(0..n_sort_expr)
+                .map(|expr| PhysicalSortExpr {
+                    expr: expr.clone(),
+                    options: options_asc,
+                })
+                .collect();
+
+            eq_properties.add_new_orderings([ordering]);
+        }
+
+        Ok((test_schema, eq_properties))
+    }
+
+    // If we already generated a random result for one of the
+    // expressions in the equivalence classes. For other expressions in the 
same
+    // equivalence class use same result. This util gets already calculated 
result, when available.
+    fn get_representative_arr(
+        eq_group: &[Arc<dyn PhysicalExpr>],
+        existing_vec: &[Option<ArrayRef>],
+        schema: SchemaRef,
+    ) -> Option<ArrayRef> {
+        for expr in eq_group.iter() {
+            let col = expr.as_any().downcast_ref::<Column>().unwrap();
+            let (idx, _field) = schema.column_with_name(col.name()).unwrap();
+            if let Some(res) = &existing_vec[idx] {
+                return Some(res.clone());
+            }
+        }
+        None
+    }
+
+    // Generate a table that satisfies the given equivalence properties; i.e.
+    // equivalences, ordering equivalences, and constants.
+    fn generate_table_for_eq_properties(
+        eq_properties: &EquivalenceProperties,
+        n_elem: usize,
+        n_distinct: usize,
+    ) -> Result<RecordBatch> {
+        let mut rng = StdRng::seed_from_u64(23);
+
+        let schema = eq_properties.schema();
+        let mut schema_vec = vec![None; schema.fields.len()];
+
+        // Utility closure to generate random array
+        let mut generate_random_array = |num_elems: usize, max_val: usize| -> 
ArrayRef {
+            let values: Vec<u64> = (0..num_elems)
+                .map(|_| rng.gen_range(0..max_val) as u64)
+                .collect();
+            Arc::new(UInt64Array::from_iter_values(values))
+        };
+
+        // Fill constant columns
+        for constant in eq_properties.constants() {
+            let col = constant.as_any().downcast_ref::<Column>().unwrap();
+            let (idx, _field) = schema.column_with_name(col.name()).unwrap();
+            let arr =
+                Arc::new(UInt64Array::from_iter_values(vec![0; n_elem])) as 
ArrayRef;
+            schema_vec[idx] = Some(arr);
+        }
+
+        // Fill columns based on ordering equivalences
+        for ordering in eq_properties.oeq_class().iter() {
+            let (sort_columns, indices): (Vec<_>, Vec<_>) = ordering
+                .iter()
+                .map(|PhysicalSortExpr { expr, options }| {
+                    let col = expr.as_any().downcast_ref::<Column>().unwrap();
+                    let (idx, _field) = 
schema.column_with_name(col.name()).unwrap();
+                    let arr = generate_random_array(n_elem, n_distinct);
+                    (
+                        SortColumn {
+                            values: arr,
+                            options: Some(*options),
+                        },
+                        idx,
+                    )
+                })
+                .unzip();
+
+            let sort_arrs = arrow::compute::lexsort(&sort_columns, None)?;
+            for (idx, arr) in izip!(indices, sort_arrs) {
+                schema_vec[idx] = Some(arr);
+            }
+        }
+
+        // Fill columns based on equivalence groups
+        for eq_group in eq_properties.eq_group().iter() {
+            let representative_array =
+                get_representative_arr(eq_group, &schema_vec, schema.clone())
+                    .unwrap_or_else(|| generate_random_array(n_elem, 
n_distinct));
+
+            for expr in eq_group {
+                let col = expr.as_any().downcast_ref::<Column>().unwrap();
+                let (idx, _field) = 
schema.column_with_name(col.name()).unwrap();
+                schema_vec[idx] = Some(representative_array.clone());
+            }
+        }
+
+        let res: Vec<_> = schema_vec
+            .into_iter()
+            .zip(schema.fields.iter())
+            .map(|(elem, field)| {
+                (
+                    field.name(),
+                    // Generate random values for columns that do not occur in 
any of the groups (equivalence, ordering equivalence, constants)
+                    elem.unwrap_or_else(|| generate_random_array(n_elem, 
n_distinct)),
+                )
+            })
+            .collect();
+
+        Ok(RecordBatch::try_from_iter(res)?)
+    }
+
+    // This test checks for whether during sort preserving merge we can 
preserve all of the valid orderings
+    // successfully. If at the input we have orderings [a ASC, b ASC], [c ASC, 
d ASC]
+    // After sort preserving merge orderings [a ASC, b ASC], [c ASC, d ASC] 
should still be valid.
+    #[tokio::test]
+    async fn stream_merge_multi_order_preserve() -> Result<()> {
+        const N_PARTITION: usize = 8;
+        const N_ELEM: usize = 25;
+        const N_DISTINCT: usize = 5;
+        const N_DIFF_SCHEMA: usize = 20;
+
+        use datafusion::physical_plan::common::collect;
+        for seed in 0..N_DIFF_SCHEMA {
+            // Create a schema with random equivalence properties
+            let (_test_schema, eq_properties) = create_random_schema(seed as 
u64)?;
+            let table_data_with_properties =
+                generate_table_for_eq_properties(&eq_properties, N_ELEM, 
N_DISTINCT)?;
+            let schema = table_data_with_properties.schema();
+            let streams: Vec<SendableRecordBatchStream> = (0..N_PARTITION)
+                .map(|_idx| {
+                    let batch = table_data_with_properties.clone();
+                    Box::pin(RecordBatchStreamAdapter::new(
+                        schema.clone(),
+                        futures::stream::once(async { Ok(batch) }),
+                    )) as SendableRecordBatchStream
+                })
+                .collect::<Vec<_>>();
+
+            // Returns concatenated version of the all available orderings
+            let exprs = eq_properties
+                .oeq_class()
+                .output_ordering()
+                .unwrap_or_default();
+
+            let context = SessionContext::new().task_ctx();
+            let mem_reservation =
+                
MemoryConsumer::new("test".to_string()).register(context.memory_pool());
+
+            // Internally SortPreservingMergeExec uses this function for 
merging.
+            let res = streaming_merge(
+                streams,
+                schema,
+                &exprs,
+                BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0),
+                1,
+                None,
+                mem_reservation,
+            )?;
+            let res = collect(res).await?;
+            // Contains the merged result.
+            let res = concat_batches(&res[0].schema(), &res)?;
+
+            for ordering in eq_properties.oeq_class().iter() {
+                let err_msg = format!("error in eq properties: {:?}", 
eq_properties);
+                let sort_solumns = ordering
+                    .iter()
+                    .map(|sort_expr| sort_expr.evaluate_to_sort_column(&res))
+                    .collect::<Result<Vec<_>>>()?;
+                let orig_columns = sort_solumns
+                    .iter()
+                    .map(|sort_column| sort_column.values.clone())
+                    .collect::<Vec<_>>();
+                let sorted_columns = lexsort(&sort_solumns, None)?;
+
+                // Make sure after merging ordering is still valid.
+                assert_eq!(orig_columns.len(), sorted_columns.len(), "{}", 
err_msg);
+                assert!(
+                    izip!(orig_columns.into_iter(), sorted_columns.into_iter())
+                        .all(|(lhs, rhs)| { lhs == rhs }),
+                    "{}",
+                    err_msg
+                )
+            }
+        }
+        Ok(())
+    }
+
     #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
     async fn sort_preserving_repartition_test() {
         let seed_start = 0;
diff --git a/datafusion/physical-expr/src/equivalence.rs 
b/datafusion/physical-expr/src/equivalence.rs
index 84291653fb..f3bfe49616 100644
--- a/datafusion/physical-expr/src/equivalence.rs
+++ b/datafusion/physical-expr/src/equivalence.rs
@@ -229,7 +229,7 @@ impl EquivalenceGroup {
     }
 
     /// Returns an iterator over the equivalence classes in this group.
-    fn iter(&self) -> impl Iterator<Item = &EquivalenceClass> {
+    pub fn iter(&self) -> impl Iterator<Item = &EquivalenceClass> {
         self.classes.iter()
     }
 
@@ -551,7 +551,7 @@ impl EquivalenceGroup {
 
 /// This function constructs a duplicate-free `LexOrderingReq` by filtering out
 /// duplicate entries that have same physical expression inside. For example,
-/// `vec![a Some(Asc), a Some(Desc)]` collapses to `vec![a Some(Asc)]`.
+/// `vec![a Some(ASC), a Some(DESC)]` collapses to `vec![a Some(ASC)]`.
 pub fn collapse_lex_req(input: LexRequirement) -> LexRequirement {
     let mut output = Vec::<PhysicalSortRequirement>::new();
     for item in input {
@@ -562,6 +562,19 @@ pub fn collapse_lex_req(input: LexRequirement) -> 
LexRequirement {
     output
 }
 
+/// This function constructs a duplicate-free `LexOrdering` by filtering out
+/// duplicate entries that have same physical expression inside. For example,
+/// `vec![a ASC, a DESC]` collapses to `vec![a ASC]`.
+pub fn collapse_lex_ordering(input: LexOrdering) -> LexOrdering {
+    let mut output = Vec::<PhysicalSortExpr>::new();
+    for item in input {
+        if !output.iter().any(|req| req.expr.eq(&item.expr)) {
+            output.push(item);
+        }
+    }
+    output
+}
+
 /// An `OrderingEquivalenceClass` object keeps track of different alternative
 /// orderings than can describe a schema. For example, consider the following 
table:
 ///
@@ -667,10 +680,13 @@ impl OrderingEquivalenceClass {
         }
     }
 
-    /// Gets the first ordering entry in this ordering equivalence class.
-    /// This is one of the many valid orderings (if there are multiple).
+    /// Returns the concatenation of all the orderings. This enables merge
+    /// operations to preserve all equivalent orderings simultaneously.
     pub fn output_ordering(&self) -> Option<LexOrdering> {
-        self.orderings.first().cloned()
+        let output_ordering =
+            self.orderings.iter().flatten().cloned().collect::<Vec<_>>();
+        let output_ordering = collapse_lex_ordering(output_ordering);
+        (!output_ordering.is_empty()).then_some(output_ordering)
     }
 
     // Append orderings in `other` to all existing orderings in this 
equivalence
@@ -825,6 +841,11 @@ impl EquivalenceProperties {
         &self.eq_group
     }
 
+    /// Returns a reference to the constant expressions
+    pub fn constants(&self) -> &[Arc<dyn PhysicalExpr>] {
+        &self.constants
+    }
+
     /// Returns the normalized version of the ordering equivalence class 
within.
     /// Normalization removes constants and duplicates as well as standardizing
     /// expressions according to the equivalence group within.
diff --git a/datafusion/physical-plan/src/repartition/mod.rs 
b/datafusion/physical-plan/src/repartition/mod.rs
index 9719446d78..24f227d8a5 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -472,9 +472,6 @@ impl ExecutionPlan for RepartitionExec {
         if !self.maintains_input_order()[0] {
             result.clear_orderings();
         }
-        if self.preserve_order {
-            result = 
result.with_reorder(self.sort_exprs().unwrap_or_default().to_vec())
-        }
         result
     }
 
diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs 
b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
index 65cd8e4148..f4b57e8bfb 100644
--- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
+++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
@@ -174,8 +174,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
     }
 
     fn equivalence_properties(&self) -> EquivalenceProperties {
-        let output_oeq = self.input.equivalence_properties();
-        output_oeq.with_reorder(self.expr.to_vec())
+        self.input.equivalence_properties()
     }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
diff --git a/datafusion/sqllogictest/test_files/window.slt 
b/datafusion/sqllogictest/test_files/window.slt
index 2eb0576d55..8be02b846c 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -3396,6 +3396,21 @@ WITH ORDER (a ASC, b ASC)
 WITH ORDER (c ASC)
 LOCATION '../core/tests/data/window_2.csv';
 
+# Create an unbounded source where there is multiple orderings.
+statement ok
+CREATE UNBOUNDED EXTERNAL TABLE multiple_ordered_table_inf (
+  a0 INTEGER,
+  a INTEGER,
+  b INTEGER,
+  c INTEGER,
+  d INTEGER
+)
+STORED AS CSV
+WITH HEADER ROW
+WITH ORDER (a ASC, b ASC)
+WITH ORDER (c ASC)
+LOCATION '../core/tests/data/window_2.csv';
+
 # All of the window execs in the physical plan should work in the
 # sorted mode.
 query TT
@@ -3477,3 +3492,33 @@ query II
 select sum(1) over() x, sum(1) over () y
 ----
 1 1
+
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+# source is ordered by [a ASC, b ASC], [c ASC]
+# after sort preserving repartition and sort preserving merge
+# we should still have the orderings [a ASC, b ASC], [c ASC].
+query TT
+EXPLAIN SELECT *,
+       AVG(d) OVER sliding_window AS avg_d
+FROM multiple_ordered_table_inf
+WINDOW sliding_window AS (
+    PARTITION BY d
+       ORDER BY a RANGE 10 PRECEDING
+)
+ORDER BY c
+----
+logical_plan
+Sort: multiple_ordered_table_inf.c ASC NULLS LAST
+--Projection: multiple_ordered_table_inf.a0, multiple_ordered_table_inf.a, 
multiple_ordered_table_inf.b, multiple_ordered_table_inf.c, 
multiple_ordered_table_inf.d, AVG(multiple_ordered_table_inf.d) PARTITION BY 
[multiple_ordered_table_inf.d] ORDER BY [multiple_ordered_table_inf.a ASC NULLS 
LAST] RANGE BETWEEN 10 PRECEDING AND CURRENT ROW AS avg_d
+----WindowAggr: windowExpr=[[AVG(CAST(multiple_ordered_table_inf.d AS 
Float64)) PARTITION BY [multiple_ordered_table_inf.d] ORDER BY 
[multiple_ordered_table_inf.a ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 
CURRENT ROW]]
+------TableScan: multiple_ordered_table_inf projection=[a0, a, b, c, d]
+physical_plan
+SortPreservingMergeExec: [c@3 ASC NULLS LAST]
+--ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, 
AVG(multiple_ordered_table_inf.d) PARTITION BY [multiple_ordered_table_inf.d] 
ORDER BY [multiple_ordered_table_inf.a ASC NULLS LAST] RANGE BETWEEN 10 
PRECEDING AND CURRENT ROW@5 as avg_d]
+----BoundedWindowAggExec: wdw=[AVG(multiple_ordered_table_inf.d) PARTITION BY 
[multiple_ordered_table_inf.d] ORDER BY [multiple_ordered_table_inf.a ASC NULLS 
LAST] RANGE BETWEEN 10 PRECEDING AND CURRENT ROW: Ok(Field { name: 
"AVG(multiple_ordered_table_inf.d) PARTITION BY [multiple_ordered_table_inf.d] 
ORDER BY [multiple_ordered_table_inf.a ASC NULLS LAST] RANGE BETWEEN 10 
PRECEDING AND CURRENT ROW", data_type: Float64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), [...]
+------CoalesceBatchesExec: target_batch_size=4096
+--------SortPreservingRepartitionExec: partitioning=Hash([d@4], 2), 
input_partitions=2, sort_exprs=a@1 ASC NULLS LAST,b@2 ASC NULLS LAST
+----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, 
b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC 
NULLS LAST], has_header=true

Reply via email to