[ https://issues.apache.org/jira/browse/SPARK-3498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14131256#comment-14131256 ]

Reynold Xin commented on SPARK-3498:
------------------------------------

cc [~tdas]

> Block always replicated to the same node
> ----------------------------------------
>
>                 Key: SPARK-3498
>                 URL: https://issues.apache.org/jira/browse/SPARK-3498
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 1.0.2
>            Reporter: shenhong
>
> When running a Spark Streaming job, we should replicate receiver blocks, but
> all the blocks are replicated to the same node. Here is the log:
> 14/09/10 19:55:16 INFO BlockManagerInfo: Added input-0-1410350117000 in memory on 10.196.131.19:42261 (size: 8.9 MB, free: 1050.3 MB)
> 14/09/10 19:55:16 INFO BlockManagerInfo: Added input-0-1410350117000 in memory on tdw-10-196-130-155:51155 (size: 8.9 MB, free: 879.3 MB)
> 14/09/10 19:55:17 INFO BlockManagerInfo: Added input-0-1410350118000 in memory on 10.196.131.19:42261 (size: 7.7 MB, free: 1042.6 MB)
> 14/09/10 19:55:17 INFO BlockManagerInfo: Added input-0-1410350118000 in memory on tdw-10-196-130-155:51155 (size: 7.7 MB, free: 871.6 MB)
> 14/09/10 19:55:18 INFO BlockManagerInfo: Added input-0-1410350119000 in memory on 10.196.131.19:42261 (size: 7.3 MB, free: 1035.3 MB)
> 14/09/10 19:55:18 INFO BlockManagerInfo: Added input-0-1410350119000 in memory on tdw-10-196-130-155:51155 (size: 7.3 MB, free: 864.3 MB)
> The reason is that when the blockManagerSlave asks the blockManagerMaster for
> a peer blockManagerId, the blockManagerMaster always returns the same
> blockManagerId for a given requesting block manager. Here is the code:
> private def getPeers(blockManagerId: BlockManagerId, size: Int): Seq[BlockManagerId] = {
>   val peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
>   val selfIndex = peers.indexOf(blockManagerId)
>   if (selfIndex == -1) {
>     throw new SparkException("Self index for " + blockManagerId + " not found")
>   }
>   // Note that this logic will select the same node multiple times if there aren't enough peers
>   Array.tabulate[BlockManagerId](size) { i => peers((selfIndex + i + 1) % peers.length) }.toSeq
> }
> I think the blockManagerMaster should return the requested number (size) of
> blockManagerIds, choosing those with the most remaining memory.
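
A minimal standalone sketch of the quoted getPeers logic shows why every block lands on the same peer. Plain strings stand in for BlockManagerId (an assumption for illustration). As long as the set of registered block managers does not change, the peer array keeps the same order between calls, so the result depends only on where the requester sits in that array.

```scala
// Standalone reproduction of the ring-style selection in the quoted getPeers.
// Strings stand in for BlockManagerId (illustrative assumption only).
object GetPeersDemo {
  def getPeers(peers: Array[String], self: String, size: Int): Seq[String] = {
    val selfIndex = peers.indexOf(self)
    require(selfIndex != -1, s"Self index for $self not found")
    // Always walks forward from selfIndex, so with a fixed peer order the
    // answer for a given requester never changes from one block to the next.
    Array.tabulate[String](size) { i => peers((selfIndex + i + 1) % peers.length) }.toSeq
  }

  def main(args: Array[String]): Unit = {
    val peers = Array("bm-a", "bm-b", "bm-c", "bm-d")
    // Three successive blocks received on bm-a, each asking for one extra copy.
    // prints "block 1 -> bm-b", "block 2 -> bm-b", "block 3 -> bm-b"
    (1 to 3).foreach { block =>
      println(s"block $block -> ${getPeers(peers, "bm-a", 1).mkString(",")}")
    }
  }
}
```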

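The reporter's proposal can be sketched in the same way. PeerInfo, its remainingMem field, and MemoryAwarePeers are hypothetical names chosen for illustration and are not Spark's API. The sketch excludes the requester and returns at most size distinct peers. When there are too few candidates, it returns fewer peers rather than repeating one.

```scala
// Hypothetical sketch of the proposal: choose the `size` peers with the most
// remaining memory. PeerInfo and its fields are illustrative assumptions.
final case class PeerInfo(id: String, remainingMem: Long)

object MemoryAwarePeers {
  def getPeers(all: Seq[PeerInfo], selfId: String, size: Int): Seq[String] =
    all
      .filterNot(_.id == selfId)                      // never replicate to ourselves
      .sortBy(_.remainingMem)(Ordering[Long].reverse) // most free memory first (stable for ties)
      .take(size)                                     // at most `size` distinct peers
      .map(_.id)
}
```

Because memory is the only sort key, the same peer is still chosen until its free memory drops below another candidate's. Shuffling the candidates randomly is a simpler alternative worth weighing.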


--
This message was sent by Atlassian JIRA
(v6.3.4#6332)