James Xu created SPARK-56906:
--------------------------------
Summary: [SQL] Add SKEW_JOIN hint with SkewKeyJoinExec to
parallelize output-skewed joins
Key: SPARK-56906
URL: https://issues.apache.org/jira/browse/SPARK-56906
Project: Spark
Issue Type: Improvement
Components: Optimizer
Affects Versions: 4.2.0
Reporter: James Xu
h2. Problem:
In production fact-to-fact joins, a single reducer can become a straggler even
when all shuffle partitions appear balanced. This happens when a hot key appears
with high frequency on *both* sides of the join, producing a Cartesian
explosion of output rows concentrated on one reducer. Adaptive Query Execution
(AQE) sees no oversized shuffle blocks and does nothing, yet one reducer
produces 50M+ rows while others finish in seconds.
Example query pattern:
{code:sql}
SELECT /*+ SKEW_JOIN(orders(user_id)) */
o.order_id, o.amount, e.event_type, e.ts
FROM orders o
JOIN order_events e ON o.user_id = e.user_id;
-- With data:
-- orders: 5,000 rows for user_id = 42, scattered across ~1,000 mappers
-- order_events: 10,000 rows for user_id = 42, scattered across ~1,000 mappers
-- Output for user_id = 42: 5,000 × 10,000 = 50,000,000 rows on one reducer
{code}
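The arithmetic in the comments above can be checked with a small sketch. It is plain Python rather than Spark code; the helper name join_output_rows and the cold-key data are illustrative assumptions, not part of the proposal.

```python
from collections import Counter

def join_output_rows(left_keys, right_keys):
    """Rows emitted per key by an inner equi-join: count_left * count_right."""
    left, right = Counter(left_keys), Counter(right_keys)
    return {k: left[k] * right[k] for k in left if k in right}

# Hot key 42 as in the example above.
# The cold keys (1000..1099, a few rows each) are assumed for illustration.
orders = [42] * 5_000 + [k for k in range(1000, 1100) for _ in range(3)]
order_events = [42] * 10_000 + [k for k in range(1000, 1100) for _ in range(5)]

per_key = join_output_rows(orders, order_events)
hot_rows = per_key[42]
cold_rows = sum(n for k, n in per_key.items() if k != 42)

# Input rows for the hot key are modest; output rows are not.
print(f"hot key output rows:  {hot_rows:,}")
print(f"cold keys output rows: {cold_rows:,}")
```

The input side of the hot key is only 15,000 rows, which is why size-based statistics give no warning; the imbalance exists only in the output.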
This pattern is common in:
- Fact-to-fact joins (orders JOIN page_views ON user_id)
- Event correlation (impressions JOIN conversions ON campaign_id)
- Graph traversals (nodes JOIN edges on hub nodes)
- Sessionization (events JOIN sessions on session_id)
h2. Root Cause:
AQE's OptimizeSkewedJoin rule inspects MapOutputStatistics (shuffle block
sizes) and splits partitions that exceed a size threshold. The rule has no
visibility into key frequencies within blocks. When a hot key's rows are
spread thinly across many mappers, each per-mapper block is small and the
partition size appears normal. AQE never triggers.
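A rough sketch of why a size-based check misses this case. It mirrors the documented rule (a partition is skewed when it exceeds both spark.sql.adaptive.skewJoin.skewedPartitionFactor times the median size and spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes), using what I believe are the current defaults; verify them against the target Spark version. The row width and baseline partition size are assumptions chosen only for illustration.

```python
from statistics import median

# Believed defaults of the two AQE skew-join settings; check your Spark version.
SKEW_FACTOR = 5
SKEW_THRESHOLD_BYTES = 256 * 1024 * 1024

def is_size_skewed(partition_bytes, all_partition_bytes):
    """Size-only skew test: large relative to the median AND above the threshold."""
    med = median(all_partition_bytes)
    return (partition_bytes > SKEW_FACTOR * med
            and partition_bytes > SKEW_THRESHOLD_BYTES)

ROW_BYTES = 100                      # assumed serialized row width
BASE_BYTES = 64 * 1024 * 1024        # assumed size of a typical reduce partition
NUM_MAPPERS, NUM_REDUCERS = 1_000, 200

sizes = [BASE_BYTES] * NUM_REDUCERS
hot_extra_bytes = 5_000 * ROW_BYTES  # the 5,000 hot-key rows on the orders side
sizes[0] += hot_extra_bytes          # the reducer that receives user_id = 42

per_mapper_block_bytes = hot_extra_bytes // NUM_MAPPERS
flagged = is_size_skewed(sizes[0], sizes)

print(f"extra bytes per mapper block: {per_mapper_block_bytes}")
print(f"flagged as skewed by size:    {flagged}")
```

Under these assumptions the hot key adds about 500 bytes to each mapper's block, so the partition never approaches either limit.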
Even if AQE did detect the case, its remedy — splitting the skewed partition
along mapper ranges and replicating the entire other-side partition K times —
would multiply read traffic for all cold keys in the same hash bucket, not
just the hot key. The actual problem is a Cartesian product on a single key,
not a large shuffle block.
h2. Solution:
Add a SKEW_JOIN hint that triggers a new physical operator,
SkewKeyJoinExec, as an alternative join strategy. The operator:
1. Samples the skewed side at prepare time (configurable fraction, default 10%)
2. Identifies hot keys by frequency, using an adaptive threshold based on
estimated row count and shuffle partition count
3. Rewrites the join into two branches and unions their results:
- Hot branch: BroadcastHashJoinExec(A_hot, B_hot). The streamed side is
filtered to hot keys and the broadcast side to the matching hot keys; the
join executes on all executors holding hot-key rows (output parallelization).
- Cold branch: SortMergeJoinExec(A_cold, B_cold). Both sides exclude hot
keys, so shuffle partitions are now balanced with no residual skew.
- Union: UnionExec(hot_result, cold_result)
Algebraic identity (inner join, A is skewed side):
A ⋈ B = (A.filter(k ∈ H) ⋈ B.filter(k ∈ H))
∪ (A.filter(k ∉ H) ⋈ B.filter(k ∉ H))
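The identity can be sanity-checked on toy data. The sketch below is plain Python with a naive nested-loop join, not the proposed operator; it treats the union as a bag union (as UnionExec does) and compares results as multisets. All names and rows are illustrative assumptions.

```python
from collections import Counter

def inner_join(a_rows, b_rows):
    """Naive inner equi-join on the first tuple field: (key, a_val, b_val)."""
    return [(ka, va, vb) for ka, va in a_rows for kb, vb in b_rows if ka == kb]

def split_join(a_rows, b_rows, hot_keys):
    """Join hot and cold key ranges separately, then concatenate the results."""
    hot = inner_join([r for r in a_rows if r[0] in hot_keys],
                     [r for r in b_rows if r[0] in hot_keys])
    cold = inner_join([r for r in a_rows if r[0] not in hot_keys],
                      [r for r in b_rows if r[0] not in hot_keys])
    return hot + cold  # bag union: the two branches cannot share a key

# Toy data: key 42 is hot on both sides; keys 1..4 are cold.
a = [(42, f"o{i}") for i in range(50)] + [(1, "o_a"), (2, "o_b"), (3, "o_c")]
b = [(42, f"e{i}") for i in range(80)] + [(1, "e_a"), (1, "e_b"), (4, "e_c")]
H = {42}

full = Counter(inner_join(a, b))
split = Counter(split_join(a, b, H))
identity_holds = full == split
total_rows = sum(full.values())

print(identity_holds, total_rows)
```

Because every key falls into exactly one of H or its complement, no output row is produced twice and none is lost.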
The rewrite is correct for inner and outer joins where the skewed side is the
streamed/large side. Safety guards:
- NULL keys are excluded from hot-key sampling and routed through the SMJ branch
- If broadcast-side join key types do not match skew column types (e.g. due to
an implicit cast), execution falls back to a full SortMergeJoin with a warning
- Non-deterministic expressions on either side cause rejection with
INVALID_SKEW_JOIN_HINT, since reading the side twice could produce
different values
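Steps 1 and 2, together with the NULL guard, might look roughly like the sketch below. The issue does not spell out the adaptive threshold, so the rule used here (a key is hot when its estimated row count exceeds the estimated average rows per shuffle partition) is a hypothetical stand-in, as are the function name and the test data.

```python
import random
from collections import Counter

def find_hot_keys(keys, num_shuffle_partitions, fraction=0.10, seed=0):
    """Sample keys, drop NULLs, and return keys whose estimated count is 'hot'."""
    rng = random.Random(seed)
    sample = [k for k in keys if rng.random() < fraction]  # Bernoulli sample
    est_rows = len(sample) / fraction                      # scaled-up row count
    # Hypothetical threshold: the average number of rows per shuffle partition.
    threshold = est_rows / num_shuffle_partitions
    counts = Counter(k for k in sample if k is not None)   # NULL keys excluded
    return {k for k, c in counts.items() if c / fraction > threshold}

# Assumed data: one hot key, many cold keys, and a block of NULL keys that
# is frequent enough to look hot if it were not excluded.
keys = ([42] * 50_000
        + [k for k in range(1000, 6000) for _ in range(10)]
        + [None] * 2_000)

hot_keys = find_hot_keys(keys, num_shuffle_partitions=200)
print(hot_keys)
```

NULL keys never match in an equi-join, so keeping them out of H and leaving them to the SMJ branch avoids broadcasting rows that cannot produce output.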
Hint syntax:
{code:sql}
/*+ SKEW_JOIN(table_name(col1, col2)) */
/*+ SKEW_JOIN(orders(user_id)) */
/*+ SKEW_JOIN(t1(a, b)) */ -- multi-column equi-join
{code}
SKEWJOIN (no underscore) is accepted as an alias.
h2. Expected Impact:
In a synthetic workload matching the fact-to-fact join pattern above (5K × 10K
hot-key rows, 1,000 mappers, 200 reducers), the projected metrics are:
- Before: one reducer produces 50M rows; stage wall-clock dominated by
straggler running ~45–60 minutes; single executor OOM risk
- After: hot-key rows distributed across all executors via broadcast join;
cold branch SMJ has no skew; stage wall-clock drops to ~2–3 minutes;
no single-point bottleneck
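A back-of-the-envelope view of the output parallelization, using only the row counts from the example. It assumes one hot-branch task per map-side partition and an even spread of hot rows across them; both are simplifications, and the sketch says nothing about wall-clock time.

```python
HOT_LEFT_ROWS = 5_000     # orders rows for user_id = 42
HOT_RIGHT_ROWS = 10_000   # order_events rows for user_id = 42 (broadcast side)
NUM_MAPPERS = 1_000       # map-side partitions holding the hot-key rows

total_hot_output = HOT_LEFT_ROWS * HOT_RIGHT_ROWS

# Before: every hot-key output row is produced by a single reducer.
before_max_task_rows = total_hot_output

# After (assumed even spread): each task streams its local hot rows
# against the full broadcast copy of the other side's hot rows.
streamed_rows_per_task = HOT_LEFT_ROWS // NUM_MAPPERS
after_max_task_rows = streamed_rows_per_task * HOT_RIGHT_ROWS

reduction = before_max_task_rows // after_max_task_rows
print(f"before: {before_max_task_rows:,} rows on one task")
print(f"after:  {after_max_task_rows:,} rows per task ({reduction}x fewer)")
```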
AQE remains the default for partition-size skew. SKEW_JOIN is an opt-in hint
for key-frequency skew that AQE cannot see.