mbutrovich commented on PR #4254:
URL: 
https://github.com/apache/datafusion-comet/pull/4254#issuecomment-4397270538

   Thanks for picking this up, @andygrove! The speedups are really nice to see 
and the refactor into `welford.rs` makes the per-row and grouped paths a lot 
easier to reason about together. A few things I wanted to ask about while 
reading through.
   
   ### Correlation update_batch mask
   
   In `CorrelationGroupsAccumulator::update_batch` we build `is_not_null(a) & 
is_not_null(b)` on every batch and AND it with `opt_filter`, then hand the 
combined mask to all three children. The per-row `CorrelationAccumulator` has a 
`values[0].null_count() != 0 || values[1].null_count() != 0` short-circuit that 
skips the whole thing when both columns are dense. Could we add the same fast 
path here? Otherwise each child's hot loop hits the `opt_filter` branch for 
every row even when the input was fully non-null, and the allocation is paid 
regardless.
   
   ### Correlation evaluate doing three finalizes
   
   `CorrelationGroupsAccumulator::evaluate` calls `evaluate` on `covar`, 
`var1`, and `var2`, each of which allocates a `Float64Array` and runs its own 
count==0 / sample divide-by-zero branch. We then unpack all three back to 
primitives and largely discard the metadata. Since correlation already reaches 
into `covar.counts()`, would it be cleaner and cheaper to iterate the raw 
`counts`, `m2s`, and `algo_consts` directly once? That would also remove the 
need for the `pub(crate) counts()` accessor, which currently exists only so 
correlation can snapshot counts before the children's `evaluate` drains their 
state.
   
   ### finalize duplication between variance and covariance
   
   `VarianceGroupsAccumulator::finalize` and 
`CovarianceGroupsAccumulator::finalize` look structurally identical, with the 
only difference being `m2s` vs `algo_consts` as the numerator. Would it be 
worth hoisting the shared body into something like 
`welford::finalize_moments(counts, numerators, stats_type, 
null_on_divide_by_zero)`? Feels like it would drop ~70 lines and keep the Spark 
sample-NaN vs null rule in one place.
   
   ### Stddev evaluate allocation
   
   In `StddevGroupsAccumulator::evaluate` we do `values.iter().map(|v| 
v.sqrt()).collect::<Vec<_>>()` and rebuild a `Float64Array`. Arrow's 
`PrimitiveArray::unary_mut` or `unary` would run the sqrt in place on the 
buffer we already produced, avoiding the intermediate `Vec<f64>`. Not a huge 
win per emit but removes one allocation per output batch.
   
   ### Covariance argument count
   
   `welford::covariance_merge` takes 8 `f64`s and needs 
`#[allow(clippy::too_many_arguments)]`, and `covariance_update` / 
`covariance_retract` each take 6. I wonder if a small `CovarianceMoments { 
count, mean1, mean2, c }` struct would read better? It would also let 
`CovarianceGroupsAccumulator` collapse its four parallel `Vec<f64>`s into a 
single `Vec<CovarianceMoments>`, which is friendlier on the cache when the same 
group index is revisited.
   
   ### Redundant null_on_divide_by_zero on Correlation
   
   `CorrelationGroupsAccumulator` stores `null_on_divide_by_zero` on the struct 
and also passes it into all three children. The children's copy looks 
unreachable because the top-level `evaluate` short-circuits at `count <= 1` 
before delegating. Is there a reason to keep both, or could we drop one side?
   
   ### DataFusion accumulate helpers
   
   Have you looked at `datafusion::functions-aggregate-common::accumulate` and 
`accumulate_multiple`? DF's own `VarianceGroupsAccumulator::update_batch` uses 
them to handle `opt_filter + null skip` in a single cache-friendly pass. I 
think the Spark `f64` count requirement is what blocks direct reuse of DF's 
accumulator, but the update loop itself might still be a nice win here. Curious 
whether you considered this and found it not worth it.
   
   ### Stddev test coverage
   
   Variance has `sample_single_row_nan_legacy` and 
`sample_single_row_null_when_flag_set`, and correlation has the equivalent 
pair. Stddev grouped tests only cover pop-single-group and empty-group. Stddev 
wraps variance so the behavior is covered indirectly, but could we mirror those 
two tests so the Spark contract is explicit in the stddev file too?
   
   ### CometBenchmarkBase shuffle manager
   
   The `spark.shuffle.manager = CometShuffleManager` line added to 
`CometBenchmarkBase` caught my eye. The aggregate benchmarks in this PR run 
`local[1]` and don't cross a shuffle boundary, and the change applies to every 
benchmark that extends the base trait. Does it belong in its own PR, or moved 
into the specific benchmark that needed it? I want to make sure the 1.9x-3.7x 
numbers were measured against the same harness the previous baseline used.
   
   Overall very nice work, the unit tests do a good job of pinning down the 
Spark-specific edges and the state ordering comments on correlation are helpful 
for future readers.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to