andygrove opened a new pull request, #4782:
URL: https://github.com/apache/datafusion-comet/pull/4782

   ## Which issue does this PR close?
   
   Closes #3970.
   
   ## Rationale for this change
   
   The `mode` aggregate (the most frequent value in a group) is a mainstream 
statistical aggregate that previously fell back to Spark, preventing native 
execution of any query using it. Adding native support keeps these queries in 
Comet's pipeline.
   
   ## What changes are included in this PR?
   
   This PR was scaffolded with the `implement-comet-expression` project skill.
   
   - Native Rust implementation in `native/spark-expr/src/agg_funcs/mode.rs`: a 
`Mode` aggregate UDF with both a global `Accumulator` and a vectorized 
`GroupsAccumulator`. State is a frequency map keyed by `ScalarValue`, 
serialized as a single `struct<values: array<T>, counts: array<bigint>>` buffer 
column so partial/final buffer schemas stay aligned with Spark's 
single-attribute `TypedImperativeAggregate` buffer.
   - Protobuf `Mode` message and `AggExpr` oneof entry, plus the planner arm in 
`planner.rs`.
   - `CometMode` serde and registration in `QueryPlanSerde.aggrSerdeMap`.
   - A `Mode` branch in `adjustOutputForNativeState` (`operators.scala`) 
mapping the Spark binary buffer type to the native struct state type.
   - A `modeHasUnsupportedOrdering` shim in `CometTypeShim` (spark-3.x / 
spark-4.x) because `Mode.reverseOpt` only exists on Spark 4.0+.
   - Docs: `mode` marked supported in the expressions guide.
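
As a rough illustration of the frequency-map state in the first bullet, here is a minimal standard-library sketch. It is not the code in `mode.rs`: `ModeState` and its methods are hypothetical names, keys are plain `i64` rather than `ScalarValue`, and the `struct<values, counts>` buffer is modeled as a pair of parallel vectors.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the accumulator state: value -> occurrence count.
#[derive(Default, Debug)]
struct ModeState {
    counts: HashMap<i64, i64>,
}

impl ModeState {
    /// Partial step: count every non-NULL input (`None` models SQL NULL).
    fn update(&mut self, batch: &[Option<i64>]) {
        for v in batch.iter().flatten() {
            *self.counts.entry(*v).or_insert(0) += 1;
        }
    }

    /// Serialize the map as two parallel vectors, loosely mirroring the
    /// `values` and `counts` arrays of the single struct buffer column.
    fn state(&self) -> (Vec<i64>, Vec<i64>) {
        let mut entries: Vec<(i64, i64)> =
            self.counts.iter().map(|(k, c)| (*k, *c)).collect();
        entries.sort(); // sorted only so the sketch's output is reproducible
        entries.into_iter().unzip()
    }

    /// Final step: fold a serialized partial state into this one by
    /// adding the incoming counts to the matching keys.
    fn merge(&mut self, values: &[i64], counts: &[i64]) {
        for (v, c) in values.iter().zip(counts) {
            *self.counts.entry(*v).or_insert(0) += *c;
        }
    }
}
```

Under these assumptions, merging the serialized state of one partial accumulator into another gives the same counts as feeding all rows through a single accumulator, which is the property the partial/final split relies on.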
   
   Scope and compatibility:
   
   - Only the plain `mode(col)` form is supported. The `mode(col, 
deterministic)` and `mode() WITHIN GROUP (ORDER BY col)` forms, which exist 
only on Spark 4.0+ and set `reverseOpt`, fall back to Spark.
   - Registered as `Incompatible` (opt-in via 
`spark.comet.expression.Mode.allowIncompatible=true`): Spark breaks ties 
non-deterministically based on JVM hash-map iteration order, which a native 
hash map cannot reproduce bit-for-bit. Comet instead returns the smallest tied 
value deterministically.
   - NULLs are ignored, empty input returns NULL, and float keys are normalized 
(`-0.0` to `0.0`, canonical `NaN`) to match Spark's counting. Supported input 
types are numeric, boolean, decimal, date, timestamp, timestamp_ntz, and 
default-collation string; other types fall back.
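
The tie-break, NULL, and float-normalization rules listed above can be sketched for `f64` inputs as follows. This is an illustrative assumption, not the PR's implementation: `normalize` and `mode_f64` are hypothetical helpers, and ordering ties with `f64::total_cmp` is a choice made for this sketch only.

```rust
use std::collections::HashMap;

/// Normalize a float key: `-0.0` becomes `0.0`, and every NaN bit pattern
/// collapses to the single canonical `f64::NAN`.
fn normalize(x: f64) -> f64 {
    if x.is_nan() {
        f64::NAN
    } else if x == 0.0 {
        0.0 // `-0.0 == 0.0` is true, so both zeros land here
    } else {
        x
    }
}

/// Most frequent non-NULL value; ties go to the smallest value.
/// Returns `None` (SQL NULL) for empty or all-NULL input.
fn mode_f64(input: &[Option<f64>]) -> Option<f64> {
    // Key on the bit pattern of the normalized float so it can be hashed.
    let mut counts: HashMap<u64, i64> = HashMap::new();
    for x in input.iter().flatten() {
        *counts.entry(normalize(*x).to_bits()).or_insert(0) += 1;
    }
    counts
        .into_iter()
        .map(|(bits, c)| (f64::from_bits(bits), c))
        // Higher count wins; on equal counts the comparison is reversed
        // so that the smaller value is treated as the maximum.
        .max_by(|(a, ca), (b, cb)| ca.cmp(cb).then_with(|| b.total_cmp(a)))
        .map(|(v, _)| v)
}
```

Because the winner is chosen by an explicit comparison rather than by map iteration order, the result of this sketch does not depend on how the hash map happens to lay out its entries.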
   
   ## How are these changes tested?
   
   - Rust unit tests in `mode.rs` covering most-frequent value, tie-break to 
smallest, NULL handling, empty input, float normalization, and partial/final 
merge equivalence for both the accumulator and the groups accumulator.
   - A `mode.sql` file test exercising global and grouped aggregation, NULLs, 
all-NULL groups, mixed aggregates, HAVING, and 
boolean/integer/double/decimal/string/date/timestamp inputs, plus an 
unsupported-type fallback assertion. Verified on Spark 3.5 and Spark 4.1.
   

