[ 
https://issues.apache.org/jira/browse/SPARK-19091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josh Rosen updated SPARK-19091:
-------------------------------
    Description: It turns out that spark.createDataset(sc.parallelize(x: Seq)) 
and spark.createDataset(x: Seq) produce different plans, where the former is 
much less efficient because it lacks an accurate size estimate. We should modify 
SparkSession to special-case the situation where createDataset is called on a 
ParallelCollectionRDD in order to remove this source of performance variation 
between the two plans.  (was: 

The Catalyst optimizer uses LogicalRDD to represent scans over existing RDDs. 
In general it's hard to produce size estimates for arbitrary RDDs, which is why 
the current implementation simply estimates these relations' sizes using the 
default size (see the TODO at 
https://github.com/apache/spark/blob/f5d18af6a8a0b9f8c2e9677f9d8ae1712eb701c6/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala#L174).
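
As a quick illustration (not part of the original report), the estimate a plan 
carries can be inspected from the query execution, assuming a running 
SparkSession named {{spark}}; note the accessor was named {{statistics}} in 
Spark 2.0/2.1 and was later renamed to {{stats}}:

{code:scala}
// A minimal sketch, assuming a running SparkSession `spark`.
import spark.implicits._

val ds = spark.createDataset(spark.sparkContext.parallelize(Seq(1, 2, 3)))

// Prints the default size estimate rather than one derived from the 3 rows.
// (The accessor is `statistics` in Spark 2.0/2.1; later versions renamed it to `stats`.)
println(ds.queryExecution.optimizedPlan.statistics.sizeInBytes)
{code}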

In the special case where data has been parallelized with {{sc.parallelize()}}, 
we wind up with a ParallelCollectionRDD whose number of elements is known. When 
we construct a LogicalRDD plan node in {{SparkSession.createDataFrame()}}, we 
will typically be using an RDD which is a one-to-one transformation of a 
parallel collection RDD (e.g. mapping an encoder to convert case classes to 
internal rows). Thus LogicalRDD's statistics method can pattern-match on cases 
where a MappedPartitionsRDD is stacked immediately on top of a 
ParallelCollectionRDD, walk up the RDD parent chain to determine the number of 
elements, and combine that count with the schema and a conservative "fudge 
factor" to produce an over-estimate of the LogicalRDD's size which will be more 
accurate than the default size (see the sketch below).
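
A rough sketch of that pattern match follows. This is illustrative only, not a 
patch: ParallelCollectionRDD and MappedPartitionsRDD are private[spark], so it 
would only compile inside Spark's own packages, and the collection-length 
accessor is hypothetical:

{code:scala}
import org.apache.spark.rdd.{MappedPartitionsRDD, ParallelCollectionRDD, RDD}
import org.apache.spark.sql.types.StructType

// Hypothetical accessor: the real patch would need to expose the parallelized
// Seq's length from ParallelCollectionRDD (its `data` field is private).
def collectionLength(pc: ParallelCollectionRDD[_]): Long = ???

// Walk the RDD lineage to find a parallelized collection and derive a row count.
def estimateRowCount(rdd: RDD[_]): Option[Long] = rdd match {
  case pc: ParallelCollectionRDD[_] =>
    Some(collectionLength(pc))
  case mp: MappedPartitionsRDD[_, _] =>
    // A one-to-one transformation preserves the row count, so walk up the chain.
    estimateRowCount(mp.dependencies.head.rdd)
  case _ =>
    None // unknown lineage: fall back to the default size estimate
}

// Combine the row count with the schema's per-row default size and a
// conservative multiplier so that we over-estimate rather than under-estimate.
def estimateSizeInBytes(rdd: RDD[_], schema: StructType, fudgeFactor: Long): Option[Long] =
  estimateRowCount(rdd).map(_ * schema.map(_.dataType.defaultSize).sum * fudgeFactor)
{code}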

I believe that this will help us to avoid falling into pathologically bad plans 
when joining tiny parallelize()d data sets against huge tables; one of my own 
production use cases would have benefited directly from such an optimization. )
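
To make the plan difference described above concrete, here is a minimal repro 
sketch (data and table names are made up for illustration):

{code:scala}
// A minimal repro sketch, assuming a local SparkSession `spark`.
import spark.implicits._

val tiny = Seq(1, 2, 3)
val big = spark.range(1000000L).toDF("id")

// Local Seq: planned as a LocalRelation with an accurate size estimate,
// so the join below can pick a broadcast join.
big.join(spark.createDataset(tiny).toDF("id"), "id").explain()

// Same data via sc.parallelize: planned as a LogicalRDD with the default
// size estimate, so the optimizer typically falls back to a sort-merge join.
big.join(spark.createDataset(spark.sparkContext.parallelize(tiny)).toDF("id"), "id").explain()
{code}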

> createDataset(sc.parallelize(x: Seq)) should be equivalent to 
> createDataset(x: Seq)
> -----------------------------------------------------------------------------------
>
>                 Key: SPARK-19091
>                 URL: https://issues.apache.org/jira/browse/SPARK-19091
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>            Reporter: Josh Rosen
>
> It turns out that spark.createDataset(sc.parallelize(x: Seq)) and 
> spark.createDataset(x: Seq) produce different plans, where the former is 
> much less efficient because it lacks an accurate size estimate. We should 
> modify SparkSession to special-case the situation where createDataset is 
> called on a ParallelCollectionRDD in order to remove this source of 
> performance variation between the two plans.


