Cheng Su created SPARK-32634:
--------------------------------
Summary: Introduce sort-based fallback mechanism for shuffled hash join
Key: SPARK-32634
URL: https://issues.apache.org/jira/browse/SPARK-32634
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 3.1.0
Reporter: Cheng Su
A major reason Spark users stay away from shuffled hash join (SHJ) is
out-of-memory (OOM) failures. SHJ is prone to OOM because it allocates an
in-memory hashed relation (`UnsafeHashedRelation` or `LongHashedRelation`) for
the build side, and there is no recovery once the hashed relation grows too
large to fit in memory. On the other hand, SHJ is more CPU and IO efficient
than sort merge join (SMJ) when joining one large table with a small table
(where the small table is still too large to be broadcast), because SHJ does
not sort the large table, whereas SMJ has to.
To improve the reliability of shuffled hash join, a fallback mechanism can be
introduced to avoid the shuffled hash join OOM issue completely. This is
similar to the fallback to sort-based aggregation that we already have for
hash aggregate. The idea is:
(1). Build the hashed relation as is done currently, but monitor its size when
inserting each build side row. If the size of the hashed relation always stays
below a configurable threshold, go to (2.1); otherwise go to (2.2).
(2.1). Current shuffled hash join logic: read stream side rows and probe the
hashed relation.
(2.2). Fall back to sort merge join: sort the stream side rows, and sort the
build side rows (iterate the rows already in the hashed relation, e.g. through
`BytesToBytesMap.destructiveIterator`, then iterate the rest of the unread
build side rows). Then do a sort merge join on the stream and build side rows.
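
The steps above can be sketched roughly as follows. This is only an
illustrative model on plain Scala collections, not Spark code: the object
`ShjFallbackSketch`, the `Row` type, and the `maxBuildRows` threshold (a row
count standing in for the configurable size threshold) are all hypothetical,
and both sorts happen in memory, whereas a real sort merge join would use a
sorter that can spill.

```scala
import scala.collection.mutable

// Hypothetical toy model of the proposed per-task fallback; not Spark's API.
object ShjFallbackSketch {
  type Row = (Int, String)            // (join key, payload)
  type Joined = (Int, String, String) // (key, stream payload, build payload)

  /** Inner join. Returns the joined rows and whether the fallback was taken. */
  def join(build: Iterator[Row], stream: Iterator[Row],
           maxBuildRows: Int): (Vector[Joined], Boolean) = {
    val relation = mutable.HashMap.empty[Int, mutable.ArrayBuffer[String]]
    var size = 0
    var exceeded = false
    // Step (1): build the hashed relation, checking its size on every insert.
    while (!exceeded && build.hasNext) {
      val (k, v) = build.next()
      relation.getOrElseUpdate(k, mutable.ArrayBuffer.empty[String]) += v
      size += 1
      if (size > maxBuildRows) exceeded = true
    }
    if (!exceeded) {
      // Step (2.1): regular hash join, probing the relation per stream row.
      val out = stream.flatMap { case (k, s) =>
        relation.get(k).iterator.flatMap(bs => bs.iterator.map(b => (k, s, b)))
      }.toVector
      (out, false)
    } else {
      // Step (2.2): drain rows already hashed, append the unread build rows,
      // then sort both sides and merge them.
      val drained = relation.iterator.flatMap { case (k, vs) =>
        vs.iterator.map(v => (k, v))
      }
      val sortedBuild = (drained ++ build).toVector.sortBy(_._1)
      relation.clear() // the hashed relation is no longer needed
      val sortedStream = stream.toVector.sortBy(_._1)
      (mergeJoin(sortedStream, sortedBuild), true)
    }
  }

  /** Merge join over two inputs that are already sorted by key. */
  def mergeJoin(stream: Vector[Row], build: Vector[Row]): Vector[Joined] = {
    val out = Vector.newBuilder[Joined]
    var i = 0
    var j = 0
    while (i < stream.length && j < build.length) {
      val sk = stream(i)._1
      val bk = build(j)._1
      if (sk < bk) i += 1
      else if (sk > bk) j += 1
      else {
        // Find the run of build rows sharing this key.
        var jEnd = j
        while (jEnd < build.length && build(jEnd)._1 == sk) jEnd += 1
        // Pair every stream row with this key against that run.
        while (i < stream.length && stream(i)._1 == sk) {
          var m = j
          while (m < jEnd) { out += ((sk, stream(i)._2, build(m)._2)); m += 1 }
          i += 1
        }
        j = jEnd
      }
    }
    out.result()
  }
}
```

Both paths return the same set of joined rows; only the output order differs
(stream order on the hash path, key order on the sort path).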
Note:
(1). The fallback is dynamic and happens per task. For example, task 0 can
incur the fallback if it has a big build side, while tasks 1 and 2 do not need
to if their hashed relations stay below the threshold.
(2). There is no major code change for SHJ and SMJ. The major change is in
HashedRelation, which gets some new methods, e.g.
`HashedRelation.destructiveValues()` to return an Iterator over the build side
rows in the hashed relation, cleaning up the hashed relation along the way.
(3). We have run this feature by default in our internal fork for more than 2
years, and we benefit a lot from it: users can choose to use SHJ, and we don't
need to worry about SHJ reliability. See
https://issues.apache.org/jira/browse/SPARK-21505 for the original proposal
from our side. I tweaked it here to make it less intrusive and more
acceptable, e.g. by not introducing a separate join operator, but doing the
fallback automatically inside the SHJ operator itself.
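
To illustrate what a method like `destructiveValues()` from note (2) might
look like, here is a minimal sketch on a toy relation. `ToyHashedRelation` and
all of its members are hypothetical and do not reflect the actual
`HashedRelation` interface; the sketch only shows the intended contract, which
is that rows are handed back while their storage is released along the way.

```scala
import scala.collection.mutable

// Hypothetical toy relation for illustration; not Spark's HashedRelation.
final class ToyHashedRelation[K, V] {
  private val buckets = mutable.HashMap.empty[K, mutable.ArrayBuffer[V]]

  def append(key: K, value: V): Unit =
    buckets.getOrElseUpdate(key, mutable.ArrayBuffer.empty[V]) += value

  def get(key: K): Iterator[V] = buckets.get(key).iterator.flatMap(_.iterator)

  def numKeys: Int = buckets.size

  /** Returns every (key, value) row once, removing each bucket from the
    * map as soon as the iterator moves on to it. */
  def destructiveValues(): Iterator[(K, V)] = new Iterator[(K, V)] {
    // Snapshot the keys so the map is not mutated while being iterated.
    private val keys = buckets.keysIterator.toVector.iterator
    private var current: Iterator[(K, V)] = Iterator.empty

    def hasNext: Boolean = {
      while (!current.hasNext && keys.hasNext) {
        val key = keys.next()
        // remove() detaches the bucket, so the map no longer references it.
        current = buckets.remove(key).iterator
          .flatMap(_.iterator)
          .map(v => (key, v))
      }
      current.hasNext
    }

    def next(): (K, V) = {
      if (!hasNext) throw new NoSuchElementException("relation is drained")
      current.next()
    }
  }
}
```

Once the iterator is exhausted the relation is empty, so the memory held by
the hashed relation can be reclaimed before the sort-based path runs.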