Jimexist commented on a change in pull request #463:
URL: https://github.com/apache/arrow-datafusion/pull/463#discussion_r644406996



##########
File path: ballista/rust/core/proto/ballista.proto
##########
@@ -174,6 +174,12 @@ message WindowExprNode {
     // udaf = 3
   }
   LogicalExprNode expr = 4;
+  // repeated LogicalExprNode partition_by = 5;

Review comment:
       These commented-out lines are not stale leftovers; they are placeholders reminding us of future fields.

##########
File path: datafusion/src/physical_plan/planner.rs
##########
@@ -745,13 +745,18 @@ impl DefaultPhysicalPlanner {
         };
 
         match e {
-            Expr::WindowFunction { fun, args } => {
+            Expr::WindowFunction { fun, args, .. } => {
                 let args = args
                     .iter()
                     .map(|e| {
                         self.create_physical_expr(e, physical_input_schema, 
ctx_state)
                     })
                     .collect::<Result<Vec<_>>>()?;
+                // if !order_by.is_empty() {

Review comment:
       I would bring them back when implementing the execution plan for sort, but probably later.

##########
File path: datafusion/src/sql/planner.rs
##########
@@ -1091,10 +1109,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
 
                 // then, window function
                 if let Some(window) = &function.over {
-                    if window.partition_by.is_empty()
-                        && window.order_by.is_empty()
-                        && window.window_frame.is_none()
-                    {
+                    if window.partition_by.is_empty() && 
window.window_frame.is_none() {

Review comment:
       They need to be empty; otherwise the expression falls through to the unsupported-feature error branch, which is needed to guard against unintended usage.

##########
File path: datafusion/src/sql/planner.rs
##########
@@ -2749,14 +2772,139 @@ mod tests {
         );
     }
 
+    /// psql result

Review comment:
       They do. With `ORDER BY`, they compute a cumulative (running) sum/avg/max/min rather than one over the full partition.

##########
File path: datafusion/src/sql/planner.rs
##########
@@ -2749,14 +2772,139 @@ mod tests {
         );
     }
 
+    /// psql result
+    /// ```
+    ///                                     QUERY PLAN
+    /// 
----------------------------------------------------------------------------------
+    /// WindowAgg  (cost=137.16..154.66 rows=1000 width=12)
+    /// ->  Sort  (cost=137.16..139.66 rows=1000 width=12)
+    ///         Sort Key: order_id
+    ///         ->  WindowAgg  (cost=69.83..87.33 rows=1000 width=12)
+    ///             ->  Sort  (cost=69.83..72.33 rows=1000 width=8)
+    ///                     Sort Key: order_id DESC
+    ///                     ->  Seq Scan on orders  (cost=0.00..20.00 
rows=1000 width=8)
+    /// ```
+    #[test]
+    fn over_order_by() {
+        let sql = "SELECT order_id, MAX(qty) OVER (ORDER BY order_id), 
MIN(qty) OVER (ORDER BY order_id DESC) from orders";
+        let expected = "\
+        Projection: #order_id, #MAX(qty), #MIN(qty)\
+        \n  WindowAggr: windowExpr=[[MAX(#qty)]] partitionBy=[]\
+        \n    Sort: #order_id ASC NULLS FIRST\
+        \n      WindowAggr: windowExpr=[[MIN(#qty)]] partitionBy=[]\
+        \n        Sort: #order_id DESC NULLS FIRST\
+        \n          TableScan: orders projection=None";
+        quick_test(sql, expected);
+    }
+
+    /// psql result
+    /// ```
+    ///                                     QUERY PLAN
+    /// 
-----------------------------------------------------------------------------------
+    /// WindowAgg  (cost=142.16..162.16 rows=1000 width=16)
+    ///   ->  Sort  (cost=142.16..144.66 rows=1000 width=16)
+    ///         Sort Key: order_id
+    ///         ->  WindowAgg  (cost=72.33..92.33 rows=1000 width=16)
+    ///               ->  Sort  (cost=72.33..74.83 rows=1000 width=12)
+    ///                     Sort Key: ((order_id + 1))
+    ///                     ->  Seq Scan on orders  (cost=0.00..22.50 
rows=1000 width=12)
+    /// ```
+    #[test]
+    fn over_order_by_two_sort_keys() {
+        let sql = "SELECT order_id, MAX(qty) OVER (ORDER BY order_id), 
MIN(qty) OVER (ORDER BY (order_id + 1)) from orders";
+        let expected = "\
+        Projection: #order_id, #MAX(qty), #MIN(qty)\
+        \n  WindowAggr: windowExpr=[[MAX(#qty)]] partitionBy=[]\
+        \n    Sort: #order_id ASC NULLS FIRST\
+        \n      WindowAggr: windowExpr=[[MIN(#qty)]] partitionBy=[]\
+        \n        Sort: #order_id Plus Int64(1) ASC NULLS FIRST\
+        \n          TableScan: orders projection=None";
+        quick_test(sql, expected);
+    }
+
+    /// psql result
+    /// ```
+    ///                                        QUERY PLAN
+    /// 
----------------------------------------------------------------------------------------
+    /// WindowAgg  (cost=139.66..172.16 rows=1000 width=24)
+    ///   ->  WindowAgg  (cost=139.66..159.66 rows=1000 width=16)
+    ///         ->  Sort  (cost=139.66..142.16 rows=1000 width=12)
+    ///               Sort Key: qty, order_id
+    ///               ->  WindowAgg  (cost=69.83..89.83 rows=1000 width=12)
+    ///                     ->  Sort  (cost=69.83..72.33 rows=1000 width=8)
+    ///                           Sort Key: order_id, qty
+    ///                           ->  Seq Scan on orders  (cost=0.00..20.00 
rows=1000 width=8)
+    /// ```
+    #[test]
+    fn over_order_by_sort_keys_sorting() {
+        let sql = "SELECT order_id, MAX(qty) OVER (ORDER BY qty, order_id), 
SUM(qty) OVER (), MIN(qty) OVER (ORDER BY order_id, qty) from orders";
+        let expected = "\
+        Projection: #order_id, #MAX(qty), #SUM(qty), #MIN(qty)\
+        \n  WindowAggr: windowExpr=[[SUM(#qty)]] partitionBy=[]\
+        \n    WindowAggr: windowExpr=[[MAX(#qty)]] partitionBy=[]\
+        \n      Sort: #qty ASC NULLS FIRST, #order_id ASC NULLS FIRST\
+        \n        WindowAggr: windowExpr=[[MIN(#qty)]] partitionBy=[]\
+        \n          Sort: #order_id ASC NULLS FIRST, #qty ASC NULLS FIRST\
+        \n            TableScan: orders projection=None";
+        quick_test(sql, expected);
+    }
+
+    /// psql result
+    /// ```
+    ///                                     QUERY PLAN
+    /// 
----------------------------------------------------------------------------------
+    /// WindowAgg  (cost=69.83..117.33 rows=1000 width=24)
+    ///   ->  WindowAgg  (cost=69.83..104.83 rows=1000 width=16)
+    ///         ->  WindowAgg  (cost=69.83..89.83 rows=1000 width=12)
+    ///               ->  Sort  (cost=69.83..72.33 rows=1000 width=8)
+    ///                     Sort Key: order_id, qty
+    ///                     ->  Seq Scan on orders  (cost=0.00..20.00 
rows=1000 width=8)
+    /// ```
+    ///
+    /// FIXME: for now we are not detecting prefix of sorting keys in order to 
save one sort exec phase

