This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new b9ef8de  DataFrame supports window function (#1167)
b9ef8de is described below

commit b9ef8de9ff0fca7159dc1f901ca8976d332853cb
Author: Carlos <[email protected]>
AuthorDate: Tue Oct 26 11:17:50 2021 +0800

    DataFrame supports window function (#1167)
---
 datafusion/src/execution/dataframe_impl.rs | 43 ++++++++++++++++++++++++++----
 datafusion/src/logical_plan/builder.rs     | 25 ++++++++++++++++-
 datafusion/src/sql/planner.rs              | 29 +++-----------------
 3 files changed, 65 insertions(+), 32 deletions(-)

diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs
index 18a558e..a313cc1 100644
--- a/datafusion/src/execution/dataframe_impl.rs
+++ b/datafusion/src/execution/dataframe_impl.rs
@@ -35,6 +35,7 @@ use crate::arrow::util::pretty;
 use crate::physical_plan::{
     execute_stream, execute_stream_partitioned, ExecutionPlan, SendableRecordBatchStream,
 };
+use crate::sql::utils::find_window_exprs;
 use async_trait::async_trait;
 
 /// Implementation of DataFrame API
@@ -75,10 +76,17 @@ impl DataFrame for DataFrameImpl {
 
     /// Create a projection based on arbitrary expressions
     fn select(&self, expr_list: Vec<Expr>) -> Result<Arc<dyn DataFrame>> {
-        let plan = LogicalPlanBuilder::from(self.to_logical_plan())
-            .project(expr_list)?
-            .build()?;
-        Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
+        let window_func_exprs = find_window_exprs(&expr_list);
+        let plan = if window_func_exprs.is_empty() {
+            self.to_logical_plan()
+        } else {
+            LogicalPlanBuilder::window_plan(self.to_logical_plan(), window_func_exprs)?
+        };
+        let project_plan = LogicalPlanBuilder::from(plan).project(expr_list)?.build()?;
+        Ok(Arc::new(DataFrameImpl::new(
+            self.ctx_state.clone(),
+            &project_plan,
+        )))
     }
 
     /// Create a filter based on a predicate expression
@@ -233,7 +241,7 @@ mod tests {
     use crate::execution::options::CsvReadOptions;
     use crate::logical_plan::*;
     use crate::physical_plan::functions::Volatility;
-    use crate::physical_plan::ColumnarValue;
+    use crate::physical_plan::{window_functions, ColumnarValue};
     use crate::{assert_batches_sorted_eq, execution::context::ExecutionContext};
     use crate::{physical_plan::functions::ScalarFunctionImplementation, test};
     use arrow::datatypes::DataType;
@@ -271,6 +279,31 @@ mod tests {
     }
 
     #[tokio::test]
+    async fn select_with_window_exprs() -> Result<()> {
+        // build plan using Table API
+        let t = test_table().await?;
+        let first_row = Expr::WindowFunction {
+            fun: window_functions::WindowFunction::BuiltInWindowFunction(
+                window_functions::BuiltInWindowFunction::FirstValue,
+            ),
+            args: vec![col("aggregate_test_100.c1")],
+            partition_by: vec![col("aggregate_test_100.c2")],
+            order_by: vec![],
+            window_frame: None,
+        };
+        let t2 = t.select(vec![col("c1"), first_row])?;
+        let plan = t2.to_logical_plan();
+
+        let sql_plan = create_plan(
+            "select c1, first_value(c1) over (partition by c2) from aggregate_test_100",
+        )
+        .await?;
+
+        assert_same_plan(&plan, &sql_plan);
+        Ok(())
+    }
+
+    #[tokio::test]
     async fn aggregate() -> Result<()> {
         // build plan using DataFrame API
         let df = test_table().await?;
diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index 3c6c444..09c3a14 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -44,6 +44,7 @@ use crate::logical_plan::{
     columnize_expr, normalize_col, normalize_cols, Column, DFField, DFSchema,
     DFSchemaRef, Partitioning,
 };
+use crate::sql::utils::group_window_expr_by_sort_keys;
 
 /// Default table name for unnamed table
 pub const UNNAMED_TABLE: &str = "?table?";
@@ -401,7 +402,29 @@ impl LogicalPlanBuilder {
 
         Ok(Self::from(table_scan))
     }
-
+    /// Wrap a plan in a window
+    pub(crate) fn window_plan(
+        input: LogicalPlan,
+        window_exprs: Vec<Expr>,
+    ) -> Result<LogicalPlan> {
+        let mut plan = input;
+        let mut groups = group_window_expr_by_sort_keys(&window_exprs)?;
+        // sort by sort_key len descending, so that more deeply sorted plans gets nested further
+        // down as children; to further mimic the behavior of PostgreSQL, we want stable sort
+        // and a reverse so that tieing sort keys are reversed in order; note that by this rule
+        // if there's an empty over, it'll be at the top level
+        groups.sort_by(|(key_a, _), (key_b, _)| key_a.len().cmp(&key_b.len()));
+        groups.reverse();
+        for (_, exprs) in groups {
+            let window_exprs = exprs.into_iter().cloned().collect::<Vec<_>>();
+            // the partition and sort itself is done at physical level, see physical_planner's
+            // fn create_initial_plan
+            plan = LogicalPlanBuilder::from(plan)
+                .window(window_exprs)?
+                .build()?;
+        }
+        Ok(plan)
+    }
     /// Apply a projection without alias.
     pub fn project(&self, expr: impl IntoIterator<Item = Expr>) -> Result<Self> {
         self.project_with_alias(expr, None)
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index 73fb681..7bdb7b8 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -59,9 +59,8 @@ use super::{
     parser::DFParser,
     utils::{
         can_columns_satisfy_exprs, expr_as_column_expr, extract_aliases,
-        find_aggregate_exprs, find_column_exprs, find_window_exprs,
-        group_window_expr_by_sort_keys, rebase_expr, resolve_aliases_to_exprs,
-        resolve_positions_to_exprs,
+        find_aggregate_exprs, find_column_exprs, find_window_exprs, rebase_expr,
+        resolve_aliases_to_exprs, resolve_positions_to_exprs,
     },
 };
 use crate::logical_plan::builder::project_with_alias;
@@ -792,7 +791,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         let plan = if window_func_exprs.is_empty() {
             plan
         } else {
-            self.window(plan, window_func_exprs)?
+            LogicalPlanBuilder::window_plan(plan, window_func_exprs)?
         };
 
         let plan = if select.distinct {
@@ -839,28 +838,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         LogicalPlanBuilder::from(input).project(expr)?.build()
     }
 
-    /// Wrap a plan in a window
-    fn window(&self, input: LogicalPlan, window_exprs: Vec<Expr>) -> Result<LogicalPlan> {
-        let mut plan = input;
-        let mut groups = group_window_expr_by_sort_keys(&window_exprs)?;
-        // sort by sort_key len descending, so that more deeply sorted plans gets nested further
-        // down as children; to further mimic the behavior of PostgreSQL, we want stable sort
-        // and a reverse so that tieing sort keys are reversed in order; note that by this rule
-        // if there's an empty over, it'll be at the top level
-        groups.sort_by(|(key_a, _), (key_b, _)| key_a.len().cmp(&key_b.len()));
-        groups.reverse();
-        for (_, exprs) in groups {
-            let window_exprs = exprs.into_iter().cloned().collect::<Vec<_>>();
-            // the partition and sort itself is done at physical level, see physical_planner's
-            // fn create_initial_plan
-            plan = LogicalPlanBuilder::from(plan)
-                .window(window_exprs)?
-                .build()?;
-        }
-
-        Ok(plan)
-    }
-
     /// Wrap a plan in an aggregate
     fn aggregate(
         &self,

Reply via email to