[ 
https://issues.apache.org/jira/browse/SPARK-32634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-32634:
------------------------------------

    Assignee:     (was: Apache Spark)

> Introduce sort-based fallback mechanism for shuffled hash join 
> ---------------------------------------------------------------
>
>                 Key: SPARK-32634
>                 URL: https://issues.apache.org/jira/browse/SPARK-32634
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 3.1.0
>            Reporter: Cheng Su
>            Priority: Minor
>
> A major reason Spark users stay away from shuffled hash join (SHJ) is the 
> risk of running out of memory. SHJ is prone to OOM because it allocates an 
> in-memory hashed relation (`UnsafeHashedRelation` or `LongHashedRelation`) 
> for the build side, and there is no recovery (e.g. fallback or spill) once 
> the hashed relation grows too large to fit in memory. On the other hand, 
> SHJ is more CPU- and IO-efficient than sort merge join (SMJ) when joining a 
> large table with a small table that is still too large to be broadcast, 
> because SHJ does not sort the large table, whereas SMJ must.
> To improve the reliability of shuffled hash join, a fallback mechanism can be 
> introduced to avoid the SHJ OOM issue completely. We already have a similar 
> fallback to sort-based aggregation for hash aggregate. The idea is:
> (1). Build the hashed relation as is done today, but monitor its size when 
> inserting each build-side row. If the size always stays below a configurable 
> threshold, go to (2.1); otherwise go to (2.2).
> (2.1). Current shuffled hash join logic: read the stream-side rows and probe 
> the hashed relation.
> (2.2). Fall back to sort merge join: sort the stream-side rows and sort the 
> build-side rows (first iterate over the rows already in the hashed relation, 
> e.g. through `BytesToBytesMap.destructiveIterator`, then iterate over the 
> remaining unread build-side rows). Then perform a sort merge join on the 
> stream-side and build-side rows.
>  
> Note:
> (1). The fallback is dynamic and happens per task: task 0 can incur the 
> fallback, e.g. if it has a big build side, while tasks 1 and 2 may not need 
> to, depending on the size of their hashed relations.
> (2). There is no major code change for SHJ and SMJ. The major change is in 
> HashedRelation, which gains some new methods, e.g. 
> `HashedRelation.destructiveValues()`, which returns an Iterator over the 
> build-side rows in the hashed relation and cleans up the hashed relation 
> along the way.
> (3). We have run this feature by default in our internal fork for more than 
> 2 years and have benefited a lot from it: users can choose to use SHJ, and 
> we do not need to worry about SHJ reliability (see 
> https://issues.apache.org/jira/browse/SPARK-21505 for the original proposal 
> from our side; I have tweaked it here to make it less intrusive and more 
> acceptable, e.g. by not introducing a separate join operator but doing the 
> fallback automatically inside the SHJ operator itself).
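
For readers who want to see the control flow, steps (1), (2.1) and (2.2) of the
description can be sketched in a few lines of Python. This is only an
illustration under stated assumptions, not Spark code: rows are hypothetical
(key, payload) tuples, the threshold counts rows rather than bytes, the sort is
done in memory where a real engine would use a sorter that can spill, and the
names destructive_values, sort_merge_join and hash_join_with_fallback are made
up here (the first only loosely mirrors the proposed
`HashedRelation.destructiveValues()`).

```python
from itertools import chain
from typing import Dict, Iterable, Iterator, List, Tuple

Row = Tuple[int, str]  # hypothetical row shape: (join key, payload)


def destructive_values(table: Dict[int, List[Row]]) -> Iterator[Row]:
    """Yield every build-side row while emptying the table entry by entry."""
    while table:
        _, rows = table.popitem()  # drop the entry so its memory can be reclaimed
        yield from rows


def sort_merge_join(left: Iterable[Row],
                    right: Iterable[Row]) -> Iterator[Tuple[Row, Row]]:
    """Inner sort merge join on row[0]. Sorts in memory for simplicity."""
    ls = sorted(left, key=lambda r: r[0])
    rs = sorted(right, key=lambda r: r[0])
    i = j = 0
    while i < len(ls) and j < len(rs):
        if ls[i][0] < rs[j][0]:
            i += 1
        elif ls[i][0] > rs[j][0]:
            j += 1
        else:
            key = ls[i][0]
            j_end = j
            while j_end < len(rs) and rs[j_end][0] == key:
                j_end += 1  # find the end of the matching run on the right
            while i < len(ls) and ls[i][0] == key:
                for r in rs[j:j_end]:
                    yield (ls[i], r)
                i += 1
            j = j_end


def hash_join_with_fallback(stream: Iterable[Row], build: Iterable[Row],
                            max_build_rows: int):
    """Return (joined pairs, fell_back). The row-count threshold is an assumption."""
    table: Dict[int, List[Row]] = {}
    build_iter = iter(build)
    # (1) Build the hash table, checking its size after every inserted row.
    for n, row in enumerate(build_iter, start=1):
        table.setdefault(row[0], []).append(row)
        if n > max_build_rows:
            # (2.2) Drain the rows already hashed, then the unread build rows,
            # and join everything with sort merge join instead.
            build_rows = chain(destructive_values(table), build_iter)
            return list(sort_merge_join(stream, build_rows)), True
    # (2.1) The table fit: probe it with each stream-side row.
    return [(s, b) for s in stream for b in table.get(s[0], [])], False


if __name__ == "__main__":
    stream = [(1, "s1"), (2, "s2"), (3, "s3")]
    build = [(2, "b2"), (3, "b3"), (4, "b4")]
    print(hash_join_with_fallback(stream, build, max_build_rows=10))
    print(hash_join_with_fallback(stream, build, max_build_rows=2))
```

Because the decision is made inside the function, each call (standing in for
one task) chooses independently between the hash path and the sort-based path,
which matches note (1) above.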



