[ https://issues.apache.org/jira/browse/SPARK-11316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15264205#comment-15264205 ]

Thomas Graves commented on SPARK-11316:
---------------------------------------

Simple steps to reproduce this with an RDD that has only partial preferred
locations (any text files will do here):

// Run in spark-shell, where `sc` is predefined.
val textFile = sc.textFile("randomtext2.txt")
val textFile2 = sc.textFile("README.md")
val wordCounts4 = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
val wordCounts5 = textFile2.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
// Checkpoint only wordCounts5, so that the union below has preferred
// locations for only some of its partitions.
sc.setCheckpointDir("hdfs:///user/tgraves/test10")
wordCounts5.checkpoint()
wordCounts5.take(1)  // run a job so the checkpoint gets written
val urdd = wordCounts4.union(wordCounts5)
urdd.coalesce(10).count()
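As a rough illustration of what "partial preferred locations" means for the union above, here is a minimal Scala model, not Spark code. It represents each partition only by its list of preferred hosts and assumes, as the repro intends, that the checkpointed wordCounts5 partitions report a host while the wordCounts4 partitions report none; the object name PartialLocalityModel, the host name "host1", and the partition counts are hypothetical. The union keeps each parent partition together with its locations, so an iterator that rotates over (host, partition) pairs only ever sees the located ones.

```scala
// Hypothetical model: each partition is represented only by its preferred hosts.
object PartialLocalityModel {
  type Locs = Seq[String]

  // A union keeps every parent partition (and its locations), in order.
  def union(a: Seq[Locs], b: Seq[Locs]): Seq[Locs] = a ++ b

  // (host, partitionIndex) pairs a location iterator could rotate through;
  // partitions without any location contribute nothing.
  def locatedPairs(parts: Seq[Locs]): Seq[(String, Int)] =
    for {
      (hosts, idx) <- parts.zipWithIndex
      host <- hosts
    } yield (host, idx)

  def main(args: Array[String]): Unit = {
    val wordCounts4 = Seq.fill(4)(Seq.empty[String]) // assumed: no locations
    val wordCounts5 = Seq.fill(2)(Seq("host1"))      // assumed: checkpointed on one host
    val urdd = union(wordCounts4, wordCounts5)
    val pairs = locatedPairs(urdd)
    println(s"${urdd.size} partitions, ${pairs.size} with a location, " +
      s"${pairs.map(_._1).distinct.size} distinct host(s)")
  }
}
```

Running main prints `6 partitions, 2 with a location, 1 distinct host(s)`, so in this model only one host is available to build partition groups around.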

> coalesce doesn't handle UnionRDD with partial locality properly
> ---------------------------------------------------------------
>
>                 Key: SPARK-11316
>                 URL: https://issues.apache.org/jira/browse/SPARK-11316
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.5.1
>            Reporter: Thomas Graves
>            Assignee: Thomas Graves
>            Priority: Critical
>
> So I haven't fully debugged this yet, but I'm reporting what I'm seeing and
> what I think might be going on.
> I have a graph processing job that is seeing a huge slowdown in setupGroups,
> in the location iterator where it's getting the preferred locations for the
> coalesce. They are coalescing from 2400 partitions down to 1200, and it's
> taking 17+ hours to do the calculation. I killed it at that point, so I don't
> know the total time.
> It appears that the job does an isEmpty call, a bunch of other
> transformations, then a coalesce (where it spends so long), other
> transformations, and finally a count to trigger it.
> It appears that there is only one node that it's finding in the setupGroups
> call, and to get to that node it first has to go through the while loop:
>     while (numCreated < targetLen && tries < expectedCoupons2) {
> where expectedCoupons2 is around 19000. It finds very few locations, or none,
> in this loop.
> Then it does the second loop:
>     while (numCreated < targetLen) {  // if we don't have enough partition groups, create duplicates
>       var (nxt_replica, nxt_part) = rotIt.next()
>       val pgroup = PartitionGroup(nxt_replica)
>       groupArr += pgroup
>       groupHash.getOrElseUpdate(nxt_replica, ArrayBuffer()) += pgroup
>       var tries = 0
>       while (!addPartToPGroup(nxt_part, pgroup) && tries < targetLen) { // ensure at least one part
>         nxt_part = rotIt.next()._2
>         tries += 1
>       }
>       numCreated += 1
>     }
> It has an inner while loop, and both loops run about 1200 times, so that is
> 1200*1200 (about 1.44 million) iterations. This is taking a very long time.
> The user can work around the issue by adding a count() call shortly after the
> isEmpty call, before the coalesce is called. I also tried putting a
> take(10000) right before the isEmpty call, and that also seems to work around
> the issue, but it took 1 hour with the take vs. a few minutes with the count().
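The iteration counts described above can be reproduced with a small, self-contained Scala model of the two quoted loops. This is a hypothetical sketch, not Spark's coalescer: SetupGroupsModel, countSteps, and the input of 10 located partitions on a single host are illustrative assumptions; addPartToPGroup is reduced to "fails if the partition was already placed in a group"; and the expectedCoupons2 formula, 2 * (ln(targetLen) * targetLen + targetLen + 0.5), follows Spark's CoalescedRDD.scala rather than this report. The model only counts location-iterator steps and does not model how expensive each step is. With a single host, the first loop creates one group and then spins through all expectedCoupons2 tries (19416 for targetLen = 1200, consistent with the "around 19000" above). Once every located partition has been placed, each remaining group in the second loop exhausts all targetLen inner tries.

```scala
import scala.collection.mutable

// Hypothetical model of the two setupGroups loops quoted above; not Spark code.
object SetupGroupsModel {
  // Bound used by the first loop (formula as in Spark's CoalescedRDD.scala).
  def expectedCoupons2(targetLen: Int): Int =
    2 * (math.log(targetLen) * targetLen + targetLen + 0.5).toInt

  /** Returns (groups created, total location-iterator steps). */
  def countSteps(located: IndexedSeq[(String, Int)], targetLen: Int): (Int, Long) = {
    require(located.nonEmpty, "model assumes at least one located partition")
    var pos = 0
    // Endless rotation over the located (host, partition) pairs.
    def next(): (String, Int) = { val r = located(pos % located.size); pos += 1; r }
    var steps = 0L
    val placed = mutable.HashSet[Int]()    // partitions already in some group
    val hosts = mutable.HashSet[String]()  // hosts that already have a group
    def addPart(p: Int): Boolean = placed.add(p) // false if already placed

    val bound = expectedCoupons2(targetLen)
    var numCreated = 0
    var tries = 0
    // First loop: one group per newly seen host, at most `bound` tries.
    while (numCreated < targetLen && tries < bound) {
      tries += 1
      val (host, part) = next(); steps += 1
      if (hosts.add(host)) { addPart(part); numCreated += 1 }
    }
    // Second loop: duplicate groups, each trying up to targetLen more parts.
    while (numCreated < targetLen) {
      var part = next()._2; steps += 1
      var inner = 0
      while (!addPart(part) && inner < targetLen) {
        part = next()._2; steps += 1
        inner += 1
      }
      numCreated += 1
    }
    (numCreated, steps)
  }

  def main(args: Array[String]): Unit = {
    val oneHost = (0 until 10).map(p => ("host1", p)) // assumed input
    val (groups, steps) = countSteps(oneHost, targetLen = 1200)
    println(s"expectedCoupons2(1200) = ${expectedCoupons2(1200)}")
    println(s"groups = $groups, iterator steps = $steps")
  }
}
```

Under these assumptions the model creates all 1200 groups in roughly 1.45 million iterator steps, nearly all of them spent in the second loop's inner retries. By contrast, with 1200 distinct hosts the first loop finishes in 1200 steps and the second loop never runs.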



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
