This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new d67c0bbecd Remove order_bys from AggregateExec state (#8537)
d67c0bbecd is described below

commit d67c0bbecd8f32049de2c931c077a66ed640413a
Author: Mustafa Akur <[email protected]>
AuthorDate: Thu Dec 14 23:15:15 2023 +0300

    Remove order_bys from AggregateExec state (#8537)
    
    * Initial commit
    
    * Remove order by from aggregate exec state
---
 .../src/physical_optimizer/aggregate_statistics.rs | 12 ----------
 .../combine_partial_final_agg.rs                   |  4 ----
 .../src/physical_optimizer/enforce_distribution.rs |  4 ----
 .../limited_distinct_aggregation.rs                | 25 ++++---------------
 .../core/src/physical_optimizer/test_utils.rs      |  1 -
 .../src/physical_optimizer/topk_aggregation.rs     |  1 -
 datafusion/core/src/physical_planner.rs            |  5 +---
 datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs |  2 --
 datafusion/physical-plan/src/aggregates/mod.rs     | 28 ++++------------------
 datafusion/physical-plan/src/limit.rs              |  1 -
 datafusion/proto/proto/datafusion.proto            |  1 -
 datafusion/proto/src/generated/pbjson.rs           | 18 --------------
 datafusion/proto/src/generated/prost.rs            |  2 --
 datafusion/proto/src/physical_plan/mod.rs          | 21 ----------------
 .../proto/tests/cases/roundtrip_physical_plan.rs   |  3 ---
 15 files changed, 9 insertions(+), 119 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
index 795857b10e..86a8cdb7b3 100644
--- a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
+++ b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
@@ -397,7 +397,6 @@ pub(crate) mod tests {
             PhysicalGroupBy::default(),
             vec![agg.count_expr()],
             vec![None],
-            vec![None],
             source,
             Arc::clone(&schema),
         )?;
@@ -407,7 +406,6 @@ pub(crate) mod tests {
             PhysicalGroupBy::default(),
             vec![agg.count_expr()],
             vec![None],
-            vec![None],
             Arc::new(partial_agg),
             Arc::clone(&schema),
         )?;
@@ -429,7 +427,6 @@ pub(crate) mod tests {
             PhysicalGroupBy::default(),
             vec![agg.count_expr()],
             vec![None],
-            vec![None],
             source,
             Arc::clone(&schema),
         )?;
@@ -439,7 +436,6 @@ pub(crate) mod tests {
             PhysicalGroupBy::default(),
             vec![agg.count_expr()],
             vec![None],
-            vec![None],
             Arc::new(partial_agg),
             Arc::clone(&schema),
         )?;
@@ -460,7 +456,6 @@ pub(crate) mod tests {
             PhysicalGroupBy::default(),
             vec![agg.count_expr()],
             vec![None],
-            vec![None],
             source,
             Arc::clone(&schema),
         )?;
@@ -473,7 +468,6 @@ pub(crate) mod tests {
             PhysicalGroupBy::default(),
             vec![agg.count_expr()],
             vec![None],
-            vec![None],
             Arc::new(coalesce),
             Arc::clone(&schema),
         )?;
@@ -494,7 +488,6 @@ pub(crate) mod tests {
             PhysicalGroupBy::default(),
             vec![agg.count_expr()],
             vec![None],
-            vec![None],
             source,
             Arc::clone(&schema),
         )?;
@@ -507,7 +500,6 @@ pub(crate) mod tests {
             PhysicalGroupBy::default(),
             vec![agg.count_expr()],
             vec![None],
-            vec![None],
             Arc::new(coalesce),
             Arc::clone(&schema),
         )?;
@@ -539,7 +531,6 @@ pub(crate) mod tests {
             PhysicalGroupBy::default(),
             vec![agg.count_expr()],
             vec![None],
-            vec![None],
             filter,
             Arc::clone(&schema),
         )?;
@@ -549,7 +540,6 @@ pub(crate) mod tests {
             PhysicalGroupBy::default(),
             vec![agg.count_expr()],
             vec![None],
-            vec![None],
             Arc::new(partial_agg),
             Arc::clone(&schema),
         )?;
@@ -586,7 +576,6 @@ pub(crate) mod tests {
             PhysicalGroupBy::default(),
             vec![agg.count_expr()],
             vec![None],
-            vec![None],
             filter,
             Arc::clone(&schema),
         )?;
@@ -596,7 +585,6 @@ pub(crate) mod tests {
             PhysicalGroupBy::default(),
             vec![agg.count_expr()],
             vec![None],
-            vec![None],
             Arc::new(partial_agg),
             Arc::clone(&schema),
         )?;
diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
index 0948445de2..c50ea36b68 100644
--- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
+++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
@@ -91,7 +91,6 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
                                             input_agg_exec.group_by().clone(),
                                             input_agg_exec.aggr_expr().to_vec(),
                                             input_agg_exec.filter_expr().to_vec(),
-                                            input_agg_exec.order_by_expr().to_vec(),
                                             input_agg_exec.input().clone(),
                                             input_agg_exec.input_schema(),
                                         )
@@ -277,7 +276,6 @@ mod tests {
                 group_by,
                 aggr_expr,
                 vec![],
-                vec![],
                 input,
                 schema,
             )
@@ -297,7 +295,6 @@ mod tests {
                 group_by,
                 aggr_expr,
                 vec![],
-                vec![],
                 input,
                 schema,
             )
@@ -458,7 +455,6 @@ mod tests {
                 final_group_by,
                 aggr_expr,
                 vec![],
-                vec![],
                 partial_agg,
                 schema,
             )
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 93cdbf8583..f2e04989ef 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -521,7 +521,6 @@ fn reorder_aggregate_keys(
                             new_partial_group_by,
                             agg_exec.aggr_expr().to_vec(),
                             agg_exec.filter_expr().to_vec(),
-                            agg_exec.order_by_expr().to_vec(),
                             agg_exec.input().clone(),
                             agg_exec.input_schema.clone(),
                         )?))
