Spark does optimise subsequent limits, for example:

scala> df1.limit(3).limit(1).explain
== Physical Plan ==
CollectLimit 1
+- *SerializeFromObject [assertnotnull(input[0, $line14.$read$$iw$$iw$my, true], top level non-flat input object).x AS x#2]
   +- Scan ExternalRDDScan[obj#1]
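As a side check, the merge is already visible before physical planning; looking at the optimized logical plan should show the two limits collapsed into a single limit of 1 (a sketch, assuming the same df1 as in Sun Rui's test case quoted below):

scala> df1.limit(3).limit(1).queryExecution.optimizedPlan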
However, limit cannot simply be pushed down across mapping functions, because the number of rows may change across them, for example with flatMap(). It seems that limit could be pushed across map(), which does not change the number of rows, so maybe there is room for a Spark optimisation here (a quick illustration is at the bottom of this message, below the quoted thread).

> On Aug 2, 2016, at 18:51, Maciej Szymkiewicz <mszymkiew...@gmail.com> wrote:
>
> Thank you for your prompt response and great examples Sun Rui, but I am
> still confused about one thing. Do you see any particular reason not to
> merge subsequent limits? The following case
>
> (limit n (map f (limit m ds)))
>
> could be optimized to:
>
> (map f (limit n (limit m ds)))
>
> and further to:
>
> (map f (limit (min n m) ds))
>
> couldn't it?
>
>
> On 08/02/2016 11:57 AM, Sun Rui wrote:
>> Based on your code, here is a simpler test case on Spark 2.0:
>>
>> case class my (x: Int)
>> val rdd = sc.parallelize(0.until(10000), 1000).map { x => my(x) }
>> val df1 = spark.createDataFrame(rdd)
>> val df2 = df1.limit(1)
>> df1.map { r => r.getAs[Int](0) }.first
>> df2.map { r => r.getAs[Int](0) }.first // Much slower than the previous line
>>
>> Actually, Dataset.first is equivalent to Dataset.limit(1).collect, so
>> check the physical plan of the two cases:
>>
>> scala> df1.map { r => r.getAs[Int](0) }.limit(1).explain
>> == Physical Plan ==
>> CollectLimit 1
>> +- *SerializeFromObject [input[0, int, true] AS value#124]
>>    +- *MapElements <function1>, obj#123: int
>>       +- *DeserializeToObject createexternalrow(x#74, StructField(x,IntegerType,false)), obj#122: org.apache.spark.sql.Row
>>          +- Scan ExistingRDD[x#74]
>>
>> scala> df2.map { r => r.getAs[Int](0) }.limit(1).explain
>> == Physical Plan ==
>> CollectLimit 1
>> +- *SerializeFromObject [input[0, int, true] AS value#131]
>>    +- *MapElements <function1>, obj#130: int
>>       +- *DeserializeToObject createexternalrow(x#74, StructField(x,IntegerType,false)), obj#129: org.apache.spark.sql.Row
>>          +- *GlobalLimit 1
>>             +- Exchange SinglePartition
>>                +- *LocalLimit 1
>>                   +- Scan ExistingRDD[x#74]
>>
>>
>> The first case benefits from an optimisation in the CollectLimitExec
>> physical operator: it first fetches the first partition to get the
>> desired number of rows, 1 in this case, and only if that is not enough
>> does it fetch more partitions, until the limit is reached. So generally,
>> if the first partition is not empty, only the first partition is
>> computed and fetched; the other partitions are not even computed.
>>
>> In the second case, however, the optimisation in CollectLimitExec does
>> not help, because the previous limit operation involves a shuffle. All
>> partitions are computed, LocalLimit(1) runs on each partition to take
>> 1 row, and then all partitions are shuffled into a single partition.
>> CollectLimitExec fetches 1 row from the resulting single partition.
>>
>>
>>> On Aug 2, 2016, at 09:08, Maciej Szymkiewicz <mszymkiew...@gmail.com> wrote:
>>>
>>> Hi everyone,
>>>
>>> This doesn't look like something expected, does it?
>>>
>>> http://stackoverflow.com/q/38710018/1560062
>>>
>>> A quick glance at the UI suggests that there is a shuffle involved and
>>> the input for first is a ShuffledRowRDD.
>>> --
>>> Best regards,
>>> Maciej Szymkiewicz
>>
>
> --
> Maciej Szymkiewicz
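To make the map()/flatMap() point above concrete, here is a minimal sketch (assuming a Spark 2.0 spark-shell with spark.implicits._ in scope; the data and the expected results in the comments are just illustrative):

val ds = Seq(1, 2, 3, 4, 5).toDS()

// map() preserves the number of rows, so the limit can move across it:
ds.map(_ * 2).limit(2).collect()               // Array(2, 4)
ds.limit(2).map(_ * 2).collect()               // Array(2, 4) -- same rows either way

// flatMap() may emit any number of rows per input, so it cannot:
ds.flatMap(x => Seq(x, x)).limit(2).collect()  // Array(1, 1)
ds.limit(2).flatMap(x => Seq(x, x)).collect()  // Array(1, 1, 2, 2) -- different rows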