alamb commented on a change in pull request #463:
URL: https://github.com/apache/arrow-datafusion/pull/463#discussion_r644310785



##########
File path: datafusion/src/sql/planner.rs
##########
@@ -670,13 +670,28 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
     /// Wrap a plan in a window
     fn window(
         &self,
-        input: &LogicalPlan,
+        input: LogicalPlan,
         window_exprs: Vec<Expr>,
         select_exprs: &[Expr],
     ) -> Result<(LogicalPlan, Vec<Expr>)> {
-        let plan = LogicalPlanBuilder::from(input)
-            .window(window_exprs.clone())?
-            .build()?;
+        let mut plan = input;
+        let mut groups = group_window_expr_by_sort_keys(&window_exprs)?;
+        // sort by sort_key len descending, so that more deeply sorted plans 
gets nested further
+        // down as children; to further minic the behavior of PostgreSQL, we 
want stable sort

Review comment:
       ```suggestion
           // down as children; to further mimic the behavior of PostgreSQL, we want stable sort
   ```

##########
File path: datafusion/src/optimizer/utils.rs
##########
@@ -338,10 +337,24 @@ pub fn rewrite_expression(expr: &Expr, expressions: 
&[Expr]) -> Result<Expr> {
             fun: fun.clone(),
             args: expressions.to_vec(),
         }),
