[ https://issues.apache.org/jira/browse/SPARK-15326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Herman van Hovell closed SPARK-15326.
-------------------------------------
    Resolution: Not A Problem

> Doing multiple unions on a DataFrame will result in a very inefficient query plan
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-15326
>                 URL: https://issues.apache.org/jira/browse/SPARK-15326
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.1, 2.0.0
>            Reporter: Jurriaan Pruis
>         Attachments: Query Plan.pdf, skewed_join.py, skewed_join_plan.txt
>
>
> While working with a very skewed dataset I noticed that repeated unions on a DataFrame result in a query plan containing 2^(number of unions) - 1 Union nodes. With large datasets this is very inefficient.
> I tried to replicate this behaviour using a PySpark example (I've attached the output of explain() to this JIRA):
> {code}
> from pyspark.sql import functions as F
> # sqlCtx is the SQLContext from the PySpark shell
> df = sqlCtx.range(10000000)
> def r(name, max_val=100):
>     return F.round(F.lit(max_val) * F.pow(F.rand(), 4)).cast('integer').alias(name)
> # Create a skewed dataset
> skewed = df.select('id', r('a'), r('b'), r('c'), r('d'), r('e'), r('f'))
> # Find the skewed values in the dataset
> top_10_percent = skewed.freqItems(['a', 'b', 'c', 'd', 'e', 'f'], 0.10).collect()[0]
> def skewjoin(skewed, right, column, freqItems):
>     freqItems = freqItems[column + '_freqItems']
>     skewed = skewed.alias('skewed')
>     cond = F.col(column).isin(freqItems)
>     # First broadcast join the frequent (skewed) values
>     filtered = skewed.filter(cond).join(F.broadcast(right.filter(cond)), column, 'left_outer')
>     # Use a regular join for the non-skewed values (with big tables this will use a SortMergeJoin)
>     non_skewed = skewed.filter(~cond).join(right.filter(~cond), column, 'left_outer')
>     # Union the two halves and replace the join column with the key from the right DataFrame
>     return filtered.unionAll(non_skewed).select('skewed.*', right['id'].alias(column + '_key')).drop(column)
> # Create the dataframes that will be joined to the skewed dataframe
> right_size = 100
> df_a = sqlCtx.range(right_size).select('id', F.col('id').alias('a'))
> df_b = sqlCtx.range(right_size).select('id', F.col('id').alias('b'))
> df_c = sqlCtx.range(right_size).select('id', F.col('id').alias('c'))
> df_d = sqlCtx.range(right_size).select('id', F.col('id').alias('d'))
> df_e = sqlCtx.range(right_size).select('id', F.col('id').alias('e'))
> df_f = sqlCtx.range(right_size).select('id', F.col('id').alias('f'))
> # Join everything together
> df = skewed
> df = skewjoin(df, df_a, 'a', top_10_percent)
> df = skewjoin(df, df_b, 'b', top_10_percent)
> df = skewjoin(df, df_c, 'c', top_10_percent)
> df = skewjoin(df, df_d, 'd', top_10_percent)
> df = skewjoin(df, df_e, 'e', top_10_percent)
> df = skewjoin(df, df_f, 'f', top_10_percent)
> # df.explain() shows a plan with 63 unions (2^(number_of_skewjoins) - 1),
> # which will be very inefficient and slow
> df.explain(True)
> # Evaluate the plan
> # You'd expect this to return 10000000, but it does not; it returned 10000140
> # on my system (probably because the random columns are recalculated? Not sure though)
> print(df.count())
> {code}
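The exponential blowup above comes from skewjoin() referencing its input DataFrame twice (once for the broadcast branch, once for the regular-join branch). Catalyst logical plans are trees, so the input plan is copied into both branches and doubles with every call; the off-by-140 count is most likely the nondeterministic F.rand() columns being recomputed independently in those duplicated subtrees, so a row can pass the skew filter in one copy and fail it in the other. A minimal sketch of the doubling alone, assuming the same PySpark shell with sqlCtx available:

{code}
import pyspark.sql.functions as F

df = sqlCtx.range(10)
for _ in range(6):
    # Each step references `df` twice and unions the halves back together,
    # so the logical plan doubles: after 6 steps it holds 2^6 - 1 = 63 Unions.
    even = df.filter(F.col('id') % 2 == 0)
    odd = df.filter(F.col('id') % 2 != 0)
    df = even.unionAll(odd)
df.explain(True)
{code}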



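Given the "Not A Problem" resolution, a common mitigation (a hedged sketch, not from the original report) is to cut the lineage between skewjoins so each step starts from a fresh leaf plan, for example by rebuilding the DataFrame from its RDD:

{code}
# Hedged workaround: rebuild the DataFrame from its RDD after every skewjoin.
# The new DataFrame starts from a leaf logical plan, so the plan stays linear
# instead of doubling (at the cost of an RDD round-trip and lost cross-step
# optimizations).
df = skewed
for right, column in [(df_a, 'a'), (df_b, 'b'), (df_c, 'c'),
                      (df_d, 'd'), (df_e, 'e'), (df_f, 'f')]:
    df = skewjoin(df, right, column, top_10_percent)
    df = sqlCtx.createDataFrame(df.rdd, df.schema)
{code}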