Review comment:
       There is a lot of room for optimization. There are several considerations:
   1. how to compute `ORDER BY` and `PARTITION BY`, and how to re-order them for efficiency;
   2. how to compute window aggregations given that the window may be either ever-growing or sliding (shrinking and expanding depending on the number of rows or on the absolute values).

##########
File path: datafusion/src/sql/planner.rs
##########
@@ -2749,14 +2772,139 @@ mod tests {
         );
     }
 
+    /// psql result
+    /// ```
+    ///                                     QUERY PLAN
+    /// 
----------------------------------------------------------------------------------
+    /// WindowAgg  (cost=137.16..154.66 rows=1000 width=12)
+    /// ->  Sort  (cost=137.16..139.66 rows=1000 width=12)
+    ///         Sort Key: order_id
+    ///         ->  WindowAgg  (cost=69.83..87.33 rows=1000 width=12)
+    ///             ->  Sort  (cost=69.83..72.33 rows=1000 width=8)
+    ///                     Sort Key: order_id DESC
+    ///                     ->  Seq Scan on orders  (cost=0.00..20.00 
rows=1000 width=8)
+    /// ```
+    #[test]
+    fn over_order_by() {
+        let sql = "SELECT order_id, MAX(qty) OVER (ORDER BY order_id), 
MIN(qty) OVER (ORDER BY order_id DESC) from orders";
+        let expected = "\
+        Projection: #order_id, #MAX(qty), #MIN(qty)\
+        \n  WindowAggr: windowExpr=[[MAX(#qty)]] partitionBy=[]\
+        \n    Sort: #order_id ASC NULLS FIRST\
+        \n      WindowAggr: windowExpr=[[MIN(#qty)]] partitionBy=[]\
+        \n        Sort: #order_id DESC NULLS FIRST\
+        \n          TableScan: orders projection=None";
+        quick_test(sql, expected);
+    }
+
+    /// psql result
+    /// ```
+    ///                                     QUERY PLAN
+    /// 
-----------------------------------------------------------------------------------
+    /// WindowAgg  (cost=142.16..162.16 rows=1000 width=16)
+    ///   ->  Sort  (cost=142.16..144.66 rows=1000 width=16)
+    ///         Sort Key: order_id
+    ///         ->  WindowAgg  (cost=72.33..92.33 rows=1000 width=16)
+    ///               ->  Sort  (cost=72.33..74.83 rows=1000 width=12)
+    ///                     Sort Key: ((order_id + 1))
+    ///                     ->  Seq Scan on orders  (cost=0.00..22.50 
rows=1000 width=12)
+    /// ```
+    #[test]
+    fn over_order_by_two_sort_keys() {
+        let sql = "SELECT order_id, MAX(qty) OVER (ORDER BY order_id), 
MIN(qty) OVER (ORDER BY (order_id + 1)) from orders";
+        let expected = "\
+        Projection: #order_id, #MAX(qty), #MIN(qty)\
+        \n  WindowAggr: windowExpr=[[MAX(#qty)]] partitionBy=[]\
+        \n    Sort: #order_id ASC NULLS FIRST\
+        \n      WindowAggr: windowExpr=[[MIN(#qty)]] partitionBy=[]\
+        \n        Sort: #order_id Plus Int64(1) ASC NULLS FIRST\
+        \n          TableScan: orders projection=None";
+        quick_test(sql, expected);
+    }
+
+    /// psql result
+    /// ```
+    ///                                        QUERY PLAN
+    /// 
----------------------------------------------------------------------------------------
+    /// WindowAgg  (cost=139.66..172.16 rows=1000 width=24)
+    ///   ->  WindowAgg  (cost=139.66..159.66 rows=1000 width=16)
+    ///         ->  Sort  (cost=139.66..142.16 rows=1000 width=12)
+    ///               Sort Key: qty, order_id
+    ///               ->  WindowAgg  (cost=69.83..89.83 rows=1000 width=12)
+    ///                     ->  Sort  (cost=69.83..72.33 rows=1000 width=8)
+    ///                           Sort Key: order_id, qty
+    ///                           ->  Seq Scan on orders  (cost=0.00..20.00 
rows=1000 width=8)
+    /// ```
+    #[test]
+    fn over_order_by_sort_keys_sorting() {
+        let sql = "SELECT order_id, MAX(qty) OVER (ORDER BY qty, order_id), 
SUM(qty) OVER (), MIN(qty) OVER (ORDER BY order_id, qty) from orders";
+        let expected = "\
+        Projection: #order_id, #MAX(qty), #SUM(qty), #MIN(qty)\
+        \n  WindowAggr: windowExpr=[[SUM(#qty)]] partitionBy=[]\
+        \n    WindowAggr: windowExpr=[[MAX(#qty)]] partitionBy=[]\
+        \n      Sort: #qty ASC NULLS FIRST, #order_id ASC NULLS FIRST\
+        \n        WindowAggr: windowExpr=[[MIN(#qty)]] partitionBy=[]\
+        \n          Sort: #order_id ASC NULLS FIRST, #qty ASC NULLS FIRST\
+        \n            TableScan: orders projection=None";
+        quick_test(sql, expected);
+    }
+
+    /// psql result
+    /// ```
+    ///                                     QUERY PLAN
+    /// 
----------------------------------------------------------------------------------
+    /// WindowAgg  (cost=69.83..117.33 rows=1000 width=24)
+    ///   ->  WindowAgg  (cost=69.83..104.83 rows=1000 width=16)
+    ///         ->  WindowAgg  (cost=69.83..89.83 rows=1000 width=12)
+    ///               ->  Sort  (cost=69.83..72.33 rows=1000 width=8)
+    ///                     Sort Key: order_id, qty
+    ///                     ->  Seq Scan on orders  (cost=0.00..20.00 
rows=1000 width=8)
+    /// ```
+    ///
+    /// FIXME: for now we are not detecting prefix of sorting keys in order to 
save one sort exec phase

