Peng Cheng created SPARK-29852:
----------------------------------

             Summary: Implement parallel preemptive RDD.toLocalIterator and 
Dataset.toLocalIterator
                 Key: SPARK-29852
                 URL: https://issues.apache.org/jira/browse/SPARK-29852
             Project: Spark
          Issue Type: New Feature
          Components: Spark Core, SQL
    Affects Versions: 2.4.4, 3.0.0
            Reporter: Peng Cheng


Both the RDD and Dataset APIs have 2 methods for collecting data from executors to the driver:

 
 # .collect() sets up multiple threads in a single job and dumps all data from the executors into the driver's memory. This is great if the data on the driver needs to be accessible ASAP, but less efficient if partitions can only be accessed sequentially, and outright risky if the driver doesn't have enough memory to hold all of the data.

- The solution for issue SPARK-25224 partially alleviates this by delaying deserialisation of the data from its InternalRow format, so that only the much smaller serialised form has to be held entirely in driver memory. It still does not bound memory consumption to O(1), and therefore does not scale to arbitrarily large datasets.
 # .toLocalIterator() fetches one partition per job at a time, and fetching of the next partition does not start until sequential access to the previous partition has concluded. This keeps memory consumption at O(1) and is great if access to the data is sequential and significantly slower than the rate at which a single executor can ship partitions with one thread. It becomes inefficient when sequential access to the data has to wait a relatively long time for the next partition to be shipped. (For reference, both access patterns are sketched right after this list.)
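
A minimal sketch of how the two existing APIs are typically consumed (illustrative only; assumes a live SparkContext named sc):

{code:java}
// Illustrative only: contrasts the two existing collection APIs.
val rdd = sc.parallelize(1 to 1000000, 100)

// 1. collect(): a single job fetches every partition in parallel; the whole
//    dataset must fit in driver memory before any record can be accessed.
rdd.collect().foreach(println)

// 2. toLocalIterator(): one job per partition; fetching of partition i + 1
//    does not start until the iterator has exhausted partition i.
rdd.toLocalIterator.foreach(println)
{code}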

The proposed solution is a crossover between the two existing implementations: a concurrent subroutine that is both CPU and memory bounded. It allocates a fixed-size resource pool (by default equal to the number of available CPU cores) that ships partitions concurrently, and blocks sequential access to a partition's data only until that partition's shipping has finished (which usually happens without blocking from partition ID >= 2 onwards, because shipping starts much earlier and preemptively). Tenants of the resource pool can be GC'ed and evicted once sequential access to their data has finished, which allows further partitions to be fetched well before they are accessed. The maximum memory consumption is O(m * n), where m is the predefined concurrency and n is the size of the largest partition.
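
For illustration (numbers purely hypothetical): with a concurrency of m = 8 on an 8-core driver and serialised partitions of at most n = 512 MB, the driver would hold at most roughly 4 GB of partition data at any point in time, regardless of the total size of the dataset.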

The following Scala code snippet demonstrates a simple implementation (requires Scala 2.11+ and ScalaTest):
{code:java}
package org.apache.spark.spike

import java.util.concurrent.ArrayBlockingQueue

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.{FutureAction, SparkContext}
import org.scalatest.FunSpec

import scala.concurrent.Future
import scala.language.implicitConversions
import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}

class ToLocalIteratorPreemptivelySpike extends FunSpec {

  import ToLocalIteratorPreemptivelySpike._

  lazy val sc: SparkContext = 
SparkSession.builder().master("local[*]").getOrCreate().sparkContext

  it("can be much faster than toLocalIterator") {

    val max = 80
    val delay = 100

    val slowRDD = sc.parallelize(1 to max, 8).map { v =>
      Thread.sleep(delay)
      v
    }

    val (r1, t1) = timed {
      slowRDD.toLocalIterator.toList
    }

    val capacity = 4
    val (r2, t2) = timed {
      slowRDD.toLocalIteratorPreemptively(capacity).toList
    }

    assert(r1 == r2)
    println(s"linear: $t1, preemptive: $t2")
    assert(t1 > t2 * 2)
    assert(t2 > max * delay / capacity)
  }
}

object ToLocalIteratorPreemptivelySpike {

  case class PartitionExecution[T: ClassTag](
      @transient self: RDD[T],
      id: Int
  ) {

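    // Triggers submission of a single-partition Spark job for this partition
    // (via AsArray.future below) without blocking on its result.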
    def eager: this.type = {
      AsArray.future
      this
    }

    case object AsArray {

      @transient lazy val future: FutureAction[Array[T]] = {
        var result: Array[T] = null

        val future = self.context.submitJob[T, Array[T], Array[T]](
          self,
          _.toArray,
          Seq(id), { (_, data) =>
            result = data
          },
          result
        )

        future
      }

      @transient lazy val now: Array[T] = future.get()
    }
  }

  implicit class RDDFunctions[T: ClassTag](self: RDD[T]) {

    import scala.concurrent.ExecutionContext.Implicits.global

    def _toLocalIteratorPreemptively(capacity: Int): Iterator[Array[T]] = {
      val executions = self.partitions.indices.map { ii =>
        PartitionExecution(self, ii)
      }

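      // Bounded buffer: at most `capacity` partition fetches can be queued or in
      // flight at once, which is what gives the O(m * n) memory bound.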
      val buffer = new ArrayBlockingQueue[Try[PartitionExecution[T]]](capacity)

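      // Producer: enqueue each partition, then preemptively submit its job;
      // put() blocks once `capacity` partitions are pending, throttling submission.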
      Future {
        executions.foreach { exe =>
          buffer.put(Success(exe)) // may be blocking due to capacity
          exe.eager // non-blocking
        }
      }.onFailure {
        case e: Throwable =>
          buffer.put(Failure(e))
      }

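      // Consumer: yields partitions in order; take() blocks until the producer has
      // enqueued the next partition, and `now` blocks until its job has completed.
      // A Failure enqueued by the producer is rethrown here by `.get`.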
      self.partitions.indices.toIterator.map { _ =>
        val exe = buffer.take().get
        exe.AsArray.now
      }
    }

    def toLocalIteratorPreemptively(capacity: Int): Iterator[T] = {

      _toLocalIteratorPreemptively(capacity).flatten
    }
  }

  def timed[T](fn: => T): (T, Long) = {
    val startTime = System.currentTimeMillis()
    val result = fn
    val endTime = System.currentTimeMillis()
    (result, endTime - startTime)
  }
}
{code}
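
In this spike, the ArrayBlockingQueue capacity is the only tuning knob and corresponds to m in the O(m * n) bound above. Errors raised by the background producer are propagated to the consuming thread by enqueueing a Failure, which buffer.take().get rethrows on the next call.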


