Cheng Su created SPARK-32634:
--------------------------------

             Summary: Introduce sort-based fallback mechanism for shuffled hash join
                 Key: SPARK-32634
                 URL: https://issues.apache.org/jira/browse/SPARK-32634
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 3.1.0
            Reporter: Cheng Su


A major reason Spark users avoid shuffled hash join (SHJ) is the risk of running 
out of memory (OOM). SHJ is prone to OOM because it allocates an in-memory 
hashed relation (`UnsafeHashedRelation` or `LongHashedRelation`) for the build 
side, and there is no recovery once the hashed relation grows too large to fit 
in memory.

On the other hand, SHJ is more CPU and IO efficient than sort merge join (SMJ) 
when joining a large table with a small table (where the small table is still 
too large to be broadcast), because SHJ does not sort the large table, whereas 
SMJ must.

To improve the reliability of shuffled hash join, a fallback mechanism can be 
introduced to avoid shuffled hash join OOM failures completely. This is similar 
to the fallback to sort-based aggregation that we already have for hash 
aggregate. The idea is:

(1). Build the hashed relation as today, but monitor its size while inserting 
each build side row. If the size of the hashed relation always stays below a 
configurable threshold, go to (2.1); otherwise go to (2.2).

(2.1). Run the current shuffled hash join logic: read stream side rows and probe 
the hashed relation.

(2.2). Fall back to sort merge join: sort the stream side rows, and sort the 
build side rows (first iterate the rows already in the hashed relation, e.g. 
through `BytesToBytesMap.destructiveIterator`, then iterate the remaining unread 
build side rows). Then perform a sort merge join over the sorted stream and 
build side rows.
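
The steps above can be sketched as a toy, single-process Python model. This is 
only an illustration of the control flow, not Spark code: the names `join_task`, 
`sort_merge_join` and `threshold_rows` are hypothetical, rows are plain 
`(key, payload)` tuples, and the threshold counts rows, whereas a real 
implementation would more likely track the hashed relation size in bytes.

```python
from collections import defaultdict
from typing import Dict, Iterable, Iterator, List, Tuple

Row = Tuple[int, str]  # (join key, payload); a toy stand-in for a Spark row


def sort_merge_join(stream: List[Row], build: List[Row]) -> Iterator[Tuple[int, str, str]]:
    """Inner join of two row lists by sorting both sides and merging."""
    stream = sorted(stream, key=lambda r: r[0])
    build = sorted(build, key=lambda r: r[0])
    i = j = 0
    while i < len(stream) and j < len(build):
        s_key, b_key = stream[i][0], build[j][0]
        if s_key < b_key:
            i += 1
        elif s_key > b_key:
            j += 1
        else:
            # Find the run of build rows that share this key.
            j_end = j
            while j_end < len(build) and build[j_end][0] == s_key:
                j_end += 1
            # Pair every stream row with this key against that run.
            while i < len(stream) and stream[i][0] == s_key:
                for _, b_val in build[j:j_end]:
                    yield s_key, stream[i][1], b_val
                i += 1
            j = j_end


def join_task(stream: Iterable[Row], build: Iterable[Row], threshold_rows: int):
    """Return (strategy, joined rows) for one task (hypothetical helper)."""
    table: Dict[int, List[str]] = defaultdict(list)
    build_iter = iter(build)
    size = 0
    # Step (1): build the hash table while monitoring its size.
    for key, val in build_iter:
        table[key].append(val)
        size += 1
        if size >= threshold_rows:
            # Step (2.2): drain the rows already hashed, then append the
            # build side rows that have not been read yet.
            drained = [(k, v) for k, vals in table.items() for v in vals]
            table.clear()
            drained.extend(build_iter)
            return "sort_merge", list(sort_merge_join(list(stream), drained))
    # Step (2.1): the table stayed below the threshold, so probe it.
    joined = [(k, s, b) for k, s in stream for b in table.get(k, ())]
    return "hash", joined
```

Calling `join_task` with the same inputs and two different thresholds should 
produce the same joined rows through either path; only the reported strategy 
differs.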

 

Note:

(1). The fallback is dynamic and happens per task. For example, task 0 may incur 
the fallback because it has a big build side, while tasks 1 and 2 do not need 
to, depending on the size of their hashed relations.

(2). There is no major code change for SHJ and SMJ. The major change is in 
HashedRelation, which gains some new methods, e.g. 
`HashedRelation.destructiveValues()`, to return an Iterator of the build side 
rows in the hashed relation while cleaning up the hashed relation along the way.

(3). We have run this feature by default in our internal fork for more than 2 
years, and it has benefited us a lot: users can choose to use SHJ, and we do 
not need to worry about SHJ reliability. See 
https://issues.apache.org/jira/browse/SPARK-21505 for the original proposal 
from our side; I have tweaked it here to make it less intrusive and more 
acceptable, e.g. by not introducing a separate join operator and instead doing 
the fallback automatically inside the SHJ operator itself.
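
To make the "destructive" iteration in note (2) concrete, here is a minimal 
Python sketch of the idea, assuming the hashed relation is modelled as a plain 
dict from key to a list of values. The function name `destructive_values` is 
hypothetical and only mirrors the proposed method name; it is not the Spark 
API.

```python
from typing import Dict, Iterator, List, Tuple


def destructive_values(table: Dict[int, List[str]]) -> Iterator[Tuple[int, str]]:
    """Yield every (key, value) row, removing it from `table` as we go.

    Memory held by the table shrinks while rows are consumed, so the
    caller (e.g. a sorter) never needs both full copies at once.
    """
    while table:
        key, values = table.popitem()  # detach one key and its values
        while values:
            yield key, values.pop()    # release each value once handed out
```

After the iterator is exhausted the dict is empty, which is the property the 
fallback relies on when handing build side rows over to the sort.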



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
