alamb commented on code in PR #2716:
URL: https://github.com/apache/arrow-datafusion/pull/2716#discussion_r895577319
##########
datafusion/core/src/physical_optimizer/aggregate_statistics.rs:
##########
@@ -265,7 +265,7 @@ mod tests {
use crate::error::Result;
use crate::logical_plan::Operator;
- use crate::physical_plan::aggregates::AggregateExec;
+ use crate::physical_plan::aggregates::{AggregateExec, PhysicalGroupBy};
Review Comment:
Love the new name `PhysicalGroupBy`
##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -65,13 +66,60 @@ pub enum AggregateMode {
FinalPartitioned,
}
+/// Represents `GROUP BY` clause in the plan (including the more general GROUPING SET)
Review Comment:
Thank you -- this is super helpful
##########
datafusion/optimizer/src/single_distinct_to_groupby.rs:
##########
@@ -212,6 +224,9 @@ mod tests {
let optimized_plan = rule
.optimize(plan, &OptimizerConfig::new())
.expect("failed to optimize plan");
+
+ println!("{:?}", optimized_plan);
Review Comment:
left over?
##########
datafusion/core/src/physical_plan/planner.rs:
##########
@@ -1346,6 +1616,74 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn test_create_cube_expr() -> Result<()> {
+ let logical_plan = test_csv_scan()
+ .await?
+ .project(vec![col("c1"), col("c2"), col("c3")])?
+ .aggregate(
+ vec![cube(vec![col("c1"), col("c2"), col("c3")])],
+ vec![sum(col("c2"))],
+ )?
Review Comment:
I don't understand the need to create the `aggregate` on the logical plan
(as the new cube expressions are planned below). Can you simply use the
output of the `project` plan?
The same question applies to the other plans below
##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -117,14 +171,16 @@ impl AggregateExec {
/// Grouping expressions
pub fn group_expr(&self) -> &[(Arc<dyn PhysicalExpr>, String)] {
- &self.group_expr
+ // TODO Is this right?
Review Comment:
I don't think so -- this seems to be used by the "use statistics instead of
aggregates" optimization
```
/Users/alamb/Software/arrow-datafusion/datafusion/core/src/physical_optimizer/aggregate_statistics.rs
113: && final_agg_exec.group_expr().is_empty()
121: && partial_agg_exec.group_expr().is_empty()
/Users/alamb/Software/arrow-datafusion/datafusion/core/src/physical_plan/aggregates/mod.rs
728: let groups = partial_aggregate.group_expr().to_vec();
```
In general, it might make sense to disable / skip all such optimizations in
the cases of grouping sets / cube / rollup -- that would be the conservative
approach and avoid potential subtle wrong answer bugs. As the feature is used
more and people have a need to optimize it more, we can revisit the
optimizations and make sure they are relevant to grouping sets
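To make the conservative skip concrete, here is a hypothetical sketch (the name `can_use_statistics` and the `String`-based fields are illustrative, not DataFusion's actual API): the statistics-based rewrite would only fire when the group-by is completely trivial, so grouping sets / cube / rollup plans are never touched.

```rust
/// Hypothetical, simplified model of the PR's `PhysicalGroupBy`: `groups`
/// holds one boolean mask per grouping set, where `true` means the
/// expression is replaced by NULL in that group. `String` stands in for
/// the real `(Arc<dyn PhysicalExpr>, String)` pairs.
struct PhysicalGroupBy {
    expr: Vec<String>,
    groups: Vec<Vec<bool>>,
}

impl PhysicalGroupBy {
    /// True for a plain GROUP BY: at most one group and no NULLed-out
    /// slots, i.e. not a GROUPING SETS / CUBE / ROLLUP query.
    fn is_single(&self) -> bool {
        self.groups.len() <= 1
            && self.groups.iter().flatten().all(|&is_null| !is_null)
    }
}

/// The conservative guard: only apply the statistics-based rewrite when
/// the aggregate has no grouping at all.
fn can_use_statistics(group_by: &PhysicalGroupBy) -> bool {
    group_by.is_single() && group_by.expr.is_empty()
}

fn main() {
    let plain = PhysicalGroupBy { expr: vec![], groups: vec![] };
    // CUBE (c1, c2): four grouping sets, some slots NULLed out.
    let cube = PhysicalGroupBy {
        expr: vec!["c1".into(), "c2".into()],
        groups: vec![
            vec![false, false],
            vec![false, true],
            vec![true, false],
            vec![true, true],
        ],
    };
    assert!(can_use_statistics(&plain));
    assert!(!can_use_statistics(&cube));
    println!("ok");
}
```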
##########
datafusion/optimizer/src/single_distinct_to_groupby.rs:
##########
@@ -62,9 +63,11 @@ fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
schema,
group_expr,
}) => {
- if is_single_distinct_agg(plan) {
+ if is_single_distinct_agg(plan) && !contains_grouping_set(group_expr) {
Review Comment:
I think this is a good idea -- to skip grouping sets in optimizations
##########
datafusion/core/src/physical_plan/planner.rs:
##########
@@ -574,11 +569,12 @@ impl DefaultPhysicalPlanner {
// TODO: dictionary type not yet supported in Hash Repartition
let contains_dict = groups
+ .expr
Review Comment:
I think it is a minor thing, but one might imagine keeping the fields of
`PhysicalGroupBy` private and adding functions like `fn expr()` and `fn
is_empty()` mostly as a way of additional documentation
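As a sketch of that suggestion (field and method names follow the review comment; `String` is a stand-in for the real `Arc<dyn PhysicalExpr>` pairs, so this is illustrative rather than the PR's actual code):

```rust
/// Sketch of `PhysicalGroupBy` with private fields; callers go through
/// small documented accessors instead of reaching into the struct.
/// `String` is a stand-in for the real `(Arc<dyn PhysicalExpr>, String)`.
pub struct PhysicalGroupBy {
    expr: Vec<String>,
    null_expr: Vec<String>,
    groups: Vec<Vec<bool>>,
}

impl PhysicalGroupBy {
    pub fn new(
        expr: Vec<String>,
        null_expr: Vec<String>,
        groups: Vec<Vec<bool>>,
    ) -> Self {
        Self { expr, null_expr, groups }
    }

    /// The grouping expressions shared by all grouping sets.
    pub fn expr(&self) -> &[String] {
        &self.expr
    }

    /// The NULL stand-ins used when an expression is absent from a set.
    pub fn null_expr(&self) -> &[String] {
        &self.null_expr
    }

    /// True when there is nothing to group by.
    pub fn is_empty(&self) -> bool {
        self.expr.is_empty()
    }

    /// One boolean mask per grouping set.
    pub fn groups(&self) -> &[Vec<bool>] {
        &self.groups
    }
}

fn main() {
    let gb = PhysicalGroupBy::new(
        vec!["c1".to_string()],
        vec!["NULL as c1".to_string()],
        vec![vec![false], vec![true]],
    );
    assert_eq!(gb.expr(), ["c1".to_string()]);
    assert!(!gb.is_empty());
    assert_eq!(gb.groups().len(), 2);
    println!("ok");
}
```

The accessors double as documentation: each method's doc comment records what the field means, which raw `pub` fields would not.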
##########
datafusion/optimizer/src/single_distinct_to_groupby.rs:
##########
@@ -160,6 +166,7 @@ fn optimize_children(plan: &LogicalPlan) -> Result<LogicalPlan> {
}
fn is_single_distinct_agg(plan: &LogicalPlan) -> bool {
+ // false
Review Comment:
Left over?
##########
datafusion/core/tests/sql/aggregates.rs:
##########
@@ -1223,6 +1669,79 @@ async fn count_aggregated() -> Result<()> {
Ok(())
}
+#[tokio::test]
+async fn count_aggregated_cube() -> Result<()> {
+ let results = execute_with_partition(
+ "SELECT c1, c2, COUNT(c3) FROM test GROUP BY CUBE (c1, c2) ORDER BY c1, c2",
+ 4,
+ )
+ .await?;
+
+ let expected = vec![
+ "+----+----+----------------+",
+ "| c1 | c2 | COUNT(test.c3) |",
+ "+----+----+----------------+",
+ "| | | 40 |",
+ "| | 1 | 4 |",
+ "| | 10 | 4 |",
+ "| | 2 | 4 |",
+ "| | 3 | 4 |",
+ "| | 4 | 4 |",
+ "| | 5 | 4 |",
+ "| | 6 | 4 |",
+ "| | 7 | 4 |",
+ "| | 8 | 4 |",
+ "| | 9 | 4 |",
+ "| 0 | | 10 |",
+ "| 0 | 1 | 1 |",
+ "| 0 | 10 | 1 |",
+ "| 0 | 2 | 1 |",
+ "| 0 | 3 | 1 |",
+ "| 0 | 4 | 1 |",
+ "| 0 | 5 | 1 |",
+ "| 0 | 6 | 1 |",
+ "| 0 | 7 | 1 |",
+ "| 0 | 8 | 1 |",
+ "| 0 | 9 | 1 |",
+ "| 1 | | 10 |",
+ "| 1 | 1 | 1 |",
+ "| 1 | 10 | 1 |",
+ "| 1 | 2 | 1 |",
+ "| 1 | 3 | 1 |",
+ "| 1 | 4 | 1 |",
+ "| 1 | 5 | 1 |",
+ "| 1 | 6 | 1 |",
+ "| 1 | 7 | 1 |",
+ "| 1 | 8 | 1 |",
+ "| 1 | 9 | 1 |",
+ "| 2 | | 10 |",
+ "| 2 | 1 | 1 |",
+ "| 2 | 10 | 1 |",
+ "| 2 | 2 | 1 |",
+ "| 2 | 3 | 1 |",
+ "| 2 | 4 | 1 |",
+ "| 2 | 5 | 1 |",
+ "| 2 | 6 | 1 |",
+ "| 2 | 7 | 1 |",
+ "| 2 | 8 | 1 |",
+ "| 2 | 9 | 1 |",
+ "| 3 | | 10 |",
+ "| 3 | 1 | 1 |",
+ "| 3 | 10 | 1 |",
+ "| 3 | 2 | 1 |",
+ "| 3 | 3 | 1 |",
+ "| 3 | 4 | 1 |",
+ "| 3 | 5 | 1 |",
+ "| 3 | 6 | 1 |",
+ "| 3 | 7 | 1 |",
+ "| 3 | 8 | 1 |",
+ "| 3 | 9 | 1 |",
+ "+----+----+----------------+",
+ ];
+ assert_batches_sorted_eq!(expected, &results);
+ Ok(())
+}
+
Review Comment:
I think the test coverage is quite good. Thank you
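For context on the expected rows above: `CUBE (c1, c2)` groups by every subset of the listed columns, so the output contains the grand total, per-`c1` totals, per-`c2` totals, and the full `(c1, c2)` groups. A minimal sketch of that expansion (a hypothetical `cube_masks` helper, with `true` marking a NULLed-out column):

```rust
/// Expand CUBE over `n` expressions into all 2^n grouping-set masks.
/// In each mask, `true` means "this column is NULLed out in that group".
fn cube_masks(n: usize) -> Vec<Vec<bool>> {
    (0..(1u32 << n))
        .map(|bits| (0..n).map(|i| bits & (1 << i) != 0).collect())
        .collect()
}

fn main() {
    // CUBE (c1, c2) => (c1, c2), (c1), (c2), and the grand total ().
    let masks = cube_masks(2);
    assert_eq!(masks.len(), 4);
    assert!(masks.contains(&vec![false, false])); // GROUP BY c1, c2
    assert!(masks.contains(&vec![true, true]));   // grand total row
    println!("ok");
}
```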
##########
datafusion/core/src/physical_plan/planner.rs:
##########
@@ -1001,6 +1001,275 @@ impl DefaultPhysicalPlanner {
exec_plan
}.boxed()
}
+
+ fn create_grouping_physical_expr(
+ &self,
+ group_expr: &[Expr],
+ input_dfschema: &DFSchema,
+ input_schema: &Schema,
+ session_state: &SessionState,
+ ) -> Result<PhysicalGroupBy> {
+ if group_expr.len() == 1 {
+ match &group_expr[0] {
+ Expr::GroupingSet(GroupingSet::GroupingSets(grouping_sets)) => {
+ merge_grouping_set_physical_expr(
+ grouping_sets,
+ input_dfschema,
+ input_schema,
+ session_state,
+ )
+ }
+ Expr::GroupingSet(GroupingSet::Cube(exprs)) => create_cube_physical_expr(
+ exprs,
+ input_dfschema,
+ input_schema,
+ session_state,
+ ),
+ Expr::GroupingSet(GroupingSet::Rollup(exprs)) => {
+ create_rollup_physical_expr(
+ exprs,
+ input_dfschema,
+ input_schema,
+ session_state,
+ )
+ }
+ expr => Ok(PhysicalGroupBy::new_single(vec![tuple_err((
+ self.create_physical_expr(
+ expr,
+ input_dfschema,
+ input_schema,
+ session_state,
+ ),
+ physical_name(expr),
+ ))?])),
+ }
+ } else {
+ Ok(PhysicalGroupBy::new_single(
+ group_expr
+ .iter()
+ .map(|e| {
+ tuple_err((
+ self.create_physical_expr(
+ e,
+ input_dfschema,
+ input_schema,
+ session_state,
+ ),
+ physical_name(e),
+ ))
+ })
+ .collect::<Result<Vec<_>>>()?,
+ ))
+ }
+ }
+}
+
+/// Expand and align a GROUPING SET expression.
+/// (see https://www.postgresql.org/docs/current/queries-table-expressions.html#QUERIES-GROUPING-SETS)
+///
+/// This will take a list of grouping sets and ensure that each group is
+/// properly aligned for the physical execution plan. We do this by
+/// identifying all unique expressions in each group and conforming each
+/// group to the same set of expression types and ordering.
+/// For example, if we have something like `GROUPING SETS ((a,b,c),(a),(b),(b,c))`
+/// we would expand this to `GROUPING SETS ((a,b,c),(a,NULL,NULL),(NULL,b,NULL),(NULL,b,c))`
+/// (see https://www.postgresql.org/docs/current/queries-table-expressions.html#QUERIES-GROUPING-SETS)
+fn merge_grouping_set_physical_expr(
+ grouping_sets: &[Vec<Expr>],
+ input_dfschema: &DFSchema,
+ input_schema: &Schema,
+ session_state: &SessionState,
+) -> Result<PhysicalGroupBy> {
+ let num_groups = grouping_sets.len();
+ let mut all_exprs: Vec<Expr> = vec![];
+ let mut grouping_set_expr: Vec<(Arc<dyn PhysicalExpr>, String)> = vec![];
+ let mut null_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> = vec![];
+
+ for expr in grouping_sets.iter().flatten() {
+ if !all_exprs.contains(expr) {
+ all_exprs.push(expr.clone());
+
+ grouping_set_expr.push(get_physical_expr_pair(
+ expr,
+ input_dfschema,
+ input_schema,
+ session_state,
+ )?);
+
+ null_exprs.push(get_null_physical_expr_pair(
+ expr,
+ input_dfschema,
+ input_schema,
+ session_state,
+ )?);
+ }
+ }
+
+ let mut merged_sets: Vec<Vec<bool>> = Vec::with_capacity(num_groups);
+
+ let expr_count = all_exprs.len();
+
+ for expr_group in grouping_sets.iter() {
+ let mut group: Vec<bool> = Vec::with_capacity(expr_count);
+ for expr in all_exprs.iter() {
+ if expr_group.contains(expr) {
+ group.push(false);
+ } else {
+ group.push(true)
+ }
+ }
Review Comment:
I don't think it matters, but you can probably express this in a functional
style like:
```suggestion
        let group: Vec<bool> = all_exprs
            .iter()
            .map(|expr| !expr_group.contains(expr))
            .collect();
```
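Applied to the `GROUPING SETS ((a,b,c),(a),(b),(b,c))` example from the doc comment above, the per-group masks come out as shown in this self-contained sketch (plain `&str` stands in for `Expr`):

```rust
/// Compute one boolean null-mask per grouping set: `true` marks an
/// expression that is absent from that set (and therefore emitted as NULL).
fn grouping_set_masks(
    all_exprs: &[&str],
    grouping_sets: &[Vec<&str>],
) -> Vec<Vec<bool>> {
    grouping_sets
        .iter()
        .map(|set| all_exprs.iter().map(|e| !set.contains(e)).collect())
        .collect()
}

fn main() {
    let all = ["a", "b", "c"];
    let sets = vec![vec!["a", "b", "c"], vec!["a"], vec!["b"], vec!["b", "c"]];
    let masks = grouping_set_masks(&all, &sets);
    // (a,b,c) -> no NULLs; (a) -> (a,NULL,NULL);
    // (b) -> (NULL,b,NULL); (b,c) -> (NULL,b,c)
    assert_eq!(masks[0], vec![false, false, false]);
    assert_eq!(masks[1], vec![false, true, true]);
    assert_eq!(masks[2], vec![true, false, true]);
    assert_eq!(masks[3], vec![true, false, false]);
    println!("ok");
}
```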
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]