This is an automated email from the ASF dual-hosted git repository.

akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 485b80ec49 [Minor]: Remove input_schema field from window executor (#7810)
485b80ec49 is described below

commit 485b80ec49b75d9b181807c1a726eddf627cf4ab
Author: Mustafa Akur <[email protected]>
AuthorDate: Fri Oct 13 08:51:03 2023 +0300

    [Minor]: Remove input_schema field from window executor (#7810)
    
    * Initial commit
    
    * Remove input schema from proto
---
 .../core/src/physical_optimizer/enforce_sorting.rs    |  3 ---
 datafusion/core/src/physical_optimizer/test_utils.rs  |  1 -
 datafusion/core/src/physical_planner.rs               |  2 --
 datafusion/core/tests/fuzz_cases/window_fuzz.rs       |  2 --
 .../src/windows/bounded_window_agg_exec.rs            | 14 ++------------
 datafusion/physical-plan/src/windows/mod.rs           |  3 ---
 .../physical-plan/src/windows/window_agg_exec.rs      | 14 ++------------
 datafusion/proto/proto/datafusion.proto               |  1 -
 datafusion/proto/src/generated/pbjson.rs              | 18 ------------------
 datafusion/proto/src/generated/prost.rs               |  2 --
 datafusion/proto/src/physical_plan/mod.rs             | 19 +------------------
 .../proto/tests/cases/roundtrip_physical_plan.rs      |  1 -
 12 files changed, 5 insertions(+), 75 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index f84a05f0fd..a381bbb501 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -608,12 +608,10 @@ fn analyze_window_sort_removal(
         add_sort_above(&mut window_child, sort_expr, None)?;
 
         let uses_bounded_memory = window_expr.iter().all(|e| e.uses_bounded_memory());
-        let input_schema = window_child.schema();
         let new_window = if uses_bounded_memory {
             Arc::new(BoundedWindowAggExec::try_new(
                 window_expr.to_vec(),
                 window_child,
-                input_schema,
                 partitionby_exprs.to_vec(),
                 PartitionSearchMode::Sorted,
             )?) as _
@@ -621,7 +619,6 @@ fn analyze_window_sort_removal(
             Arc::new(WindowAggExec::try_new(
                 window_expr.to_vec(),
                 window_child,
-                input_schema,
                 partitionby_exprs.to_vec(),
             )?) as _
         };
diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs
index 9f966990b8..ed73456514 100644
--- a/datafusion/core/src/physical_optimizer/test_utils.rs
+++ b/datafusion/core/src/physical_optimizer/test_utils.rs
@@ -238,7 +238,6 @@ pub fn bounded_window_exec(
             )
             .unwrap()],
             input.clone(),
-            input.schema(),
             vec![],
             crate::physical_plan::windows::PartitionSearchMode::Sorted,
         )
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 35119f374f..325927bb73 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -751,7 +751,6 @@ impl DefaultPhysicalPlanner {
                         Arc::new(BoundedWindowAggExec::try_new(
                             window_expr,
                             input_exec,
-                            physical_input_schema,
                             physical_partition_keys,
                             PartitionSearchMode::Sorted,
                         )?)
@@ -759,7 +758,6 @@ impl DefaultPhysicalPlanner {
                         Arc::new(WindowAggExec::try_new(
                             window_expr,
                             input_exec,
-                            physical_input_schema,
                             physical_partition_keys,
                         )?)
                     })
diff --git a/datafusion/core/tests/fuzz_cases/window_fuzz.rs b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
index 1f0a4b09b1..83c8e1f578 100644
--- a/datafusion/core/tests/fuzz_cases/window_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/window_fuzz.rs
@@ -461,7 +461,6 @@ async fn run_window_test(
             )
             .unwrap()],
             exec1,
-            schema.clone(),
             vec![],
         )
         .unwrap(),
@@ -484,7 +483,6 @@ async fn run_window_test(
             )
             .unwrap()],
             exec2,
-            schema.clone(),
             vec![],
             search_mode,
         )
diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
index dfef0ddefa..800ea42b35 100644
--- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
@@ -88,8 +88,6 @@ pub struct BoundedWindowAggExec {
     window_expr: Vec<Arc<dyn WindowExpr>>,
     /// Schema after the window is run
     schema: SchemaRef,
-    /// Schema before the window
-    input_schema: SchemaRef,
     /// Partition Keys
     pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
     /// Execution metrics
@@ -110,11 +108,10 @@ impl BoundedWindowAggExec {
     pub fn try_new(
         window_expr: Vec<Arc<dyn WindowExpr>>,
         input: Arc<dyn ExecutionPlan>,
-        input_schema: SchemaRef,
         partition_keys: Vec<Arc<dyn PhysicalExpr>>,
         partition_search_mode: PartitionSearchMode,
     ) -> Result<Self> {
-        let schema = create_schema(&input_schema, &window_expr)?;
+        let schema = create_schema(&input.schema(), &window_expr)?;
         let schema = Arc::new(schema);
         let partition_by_exprs = window_expr[0].partition_by();
         let ordered_partition_by_indices = match &partition_search_mode {
@@ -140,7 +137,6 @@ impl BoundedWindowAggExec {
             input,
             window_expr,
             schema,
-            input_schema,
             partition_keys,
             metrics: ExecutionPlanMetricsSet::new(),
             partition_search_mode,
@@ -158,11 +154,6 @@ impl BoundedWindowAggExec {
         &self.input
     }
 
-    /// Get the input schema before any window functions are applied
-    pub fn input_schema(&self) -> SchemaRef {
-        self.input_schema.clone()
-    }
-
     /// Return the output sort order of partition keys: For example
     /// OVER(PARTITION BY a, ORDER BY b) -> would give sorting of the column a
     // We are sure that partition by columns are always at the beginning of sort_keys
@@ -303,7 +294,6 @@ impl ExecutionPlan for BoundedWindowAggExec {
         Ok(Arc::new(BoundedWindowAggExec::try_new(
             self.window_expr.clone(),
             children[0].clone(),
-            self.input_schema.clone(),
             self.partition_keys.clone(),
             self.partition_search_mode.clone(),
         )?))
@@ -333,7 +323,7 @@ impl ExecutionPlan for BoundedWindowAggExec {
     fn statistics(&self) -> Statistics {
         let input_stat = self.input.statistics();
         let win_cols = self.window_expr.len();
-        let input_cols = self.input_schema.fields().len();
+        let input_cols = self.input.schema().fields().len();
         // TODO stats: some windowing function will maintain invariants such as min, max...
         let mut column_statistics = Vec::with_capacity(win_cols + input_cols);
         if let Some(input_col_stats) = input_stat.column_statistics {
diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs
index 0f165f7935..cc915e54af 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -421,7 +421,6 @@ pub fn get_best_fitting_window(
         Ok(Some(Arc::new(BoundedWindowAggExec::try_new(
             window_expr,
             input.clone(),
-            input.schema(),
             physical_partition_keys.to_vec(),
             partition_search_mode,
         )?) as _))
@@ -435,7 +434,6 @@ pub fn get_best_fitting_window(
         Ok(Some(Arc::new(WindowAggExec::try_new(
             window_expr,
             input.clone(),
-            input.schema(),
             physical_partition_keys.to_vec(),
         )?) as _))
     }
@@ -759,7 +757,6 @@ mod tests {
                 schema.as_ref(),
             )?],
             blocking_exec,
-            schema,
             vec![],
         )?);
 
diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs
index b56a9c194c..b4dc8ec88c 100644
--- a/datafusion/physical-plan/src/windows/window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs
@@ -59,8 +59,6 @@ pub struct WindowAggExec {
     window_expr: Vec<Arc<dyn WindowExpr>>,
     /// Schema after the window is run
     schema: SchemaRef,
-    /// Schema before the window
-    input_schema: SchemaRef,
     /// Partition Keys
     pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
     /// Execution metrics
@@ -75,10 +73,9 @@ impl WindowAggExec {
     pub fn try_new(
         window_expr: Vec<Arc<dyn WindowExpr>>,
         input: Arc<dyn ExecutionPlan>,
-        input_schema: SchemaRef,
         partition_keys: Vec<Arc<dyn PhysicalExpr>>,
     ) -> Result<Self> {
-        let schema = create_schema(&input_schema, &window_expr)?;
+        let schema = create_schema(&input.schema(), &window_expr)?;
         let schema = Arc::new(schema);
 
         let ordered_partition_by_indices =
@@ -87,7 +84,6 @@ impl WindowAggExec {
             input,
             window_expr,
             schema,
-            input_schema,
             partition_keys,
             metrics: ExecutionPlanMetricsSet::new(),
             ordered_partition_by_indices,
@@ -104,11 +100,6 @@ impl WindowAggExec {
         &self.input
     }
 
-    /// Get the input schema before any window functions are applied
-    pub fn input_schema(&self) -> SchemaRef {
-        self.input_schema.clone()
-    }
-
     /// Return the output sort order of partition keys: For example
     /// OVER(PARTITION BY a, ORDER BY b) -> would give sorting of the column a
     // We are sure that partition by columns are always at the beginning of sort_keys
@@ -230,7 +221,6 @@ impl ExecutionPlan for WindowAggExec {
         Ok(Arc::new(WindowAggExec::try_new(
             self.window_expr.clone(),
             children[0].clone(),
-            self.input_schema.clone(),
             self.partition_keys.clone(),
         )?))
     }
@@ -259,7 +249,7 @@ impl ExecutionPlan for WindowAggExec {
     fn statistics(&self) -> Statistics {
         let input_stat = self.input.statistics();
         let win_cols = self.window_expr.len();
-        let input_cols = self.input_schema.fields().len();
+        let input_cols = self.input.schema().fields().len();
         // TODO stats: some windowing function will maintain invariants such as min, max...
         let mut column_statistics = Vec::with_capacity(win_cols + input_cols);
         if let Some(input_col_stats) = input_stat.column_statistics {
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index bda0f78287..c60dae71ef 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -1419,7 +1419,6 @@ message PartiallySortedPartitionSearchMode {
 message WindowAggExecNode {
   PhysicalPlanNode input = 1;
   repeated PhysicalWindowExprNode window_expr = 2;
-  Schema input_schema = 4;
   repeated PhysicalExprNode partition_keys = 5;
   // Set optional to `None` for `BoundedWindowAggExec`.
   oneof partition_search_mode {
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index ced0c8bd7c..266075e689 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -23969,9 +23969,6 @@ impl serde::Serialize for WindowAggExecNode {
         if !self.window_expr.is_empty() {
             len += 1;
         }
-        if self.input_schema.is_some() {
-            len += 1;
-        }
         if !self.partition_keys.is_empty() {
             len += 1;
         }
@@ -23985,9 +23982,6 @@ impl serde::Serialize for WindowAggExecNode {
         if !self.window_expr.is_empty() {
             struct_ser.serialize_field("windowExpr", &self.window_expr)?;
         }
-        if let Some(v) = self.input_schema.as_ref() {
-            struct_ser.serialize_field("inputSchema", v)?;
-        }
         if !self.partition_keys.is_empty() {
             struct_ser.serialize_field("partitionKeys", &self.partition_keys)?;
         }
@@ -24017,8 +24011,6 @@ impl<'de> serde::Deserialize<'de> for WindowAggExecNode {
             "input",
             "window_expr",
             "windowExpr",
-            "input_schema",
-            "inputSchema",
             "partition_keys",
             "partitionKeys",
             "linear",
@@ -24031,7 +24023,6 @@ impl<'de> serde::Deserialize<'de> for WindowAggExecNode {
         enum GeneratedField {
             Input,
             WindowExpr,
-            InputSchema,
             PartitionKeys,
             Linear,
             PartiallySorted,
@@ -24059,7 +24050,6 @@ impl<'de> serde::Deserialize<'de> for WindowAggExecNode {
                         match value {
                             "input" => Ok(GeneratedField::Input),
                             "windowExpr" | "window_expr" => 
Ok(GeneratedField::WindowExpr),
-                            "inputSchema" | "input_schema" => Ok(GeneratedField::InputSchema),
                             "partitionKeys" | "partition_keys" => 
Ok(GeneratedField::PartitionKeys),
                             "linear" => Ok(GeneratedField::Linear),
                             "partiallySorted" | "partially_sorted" => 
Ok(GeneratedField::PartiallySorted),
@@ -24085,7 +24075,6 @@ impl<'de> serde::Deserialize<'de> for WindowAggExecNode {
             {
                 let mut input__ = None;
                 let mut window_expr__ = None;
-                let mut input_schema__ = None;
                 let mut partition_keys__ = None;
                 let mut partition_search_mode__ = None;
                 while let Some(k) = map_.next_key()? {
@@ -24102,12 +24091,6 @@ impl<'de> serde::Deserialize<'de> for WindowAggExecNode {
                             }
                             window_expr__ = Some(map_.next_value()?);
                         }
-                        GeneratedField::InputSchema => {
-                            if input_schema__.is_some() {
-                                return Err(serde::de::Error::duplicate_field("inputSchema"));
-                            }
-                            input_schema__ = map_.next_value()?;
-                        }
                         GeneratedField::PartitionKeys => {
                             if partition_keys__.is_some() {
                                 return Err(serde::de::Error::duplicate_field("partitionKeys"));
@@ -24140,7 +24123,6 @@ impl<'de> serde::Deserialize<'de> for WindowAggExecNode {
                 Ok(WindowAggExecNode {
                     input: input__,
                     window_expr: window_expr__.unwrap_or_default(),
-                    input_schema: input_schema__,
                     partition_keys: partition_keys__.unwrap_or_default(),
                     partition_search_mode: partition_search_mode__,
                 })
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index ca20cd35cb..894afa570f 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1994,8 +1994,6 @@ pub struct WindowAggExecNode {
     pub input: ::core::option::Option<::prost::alloc::boxed::Box<PhysicalPlanNode>>,
     #[prost(message, repeated, tag = "2")]
     pub window_expr: ::prost::alloc::vec::Vec<PhysicalWindowExprNode>,
-    #[prost(message, optional, tag = "4")]
-    pub input_schema: ::core::option::Option<Schema>,
     #[prost(message, repeated, tag = "5")]
     pub partition_keys: ::prost::alloc::vec::Vec<PhysicalExprNode>,
     /// Set optional to `None` for `BoundedWindowAggExec`.
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 8257f9aa34..08010a3151 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -282,16 +282,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
                     runtime,
                     extension_codec,
                 )?;
-                let input_schema = window_agg
-                    .input_schema
-                    .as_ref()
-                    .ok_or_else(|| {
-                        DataFusionError::Internal(
-                            "input_schema in WindowAggrNode is missing.".to_owned(),
-                        )
-                    })?
-                    .clone();
-                let input_schema: SchemaRef = SchemaRef::new((&input_schema).try_into()?);
+                let input_schema = input.schema();
 
                 let physical_window_expr: Vec<Arc<dyn WindowExpr>> = window_agg
                     .window_expr
@@ -333,7 +324,6 @@ impl AsExecutionPlan for PhysicalPlanNode {
                     Ok(Arc::new(BoundedWindowAggExec::try_new(
                         physical_window_expr,
                         input,
-                        input_schema,
                         partition_keys,
                         partition_search_mode,
                     )?))
@@ -341,7 +331,6 @@ impl AsExecutionPlan for PhysicalPlanNode {
                     Ok(Arc::new(WindowAggExec::try_new(
                         physical_window_expr,
                         input,
-                        input_schema,
                         partition_keys,
                     )?))
                 }
@@ -1315,8 +1304,6 @@ impl AsExecutionPlan for PhysicalPlanNode {
                 extension_codec,
             )?;
 
-            let input_schema = protobuf::Schema::try_from(exec.input_schema().as_ref())?;
-
             let window_expr =
                 exec.window_expr()
                     .iter()
@@ -1334,7 +1321,6 @@ impl AsExecutionPlan for PhysicalPlanNode {
                     protobuf::WindowAggExecNode {
                         input: Some(Box::new(input)),
                         window_expr,
-                        input_schema: Some(input_schema),
                         partition_keys,
                         partition_search_mode: None,
                     },
@@ -1346,8 +1332,6 @@ impl AsExecutionPlan for PhysicalPlanNode {
                 extension_codec,
             )?;
 
-            let input_schema = protobuf::Schema::try_from(exec.input_schema().as_ref())?;
-
             let window_expr =
                 exec.window_expr()
                     .iter()
@@ -1385,7 +1369,6 @@ impl AsExecutionPlan for PhysicalPlanNode {
                     protobuf::WindowAggExecNode {
                         input: Some(Box::new(input)),
                         window_expr,
-                        input_schema: Some(input_schema),
                         partition_keys,
                         partition_search_mode: Some(partition_search_mode),
                     },
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 77e77630bc..e30d416bdc 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -275,7 +275,6 @@ fn roundtrip_window() -> Result<()> {
             sliding_aggr_window_expr,
         ],
         input,
-        schema.clone(),
         vec![col("b", &schema)?],
     )?))
 }

Reply via email to