[ https://issues.apache.org/jira/browse/SPARK-57487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated SPARK-57487:
-----------------------------------
Labels: pull-request-available (was: )
> Support distributed map join for medium-sized build tables via SQL hint
> -----------------------------------------------------------------------
>
> Key: SPARK-57487
> URL: https://issues.apache.org/jira/browse/SPARK-57487
> Project: Spark
> Issue Type: New Feature
> Components: SQL
> Affects Versions: 5.0.0
> Reporter: Yu Gan
> Priority: Major
> Labels: pull-request-available
>
> Broadcast hash join is limited by driver memory and by the broadcast size
> threshold ({{spark.sql.autoBroadcastJoinThreshold}}): tables between ~200MB
> and 2GB often fall back to expensive shuffle joins, causing unnecessary data
> movement on the probe side.
> This PR introduces *Distributed Map Join (DMJ)*, a new join strategy
> that avoids shuffling the probe side by:
> 1. Hash-partitioning the build side into a fixed number of shards across
> executors
> 2. Building a HashedRelation per shard with replica placement for fault
> tolerance
> 3. Answering probe-side lookups through batched RPCs (with bloom filter
> pre-filtering) instead of shuffling the probe side
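> Steps 1 and 2 can be sketched in a few lines of Python. The sketch makes three assumptions. CRC32 stands in for the key hash (Spark's own hash partitioning uses Murmur3). A round-robin policy stands in for replica placement, because the issue does not describe ShardManagerMaster's actual policy. A plain dict stands in for each shard's HashedRelation. {{shard_of}}, {{build_shards}} and {{place_replicas}} are hypothetical names.

```python
import zlib
from collections import defaultdict
from typing import Any, Callable, Dict, Iterable, List


def shard_of(key: Any, shard_count: int) -> int:
    """Stable key -> shard id. CRC32 is a stand-in for Spark's Murmur3-based
    hash partitioning; Python's hash() is avoided because it is randomized
    per process for strings."""
    return zlib.crc32(repr(key).encode("utf-8")) % shard_count


def build_shards(build_rows: Iterable[tuple], key_fn: Callable[[tuple], Any],
                 shard_count: int) -> List[Dict[Any, List[tuple]]]:
    """Steps 1 + 2: hash-partition the build rows and build one key -> rows
    map per shard (a stand-in for a per-shard HashedRelation)."""
    shards: List[Dict[Any, List[tuple]]] = [defaultdict(list) for _ in range(shard_count)]
    for row in build_rows:
        key = key_fn(row)
        shards[shard_of(key, shard_count)][key].append(row)
    return shards


def place_replicas(shard_count: int, replica_count: int,
                   executors: List[str]) -> Dict[int, List[str]]:
    """Hypothetical round-robin placement: shard s is stored on executors
    s, s+1, ..., s+replica_count-1 (mod #executors), which are all distinct."""
    if not 1 <= replica_count <= len(executors):
        raise ValueError("replica_count must be between 1 and the number of executors")
    n = len(executors)
    return {s: [executors[(s + r) % n] for r in range(replica_count)]
            for s in range(shard_count)}
```

> Under this policy the replicas of a shard always land on distinct executors. Losing one executor therefore still leaves {{replica_count - 1}} copies of every shard it held.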
> The strategy is triggered explicitly via a SQL hint:
> {code:sql}
> SELECT /*+ DISTMAPJOIN(build_table(shard_count=5, replica_count=3)) */ ...
> FROM probe_table JOIN build_table ON ...
> {code}
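> The nested option syntax of the hint can be illustrated with a small hypothetical parser. It turns the hint argument into a table name and an options map. It handles only the {{table(option=value, ...)}} form shown above and treats {{shard_count}} and {{replica_count}} as the only valid options. The PR's actual hint resolution rules, including any defaults, are not described in the issue and are not modeled here.

```python
import re

# Hypothetical: accepts only the `table(option=value, ...)` form from the hint.
_TARGET = re.compile(r"\s*(\w+)\s*\((.*)\)\s*")
_OPTION = re.compile(r"\s*(\w+)\s*=\s*(\d+)\s*")
KNOWN_OPTIONS = {"shard_count", "replica_count"}


def parse_dmj_target(arg: str) -> tuple:
    """Split 'build_table(shard_count=5, replica_count=3)' into
    ('build_table', {'shard_count': 5, 'replica_count': 3})."""
    target = _TARGET.fullmatch(arg)
    if target is None:
        raise ValueError(f"expected table(option=value, ...), got {arg!r}")
    table, body = target.group(1), target.group(2)
    options = {}
    for part in body.split(","):
        if not part.strip():
            continue  # skip empty pieces, e.g. from "build_table()"
        opt = _OPTION.fullmatch(part)
        if opt is None:
            raise ValueError(f"malformed option {part.strip()!r}")
        name, value = opt.group(1), int(opt.group(2))
        if name not in KNOWN_OPTIONS:
            raise ValueError(f"unknown option {name!r}")
        if value < 1:
            raise ValueError(f"{name} must be >= 1, got {value}")
        options[name] = value
    return table, options
```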
> h3. Production benchmark
> A LEFT JOIN workload in our production environment (16k cores, 120TB
> memory):
> - Probe side: ~5PB
> - Build side: ~2GB
> - Before (shuffle hash join): probe-side shuffle write + join stages took
> *~5 hours*
> - After (distributed map join): a single probe-side join stage took *~2 hours*
> - *~60% wall-time reduction* by eliminating the probe-side shuffle entirely
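> The headline figure follows directly from the two stage times:

```latex
\text{reduction} = \frac{T_{\text{before}} - T_{\text{after}}}{T_{\text{before}}}
                 = \frac{5\,\text{h} - 2\,\text{h}}{5\,\text{h}} = 0.6 = 60\%
```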
> h3. Supported join types
> Inner, LeftOuter, RightOuter, LeftSemi, LeftAnti, ExistenceJoin.
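> The sketch below shows how a probe-side operator could emit output for each supported join type once a probe row's build-side matches are known. It assumes the probe side is the left input for every type except RightOuter. For RightOuter it assumes the build side is the left input, as in Spark's broadcast hash join with BuildLeft. The function name and row layout are hypothetical, and null-aware anti join semantics are ignored.

```python
from typing import Any, List, Tuple

Row = Tuple[Any, ...]


def emit_for_probe_row(join_type: str, probe_row: Row, matches: List[Row],
                       build_width: int) -> List[Row]:
    """Output rows produced by one probe row.

    matches: build rows whose key (and any residual condition) matched.
    build_width: number of build-side columns, used for null padding.
    """
    if join_type == "Inner":
        return [probe_row + m for m in matches]
    if join_type == "LeftOuter":
        # Unmatched probe rows survive with null-padded build columns.
        if matches:
            return [probe_row + m for m in matches]
        return [probe_row + (None,) * build_width]
    if join_type == "RightOuter":
        # Assumes build side = left input, probe side = right input.
        if matches:
            return [m + probe_row for m in matches]
        return [(None,) * build_width + probe_row]
    if join_type == "LeftSemi":
        return [probe_row] if matches else []
    if join_type == "LeftAnti":
        return [] if matches else [probe_row]
    if join_type == "ExistenceJoin":
        # Appends a boolean "exists" column, as Spark does when rewriting
        # some EXISTS/IN subqueries.
        return [probe_row + (bool(matches),)]
    raise ValueError(f"unsupported join type: {join_type}")
```

> In this form every case is decided per probe row, without tracking which build rows were matched. This is consistent with leaving the probe side unshuffled.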
> h3. Key components
> - *ShardManager / ShardManagerMaster* (core): manages shard lifecycle,
> location tracking, and replica placement
> - *ShardExchangeExec*: physical exchange operator that partitions the build
> side into shards
> - *DistributedMapJoinExec*: physical join operator with async batched
> RPC lookups
> - *ShardLookupService*: Netty-based RPC layer for shard data transfer
> (pluggable via {{spark.shard.service}})
> - Full AQE integration via *ShardQueryStageExec*
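> The probe-side lookup path attributed to DistributedMapJoinExec and ShardLookupService can be sketched in four steps:
> 1. Drop the keys that the bloom filter rules out.
> 2. Group the remaining keys by shard.
> 3. Send one batch per shard concurrently.
> 4. Fall back to the next replica when an executor is unreachable.
>
> Everything in the sketch is a hypothetical stand-in. It assumes the bloom filter is built from the build-side join keys. {{fetch}} models the RPC, a thread pool models the async requests, and CRC32 models the shard hash.

```python
import hashlib
import zlib
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from typing import Any, List


class BloomFilter:
    """Simple bloom filter using double hashing over a SHA-256 digest."""

    def __init__(self, num_bits: int, num_hashes: int):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, key: Any) -> List[int]:
        digest = hashlib.sha256(repr(key).encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "little")
        h2 = int.from_bytes(digest[8:16], "little") | 1  # odd step
        return [(h1 + i * h2) % self.num_bits for i in range(self.num_hashes)]

    def add(self, key: Any) -> None:
        for p in self._positions(key):
            self.bits[p // 8] |= 1 << (p % 8)

    def might_contain(self, key: Any) -> bool:
        # False means "definitely absent"; True may be a false positive.
        return all(self.bits[p // 8] & (1 << (p % 8)) for p in self._positions(key))


def shard_of(key: Any, shard_count: int) -> int:
    return zlib.crc32(repr(key).encode("utf-8")) % shard_count


def _fetch_with_failover(shard, keys, replicas, fetch):
    last_error = None
    for executor in replicas:  # try replicas in placement order
        try:
            return fetch(executor, shard, keys)
        except ConnectionError as err:
            last_error = err
    raise RuntimeError(f"all replicas failed for shard {shard}") from last_error


def batched_lookup(keys, bloom, shard_count, placement, fetch, max_in_flight=8):
    """Return {key: build rows} for every probe key found on its shard."""
    by_shard = defaultdict(list)
    for key in keys:
        if bloom.might_contain(key):  # definite misses never leave the task
            by_shard[shard_of(key, shard_count)].append(key)
    results = {}
    with ThreadPoolExecutor(max_workers=max_in_flight) as pool:
        # One batched request per shard, issued concurrently.
        futures = [pool.submit(_fetch_with_failover, shard, shard_keys,
                               placement[shard], fetch)
                   for shard, shard_keys in by_shard.items()]
        for future in futures:
            results.update(future.result())
    return results
```

> Keys rejected by the bloom filter cost no RPC. A false positive costs only a wasted lookup, never a wrong result, because the shard itself is the source of truth.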
> This is similar in spirit to Hive's map join, but distributed, so there is
> no single-node bottleneck.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)