Jimexist commented on a change in pull request #819:
URL: https://github.com/apache/arrow-datafusion/pull/819#discussion_r683531698



##########
File path: python/src/dataframe.rs
##########
@@ -45,16 +46,111 @@ impl DataFrame {
     }
 }
 
+/// TODO: this functions belongs to datafusion::sql::planner and 
datafusion::sql::utils but right now there are private to the datafusion crate
+pub(crate) fn find_window_exprs(exprs: &[Expr]) -> Vec<Expr> {
+    exprs
+        .into_iter()
+        .map(|e| match e {
+            Expr::WindowFunction { .. } => vec![e.clone()],
+            Expr::Alias(e, _) => find_window_exprs(&vec![*e.clone()]),
+            _ => vec![],
+        })
+        .flatten()
+        .collect::<Vec<_>>()
+}
+
+type WindowSortKey = Vec<Expr>;
+
+/// Generate a sort key for a given window expr's partition_by and order_bu 
expr
+fn generate_sort_key(partition_by: &[Expr], order_by: &[Expr]) -> 
WindowSortKey {
+    let mut sort_key = vec![];
+    partition_by.iter().for_each(|e| {
+        let e = e.clone().sort(true, true);
+        if !sort_key.contains(&e) {
+            sort_key.push(e);
+        }
+    });
+    order_by.iter().for_each(|e| {
+        if !sort_key.contains(e) {
+            sort_key.push(e.clone());
+        }
+    });
+    sort_key
+}
+
+/// group a slice of window expression expr by their order by expressions
+pub fn group_window_expr_by_sort_keys(
+    window_expr: &[Expr],
+) -> Result<Vec<(WindowSortKey, Vec<&Expr>)>, DataFusionError> {
+    let mut result = vec![];
+    window_expr.iter().try_for_each(|expr| match expr {
+        Expr::WindowFunction { partition_by, order_by, .. } => {
+            let sort_key = generate_sort_key(partition_by, order_by);
+            if let Some((_, values)) = result.iter_mut().find(
+                |group: &&mut (WindowSortKey, Vec<&Expr>)| matches!(group, 
(key, _) if *key == sort_key),
+            ) {
+                values.push(expr);
+            } else {
+                result.push((sort_key, vec![expr]))
+            }
+            Ok(())
+        }
+        other => Err(DataFusionError::Common(format!(
+            "Impossibly got non-window expr {:?}",
+            other,
+        ))),
+    })?;
+    Ok(result)
+}
+
+fn wrap_window(
+    input: LogicalPlan,
+    window_exprs: Vec<Expr>,
+) -> Result<LogicalPlan, DataFusionError> {
+    let mut plan = input;
+    let mut groups = group_window_expr_by_sort_keys(&window_exprs[..])?;
+    // sort by sort_key len descending, so that more deeply sorted plans gets 
nested further
+    // down as children; to further mimic the behavior of PostgreSQL, we want 
stable sort
+    // and a reverse so that tieing sort keys are reversed in order; note that 
by this rule
+    // if there's an empty over, it'll be at the top level

Review comment:
       I wonder if there is a way to reuse this part of the code.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to