[ 
https://issues.apache.org/jira/browse/SPARK-14252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15216886#comment-15216886
 ] 

Marcelo Vanzin commented on SPARK-14252:
----------------------------------------

Here's a slightly updated piece of code where it's easier to show the problem:

{code}
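    import java.util.concurrent.TimeUnit
    import org.apache.spark.SparkContext

    // `conf` is assumed to be a SparkConf set up by the surrounding test harness.
    val sc = new SparkContext(conf)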
    try {
      val mapCount = sc.accumulator(0L)

      val rdd = sc.parallelize(1 to 10000000, 10).map { i =>
        mapCount += 1
        i
      }.cache()
      rdd.count()

      println(s"Map count after first count: $mapCount")

      // Create a single task that will sleep and block, so that a particular executor is busy.
      // This should force future tasks to download cached data from that executor.
      println("Running sleep job..")
      val thread = new Thread(new Runnable() {
        override def run(): Unit = {
          rdd.mapPartitionsWithIndex { (i, iter) =>
            if (i == 0) {
              Thread.sleep(TimeUnit.MINUTES.toMillis(10))
            }
            iter
          }.count()
        }
      })
      thread.setDaemon(true)
      thread.start()

      // Wait a few seconds to make sure the task is running (too lazy for listeners)
      println("Waiting for tasks to start...")
      TimeUnit.SECONDS.sleep(10)

      // Now run a job that will touch everything and should use the cached data.
      val cnt = rdd.map(_*2).count()
      println(s"Counted $cnt elements.")

      println("Killing sleep job.")
      thread.interrupt()
      thread.join()

      println(s"Map count after all tasks finished: $mapCount")
    } finally {
      sc.stop()
    }
{code}

On 1.6, I get:

{noformat}
Map count after first count: 10000000
Map count after all tasks finished: 10000000
{noformat}

On 2.0, I get:

{noformat}
Map count after first count: 10000000
Map count after all tasks finished: 15000000
{noformat}

> Executors do not try to download remote cached blocks
> -----------------------------------------------------
>
>                 Key: SPARK-14252
>                 URL: https://issues.apache.org/jira/browse/SPARK-14252
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.0.0
>            Reporter: Marcelo Vanzin
>
> I noticed this when taking a look at the root cause of SPARK-14209. 2.0.0 
> includes SPARK-12817, which changed the caching code a bit to remove 
> duplication. But it seems to have removed the part where executors check 
> whether other executors contain the needed cached block.
> In 1.6, that was done by the call to {{BlockManager.get}} in 
> {{CacheManager.getOrCompute}}. But in the new version, {{RDD.iterator}} calls 
> {{BlockManager.getOrElseUpdate}}, which never calls {{BlockManager.get}}, and 
> thus the executor never gets blocks that are cached by other executors, 
> so those blocks are recomputed locally instead.
> I wrote a small program that shows this. In 1.6, running with 
> {{--num-executors 2}}, I get 5 blocks cached on each executor, and messages 
> like these in the logs:
> {noformat}
> 16/03/29 13:18:01 DEBUG spark.CacheManager: Looking for partition rdd_0_7
> 16/03/29 13:18:01 DEBUG storage.BlockManager: Getting local block rdd_0_7
> 16/03/29 13:18:01 DEBUG storage.BlockManager: Block rdd_0_7 not registered locally
> 16/03/29 13:18:01 DEBUG storage.BlockManager: Getting remote block rdd_0_7
> 16/03/29 13:18:01 DEBUG storage.BlockManager: Getting remote block rdd_0_7 from BlockManagerId(1, blah, 58831)
> 1
> {noformat}
> On 2.0, I get (almost) all the 10 partitions cached on *both* executors, 
> because once the second one fails to find a block locally it just recomputes 
> it and caches it. It never tries to download the block from the other 
> executor.  The log messages above, which still exist in the code, don't show 
> up anywhere.
> Here's the code I used for the above (trimmed of some other stuff from my 
> little test harness, so might not compile as is):
> {code}
>     val sc = new SparkContext(conf)
>     try {
>       val rdd = sc.parallelize(1 to 10000000, 10)
>       rdd.cache()
>       rdd.count()
>       // Create a single task that will sleep and block, so that a particular executor is busy.
>       // This should force future tasks to download cached data from that executor.
>       println("Running sleep job..")
>       val thread = new Thread(new Runnable() {
>         override def run(): Unit = {
>           rdd.mapPartitionsWithIndex { (i, iter) =>
>             if (i == 0) {
>               Thread.sleep(TimeUnit.MINUTES.toMillis(10))
>             }
>             iter
>           }.count()
>         }
>       })
>       thread.setDaemon(true)
>       thread.start()
>       // Wait a few seconds to make sure the task is running (too lazy for listeners)
>       println("Waiting for tasks to start...")
>       TimeUnit.SECONDS.sleep(10)
>       // Now run a job that will touch everything and should use the cached data.
>       val cnt = rdd.map(_*2).count()
>       println(s"Counted $cnt elements.")
>       println("Killing sleep job.")
>       thread.interrupt()
>       thread.join()
>     } finally {
>       sc.stop()
>     }
> {code}
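
The two lookup orders described in the quoted issue (local, then remote, then compute in 1.6; local, then compute in 2.0) can be modeled with a small toy sketch that needs no Spark at all. Everything in it is hypothetical and only for illustration: `CacheLookupSketch`, `Executor`, `run` and `tryRemote` are not Spark APIs, and it assumes two executors that each cached 5 of the 10 partitions, with every task of the second job landing on executor 2 because executor 1 is busy. Under those assumptions it yields the same totals as the accumulator in the comment above.

```scala
import scala.collection.mutable

// Toy model only: none of these names are Spark APIs.
object CacheLookupSketch {
  val NumPartitions = 10
  val PartitionSize = 1000000L // 10000000 elements split over 10 partitions

  /** One executor's local block store: partition index -> cached element count. */
  final class Executor(val id: Int) {
    val blocks = mutable.Map.empty[Int, Long]
  }

  /**
   * Runs a first job that fills the caches, then a second job whose tasks
   * all run on executor 2. Returns how many times the map function ran.
   */
  def run(tryRemote: Boolean): Long = {
    var mapCount = 0L
    // Computing a partition applies the map function to each of its elements.
    def compute(): Long = { mapCount += PartitionSize; PartitionSize }

    val e1 = new Executor(1)
    val e2 = new Executor(2)

    // First job: partitions 0-4 are cached on executor 1, 5-9 on executor 2.
    for (p <- 0 until NumPartitions) {
      val owner = if (p < NumPartitions / 2) e1 else e2
      owner.blocks(p) = compute()
    }

    // Second job: executor 1 is assumed busy, so every task runs on executor 2.
    for (p <- 0 until NumPartitions) {
      val local = e2.blocks.get(p)
      // The 1.6-style path also asks the other executor before recomputing.
      val remote = if (tryRemote) e1.blocks.get(p) else None
      local.orElse(remote).getOrElse {
        val data = compute() // cache miss: recompute and cache locally
        e2.blocks(p) = data
        data
      }
    }
    mapCount
  }

  def main(args: Array[String]): Unit = {
    println(s"local -> remote -> compute: ${run(tryRemote = true)}")  // 10000000
    println(s"local -> compute:           ${run(tryRemote = false)}") // 15000000
  }
}
```

In this model the extra 5000000 map invocations correspond to the 5 partitions that executor 2 did not hold and recomputed instead of fetching.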


