[
https://issues.apache.org/jira/browse/SPARK-29852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sean R. Owen resolved SPARK-29852.
----------------------------------
Resolution: Duplicate
> Implement parallel preemptive RDD.toLocalIterator and Dataset.toLocalIterator
> -----------------------------------------------------------------------------
>
> Key: SPARK-29852
> URL: https://issues.apache.org/jira/browse/SPARK-29852
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core, SQL
> Affects Versions: 2.4.4, 3.0.0
> Reporter: Peng Cheng
> Priority: Major
> Original Estimate: 0h
> Remaining Estimate: 0h
>
> Both the RDD and Dataset APIs have 2 methods of collecting data from executors to
> the driver:
>
> # .collect() runs a single job whose tasks fetch all partitions in parallel and
> dumps all of the data from the executors into the driver's memory. This is great
> if the data needs to be accessible on the driver ASAP, but it is not as efficient
> if the partitions can only be accessed sequentially, and it is outright risky if
> the driver doesn't have enough memory to hold all of the data.
> - the solution for SPARK-25224 partially alleviates this by delaying
> deserialisation of the data (kept in the serialised InternalRow format), so that
> the driver only has to hold the much smaller serialised form in memory. This
> solution still does not achieve O(1) memory consumption, and thus does not scale
> to arbitrarily large datasets.
> # .toLocalIterator() fetches one partition per job at a time, and the fetch of
> the next partition does not start until sequential access to the previous
> partition has concluded. This method abides by O(1) memory consumption and is
> great when access to the data is sequential and significantly slower than the
> rate at which a single executor, with one thread, can ship partitions. It becomes
> inefficient when the sequential access to the data has to wait a relatively long
> time for the next partition to be shipped. (A minimal sketch contrasting the two
> existing access patterns follows this list.)
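>
> For reference, here is a minimal sketch (not part of the original report; the
> object name, master setting and data are made up for illustration) contrasting the
> two existing access patterns:
>
> {code:scala}
> import org.apache.spark.sql.SparkSession
>
> // hypothetical illustration of the two existing driver-side access patterns
> object ExistingAccessPatterns {
>   def main(args: Array[String]): Unit = {
>     val sc = SparkSession.builder().master("local[*]").getOrCreate().sparkContext
>     val rdd = sc.parallelize(1 to 1000000, 100)
>
>     // (1) collect(): one job fetches every partition in parallel; the driver
>     //     must hold the whole dataset in memory at once
>     val all: Array[Int] = rdd.collect()
>
>     // (2) toLocalIterator(): one job per partition, submitted lazily as the
>     //     iterator advances; driver memory stays bounded by a single partition,
>     //     but the consumer blocks while each next partition is shipped
>     val sum = rdd.toLocalIterator.foldLeft(0L)(_ + _)
>
>     println(s"collected ${all.length} elements; sum via iterator = $sum")
>   }
> }
> {code}
>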
> The proposed solution is a crossover between the two existing implementations: a
> concurrent subroutine that is bounded in both CPU and memory. The solution
> allocates a fixed-size resource pool (by default equal to the number of available
> CPU cores) that serves the shipping of partitions concurrently, and it blocks
> sequential access to a partition's data only until that partition's shipping has
> finished (which usually happens without blocking for partition IDs >= 2, because
> their shipping starts much earlier and preemptively). A tenant of the resource
> pool can be GC'ed and evicted once sequential access to its data has finished,
> which allows further partitions to be fetched well before they are accessed. The
> maximum memory consumption is O(m * n), where m is the predefined concurrency and
> n is the size of the largest partition (e.g. with m = 4 and partitions of at most
> 100 MB, the driver holds no more than roughly 400 MB of fetched data at any time).
> The following Scala code snippet demonstrates a simple implementation:
>
> (requires Scala 2.11+ and ScalaTest)
>
> {code:scala}
> package org.apache.spark.spike
>
> import java.util.concurrent.ArrayBlockingQueue
>
> import org.apache.spark.rdd.RDD
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.{FutureAction, SparkContext}
> import org.scalatest.FunSpec
>
> import scala.concurrent.Future
> import scala.language.implicitConversions
> import scala.reflect.ClassTag
> import scala.util.{Failure, Success, Try}
>
> class ToLocalIteratorPreemptivelySpike extends FunSpec {
>
>   import ToLocalIteratorPreemptivelySpike._
>
>   lazy val sc: SparkContext =
>     SparkSession.builder().master("local[*]").getOrCreate().sparkContext
>
>   it("can be much faster than toLocalIterator") {
>     val max = 80
>     val delay = 100
>
>     // each element takes `delay` ms to compute, spread over 8 partitions
>     val slowRDD = sc.parallelize(1 to max, 8).map { v =>
>       Thread.sleep(delay)
>       v
>     }
>
>     val (r1, t1) = timed {
>       slowRDD.toLocalIterator.toList
>     }
>
>     val capacity = 4
>     val (r2, t2) = timed {
>       slowRDD.toLocalIteratorPreemptively(capacity).toList
>     }
>
>     assert(r1 == r2)
>     println(s"linear: $t1, preemptive: $t2")
>
>     // with 80 elements * 100 ms spread over 8 partitions, the linear run takes
>     // roughly 8 s, while the preemptive run should still take at least
>     // max * delay / capacity = 2 s
>     assert(t1 > t2 * 2)
>     assert(t2 > max * delay / capacity)
>   }
> }
>
> object ToLocalIteratorPreemptivelySpike {
>
>   case class PartitionExecution[T: ClassTag](
>       @transient self: RDD[T],
>       id: Int
>   ) {
>     // kick off the job that fetches this partition without waiting for its result
>     def eager: this.type = {
>       AsArray.future
>       this
>     }
>
>     case object AsArray {
>
>       // submit a single-partition job that collects partition `id` as an array
>       @transient lazy val future: FutureAction[Array[T]] = {
>         var result: Array[T] = null
>         val future = self.context.submitJob[T, Array[T], Array[T]](
>           self,
>           _.toArray,
>           Seq(id),
>           { (_, data) => result = data },
>           result
>         )
>         future
>       }
>
>       @transient lazy val now: Array[T] = future.get()
>     }
>   }
>
>   implicit class RDDFunctions[T: ClassTag](self: RDD[T]) {
>
>     import scala.concurrent.ExecutionContext.Implicits.global
>
>     def _toLocalIteratorPreemptively(capacity: Int): Iterator[Array[T]] = {
>       val executions = self.partitions.indices.map { ii =>
>         PartitionExecution(self, ii)
>       }
>       // bounded queue: at most `capacity` partitions are in flight or buffered at once
>       val buffer = new ArrayBlockingQueue[Try[PartitionExecution[T]]](capacity)
>
>       Future {
>         executions.foreach { exe =>
>           buffer.put(Success(exe)) // may block when the queue is at capacity
>           exe.eager // submit the fetch job; non-blocking
>         }
>       }.onFailure {
>         case e: Throwable =>
>           buffer.put(Failure(e))
>       }
>
>       // consume partitions in order; each step blocks only until that partition's
>       // job has finished
>       self.partitions.indices.toIterator.map { _ =>
>         val exe = buffer.take().get
>         exe.AsArray.now
>       }
>     }
>
>     def toLocalIteratorPreemptively(capacity: Int): Iterator[T] = {
>       _toLocalIteratorPreemptively(capacity).flatten
>     }
>   }
>
>   def timed[T](fn: => T): (T, Long) = {
>     val startTime = System.currentTimeMillis()
>     val result = fn
>     val endTime = System.currentTimeMillis()
>     (result, endTime - startTime)
>   }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)