Jimexist commented on a change in pull request #463:
URL: https://github.com/apache/arrow-datafusion/pull/463#discussion_r644502895



##########
File path: datafusion/src/sql/utils.rs
##########
@@ -389,3 +407,192 @@ pub(crate) fn resolve_aliases_to_exprs(
         _ => Ok(None),
     })
 }
+
+/// group a slice of window expression expr by their order by expressions
+pub(crate) fn group_window_expr_by_sort_keys(
+    window_expr: &[Expr],
+) -> Result<Vec<(&[Expr], Vec<&Expr>)>> {
+    let mut result = vec![];
+    window_expr.iter().try_for_each(|expr| match expr {
+        Expr::WindowFunction { order_by, .. } => {
+            if let Some((_, values)) = result.iter_mut().find(
+                |group: &&mut (&[Expr], Vec<&Expr>)| matches!(group, (key, _) if key == order_by),
+            ) {
+                values.push(expr);
+            } else {
+                result.push((order_by, vec![expr]))
+            }
+            Ok(())
+        }
+        other => Err(DataFusionError::Internal(format!(
+            "Impossibly got non-window expr {:?}",
+            other,
+        ))),
+    })?;
+    Ok(result)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::logical_plan::col;
+    use crate::physical_plan::aggregates::AggregateFunction;
+    use crate::physical_plan::window_functions::WindowFunction;
+
+    #[test]
+    fn test_group_window_expr_by_sort_keys_empty_case() -> Result<()> {
+        let result = group_window_expr_by_sort_keys(&[])?;
+        let expected: Vec<(&[Expr], Vec<&Expr>)> = vec![];
+        assert_eq!(expected, result);
+        Ok(())
+    }
+
+    #[test]
+    fn test_group_window_expr_by_sort_keys_empty_window() -> Result<()> {
+        let max1 = Expr::WindowFunction {
+            fun: WindowFunction::AggregateFunction(AggregateFunction::Max),
+            args: vec![col("name")],
+            order_by: vec![],
+        };
+        let max2 = Expr::WindowFunction {
+            fun: WindowFunction::AggregateFunction(AggregateFunction::Max),
+            args: vec![col("name")],
+            order_by: vec![],
+        };
+        let min3 = Expr::WindowFunction {
+            fun: WindowFunction::AggregateFunction(AggregateFunction::Min),
+            args: vec![col("name")],
+            order_by: vec![],
+        };
+        let sum4 = Expr::WindowFunction {
+            fun: WindowFunction::AggregateFunction(AggregateFunction::Sum),
+            args: vec![col("age")],
+            order_by: vec![],
+        };
+        // FIXME use as_ref
+        let exprs = &[max1.clone(), max2.clone(), min3.clone(), sum4.clone()];
+        let result = group_window_expr_by_sort_keys(exprs)?;
+        let key = &[];
+        let expected: Vec<(&[Expr], Vec<&Expr>)> =
+            vec![(key, vec![&max1, &max2, &min3, &sum4])];
+        assert_eq!(expected, result);
+        Ok(())
+    }
+
+    #[test]
+    fn test_group_window_expr_by_sort_keys() -> Result<()> {
+        let age_asc = Expr::Sort {
+            expr: Box::new(col("age")),
+            asc: true,
+            nulls_first: true,
+        };
+        let name_desc = Expr::Sort {
+            expr: Box::new(col("name")),
+            asc: false,
+            nulls_first: true,
+        };
+        let created_at_desc = Expr::Sort {
+            expr: Box::new(col("created_at")),
+            asc: false,
+            nulls_first: true,
+        };
+        let max1 = Expr::WindowFunction {
+            fun: WindowFunction::AggregateFunction(AggregateFunction::Max),
+            args: vec![col("name")],
+            order_by: vec![age_asc.clone(), name_desc.clone()],
+        };
+        let max2 = Expr::WindowFunction {
+            fun: WindowFunction::AggregateFunction(AggregateFunction::Max),
+            args: vec![col("name")],
+            order_by: vec![],
+        };
+        let min3 = Expr::WindowFunction {
+            fun: WindowFunction::AggregateFunction(AggregateFunction::Min),
+            args: vec![col("name")],
+            order_by: vec![age_asc.clone(), name_desc.clone()],
+        };
+        let sum4 = Expr::WindowFunction {
+            fun: WindowFunction::AggregateFunction(AggregateFunction::Sum),
+            args: vec![col("age")],
+            order_by: vec![name_desc.clone(), age_asc.clone(), created_at_desc.clone()],
+        };
+        // FIXME use as_ref
+        let exprs = &[max1.clone(), max2.clone(), min3.clone(), sum4.clone()];
+        let result = group_window_expr_by_sort_keys(exprs)?;
+
+        let key1 = &[age_asc.clone(), name_desc.clone()];
+        let key2 = &[];
+        let key3 = &[name_desc, age_asc, created_at_desc];
+
+        let expected: Vec<(&[Expr], Vec<&Expr>)> = vec![
+            (key1, vec![&max1, &min3]),
+            (key2, vec![&max2]),
+            (key3, vec![&sum4]),
+        ];
+        assert_eq!(expected, result);
+        Ok(())
+    }
+
+    #[test]
+    fn test_find_sort_exprs() -> Result<()> {
+        let exprs = &[
+            Expr::WindowFunction {
+                fun: WindowFunction::AggregateFunction(AggregateFunction::Max),
+                args: vec![col("name")],
+                order_by: vec![
+                    Expr::Sort {

Review comment:
       Thanks for the reminder - I plan to optimize this in subsequent PRs - as there would be more to comp[...] (comment truncated in the archived message)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to