pchintar opened a new issue, #3841:
URL: https://github.com/apache/datafusion-comet/issues/3841

   Description
   
   There have been prior discussions around supporting 'max_by' and 'min_by':
   
   - #12075 (discussion only, closed without implementation)
   - #12252 (moved to another repo, not implemented in DataFusion/Comet)
   
   Those discussions suggested that these functions could be expressed using:
   - 'first_value(...) ORDER BY ...'
   or similar approaches.
   
   However, this still requires sorting and does not provide a clean or 
efficient native implementation.
   
   Currently, these functions are not fully supported for native execution in 
Comet, especially in cases where Spark plans 'SortAggregateExec' (e.g., 
variable-width types like strings), which leads to fallback.
   
   ---
   
   Proposed Solution
   
   Add full native Comet support for:
   
   - 'max_by(x, y)'
   - 'min_by(x, y)'
   
   covering both:
   - 'HashAggregateExec'
   - 'SortAggregateExec'
   
   ---
   
   Approach
   
   Instead of relying on sorting (as in 'ORDER BY'-based approaches), we 
implement this using a simple selector-style aggregation:
   
   For each group, we keep only:
     - the current best value
     - the corresponding ordering value
   
   As we scan rows:
     - we compare the incoming ordering value with the current best
     - if it is better, we replace the stored pair
   
   This idea is directly inspired by the discussion in #12075 (avoiding a 
full sort and using a running comparison instead).
   
   In practice, this means:
   - no sorting
   - no storing all rows
   - just a single-pass update using a '(value, order)' pair
   
   The same logic is also used during merge:
   - partial aggregates are combined by comparing their stored '(value, order)' 
pairs
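   
   The update and merge steps above can be sketched roughly as follows. This 
is a minimal illustration, not the actual Comet code: the names 
'SelectorState', 'Mode' and 'aggregate' are hypothetical, the ordering type is 
assumed to implement 'Ord', and a real native implementation would operate on 
Arrow arrays per group rather than on single Rust values. It assumes the 
semantics listed under Status: rows with a null ordering are skipped, and on a 
tie the newer row wins.

```rust
use std::cmp::Ordering;

/// Which extremum of the ordering column to track (hypothetical name).
#[derive(Clone, Copy, Debug, PartialEq)]
enum Mode {
    MaxBy,
    MinBy,
}

/// Per-group state: the best ordering seen so far and the value paired with it.
/// `best` stays `None` until a row with a non-null ordering has been seen.
#[derive(Clone, Debug, PartialEq)]
struct SelectorState<V, O> {
    mode: Mode,
    best: Option<(Option<V>, O)>,
}

impl<V, O: Ord> SelectorState<V, O> {
    fn new(mode: Mode) -> Self {
        Self { mode, best: None }
    }

    /// True only when the stored ordering strictly beats the candidate,
    /// so a tie lets the newer row replace the stored pair.
    fn stored_wins(&self, stored: &O, candidate: &O) -> bool {
        match (self.mode, stored.cmp(candidate)) {
            (Mode::MaxBy, Ordering::Greater) => true,
            (Mode::MinBy, Ordering::Less) => true,
            _ => false,
        }
    }

    /// Consume one input row. Rows with a null ordering are skipped.
    fn update(&mut self, value: Option<V>, order: Option<O>) {
        let Some(order) = order else { return };
        let keep = match &self.best {
            Some((_, stored)) => self.stored_wins(stored, &order),
            None => false,
        };
        if !keep {
            self.best = Some((value, order));
        }
    }

    /// Combine with another partial state using the same comparison;
    /// `other` is treated as the newer side.
    fn merge(&mut self, other: Self) {
        if let Some((value, order)) = other.best {
            self.update(value, Some(order));
        }
    }

    /// Final result: the value paired with the best ordering (may be null).
    fn finish(self) -> Option<V> {
        self.best.and_then(|(value, _)| value)
    }
}

/// Single-pass helper over (value, ordering) rows for one group.
fn aggregate<V: Clone, O: Ord + Clone>(
    mode: Mode,
    rows: &[(Option<V>, Option<O>)],
) -> Option<V> {
    let mut state = SelectorState::new(mode);
    for (value, order) in rows {
        state.update(value.clone(), order.clone());
    }
    state.finish()
}
```

   Because 'merge' reuses 'update', partial and final aggregation share one 
comparison rule, so tie and null behavior cannot drift between the two phases.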
   
   ---
   
   Implementation Plan (High-Level)
   
   - Native Rust aggregate implementation (single shared logic for max/min)
   - Proto support for MaxBy / MinBy
   - Scala serde integration
   - Planner integration
   - SortAggregate operator support (to eliminate fallback cases)
   
   ---
   
   Status
   
   I have implemented and validated this locally:
   
   - Native execution confirmed
   - Works for both HashAggregate and SortAggregate
   - Matches Spark semantics:
     - null handling
     - tie behavior (newer value wins)
   - Eliminates fallback in tested cases
   
   I can open a PR if this direction is aligned.
   
   ---
   
   Related
   
   - #12075
   - #12252
   
   ---
   
   cc @andygrove 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

