This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new ef97b328c [FOLLOWUP] eliminate the duplicated sort keys in Order By 
clause (#5607)
ef97b328c is described below

commit ef97b328c58e4f7318f6ed5eded608a8b55c680d
Author: mingmwang <[email protected]>
AuthorDate: Sat Mar 18 19:42:14 2023 +0800

    [FOLLOWUP] eliminate the duplicated sort keys in Order By clause (#5607)
    
    * [FOLLOWUP] eliminate the duplicated sort keys in Order By clause
    
    * Update datafusion/optimizer/src/eliminate_duplicated_expr.rs
    
    Co-authored-by: jakevin <[email protected]>
    
    * Update datafusion/optimizer/src/eliminate_duplicated_expr.rs
    
    * fix compile err
    
    * add sql test to cover duplicate sory keys with different options
    
    ---------
    
    Co-authored-by: jakevin <[email protected]>
---
 datafusion/core/tests/sql/order.rs                 | 83 ++++++++++++++++++++++
 .../optimizer/src/eliminate_duplicated_expr.rs     | 47 ++++++++++--
 2 files changed, 123 insertions(+), 7 deletions(-)

diff --git a/datafusion/core/tests/sql/order.rs 
b/datafusion/core/tests/sql/order.rs
index 2388eebef..e29904c21 100644
--- a/datafusion/core/tests/sql/order.rs
+++ b/datafusion/core/tests/sql/order.rs
@@ -39,3 +39,86 @@ async fn sort_with_lots_of_repetition_values() -> Result<()> 
{
     }
     Ok(())
 }
+
+#[tokio::test]
+async fn sort_with_duplicate_sort_exprs() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    let t1_schema = Arc::new(Schema::new(vec![
+        Field::new("id", DataType::Int32, true),
+        Field::new("name", DataType::Utf8, true),
+    ]));
+
+    let t1_data = RecordBatch::try_new(
+        t1_schema.clone(),
+        vec![
+            Arc::new(Int32Array::from(vec![2, 4, 9, 3, 4])),
+            Arc::new(StringArray::from_slice(["a", "b", "c", "d", "e"])),
+        ],
+    )?;
+    ctx.register_batch("t1", t1_data)?;
+
+    let sql = "select * from t1 order by id desc, id, name, id asc";
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let plan = dataframe.into_optimized_plan().unwrap();
+    let expected = vec![
+        "Sort: t1.id DESC NULLS FIRST, t1.name ASC NULLS LAST [id:Int32;N, 
name:Utf8;N]",
+        "  TableScan: t1 projection=[id, name] [id:Int32;N, name:Utf8;N]",
+    ];
+
+    let formatted = plan.display_indent_schema().to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    assert_eq!(
+        expected, actual,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    let expected = vec![
+        "+----+------+",
+        "| id | name |",
+        "+----+------+",
+        "| 9  | c    |",
+        "| 4  | b    |",
+        "| 4  | e    |",
+        "| 3  | d    |",
+        "| 2  | a    |",
+        "+----+------+",
+    ];
+
+    let results = execute_to_batches(&ctx, sql).await;
+    assert_batches_eq!(expected, &results);
+
+    let sql = "select * from t1 order by id asc, id, name, id desc;";
+    let msg = format!("Creating logical plan for '{sql}'");
+    let dataframe = ctx.sql(sql).await.expect(&msg);
+    let plan = dataframe.into_optimized_plan().unwrap();
+    let expected = vec![
+        "Sort: t1.id ASC NULLS LAST, t1.name ASC NULLS LAST [id:Int32;N, 
name:Utf8;N]",
+        "  TableScan: t1 projection=[id, name] [id:Int32;N, name:Utf8;N]",
+    ];
+
+    let formatted = plan.display_indent_schema().to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    assert_eq!(
+        expected, actual,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    let expected = vec![
+        "+----+------+",
+        "| id | name |",
+        "+----+------+",
+        "| 2  | a    |",
+        "| 3  | d    |",
+        "| 4  | b    |",
+        "| 4  | e    |",
+        "| 9  | c    |",
+        "+----+------+",
+    ];
+
+    let results = execute_to_batches(&ctx, sql).await;
+    assert_batches_eq!(expected, &results);
+
+    Ok(())
+}
diff --git a/datafusion/optimizer/src/eliminate_duplicated_expr.rs 
b/datafusion/optimizer/src/eliminate_duplicated_expr.rs
index 5a882108e..15f3d8e1d 100644
--- a/datafusion/optimizer/src/eliminate_duplicated_expr.rs
+++ b/datafusion/optimizer/src/eliminate_duplicated_expr.rs
@@ -18,8 +18,9 @@
 use crate::optimizer::ApplyOrder;
 use crate::{OptimizerConfig, OptimizerRule};
 use datafusion_common::Result;
+use datafusion_expr::expr::Sort as ExprSort;
 use datafusion_expr::logical_plan::LogicalPlan;
-use datafusion_expr::Sort;
+use datafusion_expr::{Expr, Sort};
 use hashbrown::HashSet;
 
 /// Optimization rule that eliminate duplicated expr.
@@ -41,15 +42,28 @@ impl OptimizerRule for EliminateDuplicatedExpr {
     ) -> Result<Option<LogicalPlan>> {
         match plan {
             LogicalPlan::Sort(sort) => {
+                let normalized_sort_keys = sort
+                    .expr
+                    .iter()
+                    .map(|e| match e {
+                        Expr::Sort(ExprSort { expr, .. }) => {
+                            Expr::Sort(ExprSort::new(expr.clone(), true, 
false))
+                        }
+                        _ => e.clone(),
+                    })
+                    .collect::<Vec<_>>();
+
                 // dedup sort.expr and keep order
                 let mut dedup_expr = Vec::new();
                 let mut dedup_set = HashSet::new();
-                for expr in &sort.expr {
-                    if !dedup_set.contains(expr) {
-                        dedup_expr.push(expr);
-                        dedup_set.insert(expr.clone());
-                    }
-                }
+                sort.expr.iter().zip(normalized_sort_keys.iter()).for_each(
+                    |(expr, normalized_expr)| {
+                        if !dedup_set.contains(normalized_expr) {
+                            dedup_expr.push(expr);
+                            dedup_set.insert(normalized_expr);
+                        }
+                    },
+                );
                 if dedup_expr.len() == sort.expr.len() {
                     Ok(None)
                 } else {
@@ -100,4 +114,23 @@ mod tests {
         \n    TableScan: test";
         assert_optimized_plan_eq(&plan, expected)
     }
+
+    #[test]
+    fn eliminate_sort_exprs_with_options() -> Result<()> {
+        let table_scan = test_table_scan().unwrap();
+        let sort_exprs = vec![
+            col("a").sort(true, true),
+            col("b").sort(true, false),
+            col("a").sort(false, false),
+            col("b").sort(false, true),
+        ];
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .sort(sort_exprs)?
+            .limit(5, Some(10))?
+            .build()?;
+        let expected = "Limit: skip=5, fetch=10\
+        \n  Sort: test.a ASC NULLS FIRST, test.b ASC NULLS LAST\
+        \n    TableScan: test";
+        assert_optimized_plan_eq(&plan, expected)
+    }
 }

Reply via email to