This is an automated email from the ASF dual-hosted git repository.
jiayuliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new b9ef8de DataFrame supports window function (#1167)
b9ef8de is described below
commit b9ef8de9ff0fca7159dc1f901ca8976d332853cb
Author: Carlos <[email protected]>
AuthorDate: Tue Oct 26 11:17:50 2021 +0800
DataFrame supports window function (#1167)
---
datafusion/src/execution/dataframe_impl.rs | 43 ++++++++++++++++++++++++++----
datafusion/src/logical_plan/builder.rs | 25 ++++++++++++++++-
datafusion/src/sql/planner.rs | 29 +++-----------------
3 files changed, 65 insertions(+), 32 deletions(-)
diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs
index 18a558e..a313cc1 100644
--- a/datafusion/src/execution/dataframe_impl.rs
+++ b/datafusion/src/execution/dataframe_impl.rs
@@ -35,6 +35,7 @@ use crate::arrow::util::pretty;
use crate::physical_plan::{
execute_stream, execute_stream_partitioned, ExecutionPlan,
SendableRecordBatchStream,
};
+use crate::sql::utils::find_window_exprs;
use async_trait::async_trait;
/// Implementation of DataFrame API
@@ -75,10 +76,17 @@ impl DataFrame for DataFrameImpl {
/// Create a projection based on arbitrary expressions
fn select(&self, expr_list: Vec<Expr>) -> Result<Arc<dyn DataFrame>> {
- let plan = LogicalPlanBuilder::from(self.to_logical_plan())
- .project(expr_list)?
- .build()?;
- Ok(Arc::new(DataFrameImpl::new(self.ctx_state.clone(), &plan)))
+ let window_func_exprs = find_window_exprs(&expr_list);
+ let plan = if window_func_exprs.is_empty() {
+ self.to_logical_plan()
+ } else {
+ LogicalPlanBuilder::window_plan(self.to_logical_plan(), window_func_exprs)?
+ };
+ let project_plan = LogicalPlanBuilder::from(plan).project(expr_list)?.build()?;
+ Ok(Arc::new(DataFrameImpl::new(
+ self.ctx_state.clone(),
+ &project_plan,
+ )))
}
/// Create a filter based on a predicate expression
@@ -233,7 +241,7 @@ mod tests {
use crate::execution::options::CsvReadOptions;
use crate::logical_plan::*;
use crate::physical_plan::functions::Volatility;
- use crate::physical_plan::ColumnarValue;
+ use crate::physical_plan::{window_functions, ColumnarValue};
use crate::{assert_batches_sorted_eq, execution::context::ExecutionContext};
use crate::{physical_plan::functions::ScalarFunctionImplementation, test};
use arrow::datatypes::DataType;
@@ -271,6 +279,31 @@ mod tests {
}
#[tokio::test]
+ async fn select_with_window_exprs() -> Result<()> {
+ // build plan using Table API
+ let t = test_table().await?;
+ let first_row = Expr::WindowFunction {
+ fun: window_functions::WindowFunction::BuiltInWindowFunction(
+ window_functions::BuiltInWindowFunction::FirstValue,
+ ),
+ args: vec![col("aggregate_test_100.c1")],
+ partition_by: vec![col("aggregate_test_100.c2")],
+ order_by: vec![],
+ window_frame: None,
+ };
+ let t2 = t.select(vec![col("c1"), first_row])?;
+ let plan = t2.to_logical_plan();
+
+ let sql_plan = create_plan(
+ "select c1, first_value(c1) over (partition by c2) from aggregate_test_100",
+ )
+ .await?;
+
+ assert_same_plan(&plan, &sql_plan);
+ Ok(())
+ }
+
+ #[tokio::test]
async fn aggregate() -> Result<()> {
// build plan using DataFrame API
let df = test_table().await?;
diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index 3c6c444..09c3a14 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -44,6 +44,7 @@ use crate::logical_plan::{
columnize_expr, normalize_col, normalize_cols, Column, DFField, DFSchema,
DFSchemaRef, Partitioning,
};
+use crate::sql::utils::group_window_expr_by_sort_keys;
/// Default table name for unnamed table
pub const UNNAMED_TABLE: &str = "?table?";
@@ -401,7 +402,29 @@ impl LogicalPlanBuilder {
Ok(Self::from(table_scan))
}
-
+ /// Wrap a plan in a window
+ pub(crate) fn window_plan(
+ input: LogicalPlan,
+ window_exprs: Vec<Expr>,
+ ) -> Result<LogicalPlan> {
+ let mut plan = input;
+ let mut groups = group_window_expr_by_sort_keys(&window_exprs)?;
+ // sort by sort_key len descending, so that more deeply sorted plans gets nested further
+ // down as children; to further mimic the behavior of PostgreSQL, we want stable sort
+ // and a reverse so that tieing sort keys are reversed in order; note that by this rule
+ // if there's an empty over, it'll be at the top level
+ groups.sort_by(|(key_a, _), (key_b, _)| key_a.len().cmp(&key_b.len()));
+ groups.reverse();
+ for (_, exprs) in groups {
+ let window_exprs = exprs.into_iter().cloned().collect::<Vec<_>>();
+ // the partition and sort itself is done at physical level, see physical_planner's
+ // fn create_initial_plan
+ plan = LogicalPlanBuilder::from(plan)
+ .window(window_exprs)?
+ .build()?;
+ }
+ Ok(plan)
+ }
/// Apply a projection without alias.
pub fn project(&self, expr: impl IntoIterator<Item = Expr>) -> Result<Self> {
self.project_with_alias(expr, None)
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index 73fb681..7bdb7b8 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -59,9 +59,8 @@ use super::{
parser::DFParser,
utils::{
can_columns_satisfy_exprs, expr_as_column_expr, extract_aliases,
- find_aggregate_exprs, find_column_exprs, find_window_exprs,
- group_window_expr_by_sort_keys, rebase_expr, resolve_aliases_to_exprs,
- resolve_positions_to_exprs,
+ find_aggregate_exprs, find_column_exprs, find_window_exprs, rebase_expr,
+ resolve_aliases_to_exprs, resolve_positions_to_exprs,
},
};
use crate::logical_plan::builder::project_with_alias;
@@ -792,7 +791,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let plan = if window_func_exprs.is_empty() {
plan
} else {
- self.window(plan, window_func_exprs)?
+ LogicalPlanBuilder::window_plan(plan, window_func_exprs)?
};
let plan = if select.distinct {
@@ -839,28 +838,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
LogicalPlanBuilder::from(input).project(expr)?.build()
}
- /// Wrap a plan in a window
- fn window(&self, input: LogicalPlan, window_exprs: Vec<Expr>) -> Result<LogicalPlan> {
- let mut plan = input;
- let mut groups = group_window_expr_by_sort_keys(&window_exprs)?;
- // sort by sort_key len descending, so that more deeply sorted plans gets nested further
- // down as children; to further mimic the behavior of PostgreSQL, we want stable sort
- // and a reverse so that tieing sort keys are reversed in order; note that by this rule
- // if there's an empty over, it'll be at the top level
- groups.sort_by(|(key_a, _), (key_b, _)| key_a.len().cmp(&key_b.len()));
- groups.reverse();
- for (_, exprs) in groups {
- let window_exprs = exprs.into_iter().cloned().collect::<Vec<_>>();
- // the partition and sort itself is done at physical level, see physical_planner's
- // fn create_initial_plan
- plan = LogicalPlanBuilder::from(plan)
- .window(window_exprs)?
- .build()?;
- }
-
- Ok(plan)
- }
-
/// Wrap a plan in an aggregate
fn aggregate(
&self,