mbutrovich commented on PR #4003:
URL:
https://github.com/apache/datafusion-comet/pull/4003#issuecomment-4353922011
Thanks @comphead! The `MergeAsPartialUDF` wrapper is a clever way to get
`PartialMerge` semantics without reaching into DataFusion's aggregation logic.
### Streaming shim asymmetry
I checked the Spark source at `v3.5.8`, `v4.0.2`, and `v4.1.1`.
`StreamSourceAwareSparkPlan` doesn't exist before 4.x, so needing a different
3.x approach is unavoidable. The shapes of the two checks do differ in a way
worth flagging, though.
On 3.x the shim does `plan.logicalLink.exists(_.isStreaming)`. That only
looks at the logical link of the root physical node. `LogicalPlan.isStreaming`
does recurse through logical children, but if the root physical node has
`logicalLink = None` (a rule built a fresh node without copying the link) the
check silently returns `false`.
On 4.x the shim does `plan.exists { case p: StreamSourceAwareSparkPlan =>
p.getStream.isDefined }`. `SparkPlan.exists` walks the whole physical tree, and
the leaf check does not depend on logical link preservation.
Would `plan.exists(_.logicalLink.exists(_.isStreaming))` on 3.x be a safer
analogue, or is there a reason to inspect only the root? Totally fine if this
is out of scope for the PartialMerge work; just wanted to surface it.
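To make the failure mode concrete, here's a toy Rust model of the tree-walk distinction (the names `PlanNode`, `logical_link`, and `is_streaming` are illustrative stand-ins, not Spark's actual API):

```rust
// Toy model: a physical plan tree where each node may or may not carry a
// logical link, and only a nested child links to a streaming logical node.

#[derive(Default)]
struct LogicalLink {
    is_streaming: bool,
}

#[derive(Default)]
struct PlanNode {
    logical_link: Option<LogicalLink>,
    children: Vec<PlanNode>,
}

impl PlanNode {
    /// Root-only check, analogous to `plan.logicalLink.exists(_.isStreaming)`.
    fn root_only_streaming(&self) -> bool {
        self.logical_link.as_ref().map_or(false, |l| l.is_streaming)
    }

    /// Recursive check, analogous to
    /// `plan.exists(_.logicalLink.exists(_.isStreaming))`.
    fn any_streaming(&self) -> bool {
        self.root_only_streaming() || self.children.iter().any(|c| c.any_streaming())
    }
}

fn main() {
    // Root node produced by a rule that dropped the logical link...
    let plan = PlanNode {
        logical_link: None,
        // ...but a descendant still links to a streaming logical node.
        children: vec![PlanNode {
            logical_link: Some(LogicalLink { is_streaming: true }),
            children: vec![],
        }],
    };
    // The root-only check misses the streaming descendant; the recursive one finds it.
    assert!(!plan.root_only_streaming());
    assert!(plan.any_streaming());
    println!("root_only={}, recursive={}", plan.root_only_streaming(), plan.any_streaming());
}
```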
### `sum_int.rs` merge_batch rewrite
The prior `merge_batch` only read row 0 of the state array, which looks like
a latent bug whenever a merge batch has more than one row. Switching to
`update_batch(states)` fixes that. Do you think it's worth adding a small
direct test that exercises `merge_batch` with a multi-row state array
independent of the `PartialMerge` plumbing, so the fix is captured even if the
`MergeAsPartial` path changes later?
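For illustration, here's a self-contained toy sum accumulator showing the row-0-only bug and the `update_batch`-based fix; it mirrors the shape of a DataFusion-style accumulator but uses plain `Vec<i64>` "state arrays" and is not Comet's actual `sum_int.rs` code:

```rust
// Toy accumulator: partial sums merge by addition, so merging can delegate
// to update_batch over the whole state array.

#[derive(Default)]
struct SumAccumulator {
    sum: i64,
}

impl SumAccumulator {
    fn update_batch(&mut self, values: &[i64]) {
        self.sum += values.iter().sum::<i64>();
    }

    /// Buggy variant: only row 0 of the partial-state array is merged.
    fn merge_batch_row0_only(&mut self, states: &[i64]) {
        if let Some(first) = states.first() {
            self.sum += first;
        }
    }

    /// Fixed variant: delegating to update_batch handles every row.
    fn merge_batch(&mut self, states: &[i64]) {
        self.update_batch(states);
    }
}

fn main() {
    // A merge batch carrying partial sums from three upstream partitions.
    let states = [10, 20, 30];

    let mut buggy = SumAccumulator::default();
    buggy.merge_batch_row0_only(&states);

    let mut fixed = SumAccumulator::default();
    fixed.merge_batch(&states);

    // The buggy variant drops the partial sums after row 0.
    println!("buggy={}, fixed={}", buggy.sum, fixed.sum);
    assert_eq!(buggy.sum, 10);
    assert_eq!(fixed.sum, 60);
}
```

A direct test along these lines would pin the multi-row behavior down regardless of how the planner reaches `merge_batch`.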
### `collect_set` in the description
The description lists `FIRST` / `LAST` / `collect_set` as falling back, but
the serde only filters `First` and `Last`. Is `collect_set` covered entirely by
the streaming plan fallback? If so, a short comment near the `First`/`Last`
filter explaining the split would help future readers.
### `distinct` test lost its aggregate count assertion
The previous version used `checkSparkAnswerAndNumOfAggregates` to assert how
many native aggregate stages ran. The new version drops that. Since this PR is
specifically about enabling more stages to stay native, would it be worth
keeping the aggregate count assertion in one or two of the cases, so that a
future regression that silently falls back one stage still shows up? Same
thread as the existing review comment.
### Test name mismatch
`partialMerge - distinct + non-distinct with first() FILTER (Expand
pattern)` doesn't use `FIRST` or `FILTER` in the queries it runs. Was that the
intended coverage, or did the queries drift from the original intent?
### Leftover configs
In `partialMerge - cnt distinct + sum`, are
`spark.comet.cast.allowIncompatible` and
`spark.comet.expression.Cast.allowIncompatible` actually needed? Looks like
they might be leftover from local debugging.
### AVG(DISTINCT) coverage
The serde relaxation also admits single-column `SUM(DISTINCT x)` and
`AVG(DISTINCT x)`. The `distinct` test covers `COUNT` and `SUM` variants, but I
didn't see `AVG(DISTINCT)` exercised. Could we add one case, ideally mixed with
a non-distinct aggregate like `AVG(DISTINCT a) + SUM(b)`, so the Expand plan is
exercised for something other than `COUNT(DISTINCT)`?
### `planner.rs` mode literals
The match on `agg.mode` uses `0`, `1`, `2` directly. Would it be cleaner to
match on the generated `AggregateMode` enum so the proto and native sides can't
desync silently?
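As a sketch of what that could look like, prost-generated proto enums carry an `i32` repr and an `i32` conversion, so the match can go through the enum instead of raw literals. The enum and variant names below are illustrative, not necessarily Comet's generated code:

```rust
// Matching on a generated-style enum instead of raw i32 mode literals.
// An unhandled variant becomes a compile-time error, and an out-of-range
// wire value surfaces as an explicit Err instead of a silent `_` arm.

use std::convert::TryFrom;

#[derive(Debug, Clone, Copy, PartialEq)]
#[repr(i32)]
enum AggregateMode {
    Partial = 0,
    Final = 1,
    PartialMerge = 2,
}

impl TryFrom<i32> for AggregateMode {
    type Error = String;
    fn try_from(v: i32) -> Result<Self, Self::Error> {
        match v {
            0 => Ok(AggregateMode::Partial),
            1 => Ok(AggregateMode::Final),
            2 => Ok(AggregateMode::PartialMerge),
            other => Err(format!("unknown AggregateMode: {other}")),
        }
    }
}

fn describe(mode_field: i32) -> Result<&'static str, String> {
    // Exhaustive match on the enum keeps the proto and native sides in sync.
    Ok(match AggregateMode::try_from(mode_field)? {
        AggregateMode::Partial => "partial",
        AggregateMode::Final => "final",
        AggregateMode::PartialMerge => "partial-merge",
    })
}

fn main() {
    assert_eq!(describe(2), Ok("partial-merge"));
    assert!(describe(9).is_err());
    println!("{:?}", describe(2));
}
```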
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]