[ https://issues.apache.org/jira/browse/SPARK-15415?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Reynold Xin reopened SPARK-15415:
---------------------------------
      Assignee:     (was: Davies Liu)

I discussed this offline with [~davies] and [~marmbrus]. We now think this is 
worth fixing, not just for this case but to make the broadcast hint behave more 
deterministically.

In Spark branch-2.0, the broadcast hint works by setting the plan's size 
estimate to 1, which is why your test case no longer works: with the broadcast 
threshold set to 0, even an estimate of 1 exceeds it. There is a related 
problem: if someone sets the broadcast threshold to a low value (say, 2), the 
hint can also fail as soon as the user adds a Project that increases the size 
estimate. The root cause is that statistics are overloaded to carry hints.
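To make the failure mode concrete, here is a minimal Python model of the size-only approach, not Spark code. It assumes a simplified planner check of the form `size <= threshold` and models a Project as multiplying the estimate by a growth factor; `Statistics`, `broadcast_hint`, `project` and `can_broadcast` are hypothetical stand-ins for the Scala internals.

```python
from dataclasses import dataclass


# Hypothetical model of the branch-2.0 behavior: the hint is encoded
# purely as a tiny size estimate, so it lives and dies by the threshold.
@dataclass(frozen=True)
class Statistics:
    size_in_bytes: int


def broadcast_hint(child: Statistics) -> Statistics:
    # The hint discards the real estimate and replaces it with 1.
    return Statistics(size_in_bytes=1)


def project(child: Statistics, growth_factor: int) -> Statistics:
    # Assumption: a Project that widens rows scales the estimate up.
    return Statistics(size_in_bytes=child.size_in_bytes * growth_factor)


def can_broadcast(stats: Statistics, threshold: int) -> bool:
    # Size-only check (assumed form): broadcast iff the estimate fits.
    return stats.size_in_bytes <= threshold


hinted = broadcast_hint(Statistics(size_in_bytes=8000))
print(can_broadcast(hinted, threshold=0))  # False: 1 > 0
print(can_broadcast(hinted, threshold=2))  # True: 1 <= 2
print(can_broadcast(project(hinted, growth_factor=3), threshold=2))  # False: 3 > 2
```

The hint only "works" when the threshold happens to sit above the fake estimate, and any operator above the hint that grows the estimate can silently undo it.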

The proposed fix is therefore to add a field called "isBroadcastable" to 
Statistics and have the broadcast hint set that field instead. Operators would 
need to be changed so that isBroadcastable propagates through query plans. This 
is best done with the copy method on Statistics (e.g. 
child.statistics.copy(sizeInBytes = ...)), so that fields an operator does not 
touch are carried over from its child.

One caveat: joins must not propagate isBroadcastable, because a join can 
explode the output size.
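A rough Python sketch of the proposed design, assuming a frozen dataclass as a stand-in for the Scala case class: `dataclasses.replace` plays the role of `Statistics.copy`, and all names here (`is_broadcastable`, `project`, `join`, `can_broadcast`) are hypothetical. The worst-case join estimate (product of input sizes) is also an assumption chosen only to illustrate why the flag is dropped there.

```python
from dataclasses import dataclass, replace


# Hypothetical model of Statistics with an explicit hint flag.
@dataclass(frozen=True)
class Statistics:
    size_in_bytes: int
    is_broadcastable: bool = False


def broadcast_hint(child: Statistics) -> Statistics:
    # The hint sets the flag and leaves the size estimate untouched.
    return replace(child, is_broadcastable=True)


def project(child: Statistics, growth_factor: int) -> Statistics:
    # Unary operators derive stats from the child via replace(), so any
    # field they don't mention (including is_broadcastable) carries over.
    return replace(child, size_in_bytes=child.size_in_bytes * growth_factor)


def join(left: Statistics, right: Statistics) -> Statistics:
    # Joins build fresh stats and deliberately do NOT inherit the flag,
    # since a join can blow up the output size.
    return Statistics(size_in_bytes=left.size_in_bytes * right.size_in_bytes)


def can_broadcast(stats: Statistics, threshold: int) -> bool:
    # An explicit hint wins; otherwise fall back to the size check.
    return stats.is_broadcastable or stats.size_in_bytes <= threshold


hinted = broadcast_hint(Statistics(size_in_bytes=8000))
projected = project(hinted, growth_factor=3)
print(can_broadcast(projected, threshold=0))  # True: flag survives the Project
joined = join(projected, Statistics(size_in_bytes=8000))
print(can_broadcast(joined, threshold=0))  # False: flag not propagated by join
```

Because the flag is independent of the size estimate, the threshold setting and intervening Projects no longer decide whether the hint is honored.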

[~jurriaanpruis] do you think you can work on this in the next few days so we 
can commit this into 2.0?




> Marking partitions for broadcast broken
> ---------------------------------------
>
>                 Key: SPARK-15415
>                 URL: https://issues.apache.org/jira/browse/SPARK-15415
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.0
>            Reporter: Jurriaan Pruis
>
> I couldn't get the broadcast(DataFrame) SQL function to work in Spark 2.0.
> It does work in Spark 1.6.1:
> {code}
> $ pyspark --conf spark.sql.autoBroadcastJoinThreshold=0
> >>> df = sqlCtx.range(1000); df2 = sqlCtx.range(1000)
> >>> df.join(pyspark.sql.functions.broadcast(df2), 'id').explain()
> == Physical Plan ==
> Project [id#0L]
> +- BroadcastHashJoin [id#0L], [id#1L], BuildRight
>    :- ConvertToUnsafe
>    :  +- Scan ExistingRDD[id#0L]
>    +- ConvertToUnsafe
>       +- Scan ExistingRDD[id#1L]
> {code}
> In Spark 2.0, however, the same code results in:
> {code}
> >>> df = sqlCtx.range(1000); df2 = sqlCtx.range(1000)
> >>> df.join(pyspark.sql.functions.broadcast(df2), 'id').explain()
> == Physical Plan ==
> WholeStageCodegen
> :  +- Project [id#6L]
> :     +- SortMergeJoin [id#6L], [id#9L], Inner, None
> :        :- INPUT
> :        +- INPUT
> :- WholeStageCodegen
> :  :  +- Sort [id#6L ASC], false, 0
> :  :     +- INPUT
> :  +- Exchange hashpartitioning(id#6L, 200), None
> :     +- WholeStageCodegen
> :        :  +- Range 0, 1, 8, 1000, [id#6L]
> +- WholeStageCodegen
>    :  +- Sort [id#9L ASC], false, 0
>    :     +- INPUT
>    +- ReusedExchange [id#9L], Exchange hashpartitioning(id#6L, 200), None
> {code}
>  
> whereas it should look like this (the output when the 
> spark.sql.autoBroadcastJoinThreshold setting is removed):
> {code}
> == Physical Plan ==
> WholeStageCodegen
> :  +- Project [id#0L]
> :     +- BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight, None
> :        :- Range 0, 1, 8, 1000, [id#0L]
> :        +- INPUT
> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint]))
>    +- WholeStageCodegen
>       :  +- Range 0, 1, 8, 1000, [id#3L]
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
