[ https://issues.apache.org/jira/browse/SPARK-22579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16265021#comment-16265021 ]
Saisai Shao commented on SPARK-22579:
-------------------------------------

I think this issue should already have been fixed by SPARK-22062 and PR (https://github.com/apache/spark/pull/19476). What you need to do is set a proper size for large blocks.

> BlockManager.getRemoteValues and BlockManager.getRemoteBytes should be implemented using streaming
> --------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-22579
>                 URL: https://issues.apache.org/jira/browse/SPARK-22579
>             Project: Spark
>          Issue Type: Improvement
>          Components: Block Manager, Spark Core
>    Affects Versions: 2.1.0
>            Reporter: Eyal Farago
>
> When an RDD partition is cached on an executor but the task requiring it is
> running on another executor (process locality ANY), the cached partition is
> fetched via BlockManager.getRemoteValues, which delegates to
> BlockManager.getRemoteBytes; both calls are blocking.
> In my use case I had a 700GB RDD spread over 1000 partitions on a 6-node
> cluster, cached to disk. Rough math shows that the average partition size is
> 700MB.
> Looking at the Spark UI, it was obvious that tasks running with process locality
> 'ANY' are much slower than local tasks (roughly 40 seconds for local tasks
> versus 8-10 minutes for 'ANY' tasks). I was able to capture thread dumps of
> executors executing remote tasks and got this stack trace:
> {quote}Thread ID Thread Name Thread State Thread Locks
> 1521 Executor task launch worker-1000 WAITING Lock(java.util.concurrent.ThreadPoolExecutor$Worker@196462978})
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> scala.concurrent.Await$.result(package.scala:190)
> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190)
> org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:104)
> org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:582)
> org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:550)
> org.apache.spark.storage.BlockManager.get(BlockManager.scala:638)
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:690)
> org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
> org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287){quote}
> Digging into the code showed that the block manager first fetches all bytes
> (getRemoteBytes) and then wraps them in a deserialization stream. This has
> several drawbacks:
> 1. Blocking: the requesting executor is blocked while the remote executor is
> serving the block.
> 2. Potentially large memory footprint on the requesting executor: in my use
> case, 700MB of raw bytes stored in a ChunkedByteBuffer.
> 3. Inefficient: the requesting side usually doesn't need all values at once,
> as it consumes the values via an iterator.
> 4. Potentially large memory footprint on the serving executor: if the block
> is cached in deserialized form, the serving executor has to serialize it into
> a ChunkedByteBuffer (BlockManager.doGetLocalBytes). This is both memory and
> CPU intensive; the memory footprint can be reduced by using a limited buffer
> for serialization, 'spilling' to the response stream.
> I suggest improving this either by implementing a full streaming mechanism or
> some kind of pagination mechanism. In addition, the requesting executor should
> be able to make progress with the data it already has, blocking only when
> the local buffer is exhausted and the remote side hasn't yet delivered the
> next chunk of the stream (or page, in the case of pagination).
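
The streaming behaviour proposed in the description can be illustrated with a small, self-contained Scala sketch. It is not Spark code and does not reflect how SPARK-22062 was implemented: `ChunkStream`, `fetchStreaming`, and their parameters are hypothetical names invented for this illustration, and the "remote executor" is simulated by a local thread. The sketch only shows the idea of a consumer iterating over chunks from a bounded buffer, blocking only when no chunk has arrived yet, while the producer blocks when the buffer is full so that memory on the requesting side stays bounded.

```scala
import java.util.concurrent.ArrayBlockingQueue

// Hypothetical helper (not part of Spark): an iterator over chunks that are
// delivered through a bounded queue by another thread.
final class ChunkStream(maxBufferedChunks: Int) extends Iterator[Array[Byte]] {
  // None is used as the end-of-stream marker.
  private val queue = new ArrayBlockingQueue[Option[Array[Byte]]](maxBufferedChunks)
  private var nextChunk: Option[Array[Byte]] = None
  private var fetched = false
  private var finished = false

  // Called by the simulated serving side. put() blocks while the buffer is
  // full, which bounds the memory held on the requesting side.
  def offerChunk(chunk: Array[Byte]): Unit = queue.put(Some(chunk))
  def close(): Unit = queue.put(None)

  override def hasNext: Boolean = {
    if (!fetched && !finished) {
      // take() blocks only when the local buffer is empty.
      nextChunk = queue.take()
      fetched = true
      if (nextChunk.isEmpty) finished = true
    }
    nextChunk.isDefined
  }

  override def next(): Array[Byte] = {
    if (!hasNext) throw new NoSuchElementException("stream exhausted")
    val chunk = nextChunk.get
    nextChunk = None
    fetched = false
    chunk
  }
}

object StreamingFetchSketch {
  // Simulates a remote block of totalBytes served in chunkSize pieces.
  def fetchStreaming(totalBytes: Int, chunkSize: Int, maxBufferedChunks: Int): Iterator[Array[Byte]] = {
    val stream = new ChunkStream(maxBufferedChunks)
    val producer = new Thread(new Runnable {
      override def run(): Unit = {
        var sent = 0
        while (sent < totalBytes) {
          val n = math.min(chunkSize, totalBytes - sent)
          stream.offerChunk(new Array[Byte](n))
          sent += n
        }
        stream.close()
      }
    })
    producer.setDaemon(true)
    producer.start()
    stream
  }

  def main(args: Array[String]): Unit = {
    // At most 2 chunks are buffered at any time, regardless of block size.
    val chunks = fetchStreaming(totalBytes = 10 * 1024, chunkSize = 1024, maxBufferedChunks = 2)
    val total = chunks.map(_.length.toLong).sum
    println(s"consumed $total bytes")
  }
}
```

With this shape, the consumer can start deserializing the first chunk while later chunks are still in flight, in contrast to the `fetchBlockSync` call in the stack trace, which waits for the whole block.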