This is an automated email from the ASF dual-hosted git repository.
akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new f9a9f125d9 Adding order equivalence support on MemoryExec (#7259)
f9a9f125d9 is described below
commit f9a9f125d9926d9748dbb98d0e9c61c02e7475ad
Author: Metehan Yıldırım <[email protected]>
AuthorDate: Fri Aug 11 15:23:27 2023 +0300
Adding order equivalence support on MemoryExec (#7259)
* Memory equivalence support
* MemoryExec refactor
* Adding test
* Update datafusion/core/src/physical_plan/memory.rs
Co-authored-by: Andrew Lamb <[email protected]>
* Updating the comment
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
.../src/physical_plan/joins/symmetric_hash_join.rs | 50 ++++++------
.../core/src/physical_plan/joins/test_utils.rs | 22 +++---
datafusion/core/src/physical_plan/memory.rs | 91 +++++++++++++++++++---
datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs | 2 +-
.../fuzz_cases/sort_preserving_repartition_fuzz.rs | 2 +-
datafusion/core/tests/fuzz_cases/window_fuzz.rs | 4 +-
datafusion/core/tests/memory_limit.rs | 13 ++--
7 files changed, 123 insertions(+), 61 deletions(-)
diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
index efe7ce503b..5e0ae189ff 100644
--- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
@@ -1520,8 +1520,8 @@ mod tests {
let (left, right) = create_memory_table(
left_batch,
right_batch,
- Some(left_sorted),
- Some(right_sorted),
+ vec![left_sorted],
+ vec![right_sorted],
13,
)?;
@@ -1595,8 +1595,8 @@ mod tests {
let (left, right) = create_memory_table(
left_batch,
right_batch,
- Some(left_sorted),
- Some(right_sorted),
+ vec![left_sorted],
+ vec![right_sorted],
13,
)?;
@@ -1658,7 +1658,8 @@ mod tests {
build_sides_record_batches(TABLE_SIZE, cardinality)?;
let left_schema = &left_batch.schema();
let right_schema = &right_batch.schema();
- let (left, right) = create_memory_table(left_batch, right_batch, None,
None, 13)?;
+ let (left, right) =
+ create_memory_table(left_batch, right_batch, vec![], vec![], 13)?;
let on = vec![(
Column::new_with_schema("lc1", left_schema)?,
@@ -1709,7 +1710,8 @@ mod tests {
let (left_batch, right_batch) = build_sides_record_batches(TABLE_SIZE,
(11, 21))?;
let left_schema = &left_batch.schema();
let right_schema = &right_batch.schema();
- let (left, right) = create_memory_table(left_batch, right_batch, None,
None, 13)?;
+ let (left, right) =
+ create_memory_table(left_batch, right_batch, vec![], vec![], 13)?;
let on = vec![(
Column::new_with_schema("lc1", left_schema)?,
@@ -1764,8 +1766,8 @@ mod tests {
let (left, right) = create_memory_table(
left_batch,
right_batch,
- Some(left_sorted),
- Some(right_sorted),
+ vec![left_sorted],
+ vec![right_sorted],
13,
)?;
@@ -1828,8 +1830,8 @@ mod tests {
let (left, right) = create_memory_table(
left_batch,
right_batch,
- Some(left_sorted),
- Some(right_sorted),
+ vec![left_sorted],
+ vec![right_sorted],
13,
)?;
@@ -1891,8 +1893,8 @@ mod tests {
let (left, right) = create_memory_table(
left_batch,
right_batch,
- Some(left_sorted),
- Some(right_sorted),
+ vec![left_sorted],
+ vec![right_sorted],
13,
)?;
@@ -1955,8 +1957,8 @@ mod tests {
let (left, right) = create_memory_table(
left_batch,
right_batch,
- Some(left_sorted),
- Some(right_sorted),
+ vec![left_sorted],
+ vec![right_sorted],
13,
)?;
@@ -2015,8 +2017,8 @@ mod tests {
let (left, right) = create_memory_table(
left_batch,
right_batch,
- Some(left_sorted),
- Some(right_sorted),
+ vec![left_sorted],
+ vec![right_sorted],
13,
)?;
@@ -2097,8 +2099,8 @@ mod tests {
let (left, right) = create_memory_table(
left_batch,
right_batch,
- Some(left_sorted),
- Some(right_sorted),
+ vec![left_sorted],
+ vec![right_sorted],
10,
)?;
@@ -2226,8 +2228,8 @@ mod tests {
let (left, right) = create_memory_table(
left_batch,
right_batch,
- Some(left_sorted),
- Some(right_sorted),
+ vec![left_sorted],
+ vec![right_sorted],
13,
)?;
let intermediate_schema = Schema::new(vec![
@@ -2310,8 +2312,8 @@ mod tests {
let (left, right) = create_memory_table(
left_batch,
right_batch,
- Some(left_sorted),
- Some(right_sorted),
+ vec![left_sorted],
+ vec![right_sorted],
13,
)?;
let intermediate_schema = Schema::new(vec![
@@ -2379,8 +2381,8 @@ mod tests {
let (left, right) = create_memory_table(
left_batch,
right_batch,
- Some(left_sorted),
- Some(right_sorted),
+ vec![left_sorted],
+ vec![right_sorted],
13,
)?;
diff --git a/datafusion/core/src/physical_plan/joins/test_utils.rs
b/datafusion/core/src/physical_plan/joins/test_utils.rs
index e786fb5eb5..44610ab09a 100644
--- a/datafusion/core/src/physical_plan/joins/test_utils.rs
+++ b/datafusion/core/src/physical_plan/joins/test_utils.rs
@@ -37,7 +37,7 @@ use datafusion_expr::{JoinType, Operator};
use datafusion_physical_expr::intervals::test_utils::{
gen_conjunctive_numerical_expr, gen_conjunctive_temporal_expr,
};
-use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
+use datafusion_physical_expr::{LexOrdering, PhysicalExpr};
use rand::prelude::StdRng;
use rand::{Rng, SeedableRng};
use std::sync::Arc;
@@ -489,25 +489,21 @@ pub fn build_sides_record_batches(
pub fn create_memory_table(
left_batch: RecordBatch,
right_batch: RecordBatch,
- left_sorted: Option<Vec<PhysicalSortExpr>>,
- right_sorted: Option<Vec<PhysicalSortExpr>>,
+ left_sorted: Vec<LexOrdering>,
+ right_sorted: Vec<LexOrdering>,
batch_size: usize,
) -> Result<(Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>)> {
- let mut left = MemoryExec::try_new(
+ let left = MemoryExec::try_new(
&[split_record_batches(&left_batch, batch_size)?],
left_batch.schema(),
None,
- )?;
- if let Some(sorted) = left_sorted {
- left = left.with_sort_information(sorted);
- }
- let mut right = MemoryExec::try_new(
+ )?
+ .with_sort_information(left_sorted);
+ let right = MemoryExec::try_new(
&[split_record_batches(&right_batch, batch_size)?],
right_batch.schema(),
None,
- )?;
- if let Some(sorted) = right_sorted {
- right = right.with_sort_information(sorted);
- }
+ )?
+ .with_sort_information(right_sorted);
Ok((Arc::new(left), Arc::new(right)))
}
diff --git a/datafusion/core/src/physical_plan/memory.rs
b/datafusion/core/src/physical_plan/memory.rs
index afc63e7d4c..be8385b586 100644
--- a/datafusion/core/src/physical_plan/memory.rs
+++ b/datafusion/core/src/physical_plan/memory.rs
@@ -30,8 +30,10 @@ use std::any::Any;
use std::sync::Arc;
use std::task::{Context, Poll};
+use crate::physical_plan::ordering_equivalence_properties_helper;
use datafusion_common::DataFusionError;
use datafusion_execution::TaskContext;
+use datafusion_physical_expr::{LexOrdering, OrderingEquivalenceProperties};
use futures::Stream;
/// Execution plan for reading in-memory batches of data
@@ -44,8 +46,8 @@ pub struct MemoryExec {
projected_schema: SchemaRef,
/// Optional projection
projection: Option<Vec<usize>>,
- // Optional sort information
- sort_information: Option<Vec<PhysicalSortExpr>>,
+ // Sort information: zero or more equivalent orderings (empty when no ordering is known)
+ sort_information: Vec<LexOrdering>,
}
impl fmt::Debug for MemoryExec {
@@ -53,7 +55,7 @@ impl fmt::Debug for MemoryExec {
write!(f, "partitions: [...]")?;
write!(f, "schema: {:?}", self.projected_schema)?;
write!(f, "projection: {:?}", self.projection)?;
- if let Some(sort_info) = &self.sort_information {
+ if let Some(sort_info) = &self.sort_information.get(0) {
write!(f, ", output_ordering: {:?}", sort_info)?;
}
Ok(())
@@ -73,7 +75,7 @@ impl DisplayAs for MemoryExec {
let output_ordering = self
.sort_information
- .as_ref()
+ .first()
.map(|output_ordering| {
let order_strings: Vec<_> =
output_ordering.iter().map(|e|
e.to_string()).collect();
@@ -113,7 +115,13 @@ impl ExecutionPlan for MemoryExec {
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- self.sort_information.as_deref()
+ self.sort_information
+ .first()
+ .map(|ordering| ordering.as_slice())
+ }
+
+ fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties
{
+ ordering_equivalence_properties_helper(self.schema(),
&self.sort_information)
}
fn with_new_children(
@@ -161,16 +169,30 @@ impl MemoryExec {
schema,
projected_schema,
projection,
- sort_information: None,
+ sort_information: vec![],
})
}
- /// Set sort information
- pub fn with_sort_information(
- mut self,
- sort_information: Vec<PhysicalSortExpr>,
- ) -> Self {
- self.sort_information = Some(sort_information);
+ /// A memory table can be ordered by multiple expressions simultaneously.
+ /// `OrderingEquivalenceProperties` keeps track of expressions that describe the
+ /// global ordering of the schema. These columns are not necessarily the same; e.g.
+ /// ```text
+ /// ┌---┬---┐
+ /// | a | b |
+ /// |---|---|
+ /// | 1 | 9 |
+ /// | 2 | 8 |
+ /// | 3 | 7 |
+ /// | 5 | 5 |
+ /// └---┴---┘
+ /// ```
+ /// where both `a ASC` and `b DESC` can describe the table ordering. With
+ /// `OrderingEquivalenceProperties`, we can keep track of these equivalences
+ /// and treat `a ASC` and `b DESC` as the same ordering requirement
+ /// by returning `a ASC` from the `output_ordering` API
+ /// and adding `b DESC` to `OrderingEquivalenceProperties`.
+ pub fn with_sort_information(mut self, sort_information: Vec<LexOrdering>)
-> Self {
+ self.sort_information = sort_information;
self
}
}
@@ -237,3 +259,48 @@ impl RecordBatchStream for MemoryStream {
self.schema.clone()
}
}
+
+#[cfg(test)]
+mod tests {
+ use crate::physical_plan::memory::MemoryExec;
+ use crate::physical_plan::ExecutionPlan;
+ use arrow_schema::{DataType, Field, Schema, SortOptions};
+ use datafusion_physical_expr::expressions::col;
+ use datafusion_physical_expr::PhysicalSortExpr;
+ use std::sync::Arc;
+
+ #[test]
+ fn test_memory_order_eq() -> datafusion_common::Result<()> {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Int64, false),
+ Field::new("b", DataType::Int64, false),
+ Field::new("c", DataType::Int64, false),
+ ]));
+ let expected_output_order = vec![
+ PhysicalSortExpr {
+ expr: col("a", &schema)?,
+ options: SortOptions::default(),
+ },
+ PhysicalSortExpr {
+ expr: col("b", &schema)?,
+ options: SortOptions::default(),
+ },
+ ];
+ let expected_order_eq = vec![PhysicalSortExpr {
+ expr: col("c", &schema)?,
+ options: SortOptions::default(),
+ }];
+ let sort_information =
+ vec![expected_output_order.clone(), expected_order_eq.clone()];
+ let mem_exec = MemoryExec::try_new(&[vec![]], schema, None)?
+ .with_sort_information(sort_information);
+
+ assert_eq!(mem_exec.output_ordering().unwrap(), expected_output_order);
+ let order_eq = mem_exec.ordering_equivalence_properties();
+ assert!(order_eq
+ .classes()
+ .iter()
+ .any(|class| class.contains(&expected_order_eq)));
+ Ok(())
+ }
+}
diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
index e27c133412..bdca1f9068 100644
--- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
@@ -94,7 +94,7 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>,
group_by_columns: Vec<&str
let running_source = Arc::new(
MemoryExec::try_new(&[input1.clone()], schema.clone(), None)
.unwrap()
- .with_sort_information(sort_keys),
+ .with_sort_information(vec![sort_keys]),
);
let aggregate_expr = vec![Arc::new(Sum::new(
diff --git
a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs
b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs
index 6b3a633f3c..6304e01c63 100644
--- a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs
@@ -107,7 +107,7 @@ mod sp_repartition_fuzz_tests {
let running_source = Arc::new(
MemoryExec::try_new(&[input1.clone()], schema.clone(), None)
.unwrap()
- .with_sort_information(sort_keys.clone()),
+ .with_sort_information(vec![sort_keys.clone()]),
);
let hash_exprs = vec![col("c", &schema).unwrap()];
diff --git a/datafusion/core/tests/fuzz_cases/window_fuzz.rs
b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
index 870baf948b..c28b3440d7 100644
--- a/datafusion/core/tests/fuzz_cases/window_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
@@ -432,7 +432,7 @@ async fn run_window_test(
];
let memory_exec =
MemoryExec::try_new(&[vec![concat_input_record]], schema.clone(),
None).unwrap();
- let memory_exec =
memory_exec.with_sort_information(source_sort_keys.clone());
+ let memory_exec =
memory_exec.with_sort_information(vec![source_sort_keys.clone()]);
let mut exec1 = Arc::new(memory_exec) as Arc<dyn ExecutionPlan>;
// Table is ordered according to ORDER BY a, b, c In linear test we use
PARTITION BY b, ORDER BY a
// For WindowAggExec to produce correct result it need table to be
ordered by b,a. Hence add a sort.
@@ -460,7 +460,7 @@ async fn run_window_test(
let exec2 = Arc::new(
MemoryExec::try_new(&[input1.clone()], schema.clone(), None)
.unwrap()
- .with_sort_information(source_sort_keys.clone()),
+ .with_sort_information(vec![source_sort_keys.clone()]),
);
let running_window_exec = Arc::new(
BoundedWindowAggExec::try_new(
diff --git a/datafusion/core/tests/memory_limit.rs
b/datafusion/core/tests/memory_limit.rs
index 862f5a8f97..99e6aa5f9f 100644
--- a/datafusion/core/tests/memory_limit.rs
+++ b/datafusion/core/tests/memory_limit.rs
@@ -27,7 +27,7 @@ use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::streaming::PartitionStream;
use datafusion_expr::{Expr, TableType};
-use datafusion_physical_expr::PhysicalSortExpr;
+use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
use futures::StreamExt;
use std::any::Any;
use std::sync::{Arc, OnceLock};
@@ -515,7 +515,7 @@ impl Scenario {
descending: false,
nulls_first: false,
};
- let sort_information = vec![
+ let sort_information = vec![vec![
PhysicalSortExpr {
expr: col("a", &schema).unwrap(),
options,
@@ -524,7 +524,7 @@ impl Scenario {
expr: col("b", &schema).unwrap(),
options,
},
- ];
+ ]];
let table = SortedTableProvider::new(batches,
sort_information);
Arc::new(table)
@@ -637,14 +637,11 @@ impl PartitionStream for DummyStreamPartition {
struct SortedTableProvider {
schema: SchemaRef,
batches: Vec<Vec<RecordBatch>>,
- sort_information: Vec<PhysicalSortExpr>,
+ sort_information: Vec<LexOrdering>,
}
impl SortedTableProvider {
- fn new(
- batches: Vec<Vec<RecordBatch>>,
- sort_information: Vec<PhysicalSortExpr>,
- ) -> Self {
+ fn new(batches: Vec<Vec<RecordBatch>>, sort_information: Vec<LexOrdering>)
-> Self {
let schema = batches[0][0].schema();
Self {
schema,