mustafasrepo commented on code in PR #6332:
URL: https://github.com/apache/arrow-datafusion/pull/6332#discussion_r1193641400


##########
datafusion/core/src/physical_plan/aggregates/mod.rs:
##########
@@ -337,13 +341,62 @@ fn output_group_expr_helper(group_by: &PhysicalGroupBy) -> Vec<Arc<dyn PhysicalE
         .collect()
 }
 
+/// This function gets the finest ordering requirement among all the aggregation
+/// functions. If requirements are conflicting, (i.e. we can not compute the
+/// aggregations in a single [`AggregateExec`]), the function returns an error.
+fn get_finest_requirement<
+    F: Fn() -> EquivalenceProperties,
+    F2: Fn() -> OrderingEquivalenceProperties,
+>(
+    order_by_expr: &[Option<Vec<PhysicalSortExpr>>],
+    eq_properties: F,
+    ordering_eq_properties: F2,
+) -> Result<Option<Vec<PhysicalSortExpr>>> {
+    let mut result: Option<Vec<PhysicalSortExpr>> = None;
+    for fn_reqs in order_by_expr.iter().flatten() {
+        if let Some(result) = &mut result {
+            if ordering_satisfy_concrete(
+                result,
+                fn_reqs,
+                &eq_properties,
+                &ordering_eq_properties,
+            ) {
+                // Do not update the result as it already satisfies current
+                // function's requirement:
+                continue;
+            }
+            if ordering_satisfy_concrete(
+                fn_reqs,
+                result,
+                &eq_properties,
+                &ordering_eq_properties,
+            ) {
+                // Update result with current function's requirements, as it is
+                // a finer requirement than what we currently have.
+                *result = fn_reqs.clone();
+                continue;
+            }
+            // If neither of the requirements satisfy the other, this means
+            // requirements are conflicting. Currently, we do not support
+            // conflicting requirements.
+            return Err(DataFusionError::Plan(
+                "Conflicting ordering requirements in aggregate functions".to_string(),
+            ));
+        } else {
+            result = Some(fn_reqs.clone());
+        }
+    }
+    Ok(result)
+}
+
 impl AggregateExec {
     /// Create a new hash aggregate execution plan
     pub fn try_new(
         mode: AggregateMode,
         group_by: PhysicalGroupBy,
         aggr_expr: Vec<Arc<dyn AggregateExpr>>,
         filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
+        order_by_expr: Vec<Option<Vec<PhysicalSortExpr>>>,

Review Comment:
   This makes sense to me. With this change we may decrease clutter.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to