This is an automated email from the ASF dual-hosted git repository.
akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 485b80ec49 [Minor]: Remove input_schema field from window executor (#7810)
485b80ec49 is described below
commit 485b80ec49b75d9b181807c1a726eddf627cf4ab
Author: Mustafa Akur <[email protected]>
AuthorDate: Fri Oct 13 08:51:03 2023 +0300
[Minor]: Remove input_schema field from window executor (#7810)
* Initial commit
* Remove input schema from proto
---
.../core/src/physical_optimizer/enforce_sorting.rs | 3 ---
datafusion/core/src/physical_optimizer/test_utils.rs | 1 -
datafusion/core/src/physical_planner.rs | 2 --
datafusion/core/tests/fuzz_cases/window_fuzz.rs | 2 --
.../src/windows/bounded_window_agg_exec.rs | 14 ++------------
datafusion/physical-plan/src/windows/mod.rs | 3 ---
.../physical-plan/src/windows/window_agg_exec.rs | 14 ++------------
datafusion/proto/proto/datafusion.proto | 1 -
datafusion/proto/src/generated/pbjson.rs | 18 ------------------
datafusion/proto/src/generated/prost.rs | 2 --
datafusion/proto/src/physical_plan/mod.rs | 19 +------------------
.../proto/tests/cases/roundtrip_physical_plan.rs | 1 -
12 files changed, 5 insertions(+), 75 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index f84a05f0fd..a381bbb501 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -608,12 +608,10 @@ fn analyze_window_sort_removal(
add_sort_above(&mut window_child, sort_expr, None)?;
let uses_bounded_memory = window_expr.iter().all(|e| e.uses_bounded_memory());
- let input_schema = window_child.schema();
let new_window = if uses_bounded_memory {
Arc::new(BoundedWindowAggExec::try_new(
window_expr.to_vec(),
window_child,
- input_schema,
partitionby_exprs.to_vec(),
PartitionSearchMode::Sorted,
)?) as _
@@ -621,7 +619,6 @@ fn analyze_window_sort_removal(
Arc::new(WindowAggExec::try_new(
window_expr.to_vec(),
window_child,
- input_schema,
partitionby_exprs.to_vec(),
)?) as _
};
diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs
index 9f966990b8..ed73456514 100644
--- a/datafusion/core/src/physical_optimizer/test_utils.rs
+++ b/datafusion/core/src/physical_optimizer/test_utils.rs
@@ -238,7 +238,6 @@ pub fn bounded_window_exec(
)
.unwrap()],
input.clone(),
- input.schema(),
vec![],
crate::physical_plan::windows::PartitionSearchMode::Sorted,
)
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 35119f374f..325927bb73 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -751,7 +751,6 @@ impl DefaultPhysicalPlanner {
Arc::new(BoundedWindowAggExec::try_new(
window_expr,
input_exec,
- physical_input_schema,
physical_partition_keys,
PartitionSearchMode::Sorted,
)?)
@@ -759,7 +758,6 @@ impl DefaultPhysicalPlanner {
Arc::new(WindowAggExec::try_new(
window_expr,
input_exec,
- physical_input_schema,
physical_partition_keys,
)?)
})
diff --git a/datafusion/core/tests/fuzz_cases/window_fuzz.rs b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
index 1f0a4b09b1..83c8e1f578 100644
--- a/datafusion/core/tests/fuzz_cases/window_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
@@ -461,7 +461,6 @@ async fn run_window_test(
)
.unwrap()],
exec1,
- schema.clone(),
vec![],
)
.unwrap(),
@@ -484,7 +483,6 @@ async fn run_window_test(
)
.unwrap()],
exec2,
- schema.clone(),
vec![],
search_mode,
)
diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
index dfef0ddefa..800ea42b35 100644
--- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
@@ -88,8 +88,6 @@ pub struct BoundedWindowAggExec {
window_expr: Vec<Arc<dyn WindowExpr>>,
/// Schema after the window is run
schema: SchemaRef,
- /// Schema before the window
- input_schema: SchemaRef,
/// Partition Keys
pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
/// Execution metrics
@@ -110,11 +108,10 @@ impl BoundedWindowAggExec {
pub fn try_new(
window_expr: Vec<Arc<dyn WindowExpr>>,
input: Arc<dyn ExecutionPlan>,
- input_schema: SchemaRef,
partition_keys: Vec<Arc<dyn PhysicalExpr>>,
partition_search_mode: PartitionSearchMode,
) -> Result<Self> {
- let schema = create_schema(&input_schema, &window_expr)?;
+ let schema = create_schema(&input.schema(), &window_expr)?;
let schema = Arc::new(schema);
let partition_by_exprs = window_expr[0].partition_by();
let ordered_partition_by_indices = match &partition_search_mode {
@@ -140,7 +137,6 @@ impl BoundedWindowAggExec {
input,
window_expr,
schema,
- input_schema,
partition_keys,
metrics: ExecutionPlanMetricsSet::new(),
partition_search_mode,
@@ -158,11 +154,6 @@ impl BoundedWindowAggExec {
&self.input
}
- /// Get the input schema before any window functions are applied
- pub fn input_schema(&self) -> SchemaRef {
- self.input_schema.clone()
- }
-
/// Return the output sort order of partition keys: For example
/// OVER(PARTITION BY a, ORDER BY b) -> would give sorting of the column a
// We are sure that partition by columns are always at the beginning of sort_keys
@@ -303,7 +294,6 @@ impl ExecutionPlan for BoundedWindowAggExec {
Ok(Arc::new(BoundedWindowAggExec::try_new(
self.window_expr.clone(),
children[0].clone(),
- self.input_schema.clone(),
self.partition_keys.clone(),
self.partition_search_mode.clone(),
)?))
@@ -333,7 +323,7 @@ impl ExecutionPlan for BoundedWindowAggExec {
fn statistics(&self) -> Statistics {
let input_stat = self.input.statistics();
let win_cols = self.window_expr.len();
- let input_cols = self.input_schema.fields().len();
+ let input_cols = self.input.schema().fields().len();
// TODO stats: some windowing function will maintain invariants such as min, max...
let mut column_statistics = Vec::with_capacity(win_cols + input_cols);
if let Some(input_col_stats) = input_stat.column_statistics {
diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs
index 0f165f7935..cc915e54af 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -421,7 +421,6 @@ pub fn get_best_fitting_window(
Ok(Some(Arc::new(BoundedWindowAggExec::try_new(
window_expr,
input.clone(),
- input.schema(),
physical_partition_keys.to_vec(),
partition_search_mode,
)?) as _))
@@ -435,7 +434,6 @@ pub fn get_best_fitting_window(
Ok(Some(Arc::new(WindowAggExec::try_new(
window_expr,
input.clone(),
- input.schema(),
physical_partition_keys.to_vec(),
)?) as _))
}
@@ -759,7 +757,6 @@ mod tests {
schema.as_ref(),
)?],
blocking_exec,
- schema,
vec![],
)?);
diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs
index b56a9c194c..b4dc8ec88c 100644
--- a/datafusion/physical-plan/src/windows/window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs
@@ -59,8 +59,6 @@ pub struct WindowAggExec {
window_expr: Vec<Arc<dyn WindowExpr>>,
/// Schema after the window is run
schema: SchemaRef,
- /// Schema before the window
- input_schema: SchemaRef,
/// Partition Keys
pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
/// Execution metrics
@@ -75,10 +73,9 @@ impl WindowAggExec {
pub fn try_new(
window_expr: Vec<Arc<dyn WindowExpr>>,
input: Arc<dyn ExecutionPlan>,
- input_schema: SchemaRef,
partition_keys: Vec<Arc<dyn PhysicalExpr>>,
) -> Result<Self> {
- let schema = create_schema(&input_schema, &window_expr)?;
+ let schema = create_schema(&input.schema(), &window_expr)?;
let schema = Arc::new(schema);
let ordered_partition_by_indices =
@@ -87,7 +84,6 @@ impl WindowAggExec {
input,
window_expr,
schema,
- input_schema,
partition_keys,
metrics: ExecutionPlanMetricsSet::new(),
ordered_partition_by_indices,
@@ -104,11 +100,6 @@ impl WindowAggExec {
&self.input
}
- /// Get the input schema before any window functions are applied
- pub fn input_schema(&self) -> SchemaRef {
- self.input_schema.clone()
- }
-
/// Return the output sort order of partition keys: For example
/// OVER(PARTITION BY a, ORDER BY b) -> would give sorting of the column a
// We are sure that partition by columns are always at the beginning of sort_keys
@@ -230,7 +221,6 @@ impl ExecutionPlan for WindowAggExec {
Ok(Arc::new(WindowAggExec::try_new(
self.window_expr.clone(),
children[0].clone(),
- self.input_schema.clone(),
self.partition_keys.clone(),
)?))
}
@@ -259,7 +249,7 @@ impl ExecutionPlan for WindowAggExec {
fn statistics(&self) -> Statistics {
let input_stat = self.input.statistics();
let win_cols = self.window_expr.len();
- let input_cols = self.input_schema.fields().len();
+ let input_cols = self.input.schema().fields().len();
// TODO stats: some windowing function will maintain invariants such as min, max...
let mut column_statistics = Vec::with_capacity(win_cols + input_cols);
if let Some(input_col_stats) = input_stat.column_statistics {
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index bda0f78287..c60dae71ef 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -1419,7 +1419,6 @@ message PartiallySortedPartitionSearchMode {
message WindowAggExecNode {
PhysicalPlanNode input = 1;
repeated PhysicalWindowExprNode window_expr = 2;
- Schema input_schema = 4;
repeated PhysicalExprNode partition_keys = 5;
// Set optional to `None` for `BoundedWindowAggExec`.
oneof partition_search_mode {
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index ced0c8bd7c..266075e689 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -23969,9 +23969,6 @@ impl serde::Serialize for WindowAggExecNode {
if !self.window_expr.is_empty() {
len += 1;
}
- if self.input_schema.is_some() {
- len += 1;
- }
if !self.partition_keys.is_empty() {
len += 1;
}
@@ -23985,9 +23982,6 @@ impl serde::Serialize for WindowAggExecNode {
if !self.window_expr.is_empty() {
struct_ser.serialize_field("windowExpr", &self.window_expr)?;
}
- if let Some(v) = self.input_schema.as_ref() {
- struct_ser.serialize_field("inputSchema", v)?;
- }
if !self.partition_keys.is_empty() {
struct_ser.serialize_field("partitionKeys", &self.partition_keys)?;
}
@@ -24017,8 +24011,6 @@ impl<'de> serde::Deserialize<'de> for WindowAggExecNode {
"input",
"window_expr",
"windowExpr",
- "input_schema",
- "inputSchema",
"partition_keys",
"partitionKeys",
"linear",
@@ -24031,7 +24023,6 @@ impl<'de> serde::Deserialize<'de> for WindowAggExecNode {
enum GeneratedField {
Input,
WindowExpr,
- InputSchema,
PartitionKeys,
Linear,
PartiallySorted,
@@ -24059,7 +24050,6 @@ impl<'de> serde::Deserialize<'de> for WindowAggExecNode {
match value {
"input" => Ok(GeneratedField::Input),
"windowExpr" | "window_expr" => Ok(GeneratedField::WindowExpr),
- "inputSchema" | "input_schema" => Ok(GeneratedField::InputSchema),
"partitionKeys" | "partition_keys" => Ok(GeneratedField::PartitionKeys),
"linear" => Ok(GeneratedField::Linear),
"partiallySorted" | "partially_sorted" => Ok(GeneratedField::PartiallySorted),
@@ -24085,7 +24075,6 @@ impl<'de> serde::Deserialize<'de> for WindowAggExecNode {
{
let mut input__ = None;
let mut window_expr__ = None;
- let mut input_schema__ = None;
let mut partition_keys__ = None;
let mut partition_search_mode__ = None;
while let Some(k) = map_.next_key()? {
@@ -24102,12 +24091,6 @@ impl<'de> serde::Deserialize<'de> for WindowAggExecNode {
}
window_expr__ = Some(map_.next_value()?);
}
- GeneratedField::InputSchema => {
- if input_schema__.is_some() {
- return Err(serde::de::Error::duplicate_field("inputSchema"));
- }
- input_schema__ = map_.next_value()?;
- }
GeneratedField::PartitionKeys => {
if partition_keys__.is_some() {
return Err(serde::de::Error::duplicate_field("partitionKeys"));
@@ -24140,7 +24123,6 @@ impl<'de> serde::Deserialize<'de> for WindowAggExecNode {
Ok(WindowAggExecNode {
input: input__,
window_expr: window_expr__.unwrap_or_default(),
- input_schema: input_schema__,
partition_keys: partition_keys__.unwrap_or_default(),
partition_search_mode: partition_search_mode__,
})
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index ca20cd35cb..894afa570f 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1994,8 +1994,6 @@ pub struct WindowAggExecNode {
pub input: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
#[prost(message, repeated, tag = "2")]
pub window_expr: ::prost::alloc::vec::Vec<PhysicalWindowExprNode>,
- #[prost(message, optional, tag = "4")]
- pub input_schema: ::core::option::Option<Schema>,
#[prost(message, repeated, tag = "5")]
pub partition_keys: ::prost::alloc::vec::Vec<PhysicalExprNode>,
/// Set optional to `None` for `BoundedWindowAggExec`.
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 8257f9aa34..08010a3151 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -282,16 +282,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
runtime,
extension_codec,
)?;
- let input_schema = window_agg
- .input_schema
- .as_ref()
- .ok_or_else(|| {
- DataFusionError::Internal(
- "input_schema in WindowAggrNode is missing.".to_owned(),
- )
- })?
- .clone();
- let input_schema: SchemaRef = SchemaRef::new((&input_schema).try_into()?);
+ let input_schema = input.schema();
let physical_window_expr: Vec<Arc<dyn WindowExpr>> = window_agg
.window_expr
@@ -333,7 +324,6 @@ impl AsExecutionPlan for PhysicalPlanNode {
Ok(Arc::new(BoundedWindowAggExec::try_new(
physical_window_expr,
input,
- input_schema,
partition_keys,
partition_search_mode,
)?))
@@ -341,7 +331,6 @@ impl AsExecutionPlan for PhysicalPlanNode {
Ok(Arc::new(WindowAggExec::try_new(
physical_window_expr,
input,
- input_schema,
partition_keys,
)?))
}
@@ -1315,8 +1304,6 @@ impl AsExecutionPlan for PhysicalPlanNode {
extension_codec,
)?;
- let input_schema = protobuf::Schema::try_from(exec.input_schema().as_ref())?;
-
let window_expr =
exec.window_expr()
.iter()
@@ -1334,7 +1321,6 @@ impl AsExecutionPlan for PhysicalPlanNode {
protobuf::WindowAggExecNode {
input: Some(Box::new(input)),
window_expr,
- input_schema: Some(input_schema),
partition_keys,
partition_search_mode: None,
},
@@ -1346,8 +1332,6 @@ impl AsExecutionPlan for PhysicalPlanNode {
extension_codec,
)?;
- let input_schema = protobuf::Schema::try_from(exec.input_schema().as_ref())?;
-
let window_expr =
exec.window_expr()
.iter()
@@ -1385,7 +1369,6 @@ impl AsExecutionPlan for PhysicalPlanNode {
protobuf::WindowAggExecNode {
input: Some(Box::new(input)),
window_expr,
- input_schema: Some(input_schema),
partition_keys,
partition_search_mode: Some(partition_search_mode),
},
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 77e77630bc..e30d416bdc 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -275,7 +275,6 @@ fn roundtrip_window() -> Result<()> {
sliding_aggr_window_expr,
],
input,
- schema.clone(),
vec![col("b", &schema)?],
)?))
}