jayzhan211 commented on code in PR #10554:
URL: https://github.com/apache/datafusion/pull/10554#discussion_r1604805920
##########
datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs:
##########
@@ -194,282 +194,3 @@ fn discard_column_index(group_expr: Arc<dyn
PhysicalExpr>) -> Arc<dyn PhysicalEx
.data()
.unwrap_or(group_expr)
}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate::datasource::listing::PartitionedFile;
- use crate::datasource::object_store::ObjectStoreUrl;
- use crate::datasource::physical_plan::{FileScanConfig, ParquetExec};
- use crate::physical_plan::expressions::lit;
- use crate::physical_plan::repartition::RepartitionExec;
- use crate::physical_plan::{displayable, Partitioning, Statistics};
-
- use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
- use datafusion_physical_expr::expressions::{col, Count, Sum};
-
- /// Runs the CombinePartialFinalAggregate optimizer and asserts the plan
against the expected
- macro_rules! assert_optimized {
- ($EXPECTED_LINES: expr, $PLAN: expr) => {
- let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s|
*s).collect();
-
- // run optimizer
- let optimizer = CombinePartialFinalAggregate {};
- let config = ConfigOptions::new();
- let optimized = optimizer.optimize($PLAN, &config)?;
- // Now format correctly
- let plan =
displayable(optimized.as_ref()).indent(true).to_string();
- let actual_lines = trim_plan_display(&plan);
-
- assert_eq!(
- &expected_lines, &actual_lines,
- "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
- expected_lines, actual_lines
- );
- };
- }
-
- fn trim_plan_display(plan: &str) -> Vec<&str> {
- plan.split('\n')
- .map(|s| s.trim())
- .filter(|s| !s.is_empty())
- .collect()
- }
-
- fn schema() -> SchemaRef {
- Arc::new(Schema::new(vec![
- Field::new("a", DataType::Int64, true),
- Field::new("b", DataType::Int64, true),
- Field::new("c", DataType::Int64, true),
- ]))
- }
-
- fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
- Arc::new(ParquetExec::new(
- FileScanConfig {
- object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
- file_schema: schema.clone(),
- file_groups: vec![vec![PartitionedFile::new("x".to_string(),
100)]],
- statistics: Statistics::new_unknown(schema),
- projection: None,
- limit: None,
- table_partition_cols: vec![],
- output_ordering: vec![],
- },
- None,
- None,
- Default::default(),
- ))
- }
-
- fn partial_aggregate_exec(
- input: Arc<dyn ExecutionPlan>,
- group_by: PhysicalGroupBy,
- aggr_expr: Vec<Arc<dyn AggregateExpr>>,
- ) -> Arc<dyn ExecutionPlan> {
- let schema = input.schema();
- let n_aggr = aggr_expr.len();
- Arc::new(
- AggregateExec::try_new(
- AggregateMode::Partial,
- group_by,
- aggr_expr,
- vec![None; n_aggr],
- input,
- schema,
- )
- .unwrap(),
- )
- }
-
- fn final_aggregate_exec(
- input: Arc<dyn ExecutionPlan>,
- group_by: PhysicalGroupBy,
- aggr_expr: Vec<Arc<dyn AggregateExpr>>,
- ) -> Arc<dyn ExecutionPlan> {
- let schema = input.schema();
- let n_aggr = aggr_expr.len();
- Arc::new(
- AggregateExec::try_new(
- AggregateMode::Final,
- group_by,
- aggr_expr,
- vec![None; n_aggr],
- input,
- schema,
- )
- .unwrap(),
- )
- }
-
- fn repartition_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn
ExecutionPlan> {
- Arc::new(
- RepartitionExec::try_new(input,
Partitioning::RoundRobinBatch(10)).unwrap(),
- )
- }
-
- #[test]
- fn aggregations_not_combined() -> Result<()> {
- let schema = schema();
-
- let aggr_expr = vec![Arc::new(Count::new(
- lit(1i8),
- "COUNT(1)".to_string(),
- DataType::Int64,
- )) as _];
- let plan = final_aggregate_exec(
- repartition_exec(partial_aggregate_exec(
- parquet_exec(&schema),
- PhysicalGroupBy::default(),
- aggr_expr.clone(),
- )),
- PhysicalGroupBy::default(),
- aggr_expr,
- );
- // should not combine the Partial/Final AggregateExecs
- let expected = &[
- "AggregateExec: mode=Final, gby=[], aggr=[COUNT(1)]",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "AggregateExec: mode=Partial, gby=[], aggr=[COUNT(1)]",
- "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
- ];
- assert_optimized!(expected, plan);
-
- let aggr_expr1 = vec![Arc::new(Count::new(
- lit(1i8),
- "COUNT(1)".to_string(),
- DataType::Int64,
- )) as _];
- let aggr_expr2 = vec![Arc::new(Count::new(
- lit(1i8),
- "COUNT(2)".to_string(),
- DataType::Int64,
- )) as _];
-
- let plan = final_aggregate_exec(
- partial_aggregate_exec(
- parquet_exec(&schema),
- PhysicalGroupBy::default(),
- aggr_expr1,
- ),
- PhysicalGroupBy::default(),
- aggr_expr2,
- );
- // should not combine the Partial/Final AggregateExecs
- let expected = &[
- "AggregateExec: mode=Final, gby=[], aggr=[COUNT(2)]",
- "AggregateExec: mode=Partial, gby=[], aggr=[COUNT(1)]",
- "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
- ];
-
- assert_optimized!(expected, plan);
-
- Ok(())
- }
-
- #[test]
- fn aggregations_combined() -> Result<()> {
- let schema = schema();
- let aggr_expr = vec![Arc::new(Count::new(
- lit(1i8),
- "COUNT(1)".to_string(),
- DataType::Int64,
- )) as _];
-
- let plan = final_aggregate_exec(
- partial_aggregate_exec(
- parquet_exec(&schema),
- PhysicalGroupBy::default(),
- aggr_expr.clone(),
- ),
- PhysicalGroupBy::default(),
- aggr_expr,
- );
- // should combine the Partial/Final AggregateExecs to tne Single
AggregateExec
- let expected = &[
- "AggregateExec: mode=Single, gby=[], aggr=[COUNT(1)]",
- "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
- ];
-
- assert_optimized!(expected, plan);
- Ok(())
- }
-
- #[test]
- fn aggregations_with_group_combined() -> Result<()> {
Review Comment:
group by and limit tests are not ported because I didn't find a query that
has a result of `combine aggregate`.
But I think it is fine because there is at least one query that covers the test
of optimization here, and there is no specific optimization based on group by
or limit.
##########
datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs:
##########
@@ -194,282 +194,3 @@ fn discard_column_index(group_expr: Arc<dyn
PhysicalExpr>) -> Arc<dyn PhysicalEx
.data()
.unwrap_or(group_expr)
}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate::datasource::listing::PartitionedFile;
- use crate::datasource::object_store::ObjectStoreUrl;
- use crate::datasource::physical_plan::{FileScanConfig, ParquetExec};
- use crate::physical_plan::expressions::lit;
- use crate::physical_plan::repartition::RepartitionExec;
- use crate::physical_plan::{displayable, Partitioning, Statistics};
-
- use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
- use datafusion_physical_expr::expressions::{col, Count, Sum};
-
- /// Runs the CombinePartialFinalAggregate optimizer and asserts the plan
against the expected
- macro_rules! assert_optimized {
- ($EXPECTED_LINES: expr, $PLAN: expr) => {
- let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s|
*s).collect();
-
- // run optimizer
- let optimizer = CombinePartialFinalAggregate {};
- let config = ConfigOptions::new();
- let optimized = optimizer.optimize($PLAN, &config)?;
- // Now format correctly
- let plan =
displayable(optimized.as_ref()).indent(true).to_string();
- let actual_lines = trim_plan_display(&plan);
-
- assert_eq!(
- &expected_lines, &actual_lines,
- "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
- expected_lines, actual_lines
- );
- };
- }
-
- fn trim_plan_display(plan: &str) -> Vec<&str> {
- plan.split('\n')
- .map(|s| s.trim())
- .filter(|s| !s.is_empty())
- .collect()
- }
-
- fn schema() -> SchemaRef {
- Arc::new(Schema::new(vec![
- Field::new("a", DataType::Int64, true),
- Field::new("b", DataType::Int64, true),
- Field::new("c", DataType::Int64, true),
- ]))
- }
-
- fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
- Arc::new(ParquetExec::new(
- FileScanConfig {
- object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
- file_schema: schema.clone(),
- file_groups: vec![vec![PartitionedFile::new("x".to_string(),
100)]],
- statistics: Statistics::new_unknown(schema),
- projection: None,
- limit: None,
- table_partition_cols: vec![],
- output_ordering: vec![],
- },
- None,
- None,
- Default::default(),
- ))
- }
-
- fn partial_aggregate_exec(
- input: Arc<dyn ExecutionPlan>,
- group_by: PhysicalGroupBy,
- aggr_expr: Vec<Arc<dyn AggregateExpr>>,
- ) -> Arc<dyn ExecutionPlan> {
- let schema = input.schema();
- let n_aggr = aggr_expr.len();
- Arc::new(
- AggregateExec::try_new(
- AggregateMode::Partial,
- group_by,
- aggr_expr,
- vec![None; n_aggr],
- input,
- schema,
- )
- .unwrap(),
- )
- }
-
- fn final_aggregate_exec(
- input: Arc<dyn ExecutionPlan>,
- group_by: PhysicalGroupBy,
- aggr_expr: Vec<Arc<dyn AggregateExpr>>,
- ) -> Arc<dyn ExecutionPlan> {
- let schema = input.schema();
- let n_aggr = aggr_expr.len();
- Arc::new(
- AggregateExec::try_new(
- AggregateMode::Final,
- group_by,
- aggr_expr,
- vec![None; n_aggr],
- input,
- schema,
- )
- .unwrap(),
- )
- }
-
- fn repartition_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn
ExecutionPlan> {
- Arc::new(
- RepartitionExec::try_new(input,
Partitioning::RoundRobinBatch(10)).unwrap(),
- )
- }
-
- #[test]
- fn aggregations_not_combined() -> Result<()> {
- let schema = schema();
-
- let aggr_expr = vec![Arc::new(Count::new(
- lit(1i8),
- "COUNT(1)".to_string(),
- DataType::Int64,
- )) as _];
- let plan = final_aggregate_exec(
- repartition_exec(partial_aggregate_exec(
- parquet_exec(&schema),
- PhysicalGroupBy::default(),
- aggr_expr.clone(),
- )),
- PhysicalGroupBy::default(),
- aggr_expr,
- );
- // should not combine the Partial/Final AggregateExecs
- let expected = &[
- "AggregateExec: mode=Final, gby=[], aggr=[COUNT(1)]",
- "RepartitionExec: partitioning=RoundRobinBatch(10),
input_partitions=1",
- "AggregateExec: mode=Partial, gby=[], aggr=[COUNT(1)]",
- "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
- ];
- assert_optimized!(expected, plan);
-
- let aggr_expr1 = vec![Arc::new(Count::new(
- lit(1i8),
- "COUNT(1)".to_string(),
- DataType::Int64,
- )) as _];
- let aggr_expr2 = vec![Arc::new(Count::new(
- lit(1i8),
- "COUNT(2)".to_string(),
- DataType::Int64,
- )) as _];
-
- let plan = final_aggregate_exec(
- partial_aggregate_exec(
- parquet_exec(&schema),
- PhysicalGroupBy::default(),
- aggr_expr1,
- ),
- PhysicalGroupBy::default(),
- aggr_expr2,
- );
- // should not combine the Partial/Final AggregateExecs
- let expected = &[
- "AggregateExec: mode=Final, gby=[], aggr=[COUNT(2)]",
- "AggregateExec: mode=Partial, gby=[], aggr=[COUNT(1)]",
- "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
- ];
-
- assert_optimized!(expected, plan);
-
- Ok(())
- }
-
- #[test]
- fn aggregations_combined() -> Result<()> {
- let schema = schema();
- let aggr_expr = vec![Arc::new(Count::new(
- lit(1i8),
- "COUNT(1)".to_string(),
- DataType::Int64,
- )) as _];
-
- let plan = final_aggregate_exec(
- partial_aggregate_exec(
- parquet_exec(&schema),
- PhysicalGroupBy::default(),
- aggr_expr.clone(),
- ),
- PhysicalGroupBy::default(),
- aggr_expr,
- );
- // should combine the Partial/Final AggregateExecs to tne Single
AggregateExec
- let expected = &[
- "AggregateExec: mode=Single, gby=[], aggr=[COUNT(1)]",
- "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
- ];
-
- assert_optimized!(expected, plan);
- Ok(())
- }
-
- #[test]
- fn aggregations_with_group_combined() -> Result<()> {
Review Comment:
group by and limit tests are not ported because I didn't find a query that
has a result of `combine aggregate`.
But I think it is fine because there is at least one query that covers the
test of optimization here, and there is no specific optimization based on group
by or limit.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]