[
https://issues.apache.org/jira/browse/SPARK-32634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Cheng Su updated SPARK-32634:
-----------------------------
Parent: SPARK-32461
Issue Type: Sub-task (was: Improvement)
> Introduce sort-based fallback mechanism for shuffled hash join
> ---------------------------------------------------------------
>
> Key: SPARK-32634
> URL: https://issues.apache.org/jira/browse/SPARK-32634
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Affects Versions: 3.1.0
> Reporter: Cheng Su
> Priority: Minor
>
> A major pain point that keeps Spark users away from shuffled hash join is its
> out-of-memory (OOM) issue. Shuffled hash join is prone to OOM because it
> allocates an in-memory hashed relation (`UnsafeHashedRelation` or
> `LongHashedRelation`) for the build side, and there is no recovery once the
> hashed relation grows too large to fit in memory. On the other hand, shuffled
> hash join is more CPU- and IO-efficient than sort merge join when joining one
> large table with a small table (where the small table is still too large to
> be broadcasted), as SHJ does not sort the large table while SMJ must.
> To improve the reliability of shuffled hash join, a fallback mechanism can be
> introduced to avoid the shuffled hash join OOM issue completely, similar to
> the fallback to sort-based aggregation that we already have for hash
> aggregate. The idea is:
> (1). Build the hashed relation as we do today, but monitor its size while
> inserting each build side row. If the size stays below a configurable
> threshold, go to (2.1); otherwise go to (2.2).
> (2.1). Current shuffled hash join logic: read stream side rows and probe the
> hashed relation.
> (2.2). Fall back to sort merge join: sort the stream side rows, and sort the
> build side rows (iterate the rows already in the hashed relation (e.g.
> through `BytesToBytesMap.destructiveIterator`), then iterate the rest of the
> un-read build side rows). Then do a sort merge join on the stream + build
> side rows.
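The steps above can be sketched as follows. This is an illustrative Python simulation, not Spark's Scala internals: the plain dict stands in for `UnsafeHashedRelation`/`BytesToBytesMap`, and `estimate_size` and `threshold_bytes` are stand-ins for Spark's memory tracking and the proposed config.

```python
import sys
from collections import defaultdict

def estimate_size(row):
    # crude per-row size estimate; Spark tracks the map's memory precisely
    return sys.getsizeof(row)

def sort_merge_join(stream_sorted, build_sorted, key):
    # standard sort-merge inner join over two key-sorted lists
    out, i, j = [], 0, 0
    while i < len(stream_sorted) and j < len(build_sorted):
        ks, kb = key(stream_sorted[i]), key(build_sorted[j])
        if ks < kb:
            i += 1
        elif ks > kb:
            j += 1
        else:
            # emit the cross product of the matching key groups
            j0 = j
            while j < len(build_sorted) and key(build_sorted[j]) == ks:
                out.append((stream_sorted[i], build_sorted[j]))
                j += 1
            i += 1
            if i < len(stream_sorted) and key(stream_sorted[i]) == ks:
                j = j0  # re-scan the build group for the duplicate stream key
    return out

def shuffled_hash_join_with_fallback(build_rows, stream_rows, key, threshold_bytes):
    """Inner-join on key(row); fall back to sort-merge join if the in-memory
    hash map grows past threshold_bytes while being built."""
    build_iter = iter(build_rows)
    relation = defaultdict(list)   # stand-in for the in-memory hashed relation
    size = 0
    fell_back = False
    for row in build_iter:
        relation[key(row)].append(row)
        size += estimate_size(row)           # (1) monitor size per insert
        if size > threshold_bytes:
            fell_back = True                 # stop building, keep build_iter
            break
    if not fell_back:
        # (2.1) regular SHJ: probe the hashed relation with stream side rows
        return [(s, b) for s in stream_rows for b in relation.get(key(s), [])]
    # (2.2) drain rows already in the map (destructively, freeing memory),
    # then the un-read remainder of the build input; sort both sides and
    # finish with a sort merge join.
    def drain():
        while relation:
            _k, rows = relation.popitem()
            yield from rows
    build_sorted = sorted(list(drain()) + list(build_iter), key=key)
    stream_sorted = sorted(stream_rows, key=key)
    return sort_merge_join(stream_sorted, build_sorted, key)
```

Either path produces the same join result; only the memory profile differs, which is what makes the per-task switch transparent to the rest of the plan.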
>
> Note:
> (1). The fallback is dynamic and happens per task: task 0 may incur the
> fallback (e.g. if it has a big build side) while tasks 1 and 2 may not,
> depending on the size of each task's hashed relation.
> (2). There is no major code change for SHJ or SMJ. The major change is around
> HashedRelation, to introduce some new methods, e.g.
> `HashedRelation.destructiveValues()` to return an Iterator of the build side
> rows in the hashed relation while cleaning up the hashed relation along the
> way.
> (3). We have run this feature by default in our internal fork for more than
> 2 years, and we benefit a lot from it: users can choose to use SHJ, and we
> don't need to worry about SHJ reliability (see
> https://issues.apache.org/jira/browse/SPARK-21505 for the original proposal
> from our side; I tweaked it here to make it less intrusive and more
> acceptable, e.g. not introducing a separate join operator, but doing the
> fallback automatically inside the SHJ operator itself).
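The destructive-iteration idea behind the proposed `HashedRelation.destructiveValues()` (note (2)) can be sketched in a few lines of Python. The plain dict is again a stand-in for Spark's map: each entry is removed before its rows are yielded, so memory is released incrementally as the downstream sort consumes the iterator.

```python
def destructive_values(relation):
    # Yield every build side row while removing its entry from the map,
    # so the map shrinks as the consumer makes progress.
    while relation:
        _key, rows = relation.popitem()  # remove the entry before yielding
        yield from rows

# usage: the map is empty once the iterator is exhausted
m = {1: ['a'], 2: ['b', 'c']}
drained = list(destructive_values(m))
```

This is why the fallback avoids doubling peak memory: the hashed relation and the sorted build side never fully coexist.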
--
This message was sent by Atlassian Jira
(v8.3.4#803005)