Review comment:
       Regarding 1: e.g. with `max(a) over (partition by b order by c)` you can either:
   1. hash-partition by b, merge, and then sort by c; or
   2. sort by (b, c), which is easier to implement but loses parallelism.

##########
File path: datafusion/src/sql/planner.rs
##########
@@ -2749,14 +2772,139 @@ mod tests {
         );
     }
 
+    /// psql result
+    /// ```
+    ///                                     QUERY PLAN
+    /// 
----------------------------------------------------------------------------------
+    /// WindowAgg  (cost=137.16..154.66 rows=1000 width=12)
+    /// ->  Sort  (cost=137.16..139.66 rows=1000 width=12)
+    ///         Sort Key: order_id
+    ///         ->  WindowAgg  (cost=69.83..87.33 rows=1000 width=12)
+    ///             ->  Sort  (cost=69.83..72.33 rows=1000 width=8)
+    ///                     Sort Key: order_id DESC
+    ///                     ->  Seq Scan on orders  (cost=0.00..20.00 
rows=1000 width=8)
+    /// ```
+    #[test]
+    fn over_order_by() {
+        let sql = "SELECT order_id, MAX(qty) OVER (ORDER BY order_id), 
MIN(qty) OVER (ORDER BY order_id DESC) from orders";
+        let expected = "\
+        Projection: #order_id, #MAX(qty), #MIN(qty)\
+        \n  WindowAggr: windowExpr=[[MAX(#qty)]] partitionBy=[]\
+        \n    Sort: #order_id ASC NULLS FIRST\
+        \n      WindowAggr: windowExpr=[[MIN(#qty)]] partitionBy=[]\
+        \n        Sort: #order_id DESC NULLS FIRST\
+        \n          TableScan: orders projection=None";
+        quick_test(sql, expected);
+    }
+
+    /// psql result
+    /// ```
+    ///                                     QUERY PLAN
+    /// 
-----------------------------------------------------------------------------------
+    /// WindowAgg  (cost=142.16..162.16 rows=1000 width=16)
+    ///   ->  Sort  (cost=142.16..144.66 rows=1000 width=16)
+    ///         Sort Key: order_id
+    ///         ->  WindowAgg  (cost=72.33..92.33 rows=1000 width=16)
+    ///               ->  Sort  (cost=72.33..74.83 rows=1000 width=12)
+    ///                     Sort Key: ((order_id + 1))
+    ///                     ->  Seq Scan on orders  (cost=0.00..22.50 
rows=1000 width=12)
+    /// ```
+    #[test]
+    fn over_order_by_two_sort_keys() {
+        let sql = "SELECT order_id, MAX(qty) OVER (ORDER BY order_id), 
MIN(qty) OVER (ORDER BY (order_id + 1)) from orders";
+        let expected = "\
+        Projection: #order_id, #MAX(qty), #MIN(qty)\
+        \n  WindowAggr: windowExpr=[[MAX(#qty)]] partitionBy=[]\
+        \n    Sort: #order_id ASC NULLS FIRST\
+        \n      WindowAggr: windowExpr=[[MIN(#qty)]] partitionBy=[]\
+        \n        Sort: #order_id Plus Int64(1) ASC NULLS FIRST\
+        \n          TableScan: orders projection=None";
+        quick_test(sql, expected);
+    }
+
+    /// psql result
+    /// ```
+    ///                                        QUERY PLAN
+    /// 
----------------------------------------------------------------------------------------
+    /// WindowAgg  (cost=139.66..172.16 rows=1000 width=24)
+    ///   ->  WindowAgg  (cost=139.66..159.66 rows=1000 width=16)
+    ///         ->  Sort  (cost=139.66..142.16 rows=1000 width=12)
+    ///               Sort Key: qty, order_id
+    ///               ->  WindowAgg  (cost=69.83..89.83 rows=1000 width=12)
+    ///                     ->  Sort  (cost=69.83..72.33 rows=1000 width=8)
+    ///                           Sort Key: order_id, qty
+    ///                           ->  Seq Scan on orders  (cost=0.00..20.00 
rows=1000 width=8)
+    /// ```
+    #[test]
+    fn over_order_by_sort_keys_sorting() {
+        let sql = "SELECT order_id, MAX(qty) OVER (ORDER BY qty, order_id), 
SUM(qty) OVER (), MIN(qty) OVER (ORDER BY order_id, qty) from orders";
+        let expected = "\
+        Projection: #order_id, #MAX(qty), #SUM(qty), #MIN(qty)\
+        \n  WindowAggr: windowExpr=[[SUM(#qty)]] partitionBy=[]\
+        \n    WindowAggr: windowExpr=[[MAX(#qty)]] partitionBy=[]\
+        \n      Sort: #qty ASC NULLS FIRST, #order_id ASC NULLS FIRST\
+        \n        WindowAggr: windowExpr=[[MIN(#qty)]] partitionBy=[]\
+        \n          Sort: #order_id ASC NULLS FIRST, #qty ASC NULLS FIRST\
+        \n            TableScan: orders projection=None";
+        quick_test(sql, expected);
+    }
+
+    /// psql result
+    /// ```
+    ///                                     QUERY PLAN
+    /// 
----------------------------------------------------------------------------------
+    /// WindowAgg  (cost=69.83..117.33 rows=1000 width=24)
+    ///   ->  WindowAgg  (cost=69.83..104.83 rows=1000 width=16)
+    ///         ->  WindowAgg  (cost=69.83..89.83 rows=1000 width=12)
+    ///               ->  Sort  (cost=69.83..72.33 rows=1000 width=8)
+    ///                     Sort Key: order_id, qty
+    ///                     ->  Seq Scan on orders  (cost=0.00..20.00 
rows=1000 width=8)
+    /// ```
+    ///
+    /// FIXME: for now we are not detecting prefix of sorting keys in order to 
save one sort exec phase

