alamb commented on a change in pull request #501:
URL: https://github.com/apache/arrow-datafusion/pull/501#discussion_r647810344



##########
File path: datafusion/src/optimizer/utils.rs
##########
@@ -340,7 +348,19 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result<Expr> {
         Expr::WindowFunction {
             fun, window_frame, ..
         } => {
-            let index = expressions
+            let partition_index = expressions
+                .iter()
+                .position(|expr| {
+                    matches!(expr, Expr::Literal(ScalarValue::Utf8(Some(str)))
+            if str == WINDOW_PARTITION_MARKER)
+                })
+                .ok_or_else(|| {
+                    DataFusionError::Internal(
+                        "Ill-formed window function expressions".to_owned(),

Review comment:
       ```suggestion
                           "Ill-formed window function expressions: unexpected marker".to_owned(),
   ```

##########
File path: datafusion/src/optimizer/utils.rs
##########
@@ -351,12 +371,20 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result<Expr> {
                         "Ill-formed window function expressions".to_owned(),
                     )
                 })?;
-            Ok(Expr::WindowFunction {
-                fun: fun.clone(),
-                args: expressions[..index].to_vec(),
-                order_by: expressions[index + 1..].to_vec(),
-                window_frame: *window_frame,
-            })
+
+            if partition_index >= sort_index {
+                Err(DataFusionError::Internal(
+                    "Ill-formed window function expressions".to_owned(),

Review comment:
       ```suggestion
                       "Ill-formed window function expressions: partition index too large".to_owned(),
   ```

##########
File path: datafusion/src/optimizer/utils.rs
##########
@@ -340,7 +348,19 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result<Expr> {
         Expr::WindowFunction {
             fun, window_frame, ..
         } => {
-            let index = expressions
+            let partition_index = expressions
+                .iter()
+                .position(|expr| {
+                    matches!(expr, Expr::Literal(ScalarValue::Utf8(Some(str)))
+            if str == WINDOW_PARTITION_MARKER)
+                })
+                .ok_or_else(|| {
+                    DataFusionError::Internal(
+                        "Ill-formed window function expressions".to_owned(),

Review comment:
       I suggest making each error message specific, so that if we ever encounter one of them we know which call site it came from.

##########
File path: datafusion/src/sql/planner.rs
##########
@@ -1121,52 +1121,53 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
 
                 // then, window function
                 if let Some(window) = &function.over {
-                    if window.partition_by.is_empty() {
-                        let order_by = window
-                            .order_by
-                            .iter()
-                            .map(|e| self.order_by_to_sort_expr(e))
-                            .into_iter()
-                            .collect::<Result<Vec<_>>>()?;
-                        let window_frame = window
-                            .window_frame
-                            .as_ref()
-                            .map(|window_frame| 
window_frame.clone().try_into())
-                            .transpose()?;
-                        let fun = 
window_functions::WindowFunction::from_str(&name);
-                        if let 
Ok(window_functions::WindowFunction::AggregateFunction(
-                            aggregate_fun,
-                        )) = fun
-                        {
-                            return Ok(Expr::WindowFunction {
-                                fun: 
window_functions::WindowFunction::AggregateFunction(
-                                    aggregate_fun.clone(),
-                                ),
-                                args: self
-                                    .aggregate_fn_to_expr(&aggregate_fun, 
function)?,
-                                order_by,
-                                window_frame,
-                            });
-                        } else if let Ok(
-                            
window_functions::WindowFunction::BuiltInWindowFunction(
+                    let partition_by = window
+                        .partition_by
+                        .iter()
+                        .map(|e| self.sql_expr_to_logical_expr(e))
+                        .into_iter()
+                        .collect::<Result<Vec<_>>>()?;
+                    let order_by = window
+                        .order_by
+                        .iter()
+                        .map(|e| self.order_by_to_sort_expr(e))
+                        .into_iter()
+                        .collect::<Result<Vec<_>>>()?;
+                    let window_frame = window
+                        .window_frame
+                        .as_ref()
+                        .map(|window_frame| window_frame.clone().try_into())
+                        .transpose()?;
+                    let fun = 
window_functions::WindowFunction::from_str(&name);
+                    if let 
Ok(window_functions::WindowFunction::AggregateFunction(

Review comment:
       I may be misreading this, but it looks like the error message may have 
gotten lost.
   
   Stylistically, this might be cleaner as a `match` rather than an `if/else` chain (and the compiler will then tell you if you missed a case), something like:
   
   ```rust
   let fun = window_functions::WindowFunction::from_str(&name)?; // note the question mark
   use window_functions::WindowFunction::*;
   
   match fun {
     AggregateFunction(aggregate_fun) => { .. code .. },
     BuiltInWindowFunction(window_fun) => { .. code .. },
   }
   ```
   
   Perhaps?

##########
File path: datafusion/src/sql/planner.rs
##########
@@ -2772,20 +2773,29 @@ mod tests {
             "SELECT order_id, MAX(qty) OVER (), min(qty) over (), aVg(qty) 
OVER () from orders";
         let expected = "\
         Projection: #order_id, #MAX(qty), #MIN(qty), #AVG(qty)\
-        \n  WindowAggr: windowExpr=[[MAX(#qty), MIN(#qty), AVG(#qty)]] 
partitionBy=[]\
+        \n  WindowAggr: windowExpr=[[MAX(#qty), MIN(#qty), AVG(#qty)]]\
         \n    TableScan: orders projection=None";
         quick_test(sql, expected);
     }
 
+    /// psql result
+    /// ```
+    ///                               QUERY PLAN
+    /// ----------------------------------------------------------------------
+    /// WindowAgg  (cost=69.83..87.33 rows=1000 width=8)
+    ///   ->  Sort  (cost=69.83..72.33 rows=1000 width=8)
+    ///         Sort Key: order_id
+    ///         ->  Seq Scan on orders  (cost=0.00..20.00 rows=1000 width=8)
+    /// ```
     #[test]
-    fn over_partition_by_not_supported() {
-        let sql =
-            "SELECT order_id, MAX(delivered) OVER (PARTITION BY order_id) from 
orders";
-        let err = logical_plan(sql).expect_err("query should have failed");
-        assert_eq!(
-            "NotImplemented(\"Unsupported OVER clause (PARTITION BY 
order_id)\")",
-            format!("{:?}", err)
-        );
+    fn over_partition_by() {
+        let sql = "SELECT order_id, MAX(qty) OVER (PARTITION BY order_id) from 
orders";

Review comment:
       I think including the `PARTITION BY` information somewhere in this plan would be valuable. Maybe it could be added to the `WindowExpr` formatting?
   
   That said, I may be missing a reason not to include it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to