[ https://issues.apache.org/jira/browse/SPARK-21505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17179202#comment-17179202 ]
Cheng Su commented on SPARK-21505:
----------------------------------

Just FYI - a new proposal is in https://issues.apache.org/jira/browse/SPARK-32634 for the same functionality. Please follow the discussion and progress there, thanks.

> A dynamic join operator to improve the join reliability
> -------------------------------------------------------
>
>                 Key: SPARK-21505
>                 URL: https://issues.apache.org/jira/browse/SPARK-21505
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 2.2.0, 2.3.0, 3.0.0
>            Reporter: Lin
>            Priority: Major
>              Labels: features
>
> As we know, hash join is generally more efficient than sort-merge join. But
> today hash join is not widely used because it may fail with an OutOfMemory
> (OOM) error due to limited memory, data skew, statistics mis-estimation, and
> so on. For example, if we apply shuffle hash join to an unevenly distributed
> dataset, some partitions might be so large that we cannot build a hash table
> for them, causing an OOM error. When OOM happens, Spark currently throws an
> exception, resulting in job failure. On the other hand, if sort-merge join is
> used, there will be shuffle, sorting, and extra spill, which degrades join
> performance. Considering the efficiency of hash join, we propose a fallback
> mechanism that dynamically uses hash join or sort-merge join at runtime, at
> the task level, to provide a more reliable join operation.
> This new dynamic join operator internally implements the logic of HashJoin,
> Iterator Reconstruct, Sort, and MergeJoin. The process of this dynamic join
> method is as follows:
> HashJoin: We start by building a hash table on one side of the join
> partitions. If the hash table is built successfully, the operator behaves the
> same as the current ShuffledHashJoin operator.
> Sort: If we fail to build the hash table because the partition is too large,
> we do SortMergeJoin only on this partition.
> But we need to rebuild the partition. When OOM happens, a hash table
> covering part of this partition has already been built successfully (e.g.
> the first 4000 rows of the RDD), and the iterator of this partition is now
> pointing to the 4001st row. We reuse this hash table to reconstruct the
> iterator for the first 4000 rows and concatenate it with the remaining rows
> of this partition, so that we can rebuild the partition completely. On this
> rebuilt partition, we apply sorting based on key values.
> MergeJoin: After getting two sorted iterators, we perform a regular merge
> join against them and emit the records to downstream operators.
> Iterator Reconstruct: BytesToBytesMap has to be spilled to disk to release
> the memory for other operators, such as Sort, Join, etc. In addition, it has
> to be converted to an Iterator, so that it can be concatenated with the
> remaining items in the original iterator that was used to build the hash
> table.
> Meta Data Population: Necessary metadata, such as sorting keys, join type,
> etc., has to be populated so that it can be used by the potential Sort and
> MergeJoin operators.
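
To make the fallback flow in the quoted description easier to follow, here is a minimal Python sketch of the idea. It is not Spark code and not the proposed implementation. The function names are hypothetical, rows are assumed to be (key, value) pairs, and a row budget (`max_rows`) stands in for the memory limit that would cause OOM in a real task. The build step gives up once the table holds more than `max_rows` rows, leaving the input iterator positioned on the next unread row.

```python
from itertools import chain


def build_hash_table(row_iter, max_rows):
    """Insert rows into a key -> [values] table until the row budget is exceeded.

    Returns (table, overflowed). `max_rows` is an assumed stand-in for the
    memory limit; a real engine would react to an allocation failure instead.
    """
    table = {}
    for count, (key, value) in enumerate(row_iter, start=1):
        table.setdefault(key, []).append(value)
        if count > max_rows:
            # Budget exceeded: row_iter now points at the next unread row.
            return table, True
    return table, False


def hash_join(table, stream_rows):
    """Probe the fully built table with each row of the other side."""
    for key, stream_value in stream_rows:
        for build_value in table.get(key, ()):
            yield key, build_value, stream_value


def reconstruct_iterator(table, remaining_rows):
    """Turn the partial table back into rows and append the unread rows."""
    built_rows = ((k, v) for k, values in table.items() for v in values)
    return chain(built_rows, remaining_rows)


def merge_join(left, right):
    """Inner-join two lists of (key, value) rows that are sorted by key."""
    i = j = 0
    while i < len(left) and j < len(right):
        left_key, right_key = left[i][0], right[j][0]
        if left_key < right_key:
            i += 1
        elif left_key > right_key:
            j += 1
        else:
            # Find the run of equal keys on the right, then pair it with
            # every left row that has the same key.
            j_end = j
            while j_end < len(right) and right[j_end][0] == left_key:
                j_end += 1
            while i < len(left) and left[i][0] == left_key:
                for _, right_value in right[j:j_end]:
                    yield left_key, left[i][1], right_value
                i += 1
            j = j_end


def dynamic_join(build_rows, stream_rows, max_rows):
    """Try a hash join first; fall back to sort-merge join on overflow."""
    build_iter = iter(build_rows)
    table, overflowed = build_hash_table(build_iter, max_rows)
    if not overflowed:
        return "hash", list(hash_join(table, stream_rows))
    # Fallback: rebuild the whole build-side partition, sort both sides
    # by key, and merge them.
    rebuilt = reconstruct_iterator(table, build_iter)
    left = sorted(rebuilt, key=lambda row: row[0])
    right = sorted(stream_rows, key=lambda row: row[0])
    return "sort-merge", list(merge_join(left, right))
```

Calling `dynamic_join(build, stream, max_rows=n)` returns the strategy that was used together with the joined rows as (key, build value, stream value) tuples. Both paths produce the same set of rows, though not necessarily in the same order. The sketch keeps everything in memory and skips the spill-to-disk step mentioned under Iterator Reconstruct.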