[ https://issues.apache.org/jira/browse/SPARK-15415?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Reynold Xin reopened SPARK-15415:
---------------------------------
Assignee: (was: Davies Liu)
I discussed this offline with [~davies] and [~marmbrus]. We now think this is
worth fixing, not just for this case, but to make the broadcast hint more
deterministic.
In Spark branch-2.0, the broadcast hint works by setting the plan's size
estimate to 1, and that's why your test case no longer works: with
spark.sql.autoBroadcastJoinThreshold=0, an estimate of 1 byte is already above
the threshold. There is a related problem: if someone sets the broadcast
threshold to a low value (say 2), the broadcast hint may not work either as
soon as the user adds a project that increases the size estimate. The root
cause is overloading statistics to apply hints.
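To make the failure concrete, here is a toy Python model of it. It assumes a simplified size-only broadcast rule (`size <= threshold`) and a project that scales the estimate by a fixed factor; `Stats`, `broadcast_hint`, `project` and `can_broadcast` are hypothetical names, not Spark's actual classes or planner code.

```python
from dataclasses import dataclass

# Toy model (hypothetical, not Spark's API): a plan node only carries a size estimate.
@dataclass(frozen=True)
class Stats:
    size_in_bytes: int

def broadcast_hint(child: Stats) -> Stats:
    # Current branch-2.0 approach described above: the hint overwrites the size with 1.
    return Stats(size_in_bytes=1)

def project(child: Stats, growth_factor: int) -> Stats:
    # Simplified: a project that widens rows scales the size estimate up.
    return Stats(size_in_bytes=child.size_in_bytes * growth_factor)

def can_broadcast(stats: Stats, threshold: int) -> bool:
    # Simplified size-only rule: broadcast when the estimate fits under the threshold.
    return stats.size_in_bytes <= threshold

table = Stats(size_in_bytes=8000)  # hypothetical estimate for range(1000)

# Threshold 0: a hinted size of 1 already exceeds it, so the hint is ignored.
print(can_broadcast(broadcast_hint(table), threshold=0))  # False
# Threshold 2: the hint alone passes...
print(can_broadcast(broadcast_hint(table), threshold=2))  # True
# ...but a project that triples the estimate to 3 silently loses the hint.
print(can_broadcast(project(broadcast_hint(table), 3), threshold=2))  # False
```

Because the hint and the size estimate share one number, anything that changes the estimate (a low threshold or a size-increasing operator) can undo the hint.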
So the proposed fix is to add another field to Statistics called
"isBroadcastable", which the broadcast hint sets instead of touching the size.
We would need to change operators so that isBroadcastable is propagated
throughout query plans. This is best done by using the copy method on
Statistics (e.g. child.statistics.copy(sizeInBytes = ...)), so that fields an
operator does not change are carried over automatically.
One caveat: joins must not propagate isBroadcastable, because a join can
explode the output size.
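A minimal Python sketch of the proposed scheme, assuming `dataclasses.replace` as a stand-in for the Scala case-class `copy` and a crude product-of-sizes estimate for joins. `Stats`, `is_broadcastable`, `broadcast_hint`, `project`, `join` and `can_broadcast` are hypothetical names for illustration, not the actual Spark change.

```python
from dataclasses import dataclass, replace

# Toy model (hypothetical names, not Spark's API) of Statistics with a separate flag.
@dataclass(frozen=True)
class Stats:
    size_in_bytes: int
    is_broadcastable: bool = False

def broadcast_hint(child: Stats) -> Stats:
    # The hint sets the flag and leaves the size estimate untouched.
    return replace(child, is_broadcastable=True)

def project(child: Stats, growth_factor: int) -> Stats:
    # replace() plays the role of child.statistics.copy(sizeInBytes = ...):
    # only the size changes, so is_broadcastable flows through unchanged.
    return replace(child, size_in_bytes=child.size_in_bytes * growth_factor)

def join(left: Stats, right: Stats) -> Stats:
    # A join can explode the output size, so it builds fresh stats and
    # deliberately does NOT propagate is_broadcastable (crude product estimate).
    return Stats(size_in_bytes=left.size_in_bytes * right.size_in_bytes)

def can_broadcast(stats: Stats, threshold: int) -> bool:
    # The flag wins regardless of the threshold; otherwise fall back to size.
    return stats.is_broadcastable or stats.size_in_bytes <= threshold

table = Stats(size_in_bytes=8000)  # hypothetical estimate for range(1000)
hinted = project(broadcast_hint(table), 3)
print(hinted)  # Stats(size_in_bytes=24000, is_broadcastable=True)
print(can_broadcast(broadcast_hint(table), threshold=0))  # True
print(can_broadcast(hinted, threshold=2))  # True
print(can_broadcast(join(hinted, table), threshold=2))  # False
```

Keeping the hint in its own field means no threshold value or size-changing operator can silently drop it; only operators that explicitly build fresh statistics, like the join here, discard it.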
[~jurriaanpruis], do you think you could work on this in the next few days so
we can get it into 2.0?
> Marking partitions for broadcast broken
> ---------------------------------------
>
> Key: SPARK-15415
> URL: https://issues.apache.org/jira/browse/SPARK-15415
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.0.0
> Reporter: Jurriaan Pruis
>
> I couldn't get the broadcast(DataFrame) SQL function to work in Spark 2.0.
> It does work in Spark 1.6.1:
> {code}
> $ pyspark --conf spark.sql.autoBroadcastJoinThreshold=0
> >>> df = sqlCtx.range(1000)
> >>> df2 = sqlCtx.range(1000)
> >>> df.join(pyspark.sql.functions.broadcast(df2), 'id').explain()
> == Physical Plan ==
> Project [id#0L]
> +- BroadcastHashJoin [id#0L], [id#1L], BuildRight
>    :- ConvertToUnsafe
>    :  +- Scan ExistingRDD[id#0L]
>    +- ConvertToUnsafe
>       +- Scan ExistingRDD[id#1L]
> {code}
> In Spark 2.0, however, the same code results in:
> {code}
> >>> df = sqlCtx.range(1000)
> >>> df2 = sqlCtx.range(1000)
> >>> df.join(pyspark.sql.functions.broadcast(df2), 'id').explain()
> == Physical Plan ==
> WholeStageCodegen
> :  +- Project [id#6L]
> :     +- SortMergeJoin [id#6L], [id#9L], Inner, None
> :        :- INPUT
> :        +- INPUT
> :- WholeStageCodegen
> :  :  +- Sort [id#6L ASC], false, 0
> :  :     +- INPUT
> :  +- Exchange hashpartitioning(id#6L, 200), None
> :     +- WholeStageCodegen
> :        :  +- Range 0, 1, 8, 1000, [id#6L]
> +- WholeStageCodegen
>    :  +- Sort [id#9L ASC], false, 0
>    :     +- INPUT
>    +- ReusedExchange [id#9L], Exchange hashpartitioning(id#6L, 200), None
> {code}
>
> It should instead look like this (the output when the
> spark.sql.autoBroadcastJoinThreshold setting is removed):
> {code}
> == Physical Plan ==
> WholeStageCodegen
> :  +- Project [id#0L]
> :     +- BroadcastHashJoin [id#0L], [id#3L], Inner, BuildRight, None
> :        :- Range 0, 1, 8, 1000, [id#0L]
> :        +- INPUT
> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint]))
>    +- WholeStageCodegen
>       :  +- Range 0, 1, 8, 1000, [id#3L]
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)