Lin created SPARK-21505:
---------------------------
Summary: A dynamic join operator to improve the join reliability
Key: SPARK-21505
URL: https://issues.apache.org/jira/browse/SPARK-21505
Project: Spark
Issue Type: New Feature
Components: SQL
Affects Versions: 2.2.0, 2.3.0, 3.0.0
Reporter: Lin
Hash join is generally more efficient than sort-merge join, but today it is
not widely used because it may fail with an OutOfMemory (OOM) error caused by
limited memory, data skew, statistics mis-estimation, and so on. For example,
if we apply shuffle hash join to an unevenly distributed dataset, some
partitions might be so large that we cannot build a hash table for them,
which causes an OOM error. When OOM happens, Spark currently throws an
exception and the job fails. On the other hand, if sort-merge join is used,
there will be shuffle, sorting, and extra spill, which degrades join
performance. Considering the efficiency of hash join, we propose a fallback
mechanism that dynamically chooses between hash join and sort-merge join at
runtime, at the task level, to provide a more reliable join operation.
This new dynamic join operator internally implements the logic of HashJoin,
Iterator Reconstruct, Sort, and MergeJoin. The process of the dynamic join
is as follows:
HashJoin: We start by building a hash table on one side of the join
partitions. If the hash table is built successfully, the operator behaves the
same as the current ShuffledHashJoin operator.
Sort: If we fail to build the hash table because the partition is too large,
we do SortMergeJoin on this partition only. But we first need to rebuild the
partition. When OOM happens, a hash table covering part of this partition has
already been built successfully (e.g. the first 4000 rows of the RDD), and
the iterator of this partition is now pointing to the 4001st row. We reuse
this hash table to reconstruct an iterator over the first 4000 rows and
concatenate it with the remaining rows of the partition, so that the
partition is rebuilt completely. We then sort the rebuilt partition by the
join key values.
MergeJoin: After getting two sorted iterators, we perform a regular merge
join on them and emit the records to downstream operators.
Iterator Reconstruct: The BytesToBytesMap has to be spilled to disk to release
memory for other operators, such as Sort, Join, etc. In addition, it has to
be converted to an Iterator, so that it can be concatenated with the remaining
items in the original iterator that was used to build the hash table.
Metadata Population: Necessary metadata, such as sort keys, join type, etc.,
has to be populated so that it can be used by the potential Sort and
MergeJoin operators.
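
The steps above can be illustrated with a small toy model in plain Python. It
is only a sketch under stated assumptions, not Spark code: memory pressure is
approximated by a row budget (max_rows) instead of a real OOM, a dict stands
in for BytesToBytesMap, sorting is done in memory without spilling, and only
inner join is handled. All function names (build_hash_table, hash_join,
merge_join, dynamic_join) are hypothetical and chosen for this example.

```python
from itertools import chain

_END = object()  # sentinel marking an exhausted iterator


def build_hash_table(rows, key, max_rows):
    """Try to hash the build side within a row budget (stand-in for memory).

    Returns (table, rest). rest is None if every row fit; otherwise it is an
    iterator over the rows that were not inserted into the table.
    """
    rows = iter(rows)
    table = {}
    for _ in range(max_rows):
        row = next(rows, _END)
        if row is _END:
            return table, None
        table.setdefault(key(row), []).append(row)
    nxt = next(rows, _END)  # budget used up: is anything left?
    if nxt is _END:
        return table, None
    return table, chain([nxt], rows)


def hash_join(table, stream_rows, key):
    """Inner hash join: probe the table with each stream-side row."""
    for s in stream_rows:
        for b in table.get(key(s), ()):
            yield (b, s)


def merge_join(left, right, key):
    """Inner merge join of two lists that are already sorted by key."""
    i = j = 0
    while i < len(left) and j < len(right):
        kl, kr = key(left[i]), key(right[j])
        if kl < kr:
            i += 1
        elif kl > kr:
            j += 1
        else:
            # Find the run of equal keys on the right, then pair it with
            # every left row that carries the same key.
            j_end = j
            while j_end < len(right) and key(right[j_end]) == kl:
                j_end += 1
            while i < len(left) and key(left[i]) == kl:
                for r in right[j:j_end]:
                    yield (left[i], r)
                i += 1
            j = j_end


def dynamic_join(build_rows, stream_rows, key, max_rows):
    """Hash join if the build side fits the budget, else sort-merge join."""
    table, rest = build_hash_table(build_rows, key, max_rows)
    if rest is None:
        return list(hash_join(table, stream_rows, key))
    # Iterator Reconstruct: rows already hashed, followed by the rest.
    rebuilt = chain(chain.from_iterable(table.values()), rest)
    left = sorted(rebuilt, key=key)       # Sort (in memory in this toy)
    right = sorted(stream_rows, key=key)
    return list(merge_join(left, right, key))
```

Calling dynamic_join with a budget larger than the build side takes the hash
path; a smaller budget triggers the fallback. Both paths should return the
same set of joined pairs, although the order of the pairs may differ.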
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)