wirybeaver opened a new pull request, #1779:
URL: https://github.com/apache/datafusion-ballista/pull/1779

   ## Summary
   
   Ports Spark's 
[`OptimizeSkewedJoin`](https://github.com/apache/spark/blob/v3.5.1/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala)
 AQE rule into Ballista. Replaces the previously-attempted file-list sharding 
approach (closed in #1718) with Spark's split-and-replicate pattern, which 
directly addresses the highest-value skew shape (skewed joins) and is correct 
by construction without needing aggregate-aware rewrites or salting.
   
   **Draft** — landing incrementally as a stack of commits on this single PR:
   
   - **C1** (this commit): `BallistaConfig` knobs + `SessionConfigExt` 
accessors + `SkewJoinPlan` / `SkewJoinShard` carrier types. No rule, no 
behavior change.
   - **C2**: `ShuffleReaderExec::try_new_skew_join` + adapter wiring.
   - **C3**: `OptimizeSkewedJoinRule` — detection, bin-pack split, cartesian 
replication, join-type allowlist, bilateral attach.
   - **C4**: `is_skew_join` flag for `SortMergeJoinExec` (may require an 
upstream DataFusion patch).
   - **C5**: TPC-H end-to-end synthetic-skew test.
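
To make the "detection, bin-pack split" step in C3 concrete, here is a minimal sketch modeled on Spark's rule, not the code in this PR. It treats a partition as skewed when it exceeds both a multiple of the median partition size and an absolute byte threshold, then greedily packs consecutive map outputs into ranges of roughly a target size. `SkewThresholds`, `median`, `is_skewed`, and `split_by_target` are hypothetical names. The defaults shown are Spark's documented `skewedPartitionFactor` (5.0) and `skewedPartitionThresholdInBytes` (256 MB). The median is simplified to the upper median, and Spark's merging of small tail ranges is omitted.

```rust
/// Hypothetical knobs mirroring Spark's `spark.sql.adaptive.skewJoin.*`
/// settings; these are NOT confirmed Ballista config names.
#[derive(Debug, Clone, Copy)]
pub struct SkewThresholds {
    /// A partition must be larger than `factor * median`...
    pub factor: f64,
    /// ...and also larger than this absolute size in bytes.
    pub min_bytes: u64,
}

impl Default for SkewThresholds {
    fn default() -> Self {
        // Spark's documented defaults: factor 5.0, threshold 256 MB.
        Self { factor: 5.0, min_bytes: 256 * 1024 * 1024 }
    }
}

/// Median of the reduce-partition sizes (upper median for even lengths,
/// which is a simplification for this sketch).
pub fn median(sizes: &[u64]) -> u64 {
    if sizes.is_empty() {
        return 0;
    }
    let mut sorted = sizes.to_vec();
    sorted.sort_unstable();
    sorted[sorted.len() / 2]
}

/// A partition is skewed only if it passes both the relative and the
/// absolute threshold.
pub fn is_skewed(size: u64, median: u64, t: &SkewThresholds) -> bool {
    size as f64 > median as f64 * t.factor && size > t.min_bytes
}

/// Greedily packs consecutive map outputs into half-open ranges
/// `[start, end)` whose total size stays at or below `target` bytes where
/// possible. A single map output larger than `target` gets its own range.
pub fn split_by_target(map_sizes: &[u64], target: u64) -> Vec<(usize, usize)> {
    let mut ranges = Vec::new();
    let mut start = 0usize;
    let mut acc = 0u64;
    for (i, &size) in map_sizes.iter().enumerate() {
        // Close the current range when adding this map output would
        // overshoot, but never emit an empty range.
        if i > start && acc.saturating_add(size) > target {
            ranges.push((start, i));
            start = i;
            acc = 0;
        }
        acc = acc.saturating_add(size);
    }
    if start < map_sizes.len() {
        ranges.push((start, map_sizes.len()));
    }
    ranges
}
```

Because the ranges are contiguous and half-open, each one maps directly onto a `[start_map_idx, end_map_idx)` pair of the kind carried by `SkewJoinShard`.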
   
   ## Design
   
   Concept mapping (full design in PR description as it evolves):
   
   | Spark | Ballista equivalent |
   |---|---|
   | `OptimizeSkewedJoin` | `OptimizeSkewedJoinRule` in 
`state/aqe/optimizer_rule/` |
   | `MapOutputStatistics` | `ExchangeExec::shuffle_partitions()` + 
`PartitionLocation::partition_stats.num_bytes()` |
   | `PartialReducerPartitionSpec(reducerIdx, [startMap, endMap), size)` | 
`SkewJoinShard { upstream_idx, start_map_idx, end_map_idx }` |
   | Cartesian pair-up across legs | Pre-computed `Vec<SkewJoinShard>` of 
length `K'` on both legs |
   | `isSkewJoin = true` flag | TBD — may need a flag on `SortMergeJoinExec` |
   
   Defaults match Spark; the rule is **opt-in**: 
`ballista.planner.skew_join.enabled` defaults to `false`.
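
The "cartesian pair-up" row of the table can be sketched as follows. This is an illustration only: the field names of `SkewJoinShard` come from the table above, but the `usize` field types and the `pair_up` helper are assumptions, not the PR's actual definitions. Given the map ranges chosen for the left and right legs of one skewed reduce partition, the helper emits two equal-length shard lists so that position `k` on the left is joined with position `k` on the right, and every left range meets every right range exactly once.

```rust
/// Carrier for one slice of a reduce partition. Field names follow the
/// concept-mapping table; the `usize` types are an assumption.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SkewJoinShard {
    /// Index of the original (pre-split) reduce partition.
    pub upstream_idx: usize,
    /// Half-open range `[start_map_idx, end_map_idx)` of map outputs.
    pub start_map_idx: usize,
    pub end_map_idx: usize,
}

/// Hypothetical helper: pairs every left range with every right range.
/// Both returned vectors have length `left.len() * right.len()`, and the
/// shards at the same position form one join task.
pub fn pair_up(
    upstream_idx: usize,
    left: &[(usize, usize)],
    right: &[(usize, usize)],
) -> (Vec<SkewJoinShard>, Vec<SkewJoinShard>) {
    let n = left.len() * right.len();
    let mut left_shards = Vec::with_capacity(n);
    let mut right_shards = Vec::with_capacity(n);
    for &(ls, le) in left {
        for &(rs, re) in right {
            left_shards.push(SkewJoinShard {
                upstream_idx,
                start_map_idx: ls,
                end_map_idx: le,
            });
            right_shards.push(SkewJoinShard {
                upstream_idx,
                start_map_idx: rs,
                end_map_idx: re,
            });
        }
    }
    (left_shards, right_shards)
}
```

When only one leg is skewed, the other leg contributes a single range covering all of its map outputs, so that range is simply repeated once per split of the skewed leg.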
   
   ## Why split+replicate over file-list sharding (the closed approach)
   
   File-list sharding (#1718) had to bail on any `HashPartitioned` / 
`SinglePartition` consumer (joins, `FinalPartitioned` aggregates, global 
limits) because it scattered rows that the consumer assumed were colocated. 
Those bail-out cases covered most realistic workloads. Spark's split+replicate 
pattern handles the most common skew shape (joins) without that limitation: 
replicating the non-skewed side N times preserves the join's correctness while 
distributing the skewed side's work across N tasks.
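
The correctness claim can be checked on a toy example. The sketch below is plain Rust over in-memory slices and has nothing to do with Ballista's execution operators. It runs an inner join once over the whole input and once with the left (skewed) side cut into `n` chunks, each joined against a full copy of the right side. `inner_join` and `split_and_replicate_join` are hypothetical names used only for this illustration.

```rust
/// Naive nested-loop inner join on the `u32` key.
pub fn inner_join<'a>(
    left: &[(u32, &'a str)],
    right: &[(u32, &'a str)],
) -> Vec<(u32, &'a str, &'a str)> {
    let mut out = Vec::new();
    for &(lk, lv) in left {
        for &(rk, rv) in right {
            if lk == rk {
                out.push((lk, lv, rv));
            }
        }
    }
    out
}

/// Splits `left` into at most `n` contiguous chunks, joins each chunk
/// against the full (replicated) `right` side, and concatenates the results.
pub fn split_and_replicate_join<'a>(
    left: &[(u32, &'a str)],
    right: &[(u32, &'a str)],
    n: usize,
) -> Vec<(u32, &'a str, &'a str)> {
    let n = n.max(1);
    // Ceiling division; `chunks` panics on a size of 0, hence the `max(1)`.
    let chunk_size = ((left.len() + n - 1) / n).max(1);
    let mut out = Vec::new();
    for chunk in left.chunks(chunk_size) {
        // Each chunk would be an independent task in a distributed setting.
        out.extend(inner_join(chunk, right));
    }
    out
}
```

Every left row lands in exactly one chunk and sees every right row, so each matching pair is produced exactly once. The same argument does not hold for every join type: in a full outer join, for example, unmatched rows from the replicated side would be emitted once per copy, which is presumably why C3 includes a join-type allowlist.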
   
   ## Test plan
   
   - [ ] C1: `cargo check --workspace` passes (done locally).
   - [ ] C2: synthetic adapter test — reader serves paired `(upstream_idx, 
map_range)` tuples correctly.
   - [ ] C3: rule unit tests (detection edge cases, bin-pack, allowlist, 
bilateral attach); SQL-level integration tests with synthetic skewed 
PartitionLocation stats.
   - [ ] C4: integration test with skewed sort-merge join.
   - [ ] C5: TPC-H end-to-end ≥30% wall-clock improvement on a 
synthetically-skewed join key vs rule disabled; row-identical results.
   - [ ] Regression: existing coalesce tests stay green; full TPC-H suite 
produces row-identical results with rule on vs off.
   
   Closes #1718 in favor of this approach.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]