Yu Gan created SPARK-57487:
------------------------------
Summary: Support distributed map join for medium-sized build tables via SQL hint
Key: SPARK-57487
URL: https://issues.apache.org/jira/browse/SPARK-57487
Project: Spark
Issue Type: New Feature
Components: SQL
Affects Versions: 5.0.0
Reporter: Yu Gan
Broadcast hash join is limited by driver memory and the broadcast size threshold, so build tables between ~200MB and 2GB often fall back to expensive shuffle joins, causing unnecessary data movement on the probe side.
This PR introduces *Distributed Map Join (DMJ)*, a new join strategy that avoids shuffling the probe side by:
1. Hash-partitioning the build side into a fixed number of shards across executors
2. Building a HashedRelation per shard and placing replicas of each shard for fault tolerance
3. Having the probe side perform batched RPC lookups against the shards (with Bloom filter pre-filtering) instead of shuffling
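The three steps above can be sketched in a single Python process. This is an illustration under stated assumptions, not the proposed implementation: shard assignment uses `zlib.crc32` modulo the shard count (the actual hash function is not specified here), a toy Bloom filter stands in for the pre-filter, each `lookup_batch` call stands in for one batched RPC, and replica placement is omitted. All names (`shard_of`, `BloomFilter`, `build_shards`, `lookup_batch`, `probe`) are hypothetical.

```python
import hashlib
import zlib
from collections import defaultdict


def shard_of(key, shard_count):
    # Hypothetical deterministic hash; build and probe sides must agree on it
    # so that a given key always maps to the same shard.
    return zlib.crc32(str(key).encode("utf-8")) % shard_count


class BloomFilter:
    """Toy Bloom filter: may report false positives, never false negatives."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits)

    def _positions(self, key):
        data = str(key).encode("utf-8")
        for i in range(self.num_hashes):
            digest = hashlib.sha256(bytes([i]) + data).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key):
        for pos in self._positions(key):
            self.bits[pos] = 1

    def might_contain(self, key):
        return all(self.bits[pos] for pos in self._positions(key))


def build_shards(build_rows, shard_count):
    # Steps 1-2: partition build rows by key hash and build one hash table per shard.
    shards = [defaultdict(list) for _ in range(shard_count)]
    bloom = BloomFilter()
    for key, value in build_rows:
        shards[shard_of(key, shard_count)][key].append(value)
        bloom.add(key)
    return shards, bloom


def lookup_batch(shard, keys):
    # Stand-in for one batched RPC to the executor hosting this shard.
    return {k: shard.get(k, []) for k in keys}


def probe(probe_rows, shards, bloom):
    # Step 3: skip keys the Bloom filter rules out, group the rest by shard,
    # and issue one batched lookup per shard instead of shuffling probe rows.
    batches = defaultdict(set)
    for key, _ in probe_rows:
        if bloom.might_contain(key):
            batches[shard_of(key, len(shards))].add(key)
    matches = {}
    for shard_id, keys in batches.items():
        matches.update(lookup_batch(shards[shard_id], keys))
    # Inner-join output: one row per (probe row, matching build value).
    return [(key, pv, bv) for key, pv in probe_rows for bv in matches.get(key, [])]


shards, bloom = build_shards([(1, "a"), (2, "b"), (2, "c")], shard_count=5)
print(probe([(2, "x"), (3, "y")], shards, bloom))  # [(2, 'x', 'b'), (2, 'x', 'c')]
```

A Bloom filter false positive only costs an extra lookup that returns no match; it never changes the join result.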
The strategy is triggered explicitly via a SQL hint:
{code:sql}
SELECT /*+ DISTMAPJOIN(build_table(shard_count=5, replica_count=3)) */ ...
FROM probe_table JOIN build_table ON ...
{code}
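The hint argument nests per-table options inside the table name. A hypothetical Python parser for that shape is sketched below; Spark resolves hints through its own SQL parser, and the function name, the positive-integer check, and the absence of default values are assumptions for illustration only.

```python
import re

# Hypothetical: matches "table_name(option=value, ...)".
HINT_ARG = re.compile(r"^\s*(\w+)\s*\((.*)\)\s*$")


def parse_distmapjoin_arg(arg):
    match = HINT_ARG.match(arg)
    if match is None:
        raise ValueError(f"expected table(options...), got {arg!r}")
    table, body = match.group(1), match.group(2)
    options = {}
    for part in filter(None, (p.strip() for p in body.split(","))):
        name, sep, value = part.partition("=")
        if not sep:
            raise ValueError(f"expected name=value, got {part!r}")
        number = int(value.strip())  # raises ValueError on non-integers
        if number <= 0:
            raise ValueError(f"{name.strip()} must be positive, got {number}")
        options[name.strip()] = number
    return table, options


print(parse_distmapjoin_arg("build_table(shard_count=5, replica_count=3)"))
# ('build_table', {'shard_count': 5, 'replica_count': 3})
```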
h3. Production benchmark
A LEFT JOIN workload in our production environment (16k cores, 120TB memory):
- Probe side: ~5PB
- Build side: ~2GB
- Before (shuffle hash join): probe-side shuffle write + join stages took *~5 hours*
- After (distributed map join): single probe-side join stage took *~2 hours*
- *~60% wall-time reduction* by eliminating the probe-side shuffle entirely
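The reduction figure follows directly from the two wall times reported above:

```python
before_hours = 5.0  # shuffle hash join: probe-side shuffle write + join stages
after_hours = 2.0   # distributed map join: single probe-side join stage
reduction = (before_hours - after_hours) / before_hours
print(f"{reduction:.0%}")  # 60%
```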
h3. Supported join types
Inner, LeftOuter, RightOuter, LeftSemi, LeftAnti, ExistenceJoin.
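For each of these join types, the output for a streamed probe row depends only on that row's own build-side matches, so no build-side state has to be merged across probe tasks; this is consistent with FullOuter being absent from the list. The hypothetical sketch below shows the per-row semantics. It assumes, as with Spark's broadcast hash join under `BuildLeft`, that RightOuter is handled by placing the build table on the left, so it mirrors LeftOuter.

```python
def emit(join_type, probe_row, build_matches):
    """Hypothetical: output rows for one probe row given its build-side matches."""
    if join_type == "Inner":
        return [(probe_row, b) for b in build_matches]
    if join_type == "LeftOuter":
        # Unmatched probe rows are kept, padded with None for build columns.
        return [(probe_row, b) for b in build_matches] or [(probe_row, None)]
    if join_type == "RightOuter":
        # Build table is on the left here, so build columns come first.
        return [(b, probe_row) for b in build_matches] or [(None, probe_row)]
    if join_type == "LeftSemi":
        return [probe_row] if build_matches else []
    if join_type == "LeftAnti":
        return [] if build_matches else [probe_row]
    if join_type == "ExistenceJoin":
        # Every probe row is kept, with a boolean "exists" column appended.
        return [(probe_row, bool(build_matches))]
    raise ValueError(f"unsupported join type: {join_type}")
```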
h3. Key components
- *ShardManager / ShardManagerMaster* (core): manages shard lifecycle, location tracking, and replica placement
- *ShardExchangeExec*: physical exchange operator that partitions the build side into shards
- *DistributedMapJoinExec*: physical join operator with async batched RPC lookups
- *ShardLookupService*: Netty-based RPC layer for shard data transfer (pluggable via {{spark.shard.service}})
- Full AQE integration via *ShardQueryStageExec*
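The location-tracking and replica-placement role of *ShardManagerMaster* can be sketched as below. The placement policy is not described here, so round-robin placement on distinct executors is an assumption, as are the class and method names; re-replicating shards after an executor is lost is not shown.

```python
from dataclasses import dataclass, field


@dataclass
class ShardLocations:
    """Hypothetical in-memory stand-in for a shard location table."""

    executors: list
    replica_count: int
    locations: dict = field(default_factory=dict)  # shard_id -> [executor ids]

    def place(self, shard_count):
        # Assumed round-robin policy: replicas of one shard land on distinct executors.
        if self.replica_count > len(self.executors):
            raise ValueError("replica_count exceeds the number of executors")
        n = len(self.executors)
        for shard_id in range(shard_count):
            self.locations[shard_id] = [
                self.executors[(shard_id + r) % n] for r in range(self.replica_count)
            ]

    def remove_executor(self, executor_id):
        # On executor loss, drop it from every shard's replica list.
        for shard_id, replicas in self.locations.items():
            self.locations[shard_id] = [e for e in replicas if e != executor_id]

    def lookup_target(self, shard_id):
        # Probe tasks send lookups to the first surviving replica.
        replicas = self.locations.get(shard_id, [])
        if not replicas:
            raise RuntimeError(f"no live replica for shard {shard_id}")
        return replicas[0]


master = ShardLocations(["e1", "e2", "e3", "e4"], replica_count=3)
master.place(shard_count=5)
master.remove_executor("e1")
print(master.lookup_target(0))  # e2
```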
This is similar in spirit to Hive's map join, but distributed: the build side is spread across executors, so there is no single-node bottleneck.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)