This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new ef97b328c [FOLLOWUP] eliminate the duplicated sort keys in Order By
clause (#5607)
ef97b328c is described below
commit ef97b328c58e4f7318f6ed5eded608a8b55c680d
Author: mingmwang <[email protected]>
AuthorDate: Sat Mar 18 19:42:14 2023 +0800
[FOLLOWUP] eliminate the duplicated sort keys in Order By clause (#5607)
* [FOLLOWUP] eliminate the duplicated sort keys in Order By clause
* Update datafusion/optimizer/src/eliminate_duplicated_expr.rs
Co-authored-by: jakevin <[email protected]>
* Update datafusion/optimizer/src/eliminate_duplicated_expr.rs
* fix compile err
* add sql test to cover duplicate sort keys with different options
---------
Co-authored-by: jakevin <[email protected]>
---
datafusion/core/tests/sql/order.rs | 83 ++++++++++++++++++++++
.../optimizer/src/eliminate_duplicated_expr.rs | 47 ++++++++++--
2 files changed, 123 insertions(+), 7 deletions(-)
diff --git a/datafusion/core/tests/sql/order.rs b/datafusion/core/tests/sql/order.rs
index 2388eebef..e29904c21 100644
--- a/datafusion/core/tests/sql/order.rs
+++ b/datafusion/core/tests/sql/order.rs
@@ -39,3 +39,86 @@ async fn sort_with_lots_of_repetition_values() -> Result<()> {
}
Ok(())
}
+
+#[tokio::test]
+async fn sort_with_duplicate_sort_exprs() -> Result<()> {
+ let ctx = SessionContext::new();
+
+ let t1_schema = Arc::new(Schema::new(vec![
+ Field::new("id", DataType::Int32, true),
+ Field::new("name", DataType::Utf8, true),
+ ]));
+
+ let t1_data = RecordBatch::try_new(
+ t1_schema.clone(),
+ vec![
+ Arc::new(Int32Array::from(vec![2, 4, 9, 3, 4])),
+ Arc::new(StringArray::from_slice(["a", "b", "c", "d", "e"])),
+ ],
+ )?;
+ ctx.register_batch("t1", t1_data)?;
+
+ let sql = "select * from t1 order by id desc, id, name, id asc";
+ let msg = format!("Creating logical plan for '{sql}'");
+ let dataframe = ctx.sql(sql).await.expect(&msg);
+ let plan = dataframe.into_optimized_plan().unwrap();
+ let expected = vec![
+ "Sort: t1.id DESC NULLS FIRST, t1.name ASC NULLS LAST [id:Int32;N,
name:Utf8;N]",
+ " TableScan: t1 projection=[id, name] [id:Int32;N, name:Utf8;N]",
+ ];
+
+ let formatted = plan.display_indent_schema().to_string();
+ let actual: Vec<&str> = formatted.trim().lines().collect();
+ assert_eq!(
+ expected, actual,
+ "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+ );
+
+ let expected = vec![
+ "+----+------+",
+ "| id | name |",
+ "+----+------+",
+ "| 9 | c |",
+ "| 4 | b |",
+ "| 4 | e |",
+ "| 3 | d |",
+ "| 2 | a |",
+ "+----+------+",
+ ];
+
+ let results = execute_to_batches(&ctx, sql).await;
+ assert_batches_eq!(expected, &results);
+
+ let sql = "select * from t1 order by id asc, id, name, id desc;";
+ let msg = format!("Creating logical plan for '{sql}'");
+ let dataframe = ctx.sql(sql).await.expect(&msg);
+ let plan = dataframe.into_optimized_plan().unwrap();
+ let expected = vec![
+ "Sort: t1.id ASC NULLS LAST, t1.name ASC NULLS LAST [id:Int32;N,
name:Utf8;N]",
+ " TableScan: t1 projection=[id, name] [id:Int32;N, name:Utf8;N]",
+ ];
+
+ let formatted = plan.display_indent_schema().to_string();
+ let actual: Vec<&str> = formatted.trim().lines().collect();
+ assert_eq!(
+ expected, actual,
+ "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+ );
+
+ let expected = vec![
+ "+----+------+",
+ "| id | name |",
+ "+----+------+",
+ "| 2 | a |",
+ "| 3 | d |",
+ "| 4 | b |",
+ "| 4 | e |",
+ "| 9 | c |",
+ "+----+------+",
+ ];
+
+ let results = execute_to_batches(&ctx, sql).await;
+ assert_batches_eq!(expected, &results);
+
+ Ok(())
+}
diff --git a/datafusion/optimizer/src/eliminate_duplicated_expr.rs b/datafusion/optimizer/src/eliminate_duplicated_expr.rs
index 5a882108e..15f3d8e1d 100644
--- a/datafusion/optimizer/src/eliminate_duplicated_expr.rs
+++ b/datafusion/optimizer/src/eliminate_duplicated_expr.rs
@@ -18,8 +18,9 @@
use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::Result;
+use datafusion_expr::expr::Sort as ExprSort;
use datafusion_expr::logical_plan::LogicalPlan;
-use datafusion_expr::Sort;
+use datafusion_expr::{Expr, Sort};
use hashbrown::HashSet;
/// Optimization rule that eliminate duplicated expr.
@@ -41,15 +42,28 @@ impl OptimizerRule for EliminateDuplicatedExpr {
) -> Result<Option<LogicalPlan>> {
match plan {
LogicalPlan::Sort(sort) => {
+ let normalized_sort_keys = sort
+ .expr
+ .iter()
+ .map(|e| match e {
+ Expr::Sort(ExprSort { expr, .. }) => {
+ Expr::Sort(ExprSort::new(expr.clone(), true,
false))
+ }
+ _ => e.clone(),
+ })
+ .collect::<Vec<_>>();
+
// dedup sort.expr and keep order
let mut dedup_expr = Vec::new();
let mut dedup_set = HashSet::new();
- for expr in &sort.expr {
- if !dedup_set.contains(expr) {
- dedup_expr.push(expr);
- dedup_set.insert(expr.clone());
- }
- }
+ sort.expr.iter().zip(normalized_sort_keys.iter()).for_each(
+ |(expr, normalized_expr)| {
+ if !dedup_set.contains(normalized_expr) {
+ dedup_expr.push(expr);
+ dedup_set.insert(normalized_expr);
+ }
+ },
+ );
if dedup_expr.len() == sort.expr.len() {
Ok(None)
} else {
@@ -100,4 +114,23 @@ mod tests {
\n TableScan: test";
assert_optimized_plan_eq(&plan, expected)
}
+
+ #[test]
+ fn eliminate_sort_exprs_with_options() -> Result<()> {
+ let table_scan = test_table_scan().unwrap();
+ let sort_exprs = vec![
+ col("a").sort(true, true),
+ col("b").sort(true, false),
+ col("a").sort(false, false),
+ col("b").sort(false, true),
+ ];
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .sort(sort_exprs)?
+ .limit(5, Some(10))?
+ .build()?;
+ let expected = "Limit: skip=5, fetch=10\
+ \n Sort: test.a ASC NULLS FIRST, test.b ASC NULLS LAST\
+ \n TableScan: test";
+ assert_optimized_plan_eq(&plan, expected)
+ }
}