Review comment:
       Regarding 2: for an ever-growing window a cumulative scan can be used; for a shrinking or sliding window a `VecDeque` can be used, and there is also the segment-tree approach...

##########
File path: datafusion/src/sql/utils.rs
##########
@@ -389,3 +407,192 @@ pub(crate) fn resolve_aliases_to_exprs(
         _ => Ok(None),
     })
 }
+
+/// group a slice of window expression expr by their order by expressions
+pub(crate) fn group_window_expr_by_sort_keys(
+    window_expr: &[Expr],
+) -> Result<Vec<(&[Expr], Vec<&Expr>)>> {
+    let mut result = vec![];
+    window_expr.iter().try_for_each(|expr| match expr {
+        Expr::WindowFunction { order_by, .. } => {
+            if let Some((_, values)) = result.iter_mut().find(
+                |group: &&mut (&[Expr], Vec<&Expr>)| matches!(group, (key, _) 
if key == order_by),
+            ) {
+                values.push(expr);
+            } else {
+                result.push((order_by, vec![expr]))
+            }
+            Ok(())
+        }
+        other => Err(DataFusionError::Internal(format!(
+            "Impossibly got non-window expr {:?}",
+            other,
+        ))),
+    })?;
+    Ok(result)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::logical_plan::col;
+    use crate::physical_plan::aggregates::AggregateFunction;
+    use crate::physical_plan::window_functions::WindowFunction;
+
+    #[test]
+    fn test_group_window_expr_by_sort_keys_empty_case() -> Result<()> {
+        let result = group_window_expr_by_sort_keys(&[])?;
+        let expected: Vec<(&[Expr], Vec<&Expr>)> = vec![];
+        assert_eq!(expected, result);
+        Ok(())
+    }
+
+    #[test]
+    fn test_group_window_expr_by_sort_keys_empty_window() -> Result<()> {
+        let max1 = Expr::WindowFunction {
+            fun: WindowFunction::AggregateFunction(AggregateFunction::Max),
+            args: vec![col("name")],
+            order_by: vec![],
+        };
+        let max2 = Expr::WindowFunction {
+            fun: WindowFunction::AggregateFunction(AggregateFunction::Max),
+            args: vec![col("name")],
+            order_by: vec![],
+        };
+        let min3 = Expr::WindowFunction {
+            fun: WindowFunction::AggregateFunction(AggregateFunction::Min),
+            args: vec![col("name")],
+            order_by: vec![],
+        };
+        let sum4 = Expr::WindowFunction {
+            fun: WindowFunction::AggregateFunction(AggregateFunction::Sum),
+            args: vec![col("age")],
+            order_by: vec![],
+        };
+        // FIXME use as_ref
+        let exprs = &[max1.clone(), max2.clone(), min3.clone(), sum4.clone()];
+        let result = group_window_expr_by_sort_keys(exprs)?;
+        let key = &[];
+        let expected: Vec<(&[Expr], Vec<&Expr>)> =
+            vec![(key, vec![&max1, &max2, &min3, &sum4])];
+        assert_eq!(expected, result);
+        Ok(())
+    }
+
+    #[test]
+    fn test_group_window_expr_by_sort_keys() -> Result<()> {
+        let age_asc = Expr::Sort {
+            expr: Box::new(col("age")),
+            asc: true,
+            nulls_first: true,
+        };
+        let name_desc = Expr::Sort {
+            expr: Box::new(col("name")),
+            asc: false,
+            nulls_first: true,
+        };
+        let created_at_desc = Expr::Sort {
+            expr: Box::new(col("created_at")),
+            asc: false,
+            nulls_first: true,
+        };
+        let max1 = Expr::WindowFunction {
+            fun: WindowFunction::AggregateFunction(AggregateFunction::Max),
+            args: vec![col("name")],
+            order_by: vec![age_asc.clone(), name_desc.clone()],
+        };
+        let max2 = Expr::WindowFunction {
+            fun: WindowFunction::AggregateFunction(AggregateFunction::Max),
+            args: vec![col("name")],
+            order_by: vec![],
+        };
+        let min3 = Expr::WindowFunction {
+            fun: WindowFunction::AggregateFunction(AggregateFunction::Min),
+            args: vec![col("name")],
+            order_by: vec![age_asc.clone(), name_desc.clone()],
+        };
+        let sum4 = Expr::WindowFunction {
+            fun: WindowFunction::AggregateFunction(AggregateFunction::Sum),
+            args: vec![col("age")],
+            order_by: vec![name_desc.clone(), age_asc.clone(), 
created_at_desc.clone()],
+        };
+        // FIXME use as_ref
+        let exprs = &[max1.clone(), max2.clone(), min3.clone(), sum4.clone()];
+        let result = group_window_expr_by_sort_keys(exprs)?;
+
+        let key1 = &[age_asc.clone(), name_desc.clone()];
+        let key2 = &[];
+        let key3 = &[name_desc, age_asc, created_at_desc];
+
+        let expected: Vec<(&[Expr], Vec<&Expr>)> = vec![
+            (key1, vec![&max1, &min3]),
+            (key2, vec![&max2]),
+            (key3, vec![&sum4]),
+        ];
+        assert_eq!(expected, result);
+        Ok(())
+    }
+
+    #[test]
+    fn test_find_sort_exprs() -> Result<()> {
+        let exprs = &[
+            Expr::WindowFunction {
+                fun: WindowFunction::AggregateFunction(AggregateFunction::Max),
+                args: vec![col("name")],
+                order_by: vec![
+                    Expr::Sort {

Review comment:
       Thanks for the reminder — I plan to optimize this in subsequent PRs, as there would be more to comp… [comment truncated in the original message]




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to