mustafasrepo commented on code in PR #4928:
URL: https://github.com/apache/arrow-datafusion/pull/4928#discussion_r1071825011
##########
datafusion/core/src/physical_optimizer/sort_enforcement.rs:
##########
@@ -589,71 +589,84 @@ mod tests {
Ok(())
}
+ /// Runs the sort enforcement optimizer and asserts both the input plan
+ /// and the optimized plan against their expected display lines
+ ///
+ /// `$EXPECTED_PLAN_LINES`: expected display lines of the input plan
+ /// `$EXPECTED_OPTIMIZED_PLAN_LINES`: expected display lines of the optimized plan
+ /// `$PLAN`: the plan to optimize
+ ///
+ macro_rules! assert_optimized {
+ ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr,
$PLAN: expr) => {
+ let session_ctx = SessionContext::new();
+ let state = session_ctx.state();
+
+ let physical_plan = $PLAN;
+ let formatted =
displayable(physical_plan.as_ref()).indent().to_string();
+ let actual: Vec<&str> = formatted.trim().lines().collect();
+
+ let expected_plan_lines: Vec<&str> = $EXPECTED_PLAN_LINES
+ .iter().map(|s| *s).collect();
+
+ assert_eq!(
+ expected_plan_lines, actual,
+ "\n**Original Plan
Mismatch\n\nexpected:\n\n{expected_plan_lines:#?}\nactual:\n\n{actual:#?}\n\n"
+ );
+
+ let expected_optimized_lines: Vec<&str> =
$EXPECTED_OPTIMIZED_PLAN_LINES
+ .iter().map(|s| *s).collect();
+
+ // Run the actual optimizer
+ let optimized_physical_plan =
+ EnforceSorting::new().optimize(physical_plan,
state.config_options())?;
+
+ let formatted = displayable(optimized_physical_plan.as_ref())
+ .indent()
+ .to_string();
+ let actual: Vec<&str> = formatted.trim().lines().collect();
+ assert_eq!(
+ expected_optimized_lines, actual,
+ "\n**Optimized Plan
Mismatch\n\nexpected:\n\n{expected_optimized_lines:#?}\nactual:\n\n{actual:#?}\n\n"
+ );
+
+ };
+ }
+
#[tokio::test]
async fn test_remove_unnecessary_sort() -> Result<()> {
- let session_ctx = SessionContext::new();
- let state = session_ctx.state();
let schema = create_test_schema()?;
- let source = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?)
- as Arc<dyn ExecutionPlan>;
- let sort_exprs = vec![PhysicalSortExpr {
- expr: col("non_nullable_col", schema.as_ref()).unwrap(),
- options: SortOptions::default(),
- }];
- let sort_exec = Arc::new(SortExec::try_new(sort_exprs, source, None)?)
- as Arc<dyn ExecutionPlan>;
- let sort_exprs = vec![PhysicalSortExpr {
- expr: col("nullable_col", schema.as_ref()).unwrap(),
- options: SortOptions::default(),
- }];
- let physical_plan = Arc::new(SortExec::try_new(sort_exprs, sort_exec,
None)?)
- as Arc<dyn ExecutionPlan>;
- let formatted =
displayable(physical_plan.as_ref()).indent().to_string();
- let expected = {
- vec![
- "SortExec: [nullable_col@0 ASC]",
- " SortExec: [non_nullable_col@1 ASC]",
- ]
- };
- let actual: Vec<&str> = formatted.trim().lines().collect();
- let actual_len = actual.len();
- let actual_trim_last = &actual[..actual_len - 1];
- assert_eq!(
- expected, actual_trim_last,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
- let optimized_physical_plan =
- EnforceSorting::new().optimize(physical_plan,
state.config_options())?;
- let formatted = displayable(optimized_physical_plan.as_ref())
- .indent()
- .to_string();
- let expected = { vec!["SortExec: [nullable_col@0 ASC]"] };
- let actual: Vec<&str> = formatted.trim().lines().collect();
- let actual_len = actual.len();
- let actual_trim_last = &actual[..actual_len - 1];
- assert_eq!(
- expected, actual_trim_last,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
+ let source = memory_exec(&schema);
+ let input = sort_exec(vec![sort_expr("non_nullable_col", &schema)],
source);
+ let physical_plan = sort_exec(vec![sort_expr("nullable_col",
&schema)], input);
+
+ let expected_input = vec![
+ "SortExec: [nullable_col@0 ASC]",
+ " SortExec: [non_nullable_col@1 ASC]",
+ " MemoryExec: partitions=0, partition_sizes=[]",
+ ];
+ let expected_optimized = vec![
+ "SortExec: [nullable_col@0 ASC]",
+ " MemoryExec: partitions=0, partition_sizes=[]",
+ ];
+ assert_optimized!(expected_input, expected_optimized, physical_plan);
Ok(())
}
#[tokio::test]
async fn test_remove_unnecessary_sort_window_multilayer() -> Result<()> {
- let session_ctx = SessionContext::new();
- let state = session_ctx.state();
let schema = create_test_schema()?;
- let source = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?)
- as Arc<dyn ExecutionPlan>;
- let sort_exprs = vec![PhysicalSortExpr {
- expr: col("non_nullable_col", source.schema().as_ref()).unwrap(),
- options: SortOptions {
+ let source = memory_exec(&schema);
+
+ let sort_exprs = vec![sort_expr_options(
+ "non_nullable_col",
+ &source.schema(),
+ SortOptions {
descending: true,
nulls_first: true,
},
- }];
- let sort_exec = Arc::new(SortExec::try_new(sort_exprs.clone(), source,
None)?)
- as Arc<dyn ExecutionPlan>;
+ )];
+ let sort = sort_exec(sort_exprs.clone(), source);
+
let window_agg_exec = Arc::new(WindowAggExec::try_new(
Review Comment:
Given that the utility function `window_exec` is available, we can use the snippet below to
create `window_agg_exec`:
```rust
let window_agg_exec =
window_exec(sort.clone(), sort.schema(), &sort_exprs,
"non_nullable_col")?;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]