Thank you for your prompt response and the great examples, Sun Rui, but I
am still confused about one thing. Do you see any particular reason not
to merge subsequent limits? The following case
(limit n (map f (limit m ds)))
could be optimized to:
(map f (limit n (limit m ds)))
and further to:
(map f (limit (min n m) ds))
couldn't it?
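
In DataFrame terms, reusing df1 from your test case below, I mean roughly
this (n and m are arbitrary values picked just for illustration, and I
assume the spark-shell so the Int encoder is in scope):

val n = 5
val m = 100

// what I wrote above as (limit n (map f (limit m ds)))
val original = df1.limit(m).map { r => r.getAs[Int](0) }.limit(n)

// after pushing the outer limit through the map and merging both limits
val rewritten = df1.limit(math.min(n, m)).map { r => r.getAs[Int](0) }
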
On 08/02/2016 11:57 AM, Sun Rui wrote:
> Based on your code, here is a simpler test case on Spark 2.0:
>
> case class my (x: Int)
> val rdd = sc.parallelize(0.until(10000), 1000).map { x => my(x) }
> val df1 = spark.createDataFrame(rdd)
> val df2 = df1.limit(1)
> df1.map { r => r.getAs[Int](0) }.first
> df2.map { r => r.getAs[Int](0) }.first // Much slower than the
> previous line
>
> Actually, Dataset.first is equivalent to Dataset.limit(1).collect, so
> let's check the physical plans of the two cases:
>
> scala> df1.map { r => r.getAs[Int](0) }.limit(1).explain
> == Physical Plan ==
> CollectLimit 1
> +- *SerializeFromObject [input[0, int, true] AS value#124]
> +- *MapElements <function1>, obj#123: int
> +- *DeserializeToObject createexternalrow(x#74,
> StructField(x,IntegerType,false)), obj#122: org.apache.spark.sql.Row
> +- Scan ExistingRDD[x#74]
>
> scala> df2.map { r => r.getAs[Int](0) }.limit(1).explain
> == Physical Plan ==
> CollectLimit 1
> +- *SerializeFromObject [input[0, int, true] AS value#131]
> +- *MapElements <function1>, obj#130: int
> +- *DeserializeToObject createexternalrow(x#74,
> StructField(x,IntegerType,false)), obj#129: org.apache.spark.sql.Row
> +- *GlobalLimit 1
> +- Exchange SinglePartition
> +- *LocalLimit 1
> +- Scan ExistingRDD[x#74]
>
>
> For the first case, the behaviour is related to an optimisation in
> the CollectLimitExec physical operator. That is, it will first fetch
> the first partition to get the desired number of rows, 1 in this case;
> if that is not enough, it will fetch more partitions, until the desired
> limit is reached. So generally, if the first partition is not empty,
> only the first partition will be computed and fetched. The other
> partitions will not even be computed.
>
> However, in the second case, the optimisation in CollectLimitExec
> does not help, because the previous limit operation involves a shuffle.
> All partitions will be computed, LocalLimit(1) runs on each partition
> to get 1 row, and then all partitions are shuffled into a single
> partition. CollectLimitExec will fetch 1 row from the resulting
> single partition.
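
A quick way I could see this for myself, reusing the case class my and
the sizes from your test case, would be to count how many upstream rows
actually get computed with an accumulator. Just a rough sketch
(spark-shell assumed, the names are mine, and the counts are approximate):

val computed = sc.longAccumulator("upstream rows computed")
val countedRdd = sc.parallelize(0.until(10000), 1000).map { x => computed.add(1); my(x) }
val dfA = spark.createDataFrame(countedRdd)
val dfB = dfA.limit(1)

dfA.map { r => r.getAs[Int](0) }.first
println(s"after case 1: ${computed.value}") // small: only the first partition is touched

dfB.map { r => r.getAs[Int](0) }.first
println(s"after case 2: ${computed.value}") // much larger: every partition is computed before the shuffle
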
>
>
>> On Aug 2, 2016, at 09:08, Maciej Szymkiewicz <[email protected]> wrote:
>>
>> Hi everyone,
>>
>> This doesn't look like expected behaviour, does it?
>>
>> http://stackoverflow.com/q/38710018/1560062
>>
>> A quick glance at the UI suggests that there is a shuffle involved and
>> that the input for first is a ShuffledRowRDD.
>> --
>> Best regards,
>> Maciej Szymkiewicz
>
--
Maciej Szymkiewicz