wirybeaver opened a new pull request, #1779: URL: https://github.com/apache/datafusion-ballista/pull/1779
## Summary

Ports Spark's [`OptimizeSkewedJoin`](https://github.com/apache/spark/blob/v3.5.1/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala) AQE rule into Ballista. Replaces the previously attempted file-list sharding approach (closed in #1718) with Spark's split-and-replicate pattern, which directly addresses the highest-value skew shape (skewed joins) and is correct by construction without needing aggregate-aware rewrites or salting.

**Draft**, landing incrementally as a stack of commits on this single PR:

- **C1** (this commit): `BallistaConfig` knobs + `SessionConfigExt` accessors + `SkewJoinPlan` / `SkewJoinShard` carrier types. No rule, no behavior change.
- **C2**: `ShuffleReaderExec::try_new_skew_join` + adapter wiring.
- **C3**: `OptimizeSkewedJoinRule`: detection, bin-pack split, cartesian replication, join-type allowlist, bilateral attach.
- **C4**: `is_skew_join` flag for `SortMergeJoinExec` (may require an upstream DataFusion patch).
- **C5**: TPC-H end-to-end synthetic-skew test.

## Design

Concept mapping (full design in PR description as it evolves):

| Spark | Ballista equivalent |
|---|---|
| `OptimizeSkewedJoin` | `OptimizeSkewedJoinRule` in `state/aqe/optimizer_rule/` |
| `MapOutputStatistics` | `ExchangeExec::shuffle_partitions()` + `PartitionLocation::partition_stats.num_bytes()` |
| `PartialReducerPartitionSpec(reducerIdx, [startMap, endMap), size)` | `SkewJoinShard { upstream_idx, start_map_idx, end_map_idx }` |
| Cartesian pair-up across legs | Pre-computed `Vec<SkewJoinShard>` of length `K'` on both legs |
| `isSkewJoin = true` flag | TBD; may need a flag on `SortMergeJoinExec` |

Defaults match Spark, except that the rule is **opt-in**: `ballista.planner.skew_join.enabled` defaults to `false` and must be set to `true` to turn the rule on.
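To make the mapping above concrete, here is a minimal, self-contained sketch of the detect / split / pair-up flow. It is not the code in this PR: only the `SkewJoinShard` field names come from the table, while the field types, the helper names (`median`, `is_skewed`, `split_map_ranges`, `pair_shards`), and the choice to represent the non-skewed leg as a single full map range are assumptions made for illustration. The skew test follows the shape of Spark's rule (a partition is skewed when it exceeds both `factor × median` and an absolute byte threshold).

```rust
/// Field names follow the mapping table; the `usize` types are assumed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct SkewJoinShard {
    upstream_idx: usize,
    start_map_idx: usize, // inclusive
    end_map_idx: usize,   // exclusive
}

/// Hypothetical helper: median of the reducer partition sizes (0 if empty).
fn median(sizes: &[u64]) -> u64 {
    if sizes.is_empty() {
        return 0;
    }
    let mut s = sizes.to_vec();
    s.sort_unstable();
    let n = s.len();
    if n % 2 == 1 { s[n / 2] } else { (s[n / 2 - 1] + s[n / 2]) / 2 }
}

/// Hypothetical helper: skewed when larger than both `factor * median`
/// and the absolute `threshold` (both in bytes).
fn is_skewed(size: u64, median: u64, factor: f64, threshold: u64) -> bool {
    (size as f64) > (median as f64) * factor && size > threshold
}

/// Hypothetical helper: greedily pack consecutive map outputs into
/// `[start, end)` ranges of at most roughly `target` bytes each.
fn split_map_ranges(map_bytes: &[u64], target: u64) -> Vec<(usize, usize)> {
    let mut ranges = Vec::new();
    let (mut start, mut acc) = (0usize, 0u64);
    for (i, &b) in map_bytes.iter().enumerate() {
        // Close the current range if adding this map output would overflow it.
        if i > start && acc + b > target {
            ranges.push((start, i));
            start = i;
            acc = 0;
        }
        acc += b;
    }
    if start < map_bytes.len() {
        ranges.push((start, map_bytes.len()));
    }
    ranges
}

/// Hypothetical helper: cartesian pair-up. Returns one shard list per leg;
/// both lists have the same length, and entry `k` of each feeds task `k`.
fn pair_shards(
    upstream_idx: usize,
    left: &[(usize, usize)],
    right: &[(usize, usize)],
) -> (Vec<SkewJoinShard>, Vec<SkewJoinShard>) {
    let (mut l, mut r) = (Vec::new(), Vec::new());
    for &(ls, le) in left {
        for &(rs, re) in right {
            l.push(SkewJoinShard { upstream_idx, start_map_idx: ls, end_map_idx: le });
            r.push(SkewJoinShard { upstream_idx, start_map_idx: rs, end_map_idx: re });
        }
    }
    (l, r)
}

fn main() {
    // Left-leg bytes per reducer partition; partition 2 dominates.
    let left_sizes = [100u64, 90, 900, 110];
    let med = median(&left_sizes);
    let skewed = left_sizes.iter().position(|&s| is_skewed(s, med, 5.0, 256));
    println!("median = {med}, skewed partition = {skewed:?}");

    // Bytes each map task wrote into the skewed partition (made-up numbers).
    let left_ranges = split_map_ranges(&[40, 50, 30, 80, 10], 100);
    // Non-skewed right leg: one range covering all 4 of its map outputs.
    let right_ranges = vec![(0, 4)];
    let (l, r) = pair_shards(2, &left_ranges, &right_ranges);
    for (a, b) in l.iter().zip(r.iter()) {
        println!("{a:?} joins {b:?}");
    }
}
```

In this sketch the skewed leg is cut into three map ranges and the other leg's full range is repeated three times, so each of the three tasks joins a slice of the skewed side against all matching rows of the other side. If both legs were skewed, the same pairing would yield `left.len() * right.len()` tasks.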
## Why split+replicate over file-list sharding (the closed approach)

File-list sharding (#1718) had to bail on any `HashPartitioned` / `SinglePartition` consumer (joins, `FinalPartitioned` aggregates, global limits) because it scattered rows that the consumer assumed were colocated. Those consumers cover most realistic workloads, so the optimization rarely applied. Spark's split+replicate pattern handles the most common skew shape (joins) without that limitation: replicating the non-skewed side N times preserves the join's correctness while distributing the skewed side's work across N tasks.

## Test plan

- [ ] C1: `cargo check --workspace` passes (done locally).
- [ ] C2: synthetic adapter test: reader serves paired `(upstream_idx, map_range)` tuples correctly.
- [ ] C3: rule unit tests (detection edge cases, bin-pack, allowlist, bilateral attach); SQL-level integration tests with synthetic skewed `PartitionLocation` stats.
- [ ] C4: integration test with skewed sort-merge join.
- [ ] C5: TPC-H end-to-end ≥30% wall-clock improvement on a synthetically skewed join key vs. rule disabled; row-identical results.
- [ ] Regression: existing coalesce tests stay green; full TPC-H suite produces row-identical results with rule on vs. off.

Closes #1718 in favor of this approach.
