[
https://issues.apache.org/jira/browse/SPARK-14252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15227493#comment-15227493
]
Eric Liang commented on SPARK-14252:
------------------------------------
I'm going to take a look at fixing this.
> Executors do not try to download remote cached blocks
> -----------------------------------------------------
>
> Key: SPARK-14252
> URL: https://issues.apache.org/jira/browse/SPARK-14252
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.0.0
> Reporter: Marcelo Vanzin
>
> I noticed this when taking a look at the root cause of SPARK-14209. 2.0.0
> includes SPARK-12817, which changed the caching code a bit to remove
> duplication. But it seems to have removed the part where executors check
> whether other executors contain the needed cached block.
> In 1.6, that was done by the call to {{BlockManager.get}} in
> {{CacheManager.getOrCompute}}. But in the new version, {{RDD.iterator}} calls
> {{BlockManager.getOrElseUpdate}}, which never calls {{BlockManager.get}}, and
> thus the executor never fetches blocks that are cached by other executors,
> causing those blocks to be recomputed locally instead.
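The missing step can be illustrated with a minimal, self-contained sketch in plain Scala. This is not Spark's real {{BlockManager}} API; the names ({{GetOrElseUpdateSketch}}, {{localBlocks}}, {{remoteBlocks}}) are hypothetical stand-ins, used only to show the lookup order the 1.6 path performed and the 2.0 {{getOrElseUpdate}} path skips: local cache first, then remote replicas, and only then recompute-and-cache.

```scala
// Sketch only -- simplified stand-ins for executor-local and remote cache
// state; not Spark's actual BlockManager internals.
object GetOrElseUpdateSketch {
  // Blocks cached on this executor.
  val localBlocks = collection.mutable.Map[String, Int]()
  // Blocks cached on other executors (reachable over the network in Spark).
  val remoteBlocks = Map("rdd_0_7" -> 42)

  // Counts how often we fell back to recomputation.
  var recomputed = 0

  def getOrElseUpdate(blockId: String)(compute: => Int): Int = {
    // 1. Local hit: return the locally cached value.
    localBlocks.get(blockId).getOrElse {
      // 2. Remote fetch -- the step this issue reports as missing in 2.0.
      remoteBlocks.get(blockId) match {
        case Some(v) =>
          localBlocks(blockId) = v // re-cache locally after the fetch
          v
        case None =>
          // 3. Only when no replica exists anywhere: recompute and cache.
          recomputed += 1
          val v = compute
          localBlocks(blockId) = v
          v
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // "rdd_0_7" is cached remotely, so no recomputation should happen.
    val v = getOrElseUpdate("rdd_0_7") { sys.error("should not recompute") }
    println(s"value=$v recomputed=$recomputed")
  }
}
```

Under this sketch, the bug described above corresponds to step 2 being absent: a local miss drops straight to step 3, so every executor recomputes and caches its own copy.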
> I wrote a small program that shows this. In 1.6, running with
> {{--num-executors 2}}, I get 5 blocks cached on each executor, and messages
> like these in the logs:
> {noformat}
> 16/03/29 13:18:01 DEBUG spark.CacheManager: Looking for partition rdd_0_7
> 16/03/29 13:18:01 DEBUG storage.BlockManager: Getting local block rdd_0_7
> 16/03/29 13:18:01 DEBUG storage.BlockManager: Block rdd_0_7 not registered
> locally
> 16/03/29 13:18:01 DEBUG storage.BlockManager: Getting remote block rdd_0_7
> 16/03/29 13:18:01 DEBUG storage.BlockManager: Getting remote block rdd_0_7
> from BlockManagerId(1, blah, 58831)
> {noformat}
> On 2.0, I get (almost) all 10 partitions cached on *both* executors,
> because once the second one fails to find a block locally it just recomputes
> and caches it; it never tries to download the block from the other
> executor. The log messages above, which still exist in the code, don't show
> up anywhere.
> Here's the code I used for the above (trimmed of some other stuff from my
> little test harness, so might not compile as is):
> {code}
> import java.util.concurrent.TimeUnit
>
> import org.apache.spark.SparkContext
>
> val sc = new SparkContext(conf)
> try {
>   val rdd = sc.parallelize(1 to 10000000, 10)
>   rdd.cache()
>   rdd.count()
>   // Create a single task that will sleep and block, so that a particular
>   // executor is busy. This should force future tasks to download cached
>   // data from that executor.
>   println("Running sleep job..")
>   val thread = new Thread(new Runnable() {
>     override def run(): Unit = {
>       rdd.mapPartitionsWithIndex { (i, iter) =>
>         if (i == 0) {
>           Thread.sleep(TimeUnit.MINUTES.toMillis(10))
>         }
>         iter
>       }.count()
>     }
>   })
>   thread.setDaemon(true)
>   thread.start()
>   // Wait a few seconds to make sure the task is running (too lazy for listeners).
>   println("Waiting for tasks to start...")
>   TimeUnit.SECONDS.sleep(10)
>   // Now run a job that will touch everything and should use the cached data.
>   val cnt = rdd.map(_ * 2).count()
>   println(s"Counted $cnt elements.")
>   println("Killing sleep job.")
>   thread.interrupt()
>   thread.join()
> } finally {
>   sc.stop()
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]