@@ -548,7 +547,6 @@ fn reorder_aggregate_keys(
                         new_group_by,
                         agg_exec.aggr_expr().to_vec(),
                         agg_exec.filter_expr().to_vec(),
-                        agg_exec.order_by_expr().to_vec(),
                         partial_agg,
                         agg_exec.input_schema(),
                     )?);
@@ -1909,14 +1907,12 @@ pub(crate) mod tests {
                 final_grouping,
                 vec![],
                 vec![],
-                vec![],
                 Arc::new(
                     AggregateExec::try_new(
                         AggregateMode::Partial,
                         group_by,
                         vec![],
                         vec![],
-                        vec![],
                         input,
                         schema.clone(),
                     )
diff --git a/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs b/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs
index 8f5dbc2e92..540f9a6a13 100644
--- a/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs
+++ b/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs
@@ -55,7 +55,6 @@ impl LimitedDistinctAggregation {
             aggr.group_by().clone(),
             aggr.aggr_expr().to_vec(),
             aggr.filter_expr().to_vec(),
-            aggr.order_by_expr().to_vec(),
             aggr.input().clone(),
             aggr.input_schema(),
         )
@@ -307,7 +306,6 @@ mod tests {
             build_group_by(&schema.clone(), vec!["a".to_string()]),
             vec![],         /* aggr_expr */
             vec![None],     /* filter_expr */
-            vec![None],     /* order_by_expr */
             source,         /* input */
             schema.clone(), /* input_schema */
         )?;
@@ -316,7 +314,6 @@ mod tests {
             build_group_by(&schema.clone(), vec!["a".to_string()]),
             vec![],                /* aggr_expr */
             vec![None],            /* filter_expr */
-            vec![None],            /* order_by_expr */
             Arc::new(partial_agg), /* input */
             schema.clone(),        /* input_schema */
         )?;
@@ -359,7 +356,6 @@ mod tests {
             build_group_by(&schema.clone(), vec!["a".to_string()]),
             vec![],         /* aggr_expr */
             vec![None],     /* filter_expr */
-            vec![None],     /* order_by_expr */
             source,         /* input */
             schema.clone(), /* input_schema */
         )?;
@@ -401,7 +397,6 @@ mod tests {
             build_group_by(&schema.clone(), vec!["a".to_string()]),
             vec![],         /* aggr_expr */
             vec![None],     /* filter_expr */
-            vec![None],     /* order_by_expr */
             source,         /* input */
             schema.clone(), /* input_schema */
         )?;
@@ -443,7 +438,6 @@ mod tests {
             build_group_by(&schema.clone(), vec!["a".to_string(), "b".to_string()]),
             vec![],         /* aggr_expr */
             vec![None],     /* filter_expr */
-            vec![None],     /* order_by_expr */
             source,         /* input */
             schema.clone(), /* input_schema */
         )?;
@@ -452,7 +446,6 @@ mod tests {
             build_group_by(&schema.clone(), vec!["a".to_string()]),
             vec![],                 /* aggr_expr */
             vec![None],             /* filter_expr */
-            vec![None],             /* order_by_expr */
             Arc::new(group_by_agg), /* input */
             schema.clone(),         /* input_schema */
         )?;
@@ -495,7 +488,6 @@ mod tests {
             build_group_by(&schema.clone(), vec![]),
             vec![],         /* aggr_expr */
             vec![None],     /* filter_expr */
-            vec![None],     /* order_by_expr */
             source,         /* input */
             schema.clone(), /* input_schema */
         )?;
@@ -526,7 +518,6 @@ mod tests {
             build_group_by(&schema.clone(), vec!["a".to_string()]),
             vec![agg.count_expr()], /* aggr_expr */
             vec![None],             /* filter_expr */
-            vec![None],             /* order_by_expr */
             source,                 /* input */
             schema.clone(),         /* input_schema */
         )?;
@@ -563,7 +554,6 @@ mod tests {
             build_group_by(&schema.clone(), vec!["a".to_string()]),
             vec![],            /* aggr_expr */
             vec![filter_expr], /* filter_expr */
-            vec![None],        /* order_by_expr */
             source,            /* input */
             schema.clone(),    /* input_schema */
         )?;
@@ -592,22 +582,15 @@ mod tests {
         let source = parquet_exec_with_sort(vec![sort_key]);
         let schema = source.schema();
 
-        // `SELECT a FROM MemoryExec GROUP BY a ORDER BY a LIMIT 10;`, Single AggregateExec
-        let order_by_expr = Some(vec![PhysicalSortExpr {
-            expr: expressions::col("a", &schema.clone()).unwrap(),
-            options: SortOptions::default(),
-        }]);
-
         // `SELECT a FROM MemoryExec WHERE a > 1 GROUP BY a LIMIT 10;`, Single AggregateExec
         // the `a > 1` filter is applied in the AggregateExec
         let single_agg = AggregateExec::try_new(
             AggregateMode::Single,
             build_group_by(&schema.clone(), vec!["a".to_string()]),
-            vec![],              /* aggr_expr */
-            vec![None],          /* filter_expr */
-            vec![order_by_expr], /* order_by_expr */
-            source,              /* input */
-            schema.clone(),      /* input_schema */
+            vec![],         /* aggr_expr */
+            vec![None],     /* filter_expr */
+            source,         /* input */
+            schema.clone(), /* input_schema */
         )?;
         let limit_exec = LocalLimitExec::new(
             Arc::new(single_agg),
diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs
index 37a76eff1e..678dc1f373 100644
--- a/datafusion/core/src/physical_optimizer/test_utils.rs
+++ b/datafusion/core/src/physical_optimizer/test_utils.rs
@@ -339,7 +339,6 @@ pub fn aggregate_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
             PhysicalGroupBy::default(),
             vec![],
             vec![],
-            vec![],
             input,
             schema,
         )
diff --git a/datafusion/core/src/physical_optimizer/topk_aggregation.rs b/datafusion/core/src/physical_optimizer/topk_aggregation.rs
index 52d34d4f81..dd02614203 100644
--- a/datafusion/core/src/physical_optimizer/topk_aggregation.rs
+++ b/datafusion/core/src/physical_optimizer/topk_aggregation.rs
@@ -73,7 +73,6 @@ impl TopKAggregation {
             aggr.group_by().clone(),
             aggr.aggr_expr().to_vec(),
             aggr.filter_expr().to_vec(),
-            aggr.order_by_expr().to_vec(),
             aggr.input().clone(),
             aggr.input_schema(),
         )
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 93f0b31e52..e5816eb49e 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -795,14 +795,13 @@ impl DefaultPhysicalPlanner {
                         })
                         .collect::<Result<Vec<_>>>()?;
 
-                    let (aggregates, filters, order_bys) : (Vec<_>, Vec<_>, Vec<_>) = multiunzip(agg_filter);
+                    let (aggregates, filters, _order_bys) : (Vec<_>, Vec<_>, Vec<_>) = multiunzip(agg_filter);
 
                     let initial_aggr = Arc::new(AggregateExec::try_new(
                         AggregateMode::Partial,
                         groups.clone(),
                         aggregates.clone(),
                         filters.clone(),
-                        order_bys,
                         input_exec,
                         physical_input_schema.clone(),
                     )?);
@@ -820,7 +819,6 @@ impl DefaultPhysicalPlanner {
                     // To reflect such changes to subsequent stages, use the updated
                     // `AggregateExpr`/`PhysicalSortExpr` objects.
                     let updated_aggregates = initial_aggr.aggr_expr().to_vec();
-                    let updated_order_bys = initial_aggr.order_by_expr().to_vec();
 
                     let next_partition_mode = if can_repartition {
                         // construct a second aggregation with 'AggregateMode::FinalPartitioned'
@@ -844,7 +842,6 @@ impl DefaultPhysicalPlanner {
                         final_grouping_set,
                         updated_aggregates,
                         filters,
-                        updated_order_bys,
                         initial_aggr,
                         physical_input_schema.clone(),
                     )?))
diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
index 821f236af8..9069dbbd58 100644
--- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
@@ -109,7 +109,6 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str
             group_by.clone(),
             aggregate_expr.clone(),
             vec![None],
-            vec![None],
             running_source,
             schema.clone(),
         )
@@ -122,7 +121,6 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str
             group_by.clone(),
             aggregate_expr.clone(),
             vec![None],
-            vec![None],
             usual_source,
             schema.clone(),
         )
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index 2f69ed061c..c74c4ac0f8 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -279,8 +279,6 @@ pub struct AggregateExec {
     aggr_expr: Vec<Arc<dyn AggregateExpr>>,
     /// FILTER (WHERE clause) expression for each aggregate expression
     filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
-    /// (ORDER BY clause) expression for each aggregate expression
-    order_by_expr: Vec<Option<LexOrdering>>,
     /// Set if the output of this aggregation is truncated by a upstream sort/limit clause
     limit: Option<usize>,
     /// Input plan, could be a partial aggregate or the input to the aggregate
@@ -468,8 +466,6 @@ impl AggregateExec {
         group_by: PhysicalGroupBy,
         mut aggr_expr: Vec<Arc<dyn AggregateExpr>>,
         filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
-        // Ordering requirement of each aggregate expression
-        mut order_by_expr: Vec<Option<LexOrdering>>,
         input: Arc<dyn ExecutionPlan>,
         input_schema: SchemaRef,
     ) -> Result<Self> {
@@ -487,10 +483,10 @@ impl AggregateExec {
         ));
         let original_schema = Arc::new(original_schema);
         // Reset ordering requirement to `None` if aggregator is not order-sensitive
-        order_by_expr = aggr_expr
+        let mut order_by_expr = aggr_expr
             .iter()
-            .zip(order_by_expr)
-            .map(|(aggr_expr, fn_reqs)| {
+            .map(|aggr_expr| {
+                let fn_reqs = aggr_expr.order_bys().map(|ordering| ordering.to_vec());
                 // If
                 // - aggregation function is order-sensitive and
                 // - aggregation is performing a "first stage" calculation, and
@@ -558,7 +554,6 @@ impl AggregateExec {
             group_by,
             aggr_expr,
             filter_expr,
-            order_by_expr,
             input,
             original_schema,
             schema,
@@ -602,11 +597,6 @@ impl AggregateExec {
         &self.filter_expr
     }
 
-    /// ORDER BY clause expression for each aggregate expression
-    pub fn order_by_expr(&self) -> &[Option<LexOrdering>] {
-        &self.order_by_expr
-    }
-
     /// Input plan
     pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
         &self.input
@@ -684,7 +674,7 @@ impl AggregateExec {
             return false;
         }
         // ensure there are no order by expressions
-        if self.order_by_expr().iter().any(|e| e.is_some()) {
+        if self.aggr_expr().iter().any(|e| e.order_bys().is_some()) {
             return false;
         }
         // ensure there is no output ordering; can this rule be relaxed?
@@ -873,7 +863,6 @@ impl ExecutionPlan for AggregateExec {
             self.group_by.clone(),
             self.aggr_expr.clone(),
             self.filter_expr.clone(),
-            self.order_by_expr.clone(),
             children[0].clone(),
             self.input_schema.clone(),
         )?;
@@ -1395,7 +1384,6 @@ mod tests {
             grouping_set.clone(),
             aggregates.clone(),
             vec![None],
-            vec![None],
             input,
             input_schema.clone(),
         )?);
@@ -1474,7 +1462,6 @@ mod tests {
             final_grouping_set,
             aggregates,
             vec![None],
-            vec![None],
             merge,
             input_schema,
         )?);
@@ -1540,7 +1527,6 @@ mod tests {
             grouping_set.clone(),
             aggregates.clone(),
             vec![None],
-            vec![None],
             input,
             input_schema.clone(),
         )?);
@@ -1588,7 +1574,6 @@ mod tests {
             final_grouping_set,
             aggregates,
             vec![None],
-            vec![None],
             merge,
             input_schema,
         )?);
@@ -1855,7 +1840,6 @@ mod tests {
                 groups,
                 aggregates,
                 vec![None; 3],
-                vec![None; 3],
                 input.clone(),
                 input_schema.clone(),
             )?);
@@ -1911,7 +1895,6 @@ mod tests {
             groups.clone(),
             aggregates.clone(),
             vec![None],
-            vec![None],
             blocking_exec,
             schema,
         )?);
@@ -1950,7 +1933,6 @@ mod tests {
             groups,
             aggregates.clone(),
             vec![None],
-            vec![None],
             blocking_exec,
             schema,
         )?);
