[ 
https://issues.apache.org/jira/browse/SPARK-21505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17179202#comment-17179202
 ] 

Cheng Su commented on SPARK-21505:
----------------------------------

Just FYI - a new proposal is in 
https://issues.apache.org/jira/browse/SPARK-32634 for the same functionality. 
Please follow discussion and progress there, thanks.

> A dynamic join operator to improve the join reliability
> -------------------------------------------------------
>
>                 Key: SPARK-21505
>                 URL: https://issues.apache.org/jira/browse/SPARK-21505
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 2.2.0, 2.3.0, 3.0.0
>            Reporter: Lin
>            Priority: Major
>              Labels: features
>
> As we know, hash join is more efficient than sort-merge join. But today hash 
> join is not widely used because it may fail with an OutOfMemory (OOM) error 
> due to limited memory, data skew, statistics mis-estimation, and so on. For 
> example, if we apply shuffle hash join to an unevenly distributed dataset, 
> some partitions might be so large that we cannot build a hash table for 
> them, causing an OOM error. When OOM happens, Spark currently throws an 
> exception, resulting in job failure. On the other hand, if sort-merge join 
> is used, there will be shuffle, sorting, and extra spill, which degrades 
> join performance. Considering the efficiency of hash join, we propose a 
> fallback mechanism that dynamically chooses hash join or sort-merge join at 
> runtime, at the task level, to provide a more reliable join operation.
> This new dynamic join operator internally implements the logic of HashJoin, 
> Iterator Reconstruct, Sort, and MergeJoin. The process of this dynamic join 
> method is as follows:
> HashJoin: We start by building a hash table on one side of the join 
> partitions. If the hash table is built successfully, the operator behaves 
> the same as the current ShuffledHashJoin operator.
> Sort: If we fail to build the hash table because the partition is too large, 
> we do SortMergeJoin only on this partition. But we first need to rebuild the 
> partition. When OOM happens, a hash table for part of this partition has 
> already been built (e.g. the first 4000 rows of the RDD), and the iterator 
> of this partition now points to the 4001st row. We reuse this hash table to 
> reconstruct an iterator over the first 4000 rows and concatenate it with the 
> remaining rows of the partition, so that we can rebuild the partition 
> completely. We then sort the rebuilt partition by key values.
> MergeJoin: After getting two sorted iterators, we perform a regular merge 
> join against them and emit the records to downstream operators.
> Iterator Reconstruct: The BytesToBytesMap has to be spilled to disk to 
> release memory for other operators, such as Sort, Join, etc. In addition, it 
> has to be converted to an iterator, so that it can be concatenated with the 
> remaining items in the original iterator that was used to build the hash 
> table.
> Meta Data Population: Necessary metadata, such as sorting keys, join type, 
> etc., has to be populated so that it can be used by the potential Sort and 
> MergeJoin operators.
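
For readers following the quoted description, the per-partition fallback can
be sketched roughly as below. This is only a toy illustration in plain Python,
not Spark code and not the implementation proposed in the issue. All names
(try_build_hash_table, merge_join, dynamic_join, max_rows) are hypothetical.
A row-count budget (max_rows) stands in for real memory pressure, and the
partial hash table is replayed from memory instead of being spilled to disk
as the description requires for BytesToBytesMap.

```python
from itertools import chain

_DONE = object()  # sentinel marking an exhausted iterator


def try_build_hash_table(rows, key, max_rows):
    """Pull rows into a dict until the input ends or the budget is used up.

    Returns (table, complete). max_rows is a hypothetical stand-in for a
    memory budget; a partition with exactly max_rows rows is conservatively
    reported as incomplete, which only costs an unnecessary fallback.
    """
    table = {}
    count = 0
    while count < max_rows:
        row = next(rows, _DONE)
        if row is _DONE:
            return table, True
        table.setdefault(key(row), []).append(row)
        count += 1
    return table, False


def merge_join(left, right, lkey, rkey):
    """Inner merge join over two lists that are already sorted by key."""
    i = j = 0
    while i < len(left) and j < len(right):
        lk, rk = lkey(left[i]), rkey(right[j])
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Locate the run of right-side rows sharing this key.
            j_end = j
            while j_end < len(right) and rkey(right[j_end]) == lk:
                j_end += 1
            # Pair every left row with that key against the whole run.
            while i < len(left) and lkey(left[i]) == lk:
                for r in right[j:j_end]:
                    yield left[i], r
                i += 1
            j = j_end


def dynamic_join(build, stream, build_key, stream_key, max_rows):
    """Hash join if the build side fits the budget, else sort-merge join."""
    build_iter = iter(build)
    table, complete = try_build_hash_table(build_iter, build_key, max_rows)
    if complete:
        # HashJoin path: probe the finished table with each stream row.
        for s in stream:
            for b in table.get(stream_key(s), []):
                yield b, s
        return
    # Iterator Reconstruct: replay the rows already absorbed by the partial
    # hash table, then continue with the rows not yet consumed.
    rebuilt = chain((r for rows in table.values() for r in rows), build_iter)
    # Sort both sides by join key, then MergeJoin.
    sorted_build = sorted(rebuilt, key=build_key)
    sorted_stream = sorted(stream, key=stream_key)
    yield from merge_join(sorted_build, sorted_stream, build_key, stream_key)


def first(row):
    return row[0]


build_rows = [(1, "a"), (2, "b"), (2, "c"), (3, "d")]
stream_rows = [(2, "x"), (3, "y"), (4, "z")]

# A large budget takes the hash path; a tiny budget forces the fallback.
via_hash = list(dynamic_join(build_rows, stream_rows, first, first, 100))
via_merge = list(dynamic_join(build_rows, stream_rows, first, first, 2))
```

Both calls should produce the same set of joined pairs. Only the path taken
differs, which is the property the proposed operator relies on.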



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
