jayzhan211 commented on code in PR #10554:
URL: https://github.com/apache/datafusion/pull/10554#discussion_r1604805920


##########
datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs:
##########
@@ -194,282 +194,3 @@ fn discard_column_index(group_expr: Arc<dyn 
PhysicalExpr>) -> Arc<dyn PhysicalEx
         .data()
         .unwrap_or(group_expr)
 }
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::datasource::listing::PartitionedFile;
-    use crate::datasource::object_store::ObjectStoreUrl;
-    use crate::datasource::physical_plan::{FileScanConfig, ParquetExec};
-    use crate::physical_plan::expressions::lit;
-    use crate::physical_plan::repartition::RepartitionExec;
-    use crate::physical_plan::{displayable, Partitioning, Statistics};
-
-    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-    use datafusion_physical_expr::expressions::{col, Count, Sum};
-
-    /// Runs the CombinePartialFinalAggregate optimizer and asserts the plan 
against the expected
-    macro_rules! assert_optimized {
-        ($EXPECTED_LINES: expr, $PLAN: expr) => {
-            let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| 
*s).collect();
-
-            // run optimizer
-            let optimizer = CombinePartialFinalAggregate {};
-            let config = ConfigOptions::new();
-            let optimized = optimizer.optimize($PLAN, &config)?;
-            // Now format correctly
-            let plan = 
displayable(optimized.as_ref()).indent(true).to_string();
-            let actual_lines = trim_plan_display(&plan);
-
-            assert_eq!(
-                &expected_lines, &actual_lines,
-                "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
-                expected_lines, actual_lines
-            );
-        };
-    }
-
-    fn trim_plan_display(plan: &str) -> Vec<&str> {
-        plan.split('\n')
-            .map(|s| s.trim())
-            .filter(|s| !s.is_empty())
-            .collect()
-    }
-
-    fn schema() -> SchemaRef {
-        Arc::new(Schema::new(vec![
-            Field::new("a", DataType::Int64, true),
-            Field::new("b", DataType::Int64, true),
-            Field::new("c", DataType::Int64, true),
-        ]))
-    }
-
-    fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
-        Arc::new(ParquetExec::new(
-            FileScanConfig {
-                object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-                file_schema: schema.clone(),
-                file_groups: vec![vec![PartitionedFile::new("x".to_string(), 
100)]],
-                statistics: Statistics::new_unknown(schema),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-                output_ordering: vec![],
-            },
-            None,
-            None,
-            Default::default(),
-        ))
-    }
-
-    fn partial_aggregate_exec(
-        input: Arc<dyn ExecutionPlan>,
-        group_by: PhysicalGroupBy,
-        aggr_expr: Vec<Arc<dyn AggregateExpr>>,
-    ) -> Arc<dyn ExecutionPlan> {
-        let schema = input.schema();
-        let n_aggr = aggr_expr.len();
-        Arc::new(
-            AggregateExec::try_new(
-                AggregateMode::Partial,
-                group_by,
-                aggr_expr,
-                vec![None; n_aggr],
-                input,
-                schema,
-            )
-            .unwrap(),
-        )
-    }
-
-    fn final_aggregate_exec(
-        input: Arc<dyn ExecutionPlan>,
-        group_by: PhysicalGroupBy,
-        aggr_expr: Vec<Arc<dyn AggregateExpr>>,
-    ) -> Arc<dyn ExecutionPlan> {
-        let schema = input.schema();
-        let n_aggr = aggr_expr.len();
-        Arc::new(
-            AggregateExec::try_new(
-                AggregateMode::Final,
-                group_by,
-                aggr_expr,
-                vec![None; n_aggr],
-                input,
-                schema,
-            )
-            .unwrap(),
-        )
-    }
-
-    fn repartition_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn 
ExecutionPlan> {
-        Arc::new(
-            RepartitionExec::try_new(input, 
Partitioning::RoundRobinBatch(10)).unwrap(),
-        )
-    }
-
-    #[test]
-    fn aggregations_not_combined() -> Result<()> {
-        let schema = schema();
-
-        let aggr_expr = vec![Arc::new(Count::new(
-            lit(1i8),
-            "COUNT(1)".to_string(),
-            DataType::Int64,
-        )) as _];
-        let plan = final_aggregate_exec(
-            repartition_exec(partial_aggregate_exec(
-                parquet_exec(&schema),
-                PhysicalGroupBy::default(),
-                aggr_expr.clone(),
-            )),
-            PhysicalGroupBy::default(),
-            aggr_expr,
-        );
-        // should not combine the Partial/Final AggregateExecs
-        let expected = &[
-            "AggregateExec: mode=Final, gby=[], aggr=[COUNT(1)]",
-            "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-            "AggregateExec: mode=Partial, gby=[], aggr=[COUNT(1)]",
-            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
-        ];
-        assert_optimized!(expected, plan);
-
-        let aggr_expr1 = vec![Arc::new(Count::new(
-            lit(1i8),
-            "COUNT(1)".to_string(),
-            DataType::Int64,
-        )) as _];
-        let aggr_expr2 = vec![Arc::new(Count::new(
-            lit(1i8),
-            "COUNT(2)".to_string(),
-            DataType::Int64,
-        )) as _];
-
-        let plan = final_aggregate_exec(
-            partial_aggregate_exec(
-                parquet_exec(&schema),
-                PhysicalGroupBy::default(),
-                aggr_expr1,
-            ),
-            PhysicalGroupBy::default(),
-            aggr_expr2,
-        );
-        // should not combine the Partial/Final AggregateExecs
-        let expected = &[
-            "AggregateExec: mode=Final, gby=[], aggr=[COUNT(2)]",
-            "AggregateExec: mode=Partial, gby=[], aggr=[COUNT(1)]",
-            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
-        ];
-
-        assert_optimized!(expected, plan);
-
-        Ok(())
-    }
-
-    #[test]
-    fn aggregations_combined() -> Result<()> {
-        let schema = schema();
-        let aggr_expr = vec![Arc::new(Count::new(
-            lit(1i8),
-            "COUNT(1)".to_string(),
-            DataType::Int64,
-        )) as _];
-
-        let plan = final_aggregate_exec(
-            partial_aggregate_exec(
-                parquet_exec(&schema),
-                PhysicalGroupBy::default(),
-                aggr_expr.clone(),
-            ),
-            PhysicalGroupBy::default(),
-            aggr_expr,
-        );
-        // should combine the Partial/Final AggregateExecs to the Single 
AggregateExec
-        let expected = &[
-            "AggregateExec: mode=Single, gby=[], aggr=[COUNT(1)]",
-            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
-        ];
-
-        assert_optimized!(expected, plan);
-        Ok(())
-    }
-
-    #[test]
-    fn aggregations_with_group_combined() -> Result<()> {

Review Comment:
   The group-by and limit tests are not ported because I couldn't find a query 
whose result exercises `combine aggregate`.
   But I think this is fine because at least one query already covers this 
optimization, and there is no optimization logic specific to group by 
or limit.



##########
datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs:
##########
@@ -194,282 +194,3 @@ fn discard_column_index(group_expr: Arc<dyn 
PhysicalExpr>) -> Arc<dyn PhysicalEx
         .data()
         .unwrap_or(group_expr)
 }
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::datasource::listing::PartitionedFile;
-    use crate::datasource::object_store::ObjectStoreUrl;
-    use crate::datasource::physical_plan::{FileScanConfig, ParquetExec};
-    use crate::physical_plan::expressions::lit;
-    use crate::physical_plan::repartition::RepartitionExec;
-    use crate::physical_plan::{displayable, Partitioning, Statistics};
-
-    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-    use datafusion_physical_expr::expressions::{col, Count, Sum};
-
-    /// Runs the CombinePartialFinalAggregate optimizer and asserts the plan 
against the expected
-    macro_rules! assert_optimized {
-        ($EXPECTED_LINES: expr, $PLAN: expr) => {
-            let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| 
*s).collect();
-
-            // run optimizer
-            let optimizer = CombinePartialFinalAggregate {};
-            let config = ConfigOptions::new();
-            let optimized = optimizer.optimize($PLAN, &config)?;
-            // Now format correctly
-            let plan = 
displayable(optimized.as_ref()).indent(true).to_string();
-            let actual_lines = trim_plan_display(&plan);
-
-            assert_eq!(
-                &expected_lines, &actual_lines,
-                "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
-                expected_lines, actual_lines
-            );
-        };
-    }
-
-    fn trim_plan_display(plan: &str) -> Vec<&str> {
-        plan.split('\n')
-            .map(|s| s.trim())
-            .filter(|s| !s.is_empty())
-            .collect()
-    }
-
-    fn schema() -> SchemaRef {
-        Arc::new(Schema::new(vec![
-            Field::new("a", DataType::Int64, true),
-            Field::new("b", DataType::Int64, true),
-            Field::new("c", DataType::Int64, true),
-        ]))
-    }
-
-    fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
-        Arc::new(ParquetExec::new(
-            FileScanConfig {
-                object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
-                file_schema: schema.clone(),
-                file_groups: vec![vec![PartitionedFile::new("x".to_string(), 
100)]],
-                statistics: Statistics::new_unknown(schema),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-                output_ordering: vec![],
-            },
-            None,
-            None,
-            Default::default(),
-        ))
-    }
-
-    fn partial_aggregate_exec(
-        input: Arc<dyn ExecutionPlan>,
-        group_by: PhysicalGroupBy,
-        aggr_expr: Vec<Arc<dyn AggregateExpr>>,
-    ) -> Arc<dyn ExecutionPlan> {
-        let schema = input.schema();
-        let n_aggr = aggr_expr.len();
-        Arc::new(
-            AggregateExec::try_new(
-                AggregateMode::Partial,
-                group_by,
-                aggr_expr,
-                vec![None; n_aggr],
-                input,
-                schema,
-            )
-            .unwrap(),
-        )
-    }
-
-    fn final_aggregate_exec(
-        input: Arc<dyn ExecutionPlan>,
-        group_by: PhysicalGroupBy,
-        aggr_expr: Vec<Arc<dyn AggregateExpr>>,
-    ) -> Arc<dyn ExecutionPlan> {
-        let schema = input.schema();
-        let n_aggr = aggr_expr.len();
-        Arc::new(
-            AggregateExec::try_new(
-                AggregateMode::Final,
-                group_by,
-                aggr_expr,
-                vec![None; n_aggr],
-                input,
-                schema,
-            )
-            .unwrap(),
-        )
-    }
-
-    fn repartition_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn 
ExecutionPlan> {
-        Arc::new(
-            RepartitionExec::try_new(input, 
Partitioning::RoundRobinBatch(10)).unwrap(),
-        )
-    }
-
-    #[test]
-    fn aggregations_not_combined() -> Result<()> {
-        let schema = schema();
-
-        let aggr_expr = vec![Arc::new(Count::new(
-            lit(1i8),
-            "COUNT(1)".to_string(),
-            DataType::Int64,
-        )) as _];
-        let plan = final_aggregate_exec(
-            repartition_exec(partial_aggregate_exec(
-                parquet_exec(&schema),
-                PhysicalGroupBy::default(),
-                aggr_expr.clone(),
-            )),
-            PhysicalGroupBy::default(),
-            aggr_expr,
-        );
-        // should not combine the Partial/Final AggregateExecs
-        let expected = &[
-            "AggregateExec: mode=Final, gby=[], aggr=[COUNT(1)]",
-            "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-            "AggregateExec: mode=Partial, gby=[], aggr=[COUNT(1)]",
-            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
-        ];
-        assert_optimized!(expected, plan);
-
-        let aggr_expr1 = vec![Arc::new(Count::new(
-            lit(1i8),
-            "COUNT(1)".to_string(),
-            DataType::Int64,
-        )) as _];
-        let aggr_expr2 = vec![Arc::new(Count::new(
-            lit(1i8),
-            "COUNT(2)".to_string(),
-            DataType::Int64,
-        )) as _];
-
-        let plan = final_aggregate_exec(
-            partial_aggregate_exec(
-                parquet_exec(&schema),
-                PhysicalGroupBy::default(),
-                aggr_expr1,
-            ),
-            PhysicalGroupBy::default(),
-            aggr_expr2,
-        );
-        // should not combine the Partial/Final AggregateExecs
-        let expected = &[
-            "AggregateExec: mode=Final, gby=[], aggr=[COUNT(2)]",
-            "AggregateExec: mode=Partial, gby=[], aggr=[COUNT(1)]",
-            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
-        ];
-
-        assert_optimized!(expected, plan);
-
-        Ok(())
-    }
-
-    #[test]
-    fn aggregations_combined() -> Result<()> {
-        let schema = schema();
-        let aggr_expr = vec![Arc::new(Count::new(
-            lit(1i8),
-            "COUNT(1)".to_string(),
-            DataType::Int64,
-        )) as _];
-
-        let plan = final_aggregate_exec(
-            partial_aggregate_exec(
-                parquet_exec(&schema),
-                PhysicalGroupBy::default(),
-                aggr_expr.clone(),
-            ),
-            PhysicalGroupBy::default(),
-            aggr_expr,
-        );
-        // should combine the Partial/Final AggregateExecs to the Single 
AggregateExec
-        let expected = &[
-            "AggregateExec: mode=Single, gby=[], aggr=[COUNT(1)]",
-            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c]",
-        ];
-
-        assert_optimized!(expected, plan);
-        Ok(())
-    }
-
-    #[test]
-    fn aggregations_with_group_combined() -> Result<()> {

Review Comment:
   The group-by and limit tests are not ported because I couldn't find a query 
whose result exercises `combine aggregate`.
   But I think this is fine because at least one query already covers this 
optimization, and there is no optimization logic specific to group 
by or limit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to