Yu Gan created SPARK-57487:
------------------------------

             Summary: Support distributed map join for medium-sized build 
tables via SQL hint
                 Key: SPARK-57487
                 URL: https://issues.apache.org/jira/browse/SPARK-57487
             Project: Spark
          Issue Type: New Feature
          Components: SQL
    Affects Versions: 5.0.0
            Reporter: Yu Gan


Broadcast hash join is limited by driver memory and the executor broadcast 
threshold. Tables between ~200MB and 2GB often fall back to expensive shuffle 
joins, causing unnecessary data movement on the probe side.

  This PR introduces *Distributed Map Join (DMJ)*, a new join strategy that 
avoids full shuffle by:
  1. Hash-partitioning the build side into a fixed number of shards across 
executors
  2. Building a HashedRelation per shard with replica placement for fault 
tolerance
  3. Having the probe side perform batched RPC lookups (with Bloom filter 
pre-filtering) instead of shuffling
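
  The three steps above can be sketched in a few lines of plain Python. This is only an illustration of the idea, not the code in this PR: every function name here is hypothetical, the round-robin replica placement is an assumed policy, an exact key set stands in for the Bloom filter, and a local loop stands in for the batched RPC.

```python
import zlib
from collections import defaultdict

def shard_of(key, shard_count):
    # Deterministic hash so every process maps a key to the same shard.
    return zlib.crc32(repr(key).encode("utf-8")) % shard_count

def build_shards(build_rows, shard_count):
    # Steps 1-2: one in-memory hash map (key -> list of rows) per shard.
    shards = [defaultdict(list) for _ in range(shard_count)]
    for key, row in build_rows:
        shards[shard_of(key, shard_count)][key].append(row)
    return shards

def place_replicas(shard_count, replica_count, executors):
    # Assumed policy: replica r of shard s goes to executor (s + r) mod n,
    # so replicas of one shard land on distinct executors.
    n = len(executors)
    return {s: [executors[(s + r) % n] for r in range(min(replica_count, n))]
            for s in range(shard_count)}

def batched_lookup(probe_keys, shards, known_keys):
    # Step 3: drop keys the pre-filter rules out, then group the rest by
    # shard so each shard receives one request instead of one per key.
    by_shard = defaultdict(list)
    for key in probe_keys:
        if key in known_keys:  # exact set standing in for a Bloom filter
            by_shard[shard_of(key, len(shards))].append(key)
    result = {}
    for s, keys in by_shard.items():
        for key in keys:  # stands in for a single batched RPC to shard s
            if key in shards[s]:
                result[key] = shards[s][key]
    return result
```

  For example, with {{shards = build_shards([(1, "a"), (2, "b"), (1, "c")], 5)}}, calling {{batched_lookup([1, 3, 2, 1], shards, {1, 2})}} never sends key 3 to any shard, because the pre-filter has already excluded it.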

  The strategy is triggered explicitly via SQL hint:
  {code}
  SELECT /*+ DISTMAPJOIN(build_table(shard_count=5, replica_count=3)) */ ...
  FROM probe_table JOIN build_table ON ...
  {code}
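
  To make the shape of the hint concrete, the sketch below pulls the table name and the two parameters out of a query string with a regular expression. It is purely illustrative: {{parse_distmapjoin_hint}} is a hypothetical helper, and Spark itself resolves hints in its SQL parser and analyzer rather than with a regex like this one.

```python
import re

# Matches: /*+ DISTMAPJOIN(<table>(shard_count=<n>, replica_count=<m>)) */
HINT_RE = re.compile(
    r"/\*\+\s*DISTMAPJOIN\(\s*(\w+)\s*\(\s*shard_count\s*=\s*(\d+)\s*,"
    r"\s*replica_count\s*=\s*(\d+)\s*\)\s*\)\s*\*/"
)

def parse_distmapjoin_hint(sql):
    # Returns the hint parameters as a dict, or None if no hint is present.
    m = HINT_RE.search(sql)
    if m is None:
        return None
    table, shards, replicas = m.groups()
    return {
        "table": table,
        "shard_count": int(shards),
        "replica_count": int(replicas),
    }
```

  Applied to the query above, this yields the build table {{build_table}} with 5 shards and 3 replicas per shard.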

  h3. Production benchmark

  A LEFT JOIN workload in our production environment (16k cores, 120TB memory):
  - Probe side: ~5PB
  - Build side: ~2GB
  - Before (shuffle hash join): probe-side shuffle write + join stages took *~5 
hours*
  - After (distributed map join): single probe-side join stage took *~2 hours*
  - *~60% wall-time reduction* by eliminating the probe-side shuffle entirely
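
  The ~60% figure follows directly from the two approximate wall times quoted above. A small check, with a hypothetical helper name:

```python
def wall_time_reduction(before_hours, after_hours):
    # Fraction of wall time saved relative to the original run.
    return (before_hours - after_hours) / before_hours

# ~5 hours with shuffle hash join, ~2 hours with distributed map join.
print(f"{wall_time_reduction(5, 2):.0%}")  # prints "60%"
```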

  h3. Supported join types

  Inner, LeftOuter, RightOuter, LeftSemi, LeftAnti, ExistenceJoin.
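
  As a rough illustration of how several of these join types differ once lookup results are available, the sketch below emits output rows for a probe-side scan. It assumes the build side is the right side of the join, keys are non-null, and there is no extra join condition beyond key equality. RightOuter is omitted because it mirrors LeftOuter with the sides swapped. The function and the join-type strings are hypothetical, not names from this PR.

```python
def probe_join(probe_rows, lookup, join_type):
    # probe_rows: list of (key, row); lookup: dict of key -> list of build rows.
    out = []
    for key, row in probe_rows:
        matches = lookup.get(key, [])
        if join_type == "inner":
            out.extend((row, b) for b in matches)
        elif join_type == "left_outer":
            if matches:
                out.extend((row, b) for b in matches)
            else:
                out.append((row, None))  # keep the unmatched probe row
        elif join_type == "left_semi":
            if matches:
                out.append(row)  # probe row only, at most once
        elif join_type == "left_anti":
            if not matches:
                out.append(row)
        elif join_type == "existence":
            out.append((row, bool(matches)))  # probe row plus an "exists" flag
        else:
            raise ValueError(f"unsupported join type: {join_type}")
    return out
```

  In every case the output is driven by a single pass over the probe side, which is why these join types fit a lookup-based strategy.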

  h3. Key components

  - *ShardManager / ShardManagerMaster* (core): manages shard lifecycle, 
location tracking, and replica placement
  - *ShardExchangeExec*: physical exchange operator that partitions build side 
into shards
  - *DistributedMapJoinExec*: physical join operator with async batched RPC 
lookups
  - *ShardLookupService*: Netty-based RPC layer for shard data transfer 
(pluggable via {{spark.shard.service}})
  - Full AQE integration via ShardQueryStageExec

  This is similar in spirit to Hive's map join, but distributed, so there is no 
single-node bottleneck.



