This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new d67c0bbecd Remove order_bys from AggregateExec state (#8537)
d67c0bbecd is described below
commit d67c0bbecd8f32049de2c931c077a66ed640413a
Author: Mustafa Akur <[email protected]>
AuthorDate: Thu Dec 14 23:15:15 2023 +0300
Remove order_bys from AggregateExec state (#8537)
* Initial commit
* Remove order by from aggregate exec state
---
.../src/physical_optimizer/aggregate_statistics.rs | 12 ----------
.../combine_partial_final_agg.rs | 4 ----
.../src/physical_optimizer/enforce_distribution.rs | 4 ----
.../limited_distinct_aggregation.rs | 25 ++++---------------
.../core/src/physical_optimizer/test_utils.rs | 1 -
.../src/physical_optimizer/topk_aggregation.rs | 1 -
datafusion/core/src/physical_planner.rs | 5 +---
datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs | 2 --
datafusion/physical-plan/src/aggregates/mod.rs | 28 ++++------------------
datafusion/physical-plan/src/limit.rs | 1 -
datafusion/proto/proto/datafusion.proto | 1 -
datafusion/proto/src/generated/pbjson.rs | 18 --------------
datafusion/proto/src/generated/prost.rs | 2 --
datafusion/proto/src/physical_plan/mod.rs | 21 ----------------
.../proto/tests/cases/roundtrip_physical_plan.rs | 3 ---
15 files changed, 9 insertions(+), 119 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
index 795857b10e..86a8cdb7b3 100644
--- a/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
+++ b/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
@@ -397,7 +397,6 @@ pub(crate) mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
- vec![None],
source,
Arc::clone(&schema),
)?;
@@ -407,7 +406,6 @@ pub(crate) mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
- vec![None],
Arc::new(partial_agg),
Arc::clone(&schema),
)?;
@@ -429,7 +427,6 @@ pub(crate) mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
- vec![None],
source,
Arc::clone(&schema),
)?;
@@ -439,7 +436,6 @@ pub(crate) mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
- vec![None],
Arc::new(partial_agg),
Arc::clone(&schema),
)?;
@@ -460,7 +456,6 @@ pub(crate) mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
- vec![None],
source,
Arc::clone(&schema),
)?;
@@ -473,7 +468,6 @@ pub(crate) mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
- vec![None],
Arc::new(coalesce),
Arc::clone(&schema),
)?;
@@ -494,7 +488,6 @@ pub(crate) mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
- vec![None],
source,
Arc::clone(&schema),
)?;
@@ -507,7 +500,6 @@ pub(crate) mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
- vec![None],
Arc::new(coalesce),
Arc::clone(&schema),
)?;
@@ -539,7 +531,6 @@ pub(crate) mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
- vec![None],
filter,
Arc::clone(&schema),
)?;
@@ -549,7 +540,6 @@ pub(crate) mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
- vec![None],
Arc::new(partial_agg),
Arc::clone(&schema),
)?;
@@ -586,7 +576,6 @@ pub(crate) mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
- vec![None],
filter,
Arc::clone(&schema),
)?;
@@ -596,7 +585,6 @@ pub(crate) mod tests {
PhysicalGroupBy::default(),
vec![agg.count_expr()],
vec![None],
- vec![None],
Arc::new(partial_agg),
Arc::clone(&schema),
)?;
diff --git
a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
index 0948445de2..c50ea36b68 100644
--- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
+++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
@@ -91,7 +91,6 @@ impl PhysicalOptimizerRule for CombinePartialFinalAggregate {
input_agg_exec.group_by().clone(),
input_agg_exec.aggr_expr().to_vec(),
input_agg_exec.filter_expr().to_vec(),
- input_agg_exec.order_by_expr().to_vec(),
input_agg_exec.input().clone(),
input_agg_exec.input_schema(),
)
@@ -277,7 +276,6 @@ mod tests {
group_by,
aggr_expr,
vec![],
- vec![],
input,
schema,
)
@@ -297,7 +295,6 @@ mod tests {
group_by,
aggr_expr,
vec![],
- vec![],
input,
schema,
)
@@ -458,7 +455,6 @@ mod tests {
final_group_by,
aggr_expr,
vec![],
- vec![],
partial_agg,
schema,
)
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 93cdbf8583..f2e04989ef 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -521,7 +521,6 @@ fn reorder_aggregate_keys(
new_partial_group_by,
agg_exec.aggr_expr().to_vec(),
agg_exec.filter_expr().to_vec(),
- agg_exec.order_by_expr().to_vec(),
agg_exec.input().clone(),
agg_exec.input_schema.clone(),
)?))
@@ -548,7 +547,6 @@ fn reorder_aggregate_keys(
new_group_by,
agg_exec.aggr_expr().to_vec(),
agg_exec.filter_expr().to_vec(),
- agg_exec.order_by_expr().to_vec(),
partial_agg,
agg_exec.input_schema(),
)?);
@@ -1909,14 +1907,12 @@ pub(crate) mod tests {
final_grouping,
vec![],
vec![],
- vec![],
Arc::new(
AggregateExec::try_new(
AggregateMode::Partial,
group_by,
vec![],
vec![],
- vec![],
input,
schema.clone(),
)
diff --git
a/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs
b/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs
index 8f5dbc2e92..540f9a6a13 100644
--- a/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs
+++ b/datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs
@@ -55,7 +55,6 @@ impl LimitedDistinctAggregation {
aggr.group_by().clone(),
aggr.aggr_expr().to_vec(),
aggr.filter_expr().to_vec(),
- aggr.order_by_expr().to_vec(),
aggr.input().clone(),
aggr.input_schema(),
)
@@ -307,7 +306,6 @@ mod tests {
build_group_by(&schema.clone(), vec!["a".to_string()]),
vec![], /* aggr_expr */
vec![None], /* filter_expr */
- vec![None], /* order_by_expr */
source, /* input */
schema.clone(), /* input_schema */
)?;
@@ -316,7 +314,6 @@ mod tests {
build_group_by(&schema.clone(), vec!["a".to_string()]),
vec![], /* aggr_expr */
vec![None], /* filter_expr */
- vec![None], /* order_by_expr */
Arc::new(partial_agg), /* input */
schema.clone(), /* input_schema */
)?;
@@ -359,7 +356,6 @@ mod tests {
build_group_by(&schema.clone(), vec!["a".to_string()]),
vec![], /* aggr_expr */
vec![None], /* filter_expr */
- vec![None], /* order_by_expr */
source, /* input */
schema.clone(), /* input_schema */
)?;
@@ -401,7 +397,6 @@ mod tests {
build_group_by(&schema.clone(), vec!["a".to_string()]),
vec![], /* aggr_expr */
vec![None], /* filter_expr */
- vec![None], /* order_by_expr */
source, /* input */
schema.clone(), /* input_schema */
)?;
@@ -443,7 +438,6 @@ mod tests {
build_group_by(&schema.clone(), vec!["a".to_string(),
"b".to_string()]),
vec![], /* aggr_expr */
vec![None], /* filter_expr */
- vec![None], /* order_by_expr */
source, /* input */
schema.clone(), /* input_schema */
)?;
@@ -452,7 +446,6 @@ mod tests {
build_group_by(&schema.clone(), vec!["a".to_string()]),
vec![], /* aggr_expr */
vec![None], /* filter_expr */
- vec![None], /* order_by_expr */
Arc::new(group_by_agg), /* input */
schema.clone(), /* input_schema */
)?;
@@ -495,7 +488,6 @@ mod tests {
build_group_by(&schema.clone(), vec![]),
vec![], /* aggr_expr */
vec![None], /* filter_expr */
- vec![None], /* order_by_expr */
source, /* input */
schema.clone(), /* input_schema */
)?;
@@ -526,7 +518,6 @@ mod tests {
build_group_by(&schema.clone(), vec!["a".to_string()]),
vec![agg.count_expr()], /* aggr_expr */
vec![None], /* filter_expr */
- vec![None], /* order_by_expr */
source, /* input */
schema.clone(), /* input_schema */
)?;
@@ -563,7 +554,6 @@ mod tests {
build_group_by(&schema.clone(), vec!["a".to_string()]),
vec![], /* aggr_expr */
vec![filter_expr], /* filter_expr */
- vec![None], /* order_by_expr */
source, /* input */
schema.clone(), /* input_schema */
)?;
@@ -592,22 +582,15 @@ mod tests {
let source = parquet_exec_with_sort(vec![sort_key]);
let schema = source.schema();
- // `SELECT a FROM MemoryExec GROUP BY a ORDER BY a LIMIT 10;`, Single AggregateExec
- let order_by_expr = Some(vec![PhysicalSortExpr {
- expr: expressions::col("a", &schema.clone()).unwrap(),
- options: SortOptions::default(),
- }]);
-
// `SELECT a FROM MemoryExec WHERE a > 1 GROUP BY a LIMIT 10;`, Single
AggregateExec
// the `a > 1` filter is applied in the AggregateExec
let single_agg = AggregateExec::try_new(
AggregateMode::Single,
build_group_by(&schema.clone(), vec!["a".to_string()]),
- vec![], /* aggr_expr */
- vec![None], /* filter_expr */
- vec![order_by_expr], /* order_by_expr */
- source, /* input */
- schema.clone(), /* input_schema */
+ vec![], /* aggr_expr */
+ vec![None], /* filter_expr */
+ source, /* input */
+ schema.clone(), /* input_schema */
)?;
let limit_exec = LocalLimitExec::new(
Arc::new(single_agg),
diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs
b/datafusion/core/src/physical_optimizer/test_utils.rs
index 37a76eff1e..678dc1f373 100644
--- a/datafusion/core/src/physical_optimizer/test_utils.rs
+++ b/datafusion/core/src/physical_optimizer/test_utils.rs
@@ -339,7 +339,6 @@ pub fn aggregate_exec(input: Arc<dyn ExecutionPlan>) ->
Arc<dyn ExecutionPlan> {
PhysicalGroupBy::default(),
vec![],
vec![],
- vec![],
input,
schema,
)
diff --git a/datafusion/core/src/physical_optimizer/topk_aggregation.rs
b/datafusion/core/src/physical_optimizer/topk_aggregation.rs
index 52d34d4f81..dd02614203 100644
--- a/datafusion/core/src/physical_optimizer/topk_aggregation.rs
+++ b/datafusion/core/src/physical_optimizer/topk_aggregation.rs
@@ -73,7 +73,6 @@ impl TopKAggregation {
aggr.group_by().clone(),
aggr.aggr_expr().to_vec(),
aggr.filter_expr().to_vec(),
- aggr.order_by_expr().to_vec(),
aggr.input().clone(),
aggr.input_schema(),
)
diff --git a/datafusion/core/src/physical_planner.rs
b/datafusion/core/src/physical_planner.rs
index 93f0b31e52..e5816eb49e 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -795,14 +795,13 @@ impl DefaultPhysicalPlanner {
})
.collect::<Result<Vec<_>>>()?;
- let (aggregates, filters, order_bys) : (Vec<_>, Vec<_>, Vec<_>) = multiunzip(agg_filter);
+ let (aggregates, filters, _order_bys) : (Vec<_>, Vec<_>, Vec<_>) = multiunzip(agg_filter);
let initial_aggr = Arc::new(AggregateExec::try_new(
AggregateMode::Partial,
groups.clone(),
aggregates.clone(),
filters.clone(),
- order_bys,
input_exec,
physical_input_schema.clone(),
)?);
@@ -820,7 +819,6 @@ impl DefaultPhysicalPlanner {
// To reflect such changes to subsequent stages, use the
updated
// `AggregateExpr`/`PhysicalSortExpr` objects.
let updated_aggregates = initial_aggr.aggr_expr().to_vec();
- let updated_order_bys = initial_aggr.order_by_expr().to_vec();
let next_partition_mode = if can_repartition {
// construct a second aggregation with
'AggregateMode::FinalPartitioned'
@@ -844,7 +842,6 @@ impl DefaultPhysicalPlanner {
final_grouping_set,
updated_aggregates,
filters,
- updated_order_bys,
initial_aggr,
physical_input_schema.clone(),
)?))
diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
index 821f236af8..9069dbbd58 100644
--- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
@@ -109,7 +109,6 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>,
group_by_columns: Vec<&str
group_by.clone(),
aggregate_expr.clone(),
vec![None],
- vec![None],
running_source,
schema.clone(),
)
@@ -122,7 +121,6 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>,
group_by_columns: Vec<&str
group_by.clone(),
aggregate_expr.clone(),
vec![None],
- vec![None],
usual_source,
schema.clone(),
)
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs
b/datafusion/physical-plan/src/aggregates/mod.rs
index 2f69ed061c..c74c4ac0f8 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -279,8 +279,6 @@ pub struct AggregateExec {
aggr_expr: Vec<Arc<dyn AggregateExpr>>,
/// FILTER (WHERE clause) expression for each aggregate expression
filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
- /// (ORDER BY clause) expression for each aggregate expression
- order_by_expr: Vec<Option<LexOrdering>>,
/// Set if the output of this aggregation is truncated by a upstream
sort/limit clause
limit: Option<usize>,
/// Input plan, could be a partial aggregate or the input to the aggregate
@@ -468,8 +466,6 @@ impl AggregateExec {
group_by: PhysicalGroupBy,
mut aggr_expr: Vec<Arc<dyn AggregateExpr>>,
filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
- // Ordering requirement of each aggregate expression
- mut order_by_expr: Vec<Option<LexOrdering>>,
input: Arc<dyn ExecutionPlan>,
input_schema: SchemaRef,
) -> Result<Self> {
@@ -487,10 +483,10 @@ impl AggregateExec {
));
let original_schema = Arc::new(original_schema);
// Reset ordering requirement to `None` if aggregator is not
order-sensitive
- order_by_expr = aggr_expr
+ let mut order_by_expr = aggr_expr
.iter()
- .zip(order_by_expr)
- .map(|(aggr_expr, fn_reqs)| {
+ .map(|aggr_expr| {
+ let fn_reqs = aggr_expr.order_bys().map(|ordering| ordering.to_vec());
// If
// - aggregation function is order-sensitive and
// - aggregation is performing a "first stage" calculation, and
@@ -558,7 +554,6 @@ impl AggregateExec {
group_by,
aggr_expr,
filter_expr,
- order_by_expr,
input,
original_schema,
schema,
@@ -602,11 +597,6 @@ impl AggregateExec {
&self.filter_expr
}
- /// ORDER BY clause expression for each aggregate expression
- pub fn order_by_expr(&self) -> &[Option<LexOrdering>] {
- &self.order_by_expr
- }
-
/// Input plan
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
&self.input
@@ -684,7 +674,7 @@ impl AggregateExec {
return false;
}
// ensure there are no order by expressions
- if self.order_by_expr().iter().any(|e| e.is_some()) {
+ if self.aggr_expr().iter().any(|e| e.order_bys().is_some()) {
return false;
}
// ensure there is no output ordering; can this rule be relaxed?
@@ -873,7 +863,6 @@ impl ExecutionPlan for AggregateExec {
self.group_by.clone(),
self.aggr_expr.clone(),
self.filter_expr.clone(),
- self.order_by_expr.clone(),
children[0].clone(),
self.input_schema.clone(),
)?;
@@ -1395,7 +1384,6 @@ mod tests {
grouping_set.clone(),
aggregates.clone(),
vec![None],
- vec![None],
input,
input_schema.clone(),
)?);
@@ -1474,7 +1462,6 @@ mod tests {
final_grouping_set,
aggregates,
vec![None],
- vec![None],
merge,
input_schema,
)?);
@@ -1540,7 +1527,6 @@ mod tests {
grouping_set.clone(),
aggregates.clone(),
vec![None],
- vec![None],
input,
input_schema.clone(),
)?);
@@ -1588,7 +1574,6 @@ mod tests {
final_grouping_set,
aggregates,
vec![None],
- vec![None],
merge,
input_schema,
)?);
@@ -1855,7 +1840,6 @@ mod tests {
groups,
aggregates,
vec![None; 3],
- vec![None; 3],
input.clone(),
input_schema.clone(),
)?);
@@ -1911,7 +1895,6 @@ mod tests {
groups.clone(),
aggregates.clone(),
vec![None],
- vec![None],
blocking_exec,
schema,
)?);
@@ -1950,7 +1933,6 @@ mod tests {
groups,
aggregates.clone(),
vec![None],
- vec![None],
blocking_exec,
schema,
)?);
@@ -2052,7 +2034,6 @@ mod tests {
groups.clone(),
aggregates.clone(),
vec![None],
- vec![Some(ordering_req.clone())],
memory_exec,
schema.clone(),
)?);
@@ -2068,7 +2049,6 @@ mod tests {
groups,
aggregates.clone(),
vec![None],
- vec![Some(ordering_req)],
coalesce,
schema,
)?) as Arc<dyn ExecutionPlan>;
diff --git a/datafusion/physical-plan/src/limit.rs
b/datafusion/physical-plan/src/limit.rs
index 355561c36f..37e8ffd761 100644
--- a/datafusion/physical-plan/src/limit.rs
+++ b/datafusion/physical-plan/src/limit.rs
@@ -878,7 +878,6 @@ mod tests {
build_group_by(&csv.schema().clone(), vec!["i".to_string()]),
vec![],
vec![None],
- vec![None],
csv.clone(),
csv.schema().clone(),
)?;
diff --git a/datafusion/proto/proto/datafusion.proto
b/datafusion/proto/proto/datafusion.proto
index f391592dfe..bd8053c817 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -1553,7 +1553,6 @@ message AggregateExecNode {
repeated PhysicalExprNode null_expr = 8;
repeated bool groups = 9;
repeated MaybeFilter filter_expr = 10;
- repeated MaybePhysicalSortExprs order_by_expr = 11;
}
message GlobalLimitExecNode {
diff --git a/datafusion/proto/src/generated/pbjson.rs
b/datafusion/proto/src/generated/pbjson.rs
index d506b5dcce..88310be031 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -36,9 +36,6 @@ impl serde::Serialize for AggregateExecNode {
if !self.filter_expr.is_empty() {
len += 1;
}
- if !self.order_by_expr.is_empty() {
- len += 1;
- }
let mut struct_ser =
serializer.serialize_struct("datafusion.AggregateExecNode", len)?;
if !self.group_expr.is_empty() {
struct_ser.serialize_field("groupExpr", &self.group_expr)?;
@@ -72,9 +69,6 @@ impl serde::Serialize for AggregateExecNode {
if !self.filter_expr.is_empty() {
struct_ser.serialize_field("filterExpr", &self.filter_expr)?;
}
- if !self.order_by_expr.is_empty() {
- struct_ser.serialize_field("orderByExpr", &self.order_by_expr)?;
- }
struct_ser.end()
}
}
@@ -102,8 +96,6 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode {
"groups",
"filter_expr",
"filterExpr",
- "order_by_expr",
- "orderByExpr",
];
#[allow(clippy::enum_variant_names)]
@@ -118,7 +110,6 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode {
NullExpr,
Groups,
FilterExpr,
- OrderByExpr,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
@@ -150,7 +141,6 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode {
"nullExpr" | "null_expr" =>
Ok(GeneratedField::NullExpr),
"groups" => Ok(GeneratedField::Groups),
"filterExpr" | "filter_expr" =>
Ok(GeneratedField::FilterExpr),
- "orderByExpr" | "order_by_expr" => Ok(GeneratedField::OrderByExpr),
_ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
}
}
@@ -180,7 +170,6 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode {
let mut null_expr__ = None;
let mut groups__ = None;
let mut filter_expr__ = None;
- let mut order_by_expr__ = None;
while let Some(k) = map_.next_key()? {
match k {
GeneratedField::GroupExpr => {
@@ -243,12 +232,6 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode {
}
filter_expr__ = Some(map_.next_value()?);
}
- GeneratedField::OrderByExpr => {
- if order_by_expr__.is_some() {
- return Err(serde::de::Error::duplicate_field("orderByExpr"));
- }
- order_by_expr__ = Some(map_.next_value()?);
- }
}
}
Ok(AggregateExecNode {
@@ -262,7 +245,6 @@ impl<'de> serde::Deserialize<'de> for AggregateExecNode {
null_expr: null_expr__.unwrap_or_default(),
groups: groups__.unwrap_or_default(),
filter_expr: filter_expr__.unwrap_or_default(),
- order_by_expr: order_by_expr__.unwrap_or_default(),
})
}
}
diff --git a/datafusion/proto/src/generated/prost.rs
b/datafusion/proto/src/generated/prost.rs
index 8aadc96349..3dfd393861 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -2193,8 +2193,6 @@ pub struct AggregateExecNode {
pub groups: ::prost::alloc::vec::Vec<bool>,
#[prost(message, repeated, tag = "10")]
pub filter_expr: ::prost::alloc::vec::Vec<MaybeFilter>,
- #[prost(message, repeated, tag = "11")]
- pub order_by_expr: ::prost::alloc::vec::Vec<MaybePhysicalSortExprs>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
diff --git a/datafusion/proto/src/physical_plan/mod.rs
b/datafusion/proto/src/physical_plan/mod.rs
index 73091a6fce..df01097cfa 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -427,19 +427,6 @@ impl AsExecutionPlan for PhysicalPlanNode {
.transpose()
})
.collect::<Result<Vec<_>, _>>()?;
- let physical_order_by_expr = hash_agg
- .order_by_expr
- .iter()
- .map(|expr| {
- expr.sort_expr
- .iter()
- .map(|e| {
- parse_physical_sort_expr(e, registry, &physical_schema)
- })
- .collect::<Result<Vec<_>>>()
- .map(|exprs| (!exprs.is_empty()).then_some(exprs))
- })
- .collect::<Result<Vec<_>>>()?;
let physical_aggr_expr: Vec<Arc<dyn AggregateExpr>> = hash_agg
.aggr_expr
@@ -498,7 +485,6 @@ impl AsExecutionPlan for PhysicalPlanNode {
PhysicalGroupBy::new(group_expr, null_expr, groups),
physical_aggr_expr,
physical_filter_expr,
- physical_order_by_expr,
input,
Arc::new(input_schema.try_into()?),
)?))
@@ -1237,12 +1223,6 @@ impl AsExecutionPlan for PhysicalPlanNode {
.map(|expr| expr.to_owned().try_into())
.collect::<Result<Vec<_>>>()?;
- let order_by = exec
- .order_by_expr()
- .iter()
- .map(|expr| expr.to_owned().try_into())
- .collect::<Result<Vec<_>>>()?;
-
let agg = exec
.aggr_expr()
.iter()
@@ -1295,7 +1275,6 @@ impl AsExecutionPlan for PhysicalPlanNode {
group_expr_name: group_names,
aggr_expr: agg,
filter_expr: filter,
- order_by_expr: order_by,
aggr_expr_name: agg_names,
mode: agg_mode as i32,
input: Some(Box::new(input)),
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index da76209dbb..4a512413e7 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -311,7 +311,6 @@ fn rountrip_aggregate() -> Result<()> {
PhysicalGroupBy::new_single(groups.clone()),
aggregates.clone(),
vec![None],
- vec![None],
Arc::new(EmptyExec::new(schema.clone())),
schema,
)?))
@@ -379,7 +378,6 @@ fn roundtrip_aggregate_udaf() -> Result<()> {
PhysicalGroupBy::new_single(groups.clone()),
aggregates.clone(),
vec![None],
- vec![None],
Arc::new(EmptyExec::new(schema.clone())),
schema,
)?),
@@ -594,7 +592,6 @@ fn roundtrip_distinct_count() -> Result<()> {
PhysicalGroupBy::new_single(groups),
aggregates.clone(),
vec![None],
- vec![None],
Arc::new(EmptyExec::new(schema.clone())),
schema,
)?))