-        Expr::WindowFunction { fun, .. } => Ok(Expr::WindowFunction {
-            fun: fun.clone(),
-            args: expressions.to_vec(),
-        }),
+        Expr::WindowFunction { fun, .. } => {

Review comment:
       This pattern is kind of ugly -- at some point I would love to rewrite all of this in terms of the expression visitors. Some day (TM), lol.

##########
File path: datafusion/src/physical_plan/planner.rs
##########
@@ -745,13 +745,18 @@ impl DefaultPhysicalPlanner {
         };
 
         match e {
-            Expr::WindowFunction { fun, args } => {
+            Expr::WindowFunction { fun, args, .. } => {
                 let args = args
                     .iter()
                     .map(|e| {
                         self.create_physical_expr(e, physical_input_schema, 
ctx_state)
                     })
                     .collect::<Result<Vec<_>>>()?;
+                // if !order_by.is_empty() {

Review comment:
       Why is this commented out? It seems better to me to generate an error than to silently ignore a non-empty `order_by`.

##########
File path: datafusion/src/sql/planner.rs
##########
@@ -2749,14 +2772,139 @@ mod tests {
         );
     }
 
+    /// psql result

Review comment:
       I don't think it really matters, but the `max`, `min` and `avg` window functions don't actually depend on order (so, in theory, all of the sorts here could be optimized away).

##########
File path: datafusion/src/sql/utils.rs
##########
@@ -389,3 +407,192 @@ pub(crate) fn resolve_aliases_to_exprs(
         _ => Ok(None),
     })
 }
+
+/// group a slice of window expression expr by their order by expressions
+pub(crate) fn group_window_expr_by_sort_keys(
+    window_expr: &[Expr],
+) -> Result<Vec<(&[Expr], Vec<&Expr>)>> {
+    let mut result = vec![];
+    window_expr.iter().try_for_each(|expr| match expr {
+        Expr::WindowFunction { order_by, .. } => {
+            if let Some((_, values)) = result.iter_mut().find(
+                |group: &&mut (&[Expr], Vec<&Expr>)| matches!(group, (key, _) 
if key == order_by),
+            ) {
+                values.push(expr);
+            } else {
+                result.push((order_by, vec![expr]))
+            }
+            Ok(())
+        }
+        other => Err(DataFusionError::Internal(format!(
+            "Impossibly got non-window expr {:?}",
+            other,
+        ))),
+    })?;
+    Ok(result)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::logical_plan::col;
+    use crate::physical_plan::aggregates::AggregateFunction;
+    use crate::physical_plan::window_functions::WindowFunction;
+
+    #[test]
+    fn test_group_window_expr_by_sort_keys_empty_case() -> Result<()> {
+        let result = group_window_expr_by_sort_keys(&[])?;
+        let expected: Vec<(&[Expr], Vec<&Expr>)> = vec![];
+        assert_eq!(expected, result);
+        Ok(())
+    }
+
+    #[test]
+    fn test_group_window_expr_by_sort_keys_empty_window() -> Result<()> {
+        let max1 = Expr::WindowFunction {
+            fun: WindowFunction::AggregateFunction(AggregateFunction::Max),
+            args: vec![col("name")],
+            order_by: vec![],
+        };
+        let max2 = Expr::WindowFunction {
+            fun: WindowFunction::AggregateFunction(AggregateFunction::Max),
+            args: vec![col("name")],
+            order_by: vec![],
+        };
+        let min3 = Expr::WindowFunction {
+            fun: WindowFunction::AggregateFunction(AggregateFunction::Min),
+            args: vec![col("name")],
+            order_by: vec![],
+        };
+        let sum4 = Expr::WindowFunction {
+            fun: WindowFunction::AggregateFunction(AggregateFunction::Sum),
+            args: vec![col("age")],
+            order_by: vec![],
+        };
+        // FIXME use as_ref
+        let exprs = &[max1.clone(), max2.clone(), min3.clone(), sum4.clone()];
+        let result = group_window_expr_by_sort_keys(exprs)?;
+        let key = &[];
+        let expected: Vec<(&[Expr], Vec<&Expr>)> =
+            vec![(key, vec![&max1, &max2, &min3, &sum4])];
+        assert_eq!(expected, result);
+        Ok(())
+    }
+
+    #[test]
+    fn test_group_window_expr_by_sort_keys() -> Result<()> {
+        let age_asc = Expr::Sort {
+            expr: Box::new(col("age")),
+            asc: true,
+            nulls_first: true,
+        };
+        let name_desc = Expr::Sort {
+            expr: Box::new(col("name")),
+            asc: false,
+            nulls_first: true,
+        };
+        let created_at_desc = Expr::Sort {
+            expr: Box::new(col("created_at")),
+            asc: false,
+            nulls_first: true,
+        };
+        let max1 = Expr::WindowFunction {
+            fun: WindowFunction::AggregateFunction(AggregateFunction::Max),
+            args: vec![col("name")],
+            order_by: vec![age_asc.clone(), name_desc.clone()],
+        };
+        let max2 = Expr::WindowFunction {
+            fun: WindowFunction::AggregateFunction(AggregateFunction::Max),
+            args: vec![col("name")],
+            order_by: vec![],
+        };
+        let min3 = Expr::WindowFunction {
+            fun: WindowFunction::AggregateFunction(AggregateFunction::Min),
+            args: vec![col("name")],
+            order_by: vec![age_asc.clone(), name_desc.clone()],
+        };
+        let sum4 = Expr::WindowFunction {
+            fun: WindowFunction::AggregateFunction(AggregateFunction::Sum),
+            args: vec![col("age")],
+            order_by: vec![name_desc.clone(), age_asc.clone(), 
created_at_desc.clone()],
+        };
+        // FIXME use as_ref
+        let exprs = &[max1.clone(), max2.clone(), min3.clone(), sum4.clone()];
+        let result = group_window_expr_by_sort_keys(exprs)?;
+
+        let key1 = &[age_asc.clone(), name_desc.clone()];
+        let key2 = &[];
+        let key3 = &[name_desc, age_asc, created_at_desc];
+
+        let expected: Vec<(&[Expr], Vec<&Expr>)> = vec![
+            (key1, vec![&max1, &min3]),
+            (key2, vec![&max2]),
+            (key3, vec![&sum4]),
+        ];
+        assert_eq!(expected, result);
+        Ok(())
+    }
+
+    #[test]
+    fn test_find_sort_exprs() -> Result<()> {
+        let exprs = &[
+            Expr::WindowFunction {
+                fun: WindowFunction::AggregateFunction(AggregateFunction::Max),
+                args: vec![col("name")],
+                order_by: vec![
+                    Expr::Sort {

Review comment:
       FYI you can use the `sort` method here for less verbosity if you want: 
https://docs.rs/datafusion/4.0.0/datafusion/logical_plan/enum.Expr.html#method.sort
   
   So something like `order_by: vec![col("age").sort(true, true)]`
   
   
   
   `let sort_expr = col("foo").sort(true, true); // SORT ASC NULLS FIRST`
   

##########
File path: datafusion/src/sql/planner.rs
##########
@@ -2749,14 +2772,139 @@ mod tests {
         );
     }
 
+    /// psql result
+    /// ```
+    ///                                     QUERY PLAN
+    /// 
----------------------------------------------------------------------------------
+    /// WindowAgg  (cost=137.16..154.66 rows=1000 width=12)
+    /// ->  Sort  (cost=137.16..139.66 rows=1000 width=12)
+    ///         Sort Key: order_id
+    ///         ->  WindowAgg  (cost=69.83..87.33 rows=1000 width=12)
+    ///             ->  Sort  (cost=69.83..72.33 rows=1000 width=8)
+    ///                     Sort Key: order_id DESC
+    ///                     ->  Seq Scan on orders  (cost=0.00..20.00 
rows=1000 width=8)
+    /// ```
+    #[test]
+    fn over_order_by() {
+        let sql = "SELECT order_id, MAX(qty) OVER (ORDER BY order_id), 
MIN(qty) OVER (ORDER BY order_id DESC) from orders";
+        let expected = "\
+        Projection: #order_id, #MAX(qty), #MIN(qty)\
+        \n  WindowAggr: windowExpr=[[MAX(#qty)]] partitionBy=[]\
+        \n    Sort: #order_id ASC NULLS FIRST\
+        \n      WindowAggr: windowExpr=[[MIN(#qty)]] partitionBy=[]\
+        \n        Sort: #order_id DESC NULLS FIRST\
+        \n          TableScan: orders projection=None";
+        quick_test(sql, expected);
+    }
+
+    /// psql result
+    /// ```
+    ///                                     QUERY PLAN
+    /// 
-----------------------------------------------------------------------------------
+    /// WindowAgg  (cost=142.16..162.16 rows=1000 width=16)
+    ///   ->  Sort  (cost=142.16..144.66 rows=1000 width=16)
+    ///         Sort Key: order_id
+    ///         ->  WindowAgg  (cost=72.33..92.33 rows=1000 width=16)
+    ///               ->  Sort  (cost=72.33..74.83 rows=1000 width=12)
+    ///                     Sort Key: ((order_id + 1))
+    ///                     ->  Seq Scan on orders  (cost=0.00..22.50 
rows=1000 width=12)
+    /// ```
+    #[test]
+    fn over_order_by_two_sort_keys() {
+        let sql = "SELECT order_id, MAX(qty) OVER (ORDER BY order_id), 
MIN(qty) OVER (ORDER BY (order_id + 1)) from orders";
+        let expected = "\
+        Projection: #order_id, #MAX(qty), #MIN(qty)\
+        \n  WindowAggr: windowExpr=[[MAX(#qty)]] partitionBy=[]\
+        \n    Sort: #order_id ASC NULLS FIRST\
+        \n      WindowAggr: windowExpr=[[MIN(#qty)]] partitionBy=[]\
+        \n        Sort: #order_id Plus Int64(1) ASC NULLS FIRST\
+        \n          TableScan: orders projection=None";
+        quick_test(sql, expected);
+    }
+
+    /// psql result
+    /// ```
+    ///                                        QUERY PLAN
+    /// 
----------------------------------------------------------------------------------------
+    /// WindowAgg  (cost=139.66..172.16 rows=1000 width=24)
+    ///   ->  WindowAgg  (cost=139.66..159.66 rows=1000 width=16)
+    ///         ->  Sort  (cost=139.66..142.16 rows=1000 width=12)
+    ///               Sort Key: qty, order_id
+    ///               ->  WindowAgg  (cost=69.83..89.83 rows=1000 width=12)
+    ///                     ->  Sort  (cost=69.83..72.33 rows=1000 width=8)
+    ///                           Sort Key: order_id, qty
+    ///                           ->  Seq Scan on orders  (cost=0.00..20.00 
rows=1000 width=8)
+    /// ```
+    #[test]
+    fn over_order_by_sort_keys_sorting() {
+        let sql = "SELECT order_id, MAX(qty) OVER (ORDER BY qty, order_id), 
SUM(qty) OVER (), MIN(qty) OVER (ORDER BY order_id, qty) from orders";
+        let expected = "\
+        Projection: #order_id, #MAX(qty), #SUM(qty), #MIN(qty)\
+        \n  WindowAggr: windowExpr=[[SUM(#qty)]] partitionBy=[]\
+        \n    WindowAggr: windowExpr=[[MAX(#qty)]] partitionBy=[]\
+        \n      Sort: #qty ASC NULLS FIRST, #order_id ASC NULLS FIRST\
+        \n        WindowAggr: windowExpr=[[MIN(#qty)]] partitionBy=[]\
+        \n          Sort: #order_id ASC NULLS FIRST, #qty ASC NULLS FIRST\
+        \n            TableScan: orders projection=None";
+        quick_test(sql, expected);
+    }
+
+    /// psql result
+    /// ```
+    ///                                     QUERY PLAN
+    /// 
----------------------------------------------------------------------------------
+    /// WindowAgg  (cost=69.83..117.33 rows=1000 width=24)
+    ///   ->  WindowAgg  (cost=69.83..104.83 rows=1000 width=16)
+    ///         ->  WindowAgg  (cost=69.83..89.83 rows=1000 width=12)
+    ///               ->  Sort  (cost=69.83..72.33 rows=1000 width=8)
+    ///                     Sort Key: order_id, qty
+    ///                     ->  Seq Scan on orders  (cost=0.00..20.00 
rows=1000 width=8)
+    /// ```
+    ///
+    /// FIXME: for now we are not detecting prefix of sorting keys in order to 
save one sort exec phase

Review comment:
       FWIW I think the FIXME is fine to do later -- let's get the 
functionality (with tests) working first and then we can optimize afterwards.

##########
File path: ballista/rust/core/proto/ballista.proto
##########
@@ -174,6 +174,12 @@ message WindowExprNode {
     // udaf = 3
   }
   LogicalExprNode expr = 4;
+  // repeated LogicalExprNode partition_by = 5;

Review comment:
       we can probably just delete the old fields in the protobuf files -- I 
suspect no one is using them in a way that requires backwards compatibility

##########
File path: datafusion/src/sql/planner.rs
##########
@@ -1091,10 +1109,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
 
                 // then, window function
                 if let Some(window) = &function.over {
-                    if window.partition_by.is_empty()
-                        && window.order_by.is_empty()
-                        && window.window_frame.is_none()
-                    {
+                    if window.partition_by.is_empty() && 
window.window_frame.is_none() {

Review comment:
       I don't understand the check for `partition_by.is_empty()` and `window_frame.is_none()` -- wouldn't we want to always process the window clause?

##########
File path: datafusion/src/sql/planner.rs
##########
@@ -2749,14 +2772,139 @@ mod tests {
         );
     }
 
+    /// psql result
+    /// ```
+    ///                                     QUERY PLAN
+    /// 
----------------------------------------------------------------------------------
+    /// WindowAgg  (cost=137.16..154.66 rows=1000 width=12)
+    /// ->  Sort  (cost=137.16..139.66 rows=1000 width=12)
+    ///         Sort Key: order_id
+    ///         ->  WindowAgg  (cost=69.83..87.33 rows=1000 width=12)
+    ///             ->  Sort  (cost=69.83..72.33 rows=1000 width=8)
+    ///                     Sort Key: order_id DESC
+    ///                     ->  Seq Scan on orders  (cost=0.00..20.00 
rows=1000 width=8)
+    /// ```
+    #[test]
+    fn over_order_by() {
+        let sql = "SELECT order_id, MAX(qty) OVER (ORDER BY order_id), 
MIN(qty) OVER (ORDER BY order_id DESC) from orders";
+        let expected = "\
+        Projection: #order_id, #MAX(qty), #MIN(qty)\
+        \n  WindowAggr: windowExpr=[[MAX(#qty)]] partitionBy=[]\
+        \n    Sort: #order_id ASC NULLS FIRST\
+        \n      WindowAggr: windowExpr=[[MIN(#qty)]] partitionBy=[]\
+        \n        Sort: #order_id DESC NULLS FIRST\
+        \n          TableScan: orders projection=None";
+        quick_test(sql, expected);
+    }
+
+    /// psql result

Review comment:
       Thank you for including the PostgreSQL results!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to