[
https://issues.apache.org/jira/browse/SPARK-57487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yu Gan updated SPARK-57487:
---------------------------
Description:
Broadcast hash join is limited by driver memory and the executor broadcast
threshold, so tables between ~200MB and ~2GB often fall back to expensive
shuffle joins, causing unnecessary data movement on the probe side.
This PR introduces {*}Distributed Map Join (DMJ){*}, a new join strategy that
avoids full shuffle by:
1. Hash-partitioning the build side into a fixed number of shards across
executors
2. Building a HashedRelation per shard with replica placement for fault
tolerance
3. Performing batched RPC lookups from the probe side (with bloom filter
pre-filtering) instead of shuffling it
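The three steps above can be sketched as a toy, single-process simulation. This is only an illustration under stated assumptions, not the code in the PR: the names {{shard_of}}, {{build_shards}} and {{probe_left_outer}} are hypothetical, Python's built-in {{hash}} stands in for Spark's partitioning hash, a plain set stands in for the bloom filter, a dictionary access stands in for the RPC, and replica placement is left out.

```python
from collections import defaultdict

SHARD_COUNT = 5  # mirrors shard_count in the hint example


def shard_of(key, shard_count=SHARD_COUNT):
    """Map a join key to a shard id (Python's hash is a stand-in)."""
    return hash(key) % shard_count


def build_shards(build_rows, shard_count=SHARD_COUNT):
    """Steps 1 and 2: hash-partition build rows into one hash map per shard."""
    shards = [defaultdict(list) for _ in range(shard_count)]
    for key, value in build_rows:
        shards[shard_of(key, shard_count)][key].append(value)
    return shards


def probe_left_outer(probe_rows, shards, key_filter, batch_size=2):
    """Step 3: pre-filter keys, then look up each batch shard by shard."""
    out = []
    for start in range(0, len(probe_rows), batch_size):
        batch = probe_rows[start:start + batch_size]
        # Group candidate keys by owning shard: one simulated lookup per shard.
        wanted = defaultdict(set)
        for key, _ in batch:
            if key in key_filter:  # a real bloom filter may report false positives
                wanted[shard_of(key, len(shards))].add(key)
        found = {}
        for shard_id, keys in wanted.items():
            for key in keys:
                if key in shards[shard_id]:
                    found[key] = shards[shard_id][key]
        for key, payload in batch:
            matches = found.get(key)
            if matches:
                out.extend((key, payload, m) for m in matches)
            else:
                out.append((key, payload, None))  # left outer keeps unmatched rows
    return out


build = [(1, "a"), (2, "b"), (2, "c")]
probe = [(2, "x"), (3, "y"), (1, "z")]
shards = build_shards(build)
key_filter = {k for k, _ in build}  # exact set standing in for the bloom filter
result = probe_left_outer(probe, shards, key_filter)
print(result)
```

The probe rows never move between partitions here; only the keys of each batch travel to the shard that owns them, which is the property the strategy relies on.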
The strategy is triggered explicitly via SQL hint:
{code:sql}
SELECT /*+ DISTMAPJOIN(build_table(shard_count=5, replica_count=3)) */ ...
FROM probe_table JOIN build_table ON ...
{code}
h3. Production benchmark
A LEFT JOIN workload in our production environment (16k cores, 120TB memory,
dynamic resource allocation enabled):
- Probe side: ~5PB
- Build side: ~2GB
- Before (shuffle hash join): probe-side shuffle write + join stages took *~5
hours*
- After (distributed map join): single probe-side join stage took *~2 hours*
- *~60% wall-time reduction* by eliminating the probe-side shuffle entirely
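As a quick check of the arithmetic, the reduction follows directly from the two approximate timings above. The variable names are illustrative only.

```python
before_hours = 5.0  # shuffle hash join: probe-side shuffle write + join stages
after_hours = 2.0   # distributed map join: single probe-side join stage

reduction = (before_hours - after_hours) / before_hours
speedup = before_hours / after_hours
print(f"{reduction:.0%} reduction, {speedup:.1f}x speedup")
```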
h3. Supported join types
Inner, LeftOuter, RightOuter, LeftSemi, LeftAnti, ExistenceJoin.
h3. Key components
- *ShardManager / ShardManagerMaster* (core): manages shard lifecycle,
location tracking, and replica placement
- {*}ShardExchangeExec{*}: physical exchange operator that partitions build
side into shards
- {*}DistributedMapJoinExec{*}: physical join operator with async batched RPC
lookups
- {*}ShardLookupService{*}: Netty-based RPC layer for shard data transfer
(pluggable via {{{}spark.shard.service{}}})
- Full AQE integration via ShardQueryStageExec
This is similar in spirit to Hive's map join, but distributed, so there is no
single-node bottleneck.
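To make the replica placement and location tracking attributed to ShardManager more concrete, here is a minimal sketch. It assumes a simple round-robin policy, which the description does not specify; {{place_replicas}}, {{pick_location}} and the executor ids are hypothetical names, not the PR's API.

```python
def place_replicas(shard_count, replica_count, executors):
    """Assign each shard to replica_count distinct executors, round-robin."""
    if replica_count > len(executors):
        raise ValueError("replica_count exceeds the number of executors")
    return {
        shard: [executors[(shard + r) % len(executors)] for r in range(replica_count)]
        for shard in range(shard_count)
    }


def pick_location(placement, shard, dead=frozenset()):
    """Return the first live replica for a shard, or None if all are lost."""
    for executor in placement[shard]:
        if executor not in dead:
            return executor
    return None


executors = ["exec-0", "exec-1", "exec-2", "exec-3"]
placement = place_replicas(shard_count=5, replica_count=3, executors=executors)
print(placement[0])
print(pick_location(placement, 0, dead={"exec-0"}))
```

With three replicas per shard, a lookup can still be served after an executor is lost, as long as at least one replica of that shard survives.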
was:
Broadcast hash join is limited by driver memory and executor broadcast
threshold — tables between ~200MB and 2GB often fall back to expensive shuffle
joins, causing unnecessary data movement on the probe side.
This PR introduces {*}Distributed Map Join (DMJ){*}, a new join strategy that
avoids full shuffle by:
1. Hash-partitioning the build side into a fixed number of shards across
executors
2. Building a HashedRelation per shard with replica placement for fault
tolerance
3. Probe side performs batched RPC lookups (with bloom filter pre-filtering)
instead of shuffling
The strategy is triggered explicitly via SQL hint:
{code:java}
SELECT /*+ DISTMAPJOIN(build_table(shard_count=5, replica_count=3)) */ ...
FROM probe_table JOIN build_table ON ... {code}
h3. Production benchmark
A LEFT JOIN workload in our production environment (16k cores, 120TB memory):
- Probe side: ~5PB
- Build side: ~2GB
- Before (shuffle hash join): probe-side shuffle write + join stages took *~5
hours*
- After (distributed map join): single probe-side join stage took *~2 hours*
- *~60% wall-time reduction* by eliminating the probe-side shuffle entirely
h3. Supported join types
Inner, LeftOuter, RightOuter, LeftSemi, LeftAnti, ExistenceJoin.
h3. Key components
- *ShardManager / ShardManagerMaster* (core): manages shard lifecycle,
location tracking, and replica placement
- {*}ShardExchangeExec{*}: physical exchange operator that partitions build
side into shards
- {*}DistributedMapJoinExec{*}: physical join operator with async batched RPC
lookups
- {*}ShardLookupService{*}: Netty-based RPC layer for shard data transfer
(pluggable via {{{}spark.shard.service{}}})
- Full AQE integration via ShardQueryStageExec
This is similar in spirit to Hive's map join but distributed — no single-node
bottleneck.
> Support distributed map join for medium-sized build tables via SQL hint
> -----------------------------------------------------------------------
>
> Key: SPARK-57487
> URL: https://issues.apache.org/jira/browse/SPARK-57487
> Project: Spark
> Issue Type: New Feature
> Components: SQL
> Affects Versions: 5.0.0
> Reporter: Yu Gan
> Priority: Major
> Labels: pull-request-available