@@ -2052,7 +2034,6 @@ mod tests {
             groups.clone(),
             aggregates.clone(),
             vec![None],
-            vec![Some(ordering_req.clone())],
             memory_exec,
             schema.clone(),
         )?);
@@ -2068,7 +2049,6 @@ mod tests {
             groups,
             aggregates.clone(),
             vec![None],
-            vec![Some(ordering_req)],
             coalesce,
             schema,
         )?) as Arc<dyn ExecutionPlan>;
diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs
index 355561c36f..37e8ffd761 100644
--- a/datafusion/physical-plan/src/limit.rs
+++ b/datafusion/physical-plan/src/limit.rs
@@ -878,7 +878,6 @@ mod tests {
             build_group_by(&csv.schema().clone(), vec!["i".to_string()]),
             vec![],
             vec![None],
-            vec![None],
             csv.clone(),
             csv.schema().clone(),
         )?;
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index f391592dfe..bd8053c817 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -1553,7 +1553,6 @@ message AggregateExecNode {
   repeated PhysicalExprNode null_expr = 8;
   repeated bool groups = 9;
   repeated MaybeFilter filter_expr = 10;
-  repeated MaybePhysicalSortExprs order_by_expr = 11;
 }
 
 message GlobalLimitExecNode {
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index d506b5dcce..88310be031 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -36,9 +36,6 @@ impl serde::Serialize for AggregateExecNode {
         if !self.filter_expr.is_empty() {
             len += 1;
         }
-        if !self.order_by_expr.is_empty() {
-            len += 1;
-        }
         let mut struct_ser = serializer.serialize_struct("datafusion.AggregateExecNode", len)?;
         if !self.group_expr.is_empty() {
             struct_ser.serialize_field("groupExpr", &self.group_expr)?;
@@ -72,9 +69,6 @@ impl serde::Serialize for AggregateExecNode {
         if !self.filter_expr.is_empty() {
             struct_ser.serialize_field("filterExpr", &self.filter_expr)?;
         }
-        if !self.order_by_expr.is_empty() {
-            struct_ser.serialize_field("orderByExpr", &self.order_by_expr)?;
-        }
         struct_ser.end()
     }
 }
@@ -102,8 +96,6 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode {
             "groups",
             "filter_expr",
             "filterExpr",
-            "order_by_expr",
-            "orderByExpr",
         ];
 
         #[allow(clippy::enum_variant_names)]
@@ -118,7 +110,6 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode {
             NullExpr,
             Groups,
             FilterExpr,
-            OrderByExpr,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
@@ -150,7 +141,6 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode {
                             "nullExpr" | "null_expr" => Ok(GeneratedField::NullExpr),
                             "groups" => Ok(GeneratedField::Groups),
                             "filterExpr" | "filter_expr" => Ok(GeneratedField::FilterExpr),
-                            "orderByExpr" | "order_by_expr" => Ok(GeneratedField::OrderByExpr),
                             _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
                         }
                     }
@@ -180,7 +170,6 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode {
                 let mut null_expr__ = None;
                 let mut groups__ = None;
                 let mut filter_expr__ = None;
-                let mut order_by_expr__ = None;
                 while let Some(k) = map_.next_key()? {
                     match k {
                         GeneratedField::GroupExpr => {
@@ -243,12 +232,6 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode {
                             }
                             filter_expr__ = Some(map_.next_value()?);
                         }
-                        GeneratedField::OrderByExpr => {
-                            if order_by_expr__.is_some() {
-                                return Err(serde::de::Error::duplicate_field("orderByExpr"));
-                            }
-                            order_by_expr__ = Some(map_.next_value()?);
-                        }
                     }
                 }
                 Ok(AggregateExecNode {
@@ -262,7 +245,6 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode {
                     null_expr: null_expr__.unwrap_or_default(),
                     groups: groups__.unwrap_or_default(),
                     filter_expr: filter_expr__.unwrap_or_default(),
-                    order_by_expr: order_by_expr__.unwrap_or_default(),
                 })
             }
         }
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 8aadc96349..3dfd393861 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -2193,8 +2193,6 @@ pub struct AggregateExecNode {
     pub groups: ::prost::alloc::vec::Vec<bool>,
     #[prost(message, repeated, tag = "10")]
     pub filter_expr: ::prost::alloc::vec::Vec<MaybeFilter>,
-    #[prost(message, repeated, tag = "11")]
-    pub order_by_expr: ::prost::alloc::vec::Vec<MaybePhysicalSortExprs>,
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 73091a6fce..df01097cfa 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -427,19 +427,6 @@ impl AsExecutionPlan for PhysicalPlanNode {
                             .transpose()
                     })
                     .collect::<Result<Vec<_>, _>>()?;
-                let physical_order_by_expr = hash_agg
-                    .order_by_expr
-                    .iter()
-                    .map(|expr| {
-                        expr.sort_expr
-                            .iter()
-                            .map(|e| {
-                                parse_physical_sort_expr(e, registry, &physical_schema)
-                            })
-                            .collect::<Result<Vec<_>>>()
-                            .map(|exprs| (!exprs.is_empty()).then_some(exprs))
-                    })
-                    .collect::<Result<Vec<_>>>()?;
 
                 let physical_aggr_expr: Vec<Arc<dyn AggregateExpr>> = hash_agg
                     .aggr_expr
@@ -498,7 +485,6 @@ impl AsExecutionPlan for PhysicalPlanNode {
                     PhysicalGroupBy::new(group_expr, null_expr, groups),
                     physical_aggr_expr,
                     physical_filter_expr,
-                    physical_order_by_expr,
                     input,
                     Arc::new(input_schema.try_into()?),
                 )?))
