mbutrovich opened a new issue, #4614: URL: https://github.com/apache/datafusion-comet/issues/4614
## Describe the problem

Spark's `CollapseCodegenStages` fuses a maximal subtree of codegen-capable operators into one generated method. Rows flow through the whole chain as local variables, with no inter-operator row or batch materialization. Comet runs each operator as a separate vectorized DataFusion operator, and a batch is produced at the boundary between operators. For joins, `HashJoinExec` materializes a fresh `RecordBatch` via `take()` for every adjacent join. (Note: a `ProjectionExec` of pure column references is zero-copy - it builds a new `RecordBatch` of `Arc`-cloned arrays - so projections are not part of the cost.)

Native execution usually wins and also removes the row/columnar conversions Spark needs. But when a stage carries a large intermediate (for example an upstream join inflates the row count before the rest of the chain runs), running the chain as separate native operators can be slower than Spark's single fused loop. As Comet converts more operators natively and falls back to Spark less often, more of these fusible stages run natively and the cost becomes visible.

This issue documents the regression, a concrete example (TPC-DS q72), and two solutions at different scopes: (1) detect the pattern in `CometExecRule` and selectively fall back to Spark, and (2) a fused native operator that removes inter-operator materialization while staying native.

## Evidence: TPC-DS q72, SF1000, Iceberg

Full-suite results are posted in #3978 ([comment](https://github.com/apache/datafusion-comet/issues/3978#issuecomment-4660914029)). Across all 103 queries the two configurations are within noise (1027s vs 1015s total). q72 is the main outlier: 13s to 42s when the join chain moved from a Spark WholeStageCodegen fallback to native Comet operators.

Both plans share an identical logical plan (same scan row counts, same final output).
The only structural difference is where the chain executes:

- Fallback plan: a `SortMergeJoin` plus 8 `BroadcastHashJoin` run inside one `WholeStageCodegen` block (Spark JVM), output via `CometColumnarExchange`.
- Native plan: the same joins run as one `CometSortMergeJoin` plus 8 `CometBroadcastHashJoin` (7 Inner, 1 top LeftOuter), each producing its own Arrow batch.

The streaming (probe) side of the chain starts with a `SortMergeJoin` on `catalog_sales x inventory` with an inequality condition (`inv_quantity_on_hand < cs_quantity`) that expands 791M probe rows to about 7.9B, then carries that stream through several broadcast joins before selective joins reduce it to 1.58B and then about 9M.

Per-task timings (min, med, max across 512 tasks) localize the regression:

- Fallback: the entire fused `WholeStageCodegen` block (the inequality SMJ plus all 8 broadcast joins) is 6.1s median, 8.9s max per task.
- Native: the `CometSortMergeJoin` alone is 23.9s median, 32.3s max per task. The broadcast joins add roughly 1 to 2s per join per task on top.

All other stages match across both plans (top `CometSortMergeJoin` 5.8m vs 5.7m aggregate, the three large shuffles identical row counts), which isolates the difference to this stage.

### Two candidate causes

The plan diff localizes the cost but does not by itself attribute it. Two causes are consistent with the data and are separable only by a CPU profile:

1. Lost WSCG fusion: the large intermediate is materialized between native operators where Spark kept it in registers. This is what Solution 2 targets.
2. The native non-equi sort-merge join is itself slower than Spark's at producing 7.9B rows, independent of fusion. The per-task split (one native SMJ at 32.3s max vs Spark's whole fused stage at 8.9s max) suggests this is a significant part of the gap.

A profiler run (async-profiler or perf) on the native q72 stage would confirm whether `build_batch_from_indices` / `take` dominates, or whether the sort-merge-join compute itself does.

### What flips the behavior

Comet only converts a `SortMergeJoin` that has a join condition when `spark.comet.exec.sortMergeJoinWithJoinFilter.enabled` is set; that flag is documented "Experimental" and defaults to `false`. With the default, this SMJ falls back, and because `CometExecRule` only converts a node when all of its children are already native, every `BroadcastHashJoin` above the non-native SMJ also stays on Spark and `CollapseCodegenStages` fuses the whole chain.

The regression appears only when that experimental flag is enabled (as in the tuned benchmark above). At the default config, q72 already behaves like the fallback plan, so part of this is a config interaction - worth confirming in the benchmark configuration.

## Background: why the gap exists

`CollapseCodegenStages` wraps maximal `CodegenSupport` subtrees in `WholeStageCodegenExec`. `SortMergeJoinExec` and `ShuffledHashJoinExec` are the only joins that break codegen at their children; `BroadcastHashJoinExec` does not, so its streaming child fuses into the same stage. A chain of broadcast joins (optionally driven by a sort-merge join at the bottom) collapses into one Java method with no `Row`/`UnsafeRow`/batch allocations between operators.

Fusing broadcast joins is safe because the build sides are `HashedRelation`s already resident in memory before the stage runs; Spark picks a broadcast join only when the build side fits under `spark.sql.autoBroadcastJoinThreshold`, so no fused build side spills.

DataFusion is vectorized batch-at-a-time with no codegen. Materialization between joins lives in `build_batch_from_indices`, called from `process_probe_batch`; for each output column it does `take(array, indices)` into a fresh Arrow buffer.
Probing itself (`lookup_join_hashmap`) produces index pairs in reusable scratch buffers and does not touch payload columns. The probe work is already separate from materialization; they are just colocated in one operator.

Velox and DuckDB avoid inter-operator materialization via dictionary-vector wrapping (lazy materialization across operators), which would require every downstream DataFusion operator to handle dictionary-wrapped inputs without flattening, an engine-wide change.

## Solution 1: detect the pattern in CometExecRule and fall back to Spark

Identify a maximal chain of WholeStageCodegen-fusible operators (a run of `BroadcastHashJoinExec` on a single streaming spine, optionally with the sort-merge join that drives it) and decline to convert it, leaving it for `CollapseCodegenStages` to fuse. This restores the fallback plan's behavior for the regressing shape.

`CometExecRule` already has both mechanisms this needs:

- A pre-pass that tags nodes to skip, mirroring `tagUnsafePartialAggregates` (sets a `TreeNodeTag` read later during conversion).
- A post-transform revert, mirroring `revertRedundantColumnarShuffle` (added in #4004), which reverts a `CometExec` back to its `originalPlan` when conversion only added overhead.

Detection is structural and cheap, and the fallback cascades: because a node is only converted when all of its children are native, declining the bottom driver keeps the entire chain above it on Spark for free.

Trade-offs:

- Pro: small change, reuses existing patterns, strictly no worse than the current fallback plan for the matched shape.
- Con: unlike `CollapseCodegenStages` (where fusion is never worse for Spark), declining a Comet conversion forfeits the common-case native win. The full suite is flat with q72 the lone outlier, so a blanket structural rule (for example "fall back any broadcast-join chain of length >= N") risks net regression across other queries.
- The decision criterion is the hard part.
  A trigger correlated with a large intermediate is needed. The non-equi / range join condition is a natural signal (it is the cardinality amplifier here, and it is exactly what the experimental flag gates). AQE runtime statistics do not help directly, because the row expansion happens inside the stage (the sort-merge join output is not a shuffle boundary), so the chain's input stats do not reveal it.
- Leaves performance on the table: the chain runs on the Spark JVM rather than natively.

## Solution 2: fused native operator

A Comet native operator (working name `MultiHashJoinExec`) that replaces a chain of adjacent broadcast `HashJoinExec` nodes sharing one streaming probe. It probes each build side and composes gather indices through the chain, materializing the output `RecordBatch` once at the end rather than between every join. The operator emits the same `RecordBatch` the chain of `HashJoinExec`s would produce, so results are unchanged when the rule does not fire.

This lives in Comet's native crate and requires no DataFusion changes for an initial implementation; it could be proposed upstream later if it generalizes.

### Components

1. `MultiHashJoinExec` - the `ExecutionPlan` that replaces N adjacent `HashJoinExec` nodes. Holds K build-side hash tables, a single streaming probe child, a stage plan, and the output schema.
2. `MultiHashJoinStream` - the per-partition stream, a state machine paralleling `HashJoinStream`.
3. `JoinStage` / `LazyState` - an internal pipeline. Each stage transforms `LazyState -> LazyState`; a final `MaterializeStage` produces the output. Stage kinds: `ProbeStage`, `ProjectStage` (column-ref only), `MaterializeStage`.
4. A Comet planning pass that pattern-matches the chain and rewrites it.

### Lazy index composition

`LazyState` carries the probe batch, the current probe-side gather indices, and a `(build_table, build_indices)` pair per executed stage.
Each `ProbeStage` extracts join keys from either the probe input or a prior build side (without materializing the intermediate batch), probes its hash table, then rebases all prior indices through the new match indices. The final `MaterializeStage` performs one `take()` per (source, output column).

The win is on payload columns: instead of `N joins x M payload columns` take calls, the operator pays one `take()` per (source, output column). Join-key columns still pay per stage, but those are small fixed columns.

### Planning rule predicate

- a chain of `HashJoinExec` nodes on a shared probe driver
- intermediate `ProjectionExec` with column-ref-only expressions allowed
- every join is `PartitionMode::CollectLeft`
- every join is `JoinType::Inner`
- equi-join only (non-empty join keys, no `JoinFilter`)
- chain length >= 2

On match, build the stage plan from the chain (rewiring column indices into the unified output schema) and emit one `MultiHashJoinExec`. Any deviation leaves the chain unchanged. Gate behind a config flag, default off for initial rollout.

### Phase 1 scope

In scope: `PartitionMode::CollectLeft` only (broadcast build sides, no spilling), `JoinType::Inner` only, equi-join keys only (no `IS NOT DISTINCT FROM`), no `JoinFilter`, `ProbeStage` / `ProjectStage` (column-ref only) / `MaterializeStage`, intermediate column-ref-only `ProjectionExec`, chain length >= 2, off by default.

Out of scope for phase 1: outer / semi / anti / mark joins, `Partitioned` join mode, join filters, intermediate `FilterExec`, expression-bearing intermediate `ProjectionExec`, dynamic filter pushdown across fused stages, spilling within fused stages.

Rationale for the cuts:

| Cut | Reason |
|-----|--------|
| CollectLeft only | Guarantees no spilling and no inter-stage partitioning requirements. Mirrors Spark's broadcast-join-only WSCG fusion. |
| Inner only | Null-index composition across stages for outer joins is the likeliest source of subtle bugs. |
| No JoinFilter | Composition semantics for multi-stage filters need design work. |
| No FilterExec between joins | Same reason as JoinFilter; a `FilterStage` is the natural phase-2 addition. |
| Column-ref Project only | General expression evaluation against the lazy form needs design. Column refs are trivial metadata. |
| Off by default | Standard for a new optimization with novel semantics. |

### Caveat for q72 under phase 1

q72's row expansion originates from a `SortMergeJoin`, which the broadcast-only phase-1 scope would not fuse. The native sort-merge join is also the dominant per-task cost in the measurements above. So phase 1 would only collapse the broadcast-chain materializations (the smaller component for q72) and would leave the sort-merge join unchanged. For this query specifically, Solution 1 (or keeping the experimental flag off) is the more direct fix; Solution 2 is the general fix for broadcast-join chains that carry large intermediates.

### Future phases

Phase 2: `FilterStage` for column-ref-comparable filters between stages; `LeftSemi` / `LeftAnti` join types; general `ProjectStage` with expression evaluation; equi-join with `JoinFilter` via the existing `apply_join_filter_to_indices`.

Phase 3 (harder): outer joins (null-index composition across stages); dynamic filter pushdown across fused stages; `Partitioned` mode chains where partitioning aligns.

Permanently out of scope: spilling within fused stages (fall back to vanilla `HashJoinExec` per stage instead, mirroring Spark's permanent broadcast-only fusion cut); cross-partition fusion across `RepartitionExec` boundaries.

### Testing strategy

1. Unit tests per stage with hand-crafted `LazyState` inputs.
2. A chain fuzz harness extending DataFusion's `join_fuzz.rs`, using a chain of vanilla `HashJoinExec` as the oracle and asserting equal outputs after sort.
   Vary stage count (2..=5), per-stage build cardinality and key uniqueness, key sourcing (probe vs prior build-side columns), interleaved column-ref `ProjectionExec`, NULL keys across stages, and empty build side at stage K. Reuse the existing `batch_sizes = [1, 2, 7, 49, 50, 51, 100]`.
3. Plan-shape snapshot tests on q72 and a few other star-schema queries, asserting the rule fires and produces `MultiHashJoinExec`, plus negative cases (must not fire on `Partitioned` mode, outer joins, or chains broken by `FilterExec`).
4. Benchmarks: q72 before/after at minimum, full TPC-DS to catch cases where fusion is a net loss (very low-selectivity build sides where index-composition overhead exceeds the saved `take()`s).

### Risk areas

1. Index composition correctness, particularly once nulls appear in later phases.
2. Dynamic filter pushdown interaction with a fused operator owning multiple build sides (deferred to phase 3, confirm it does not break in phase 1).
3. Build-side memory accounting: K hash tables live in one operator's reservation. Net pressure is unchanged from the separate operators, but the accounting may need updating.
4. `ReuseExchange` interaction: q72 reuses the `date_dim` broadcast and carries a dynamic-partition-pruning subquery broadcast. The fused operator holds an `Arc` per build side, so it should compose with reuse; worth a plan-shape test.
5. Column-index rewiring when collapsing N joins into a unified output schema is fiddly; snapshot tests should catch regressions.
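
The index rebasing from "Lazy index composition" and the oracle comparison from the testing strategy can be illustrated with a toy model. This is a minimal sketch in plain Rust, not Comet or DataFusion code. It assumes inner equi-joins only, join keys sourced from the probe side only, and plain `Vec<i64>` columns standing in for Arrow arrays. The names `Build`, `eager_chain`, `fused_chain`, and `demo` are hypothetical, and the local `take` only mimics the behavior of Arrow's `take` kernel.

```rust
use std::collections::HashMap;

type Column = Vec<i64>;

/// Gather `col[i]` for every index; a stand-in for Arrow's `take` kernel.
fn take(col: &[i64], idx: &[usize]) -> Column {
    idx.iter().map(|&i| col[i]).collect()
}

/// A build side with one key column and one payload column (hypothetical layout).
pub struct Build {
    pub key: Column,
    pub payload: Column,
}

/// Probe `keys` against one build side; returns (input row, build row) index pairs.
fn probe(keys: &[i64], build: &Build) -> (Vec<usize>, Vec<usize>) {
    let mut table: HashMap<i64, Vec<usize>> = HashMap::new();
    for (row, &k) in build.key.iter().enumerate() {
        table.entry(k).or_default().push(row);
    }
    let (mut left, mut right) = (Vec::new(), Vec::new());
    for (row, k) in keys.iter().enumerate() {
        for &b in table.get(k).into_iter().flatten() {
            left.push(row);
            right.push(b);
        }
    }
    (left, right)
}

/// Oracle: every carried column is materialized after every join,
/// like a chain of separate join operators.
/// Assumed layout: `probe_cols[s]` is the join key for stage `s`.
pub fn eager_chain(probe_cols: &[Column], builds: &[Build]) -> Vec<Column> {
    let mut batch: Vec<Column> = probe_cols.to_vec();
    for (stage, build) in builds.iter().enumerate() {
        let (left, right) = probe(&batch[stage], build);
        let mut next: Vec<Column> = batch.iter().map(|c| take(c, &left)).collect();
        next.push(take(&build.payload, &right));
        batch = next;
    }
    batch
}

/// Fused form: only gather indices flow between stages; payload columns
/// are gathered once at the end. Requires at least one probe column.
pub fn fused_chain(probe_cols: &[Column], builds: &[Build]) -> Vec<Column> {
    let mut probe_idx: Vec<usize> = (0..probe_cols[0].len()).collect();
    let mut build_idx: Vec<Vec<usize>> = Vec::new();
    for (stage, build) in builds.iter().enumerate() {
        // Only the join-key column is gathered per stage.
        let keys = take(&probe_cols[stage], &probe_idx);
        let (left, right) = probe(&keys, build);
        // Rebase all prior indices through the new match indices.
        probe_idx = left.iter().map(|&i| probe_idx[i]).collect();
        for prior in build_idx.iter_mut() {
            *prior = left.iter().map(|&i| prior[i]).collect();
        }
        build_idx.push(right);
    }
    // Final materialization: one gather per (source, output column).
    let mut out: Vec<Column> = probe_cols.iter().map(|c| take(c, &probe_idx)).collect();
    for (build, idx) in builds.iter().zip(&build_idx) {
        out.push(take(&build.payload, idx));
    }
    out
}

/// Runs both forms on a small two-join chain with duplicate build keys.
pub fn demo() -> (Vec<Column>, Vec<Column>) {
    // Probe columns: stage-0 key, stage-1 key, payload.
    let probe_cols = vec![
        vec![1, 2, 2, 9],
        vec![10, 20, 30, 10],
        vec![100, 200, 300, 400],
    ];
    let builds = vec![
        Build { key: vec![2, 1, 2], payload: vec![7, 8, 9] },
        Build { key: vec![10, 20, 20], payload: vec![5, 6, 7] },
    ];
    (eager_chain(&probe_cols, &builds), fused_chain(&probe_cols, &builds))
}
```

Calling `demo()` returns the eager and fused outputs, which should be identical column for column. In this toy both forms emit rows in the same order, so no sort is needed before comparing; a real fuzz harness would sort first, as the testing strategy notes. Outer joins would need an optional index (a null marker) in `build_idx`, which is where the first risk area applies.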
## Scope comparison

| | Solution 1: selective fallback | Solution 2: fused operator |
|---|---|---|
| Effort | Small; reuses existing tag / revert patterns | Large; new operator, rule, fuzz harness, phased |
| Execution | Spark JVM (WSCG) for the matched chain | Native |
| Best case | No worse than the current fallback plan | Removes inter-operator materialization, stays native |
| Coverage of q72 | Full (restores the fallback plan) | Partial (broadcast chain only; sort-merge driver not fused) |
| Main risk | Heuristic trigger; blanket rule risks net regression | Index-composition correctness; effort |

The two are complementary. Solution 1 (or simply keeping the experimental sort-merge-join-with-filter flag at its default) is a near-term safety valve and the more direct fix for q72; Solution 2 is the general fix that keeps broadcast-join chains native. A reasonable path is to confirm the cause with a profile, land the safety valve as needed, and pursue the fused operator behind a config flag over time.
