[ 
https://issues.apache.org/jira/browse/SPARK-15326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15343175#comment-15343175
 ] 

Herman van Hovell commented on SPARK-15326:
-------------------------------------------

Spark already has code that flattens nested Unions. For example, {{UNION(UNION(a, 
b), UNION(c, d))}} is converted into {{UNION(a, b, c, d)}}.
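
To make the flattening concrete, here is a minimal sketch in plain Python. It is not the Catalyst code; the {{Relation}} and {{Union}} classes and the {{flatten_union}} helper are hypothetical names invented for this illustration, and the sketch only shows the idea of collapsing directly nested unions into one n-ary union.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Relation:
    """A leaf of the toy plan, e.g. a table scan (hypothetical class)."""
    name: str


@dataclass
class Union:
    """An n-ary union over child plans (hypothetical class)."""
    children: List[object]


def flatten_union(plan):
    """Collapse directly nested Union nodes into a single n-ary Union."""
    if not isinstance(plan, Union):
        return plan
    flat = []
    for child in plan.children:
        child = flatten_union(child)
        if isinstance(child, Union):
            # Hoist the grandchildren into the parent union.
            flat.extend(child.children)
        else:
            flat.append(child)
    return Union(flat)


a, b, c, d = (Relation(n) for n in "abcd")
nested = Union([Union([a, b]), Union([c, d])])
flat = flatten_union(nested)
print([r.name for r in flat.children])  # prints ['a', 'b', 'c', 'd']
```

Note that this only helps when a Union is the direct child of another Union; any other operator in between stops the hoisting.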

This case, however, is a bit different. You are joining the {{right}} table twice 
here, once to the skewed values and once to the regular values, and then you are 
unioning the two results back together. So you are incorporating the right-hand 
side twice, which will cause a nice blow-up if you do this a few times. You may 
be able to rewrite this without the union by doing something along the lines of 
{{select * from right left join broadcast(skewed) left join not-skewed}}.
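
The blow-up can be counted with a toy model in plain Python. This is only a sketch under assumptions: plans are nested tuples rather than real Spark plans, and {{skewjoin_plan}} and {{count_nodes}} are hypothetical helpers that mimic only the shape of the reporter's {{skewjoin}} (two filtered copies of the input, each joined to the right side, then unioned).

```python
def skewjoin_plan(plan, right):
    """Toy stand-in for skewjoin(): the input plan is referenced twice."""
    skewed_part = ("join", ("filter", plan), ("broadcast", right))
    regular_part = ("join", ("filter", plan), right)
    return ("union", skewed_part, regular_part)


def count_nodes(plan, kind):
    """Count the nodes of the given kind in a nested-tuple plan."""
    if not isinstance(plan, tuple):
        return 0  # a leaf such as "skewed" or "df_a"
    own = 1 if plan[0] == kind else 0
    return own + sum(count_nodes(child, kind) for child in plan[1:])


plan = "skewed"
rights = ["df_a", "df_b", "df_c", "df_d", "df_e", "df_f"]
for n, right in enumerate(rights, start=1):
    plan = skewjoin_plan(plan, right)
    print(n, count_nodes(plan, "union"))
# The last line printed is "6 63", i.e. 2^6 - 1 unions.
```

Because each call embeds the previous plan twice, the number of unions follows U(n) = 2 * U(n-1) + 1, which gives the 2^n - 1 reported in the issue. The unions are also separated by joins and filters, so they are not directly nested.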

> Doing multiple unions on a Dataframe will result in a very inefficient query 
> plan
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-15326
>                 URL: https://issues.apache.org/jira/browse/SPARK-15326
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.1, 2.0.0
>            Reporter: Jurriaan Pruis
>         Attachments: Query Plan.pdf, skewed_join.py, skewed_join_plan.txt
>
>
> While working with a very skewed dataset I noticed that repeated unions on a 
> dataframe will result in a query plan with 2^(union) - 1 unions. With large 
> datasets this will be very inefficient.
> I tried to replicate this behaviour using a PySpark example (I've attached 
> the output of the explain() to this JIRA):
> {code}
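> from pyspark.sql import functions as F
> # sqlCtx is assumed to be an existing SQLContext (e.g. from the PySpark shell)
> df = sqlCtx.range(10000000)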
> def r(name, max_val=100):
>     return F.round(F.lit(max_val) * F.pow(F.rand(), 4)).cast('integer').alias(name)
> # Create a skewed dataset
> skewed = df.select('id', r('a'), r('b'), r('c'), r('d'), r('e'), r('f'))
> # Find the skewed values in the dataset
> top_10_percent = skewed.freqItems(['a', 'b', 'c', 'd', 'e', 'f'], 0.10).collect()[0]
> def skewjoin(skewed, right, column, freqItems):
>     freqItems = freqItems[column + '_freqItems']
>     skewed = skewed.alias('skewed')
>     cond = F.col(column).isin(freqItems)
>     # First broadcast join the frequent (skewed) values
>     filtered = skewed.filter(cond).join(F.broadcast(right.filter(cond)), column, 'left_outer')
>     # Use a regular join for the non skewed values (with big tables this will use a SortMergeJoin)
>     non_skewed = skewed.filter(cond == False).join(right.filter(cond == False), column, 'left_outer')
>     # join them together and replace the column with the column found in the right DataFrame
>     return filtered.unionAll(non_skewed).select('skewed.*', right['id'].alias(column + '_key')).drop(column)
> # Create the dataframes that will be joined to the skewed dataframe
> right_size = 100
> df_a = sqlCtx.range(right_size).select('id', F.col('id').alias('a'))
> df_b = sqlCtx.range(right_size).select('id', F.col('id').alias('b'))
> df_c = sqlCtx.range(right_size).select('id', F.col('id').alias('c'))
> df_d = sqlCtx.range(right_size).select('id', F.col('id').alias('d'))
> df_e = sqlCtx.range(right_size).select('id', F.col('id').alias('e'))
> df_f = sqlCtx.range(right_size).select('id', F.col('id').alias('f'))
> # Join everything together
> df = skewed
> df = skewjoin(df, df_a, 'a', top_10_percent)
> df = skewjoin(df, df_b, 'b', top_10_percent)
> df = skewjoin(df, df_c, 'c', top_10_percent)
> df = skewjoin(df, df_d, 'd', top_10_percent)
> df = skewjoin(df, df_e, 'e', top_10_percent)
> df = skewjoin(df, df_f, 'f', top_10_percent)
> # df.explain() shows the plan where it does 63 unions (2^(number_of_skewjoins) - 1)
> # which will be very inefficient and slow
> df.explain(True)
> # Evaluate the plan
> # You'd expect this to return 10000000, but it does not, it returned 10000140 on my system
> # (probably because it will recalculate the random columns? Not sure though)
> print(df.count())
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