@@ -1237,12 +1223,6 @@ impl AsExecutionPlan for PhysicalPlanNode {
                 .map(|expr| expr.to_owned().try_into())
                 .collect::<Result<Vec<_>>>()?;
 
-            let order_by = exec
-                .order_by_expr()
-                .iter()
-                .map(|expr| expr.to_owned().try_into())
-                .collect::<Result<Vec<_>>>()?;
-
             let agg = exec
                 .aggr_expr()
                 .iter()
@@ -1295,7 +1275,6 @@ impl AsExecutionPlan for PhysicalPlanNode {
                         group_expr_name: group_names,
                         aggr_expr: agg,
                         filter_expr: filter,
-                        order_by_expr: order_by,
                         aggr_expr_name: agg_names,
                         mode: agg_mode as i32,
                         input: Some(Box::new(input)),
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index da76209dbb..4a512413e7 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -311,7 +311,6 @@ fn rountrip_aggregate() -> Result<()> {
         PhysicalGroupBy::new_single(groups.clone()),
         aggregates.clone(),
         vec![None],
-        vec![None],
         Arc::new(EmptyExec::new(schema.clone())),
         schema,
     )?))
@@ -379,7 +378,6 @@ fn roundtrip_aggregate_udaf() -> Result<()> {
             PhysicalGroupBy::new_single(groups.clone()),
             aggregates.clone(),
             vec![None],
-            vec![None],
             Arc::new(EmptyExec::new(schema.clone())),
             schema,
         )?),
@@ -594,7 +592,6 @@ fn roundtrip_distinct_count() -> Result<()> {
         PhysicalGroupBy::new_single(groups),
         aggregates.clone(),
         vec![None],
-        vec![None],
         Arc::new(EmptyExec::new(schema.clone())),
         schema,
     )?))

Reply via email to