[
https://issues.apache.org/jira/browse/SPARK-21505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236365#comment-16236365
]
Zhan Zhang commented on SPARK-21505:
------------------------------------
Any comments on this feature? Do you think the design is OK? If so, we are
going to submit a PR.
> A dynamic join operator to improve the join reliability
> -------------------------------------------------------
>
> Key: SPARK-21505
> URL: https://issues.apache.org/jira/browse/SPARK-21505
> Project: Spark
> Issue Type: New Feature
> Components: SQL
> Affects Versions: 2.2.0, 2.3.0, 3.0.0
> Reporter: Lin
> Priority: Major
> Labels: features
>
> As we know, hash join is more efficient than sort-merge join. But today hash
> join is not widely used because it may fail with an OutOfMemory (OOM) error
> caused by limited memory, data skew, statistics mis-estimation, and so on.
> For example, if we apply shuffle hash join to an unevenly distributed
> dataset, some partitions might be so large that we cannot build a hash table
> for them, causing an OOM error. When OOM happens, Spark currently throws an
> exception and the job fails. On the other hand, if sort-merge join is used,
> there will be shuffle, sorting, and extra spill, which degrades join
> performance. Considering the efficiency of hash join, we propose a fallback
> mechanism that dynamically chooses between hash join and sort-merge join at
> runtime, at the task level, to provide a more reliable join operation.
> This new dynamic join operator internally implements the logic of HashJoin,
> Iterator Reconstruct, Sort, and MergeJoin. The process of this dynamic join
> method is as follows:
> HashJoin: We start by building a hash table on one side of the join
> partitions. If the hash table is built successfully, the operator behaves
> the same as the current ShuffledHashJoin operator.
> Sort: If we fail to build the hash table because the partition is too large,
> we do SortMergeJoin only on this partition. But we need to rebuild the
> partition first. When OOM happens, a hash table covering part of this
> partition has already been built (e.g. the first 4000 rows of the RDD), and
> the iterator of this partition is now pointing to the 4001st row. We reuse
> this hash table to reconstruct the iterator for the first 4000 rows and
> concatenate it with the remaining rows of this partition, so that the
> partition is rebuilt completely. We then sort this rebuilt partition by key
> values.
> MergeJoin: After getting two sorted iterators, we perform a regular merge
> join on them and emit the records to downstream operators.
> Iterator Reconstruct: The BytesToBytesMap has to be spilled to disk to
> release memory for other operators, such as Sort, Join, etc. In addition, it
> has to be converted to an iterator, so that it can be concatenated with the
> remaining items in the original iterator that was used to build the hash
> table.
> Meta Data Population: Necessary metadata, such as sorting keys, join type,
> etc., has to be populated so that it can be used by the potential Sort and
> MergeJoin operators.
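
To make the flow in the description concrete, here is a minimal Python sketch
of the fallback idea. It is not Spark code and not the proposed
implementation: a plain dict stands in for BytesToBytesMap, a row-count budget
(max_rows) stands in for the OOM condition, nothing is spilled to disk, and
every name in it (HashBuildOverflow, build_hash_table, reconstruct,
dynamic_join, ...) is hypothetical. It covers inner equi-joins only.

```python
from itertools import chain
from typing import Dict, Iterable, Iterator, List, Tuple

Row = Tuple[int, str]            # (join key, payload); hypothetical row shape
Joined = Tuple[int, str, str]    # (key, build payload, stream payload)


class HashBuildOverflow(Exception):
    """Stand-in for OOM; carries the partially built hash table."""

    def __init__(self, partial: Dict[int, List[Row]]):
        super().__init__("hash table exceeded its row budget")
        self.partial = partial


def build_hash_table(rows: Iterator[Row], max_rows: int) -> Dict[int, List[Row]]:
    """Consume rows into a hash table; raise once the budget is exceeded."""
    table: Dict[int, List[Row]] = {}
    size = 0
    for row in rows:
        table.setdefault(row[0], []).append(row)
        size += 1
        if size > max_rows:
            # The row that tripped the budget is already in the table,
            # so no row is lost when we fall back.
            raise HashBuildOverflow(table)
    return table


def reconstruct(partial: Dict[int, List[Row]], rest: Iterator[Row]) -> Iterator[Row]:
    """Iterator Reconstruct: rows already hashed, then the unread rows."""
    buffered = (row for bucket in partial.values() for row in bucket)
    return chain(buffered, rest)


def hash_join(table: Dict[int, List[Row]], stream: Iterable[Row]) -> Iterator[Joined]:
    """Probe the hash table with each row of the streamed side."""
    for key, payload in stream:
        for _, build_payload in table.get(key, []):
            yield (key, build_payload, payload)


def merge_join(left: List[Row], right: List[Row]) -> Iterator[Joined]:
    """Inner merge join of two lists that are already sorted by key."""
    i = j = 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of equal keys on the right side.
            j_end = j
            while j_end < len(right) and right[j_end][0] == lk:
                j_end += 1
            # Pair every left row of this key with every right row of it.
            while i < len(left) and left[i][0] == lk:
                for _, right_payload in right[j:j_end]:
                    yield (lk, left[i][1], right_payload)
                i += 1
            j = j_end


def dynamic_join(build: Iterable[Row], stream: Iterable[Row], max_rows: int) -> List[Joined]:
    """Try hash join first; fall back to sort + merge join for this partition."""
    build_iter = iter(build)
    try:
        table = build_hash_table(build_iter, max_rows)
    except HashBuildOverflow as overflow:
        rebuilt = reconstruct(overflow.partial, build_iter)
        by_key = lambda row: row[0]
        return list(merge_join(sorted(rebuilt, key=by_key),
                               sorted(stream, key=by_key)))
    return list(hash_join(table, stream))


if __name__ == "__main__":
    build_side = [(1, "a"), (2, "b"), (2, "c"), (3, "d")]
    stream_side = [(2, "x"), (3, "y"), (4, "z")]
    print(dynamic_join(build_side, stream_side, max_rows=10))  # hash join path
    print(dynamic_join(build_side, stream_side, max_rows=2))   # fallback path
```

Both calls should return the same set of joined rows; only the path taken
differs. The point the sketch tries to show is that the fallback is local to
one partition and reuses the rows already consumed into the hash table
instead of re-reading the input.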