[ 
https://issues.apache.org/jira/browse/SPARK-31635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17147653#comment-17147653
 ] 

Chen Zhang edited comment on SPARK-31635 at 7/7/20, 10:12 AM:
--------------------------------------------------------------

In fact, the RDD API corresponding to _DF.sort().take()_ is _RDD.takeOrdered()_

The execution logic of _RDD.sortBy().take()_ is the reservoir sampling + global 
bucket Sorting, and the required number of data is returned after the global 
sorting result is obtained.All major computation are performed in the executor 
process.

The execution logic of _RDD.takeOrdered()_ is to compute TOPK in each RDD 
partition in the executor process(by QuickSelect), and then return each TOPK 
result to the driver process for merging(by PriorityQueue).

To get the same result, it is obvious that the second method based on 
PriorityQueue has better performance.

I think that the implementation of _RDD.takeOrdered()_ can be improved, using a 
configurable option to decide whether the TOPK data merge process occurs in the 
driver process or the executor process. If it occurs in the driver process, it 
can reduce the time for waiting for computation. If it occurs in the executor 
process, it can reduce the memory pressure of the driver process.

something like:
 (org.apache.spark.rdd.RDD class)
{code:scala}
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
    if (num == 0 || partitions.length == 0) {
      Array.empty
    } else {
      if (conf.getBoolean("spark.rdd.takeOrdered.mergeInDriver", true)) {
        val mapRDDs = mapPartitions { items =>
          // Priority keeps the largest elements, so let's reverse the ordering.
          val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
          queue ++= collectionUtils.takeOrdered(items, num)(ord)
          Iterator.single(queue)
        }
        mapRDDs.reduce { (queue1, queue2) =>
          queue1 ++= queue2
          queue1
        }.toArray.sorted(ord)
      } else {
        mapPartitions { items =>
          collectionUtils.takeOrdered(items, num)(ord)
        }.repartition(1).mapPartitions { items =>
          collectionUtils.takeOrdered(items, num)(ord)
        }.collect()
      }
    }
  }
{code}


was (Author: chen zhang):
In fact, the RDD API corresponding to _DF.sort().take()_ is _RDD.takeOrdered()_

The execution logic of _RDD.sortBy().take()_ is the reservoir sampling + global 
bucket Sorting, and the required number of data is returned after the global 
sorting result is obtained.All major computation are performed in the executor 
process.

The execution logic of _RDD.takeOrdered()_ is to compute TOPK(by PriorityQueue) 
in each RDD partition in the executor process, and then return each TOPK result 
to the driver process for merging.

To get the same result, it is obvious that the second method based on 
PriorityQueue has better performance.

I think that the implementation of _RDD.takeOrdered()_ can be improved, using a 
configurable option to decide whether the TOPK data merge process occurs in the 
driver process or the executor process. If it occurs in the driver process, it 
can reduce the time for waiting for computation. If it occurs in the executor 
process, it can reduce the memory pressure of the driver process.

something like:
 (org.apache.spark.rdd.RDD class)
{code:scala}
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
    if (num == 0 || partitions.length == 0) {
      Array.empty
    } else {
      if (conf.getBoolean("spark.rdd.take.ordered.driver.merge", true)) {
        val mapRDDs = mapPartitions { items =>
          // Priority keeps the largest elements, so let's reverse the ordering.
          val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
          queue ++= collectionUtils.takeOrdered(items, num)(ord)
          Iterator.single(queue)
        }
        mapRDDs.reduce { (queue1, queue2) =>
          queue1 ++= queue2
          queue1
        }.toArray.sorted(ord)
      } else {
        mapPartitions { items =>
          collectionUtils.takeOrdered(items, num)(ord)
        }.repartition(1).mapPartitions { items =>
          collectionUtils.takeOrdered(items, num)(ord)
        }.collect()
      }
    }
  }
{code}

> Spark SQL Sort fails when sorting big data points
> -------------------------------------------------
>
>                 Key: SPARK-31635
>                 URL: https://issues.apache.org/jira/browse/SPARK-31635
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.2
>            Reporter: George George
>            Priority: Major
>
>  Please have a look at the example below: 
> {code:java}
> case class Point(x:Double, y:Double)
> case class Nested(a: Long, b: Seq[Point])
> val test = spark.sparkContext.parallelize((1L to 100L).map(a => 
> Nested(a,Seq.fill[Point](250000)(Point(1,2)))), 100)
> test.toDF().as[Nested].sort("a").take(1)
> {code}
>  *Sorting* big data objects using *Spark Dataframe* is failing with following 
> exception: 
> {code:java}
> 2020-05-04 08:01:00 ERROR TaskSetManager:70 - Total size of serialized 
> results of 14 tasks (107.8 MB) is bigger than spark.driver.maxResultSize 
> (100.0 MB)
> [Stage 0:======>                                                 (12 + 3) / 
> 100]org.apache.spark.SparkException: Job aborted due to stage failure: Total 
> size of serialized results of 13 tasks (100.1 MB) is bigger than 
> spark.driver.maxResu
> {code}
> However using the *RDD API* is working and no exception is thrown: 
> {code:java}
> case class Point(x:Double, y:Double)
> case class Nested(a: Long, b: Seq[Point])
> val test = spark.sparkContext.parallelize((1L to 100L).map(a => 
> Nested(a,Seq.fill[Point](250000)(Point(1,2)))), 100)
> test.sortBy(_.a).take(1)
> {code}
> For both code snippets we started the spark shell with exactly the same 
> arguments:
> {code:java}
> spark-shell --driver-memory 6G --conf "spark.driver.maxResultSize=100MB"
> {code}
> Even if we increase the spark.driver.maxResultSize, the executors still get 
> killed for our use case. The interesting thing is that when using the RDD API 
> directly the problem is not there. *Looks like there is a bug in dataframe 
> sort because is shuffling too much data to the driver?* 
> Note: this is a small example and I reduced the spark.driver.maxResultSize to 
> a smaller size, but in our application I've tried setting it to 8GB but as 
> mentioned above the job was killed. 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to