[
https://issues.apache.org/jira/browse/SPARK-14252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15216886#comment-15216886
]
Marcelo Vanzin commented on SPARK-14252:
----------------------------------------
Here's a slightly updated piece of code that makes the problem easier to see:
{code}
import java.util.concurrent.TimeUnit

val sc = new SparkContext(conf)
try {
  val mapCount = sc.accumulator(0L)
  val rdd = sc.parallelize(1 to 10000000, 10).map { i =>
    mapCount += 1
    i
  }.cache()
  rdd.count()
  println(s"Map count after first count: $mapCount")

  // Create a single task that will sleep and block, so that a particular
  // executor is busy. This should force future tasks to download cached
  // data from that executor.
  println("Running sleep job...")
  val thread = new Thread(new Runnable() {
    override def run(): Unit = {
      rdd.mapPartitionsWithIndex { (i, iter) =>
        if (i == 0) {
          Thread.sleep(TimeUnit.MINUTES.toMillis(10))
        }
        iter
      }.count()
    }
  })
  thread.setDaemon(true)
  thread.start()

  // Wait a few seconds to make sure the task is running (too lazy for listeners).
  println("Waiting for tasks to start...")
  TimeUnit.SECONDS.sleep(10)

  // Now run a job that will touch everything and should use the cached data.
  val cnt = rdd.map(_ * 2).count()
  println(s"Counted $cnt elements.")
  println("Killing sleep job.")
  thread.interrupt()
  thread.join()
  println(s"Map count after all tasks finished: $mapCount")
} finally {
  sc.stop()
}
{code}
On 1.6, I get:
{noformat}
Map count after first count: 10000000
Map count after all tasks finished: 10000000
{noformat}
On 2.0 I get:
{noformat}
Map count after first count: 10000000
Map count after all tasks finished: 15000000
{noformat}
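The extra 5,000,000 map invocations come from the second executor recomputing the 5 partitions it never fetches remotely. The difference between the two lookup paths can be sketched roughly as follows (a hedged sketch only; {{lookup16}}, {{lookup20}}, {{getLocal}}, {{getRemote}}, and {{compute}} are hypothetical stand-ins for the real BlockManager internals, not actual Spark methods):

```scala
// Hypothetical stand-ins illustrating the two lookup paths described in
// the issue; this is not the actual BlockManager code.
object LookupSketch {
  type Block = String

  // 1.6: BlockManager.get checks local storage, then other executors,
  // and only recomputes if the block is nowhere in the cluster.
  def lookup16(getLocal: Option[Block], getRemote: Option[Block],
               compute: () => Block): Block =
    getLocal.orElse(getRemote).getOrElse(compute())

  // 2.0: getOrElseUpdate checks local storage and otherwise recomputes,
  // never asking other executors for the cached block.
  def lookup20(getLocal: Option[Block],
               compute: () => Block): Block =
    getLocal.getOrElse(compute())
}
```

In the 1.6 shape, a remote copy short-circuits recomputation; in the 2.0 shape the remote step is gone, so any executor missing a local copy recomputes (and re-caches) the partition, which is exactly the doubled accumulator count above.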
> Executors do not try to download remote cached blocks
> -----------------------------------------------------
>
> Key: SPARK-14252
> URL: https://issues.apache.org/jira/browse/SPARK-14252
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.0.0
> Reporter: Marcelo Vanzin
>
> I noticed this when taking a look at the root cause of SPARK-14209. 2.0.0
> includes SPARK-12817, which changed the caching code a bit to remove
> duplication. But it seems to have removed the part where executors check
> whether other executors contain the needed cached block.
> In 1.6, that was done by the call to {{BlockManager.get}} in
> {{CacheManager.getOrCompute}}. But in the new version, {{RDD.iterator}} calls
> {{BlockManager.getOrElseUpdate}}, which never calls {{BlockManager.get}}, and
> thus the executor never gets blocks that are cached by other executors,
> causing the blocks to be instead recomputed locally.
> I wrote a small program that shows this. In 1.6, running with
> {{--num-executors 2}}, I get 5 blocks cached on each executor, and messages
> like these in the logs:
> {noformat}
> 16/03/29 13:18:01 DEBUG spark.CacheManager: Looking for partition rdd_0_7
> 16/03/29 13:18:01 DEBUG storage.BlockManager: Getting local block rdd_0_7
> 16/03/29 13:18:01 DEBUG storage.BlockManager: Block rdd_0_7 not registered locally
> 16/03/29 13:18:01 DEBUG storage.BlockManager: Getting remote block rdd_0_7
> 16/03/29 13:18:01 DEBUG storage.BlockManager: Getting remote block rdd_0_7 from BlockManagerId(1, blah, 58831)
> {noformat}
> On 2.0, I get (almost) all the 10 partitions cached on *both* executors,
> because once the second one fails to find a block locally it just recomputes
> it and caches it. It never tries to download the block from the other
> executor. The log messages above, which still exist in the code, don't show
> up anywhere.
> Here's the code I used for the above (trimmed of some other stuff from my
> little test harness, so it might not compile as-is):
> {code}
> import java.util.concurrent.TimeUnit
>
> val sc = new SparkContext(conf)
> try {
>   val rdd = sc.parallelize(1 to 10000000, 10)
>   rdd.cache()
>   rdd.count()
>
>   // Create a single task that will sleep and block, so that a particular
>   // executor is busy. This should force future tasks to download cached
>   // data from that executor.
>   println("Running sleep job...")
>   val thread = new Thread(new Runnable() {
>     override def run(): Unit = {
>       rdd.mapPartitionsWithIndex { (i, iter) =>
>         if (i == 0) {
>           Thread.sleep(TimeUnit.MINUTES.toMillis(10))
>         }
>         iter
>       }.count()
>     }
>   })
>   thread.setDaemon(true)
>   thread.start()
>
>   // Wait a few seconds to make sure the task is running (too lazy for listeners).
>   println("Waiting for tasks to start...")
>   TimeUnit.SECONDS.sleep(10)
>
>   // Now run a job that will touch everything and should use the cached data.
>   val cnt = rdd.map(_ * 2).count()
>   println(s"Counted $cnt elements.")
>   println("Killing sleep job.")
>   thread.interrupt()
>   thread.join()
> } finally {
>   sc.stop()
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)