This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 8199e9e660 Fix window expr deserialization (#10506)
8199e9e660 is described below
commit 8199e9e6601d91320e395b43ba3a005ae7ba4816
Author: 张林伟 <[email protected]>
AuthorDate: Thu May 16 02:53:36 2024 +0800
Fix window expr deserialization (#10506)
* Fix window expr deserialization
* Improve naming and doc
* Update window test
---
datafusion/core/tests/fuzz_cases/window_fuzz.rs | 34 ++--------------------
datafusion/physical-plan/src/windows/mod.rs | 26 +++++++++++++++++
datafusion/proto/src/physical_plan/from_proto.rs | 12 +++++---
.../proto/tests/cases/roundtrip_physical_plan.rs | 3 +-
4 files changed, 38 insertions(+), 37 deletions(-)
diff --git a/datafusion/core/tests/fuzz_cases/window_fuzz.rs b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
index 2514324a95..fe0c408dc1 100644
--- a/datafusion/core/tests/fuzz_cases/window_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
@@ -22,11 +22,10 @@ use arrow::compute::{concat_batches, SortOptions};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
-use arrow_schema::{Field, Schema};
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::windows::{
- create_window_expr, BoundedWindowAggExec, WindowAggExec,
+ create_window_expr, schema_add_window_field, BoundedWindowAggExec,
WindowAggExec,
};
use datafusion::physical_plan::InputOrderMode::{Linear, PartiallySorted,
Sorted};
use datafusion::physical_plan::{collect, InputOrderMode};
@@ -40,7 +39,6 @@ use datafusion_expr::{
};
use datafusion_physical_expr::expressions::{cast, col, lit};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
-use itertools::Itertools;
use test_utils::add_empty_batches;
use hashbrown::HashMap;
@@ -276,7 +274,7 @@ async fn bounded_window_causal_non_causal() -> Result<()> {
};
let extended_schema =
- schema_add_window_fields(&args, &schema, &window_fn, fn_name)?;
+ schema_add_window_field(&args, &schema, &window_fn, fn_name)?;
let window_expr = create_window_expr(
&window_fn,
@@ -683,7 +681,7 @@ async fn run_window_test(
exec1 = Arc::new(SortExec::new(sort_keys, exec1)) as _;
}
- let extended_schema = schema_add_window_fields(&args, &schema, &window_fn, &fn_name)?;
+ let extended_schema = schema_add_window_field(&args, &schema, &window_fn, &fn_name)?;
let usual_window_exec = Arc::new(WindowAggExec::try_new(
vec![create_window_expr(
@@ -754,32 +752,6 @@ async fn run_window_test(
Ok(())
}
-// The planner has fully updated schema before calling the `create_window_expr`
-// Replicate the same for this test
-fn schema_add_window_fields(
- args: &[Arc<dyn PhysicalExpr>],
- schema: &Arc<Schema>,
- window_fn: &WindowFunctionDefinition,
- fn_name: &str,
-) -> Result<Arc<Schema>> {
- let data_types = args
- .iter()
- .map(|e| e.clone().as_ref().data_type(schema))
- .collect::<Result<Vec<_>>>()?;
- let window_expr_return_type = window_fn.return_type(&data_types)?;
- let mut window_fields = schema
- .fields()
- .iter()
- .map(|f| f.as_ref().clone())
- .collect_vec();
- window_fields.extend_from_slice(&[Field::new(
- fn_name,
- window_expr_return_type,
- true,
- )]);
- Ok(Arc::new(Schema::new(window_fields)))
-}
-
/// Return randomly sized record batches with:
/// three sorted int32 columns 'a', 'b', 'c' ranged from 0..DISTINCT as columns
/// one random int32 column x
diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs
index d1223f7880..42c630741c 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -42,6 +42,7 @@ use datafusion_physical_expr::{
window::{BuiltInWindowFunctionExpr, SlidingAggregateWindowExpr},
AggregateExpr, EquivalenceProperties, LexOrdering, PhysicalSortRequirement,
};
+use itertools::Itertools;
mod bounded_window_agg_exec;
mod window_agg_exec;
@@ -52,6 +53,31 @@ pub use datafusion_physical_expr::window::{
};
pub use window_agg_exec::WindowAggExec;
+/// Build field from window function and add it into schema
+pub fn schema_add_window_field(
+ args: &[Arc<dyn PhysicalExpr>],
+ schema: &Schema,
+ window_fn: &WindowFunctionDefinition,
+ fn_name: &str,
+) -> Result<Arc<Schema>> {
+ let data_types = args
+ .iter()
+ .map(|e| e.clone().as_ref().data_type(schema))
+ .collect::<Result<Vec<_>>>()?;
+ let window_expr_return_type = window_fn.return_type(&data_types)?;
+ let mut window_fields = schema
+ .fields()
+ .iter()
+ .map(|f| f.as_ref().clone())
+ .collect_vec();
+ window_fields.extend_from_slice(&[Field::new(
+ fn_name,
+ window_expr_return_type,
+ false,
+ )]);
+ Ok(Arc::new(Schema::new(window_fields)))
+}
+
/// Create a physical expression for window function
#[allow(clippy::too_many_arguments)]
pub fn create_window_expr(
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index c907e991fb..a290f30586 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -40,7 +40,7 @@ use datafusion::physical_plan::expressions::{
in_list, BinaryExpr, CaseExpr, CastExpr, Column, IsNotNullExpr,
IsNullExpr, LikeExpr,
Literal, NegativeExpr, NotExpr, TryCastExpr,
};
-use datafusion::physical_plan::windows::create_window_expr;
+use datafusion::physical_plan::windows::{create_window_expr, schema_add_window_field};
use datafusion::physical_plan::{
ColumnStatistics, Partitioning, PhysicalExpr, Statistics, WindowExpr,
};
@@ -155,14 +155,18 @@ pub fn parse_physical_window_expr(
)
})?;
+ let fun: WindowFunctionDefinition = convert_required!(proto.window_function)?;
+ let name = proto.name.clone();
+ let extended_schema =
+ schema_add_window_field(&window_node_expr, input_schema, &fun, &name)?;
create_window_expr(
- &convert_required!(proto.window_function)?,
- proto.name.clone(),
+ &fun,
+ name,
&window_node_expr,
&partition_by,
&order_by,
Arc::new(window_frame),
- input_schema,
+ &extended_schema,
false,
)
}
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 30a28081ed..dd8e450d31 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -253,8 +253,7 @@ fn roundtrip_nested_loop_join() -> Result<()> {
fn roundtrip_window() -> Result<()> {
let field_a = Field::new("a", DataType::Int64, false);
let field_b = Field::new("b", DataType::Int64, false);
- let field_c = Field::new("FIRST_VALUE(a) PARTITION BY [b] ORDER BY [a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", DataType::Int64, false);
- let schema = Arc::new(Schema::new(vec![field_a, field_b, field_c]));
+ let schema = Arc::new(Schema::new(vec![field_a, field_b]));
let window_frame = WindowFrame::new_bounds(
datafusion_expr::WindowFrameUnits::Range,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]