[ https://issues.apache.org/jira/browse/SPARK-30786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17034210#comment-17034210 ]
Prakhar Jain commented on SPARK-30786:
--------------------------------------
I am working on this.
> Block replication is not retried on other BlockManagers when it fails on 1 of
> the peers
> ---------------------------------------------------------------------------------------
>
> Key: SPARK-30786
> URL: https://issues.apache.org/jira/browse/SPARK-30786
> Project: Spark
> Issue Type: Bug
> Components: Block Manager
> Affects Versions: 2.3.4, 2.4.5, 3.0.0
> Reporter: Prakhar Jain
> Priority: Major
>
> When an RDD is cached with replication > 1, the RDD block is first cached
> locally on one BlockManager and then replicated to (replication - 1) other
> BlockManagers. If replication fails on one of the peers, it is supposed to be
> retried on another peer (subject to the
> "spark.storage.maxReplicationFailures" config), but currently this retry
> never happens.
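> As background for the retry budget mentioned above, here is a minimal,
> hypothetical sketch (not part of the original report) of raising
> "spark.storage.maxReplicationFailures" via SparkConf; the app name is an
> arbitrary placeholder:
> {noformat}
> import org.apache.spark.SparkConf
>
> // Hypothetical example: allow up to 3 failed replication attempts per block.
> // "spark.storage.maxReplicationFailures" defaults to 1.
> val conf = new SparkConf()
>   .setAppName("replication-retry-example")
>   .set("spark.storage.maxReplicationFailures", "3")
> {noformat}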
> Logs of the executor that is replicating the block (executor 1):
> {noformat}
> 20/02/10 09:01:47 INFO Executor: Starting executor ID 1 on host
> wn11-prakha.mvqvy0u1catevlxn5wwhjss34f.bx.internal.cloudapp.net
> .
> .
> .
> 20/02/10 09:06:45 INFO Executor: Running task 244.0 in stage 3.0 (TID 550)
> 20/02/10 09:06:45 DEBUG BlockManager: Getting local block rdd_13_244
> 20/02/10 09:06:45 DEBUG BlockManager: Block rdd_13_244 was not found
> 20/02/10 09:06:45 DEBUG BlockManager: Getting remote block rdd_13_244
> 20/02/10 09:06:45 DEBUG BlockManager: Block rdd_13_244 not found
> 20/02/10 09:06:46 INFO MemoryStore: Block rdd_13_244 stored as values in
> memory (estimated size 33.3 MB, free 44.2 MB)
> 20/02/10 09:06:46 DEBUG BlockManager: Told master about block rdd_13_244
> 20/02/10 09:06:46 DEBUG BlockManager: Put block rdd_13_244 locally took 947
> ms
> 20/02/10 09:06:46 DEBUG BlockManager: Level for block rdd_13_244 is
> StorageLevel(memory, deserialized, 3 replicas)
> 20/02/10 09:06:46 TRACE BlockManager: Trying to replicate rdd_13_244 of
> 34908552 bytes to BlockManagerId(2,
> wn10-prakha.mvqvy0u1catevlxn5wwhjss34f.bx.internal.cloudapp.net, 36711, None)
> 20/02/10 09:06:47 TRACE BlockManager: Replicated rdd_13_244 of 34908552 bytes
> to BlockManagerId(2,
> wn10-prakha.mvqvy0u1catevlxn5wwhjss34f.bx.internal.cloudapp.net, 36711, None)
> in 205.849858 ms
> 20/02/10 09:06:47 TRACE BlockManager: Trying to replicate rdd_13_244 of
> 34908552 bytes to BlockManagerId(5,
> wn2-prakha.mvqvy0u1catevlxn5wwhjss34f.bx.internal.cloudapp.net, 36463, None)
> 20/02/10 09:06:47 TRACE BlockManager: Replicated rdd_13_244 of 34908552 bytes
> to BlockManagerId(5,
> wn2-prakha.mvqvy0u1catevlxn5wwhjss34f.bx.internal.cloudapp.net, 36463, None)
> in 180.501504 ms
> 20/02/10 09:06:47 DEBUG BlockManager: Replicating rdd_13_244 of 34908552
> bytes to 2 peer(s) took 387.381168 ms
> 20/02/10 09:06:47 DEBUG BlockManager: block rdd_13_244 replicated to
> BlockManagerId(5,
> wn2-prakha.mvqvy0u1catevlxn5wwhjss34f.bx.internal.cloudapp.net, 36463, None),
> BlockManagerId(2,
> wn10-prakha.mvqvy0u1catevlxn5wwhjss34f.bx.internal.cloudapp.net, 36711, None)
> 20/02/10 09:06:47 DEBUG BlockManager: Put block rdd_13_244 remotely took 423
> ms
> 20/02/10 09:06:47 DEBUG BlockManager: Putting block rdd_13_244 with
> replication took 1371 ms
> 20/02/10 09:06:47 DEBUG BlockManager: Getting local block rdd_13_244
> 20/02/10 09:06:47 DEBUG BlockManager: Level for block rdd_13_244 is
> StorageLevel(memory, deserialized, 3 replicas)
> 20/02/10 09:06:47 INFO Executor: Finished task 244.0 in stage 3.0 (TID 550).
> 2253 bytes result sent to driver
> {noformat}
> Logs of the executor that the block is being replicated to (executor 5):
> {noformat}
> 20/02/10 09:01:47 INFO Executor: Starting executor ID 5 on host
> wn2-prakha.mvqvy0u1catevlxn5wwhjss34f.bx.internal.cloudapp.net
> .
> .
> .
> 20/02/10 09:06:47 INFO MemoryStore: Will not store rdd_13_244
> 20/02/10 09:06:47 WARN MemoryStore: Not enough space to cache rdd_13_244 in
> memory! (computed 4.2 MB so far)
> 20/02/10 09:06:47 INFO MemoryStore: Memory use = 4.9 GB (blocks) + 7.3 MB
> (scratch space shared across 2 tasks(s)) = 4.9 GB. Storage limit = 4.9 GB.
> 20/02/10 09:06:47 DEBUG BlockManager: Put block rdd_13_244 locally took 12 ms
> 20/02/10 09:06:47 WARN BlockManager: Block rdd_13_244 could not be removed as
> it was not found on disk or in memory
> 20/02/10 09:06:47 WARN BlockManager: Putting block rdd_13_244 failed
> 20/02/10 09:06:47 DEBUG BlockManager: Putting block rdd_13_244 without
> replication took 13 ms
> {noformat}
> Note that the replication failed on executor 5 with the log line "Not enough
> space to cache rdd_13_244 in memory!", yet executor 1 reports the block as
> successfully replicated to executor 5 - "Replicated rdd_13_244 of 34908552
> bytes to BlockManagerId(5,
> wn2-prakha.mvqvy0u1catevlxn5wwhjss34f.bx.internal.cloudapp.net, 36463, None)"
> - and therefore never retries the replication on another executor.
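> The expected behaviour can be pictured as a simplified retry loop: a peer
> that fails to store the block should count as a replication failure and a
> fresh peer should be tried, until either enough replicas exist or the
> failure budget is exhausted. The sketch below is only an illustration of
> that idea - replicateWithRetry and tryReplicateTo are hypothetical names,
> not Spark's internal BlockManager.replicate API:
> {noformat}
> // Simplified, hypothetical sketch of the intended retry behaviour.
> def replicateWithRetry(
>     peers: Iterator[String],                 // candidate peer identifiers
>     tryReplicateTo: String => Boolean,       // must reflect whether the REMOTE put succeeded
>     numReplicas: Int,
>     maxFailures: Int): Int = {
>   var replicated = 0
>   var failures = 0
>   while (replicated < numReplicas && failures <= maxFailures && peers.hasNext) {
>     if (tryReplicateTo(peers.next())) {
>       replicated += 1                        // peer stored the block; count the replica
>     } else {
>       failures += 1                          // peer failed; move on to the next peer
>     }
>   }
>   replicated                                 // number of replicas actually created
> }
>
> // Example: the first peer fails, so a further peer is attempted.
> val created = replicateWithRetry(
>   Iterator("peer-A", "peer-B", "peer-C"),
>   peer => peer != "peer-A",                  // pretend peer-A cannot store the block
>   numReplicas = 2,
>   maxFailures = 1)
> // created == 2
> {noformat}
> The bug reported here is that the "did the remote put succeed?" signal is
> lost, so the failure branch is never taken and no other peer is tried.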
> Sample code to reproduce (run in spark-shell):
> {noformat}
> sc.setLogLevel("INFO")
>
> // Build a ~100 KB random string per record so the cached blocks are large.
> def randomString(length: Int) = {
>   val r = new scala.util.Random
>   val sb = new StringBuilder
>   for (i <- 1 to length) { sb.append(r.nextPrintableChar) }
>   sb.toString
> }
>
> val df = sc.parallelize(1 to 300000, 300).map { x => randomString(100000) }.toDF
>
> // Memory-only, deserialized storage level with 3 replicas.
> import org.apache.spark.storage.StorageLevel
> df.persist(StorageLevel(false, true, false, true, 3))
> df.count()
> {noformat}
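> For reference, the positional StorageLevel arguments in the snippet above are
> (useDisk, useMemory, useOffHeap, deserialized, replication); a small
> equivalent sketch with the meaning spelled out (the val name is arbitrary):
> {noformat}
> import org.apache.spark.storage.StorageLevel
>
> // Memory-only, deserialized, kept on 3 BlockManagers in total.
> val memoryOnly3Replicas = StorageLevel(false, true, false, true, 3)
> {noformat}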