[ 
https://issues.apache.org/jira/browse/SPARK-5140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Corey J. Nolet updated SPARK-5140:
----------------------------------
    Description: 
Not sure if this would change too much of the internals to be included in 
1.2.1, but it would be very helpful if it could be.

This ticket comes from a discussion between me and [~ilikerps]. Here's the 
result of some testing that [~ilikerps] did:


bq. I did some testing as well, and it turns out the "wait for other guy to 
finish caching" logic is on a per-task basis, and it only works on tasks that 
happen to be executing on the same machine. 

bq. Once a partition is cached, we will schedule tasks that touch that 
partition on that executor. The problem here, though, is that the cache is in 
progress, and so the tasks are still scheduled randomly (or with whatever 
locality the data source has), so tasks which end up on different machines will 
not see that the cache is already in progress.

bq. Here was my test, by the way:

{code}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent._
import scala.concurrent.duration._

val rdd = sc.parallelize(0 until 8).map(i => { Thread.sleep(10000); i }).cache()
val futures = (0 until 4).map { _ => Future { rdd.count } }
Await.result(Future.sequence(futures), 120.second)
{code}

bq. Note that I run the future 4 times in parallel. I found that the first run 
has all tasks take 10 seconds. The second has about 50% of its tasks take 10 
seconds, and the rest just wait for the first stage to finish. The last two 
runs have no tasks that take 10 seconds; all wait for the first two stages to 
finish.


What we want is the ability to fire off a job and have the DAG scheduler 
figure out that two RDDs depend on the same parent, so that when the children 
are scheduled concurrently, the first one to start will activate the parent 
and both will wait on it. When the parent is done, both children can finish 
their work concurrently. We are trying to use this pattern by having the 
parent cache its results.
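
A rough sketch of the semantics we're after, in plain Scala (no Spark; the 
names are illustrative, not a proposed API): the expensive "parent" step is 
memoized behind a single Future, so whichever child starts first triggers it 
and every other child awaits that same instance instead of recomputing.

{code}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object SharedParentSketch {
  // The expensive "parent" step (stands in for populating the cache).
  // A lazy val memoizes the Future, so the body runs at most once no
  // matter how many children touch it concurrently.
  lazy val parent: Future[Seq[Int]] = Future {
    Thread.sleep(1000)             // simulate the slow cache population
    (0 until 8).map(_ * 2)
  }

  // Each "child" depends on the shared parent rather than recomputing it.
  def child(): Future[Int] = parent.map(_.sum)

  def main(args: Array[String]): Unit = {
    val results = Await.result(
      Future.sequence((0 until 4).map(_ => child())), 30.seconds)
    println(results)               // four identical sums, one parent run
  }
}
{code}

This is roughly what the scheduler would need to do across machines: treat 
the in-progress parent as a shared computation, not a per-task, per-executor 
decision.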

  was:
Not sure if this would change too much of the internals to be included in the 
1.2.1 but it would be very helpful if it could be.

This ticket is from a discussion between myself and [~ilikerps]. Here's the 
result of some testing that [~ilikerps] did:

{code}

I did some testing as well, and it turns out the "wait for other guy to finish 
caching" logic is on a per-task basis, and it only works on tasks that happen 
to be executing on the same machine. 

Once a partition is cached, we will schedule tasks that touch that partition on 
that executor. The problem here, though, is that the cache is in progress, and 
so the tasks are still scheduled randomly (or with whatever locality the data 
source has), so tasks which end up on different machines will not see that the 
cache is already in progress.

Here was my test, by the way:
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent._
import scala.concurrent.duration._

val rdd = sc.parallelize(0 until 8).map(i => { Thread.sleep(10000); i }).cache()
val futures = (0 until 4).map { _ => Future { rdd.count } }
Await.result(Future.sequence(futures), 120.second)

Note that I run the future 4 times in parallel. I found that the first run has 
all tasks take 10 seconds. The second has about 50% of its tasks take 10 
seconds, and the rest just wait for the first stage to finish. The last two 
runs have no tasks that take 10 seconds; all wait for the first two stages to 
finish.
{code}


What we want is the ability to fire off a job and have the DAG figure out that 
two RDDs depend on the same parent so that when the children are scheduled 
concurrently, the first one to start will activate the parent and both will 
wait on the parent. When the parent is done, they will both be able to finish 
their work concurrently. We are trying to use this pattern by having the parent 
cache results.


> Two RDDs which are scheduled concurrently should be able to wait on parent in 
> all cases
> ---------------------------------------------------------------------------------------
>
>                 Key: SPARK-5140
>                 URL: https://issues.apache.org/jira/browse/SPARK-5140
>             Project: Spark
>          Issue Type: New Feature
>            Reporter: Corey J. Nolet
>              Labels: features
>             Fix For: 1.3.0, 1.2.1
>
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
