[https://issues.apache.org/jira/browse/SPARK-27792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Dongjoon Hyun updated SPARK-27792:
----------------------------------
Affects Version/s: 3.0.0 (was: 2.4.3)
> SkewJoin--handle only skewed keys with broadcastjoin and other keys with normal join
> ------------------------------------------------------------------------------------
>
> Key: SPARK-27792
> URL: https://issues.apache.org/jira/browse/SPARK-27792
> Project: Spark
> Issue Type: New Feature
> Components: SQL
> Affects Versions: 3.0.0
> Reporter: Jason Guo
> Priority: Major
> Attachments: SMJ DAG.png, SMJ tasks.png, skew join DAG.png, sql.png, time.png
>
>
> This feature is designed to handle data skew in joins.
>
> *Scenario*
> * A big table (big_skewed) contains a few skewed keys.
> * A small table (small_even) has no skewed keys but is larger than the
> broadcast threshold.
> * When big_skewed is joined with small_even, a few tasks are much slower
> than the others because they have to process the skewed keys.
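The following toy sketch, written in plain Python rather than Spark, shows why a single skewed key produces a straggler task. The data is made up, partition_sizes is a hypothetical helper, and key % num_partitions stands in for Spark's hash partitioner (Spark actually uses Murmur3 hashing). All rows that share a key end up in the same partition, so the task that owns the skewed key receives several times the average load.

```python
from collections import Counter


def partition_sizes(keys, num_partitions):
    """Count rows per shuffle partition.

    key % num_partitions is a simplified stand-in for Spark's hash
    partitioner, which uses Murmur3 rather than plain modulo.
    """
    sizes = Counter(key % num_partitions for key in keys)
    return [sizes.get(p, 0) for p in range(num_partitions)]


# Toy data: 1,000 distinct keys with 10 rows each, plus one skewed key
# that alone contributes 10,000 rows.
even_keys = [k for k in range(1000) for _ in range(10)]
skewed_keys = [9500048] * 10_000
sizes = partition_sizes(even_keys + skewed_keys, num_partitions=8)

print(sizes)  # [11250, 1250, 1250, 1250, 1250, 1250, 1250, 1250]
print(max(sizes) / (sum(sizes) / len(sizes)))  # 4.5x the average partition
```

Adding more partitions does not fix this, because every row of one key still goes to a single partition.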
>
> *Solution*
> * Provide a hint that indicates which keys are skewed.
> * Join the records with skewed keys using a broadcast join, and join the
> records with non-skewed keys using a normal join method.
> * The small table as a whole is larger than the broadcast threshold, but the
> total size of its records whose keys are skewed in the big table is smaller
> than the threshold, so those records can be broadcast and joined with the big
> table.
> * The records with non-skewed keys are joined with a normal join method
> (sort merge join in the experiment below).
> * The final result is the union of the two parts.
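Below is a minimal in-memory sketch of the strategy described in the Solution, in plain Python. It is not Spark's implementation. The function names (skew_join, broadcast_hash_join, sort_merge_join) are hypothetical. The skewed keys are passed in explicitly to play the role of the proposed hint, and lists of (key, value) pairs stand in for the two tables.

```python
from collections import defaultdict


def broadcast_hash_join(big, small):
    """Build a hash table on the (small) broadcast side, stream the big side."""
    table = defaultdict(list)
    for key, value in small:
        table[key].append(value)
    return [(key, bv, sv) for key, bv in big for sv in table.get(key, [])]


def sort_merge_join(left, right):
    """Inner join of two (key, value) lists by sorting both sides on key."""
    left = sorted(left, key=lambda r: r[0])
    right = sorted(right, key=lambda r: r[0])
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of equal keys on each side and emit their cross product.
            i_end = i
            while i_end < len(left) and left[i_end][0] == lk:
                i_end += 1
            j_end = j
            while j_end < len(right) and right[j_end][0] == rk:
                j_end += 1
            for _, lv in left[i:i_end]:
                for _, rv in right[j:j_end]:
                    out.append((lk, lv, rv))
            i, j = i_end, j_end
    return out


def skew_join(big, small, skewed_keys):
    """Split both sides on the hinted skewed keys, join each part, then union."""
    big_skew = [r for r in big if r[0] in skewed_keys]
    big_rest = [r for r in big if r[0] not in skewed_keys]
    # Assumed to be small enough to broadcast, as argued in the Solution.
    small_skew = [r for r in small if r[0] in skewed_keys]
    small_rest = [r for r in small if r[0] not in skewed_keys]
    return broadcast_hash_join(big_skew, small_skew) + sort_merge_join(big_rest, small_rest)


big = [(1, "A")] * 5 + [(2, "A"), (3, "A")]
small = [(1, "B"), (2, "B"), (4, "B")]
result = skew_join(big, small, skewed_keys={1})
print(len(result))  # 6
```

Both sides are split on the same key set, so every matching pair of rows falls into exactly one of the two joins. As a result, the union contains each result row exactly once.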
>
> *Effect*
> This feature reduces the join time from 5.7 minutes to 2.1 minutes.
> !time.png!
> !sql.png!
>
> *Experiment*
> *Without this feature, the whole job took 5.7 minutes*
> big_skewed (tableA) has two skewed keys, 9500048 and 9500096:
> {code:sql}
> INSERT OVERWRITE TABLE big_skewed
> SELECT CAST(CASE
>               -- ids below 908000000 map to one of the two skewed keys
>               WHEN id < 908000000 THEN 9500000 + (CAST(RAND() * 2 AS INT) + 1) * 48
>               -- remaining ids: one key per 100 consecutive ids
>               ELSE CAST(id / 100 AS INT)
>             END AS STRING),
>        'A' AS name
> FROM ids
> WHERE id BETWEEN 900000000 AND 1050000000;{code}
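The CASE expression above can be checked with a small Python mirror. This assumes the ids table holds every integer in the BETWEEN range exactly once, which the issue does not state. RAND()'s value is passed in explicitly so the function is deterministic. big_skewed_key is a hypothetical helper.

```python
def big_skewed_key(row_id: int, rand: float) -> str:
    """Hypothetical Python mirror of big_skewed's key expression.

    `rand` plays the role of Spark's RAND(), a uniform value in [0, 1).
    """
    if row_id < 908_000_000:
        # CAST(RAND() * 2 AS INT) is 0 or 1, so the offset is 48 or 96.
        return str(9_500_000 + (int(rand * 2) + 1) * 48)
    # For positive ids, int() truncation matches CAST(id / 100 AS INT).
    return str(int(row_id / 100))


# With a dense ids table, ids 900000000..907999999 all take the skewed branch.
skewed_rows = 908_000_000 - 900_000_000

print(big_skewed_key(900_000_000, 0.25))  # 9500048
print(big_skewed_key(900_000_000, 0.75))  # 9500096
print(big_skewed_key(950_004_850, 0.25))  # 9500048 (ELSE branch)
print(skewed_rows)                        # 8000000
```

Under that assumption, each skewed key receives roughly 4,000,000 rows from the first branch. Every other key receives 100 rows.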
> small_even (tableB) has no skewed keys:
> {code:sql}
> INSERT OVERWRITE TABLE small_even
> SELECT CAST(CAST(id / 100 AS INT) AS STRING), 'B' AS name
> FROM ids
> WHERE id BETWEEN 950000000 AND 950500000;{code}
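This Python snippet recomputes small_even's key distribution, assuming again that ids contains every integer in the range exactly once. Integer division (//) replaces CAST(id / 100 AS INT), which gives the same result for positive ids. The output also shows that each of big_skewed's two skewed keys appears only 100 times in small_even. That small slice is the part the proposal broadcasts.

```python
from collections import Counter

# Assumes the ids table contains every integer in the BETWEEN range exactly once;
# BETWEEN is inclusive, hence the + 1 on the upper bound of range().
key_counts = Counter(str(i // 100) for i in range(950_000_000, 950_500_000 + 1))

print(len(key_counts))                               # 5001
print(max(key_counts.values()))                      # 100
print(key_counts["9505000"])                         # 1
print(key_counts["9500048"], key_counts["9500096"])  # 100 100
```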
>
> Join them with spark.sql.autoBroadcastJoinThreshold set to 3000 (bytes), so
> that small_even is too large to be broadcast:
> {code:sql}
> set spark.sql.autoBroadcastJoinThreshold=3000;
> insert overwrite table result_with_skew
> select big_skewed.id, big_skewed.value, small_even.value
> from big_skewed
> join small_even
> on small_even.id=big_skewed.id;
> {code}
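The following back-of-the-envelope count of this join's output again assumes that ids holds every integer in the ranges exactly once. Under that assumption, each skewed key matches roughly 4,000,000 big_skewed rows with 100 small_even rows and produces on the order of 400 million output rows. A typical key produces 100 × 100 = 10,000 rows. The exact split between the two skewed keys depends on RAND(), but their combined total does not.

```python
# Assumes the ids table contains every integer in each BETWEEN range exactly once.
SKEWED_ROWS = 8_000_000  # big_skewed ids 900000000..907999999 (random skewed key)
BIG_ROWS_PER_KEY = 100   # big_skewed ELSE branch: 100 consecutive ids per key

# small_even: keys 9500000..9504999 have 100 rows each; key 9505000 has 1 row.
small_counts = {k: 100 for k in range(9_500_000, 9_505_000)}
small_counts[9_505_000] = 1

skewed = {9_500_048, 9_500_096}

# Both skewed keys have the same number of small_even rows (100), so the random
# split between them does not change the combined output: every big_skewed row
# with a skewed key (8,000,000 random + 100 ELSE-branch rows per key) matches 100 rows.
skewed_output = (SKEWED_ROWS + BIG_ROWS_PER_KEY * len(skewed)) * small_counts[9_500_048]

# Every other small_even key matches exactly 100 big_skewed rows.
other_output = sum(BIG_ROWS_PER_KEY * n for k, n in small_counts.items() if k not in skewed)

print(skewed_output)                 # 800020000
print(other_output)                  # 49980100
print(skewed_output + other_output)  # 850000100
```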
>
> The sort merge join is slow because of 2 straggler tasks, which process the
> skewed keys:
> !SMJ DAG.png!
> !SMJ tasks.png!
>
> *With this feature, the job took only 2.1 minutes*
> The skewed keys are joined with a broadcast join and the non-skewed keys are
> joined with a sort merge join:
> !skew join DAG.png!
>
>
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)