mbutrovich opened a new issue, #4614:
URL: https://github.com/apache/datafusion-comet/issues/4614

   ## Describe the problem
   
   Spark's `CollapseCodegenStages` fuses a maximal subtree of codegen-capable 
operators into one generated method. Rows flow through the whole chain as local 
variables, with no inter-operator row or batch materialization. Comet runs each 
operator as a separate vectorized DataFusion operator, so a batch is produced 
at every boundary between operators. For joins, each `HashJoinExec` in a chain 
materializes a fresh `RecordBatch` via `take()`. (Note: a `ProjectionExec` of 
pure column references is zero-copy, because it builds a new `RecordBatch` of 
`Arc`-cloned arrays, so projections are not part of the cost.)
   
   Native execution usually wins and also removes the row/columnar conversions 
Spark needs. But when a stage carries a large intermediate (for example an 
upstream join inflates the row count before the rest of the chain runs), 
running the chain as separate native operators can be slower than Spark's 
single fused loop. As Comet converts more operators natively and falls back to 
Spark less often, more of these fusible stages run natively and the cost 
becomes visible.
   
   This issue documents the regression, a concrete example (TPC-DS q72), and 
two solutions at different scopes: (1) detect the pattern in `CometExecRule` 
and selectively fall back to Spark, and (2) a fused native operator that 
removes inter-operator materialization while staying native.
   
   ## Evidence: TPC-DS q72, SF1000, Iceberg
   
   Full-suite results are posted in #3978 
([comment](https://github.com/apache/datafusion-comet/issues/3978#issuecomment-4660914029)).
 Across all 103 queries the two configurations are within noise (1027s vs 1015s 
total). q72 is the main outlier, going from 13s to 42s when the join chain moved 
from a Spark WholeStageCodegen fallback to native Comet operators.
   
   Both plans share an identical logical plan (same scan row counts, same final 
output). The only structural difference is where the chain executes:
   
   - Fallback plan: a `SortMergeJoin` plus 8 `BroadcastHashJoin`s run inside one 
`WholeStageCodegen` block (Spark JVM), output via `CometColumnarExchange`.
   - Native plan: the same joins run as one `CometSortMergeJoin` plus 8 
`CometBroadcastHashJoin`s (7 Inner, 1 top LeftOuter), each producing its own 
Arrow batch.
   
   The streaming (probe) side of the chain starts with a `SortMergeJoin` on 
`catalog_sales x inventory` with an inequality condition (`inv_quantity_on_hand 
< cs_quantity`) that expands 791M probe rows to about 7.9B, then carries that 
stream through several broadcast joins before selective joins reduce it to 
1.58B and then about 9M.
   
   Per-task timings (min, med, max across 512 tasks) localize the regression:
   
   - Fallback: the entire fused `WholeStageCodegen` block (the inequality SMJ 
plus all 8 broadcast joins) is 6.1s median, 8.9s max per task.
   - Native: the `CometSortMergeJoin` alone is 23.9s median, 32.3s max per 
task. The broadcast joins add roughly 1 to 2s per join per task on top.
   
   All other stages match across both plans (the top `CometSortMergeJoin` takes 
5.8m vs 5.7m aggregate, and the three large shuffles have identical row counts), 
which isolates the difference to this stage.
   
   ### Two candidate causes
   
   The plan diff localizes the cost but does not by itself attribute it. Two 
causes are consistent with the data and are separable only by a CPU profile:
   
   1. Lost WSCG fusion: the large intermediate is materialized between native 
operators, whereas Spark's fused loop keeps each row in local variables. This 
is what Solution 2 targets.
   2. The native non-equi sort-merge join is itself slower than Spark's at 
producing 7.9B rows, independent of fusion. The per-task split (one native SMJ 
at 32.3s max vs Spark's whole fused stage at 8.9s max) suggests this is a 
significant part of the gap.
   
   A profiler run (async-profiler or perf) on the native q72 stage would 
confirm whether `build_batch_from_indices` / `take` dominates, or whether the 
sort-merge-join compute itself does.
   
   ### What flips the behavior
   
   Comet only converts a `SortMergeJoin` that has a join condition when 
`spark.comet.exec.sortMergeJoinWithJoinFilter.enabled` is set; that flag is 
documented "Experimental" and defaults to `false`. With the default, this SMJ 
falls back, and because `CometExecRule` only converts a node when all of its 
children are already native, every `BroadcastHashJoin` above the non-native SMJ 
also stays on Spark and `CollapseCodegenStages` fuses the whole chain. The 
regression appears only when that experimental flag is enabled (as in the tuned 
benchmark above). At the default config, q72 already behaves like the fallback 
plan, so part of this is a config interaction, which is worth confirming against 
the benchmark configuration.
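
The cascade can be modeled with a toy plan tree. This is a minimal sketch, assuming hypothetical `Node` and `converts_natively` stand-ins for `CometExecRule`'s per-node checks, with the `smj_filter_enabled` parameter playing the role of `spark.comet.exec.sortMergeJoinWithJoinFilter.enabled`; it is not Comet's code.

```rust
// Hypothetical toy plan node; not Comet's or Spark's actual classes.
#[derive(Debug)]
pub enum Node {
    Scan,
    SortMergeJoin { has_condition: bool, left: Box<Node>, right: Box<Node> },
    BroadcastHashJoin { stream: Box<Node>, build: Box<Node> },
}

/// Would `node` be converted to a native operator? A node converts only if it is
/// supported itself AND every child converts, so one unsupported node keeps all
/// of its ancestors on Spark.
pub fn converts_natively(node: &Node, smj_filter_enabled: bool) -> bool {
    match node {
        Node::Scan => true,
        Node::SortMergeJoin { has_condition, left, right } => {
            // A conditional SMJ is supported only when the experimental flag is on.
            (!*has_condition || smj_filter_enabled)
                && converts_natively(left, smj_filter_enabled)
                && converts_natively(right, smj_filter_enabled)
        }
        Node::BroadcastHashJoin { stream, build } => {
            converts_natively(stream, smj_filter_enabled)
                && converts_natively(build, smj_filter_enabled)
        }
    }
}

/// A q72-like spine: one conditional SMJ driving `n` broadcast hash joins.
pub fn q72_like(n: usize) -> Node {
    let mut plan = Node::SortMergeJoin {
        has_condition: true,
        left: Box::new(Node::Scan),
        right: Box::new(Node::Scan),
    };
    for _ in 0..n {
        plan = Node::BroadcastHashJoin { stream: Box::new(plan), build: Box::new(Node::Scan) };
    }
    plan
}
```

With the flag off, `converts_natively(&q72_like(8), false)` is `false` for the top join, because the conditional sort-merge join at the bottom blocks every broadcast join above it; with the flag on, the whole chain converts.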
   
   ## Background: why the gap exists
   
   `CollapseCodegenStages` wraps maximal `CodegenSupport` subtrees in 
`WholeStageCodegenExec`. `SortMergeJoinExec` and `ShuffledHashJoinExec` are the 
only joins that break codegen at their children; `BroadcastHashJoinExec` does 
not, so its streaming child fuses into the same stage. A chain of broadcast 
joins (optionally driven by a sort-merge join at the bottom) collapses into one 
Java method with no `Row`/`UnsafeRow`/batch allocations between operators. 
Fusing broadcast joins is safe because the build sides are `HashedRelation`s 
already resident in memory before the stage runs; Spark picks a broadcast join 
only when the build side fits under `spark.sql.autoBroadcastJoinThreshold`, so 
no fused build side spills.
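
To make the fused-versus-staged distinction concrete, the sketch below runs two inner equi-joins both ways over toy `(i64, i64)` probe rows and `HashMap` build sides with unique keys. It is a hypothetical model of the two execution styles, not Spark's generated code or DataFusion's operators; the point is that `staged` allocates an intermediate collection between the joins while `fused` does not.

```rust
use std::collections::HashMap;

// Hypothetical model: a build side maps a unique join key to one payload value.
pub type BuildSide = HashMap<i64, i64>;

/// Fused style (like one WholeStageCodegen loop): each probe row flows through
/// both lookups as local variables; only the final output is stored.
pub fn fused(probe: &[(i64, i64)], b1: &BuildSide, b2: &BuildSide) -> Vec<(i64, i64, i64)> {
    let mut out = Vec::new();
    for &(k1, k2) in probe {
        if let Some(&v1) = b1.get(&k1) {
            if let Some(&v2) = b2.get(&k2) {
                out.push((k1, v1, v2));
            }
        }
    }
    out
}

/// Operator-at-a-time style: join 1 materializes its full output before join 2
/// starts. Returns the output and the number of intermediate rows allocated.
pub fn staged(probe: &[(i64, i64)], b1: &BuildSide, b2: &BuildSide) -> (Vec<(i64, i64, i64)>, usize) {
    // Join 1: materialize (k1, k2, v1) for every match.
    let intermediate: Vec<(i64, i64, i64)> = probe
        .iter()
        .filter_map(|&(k1, k2)| b1.get(&k1).map(|&v1| (k1, k2, v1)))
        .collect();
    // Join 2 reads the materialized intermediate, not the original probe rows.
    let out: Vec<(i64, i64, i64)> = intermediate
        .iter()
        .filter_map(|&(k1, k2, v1)| b2.get(&k2).map(|&v2| (k1, v1, v2)))
        .collect();
    (out, intermediate.len())
}
```

Both functions return the same rows; the difference is the `intermediate` vector, which in the q72 case would hold billions of rows per stage.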
   
   DataFusion is vectorized batch-at-a-time with no codegen. Materialization 
between joins lives in `build_batch_from_indices`, called from 
`process_probe_batch`; for each output column it does `take(array, indices)` 
into a fresh Arrow buffer. Probing itself (`lookup_join_hashmap`) produces 
index pairs in reusable scratch buffers and does not touch payload columns. The 
probe work is already separate from materialization; they are just colocated in 
one operator. Velox and DuckDB avoid inter-operator materialization via 
dictionary-vector wrapping (lazy materialization across operators), which would 
require every downstream DataFusion operator to handle dictionary-wrapped 
inputs without flattening, an engine-wide change.
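
A simplified model of the probe/materialize split, assuming plain `Vec<i64>` columns in place of Arrow arrays. `probe`, `take`, and `build_output` are hypothetical names; the real `lookup_join_hashmap` and `build_batch_from_indices` have different signatures and use DataFusion's own hash table rather than a `HashMap`.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for an Arrow array.
pub type Column = Vec<i64>;

/// Gather `values[i]` for each index into a fresh buffer
/// (the role `take` plays when DataFusion materializes join output).
pub fn take(values: &Column, indices: &[usize]) -> Column {
    indices.iter().map(|&i| values[i]).collect()
}

/// Probe: produce (build_idx, probe_idx) pairs without touching payload columns.
pub fn probe(build_keys: &Column, probe_keys: &Column) -> (Vec<usize>, Vec<usize>) {
    let mut table: HashMap<i64, Vec<usize>> = HashMap::new();
    for (i, &k) in build_keys.iter().enumerate() {
        table.entry(k).or_default().push(i);
    }
    let (mut build_idx, mut probe_idx) = (Vec::new(), Vec::new());
    for (j, k) in probe_keys.iter().enumerate() {
        if let Some(rows) = table.get(k) {
            for &i in rows {
                build_idx.push(i);
                probe_idx.push(j);
            }
        }
    }
    (build_idx, probe_idx)
}

/// Materialize: one `take` per output column, build side first, then probe side.
pub fn build_output(build: &[Column], probe_cols: &[Column], build_idx: &[usize], probe_idx: &[usize]) -> Vec<Column> {
    build
        .iter()
        .map(|c| take(c, build_idx))
        .chain(probe_cols.iter().map(|c| take(c, probe_idx)))
        .collect()
}
```

In a chain of separate joins, `build_output` runs once per join and re-gathers every carried column each time, which is the cost the rest of this issue is about.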
   
   ## Solution 1: detect the pattern in CometExecRule and fall back to Spark
   
   Identify a maximal chain of WholeStageCodegen-fusible operators (a run of 
`BroadcastHashJoinExec` on a single streaming spine, optionally with the 
sort-merge join that drives it) and decline to convert it, leaving it for 
`CollapseCodegenStages` to fuse. This restores the fallback plan's behavior for 
the regressing shape.
   
   `CometExecRule` already has both mechanisms this needs:
   
   - A pre-pass that tags nodes to skip, mirroring `tagUnsafePartialAggregates` 
(sets a `TreeNodeTag` read later during conversion).
   - A post-transform revert, mirroring `revertRedundantColumnarShuffle` (added 
in #4004), which reverts a `CometExec` back to its `originalPlan` when 
conversion only added overhead.
   
   Detection is structural and cheap, and the fallback cascades: because a node 
is only converted when all of its children are native, declining the bottom 
driver keeps the entire chain above it on Spark for free.
   
   Trade-offs:
   
   - Pro: small change, reuses existing patterns, strictly no worse than the 
current fallback plan for the matched shape.
   - Con: unlike `CollapseCodegenStages` (where fusion is never worse for 
Spark), declining a Comet conversion forfeits the common-case native win. The 
full suite is flat with q72 the lone outlier, so a blanket structural rule (for 
example "fall back any broadcast-join chain of length >= N") risks net 
regression across other queries.
   - The decision criterion is the hard part. A trigger correlated with a large 
intermediate is needed. The non-equi / range join condition is a natural signal 
(it is the cardinality amplifier here, and it is exactly what the experimental 
flag gates). AQE runtime statistics do not help directly, because the row 
expansion happens inside the stage (the sort-merge join output is not a shuffle 
boundary), so the chain's input stats do not reveal it.
   - Leaves performance on the table: the chain runs on the Spark JVM rather 
than natively.
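
One possible shape for the pre-pass, sketched over a hypothetical bottom-up list of the operators on the streaming spine. The trigger (a non-equi sort-merge join directly driving at least `min_chain` broadcast joins) and any threshold value are assumptions for illustration, not a tuned rule; a blanket threshold risks the net regressions described in the trade-offs.

```rust
// Hypothetical model of one operator on the streaming spine, listed bottom-up.
#[derive(Debug, Clone, PartialEq)]
pub enum SpineOp {
    /// Sort-merge join; `non_equi` is true when it carries a range/inequality condition.
    SortMergeJoin { non_equi: bool },
    BroadcastHashJoin,
    Other,
}

/// Returns the spine positions a pre-pass would tag "do not convert":
/// a sort-merge join with a non-equi condition that directly drives at least
/// `min_chain` consecutive broadcast joins. Tagging only the driver suffices,
/// because the joins above it cannot convert once their child stays on Spark.
pub fn tag_fallback_drivers(spine: &[SpineOp], min_chain: usize) -> Vec<usize> {
    let mut tagged = Vec::new();
    for (i, op) in spine.iter().enumerate() {
        if let SpineOp::SortMergeJoin { non_equi: true } = op {
            // Count the broadcast joins stacked directly on top of this driver.
            let chain = spine[i + 1..]
                .iter()
                .take_while(|o| **o == SpineOp::BroadcastHashJoin)
                .count();
            if chain >= min_chain {
                tagged.push(i);
            }
        }
    }
    tagged
}
```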
   
   ## Solution 2: fused native operator
   
   A Comet native operator (working name `MultiHashJoinExec`) that replaces a 
chain of adjacent broadcast `HashJoinExec` nodes sharing one streaming probe. 
It probes each build side and composes gather indices through the chain, 
materializing the output `RecordBatch` once at the end rather than between 
every join. The operator emits the same `RecordBatch` the chain of 
`HashJoinExec`s would produce, so query results are unchanged whether or not the 
rule fires. This lives in Comet's native crate and requires no DataFusion changes for 
an initial implementation; it could be proposed upstream later if it 
generalizes.
   
   ### Components
   
   1. `MultiHashJoinExec` - the `ExecutionPlan` that replaces N adjacent 
`HashJoinExec` nodes. Holds K build-side hash tables, a single streaming probe 
child, a stage plan, and the output schema.
   2. `MultiHashJoinStream` - the per-partition stream, a state machine 
paralleling `HashJoinStream`.
   3. `JoinStage` / `LazyState` - an internal pipeline. Each stage transforms 
`LazyState -> LazyState`; a final `MaterializeStage` produces the output. Stage 
kinds: `ProbeStage`, `ProjectStage` (column-ref only), `MaterializeStage`.
   4. A Comet planning pass that pattern-matches the chain and rewrites it.
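
A skeleton of the stage pipeline, assuming hypothetical `LazyState`, `JoinStage`, and `ProjectStage` definitions (the issue names these types but does not define their fields). It shows only the `LazyState -> LazyState` shape and that a column-ref projection rewrites metadata without moving any rows; `ProbeStage` and `MaterializeStage` are omitted.

```rust
// Hypothetical sketch of the fused operator's internal pipeline types.

/// Where a column comes from: the probe batch or the k-th build side.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Source {
    Probe,
    Build(usize),
}

/// Lazy state: row indices plus a virtual schema; no payload data is copied.
#[derive(Debug, Clone, PartialEq)]
pub struct LazyState {
    /// Probe-batch row for each virtual output row.
    pub probe_indices: Vec<usize>,
    /// Per executed probe stage: build-side row for each virtual output row.
    pub build_indices: Vec<Vec<usize>>,
    /// Virtual schema: (source, column index within that source).
    pub columns: Vec<(Source, usize)>,
}

/// Every stage maps LazyState -> LazyState.
pub trait JoinStage {
    fn apply(&self, state: LazyState) -> LazyState;
}

/// Column-ref-only projection: output column j is input column `keep[j]`.
pub struct ProjectStage {
    pub keep: Vec<usize>,
}

impl JoinStage for ProjectStage {
    fn apply(&self, mut state: LazyState) -> LazyState {
        // Only the virtual schema changes; index vectors are untouched.
        state.columns = self.keep.iter().map(|&i| state.columns[i]).collect();
        state
    }
}

/// Runs the stage plan in order; a final MaterializeStage would follow.
pub fn run(stages: &[Box<dyn JoinStage>], init: LazyState) -> LazyState {
    stages.iter().fold(init, |state, stage| stage.apply(state))
}
```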
   
   ### Lazy index composition
   
   `LazyState` carries the probe batch, the current probe-side gather indices, 
and a `(build_table, build_indices)` pair per executed stage. Each `ProbeStage` 
extracts join keys from either the probe input or a prior build side (without 
materializing the intermediate batch), probes its hash table, then rebases all 
prior indices through the new match indices. The final `MaterializeStage` 
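performs one `take()` per (source, output column). The win is on payload 
columns: a chain of separate joins pays on the order of `N joins x M payload 
columns` take calls, because every join re-gathers every carried column, whereas 
the fused operator gathers each output column exactly once. Join-key columns 
are still gathered at each stage that reads them, but those are a small, fixed 
set of columns.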
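
A minimal sketch of index composition for a chain of inner equi-joins, assuming `Vec<i64>` columns, a `HashMap` per build side, and stages executed in order so that stage k probes build side k. All names are hypothetical. `probe_stage` gathers only the key column for the current virtual rows, and `materialize` performs exactly one gather per output column.

```rust
use std::collections::HashMap;

// Hypothetical sketch; not the proposed operator's actual code.
pub type Column = Vec<i64>;

#[derive(Debug, Clone, Copy)]
pub enum Source {
    Probe,
    Build(usize),
}

/// Per virtual output row: a probe row index and one row index per build side probed so far.
pub struct LazyState {
    pub probe_idx: Vec<usize>,
    pub build_idx: Vec<Vec<usize>>,
}

/// A build side: its columns plus a key -> rows hash table.
pub struct BuildSide {
    pub columns: Vec<Column>,
    pub table: HashMap<i64, Vec<usize>>,
}

impl BuildSide {
    pub fn new(columns: Vec<Column>, key_col: usize) -> Self {
        let mut table: HashMap<i64, Vec<usize>> = HashMap::new();
        for (row, &k) in columns[key_col].iter().enumerate() {
            table.entry(k).or_default().push(row);
        }
        BuildSide { columns, table }
    }
}

/// Gathers `values[i]` for each index; used for both data and index vectors.
fn gather<T: Copy>(values: &[T], idx: &[usize]) -> Vec<T> {
    idx.iter().map(|&i| values[i]).collect()
}

pub fn probe_stage(state: LazyState, probe: &[Column], builds: &[BuildSide], key: (Source, usize)) -> LazyState {
    // Stages run in order, so this stage probes the next unused build side.
    let stage = state.build_idx.len();
    // Gather only the join-key column, from the probe or from a prior build side.
    let keys = match key.0 {
        Source::Probe => gather(&probe[key.1], &state.probe_idx),
        Source::Build(b) => gather(&builds[b].columns[key.1], &state.build_idx[b]),
    };
    // Inner join: `keep[m]` is the surviving virtual row, `hits[m]` its build row.
    let (mut keep, mut hits) = (Vec::new(), Vec::new());
    for (row, k) in keys.iter().enumerate() {
        for &b in builds[stage].table.get(k).into_iter().flatten() {
            keep.push(row);
            hits.push(b);
        }
    }
    // Rebase every earlier index vector through the surviving rows.
    let mut build_idx: Vec<Vec<usize>> = state.build_idx.iter().map(|v| gather(v, &keep)).collect();
    build_idx.push(hits);
    LazyState { probe_idx: gather(&state.probe_idx, &keep), build_idx }
}

/// Materialize once: exactly one gather per requested output column.
pub fn materialize(state: &LazyState, probe: &[Column], builds: &[BuildSide], output: &[(Source, usize)]) -> Vec<Column> {
    output
        .iter()
        .map(|&(src, c)| match src {
            Source::Probe => gather(&probe[c], &state.probe_idx),
            Source::Build(b) => gather(&builds[b].columns[c], &state.build_idx[b]),
        })
        .collect()
}
```

In a two-stage chain, the second stage can read its key as `(Source::Build(0), 1)`, i.e. column 1 of the first build side, without the first join's output ever being materialized.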
   
   ### Planning rule predicate
   
   - a chain of `HashJoinExec` nodes on a shared probe driver
   - intermediate `ProjectionExec` with column-ref-only expressions allowed
   - every join is `PartitionMode::CollectLeft`
   - every join is `JoinType::Inner`
   - equi-join only (non-empty join keys, no `JoinFilter`)
   - chain length >= 2
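
The predicate can be written as an all-or-nothing check over the chain. This sketch assumes a hypothetical `ChainNode` summary of each node on the probe spine (listed bottom-up), with local `PartitionMode` and `JoinType` enums that only mirror the DataFusion names; a real rule would inspect `HashJoinExec` and `ProjectionExec` directly.

```rust
// Hypothetical local enums; they mirror DataFusion names but are not DataFusion's types.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum PartitionMode { CollectLeft, Partitioned }

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum JoinType { Inner, Left }

#[derive(Debug, Clone, PartialEq)]
pub enum ChainNode {
    HashJoin { mode: PartitionMode, join_type: JoinType, num_keys: usize, has_filter: bool },
    /// `column_refs_only` is true when every projection expression is a bare column.
    Projection { column_refs_only: bool },
    Filter,
}

/// Phase-1 predicate: any deviation rejects the whole chain.
pub fn is_fusible(chain: &[ChainNode]) -> bool {
    // Projections are allowed only between joins, so the chain must start and end with a join.
    let is_join = |n: Option<&ChainNode>| matches!(n, Some(ChainNode::HashJoin { .. }));
    if !is_join(chain.first()) || !is_join(chain.last()) {
        return false;
    }
    let mut joins = 0;
    for node in chain {
        match node {
            ChainNode::HashJoin { mode, join_type, num_keys, has_filter } => {
                let ok = *mode == PartitionMode::CollectLeft
                    && *join_type == JoinType::Inner
                    && *num_keys > 0
                    && !*has_filter;
                if !ok {
                    return false;
                }
                joins += 1;
            }
            ChainNode::Projection { column_refs_only: true } => {}
            ChainNode::Projection { column_refs_only: false } | ChainNode::Filter => return false,
        }
    }
    joins >= 2
}
```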
   
   On match, build the stage plan from the chain (rewiring column indices into 
the unified output schema) and emit one `MultiHashJoinExec`. Any deviation 
leaves the chain unchanged. Gate behind a config flag, default off for initial 
rollout.
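
Column-index rewiring can be sketched by tracing each column's origin through the chain. This assumes each join's output schema is its build-side columns followed by its stream-side columns (DataFusion's left-then-right order for an inner join, with the collected build side on the left) and that projections are column references only; `Origin`, `Step`, and `rewire` are hypothetical.

```rust
// Hypothetical sketch of resolving a chain's final schema to per-source columns.

/// Origin of a column in the fused operator's unified output schema.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Origin {
    Probe(usize),
    Build { stage: usize, col: usize },
}

/// One step of the original chain, bottom-up.
pub enum Step {
    /// Join whose build side has `build_width` columns; output = build ++ stream.
    Join { build_width: usize },
    /// Column-ref-only projection: output column j is input column `refs[j]`.
    Project { refs: Vec<usize> },
}

/// Resolve every column of the chain's final schema to its origin.
pub fn rewire(probe_width: usize, steps: &[Step]) -> Vec<Origin> {
    let mut cols: Vec<Origin> = (0..probe_width).map(Origin::Probe).collect();
    let mut stage = 0;
    for step in steps {
        cols = match step {
            Step::Join { build_width } => {
                let s = stage;
                stage += 1;
                // Prepend this stage's build columns to the incoming stream columns.
                (0..*build_width).map(|col| Origin::Build { stage: s, col }).chain(cols).collect()
            }
            Step::Project { refs } => refs.iter().map(|&i| cols[i]).collect(),
        };
    }
    cols
}
```

The resulting `Origin` list is what a `MaterializeStage` would consume: one gather per entry, against either the probe batch or the indicated build side.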
   
   ### Phase 1 scope
   
   In scope: `PartitionMode::CollectLeft` only (broadcast build sides, no 
spilling), `JoinType::Inner` only, equi-join keys only (no `IS NOT DISTINCT 
FROM`), no `JoinFilter`, `ProbeStage` / `ProjectStage` (column-ref only) / 
`MaterializeStage`, intermediate column-ref-only `ProjectionExec`, chain length 
>= 2, off by default.
   
   Out of scope for phase 1: outer / semi / anti / mark joins, `Partitioned` 
join mode, join filters, intermediate `FilterExec`, expression-bearing 
intermediate `ProjectionExec`, dynamic filter pushdown across fused stages, 
spilling within fused stages.
   
   Rationale for the cuts:
   
   | Cut | Reason |
   |-----|--------|
   | CollectLeft only | Guarantees no spilling and no inter-stage partitioning requirements. Mirrors Spark's broadcast-join-only WSCG fusion. |
   | Inner only | Null-index composition across stages for outer joins is the likeliest source of subtle bugs. |
   | No JoinFilter | Composition semantics for multi-stage filters need design work. |
   | No FilterExec between joins | Same reason as JoinFilter; a `FilterStage` is the natural phase-2 addition. |
   | Column-ref Project only | General expression evaluation against the lazy form needs design. Column refs are trivial metadata. |
   | Off by default | Standard for a new optimization with novel semantics. |
   
   ### Caveat for q72 under phase 1
   
   q72's row expansion originates from a `SortMergeJoin`, which the 
broadcast-only phase-1 scope would not fuse. The native sort-merge join is also 
the dominant per-task cost in the measurements above. So phase 1 would only 
collapse the broadcast-chain materializations (the smaller component for q72) 
and would leave the sort-merge join unchanged. For this query specifically, 
Solution 1 (or keeping the experimental flag off) is the more direct fix; 
Solution 2 is the general fix for broadcast-join chains that carry large 
intermediates.
   
   ### Future phases
   
   Phase 2: `FilterStage` for column-ref-comparable filters between stages; 
`LeftSemi` / `LeftAnti` join types; general `ProjectStage` with expression 
evaluation; equi-join with `JoinFilter` via the existing 
`apply_join_filter_to_indices`.
   
   Phase 3 (harder): outer joins (null-index composition across stages); 
dynamic filter pushdown across fused stages; `Partitioned` mode chains where 
partitioning aligns.
   
   Permanently out of scope: spilling within fused stages (fall back to vanilla 
`HashJoinExec` per stage instead, mirroring Spark's permanent broadcast-only 
fusion cut); cross-partition fusion across `RepartitionExec` boundaries.
   
   ### Testing strategy
   
   1. Unit tests per stage with hand-crafted `LazyState` inputs.
   2. A chain fuzz harness extending DataFusion's `join_fuzz.rs`, using a chain 
of vanilla `HashJoinExec` as the oracle and asserting equal outputs after sort. 
Vary stage count (2..=5), per-stage build cardinality and key uniqueness, key 
sourcing (probe vs prior build-side columns), interleaved column-ref 
`ProjectionExec`, NULL keys across stages, and empty build side at stage K. 
Reuse the existing `batch_sizes = [1, 2, 7, 49, 50, 51, 100]`.
   3. Plan-shape snapshot tests on q72 and a few other star-schema queries, 
asserting the rule fires and produces `MultiHashJoinExec`, plus negative cases 
(must not fire on `Partitioned` mode, outer joins, or chains broken by 
`FilterExec`).
   4. Benchmarks: q72 before/after at minimum, full TPC-DS to catch cases where 
fusion is a net loss (very low-selectivity build sides where index-composition 
overhead exceeds the saved `take()`s).
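
A compressed, std-only version of the oracle comparison, assuming row-major `Vec<i64>` rows, build rows of the form `[key, payload]`, and stage k keyed on probe column k. The `Lcg` generator, `oracle`, `fused`, and `fuzz_case` are hypothetical stand-ins for the `join_fuzz.rs` harness; the sketch covers only stage count, batch size, and empty build sides, not NULL keys, projections, or keys sourced from prior build sides.

```rust
use std::collections::HashMap;

/// A row-major row: probe columns first, then each stage's build row.
type Row = Vec<i64>;

/// Tiny deterministic LCG so the sketch needs no external crates (hypothetical helper).
struct Lcg(u64);

impl Lcg {
    fn next(&mut self, modulo: i64) -> i64 {
        self.0 = self.0.wrapping_mul(6364136223846793005).wrapping_add(1442695040888963407);
        ((self.0 >> 33) % modulo as u64) as i64
    }
}

/// Oracle: separate inner joins, each materializing its full output.
/// Stage k matches probe column k against build column 0; output = row ++ build row.
fn oracle(probe: &[Row], builds: &[Vec<Row>]) -> Vec<Row> {
    let mut rows: Vec<Row> = probe.to_vec();
    for (k, build) in builds.iter().enumerate() {
        let mut next = Vec::new();
        for r in &rows {
            for b in build.iter().filter(|b| b[0] == r[k]) {
                next.push(r.iter().chain(b.iter()).copied().collect::<Row>());
            }
        }
        rows = next;
    }
    rows
}

/// Fused: compose indices across stages, materialize output rows once at the end.
fn fused(probe: &[Row], builds: &[Vec<Row>]) -> Vec<Row> {
    let mut probe_idx: Vec<usize> = (0..probe.len()).collect();
    let mut build_idx: Vec<Vec<usize>> = Vec::new();
    for (k, build) in builds.iter().enumerate() {
        let mut table: HashMap<i64, Vec<usize>> = HashMap::new();
        for (i, b) in build.iter().enumerate() {
            table.entry(b[0]).or_default().push(i);
        }
        let (mut keep, mut hits) = (Vec::new(), Vec::new());
        for (row, &p) in probe_idx.iter().enumerate() {
            for &i in table.get(&probe[p][k]).into_iter().flatten() {
                keep.push(row);
                hits.push(i);
            }
        }
        // Rebase all earlier indices through the surviving rows.
        probe_idx = keep.iter().map(|&r| probe_idx[r]).collect();
        for v in build_idx.iter_mut() {
            *v = keep.iter().map(|&r| v[r]).collect();
        }
        build_idx.push(hits);
    }
    (0..probe_idx.len())
        .map(|out| {
            let mut row = probe[probe_idx[out]].clone();
            for (k, idx) in build_idx.iter().enumerate() {
                row.extend(&builds[k][idx[out]]);
            }
            row
        })
        .collect()
}

/// One fuzz case: random data for `stages` joins, compared after sorting.
fn fuzz_case(seed: u64, stages: usize, probe_rows: usize, key_range: i64) -> bool {
    let mut rng = Lcg(seed);
    let probe: Vec<Row> = (0..probe_rows)
        .map(|_| (0..stages).map(|_| rng.next(key_range)).collect::<Row>())
        .collect();
    let builds: Vec<Vec<Row>> = (0..stages)
        .map(|_| {
            let n = rng.next(6) as usize; // 0 rows means an empty build side at this stage
            (0..n).map(|_| vec![rng.next(key_range), rng.next(1000)]).collect::<Vec<Row>>()
        })
        .collect();
    let (mut expected, mut actual) = (oracle(&probe, &builds), fused(&probe, &builds));
    expected.sort();
    actual.sort();
    expected == actual
}
```

Driving `fuzz_case` over `probe_rows` in `[1, 2, 7, 49, 50, 51, 100]` and `stages` in `2..=5` mirrors the variation listed in item 2.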
   
   ### Risk areas
   
   1. Index composition correctness, particularly once nulls appear in later 
phases.
   2. Dynamic filter pushdown interaction with a fused operator owning multiple 
build sides (deferred to phase 3, confirm it does not break in phase 1).
   3. Build-side memory accounting: K hash tables live in one operator's 
reservation. Net pressure is unchanged from the separate operators, but the 
accounting may need updating.
   4. `ReuseExchange` interaction: q72 reuses the `date_dim` broadcast and 
carries a dynamic-partition-pruning subquery broadcast. The fused operator 
holds an `Arc` per build side, so it should compose with reuse; worth a 
plan-shape test.
   5. Column-index rewiring when collapsing N joins into a unified output 
schema is fiddly; snapshot tests should catch regressions.
   
   ## Scope comparison
   
   | | Solution 1: selective fallback | Solution 2: fused operator |
   |---|---|---|
   | Effort | Small; reuses existing tag / revert patterns | Large; new operator, rule, fuzz harness, phased |
   | Execution | Spark JVM (WSCG) for the matched chain | Native |
   | Best case | No worse than the current fallback plan | Removes inter-operator materialization, stays native |
   | Coverage of q72 | Full (restores the fallback plan) | Partial (broadcast chain only; sort-merge driver not fused) |
   | Main risk | Heuristic trigger; blanket rule risks net regression | Index-composition correctness; effort |
   
   The two are complementary. Solution 1 (or simply keeping the experimental 
sort-merge-join-with-filter flag at its default) is a near-term safety valve 
and the more direct fix for q72; Solution 2 is the general fix that keeps 
broadcast-join chains native. A reasonable path is to confirm the cause with a 
profile, land the safety valve as needed, and pursue the fused operator behind 
a config flag over time.
   

