[
https://issues.apache.org/jira/browse/SPARK-22579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sean Owen updated SPARK-22579:
------------------------------
Issue Type: Improvement (was: Bug)
You have to read the data either way. I don't know the code but you'd have to
be sure it never needs to read the data a second time after it's loaded too.
> BlockManager.getRemoteValues and BlockManager.getRemoteBytes should be
> implemented using streaming
> --------------------------------------------------------------------------------------------------
>
> Key: SPARK-22579
> URL: https://issues.apache.org/jira/browse/SPARK-22579
> Project: Spark
> Issue Type: Improvement
> Components: Block Manager, Spark Core
> Affects Versions: 2.1.0
> Reporter: Eyal Farago
>
> When an RDD partition is cached on one executor but the task requiring it is
> running on another executor (process locality ANY), the cached partition is
> fetched via BlockManager.getRemoteValues, which delegates to
> BlockManager.getRemoteBytes; both calls are blocking.
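> The thread dump below shows the task thread parked in exactly this pattern
> (Await on a promise inside fetchBlockSync). A minimal sketch of that blocking
> shape, as a simplified illustration rather than the actual Spark code:
> {code:scala}
> import scala.concurrent.{Await, Future}
> import scala.concurrent.duration.Duration
>
> // Simplified fetchBlockSync-style call: an asynchronous fetch is turned
> // into a blocking one by awaiting the future for the *entire* block.
> def fetchBlockSync(fetchAsync: () => Future[Array[Byte]]): Array[Byte] =
>   Await.result(fetchAsync(), Duration.Inf) // the task thread parks here
> {code}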
> In my use case I had a 700GB RDD spread over 1000 partitions on a 6-node
> cluster, cached to disk; rough math (700GB / 1000 partitions) puts the
> average partition size at ~700MB.
> Looking at the Spark UI, it was obvious that tasks running with process
> locality ANY were much slower than local tasks (~40 seconds for local tasks
> vs. 8-10 minutes for remote ones). I was able to capture thread dumps of
> executors executing remote tasks and got this stack trace:
> {quote}Thread ID Thread Name Thread State Thread Locks
> 1521 Executor task launch worker-1000 WAITING
> Lock(java.util.concurrent.ThreadPoolExecutor$Worker@196462978)
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> scala.concurrent.Await$.result(package.scala:190)
> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190)
> org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:104)
> org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:582)
> org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:550)
> org.apache.spark.storage.BlockManager.get(BlockManager.scala:638)
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:690)
> org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
> org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287){quote}
> Digging into the code showed that the block manager first fetches all of the
> block's bytes (getRemoteBytes) and only then wraps them with a
> deserialization stream. This has several drawbacks (a sketch of the current
> shape follows the list):
> 1. Blocking: the requesting executor is blocked while the remote executor is
> serving the block.
> 2. Potentially large memory footprint on the requesting executor: in my use
> case, 700MB of raw bytes stored in a ChunkedByteBuffer.
> 3. Inefficient: the requesting side usually doesn't need all values at once,
> as it consumes them via an iterator.
> 4. Potentially large memory footprint on the serving executor: if the block
> is cached in deserialized form, the serving executor has to serialize it
> into a ChunkedByteBuffer (BlockManager.doGetLocalBytes). This is both memory
> and CPU intensive; the memory footprint could be reduced by serializing into
> a limited buffer that 'spills' to the response stream.
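> To make the drawbacks concrete, here is a rough sketch of the current
> fetch-then-deserialize shape. The method names mirror the real ones, but the
> bodies and the ChunkedByteBuffer stand-in are simplified illustrations, not
> Spark's implementation:
> {code:scala}
> import java.io.{ByteArrayInputStream, InputStream, ObjectInputStream}
>
> // Illustrative stand-in for Spark's ChunkedByteBuffer.
> final case class ChunkedByteBuffer(bytes: Array[Byte]) {
>   def toInputStream: InputStream = new ByteArrayInputStream(bytes)
> }
>
> // Step 1: fetch *all* raw bytes of the block into memory (blocking);
> // for a 700MB partition the whole buffer is resident before step 2 runs.
> def getRemoteBytes(fetch: () => Array[Byte]): ChunkedByteBuffer =
>   ChunkedByteBuffer(fetch())
>
> // Step 2: only once the full buffer has arrived, wrap it in a
> // deserialization stream and expose the values as an iterator
> // (the element count is assumed known purely to keep the sketch short).
> def getRemoteValues[T](fetch: () => Array[Byte], count: Int): Iterator[T] = {
>   val in = new ObjectInputStream(getRemoteBytes(fetch).toInputStream)
>   Iterator.fill(count)(in.readObject().asInstanceOf[T])
> }
> {code}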
> I suggest improving this by implementing either a full streaming mechanism
> or some kind of pagination mechanism. In addition, the requesting executor
> should be able to make progress with the data it already has, blocking only
> when its local buffer is exhausted and the remote side hasn't yet delivered
> the next chunk of the stream (or page, in the case of pagination).
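> One possible shape of the pagination idea, as a hedged sketch (every name
> here, e.g. fetchPage, is hypothetical; this is not a proposed patch):
> {code:scala}
> import scala.concurrent.{Await, Future}
> import scala.concurrent.duration.Duration
>
> // Hand out values from the already-fetched page, prefetch the next page
> // in the background, and block only when the local page is exhausted
> // and the next one hasn't arrived yet.
> class PagedBlockIterator[T](fetchPage: Int => Future[Seq[T]], numPages: Int)
>     extends Iterator[T] {
>   private var pageIdx = 0
>   private var inFlight: Future[Seq[T]] = fetchPage(0) // prefetch page 0
>   private var current: Iterator[T] = Iterator.empty
>
>   override def hasNext: Boolean = {
>     while (!current.hasNext && pageIdx < numPages) {
>       // The only blocking point: wait for the in-flight page.
>       current = Await.result(inFlight, Duration.Inf).iterator
>       pageIdx += 1
>       if (pageIdx < numPages) inFlight = fetchPage(pageIdx) // prefetch next
>     }
>     current.hasNext
>   }
>
>   override def next(): T =
>     if (hasNext) current.next() else throw new NoSuchElementException
> }
> {code}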