[ 
https://issues.apache.org/jira/browse/SPARK-57487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu Gan updated SPARK-57487:
---------------------------
    Description: 
Broadcast hash join is limited by driver memory and the executor broadcast 
threshold, so tables between ~200MB and 2GB often fall back to expensive shuffle 
joins, which move probe-side data unnecessarily.

  This PR introduces *Distributed Map Join (DMJ)*, a new join strategy that 
avoids full shuffle by:
  1. Hash-partitioning the build side into a fixed number of shards across 
executors
  2. Building a HashedRelation per shard with replica placement for fault 
tolerance
  3. Having the probe side perform batched RPC lookups (with Bloom filter 
pre-filtering) instead of shuffling
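
  The three steps above can be illustrated with a minimal single-process sketch. This is not the PR's implementation: every name here (shard_of, build_shards, lookup_batch, BloomFilter, probe_inner_join, BATCH_SIZE) is hypothetical, the "RPC" is an ordinary function call, and replicas are left out.

```python
import zlib
from collections import defaultdict

SHARD_COUNT = 5   # mirrors shard_count in the hint
BATCH_SIZE = 3    # hypothetical probe-side batch size


def shard_of(key: str) -> int:
    """Stable hash partitioning: the same key always maps to the same shard."""
    return zlib.crc32(key.encode("utf-8")) % SHARD_COUNT


def build_shards(build_rows):
    """Steps 1 and 2: partition the build side and build one hash map per shard."""
    shards = [defaultdict(list) for _ in range(SHARD_COUNT)]
    for key, value in build_rows:
        shards[shard_of(key)][key].append(value)
    return shards


class BloomFilter:
    """Tiny Bloom filter: no false negatives, occasional false positives."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0  # a Python int used as a bit set

    def _positions(self, key):
        data = key.encode("utf-8")
        return [zlib.crc32(data, seed) % self.num_bits
                for seed in range(1, self.num_hashes + 1)]

    def add(self, key):
        for pos in self._positions(key):
            self.bits |= 1 << pos

    def might_contain(self, key):
        return all((self.bits >> pos) & 1 for pos in self._positions(key))


def lookup_batch(shard, keys):
    """Stand-in for one batched RPC: resolve many keys against one shard."""
    return {k: shard[k] for k in keys if k in shard}


def probe_inner_join(probe_rows, shards, bloom):
    """Step 3: pre-filter with the Bloom filter, then look up keys in batches."""
    pending = defaultdict(list)  # shard id -> probe rows waiting for a lookup
    output = []

    def flush(shard_id):
        rows = pending.pop(shard_id, [])
        matches = lookup_batch(shards[shard_id], [k for k, _ in rows])
        for key, probe_value in rows:
            for build_value in matches.get(key, []):
                output.append((key, probe_value, build_value))

    for key, probe_value in probe_rows:
        if not bloom.might_contain(key):
            continue  # definitely absent from the build side, skip the lookup
        shard_id = shard_of(key)
        pending[shard_id].append((key, probe_value))
        if len(pending[shard_id]) >= BATCH_SIZE:
            flush(shard_id)
    for shard_id in list(pending):
        flush(shard_id)  # drain partially filled batches
    return output
```

  The probe rows never move between partitions; only keys and matched build rows cross the (simulated) network, which is the property the strategy relies on.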

  The strategy is triggered explicitly via SQL hint:
{code:sql}
SELECT /*+ DISTMAPJOIN(build_table(shard_count=5, replica_count=3)) */ ...
FROM probe_table JOIN build_table ON ...
{code}
h3.   Production benchmark

  A LEFT JOIN workload in our production environment (16k cores, 120TB memory, 
dynamic resource allocation (DRA) enabled):
  - Probe side: ~5PB
  - Build side: ~2GB
  - Before (shuffle hash join): probe-side shuffle write + join stages took *~5 
hours*
  - After (distributed map join): single probe-side join stage took *~2 hours*
  - *~60% wall-time reduction* by eliminating the probe-side shuffle entirely
h3.   Supported join types

  Inner, LeftOuter, RightOuter, LeftSemi, LeftAnti, ExistenceJoin.
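
  As a rough illustration of how these join types differ once a lookup has returned, the sketch below shows what a probe-side operator could emit for one probe row given its matching build rows. The function emit and its parameters are hypothetical, the probe side is assumed to be the left side, and RightOuter is omitted because it mirrors LeftOuter with the sides swapped.

```python
def emit(join_type, probe_row, matches, build_width=1):
    """Rows produced for one probe row, given the build rows its key matched.

    probe_row and each element of matches are tuples; build_width is the
    number of build-side columns used for null padding.
    """
    joined = [probe_row + m for m in matches]
    if join_type == "Inner":
        return joined
    if join_type == "LeftOuter":
        # Unmatched probe rows survive, padded with nulls on the build side.
        return joined or [probe_row + (None,) * build_width]
    if join_type == "LeftSemi":
        return [probe_row] if matches else []
    if join_type == "LeftAnti":
        return [] if matches else [probe_row]
    if join_type == "ExistenceJoin":
        # Every probe row is kept, with a flag recording whether a match exists.
        return [probe_row + (bool(matches),)]
    raise ValueError(f"unsupported join type: {join_type}")
```

  LeftSemi, LeftAnti and ExistenceJoin only need to know whether a match exists, so in such a design a lookup for them would not have to return the build rows themselves.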
h3.   Key components

  - *ShardManager / ShardManagerMaster* (core): manages shard lifecycle, 
location tracking, and replica placement
  - *ShardExchangeExec*: physical exchange operator that partitions build 
side into shards
  - *DistributedMapJoinExec*: physical join operator with async batched RPC 
lookups
  - *ShardLookupService*: Netty-based RPC layer for shard data transfer 
(pluggable via {{spark.shard.service}})
  - Full AQE integration via ShardQueryStageExec
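
  To make the replica placement and location tracking idea concrete, here is a small sketch under stated assumptions: round-robin placement and first-live-replica selection are illustrative choices, not the policy used by ShardManager, and the names place_replicas and pick_replica are hypothetical.

```python
def place_replicas(shard_count, replica_count, executors):
    """Assign each shard to replica_count distinct executors, round-robin."""
    if replica_count > len(executors):
        raise ValueError("need at least as many executors as replicas")
    return {
        shard_id: [executors[(shard_id + r) % len(executors)]
                   for r in range(replica_count)]
        for shard_id in range(shard_count)
    }


def pick_replica(placement, shard_id, alive):
    """Return the first executor that still holds the shard and is alive."""
    for executor in placement[shard_id]:
        if executor in alive:
            return executor
    raise LookupError(f"no live replica for shard {shard_id}")


# shard_count=5 and replica_count=3 mirror the values in the SQL hint.
placement = place_replicas(5, 3, ["e0", "e1", "e2", "e3"])
target = pick_replica(placement, 0, alive={"e1", "e2", "e3"})  # e0 was lost
```

  With three replicas per shard, a lookup in this sketch fails only if all three executors holding that shard are lost at once.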

  This is similar in spirit to Hive's map join, but distributed, so there is no 
single-node bottleneck.


> Support distributed map join for medium-sized build tables via SQL hint
> -----------------------------------------------------------------------
>
>                 Key: SPARK-57487
>                 URL: https://issues.apache.org/jira/browse/SPARK-57487
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 5.0.0
>            Reporter: Yu Gan
>            Priority: Major
>              Labels: pull-request-available



