This is an automated email from the ASF dual-hosted git repository.
ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 6ecb6cd78d Preserve all of the valid orderings during merging. (#8169)
6ecb6cd78d is described below
commit 6ecb6cd78dcd0508d9c5e8543275cd67ea4bbad6
Author: Mustafa Akur <[email protected]>
AuthorDate: Wed Nov 15 11:04:29 2023 +0300
Preserve all of the valid orderings during merging. (#8169)
* Preserve all of the valid orderings during merging.
* Update datafusion/physical-expr/src/equivalence.rs
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
* Address reviews
---------
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
.../fuzz_cases/sort_preserving_repartition_fuzz.rs | 276 ++++++++++++++++++++-
datafusion/physical-expr/src/equivalence.rs | 31 ++-
datafusion/physical-plan/src/repartition/mod.rs | 3 -
.../src/sorts/sort_preserving_merge.rs | 3 +-
datafusion/sqllogictest/test_files/window.slt | 45 ++++
5 files changed, 335 insertions(+), 23 deletions(-)
diff --git
a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs
b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs
index 818698d6c0..5bc29ba1c2 100644
--- a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs
@@ -17,22 +17,272 @@
#[cfg(test)]
mod sp_repartition_fuzz_tests {
- use arrow::compute::concat_batches;
- use arrow_array::{ArrayRef, Int64Array, RecordBatch};
- use arrow_schema::SortOptions;
- use datafusion::physical_plan::memory::MemoryExec;
- use datafusion::physical_plan::repartition::RepartitionExec;
- use
datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
- use datafusion::physical_plan::{collect, ExecutionPlan, Partitioning};
- use datafusion::prelude::SessionContext;
- use datafusion_execution::config::SessionConfig;
- use datafusion_physical_expr::expressions::col;
- use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
- use rand::rngs::StdRng;
- use rand::{Rng, SeedableRng};
use std::sync::Arc;
+
+ use arrow::compute::{concat_batches, lexsort, SortColumn};
+ use arrow_array::{ArrayRef, Int64Array, RecordBatch, UInt64Array};
+ use arrow_schema::{DataType, Field, Schema, SchemaRef, SortOptions};
+
+ use datafusion::physical_plan::{
+ collect,
+ memory::MemoryExec,
+ metrics::{BaselineMetrics, ExecutionPlanMetricsSet},
+ repartition::RepartitionExec,
+ sorts::sort_preserving_merge::SortPreservingMergeExec,
+ sorts::streaming_merge::streaming_merge,
+ stream::RecordBatchStreamAdapter,
+ ExecutionPlan, Partitioning,
+ };
+ use datafusion::prelude::SessionContext;
+ use datafusion_common::Result;
+ use datafusion_execution::{
+ config::SessionConfig, memory_pool::MemoryConsumer,
SendableRecordBatchStream,
+ };
+ use datafusion_physical_expr::{
+ expressions::{col, Column},
+ EquivalenceProperties, PhysicalExpr, PhysicalSortExpr,
+ };
use test_utils::add_empty_batches;
+ use itertools::izip;
+ use rand::{rngs::StdRng, seq::SliceRandom, Rng, SeedableRng};
+
+ // Generate a schema which consists of 6 columns (a, b, c, d, e, f)
+ fn create_test_schema() -> Result<SchemaRef> {
+ let a = Field::new("a", DataType::Int32, true);
+ let b = Field::new("b", DataType::Int32, true);
+ let c = Field::new("c", DataType::Int32, true);
+ let d = Field::new("d", DataType::Int32, true);
+ let e = Field::new("e", DataType::Int32, true);
+ let f = Field::new("f", DataType::Int32, true);
+ let schema = Arc::new(Schema::new(vec![a, b, c, d, e, f]));
+
+ Ok(schema)
+ }
+
+ /// Construct a schema with random ordering
+ /// among column a, b, c, d
+ /// where
+ /// Column [a=f] (i.e., they are aliases).
+ /// Column e is constant.
+ fn create_random_schema(seed: u64) -> Result<(SchemaRef,
EquivalenceProperties)> {
+ let test_schema = create_test_schema()?;
+ let col_a = &col("a", &test_schema)?;
+ let col_b = &col("b", &test_schema)?;
+ let col_c = &col("c", &test_schema)?;
+ let col_d = &col("d", &test_schema)?;
+ let col_e = &col("e", &test_schema)?;
+ let col_f = &col("f", &test_schema)?;
+ let col_exprs = [col_a, col_b, col_c, col_d, col_e, col_f];
+
+ let mut eq_properties =
EquivalenceProperties::new(test_schema.clone());
+ // Define a and f as aliases
+ eq_properties.add_equal_conditions(col_a, col_f);
+ // Column e has constant value.
+ eq_properties = eq_properties.add_constants([col_e.clone()]);
+
+ // Randomly order columns for sorting
+ let mut rng = StdRng::seed_from_u64(seed);
+ let mut remaining_exprs = col_exprs[0..4].to_vec(); // only a, b, c, d
are sorted
+
+ let options_asc = SortOptions {
+ descending: false,
+ nulls_first: false,
+ };
+
+ while !remaining_exprs.is_empty() {
+ let n_sort_expr = rng.gen_range(0..remaining_exprs.len() + 1);
+ remaining_exprs.shuffle(&mut rng);
+
+ let ordering = remaining_exprs
+ .drain(0..n_sort_expr)
+ .map(|expr| PhysicalSortExpr {
+ expr: expr.clone(),
+ options: options_asc,
+ })
+ .collect();
+
+ eq_properties.add_new_orderings([ordering]);
+ }
+
+ Ok((test_schema, eq_properties))
+ }
+
+ // If a random result was already generated for one of the expressions in an
+ // equivalence class, the other expressions in that class use the same result.
+ // This utility returns the already calculated result, when available.
+ fn get_representative_arr(
+ eq_group: &[Arc<dyn PhysicalExpr>],
+ existing_vec: &[Option<ArrayRef>],
+ schema: SchemaRef,
+ ) -> Option<ArrayRef> {
+ for expr in eq_group.iter() {
+ let col = expr.as_any().downcast_ref::<Column>().unwrap();
+ let (idx, _field) = schema.column_with_name(col.name()).unwrap();
+ if let Some(res) = &existing_vec[idx] {
+ return Some(res.clone());
+ }
+ }
+ None
+ }
+
+ // Generate a table that satisfies the given equivalence properties; i.e.
+ // equivalences, ordering equivalences, and constants.
+ fn generate_table_for_eq_properties(
+ eq_properties: &EquivalenceProperties,
+ n_elem: usize,
+ n_distinct: usize,
+ ) -> Result<RecordBatch> {
+ let mut rng = StdRng::seed_from_u64(23);
+
+ let schema = eq_properties.schema();
+ let mut schema_vec = vec![None; schema.fields.len()];
+
+ // Utility closure to generate random array
+ let mut generate_random_array = |num_elems: usize, max_val: usize| ->
ArrayRef {
+ let values: Vec<u64> = (0..num_elems)
+ .map(|_| rng.gen_range(0..max_val) as u64)
+ .collect();
+ Arc::new(UInt64Array::from_iter_values(values))
+ };
+
+ // Fill constant columns
+ for constant in eq_properties.constants() {
+ let col = constant.as_any().downcast_ref::<Column>().unwrap();
+ let (idx, _field) = schema.column_with_name(col.name()).unwrap();
+ let arr =
+ Arc::new(UInt64Array::from_iter_values(vec![0; n_elem])) as
ArrayRef;
+ schema_vec[idx] = Some(arr);
+ }
+
+ // Fill columns based on ordering equivalences
+ for ordering in eq_properties.oeq_class().iter() {
+ let (sort_columns, indices): (Vec<_>, Vec<_>) = ordering
+ .iter()
+ .map(|PhysicalSortExpr { expr, options }| {
+ let col = expr.as_any().downcast_ref::<Column>().unwrap();
+ let (idx, _field) =
schema.column_with_name(col.name()).unwrap();
+ let arr = generate_random_array(n_elem, n_distinct);
+ (
+ SortColumn {
+ values: arr,
+ options: Some(*options),
+ },
+ idx,
+ )
+ })
+ .unzip();
+
+ let sort_arrs = arrow::compute::lexsort(&sort_columns, None)?;
+ for (idx, arr) in izip!(indices, sort_arrs) {
+ schema_vec[idx] = Some(arr);
+ }
+ }
+
+ // Fill columns based on equivalence groups
+ for eq_group in eq_properties.eq_group().iter() {
+ let representative_array =
+ get_representative_arr(eq_group, &schema_vec, schema.clone())
+ .unwrap_or_else(|| generate_random_array(n_elem,
n_distinct));
+
+ for expr in eq_group {
+ let col = expr.as_any().downcast_ref::<Column>().unwrap();
+ let (idx, _field) =
schema.column_with_name(col.name()).unwrap();
+ schema_vec[idx] = Some(representative_array.clone());
+ }
+ }
+
+ let res: Vec<_> = schema_vec
+ .into_iter()
+ .zip(schema.fields.iter())
+ .map(|(elem, field)| {
+ (
+ field.name(),
+ // Generate random values for columns that do not occur in
any of the groups (equivalence, ordering equivalence, constants)
+ elem.unwrap_or_else(|| generate_random_array(n_elem,
n_distinct)),
+ )
+ })
+ .collect();
+
+ Ok(RecordBatch::try_from_iter(res)?)
+ }
+
+ // This test checks whether sort preserving merge successfully preserves all
+ // of the valid orderings. If the input has orderings [a ASC, b ASC] and
+ // [c ASC, d ASC], both of them should still be valid after the merge.
+ #[tokio::test]
+ async fn stream_merge_multi_order_preserve() -> Result<()> {
+ const N_PARTITION: usize = 8;
+ const N_ELEM: usize = 25;
+ const N_DISTINCT: usize = 5;
+ const N_DIFF_SCHEMA: usize = 20;
+
+ use datafusion::physical_plan::common::collect;
+ for seed in 0..N_DIFF_SCHEMA {
+ // Create a schema with random equivalence properties
+ let (_test_schema, eq_properties) = create_random_schema(seed as
u64)?;
+ let table_data_with_properties =
+ generate_table_for_eq_properties(&eq_properties, N_ELEM,
N_DISTINCT)?;
+ let schema = table_data_with_properties.schema();
+ let streams: Vec<SendableRecordBatchStream> = (0..N_PARTITION)
+ .map(|_idx| {
+ let batch = table_data_with_properties.clone();
+ Box::pin(RecordBatchStreamAdapter::new(
+ schema.clone(),
+ futures::stream::once(async { Ok(batch) }),
+ )) as SendableRecordBatchStream
+ })
+ .collect::<Vec<_>>();
+
+ // Returns the concatenation of all available orderings
+ let exprs = eq_properties
+ .oeq_class()
+ .output_ordering()
+ .unwrap_or_default();
+
+ let context = SessionContext::new().task_ctx();
+ let mem_reservation =
+
MemoryConsumer::new("test".to_string()).register(context.memory_pool());
+
+ // Internally SortPreservingMergeExec uses this function for
merging.
+ let res = streaming_merge(
+ streams,
+ schema,
+ &exprs,
+ BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0),
+ 1,
+ None,
+ mem_reservation,
+ )?;
+ let res = collect(res).await?;
+ // Contains the merged result.
+ let res = concat_batches(&res[0].schema(), &res)?;
+
+ for ordering in eq_properties.oeq_class().iter() {
+ let err_msg = format!("error in eq properties: {:?}",
eq_properties);
+ let sort_solumns = ordering
+ .iter()
+ .map(|sort_expr| sort_expr.evaluate_to_sort_column(&res))
+ .collect::<Result<Vec<_>>>()?;
+ let orig_columns = sort_solumns
+ .iter()
+ .map(|sort_column| sort_column.values.clone())
+ .collect::<Vec<_>>();
+ let sorted_columns = lexsort(&sort_solumns, None)?;
+
+ // Make sure the ordering is still valid after merging.
+ assert_eq!(orig_columns.len(), sorted_columns.len(), "{}",
err_msg);
+ assert!(
+ izip!(orig_columns.into_iter(), sorted_columns.into_iter())
+ .all(|(lhs, rhs)| { lhs == rhs }),
+ "{}",
+ err_msg
+ )
+ }
+ }
+ Ok(())
+ }
+
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn sort_preserving_repartition_test() {
let seed_start = 0;
diff --git a/datafusion/physical-expr/src/equivalence.rs
b/datafusion/physical-expr/src/equivalence.rs
index 84291653fb..f3bfe49616 100644
--- a/datafusion/physical-expr/src/equivalence.rs
+++ b/datafusion/physical-expr/src/equivalence.rs
@@ -229,7 +229,7 @@ impl EquivalenceGroup {
}
/// Returns an iterator over the equivalence classes in this group.
- fn iter(&self) -> impl Iterator<Item = &EquivalenceClass> {
+ pub fn iter(&self) -> impl Iterator<Item = &EquivalenceClass> {
self.classes.iter()
}
@@ -551,7 +551,7 @@ impl EquivalenceGroup {
/// This function constructs a duplicate-free `LexOrderingReq` by filtering out
/// duplicate entries that have same physical expression inside. For example,
-/// `vec![a Some(Asc), a Some(Desc)]` collapses to `vec![a Some(Asc)]`.
+/// `vec![a Some(ASC), a Some(DESC)]` collapses to `vec![a Some(ASC)]`.
pub fn collapse_lex_req(input: LexRequirement) -> LexRequirement {
let mut output = Vec::<PhysicalSortRequirement>::new();
for item in input {
@@ -562,6 +562,19 @@ pub fn collapse_lex_req(input: LexRequirement) ->
LexRequirement {
output
}
+/// This function constructs a duplicate-free `LexOrdering` by filtering out
+/// duplicate entries that have the same physical expression inside. For example,
+/// `vec![a ASC, a DESC]` collapses to `vec![a ASC]`.
+pub fn collapse_lex_ordering(input: LexOrdering) -> LexOrdering {
+ let mut output = Vec::<PhysicalSortExpr>::new();
+ for item in input {
+ if !output.iter().any(|req| req.expr.eq(&item.expr)) {
+ output.push(item);
+ }
+ }
+ output
+}
+
/// An `OrderingEquivalenceClass` object keeps track of different alternative
/// orderings than can describe a schema. For example, consider the following
table:
///
@@ -667,10 +680,13 @@ impl OrderingEquivalenceClass {
}
}
- /// Gets the first ordering entry in this ordering equivalence class.
- /// This is one of the many valid orderings (if there are multiple).
+ /// Returns the concatenation of all the orderings. This enables merge
+ /// operations to preserve all equivalent orderings simultaneously.
pub fn output_ordering(&self) -> Option<LexOrdering> {
- self.orderings.first().cloned()
+ let output_ordering =
+ self.orderings.iter().flatten().cloned().collect::<Vec<_>>();
+ let output_ordering = collapse_lex_ordering(output_ordering);
+ (!output_ordering.is_empty()).then_some(output_ordering)
}
// Append orderings in `other` to all existing orderings in this
equivalence
@@ -825,6 +841,11 @@ impl EquivalenceProperties {
&self.eq_group
}
+ /// Returns a reference to the constant expressions
+ pub fn constants(&self) -> &[Arc<dyn PhysicalExpr>] {
+ &self.constants
+ }
+
/// Returns the normalized version of the ordering equivalence class
within.
/// Normalization removes constants and duplicates as well as standardizing
/// expressions according to the equivalence group within.
diff --git a/datafusion/physical-plan/src/repartition/mod.rs
b/datafusion/physical-plan/src/repartition/mod.rs
index 9719446d78..24f227d8a5 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -472,9 +472,6 @@ impl ExecutionPlan for RepartitionExec {
if !self.maintains_input_order()[0] {
result.clear_orderings();
}
- if self.preserve_order {
- result =
result.with_reorder(self.sort_exprs().unwrap_or_default().to_vec())
- }
result
}
diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
index 65cd8e4148..f4b57e8bfb 100644
--- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
+++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
@@ -174,8 +174,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
}
fn equivalence_properties(&self) -> EquivalenceProperties {
- let output_oeq = self.input.equivalence_properties();
- output_oeq.with_reorder(self.expr.to_vec())
+ self.input.equivalence_properties()
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
diff --git a/datafusion/sqllogictest/test_files/window.slt
b/datafusion/sqllogictest/test_files/window.slt
index 2eb0576d55..8be02b846c 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -3396,6 +3396,21 @@ WITH ORDER (a ASC, b ASC)
WITH ORDER (c ASC)
LOCATION '../core/tests/data/window_2.csv';
+# Create an unbounded source with multiple orderings.
+statement ok
+CREATE UNBOUNDED EXTERNAL TABLE multiple_ordered_table_inf (
+ a0 INTEGER,
+ a INTEGER,
+ b INTEGER,
+ c INTEGER,
+ d INTEGER
+)
+STORED AS CSV
+WITH HEADER ROW
+WITH ORDER (a ASC, b ASC)
+WITH ORDER (c ASC)
+LOCATION '../core/tests/data/window_2.csv';
+
# All of the window execs in the physical plan should work in the
# sorted mode.
query TT
@@ -3477,3 +3492,33 @@ query II
select sum(1) over() x, sum(1) over () y
----
1 1
+
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+# source is ordered by [a ASC, b ASC], [c ASC]
+# after sort preserving repartition and sort preserving merge
+# we should still have the orderings [a ASC, b ASC], [c ASC].
+query TT
+EXPLAIN SELECT *,
+ AVG(d) OVER sliding_window AS avg_d
+FROM multiple_ordered_table_inf
+WINDOW sliding_window AS (
+ PARTITION BY d
+ ORDER BY a RANGE 10 PRECEDING
+)
+ORDER BY c
+----
+logical_plan
+Sort: multiple_ordered_table_inf.c ASC NULLS LAST
+--Projection: multiple_ordered_table_inf.a0, multiple_ordered_table_inf.a,
multiple_ordered_table_inf.b, multiple_ordered_table_inf.c,
multiple_ordered_table_inf.d, AVG(multiple_ordered_table_inf.d) PARTITION BY
[multiple_ordered_table_inf.d] ORDER BY [multiple_ordered_table_inf.a ASC NULLS
LAST] RANGE BETWEEN 10 PRECEDING AND CURRENT ROW AS avg_d
+----WindowAggr: windowExpr=[[AVG(CAST(multiple_ordered_table_inf.d AS
Float64)) PARTITION BY [multiple_ordered_table_inf.d] ORDER BY
[multiple_ordered_table_inf.a ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND
CURRENT ROW]]
+------TableScan: multiple_ordered_table_inf projection=[a0, a, b, c, d]
+physical_plan
+SortPreservingMergeExec: [c@3 ASC NULLS LAST]
+--ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d,
AVG(multiple_ordered_table_inf.d) PARTITION BY [multiple_ordered_table_inf.d]
ORDER BY [multiple_ordered_table_inf.a ASC NULLS LAST] RANGE BETWEEN 10
PRECEDING AND CURRENT ROW@5 as avg_d]
+----BoundedWindowAggExec: wdw=[AVG(multiple_ordered_table_inf.d) PARTITION BY
[multiple_ordered_table_inf.d] ORDER BY [multiple_ordered_table_inf.a ASC NULLS
LAST] RANGE BETWEEN 10 PRECEDING AND CURRENT ROW: Ok(Field { name:
"AVG(multiple_ordered_table_inf.d) PARTITION BY [multiple_ordered_table_inf.d]
ORDER BY [multiple_ordered_table_inf.a ASC NULLS LAST] RANGE BETWEEN 10
PRECEDING AND CURRENT ROW", data_type: Float64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), [...]
+------CoalesceBatchesExec: target_batch_size=4096
+--------SortPreservingRepartitionExec: partitioning=Hash([d@4], 2),
input_partitions=2, sort_exprs=a@1 ASC NULLS LAST,b@2 ASC NULLS LAST
+----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+------------CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a,
b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC
NULLS LAST], has_header=true