jonahgao commented on code in PR #11469:
URL: https://github.com/apache/datafusion/pull/11469#discussion_r1678992594


##########
datafusion/sql/src/select.rs:
##########
@@ -354,6 +358,118 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             .build()
     }
 
+    fn try_process_aggregate_unnest(&self, input: LogicalPlan) -> 
Result<LogicalPlan> {
+        match &input {
+            LogicalPlan::Aggregate(agg) => {
+                let (new_input, new_group_by_exprs) =
+                    self.try_process_group_by_unnest(agg)?;
+                LogicalPlanBuilder::from(new_input)
+                    .aggregate(new_group_by_exprs, agg.aggr_expr.clone())?
+                    .build()
+            }
+            LogicalPlan::Filter(filter) => match filter.input.as_ref() {
+                LogicalPlan::Aggregate(agg) => {
+                    let (new_input, new_group_by_exprs) =
+                        self.try_process_group_by_unnest(agg)?;
+                    LogicalPlanBuilder::from(new_input)
+                        .aggregate(new_group_by_exprs, agg.aggr_expr.clone())?
+                        .filter(filter.predicate.clone())?
+                        .build()
+                }
+                _ => Ok(input),
+            },
+            _ => Ok(input),
+        }
+    }
+
+    /// Try converting Unnest(Expr) of group by to Unnest/Projection
+    /// Return the new input and group_by_exprs of Aggregate.
+    fn try_process_group_by_unnest(

Review Comment:
   This function seems to duplicate some of the logic in 
[try_process_unnest](https://github.com/apache/datafusion/blob/f204869ff55bb3e39cf23fc0a34ebd5021e6773f/datafusion/sql/src/select.rs#L295).
 
   
   I wonder if there's a better way to handle this, such as planning unnest 
before aggregation and then reusing the current group-by planning logic. That 
seems more intuitive to me, but I'm not sure about it.
   



##########
datafusion/sql/src/select.rs:
##########
@@ -297,6 +298,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         input: LogicalPlan,
         select_exprs: Vec<Expr>,
     ) -> Result<LogicalPlan> {
+        // Try process group by unnest
+        let input = self.try_process_aggregate_unnest(input)?;

Review Comment:
   If unnest has already been processed by `try_process_aggregate_unnest`, does 
the following logic for handling `unnest` become redundant?



##########
datafusion/sql/src/select.rs:
##########
@@ -354,6 +358,118 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             .build()
     }
 
+    fn try_process_aggregate_unnest(&self, input: LogicalPlan) -> 
Result<LogicalPlan> {
+        match &input {
+            LogicalPlan::Aggregate(agg) => {
+                let (new_input, new_group_by_exprs) =
+                    self.try_process_group_by_unnest(agg)?;
+                LogicalPlanBuilder::from(new_input)
+                    .aggregate(new_group_by_exprs, agg.aggr_expr.clone())?
+                    .build()
+            }
+            LogicalPlan::Filter(filter) => match filter.input.as_ref() {
+                LogicalPlan::Aggregate(agg) => {
+                    let (new_input, new_group_by_exprs) =
+                        self.try_process_group_by_unnest(agg)?;
+                    LogicalPlanBuilder::from(new_input)
+                        .aggregate(new_group_by_exprs, agg.aggr_expr.clone())?
+                        .filter(filter.predicate.clone())?
+                        .build()
+                }
+                _ => Ok(input),
+            },
+            _ => Ok(input),
+        }
+    }
+
+    /// Try converting Unnest(Expr) of group by to Unnest/Projection
+    /// Return the new input and group_by_exprs of Aggregate.
+    fn try_process_group_by_unnest(
+        &self,
+        agg: &Aggregate,
+    ) -> Result<(LogicalPlan, Vec<Expr>)> {
+        let mut aggr_expr_using_columns: Option<HashSet<Expr>> = None;
+
+        let input = agg.input.as_ref();
+        let group_by_exprs = &agg.group_expr;
+        let aggr_exprs = &agg.aggr_expr;
+
+        // process unnest of group_by_exprs, and input of agg will be rewritten
+        // for example:
+        //
+        // ```
+        // Aggregate: groupBy=[[UNNEST(Column(Column { relation: Some(Bare { 
table: "tab" }), name: "array_col" }))]], aggr=[[]]
+        //   TableScan: tab
+        // ```
+        //
+        // will be transformed into
+        //
+        // ```
+        // Aggregate: groupBy=[[unnest(tab.array_col)]], aggr=[[]]
+        //   Unnest: lists[unnest(tab.array_col)] structs[]
+        //     Projection: tab.array_col AS unnest(tab.array_col)
+        //       TableScan: tab
+        // ```
+        let mut intermediate_plan = input.clone();

Review Comment:
   Is it possible to avoid this clone?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to